[!] FIX Legacy mode

[+] MyNestedCompleter
[+] players_completer
This commit is contained in:
Maxim Khomutov 2024-07-31 15:50:37 +03:00
parent 72035c226b
commit d003601b58

View File

@ -13,7 +13,8 @@ from typing import AnyStr
from prompt_toolkit import PromptSession, print_formatted_text, HTML from prompt_toolkit import PromptSession, print_formatted_text, HTML
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.completion import NestedCompleter from prompt_toolkit.completion import Completer, WordCompleter
from prompt_toolkit.document import Document
from prompt_toolkit.history import FileHistory from prompt_toolkit.history import FileHistory
try: try:
@ -27,6 +28,84 @@ from core import get_logger
from modules.ConsoleSystem.RCON import RCONSystem from modules.ConsoleSystem.RCON import RCONSystem
class BadCompleter(Exception): ...
class MyNestedCompleter(Completer):
def __init__(self, options, ignore_case=True):
self.options = self._from_nested_dict(options)
self.ignore_case = ignore_case
def __repr__(self) -> str:
return f"MyNestedCompleter({self.options!r}, ignore_case={self.ignore_case!r})"
@classmethod
def _from_nested_dict(cls, data, r=False):
options: dict[str, Completer | None] = {}
for key, value in data.items():
if isinstance(value, Completer):
options[key] = value
elif isinstance(value, dict):
options[key] = cls._from_nested_dict(value, True)
elif isinstance(value, set):
options[key] = cls._from_nested_dict({item: None for item in value}, True)
elif isinstance(value, bool):
if value:
options[key] = None
else:
if isinstance(value, str) and value == "<playerlist>":
options[key] = players_completer
else:
if value is not None:
raise BadCompleter(f"{value!r} for key {key!r} have not valid type.")
options[key] = None
if r:
return cls(options)
return options
def load(self, data):
self.options = self._from_nested_dict(data)
def get_completions(self, document, complete_event):
# Split document.
text = document.text_before_cursor.lstrip()
stripped_len = len(document.text_before_cursor) - len(text)
# If there is a space, check for the first term, and use a
# subcompleter.
if " " in text:
first_term = text.split()[0]
completer = self.options.get(first_term)
# If we have a sub completer, use this for the completions.
if completer is not None:
remaining_text = text[len(first_term):].lstrip()
move_cursor = len(text) - len(remaining_text) + stripped_len
new_document = Document(
remaining_text,
cursor_position=document.cursor_position - move_cursor,
)
yield from completer.get_completions(new_document, complete_event)
# No space in the input: behave exactly like `WordCompleter`.
else:
completer = WordCompleter(
list(self.options.keys()), ignore_case=self.ignore_case
)
yield from completer.get_completions(document, complete_event)
def tick_players(self, _):
clients = ev.call_event("_get_player", raw=True)[0]
self.options = {}
for k in clients.keys():
self.options[k] = None
players_completer = MyNestedCompleter({})
class Console: class Console:
def __init__(self, def __init__(self,
@ -36,7 +115,11 @@ class Console:
debug=False) -> None: debug=False) -> None:
self.__logger = get_logger("console") self.__logger = get_logger("console")
self.__run = False self.__run = False
self.no_cmd = False try:
self.session = PromptSession(history=FileHistory('./.cmdhistory'))
self.__legacy_mode = False
except NoConsoleScreenBufferError:
self.__legacy_mode = True
self.__prompt_in = prompt_in self.__prompt_in = prompt_in
self.__prompt_out = prompt_out self.__prompt_out = prompt_out
self.__not_found = not_found self.__not_found = not_found
@ -47,11 +130,11 @@ class Console:
self.__man = dict() self.__man = dict()
self.__desc = dict() self.__desc = dict()
self.__print_logger = get_logger("print") self.__print_logger = get_logger("print")
self.completer = MyNestedCompleter(self.__alias)
self.add_command("man", self.__create_man_message, i18n.man_message_man, i18n.help_message_man, self.add_command("man", self.__create_man_message, i18n.man_message_man, i18n.help_message_man,
custom_completer={"man": {}}) custom_completer={"man": {}})
self.add_command("help", self.__create_help_message, i18n.man_message_help, i18n.help_message_help, self.add_command("help", self.__create_help_message, i18n.man_message_help, i18n.help_message_help,
custom_completer={"help": {"--raw": None}}) custom_completer={"help": {"--raw": False}})
self.completer = NestedCompleter.from_nested_dict(self.__alias)
rcon = RCONSystem rcon = RCONSystem
rcon.console = self rcon.console = self
self.rcon = rcon self.rcon = rcon
@ -79,15 +162,11 @@ class Console:
def __create_man_message(self, argv: list) -> AnyStr: def __create_man_message(self, argv: list) -> AnyStr:
if len(argv) == 0: if len(argv) == 0:
return self.__man.get("man") return self.__man.get("man")
x = argv[0]
if self.__alias.get(x) is None:
return i18n.man_command_not_found.format(x)
man_message = self.__man.get(x) x = argv[0]
if man_message: if x not in self.__alias:
return man_message return i18n.man_command_not_found.format(x)
else: return self.__man.get(x)
return i18n.man_message_not_found
# noinspection PyStringFormat # noinspection PyStringFormat
def __create_help_message(self, argv: list) -> AnyStr: def __create_help_message(self, argv: list) -> AnyStr:
@ -122,9 +201,6 @@ class Console:
return message return message
def __update_completer(self):
self.completer = NestedCompleter.from_nested_dict(self.__alias)
def del_command(self, func): def del_command(self, func):
self.__debug(f"delete command: func={func};") self.__debug(f"delete command: func={func};")
keys = [] keys = []
@ -139,35 +215,32 @@ class Console:
self.__man.pop(key) self.__man.pop(key)
self.__desc.pop(key) self.__desc.pop(key)
self.__debug("Deleted.") self.__debug("Deleted.")
self.__update_completer() self.completer.load(self.__alias)
def add_command(self, key: str, func, man: str = None, desc: str = None, custom_completer: dict = None) -> dict: def add_command(self, key: str, func, man: str = None, desc: str = None, custom_completer: dict = None) -> dict:
key = key.format(" ", "-")
if not isinstance(key, str): if not isinstance(key, str):
raise TypeError("key must be string") raise TypeError("key must be string")
key = key.replace(" ", "-")
self.__debug(f"added user command: key={key}; func={func};") self.__debug(f"added user command: key={key}; func={func};")
self.__alias.update(custom_completer or {key: None}) self.__alias.update(custom_completer or {key: None})
self.__alias["man"].update({key: None}) self.__alias["man"].update({key: None})
self.__func.update({key: {"f": func}}) self.__func.update({key: {"f": func}})
self.__man.update({key: f'html:<seagreen>{i18n.man_for} <b>{key}</b>\n{man}</seagreen>' if man else None}) self.__man.update({key: f'html:<seagreen>{i18n.man_for} <b>{key}</b>\n{man if man else "No page"}</seagreen>'})
self.__desc.update({key: desc}) self.__desc.update({key: desc})
self.__update_completer() self.completer.load(self.__alias)
return self.__alias.copy() return self.__alias.copy()
def _write(self, t): def _write(self, text):
if self.no_cmd: if self.__legacy_mode:
print(t) print(text)
return return
try: _type = text.split(":")[0]
if t.startswith("html:"): match _type:
print_formatted_text(HTML(t[5:])) case "html":
else: print_formatted_text(HTML(text[5:]))
print_formatted_text(t) case _:
except NoConsoleScreenBufferError: print_formatted_text(text)
print("Works in non cmd mode.")
self.no_cmd = True
print(t)
def write(self, s: AnyStr): def write(self, s: AnyStr):
if isinstance(s, (list, tuple)): if isinstance(s, (list, tuple)):
@ -177,12 +250,12 @@ class Console:
self._write(s) self._write(s)
def log(self, s: AnyStr) -> None: def log(self, s: AnyStr) -> None:
if isinstance(s, (list, tuple)): # if isinstance(s, (list, tuple)):
for text in s: # for text in s:
self.__logger.info(f"{text}") # self.__logger.info(f"{text}")
else: # else:
self.__logger.info(f"{s}") # self.__logger.info(f"{s}")
# self.write(s) self.write(s)
def __lshift__(self, s: AnyStr) -> None: def __lshift__(self, s: AnyStr) -> None:
self.write(s) self.write(s)
@ -221,6 +294,7 @@ class Console:
except RecursionError: except RecursionError:
raise raise
except Exception as e: except Exception as e:
print(e)
cls.handleError(record) cls.handleError(record)
logging.StreamHandler.emit = emit logging.StreamHandler.emit = emit
@ -233,58 +307,74 @@ class Console:
# builtins.print = self.__builtins_print # builtins.print = self.__builtins_print
async def read_input(self): async def _parse_input(self, inp):
session = PromptSession(history=FileHistory('./.cmdhistory')) cmd_s = inp.split(" ")
while True: cmd = cmd_s[0]
try: if cmd == "":
with patch_stdout(): return True
if self.no_cmd: else:
cmd_in = input(self.__prompt_in) found_in_lua = False
else: d = ev.call_lua_event("onConsoleInput", inp)
try: if len(d) > 0:
cmd_in = await session.prompt_async( for text in d:
self.__prompt_in, if text is not None:
completer=self.completer, found_in_lua = True
auto_suggest=AutoSuggestFromHistory() self.log(text)
) command_object = self.__func.get(cmd)
except NoConsoleScreenBufferError: if command_object:
print("Works in non cmd mode.") func = command_object['f']
self.no_cmd = True if inspect.iscoroutinefunction(func):
cmd_s = cmd_in.split(" ") out = await func(cmd_s[1:])
cmd = cmd_s[0]
if cmd == "":
continue
else: else:
found_in_lua = False out = func(cmd_s[1:])
d = ev.call_lua_event("onConsoleInput", cmd_in) if out:
if len(d) > 0: self.log(out)
for text in d: else:
if text is not None: if not found_in_lua:
found_in_lua = True self.log(self.__not_found % cmd)
self.log(text)
command_object = self.__func.get(cmd) async def _read_input(self):
if command_object: with patch_stdout():
func = command_object['f'] while self.__run:
if inspect.iscoroutinefunction(func): try:
out = await func(cmd_s[1:]) inp = await self.session.prompt_async(
else: self.__prompt_in, completer=self.completer, auto_suggest=AutoSuggestFromHistory()
out = func(cmd_s[1:]) )
if out: if await self._parse_input(inp):
self.log(out) continue
else: except EOFError:
if not found_in_lua: pass
self.log(self.__not_found % cmd) except KeyboardInterrupt:
self.__run = False
except Exception as e:
self.__logger.error("Exception in console.py:")
self.__logger.exception(e)
async def _read_input_legacy(self):
while self.__run:
try:
inp = input(self.__prompt_in)
if await self._parse_input(inp):
continue
except UnicodeDecodeError:
self.__logger.error("UnicodeDecodeError")
self.__run = False
except KeyboardInterrupt: except KeyboardInterrupt:
raise KeyboardInterrupt self.__run = False
except ConnectionResetError as e:
self.__debug(f"ConnectionResetError {e}")
except Exception as e: except Exception as e:
print(f"Error in console.py: {e}") self.__logger.error("Exception in console.py:")
self.__logger.exception(e) self.__logger.exception(e)
async def start(self): async def start(self):
ev.register("serverTick_0.5s", players_completer.tick_players)
# ev.register("get_players_completer", lambda _: players_completer)
self.__run = True self.__run = True
await self.read_input() if self.__legacy_mode:
await self._read_input_legacy()
else:
await self._read_input()
self.__debug("Closing console.")
raise KeyboardInterrupt
def stop(self, *args, **kwargs): def stop(self, *args, **kwargs):
self.__run = False self.__run = False