From d003601b58d7f3198549d8ca652f66fc697aa311 Mon Sep 17 00:00:00 2001 From: santaspeen Date: Wed, 31 Jul 2024 15:50:37 +0300 Subject: [PATCH] [!] FIX Legacy mode [+] MyNestedCompleter [+] players_completer --- src/modules/ConsoleSystem/__init__.py | 256 +++++++++++++++++--------- 1 file changed, 173 insertions(+), 83 deletions(-) diff --git a/src/modules/ConsoleSystem/__init__.py b/src/modules/ConsoleSystem/__init__.py index 5624a82..1f1e797 100644 --- a/src/modules/ConsoleSystem/__init__.py +++ b/src/modules/ConsoleSystem/__init__.py @@ -13,7 +13,8 @@ from typing import AnyStr from prompt_toolkit import PromptSession, print_formatted_text, HTML from prompt_toolkit.auto_suggest import AutoSuggestFromHistory -from prompt_toolkit.completion import NestedCompleter +from prompt_toolkit.completion import Completer, WordCompleter +from prompt_toolkit.document import Document from prompt_toolkit.history import FileHistory try: @@ -27,6 +28,84 @@ from core import get_logger from modules.ConsoleSystem.RCON import RCONSystem +class BadCompleter(Exception): ... + + +class MyNestedCompleter(Completer): + def __init__(self, options, ignore_case=True): + self.options = self._from_nested_dict(options) + self.ignore_case = ignore_case + + def __repr__(self) -> str: + return f"MyNestedCompleter({self.options!r}, ignore_case={self.ignore_case!r})" + + @classmethod + def _from_nested_dict(cls, data, r=False): + options: dict[str, Completer | None] = {} + for key, value in data.items(): + if isinstance(value, Completer): + options[key] = value + elif isinstance(value, dict): + options[key] = cls._from_nested_dict(value, True) + elif isinstance(value, set): + options[key] = cls._from_nested_dict({item: None for item in value}, True) + elif isinstance(value, bool): + if value: + options[key] = None + else: + if isinstance(value, str) and value == "": + options[key] = players_completer + else: + if value is not None: + raise BadCompleter(f"{value!r} for key {key!r} have not valid type.") + options[key] = None + if r: + return cls(options) + return options + + def load(self, data): + self.options = self._from_nested_dict(data) + + def get_completions(self, document, complete_event): + # Split document. + text = document.text_before_cursor.lstrip() + stripped_len = len(document.text_before_cursor) - len(text) + + # If there is a space, check for the first term, and use a + # subcompleter. + if " " in text: + first_term = text.split()[0] + completer = self.options.get(first_term) + + # If we have a sub completer, use this for the completions. + if completer is not None: + remaining_text = text[len(first_term):].lstrip() + move_cursor = len(text) - len(remaining_text) + stripped_len + + new_document = Document( + remaining_text, + cursor_position=document.cursor_position - move_cursor, + ) + + yield from completer.get_completions(new_document, complete_event) + + # No space in the input: behave exactly like `WordCompleter`. + else: + completer = WordCompleter( + list(self.options.keys()), ignore_case=self.ignore_case + ) + yield from completer.get_completions(document, complete_event) + + def tick_players(self, _): + clients = ev.call_event("_get_player", raw=True)[0] + self.options = {} + for k in clients.keys(): + self.options[k] = None + + +players_completer = MyNestedCompleter({}) + + class Console: def __init__(self, @@ -36,7 +115,11 @@ class Console: debug=False) -> None: self.__logger = get_logger("console") self.__run = False - self.no_cmd = False + try: + self.session = PromptSession(history=FileHistory('./.cmdhistory')) + self.__legacy_mode = False + except NoConsoleScreenBufferError: + self.__legacy_mode = True self.__prompt_in = prompt_in self.__prompt_out = prompt_out self.__not_found = not_found @@ -47,11 +130,11 @@ class Console: self.__man = dict() self.__desc = dict() self.__print_logger = get_logger("print") + self.completer = MyNestedCompleter(self.__alias) self.add_command("man", self.__create_man_message, i18n.man_message_man, i18n.help_message_man, custom_completer={"man": {}}) self.add_command("help", self.__create_help_message, i18n.man_message_help, i18n.help_message_help, - custom_completer={"help": {"--raw": None}}) - self.completer = NestedCompleter.from_nested_dict(self.__alias) + custom_completer={"help": {"--raw": False}}) rcon = RCONSystem rcon.console = self self.rcon = rcon @@ -79,15 +162,11 @@ class Console: def __create_man_message(self, argv: list) -> AnyStr: if len(argv) == 0: return self.__man.get("man") - x = argv[0] - if self.__alias.get(x) is None: - return i18n.man_command_not_found.format(x) - man_message = self.__man.get(x) - if man_message: - return man_message - else: - return i18n.man_message_not_found + x = argv[0] + if x not in self.__alias: + return i18n.man_command_not_found.format(x) + return self.__man.get(x) # noinspection PyStringFormat def __create_help_message(self, argv: list) -> AnyStr: @@ -122,9 +201,6 @@ class Console: return message - def __update_completer(self): - self.completer = NestedCompleter.from_nested_dict(self.__alias) - def del_command(self, func): self.__debug(f"delete command: func={func};") keys = [] @@ -139,35 +215,32 @@ class Console: self.__man.pop(key) self.__desc.pop(key) self.__debug("Deleted.") - self.__update_completer() + self.completer.load(self.__alias) def add_command(self, key: str, func, man: str = None, desc: str = None, custom_completer: dict = None) -> dict: - key = key.format(" ", "-") - if not isinstance(key, str): raise TypeError("key must be string") + + key = key.replace(" ", "-") self.__debug(f"added user command: key={key}; func={func};") self.__alias.update(custom_completer or {key: None}) self.__alias["man"].update({key: None}) self.__func.update({key: {"f": func}}) - self.__man.update({key: f'html:{i18n.man_for} {key}\n{man}' if man else None}) + self.__man.update({key: f'html:{i18n.man_for} {key}\n{man if man else "No page"}'}) self.__desc.update({key: desc}) - self.__update_completer() + self.completer.load(self.__alias) return self.__alias.copy() - def _write(self, t): - if self.no_cmd: - print(t) + def _write(self, text): + if self.__legacy_mode: + print(text) return - try: - if t.startswith("html:"): - print_formatted_text(HTML(t[5:])) - else: - print_formatted_text(t) - except NoConsoleScreenBufferError: - print("Works in non cmd mode.") - self.no_cmd = True - print(t) + _type = text.split(":")[0] + match _type: + case "html": + print_formatted_text(HTML(text[5:])) + case _: + print_formatted_text(text) def write(self, s: AnyStr): if isinstance(s, (list, tuple)): @@ -177,12 +250,12 @@ class Console: self._write(s) def log(self, s: AnyStr) -> None: - if isinstance(s, (list, tuple)): - for text in s: - self.__logger.info(f"{text}") - else: - self.__logger.info(f"{s}") - # self.write(s) + # if isinstance(s, (list, tuple)): + # for text in s: + # self.__logger.info(f"{text}") + # else: + # self.__logger.info(f"{s}") + self.write(s) def __lshift__(self, s: AnyStr) -> None: self.write(s) @@ -221,6 +294,7 @@ class Console: except RecursionError: raise except Exception as e: + print(e) cls.handleError(record) logging.StreamHandler.emit = emit @@ -233,58 +307,74 @@ class Console: # builtins.print = self.__builtins_print - async def read_input(self): - session = PromptSession(history=FileHistory('./.cmdhistory')) - while True: - try: - with patch_stdout(): - if self.no_cmd: - cmd_in = input(self.__prompt_in) - else: - try: - cmd_in = await session.prompt_async( - self.__prompt_in, - completer=self.completer, - auto_suggest=AutoSuggestFromHistory() - ) - except NoConsoleScreenBufferError: - print("Works in non cmd mode.") - self.no_cmd = True - cmd_s = cmd_in.split(" ") - cmd = cmd_s[0] - if cmd == "": - continue + async def _parse_input(self, inp): + cmd_s = inp.split(" ") + cmd = cmd_s[0] + if cmd == "": + return True + else: + found_in_lua = False + d = ev.call_lua_event("onConsoleInput", inp) + if len(d) > 0: + for text in d: + if text is not None: + found_in_lua = True + self.log(text) + command_object = self.__func.get(cmd) + if command_object: + func = command_object['f'] + if inspect.iscoroutinefunction(func): + out = await func(cmd_s[1:]) else: - found_in_lua = False - d = ev.call_lua_event("onConsoleInput", cmd_in) - if len(d) > 0: - for text in d: - if text is not None: - found_in_lua = True - self.log(text) - command_object = self.__func.get(cmd) - if command_object: - func = command_object['f'] - if inspect.iscoroutinefunction(func): - out = await func(cmd_s[1:]) - else: - out = func(cmd_s[1:]) - if out: - self.log(out) - else: - if not found_in_lua: - self.log(self.__not_found % cmd) + out = func(cmd_s[1:]) + if out: + self.log(out) + else: + if not found_in_lua: + self.log(self.__not_found % cmd) + + async def _read_input(self): + with patch_stdout(): + while self.__run: + try: + inp = await self.session.prompt_async( + self.__prompt_in, completer=self.completer, auto_suggest=AutoSuggestFromHistory() + ) + if await self._parse_input(inp): + continue + except EOFError: + pass + except KeyboardInterrupt: + self.__run = False + except Exception as e: + self.__logger.error("Exception in console.py:") + self.__logger.exception(e) + + async def _read_input_legacy(self): + while self.__run: + try: + inp = input(self.__prompt_in) + if await self._parse_input(inp): + continue + except UnicodeDecodeError: + self.__logger.error("UnicodeDecodeError") + self.__run = False except KeyboardInterrupt: - raise KeyboardInterrupt - except ConnectionResetError as e: - self.__debug(f"ConnectionResetError {e}") + self.__run = False except Exception as e: - print(f"Error in console.py: {e}") + self.__logger.error("Exception in console.py:") self.__logger.exception(e) async def start(self): + ev.register("serverTick_0.5s", players_completer.tick_players) + # ev.register("get_players_completer", lambda _: players_completer) self.__run = True - await self.read_input() + if self.__legacy_mode: + await self._read_input_legacy() + else: + await self._read_input() + self.__debug("Closing console.") + raise KeyboardInterrupt def stop(self, *args, **kwargs): self.__run = False