[-] json perms

[+] yaml perms
[+] Check connections
[~] Move funcs to modules
[+] Mod for pyinstaller
[+] Autogenerate configs
This commit is contained in:
2024-01-12 17:33:55 +03:00
parent c7056a29f8
commit 032f2bd06f
4 changed files with 256 additions and 88 deletions

5
requirements.txt Normal file
View File

@@ -0,0 +1,5 @@
mcstatus~=11.1.1
mcrcon~=0.7.0
vk~=3.0
ruamel.yaml~=0.18.5
requests~=2.31.0

View File

@@ -1,88 +1,20 @@
import re
import sys
import json
import traceback
from datetime import datetime
import requests
import vk
from mcrcon import MCRcon
def log(text, lvl=0):
print(f"[{datetime.now()}] [{['INFO ', 'ERROR'][lvl]}] {text}")
log("Starting..")
with open('config-t.json' if "-t" in sys.argv else 'config.json') as f:
config = json.load(f)
with open('help_message.txt') as f:
help_message = f.read()
host = config['rcon']['host']
port = config['rcon']['port']
password = config['rcon']['password']
def rcon(cmd):
try:
with MCRcon(host, password, port) as mcr:
text = mcr.command(cmd)
return re.sub(r'§.', '', text)
except Exception as e:
log(f"RCON ERROR with command: {cmd}", 1)
print(traceback.format_exc())
return f"Rcon error: {e}"
class Permissions:
def __init__(self, permission_file):
self.permission_file = permission_file
self._raw_file = {}
self._members = {}
self._parse_file()
def _parse_file(self):
with open(self.permission_file) as pf:
self._raw_file = json.load(pf)
for role, role_data in self._raw_file.items():
members = role_data.get("members", [])
allow = role_data.get("allow", [])
for member in members:
self._members[member] = {
"role": role,
"allow": allow
}
def is_allow(self, vk_id, cmd):
u = self._members.get(vk_id)
if u is not None:
role = u['role']
allow = u['allow']
if allow is True:
return True, role
elif cmd in allow:
return True, role
return False, role
return False, None
def get_role(self, vk_id):
u = self._members.get(vk_id)
if u is not None:
return u['role']
return None
from modules import log, config, rcon, perms, get_server_status
class Bot:
def __init__(self, perms: Permissions):
self.vk = vk.API(access_token=config['vk']['token'], v=5.131)
def __init__(self):
self.vk = vk.API(access_token=config.vk.token, v=5.199)
self.group_id = vk.groups.getById()[0]['id']
with open('help_message.txt') as f:
self.help_message = f.read()
log(f"Group id: {self.group_id}")
self.perms = perms
def get_lp_server(self):
lp = vk.groups.getLongPollServer(group_id=self.group_id)
@@ -100,7 +32,7 @@ class Bot:
vk.messages.send(message=message, peer_id=peer_id, random_id=0)
def rcon_cmd_handle(self, cmd, from_id, peer_id, _write=True, _allow=False):
a, r = self.perms.is_allow(from_id, cmd.split()[0])
a, r = perms.is_allowed(from_id, cmd.split()[0])
if _allow:
r = cmd
if a or _allow:
@@ -117,15 +49,16 @@ class Bot:
from_id = message['from_id']
peer_id = message['peer_id']
text = message['text']
if text.startswith(".rcon "):
self.rcon_cmd_handle(text[6:], from_id, peer_id)
if text == "!help":
self.write(peer_id, help_message)
elif text == "!online":
text = self.rcon_cmd_handle('list', from_id, peer_id, False, True).replace("\n", "")
self.write(peer_id, text)
elif text == "!id":
self.write(peer_id, f"Твой ID: {from_id}\nРоль в боте: {self.perms.get_role(from_id) or 'Отсутствует'}")
match text:
case i if i.startwith(".rcon "):
self.rcon_cmd_handle(i[6:], from_id, peer_id)
case "!help":
self.write(peer_id, self.help_message)
case "!online":
online = get_server_status().online
self.write(peer_id, f"На сервере сейчас {online} {""}")
case "!id":
self.write(peer_id, f"Твой ID: {from_id}\nРоль: {perms.get_role(from_id)}")
def listen(self):
server, key, ts = self.get_lp_server()
@@ -144,15 +77,38 @@ class Bot:
except KeyboardInterrupt:
print('\nExiting...')
exit(0)
sys.exit(1)
except Exception as e:
ts = lp.get('ts')
print(f"Found exception: {e}")
print(traceback.format_exc())
traceback.print_exc()
if __name__ == '__main__':
_perms = Permissions(config['permission_file'])
bot = Bot(_perms)
bot.listen()
if not config.vk.token:
log("Токен ВК не найден.\nВыход...", 1)
input("\n\nНажмите Enter для продолжения..")
sys.exit(1)
try:
bot = Bot()
try:
# Test RCON
bot.rcon_cmd_handle("list", 0, 0, False, True)
log("RCON работает.")
except Exception as e:
log("RCON не отвечает. Проверьте блок \"rcon\" с config.json", 1)
raise e
try:
# Test Minecraft Server
log(f"Проверка сервера. Онлайн: {get_server_status().online}")
except Exception as e:
log("Сервер не отвечает. Проверьте блок \"minecraft\" с config.json", 1)
raise e
bot.listen()
except Exception as e:
log(f"Exception: {e}", 1)
traceback.print_exc()
finally:
log("Выход..")
input("\n\nНажмите Enter для продолжения..")

85
src/modules/__init__.py Normal file
View File

@@ -0,0 +1,85 @@
import json
import os
import re
import traceback
from collections import namedtuple
from datetime import datetime
from pathlib import Path
from mcrcon import MCRcon
from .perms import Permissions
def log(text, lvl=0):
print(f"[{datetime.now()}] [{['INFO ', 'ERROR'][lvl]}] {text}")
raw_config = """\
{
"vk": {
"token": "",
"help_file": "help_message.txt"
},
"rcon": {
"host": "127.0.0.1",
"port": 25575,
"password": "P@ssw0rd"
},
"minecraft": {
"host": "127.0.0.1",
"port": 25565,
"java": true
},
"permissions_file": "permissions.yml"
}"""
raw_help = """\
Тебе не нужна помощь, ты и так беспомощный, кожаный ублюдок. Так уж и быть, подскажу пару команд...
!help - Вывести это сообщение.
!online - Показать текущий онлайн на сервере.
Бот сделан кожанным петухом - админом, все вопросы к нему, я не причём.
"""
if not os.path.exists("config.json"):
log("Generating: config.json...")
with open("config.json", "w") as f:
f.write(raw_config)
with open('config.json') as f:
config = json.load(f, object_hook=lambda x: namedtuple('X', x.keys())(*x.values()))
log("Starting..")
if not os.path.exists(config.vk.help_file):
log(f"Generating: {config.vk.help_file}...")
with open(config.vk.help_file, "w") as f:
f.write(raw_help)
if config.minecraft.java:
from mcstatus import JavaServer as mcs
else:
from mcstatus import BedrockServer as mcs
host = config.rcon.host
port = config.rcon.port
password = config.rcon.password
def rcon(cmd):
try:
with MCRcon(host, password, port) as mcr:
text = mcr.command(cmd)
return re.sub(r'§.', '', text)
except Exception as e:
log(f"RCON ERROR with command: {cmd}", 1)
print(traceback.format_exc())
return f"Rcon error: {e}"
def get_server_status():
server = mcs.lookup(config.minecraft.host, config.minecraft.port)
return server.status()
Permissions.perm_file = Path(config.permissions_file)
perms = Permissions.load()

122
src/modules/perms.py Normal file
View File

@@ -0,0 +1,122 @@
import os.path
from datetime import datetime
from pathlib import Path
from ruamel.yaml import YAML
def log(text, lvl=0):
print(f"[{datetime.now()}] [{['INFO ', 'ERROR'][lvl]}] {text}")
yaml = YAML()
yaml.default_flow_style = False
class Permissions:
perm_file = Path("permissions.yml")
def __init__(self, **kwargs):
self._no_role = kwargs.get("noRole")
self.no_rights = kwargs.get("noRights")
self._perms = kwargs['perms']
self._members = {}
if kwargs['useLuckPerms']:
log("[PERMS] Using LuckPerms mode")
log("[PERMS] LuckPerms mode support still in development")
sys.exit(1)
self._luck_perms = kwargs['LuckPerms']
log("[PERMS] Permissions loaded")
self.__handle_members()
def __handle_members(self):
for role, role_data in self._perms.items():
members = role_data.get("ids", [])
allow = role_data.get("allow", [])
allow = set(allow)
if len(allow) > 0:
for member in members:
if member in self._members:
self._members[member]["allow"] |= allow
else:
self._members[member] = {
"role": role,
"friendly": role_data.get("name", role),
"allow": allow
}
def is_allowed(self, member, cmd):
u = self._members.get(member)
if u:
friendly = u['friendly']
allow = u['allow']
if ("*" in allow) or (cmd in allow):
return True, friendly
return False, friendly
return False, self._no_role
def get_role(self, member):
u = self._members.get(member)
if u:
return u['friendly']
return self._no_role
@classmethod
def load(cls):
if os.path.exists(cls.perm_file):
data = yaml.load(cls.perm_file)
if not data:
os.remove(cls.perm_file)
return Permissions.load()
else:
log(f"Generating permissions file: {cls.perm_file}")
import textwrap
raw = textwrap.dedent("""\
noRole: Нет роли
noRights: Нет прав # null для отключения
perms:
admins: # Имя группы
name: Админ # Имя группы, которое будет отображаться в боте
ids: # вк ИД входящих в состав группы
- 370926160
allow: # Какие команды разрешены, "*" - все
- '*'
# Пример настройки
helpers:
name: Хелпер
ids:
- 583018016
allow:
- say
- mute
- warn
# Находится в режиме тестирования
# Интеграция с базой данных LuckPerms (Нужна именно внешняя база данных)
useLuckPerms: false
LuckPerms:
# Смотрите настройку LuckPerms
server: global
# Разрешенные варианты: MySQL, MariaDB, PostgreSQL
storage-method: PostgreSQL
data:
# Указывайте host:port
address: 127.0.0.1:5432
# База данных в которой хранятся настройки LuckPerms
database: minecraftDB
# Логин и пароль для доступа к БД
username: user
password: user
# Смотрите настройку LuckPerms
table-prefix: luckperms_
""")
data = yaml.load(raw)
with open(cls.perm_file, mode="w", encoding="utf-8") as f:
yaml.dump(data, f)
return Permissions(**data)
if __name__ == '__main__':
perms = Permissions.load()