[+] Permission System (into bot)

[+] MultiServer System
[+] (bot) perms_handler
[+] (hosts) new api
[+] IN_DOCKER
[+] Signals handlers
[+] easydict
[~] Version 1.3.0 -> 2.0.0
[~] (perms) Update is_allow
This commit is contained in:
Maxim Khomutov 2024-01-17 05:59:24 +03:00
parent c01ae2d812
commit 2237582f80
6 changed files with 249 additions and 175 deletions

View File

@ -3,4 +3,5 @@ mcrcon~=0.7.0
vk~=3.0
ruamel.yaml~=0.18.5
requests~=2.31.0
loguru~=0.7.2
loguru~=0.7.2
easydict~=1.11

View File

@ -1,7 +1,9 @@
import sys
from pathlib import Path
import requests
import vk
from easydict import EasyDict
from loguru import logger
import modules
@ -24,7 +26,7 @@ class Bot:
logger.info(f"[BOT] ID группы: {self.group_id}")
def _test(self):
Permissions.perm_file = Path(config["perms_file"])
Permissions.perms_file = Path(config["perms_file"])
self.perms = Permissions.load()
self.hosts = Hosts.load()
# Check token
@ -47,142 +49,164 @@ class Bot:
else:
self.vk.messages.send(message=message, peer_id=peer_id, random_id=0)
def _handle_rcon(self, message, _write=True, allow=False):
"""Проверка прав и выполнение RCON команды"""
from_id = message['from_id']
peer_id = message['peer_id']
text = message['text']
logger.info(f"[BOT] {peer_id}:{from_id}:{text}")
tsplit = text.split(" ")
if allow:
role = "console"
else:
if tsplit[1] in self.hosts.hosts:
props = {"cmd": " ".join(tsplit[2:]), "server": tsplit[1]}
else:
props = {"cmd": " ".join(tsplit[1:])}
allow, role = self.perms.is_allowed(from_id, props['cmd'])
if allow:
answer, _ = self.hosts.rcon(**props)
if not answer:
answer = "Выполнено без ответа."
logger.info(f"[BOT] User: {from_id}({role}) in Chat: {peer_id} use RCON cmd: \"{props['cmd']}\", "
f"with answer: \"{answer}\"")
if _write:
self.write(peer_id, ("" if not props.get("server") else f"Ответ от {self.hosts._hosts_meta[props["server"]].get("name", props["server"])}:\n") + answer)
else:
return answer
else:
logger.info(f"[BOT] User: {from_id}({role}) in Chat: {peer_id} no have rights RCON cmd: \"{props['cmd']}\".")
if self.perms.no_rights: # Если есть текст
self.write(peer_id, self.perms.no_rights)
def _handle_bot(self, message, **_):
cmds = ("Доступные команды:\n"
" .bot help - Вывести это сообщение.\n"
" .bot info - Выводит краткую информацию о боте.\n"
" .bot hosts list - Список доступных хостов\n"
" .bot hosts reload - Перезагружает hosts.yml\n"
# " .bot perms user [add | del] <group> - (WIP) \n"
# " .bot perms list - (WIP) Выводит список групп \n"
" .bot perms reload - Перезагружает permissions.yml")
tsplit = message.text.split(" ")
if len(tsplit) == 1:
if not message.has_perm("bot.help"): return
message.reply(cmds)
return
match tsplit[1]:
case "hosts":
if not message.has_perm(["bot.hosts", "bot.hosts.*", "bot.hosts.reload", "bot.hosts.list"]): return
match tsplit[2] if len(tsplit) > 2 else None:
case "list":
if not message.has_perm(["bot.hosts.*", "bot.hosts.list"]): return
s = ""
for host in self.hosts.hosts:
ping = 0
r, e = self.hosts.mine(host, True)
if not e:
ping = r.latency
_, e = self.hosts.rcon("list", host, True)
meta = self.hosts._hosts_meta[host]
name = meta.get("name")
rcon_ok = meta.get('rcon_ok')
mine_ok = meta.get('mine_ok')
if (not rcon_ok and meta['rcon'] > 0) or (not mine_ok and meta['online'] > 0):
s += f"\nㅤ⛔ {host} ({name})"
else:
s += f"\nㅤ✅ {host} ({name})"
if "-a" in tsplit or "--all" in tsplit:
# noinspection SpellCheckingInspection
s += (f":\n"
f"important: {meta['important']}\n"
f"rcon_default: {meta['rcon'] == 1}\n"
f"mine_default: {meta['online'] == 1}\n"
f"rcon: {not bool(e)}\n"
f"ping: {ping:.4f}ms\n"
f"rcon_ok: {rcon_ok}\n"
f"mine_ok: {mine_ok}")
message.reply("Список хостов:" + s)
case "reload":
if not message.has_perm(["bot.hosts.*", "bot.hosts.reload"]): return
self.hosts = Hosts.load()
message.reply("hosts.yml - Загружен")
case _:
message.reply(".bot hosts [list | reload]")
case "perms":
if not message.has_perm(["bot.perms", "bot.perms.*", "bot.perms.reload"]): return
match tsplit[2] if len(tsplit) > 2 else None:
case "reload":
if not message.has_perm(["bot.perms.*", "bot.perms.reload"]): return
self.perms = Permissions.load()
message.reply("permissions.yml - Загружен")
case _:
message.reply(".bot perms [reload]")
case "info":
if not message.has_perm(["bot.info"]): return
message.reply(f"RconVkBot\n"
f"Версия бота: {modules.__version__}, последняя: {not is_new_version}")
case _:
if not message.has_perm(["bot.help"]): return
message.reply(cmds)
def _handle_bot(self, message):
from_id = message['from_id']
if self.perms.is_allowed(from_id, "bot"):
peer_id = message['peer_id']
text = message['text']
logger.info(f"[BOT] {peer_id}:{from_id}:{text}")
tsplit = text.split(" ")
cmds = ("Доступные команды:\n"
" .bot help - Вывести это сообщение.\n"
" .bot info - Выводит краткую информацию о боте.\n"
" .bot hosts list - Список доступных хостов\n"
" .bot hosts reload - Перезагружает hosts.yml\n"
# " .bot perms user [add | del] <group> - не реализовано \n"
# " .bot perms list - Выводит список групп \n"
" .bot perms reload - Перезагружает permissions.yml")
if len(tsplit) == 1:
self.write(peer_id, cmds)
return
match tsplit[1]:
case "hosts":
match tsplit[2] if len(tsplit) > 2 else None:
case "list":
s = ""
for host in self.hosts.hosts:
ping = 0
r, e = self.hosts.mine(host, True)
if not e:
ping = r.latency
_, e = self.hosts.rcon("list", host, True)
meta = self.hosts._hosts_meta[host]
name = meta.get("name")
rcon_ok = meta.get('rcon_ok')
mine_ok = meta.get('mine_ok')
if (not rcon_ok and meta['rcon'] > 0) or (not mine_ok and meta['online'] > 0):
s += f"\nㅤ⛔ {host} ({name})"
else:
s += f"\nㅤ✅ {host} ({name})"
if "-a" in tsplit or "--all" in tsplit:
# noinspection SpellCheckingInspection
s += (f":\n"
f"important: {meta['important']}\n"
f"rcon_default: {meta['rcon'] == 1}\n"
f"mine_default: {meta['online'] == 1}\n"
f"rcon: {not bool(e)}\n"
f"ping: {ping:.4f}ms\n"
f"rcon_ok: {rcon_ok}\n"
f"mine_ok: {mine_ok}")
self.write(peer_id, "Список хостов:" + s)
case "reload":
self.hosts = Hosts.load()
self.write(peer_id, "hosts.yml - Загружен")
case _:
self.write(peer_id, ".bot hosts [list | reload]")
case "perms":
match tsplit[2] if len(tsplit) > 2 else None:
case "reload":
self.perms = Permissions.load()
self.write(peer_id, "permissions.yml - Загружен")
case _:
self.write(peer_id, ".bot perms [reload]")
case "info":
self.write(peer_id, f"RconVkBot\n"
f"Версия бота: {modules.__version__}, последняя: {not is_new_version}")
case _:
self.write(peer_id, cmds)
def _handle_rcon(self, message, role, host, text, _write=True):
"""Проверка прав и выполнение RCON команды"""
if len(text) == 0: return
cmd = text.split(" ")[0]
if not message.has_perm(["bot.rcon.*.*", f"bot.rcon.*.{cmd}", f"bot.rcon.{host}.*", f"bot.rcon.{host}.{cmd}"]):
return
answer, _ = self.hosts.rcon(text, host)
if not answer:
answer = "Выполнено без ответа."
logger.info(f"[BOT] User: {message['from_id']}({role}) in Chat: {message.peer_id} use RCON cmd: \"{text}\", "
f"with answer: \"{answer}\"")
message.reply(("" if host == "default" else f"Ответ от {self.hosts.get_name(host)}:\n") + answer)
def _handle_online(self, message, host, **_):
server, _ = self.hosts.mine(host)
players = server.players
message.reply(f"На сервере сейчас {players.online}/{players.max}")
def _perm_handler(self, message, perms: list | str, func: callable):
from_id = message.from_id
peer_id = message.peer_id
message.has_perm = lambda x: self.perms.is_allowed(from_id, x)[0]
if isinstance(perms, str):
perms = [perms]
host, text = self.hosts.parse_host(message['text'])
for i, V in enumerate(perms):
perms[i] = V.format(host=host)
allow, role = self.perms.is_allowed(from_id, perms)
logger.info(f"[BOT] {host}:{peer_id}:{from_id}:{self.perms.get_role(from_id, True)} {message['text']}")
if allow:
func(message=message, role=role, host=host, text=text)
else:
if self.perms.no_rights: # Если есть текст
message.reply(self.perms.no_rights)
def message_handle(self, message):
from_id = message['from_id']
peer_id = message['peer_id']
text = message['text']
match text:
case i if i.startswith(".rcon "):
self._handle_rcon(message)
case i if i.startswith(".bot"):
self._handle_bot(message)
from_id = message.from_id
peer_id = message.peer_id
message.reply = lambda text: self.write(peer_id, text)
sw = lambda t, x: t.startswith(x)
match message.text:
case i if sw(i, ".bot"):
perms = [
"bot.help", "bot.info",
"bot.perms", "bot.perms.*", "bot.perms.reload",
"bot.hosts", "bot.hosts.*", "bot.hosts.reload", "bot.hosts.list"
]
self._perm_handler(message, perms, self._handle_bot)
case i if sw(i, ".rcon "):
perms = ["bot.rcon.*", "bot.rcon.{host}"]
self._perm_handler(message, perms, self._handle_rcon)
case "!help":
logger.info(f"[BOT] {peer_id}:{from_id}:{text}")
self.write(peer_id, self.help_message)
case "!online":
logger.info(f"[BOT] {peer_id}:{from_id}:{text}")
server, _ = self.hosts.mine()
players = server.players
self.write(peer_id, f"На сервере сейчас {players.online}/{players.max}")
perms = ["bot.help"]
self._perm_handler(message, perms, lambda **_: self.write(peer_id, self.help_message))
case i if sw(i, "!online"):
perms = ["bot.online.*", "bot.online.{host}"]
self._perm_handler(message, perms, self._handle_online)
case "!id":
logger.info(f"[BOT] {peer_id}:{from_id}:{text}")
self.write(peer_id,
f"Твой ID: {from_id}\n"
f"Роль: {self.perms.get_role(from_id)}\n"
f"Ник: {self.perms.get_nick(from_id)}")
def __id(**_):
self.write(peer_id, ""
f"Твой ID: {from_id}\n"
f"Роль: {self.perms.get_role(from_id)}\n"
f"Ник: {self.perms.get_nick(from_id)}")
self._perm_handler(message, "bot.id", __id)
def listen(self):
server, key, ts = self.get_lp_server()
session = requests.Session()
logger.info("[BOT] Начинаю получать сообщения..")
logger.info("[BOT] {host}:{chat_id}:{user_id}:{role} {text}")
while True:
lp = requests.get(f'{server}?act=a_check&key={key}&ts={ts}&wait=25').json()
lp = session.get(f'{server}?act=a_check&key={key}&ts={ts}&wait=3').json()
try:
if lp.get('failed') is not None:
key = self.get_lp_server()[1]
if ts != lp.get('ts') and lp.get('updates'):
updates = lp['updates'][0]
if updates['type'] == "message_new":
self.message_handle(updates['object']['message'])
# noinspection PyTypeChecker
self.message_handle(EasyDict(**updates['object']['message']))
ts = lp.get('ts')
except Exception as i:
ts = lp.get('ts')
logger.exception(i)
def stop(self):
self.hosts.unload()
def stop(self, signum=-1, frame=None):
logger.debug(f"{signum=} {frame=}")
if signum == -1:
logger.info("Выход.")
self.hosts.unload()
sys.exit(0)

View File

@ -60,6 +60,21 @@ class Hosts:
self._hosts_meta["connected"] = False
return None, e
def parse_host(self, s: str | list, index: int = 1) -> tuple[str, str]:
if isinstance(s, str):
s = s.split(" ")
if len(s)-1 >= index:
host = s[index]
if host in self._hosts_meta:
return host, " ".join(s[index+1:])
return "default", " ".join(s[index:])
def get_name(self, host) -> str:
h = self._hosts_meta.get(host)
if h:
return h.get("name", host)
return host
def _connect(self) -> None:
if self._hosts is None or len(self._hosts) == 0:
logger.error("[HOSTS] Не найдено ни одного хоста.")

View File

@ -10,9 +10,9 @@ from ruamel.yaml import YAML
yaml = YAML()
yaml.default_flow_style = False
IN_DOCKER = "DOCKER_CONTAINER" in os.environ
IN_DOCKER = "IN_DOCKER" in os.environ
__version__ = '1.3.1'
__version__ = '2.0.0'
raw_config_main = """\
vk_token: ""
@ -36,26 +36,50 @@ perms:
admin: # Имя группы
name: Админ # Имя группы, которое будет отображаться в боте
ids: # вк ИД входящих в состав группы
- 370926160
- 370926160
parent: # Наследование прав
- helper
allow: # Какие команды разрешены, "*" - все
- '*'
- helper
allow: # Права, подробнее в readme.md
- bot.*
# - bot.help
# - bot.info
# - bot.hosts
# - bot.hosts.*
# - bot.hosts.list
# - bot.hosts.reload
# - bot.perms
# - bot.hosts.*
# - bot.hosts.reload
# - bot.cmd.*
# - bot.cmd.help
# - bot.cmd.id
# - bot.online.*
# - bot.online.default
# - bot.online.lobby
# - bot.history.*
# - bot.history.default
# - bot.history.lobby
# - bot.rcon.*
# - bot.rcon.*.*
helper:
name: Хелпер
ids:
- 583018016
- 583018016
allow:
- bot.rcon.* # См. host.yml
- say
- mute
- warn
- bot.rcon.default
- bot.rcon.lobby
- bot.rcon.survival
- bot.rcon.*.say
- bot.rcon.*.mute
- bot.rcon.survival.ban
- bot.rcon.survival.tempban
default:
name: Игрок
allow:
- bot.online.* # См. host.yml
- bot.history.* # См. host.yml
- bot.cmd.help
- bot.cmd.id
- bot.cmd.online.*
- bot.cmd.history.*
"""
raw_config_hosts = """\
@ -71,8 +95,8 @@ hosts:
# Разрешение: bot.rcon.<name>; bot.online.<name>; bot.history.<name>
# При запуске бота будет проверка доступности всего
rcon: 2 # RCON будет доступен по команде .rcon lobby <cmd> (разрешение: bot.rcon.lobby)
# !online будет доступен по команде !online lobby (разрешение: bot.online.lobby)
# !history будет доступен по команде !history lobby (разрешение: bot.history.lobby)
# !online будет доступен по команде !online lobby (разрешение: bot.cmd.online.lobby)
# !history будет доступен по команде !history lobby (разрешение: bot.cmd.history.lobby)
online: 2
rcon: # RCON подключение
host: 192.168.0.31
@ -119,17 +143,15 @@ hosts:
important: true
rcon: 0
online: 1
rcon: null
rcon:
mine:
host: 192.168.0.31
port: 15009
"""
raw_help = """\
Тебе не нужна помощь, ты и так беспомощный, кожаный ублюдок. Так уж и быть, подскажу пару команд...
!help - Вывести это сообщение.
!online - Показать текущий онлайн на сервере.
Бот сделан кожанным петухом - админом, все вопросы к нему, я не причём.
!help - Вывести это сообщение
!online - Показать текущий онлайн на сервере
"""
config_dir = "./config/"
@ -176,9 +198,11 @@ with open(config_file_main) as f:
config = yaml.load(f)
logger.info("Запуск..")
if IN_DOCKER:
logger.info("Обнаружен запуск из DOCKER")
if not os.path.exists(config["help_file"]):
logger.info(f"Создание: {config["help_file"]}...")
with open(config.vk.help_file, "w", encoding="utf-8") as f:
with open(config["help_file"], "w", encoding="utf-8") as f:
f.write(raw_help)

View File

@ -7,7 +7,7 @@ from modules import yaml, raw_config_perms, enter_to_exit
class Permissions:
perm_file = Path("permissions.yml")
perms_file = Path("permissions.yml")
def __init__(self, **kwargs):
logger.debug(f"[PERMS] Initializing Permissions")
@ -17,7 +17,7 @@ class Permissions:
self.no_rights = kwargs.get("noRights")
self._perms = kwargs.get('perms')
if not self._perms or not isinstance(self._perms, dict):
logger.error(f"[PERMS] Блок: {"perms"!r}, в {self.perm_file!r} - Не валидный")
logger.error(f"[PERMS] Блок: {"perms"!r}, в {self.perms_file!r} - Не валидный")
logger.debug(f"perms: {type(self._perms)}")
logger.debug(self._perms)
enter_to_exit()
@ -26,15 +26,14 @@ class Permissions:
self.__handle_members()
logger.info(f"[PERMS] Права загружены")
def __handle_parents(self, p=None):
if p is None:
p = {}
def __handle_parents(self, r=True):
p = {}
for parent, v in self._perms.items():
for child in v.get("parent", []):
p[child] = parent
if p.get(child) == parent and p.get(parent) == child:
logger.warning(f"[PERMS] Рекурсивное присваивание запрещено: "
f"perms.{child}.parent.{parent} - perms.{parent}.parent.{child} ({self.perm_file!r})")
f"perms.{child}.parent.{parent} - perms.{parent}.parent.{child} ({self.perms_file!r})")
del p[parent]
for child, parent in p.items():
@ -46,6 +45,10 @@ class Permissions:
else:
logger.warning(f"[PERMS] Группа {child!r} - не найдена (perms.{parent}.parent.{child})")
if r:
logger.debug(f"[PERMS] Again :)")
self.__handle_parents(False)
def __handle_members(self):
self.__handle_parents()
for role, role_data in self._perms.items():
@ -64,23 +67,34 @@ class Permissions:
}
logger.debug(f"{self._members=}")
def is_allowed(self, member: int, _perms: str | list) -> tuple[bool, str]:
if isinstance(_perms, str):
_perms = [_perms]
for perm in _perms:
user = self._members.get(member)
if user:
friendly = user['friendly']
allow = user['allow']
if (("*" in allow) or (perm in allow)) and (f"-{perm}" not in allow):
return True, friendly
return False, friendly
return False, self._no_role
def is_allowed(self, member: int, perms: str | list, raw_role=False) -> tuple[bool, str]:
if isinstance(perms, str):
perms = [perms]
logger.debug(perms)
allow = False, self._no_role
user = self._members.get(member)
if user:
allow_list = user['allow']
logger.debug(f"{user=} {allow_list=}")
role = self.get_role(member, raw_role)
for perm in perms:
if f"-{perm}" in allow_list:
allow = False, role
logger.debug(f"Found -{perm=}")
break
if not allow[0]:
if ((("*" in allow_list) or ("bot.*" in allow_list) or (perm in allow_list))
and (f"-{perm}" not in allow_list)):
allow = True, role
else:
allow = False, role
logger.debug(allow)
return allow
def get_role(self, member):
u = self._members.get(member)
if u:
return u['friendly']
def get_role(self, member, raw_role=False):
user = self._members.get(member)
if user:
return user['friendly'] if not raw_role else user['role']
return self._no_role
def get_nick(self, member):
@ -91,20 +105,16 @@ class Permissions:
@classmethod
def load(cls):
if os.path.exists(cls.perm_file):
data = yaml.load(cls.perm_file)
if os.path.exists(cls.perms_file):
data = yaml.load(cls.perms_file)
if not data:
os.remove(cls.perm_file)
os.remove(cls.perms_file)
return Permissions.load()
else:
logger.info(f"Создание: {cls.perm_file}...")
logger.info(f"Создание: {cls.perms_file}...")
data = yaml.load(raw_config_perms)
with open(cls.perm_file, mode="w", encoding="utf-8") as f:
with open(cls.perms_file, mode="w", encoding="utf-8") as f:
yaml.dump(data, f)
logger.info(f"[PERMS] {cls.perm_file} - загружен")
logger.info(f"[PERMS] {cls.perms_file} - загружен")
return Permissions(**data)
if __name__ == '__main__':
perms = Permissions.load()

View File

@ -1,6 +1,6 @@
# pip install pyinstaller-versionfile
# create-version-file metadata.yml --outfile version.txt
Version: 1.3.1
Version: 2.0.0
CompanyName: anidev
FileDescription: Бот для майнкрафта, использует RCON и VK API. Исходники можно найти по "SantaSpeen/Rcon-VK-Bot"
InternalName: VkBot-Rcon