From 8cd4e59918f259d08f3d3d79071e1cc8d7630fe9 Mon Sep 17 00:00:00 2001 From: SantaSpeen Date: Sat, 1 Oct 2022 13:41:08 +0300 Subject: [PATCH] Add asyncio support --- src/console/Console.py | 50 +++++++++++++++++++++++++++-------------- src/console/Console.pyi | 23 ++++++++++--------- 2 files changed, 46 insertions(+), 27 deletions(-) diff --git a/src/console/Console.py b/src/console/Console.py index 625d1f2..af7d94f 100644 --- a/src/console/Console.py +++ b/src/console/Console.py @@ -10,9 +10,15 @@ import builtins import logging import sys import traceback +import asyncio from builtins import RecursionError from typing import AnyStr +try: + from aioconsole import ainput +except ImportError: + ainput = None + class ConsoleIO: @@ -67,6 +73,8 @@ class Console: self.is_run = False + self.async_loop = async_loop + self.get_IO = ConsoleIO def __debug(self, *x): @@ -126,7 +134,6 @@ class Console: message += f"%-{max_len}s; %-{max_len_v}s; %s\n" % (k, v, doc) else: - if doc is None: doc = " No help message found" message += f" %{max_len}s :%s\n" % (k, doc) @@ -229,6 +236,29 @@ class Console: builtins.print = self.__builtins_print + def _cli_logic(self, cmd_in): + try: + cmd = cmd_in.split(" ")[0] + if cmd == "": + pass + else: + command_object = self.__alias.get(cmd) + if command_object: + command = command_object['f'] + if command_object['e']: + argv = cmd_in[len(cmd) + 1:].split(" ") + output = command(argv) + else: + output = command() + self.log(str(output)) + + else: + self.log(self.__not_found % cmd) + except Exception as e: + ConsoleIO.write_err("\rDuring the execution of the command, an error occurred:\n\n" + + str(traceback.format_exc()) + + "\nType Enter to continue.") + def run(self) -> None: """ def run(self) -> None: @@ -245,22 +275,8 @@ class Console: """ self.__debug(f"run while {whl}") while whl: - try: - ConsoleIO.write("\r" + self.__prompt_in + " ") - cmd_in = ConsoleIO.read() - cmd = cmd_in.split(" ")[0] - if cmd == "": - pass - else: - command_object = self.__alias.get(cmd) - if command_object: - command = command_object['f'] - if command_object['e']: - argv = cmd_in[len(cmd) + 1:].split(" ") - output = command(argv) - else: - output = command() - self.log(str(output)) + ConsoleIO.write("\r" + self.__prompt_in + " ") + self._cli_logic(ConsoleIO.read()) async def async_run(self) -> None: """ diff --git a/src/console/Console.pyi b/src/console/Console.pyi index fcbcd22..99d0692 100644 --- a/src/console/Console.pyi +++ b/src/console/Console.pyi @@ -6,8 +6,8 @@ # (c) ahegao.ovh 2022 from _typeshed import SupportsWrite -from builtins import function -from typing import AnyStr +from asyncio import AbstractEventLoop +from typing import AnyStr, NoReturn class ConsoleIO: @@ -29,7 +29,8 @@ class Console(object): prompt_out: str = "]:", not_found: str = "Command \"%s\" not found in alias.", file: SupportsWrite[str] or None = Console, - debug: bool = False) -> None: + debug: bool = False, + async_loop: AbstractEventLoop = None) -> NoReturn: self.get_IO: ConsoleIO = ConsoleIO self.is_run: bool = False @@ -38,11 +39,13 @@ class Console(object): @property def alias(self) -> dict: ... def add(self, key: str, func, argv: bool = False, man: str = "No have manual message") -> dict:... - def log(self, s: AnyStr, r='\r') -> None: ... - def __lshift__(self, s: AnyStr) -> None: + def log(self, s: AnyStr, r='\r') -> NoReturn: ... + def __lshift__(self, s: AnyStr) -> NoReturn: self.write(s) - def write(self, s: AnyStr, r='\r') -> None: ... - def logger_hook(self) -> None: ... - def builtins_hook(self) -> None: ... - def run(self) -> None: ... - def run_while(self, whl) -> None:... \ No newline at end of file + def write(self, s: AnyStr, r='\r') -> NoReturn: ... + def logger_hook(self) -> NoReturn: ... + def builtins_hook(self) -> NoReturn: ... + def run(self) -> NoReturn: ... + def run_while(self, whl) -> NoReturn:... + async def async_run(self) -> NoReturn: ... + async def async_run_while(self, whl) -> NoReturn: ...