7 Commits

Author SHA1 Message Date
7464a4095d [+] Warning on udp_addr != main_addr 2024-08-12 18:43:42 +03:00
3dc2232db2 [+] Fix onModsSending 2024-08-05 18:40:20 +03:00
a16e2e39d9 [~] go home... 2024-08-02 18:18:53 +03:00
ac2aba4b27 [~] Basic events 2024-08-02 16:33:13 +03:00
66db0aa9f7 [~] Minor 2024-08-02 16:17:52 +03:00
2197a32354 [+] PluginConsole
[+] ev.unregister_by_id
[+] Completes for PluginsLoader
[~] asyncio.to_thread
[~] console.legacy_mode
2024-08-02 16:03:14 +03:00
1e95a7519b [+] PluginConsole
[+] ev.unregister_by_id
[+] Completes for PluginsLoader
[~] asyncio.to_thread
[~] console.legacy_mode
2024-08-02 16:03:11 +03:00
12 changed files with 232 additions and 102 deletions

View File

@@ -7,3 +7,4 @@ colorama~=0.4.6
cryptography~=42.0.4
prompt_toolkit~=3.0.47
requests~=2.32.3
Pygments~=2.18.0

View File

@@ -62,6 +62,7 @@ class Client:
self._connect_time = 0
self._last_position = {}
self._last_recv = time.monotonic()
self.__tpt_id = 0
@property
def _writer(self):
@@ -227,7 +228,7 @@ class Client:
await writer.drain()
return True
except Exception as e:
self.log.debug(f'[TCP] Disconnected: {e}')
self.log.debug(f'[TCP] Disconnected: {e}; {writer=}')
self.__alive = False
await self._remove_me()
return False
@@ -286,24 +287,32 @@ class Client:
await self._tpc_put(None)
async def _split_load(self, start, end, d_sock, filename, speed_limit=None):
real_size = end - start
size = end - start
writer = self._down_sock[1] if d_sock else self.__writer
who = 'dwn' if d_sock else 'srv'
self.log.debug(f"[{who}] Real size: {real_size / MB}mb; {real_size == end}, {real_size * 2 == end}")
who = 'DSock' if d_sock else 'MSock'
self.log.debug(f"[{who}] Started; start,end={(start, end)}")
with open(filename, 'rb') as f:
f.seek(start)
total_sent = 0
start_time = time.monotonic()
while total_sent < real_size:
data = f.read(min(MB, real_size - total_sent)) # read data in chunks of 1MB or less
while total_sent < size:
if (size - total_sent) == 0:
break
data = f.read(min(MB, size - total_sent)) # read data in chunks of 1MB or less
try:
writer.write(data)
async with asyncio.timeout(120): # ~100kb/s
await writer.drain()
# self.log.debug(f"[{who}] Sent {len(data)} bytes.")
except ConnectionError:
except TimeoutError:
self.log.debug(f"[{who}] TimeoutError; Sock: {writer}")
self.log.error("TimeoutError")
self.__alive = False
break
except ConnectionError:
self.log.debug(f"[{who}] Disconnected; Sock: {writer}")
self.__alive = False
self.log.debug(f"[{who}] Disconnected.")
break
total_sent += len(data)
@@ -313,7 +322,7 @@ class Client:
expected_time = total_sent / (speed_limit * MB)
if expected_time > elapsed_time:
await asyncio.sleep(expected_time - elapsed_time)
self.log.debug(f"[{who}] Ready. {total_sent=}")
return total_sent
async def _sync_resources(self):
@@ -333,10 +342,10 @@ class Client:
size = mod['size']
self.log.debug("File is accept.")
break
self.log.debug(f"Mode size: {size}")
# self.log.debug(f"Mod size: {size}")
if size == -1:
await self._send(b"CO")
await self.kick(f"Not allowed mod: " + file)
await self.kick(f"Requested not allowed file: " + file)
return
await self._send(b"AG")
t = 0
@@ -344,22 +353,22 @@ class Client:
await asyncio.sleep(0.1)
t += 1
if t > 50:
await self.kick("Missing download socket")
await self.kick("Error (Missing DSock)")
return
if config.Options['use_queue']:
while self._core.lock_upload:
await asyncio.sleep(.2)
self._core.lock_upload = True
speed = config.Options["speed_limit"]
speed = config.Options["speed_limit"] or 25*B
if speed:
speed = speed / 2
half_size = math.floor(size / 2)
half_size = size // 2
t = time.monotonic()
uploads = [
self._split_load(0, half_size, False, file, speed),
self._split_load(half_size, size, True, file, speed)
]
sl0, sl1 = await asyncio.gather(*uploads)
self.log.debug(f"Sending: {size=}; sl0={(0, half_size)}; sl1={(half_size, size)}")
async with asyncio.TaskGroup() as tg:
sl0 = tg.create_task(self._split_load(0, half_size, False, file, speed))
sl1 = tg.create_task(self._split_load(half_size, size, True, file, speed))
sl0, sl1 = sl0.result(), sl1.result()
tr = (time.monotonic() - t) or 0.0001
if self._core.lock_upload:
self._core.lock_upload = False
@@ -370,17 +379,17 @@ class Client:
sent = sl0 + sl1
ok = sent == size
lost = size - sent
self.log.debug(f"SplitLoad_0: {sl0}; SplitLoad_1: {sl1}; At all ({ok}): Sent: {sent}; Lost: {lost}")
self.log.debug(f"Sent; sl_0: {sl0}; sl_1: {sl1}; size==sent is {ok}: {size}-{sent}={lost}")
if not ok:
self.__alive = False
self.log.error(i18n.client_mod_sent_error.format(repr(file)))
e = i18n.client_mod_sent_error.format(repr(file))
await self._send(f"E{e}")
self.log.error(e)
return
elif data.startswith(b"SR"):
path_list = ''
size_list = ''
for mod in self._core.mods_list:
if type(mod) == int:
continue
for mod in self._core.mods_list[1:]:
path_list += f"{mod['path']};"
size_list += f"{mod['size']};"
mod_list = path_list + size_list
@@ -390,6 +399,7 @@ class Client:
else:
await self._send(mod_list)
elif data == b"Done":
self.log.debug("recv Done")
await self._send(f"M/levels/{config.Game['map']}/info.json")
break
return
@@ -691,7 +701,7 @@ class Client:
self.__alive = False
return
if len(data) == 0:
self.__alive = False
await self.kick("Bad data from client")
return
_bytes = False
@@ -810,9 +820,9 @@ class Client:
await self._send(f"P{self.cid}") # Send clientID
await self._sync_resources()
ev.call_lua_event("onPlayerJoining", self.cid)
ev.register("serverTick", self.__tick_player_tcp)
ev.register("serverTick", self.__tick_player_udp)
ev.register("serverTick_1s", self._tick_pps)
self.__tpt_id = ev.register("serverTick", self.__tick_player_tcp)
self.__tpu_id = ev.register("serverTick", self.__tick_player_udp)
self.__tpp_id = ev.register("serverTick_1s", self._tick_pps)
await self._recv()
async def _remove_me(self):
@@ -830,18 +840,23 @@ class Client:
ev.call_lua_event("onPlayerDisconnect", self.cid)
ev.call_event("onPlayerDisconnect", player=self)
await ev.call_async_event("onPlayerDisconnect", player=self)
ev.unregister(self.__tick_player_tcp)
ev.unregister(self.__tick_player_udp)
ev.unregister(self._tick_pps)
if self.__tpt_id:
ev.unregister_by_id(self.__tpt_id) # self.__tick_player_tcp
ev.unregister_by_id(self.__tpu_id) # self.__tick_player_udp
ev.unregister_by_id(self.__tpp_id) # self._tick_pps
gt = round((time.monotonic() - self._connect_time) / 60, 2)
self.log.info(i18n.client_player_disconnected.format(gt))
self._core.clients[self.cid] = None
del self._core.clients_by_id[self.cid]
del self._core.clients_by_nick[self.nick]
self.log.debug(f"TPC: "
f"Recv: {self._tpc_count_total_recv}; {self._tpc_size_total_recv / KB:.4f}kb; "
f"Sent: {self._tpc_count_total_sent}; {self._tpc_size_total_sent / KB:.4f}kb;")
self.log.debug(f"UDP: "
f"Recv: {self._udp_count_total_recv}; {self._udp_size_total_recv / KB:.4f}kb; "
f"Sent: {self._udp_count_total_sent}; {self._udp_size_total_sent / KB:.4f}kb;")
else:
self.log.debug(f"Removing client; Closing connection...")
self.log.debug(f"TPC: Recv: {self._tpc_count_total_recv}; {self._tpc_size_total_recv / KB:.4f}kb; Sent: {self._tpc_count_total_sent}; {self._tpc_size_total_sent / KB:.4f}kb;")
self.log.debug(f"UDP: Recv: {self._udp_count_total_recv}; {self._udp_size_total_recv / KB:.4f}kb; Sent: {self._udp_count_total_sent}; {self._udp_size_total_sent / KB:.4f}kb;")
await asyncio.sleep(0.001)
try:
self.__writer.close()

View File

@@ -55,6 +55,9 @@ class Client:
self._unicycle: Dict[str, Union[int, str]] = {"id": -1, "packet": ""}
self._last_position = {}
self._lock = Lock()
self.__tpt_id = 0
self.__tpu_id = 0
self.__tpp_id = 0
async def __gracefully_kick(self): ...
@property
@@ -92,7 +95,7 @@ class Client:
async def _send(self, data: bytes | str, to_all: bool = False, to_self: bool = True, to_udp: bool = False, writer: StreamWriter = None) -> None: ...
async def _sync_resources(self) -> None: ...
async def _recv(self, one=False) -> bytes | None: ...
async def _split_load(self, start: int, end: int, d_sock: bool, filename: str, sl: float) -> None: ...
async def _split_load(self, start: int, end: int, d_sock: bool, filename: str, sl: float) -> int: ...
async def _get_cid_vid(self, s: str) -> Tuple[int, int]: ...
async def _spawn_car(self, data: str) -> None: ...
async def delete_car(self, car_id: int) -> None: ...

View File

@@ -10,7 +10,7 @@ __title__ = 'KuiToi-Server'
__description__ = 'BeamingDrive Multiplayer server compatible with BeamMP clients.'
__url__ = 'https://github.com/kuitoi/kuitoi-Server'
__version__ = '0.4.8'
__build__ = 2679 # Я это считаю лог файлами
__build__ = 2800 # Я это считаю лог файлами
__author__ = 'SantaSpeen'
__author_email__ = 'admin@anidev.ru'
__license__ = "FPA"
@@ -19,6 +19,7 @@ __copyright__ = 'Copyright 2024 © SantaSpeen (Maxim Khomutov)'
import asyncio
import builtins
import sys
import time
import webbrowser
import prompt_toolkit.shortcuts as shortcuts
@@ -30,15 +31,17 @@ from modules import ConfigProvider, EventsSystem
from modules import Console
from modules import MultiLanguage
builtins.Ts = time.monotonic()
args, _ = parser.parse_known_args()
if args.version:
print(f"{__title__}:\n\tVersion: {__version__}\n\tBuild: {__build__}")
exit(0)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# loop.set_task_factory(asyncio.eager_task_factory)
asyncio.set_event_loop(loop)
log = get_logger("core.init")
# Config file init

View File

@@ -136,7 +136,7 @@ class Core:
try:
for client in self.clients:
ca = f"Ss{len(self.clients_by_id)}/{config.Game['players']}:{self.get_clients_list()}"
if not client or not client.alive:
if not client or not client.alive or not client.ready:
continue
await client._send(ca)
except Exception as e:

View File

@@ -126,9 +126,10 @@ class TCPServer:
result, client = await self.auth_client(reader, writer)
if result:
await client._looper()
return result, client
return "U", client
case "D":
await self.set_down_rw(reader, writer)
return "D", None
case "P":
writer.write(b"P")
await writer.drain()
@@ -137,34 +138,28 @@ class TCPServer:
self.log.warning(f"Unknown code: {code}")
self.log.warning("Report about that!")
writer.close()
return False, None
return "E", None
async def handle_client(self, reader, writer):
while self.run:
self._connections.add(writer)
try:
ip = writer.get_extra_info('peername')[0]
if self.rl.is_banned(ip):
await self.rl.notify(ip, writer)
writer.close()
break
data = await reader.read(1)
if not data:
break
return
code = data.decode()
self.log.debug(f"Received {code!r} from {writer.get_extra_info('sockname')!r}")
# task = asyncio.create_task(self.handle_code(code, reader, writer))
# await asyncio.wait([task], return_when=asyncio.FIRST_EXCEPTION)
_, cl = await self.handle_code(code, reader, writer)
self.log.debug(f"cl returned: {cl}")
_type, cl = await self.handle_code(code, reader, writer)
self.log.debug(f"[{_type}] cl returned: {cl}")
if cl:
await cl._remove_me()
break
except Exception as e:
self.log.error("Error while handling connection...")
self.log.exception(e)
traceback.print_exc()
break
async def start(self):
self.log.debug("Starting TCP server.")
@@ -172,17 +167,12 @@ class TCPServer:
try:
self.server = await asyncio.start_server(self.handle_client, self.host, self.port,
backlog=int(config.Game["players"] * 4))
self.log.debug(f"TCP server started on {self.server.sockets[0].getsockname()!r}")
while True:
async with self.server:
self.log.debug(f"TCP server started on {self.server.sockets[0].getsockname()!r}")
await self.server.serve_forever()
except OSError as e:
self.log.error(i18n.core_bind_failed.format(e))
raise e
except KeyboardInterrupt:
pass
except ConnectionResetError as e:
self.log.debug(f"ConnectionResetError {e}")
except Exception as e:
self.log.exception(e)
raise e

View File

@@ -6,6 +6,7 @@
# (c) kuitoi.su 2024
import asyncio
import json
import time
from core import utils
@@ -30,11 +31,16 @@ class UDPServer(asyncio.DatagramTransport):
async def handle_datagram(self, packet, addr):
try:
cid = packet[0] - 1
if cid > config.Game['players'] * 4:
return
client = self._core.get_client(cid=cid)
if client:
if not client.alive:
client.log.debug(f"Still sending UDP data: {packet}")
if client._udp_sock != (self.transport, addr):
self.log.debug(f"udp_addr={addr[0]}; main_addr={client.addr}")
if addr[0] != client.addr:
self.log.warning(f"udp_addr != main_addr. Is this bug?")
client._udp_sock = (self.transport, addr)
self.log.debug(f"Set UDP Sock for CID: {cid}")
await client._udp_put(packet)

View File

@@ -22,7 +22,6 @@ class UDPServer(asyncio.DatagramTransport):
self.host = host
self.port = port
self.run = False
# self.transport: DatagramTransport = None
def connection_made(self, transport: DatagramTransport): ...
async def handle_datagram(self, data: bytes, addr: Tuple[str, int]):
def datagram_received(self, data: bytes, addr: Tuple[str, int]): ...

View File

@@ -144,6 +144,10 @@ class Console:
rcon.console = self
self.rcon = rcon
@property
def legacy_mode(self):
return self.__legacy_mode
def __debug(self, *x):
self.__logger.debug(' '.join(x))
# if self.__is_debug:
@@ -213,12 +217,13 @@ class Console:
if v['f'] is func:
keys.append(k)
for key in keys:
self.__debug(f"Delete: key={key}")
self.__debug(f"{key=}")
self.__alias.pop(key)
self.__alias["man"].pop(key)
self.__func.pop(key)
self.__man.pop(key)
self.__desc.pop(key)
if keys:
self.__debug("Deleted.")
self.completer.load(self.__alias)

View File

@@ -38,6 +38,8 @@ class EventsSystem:
"onChangePosition": [], # Only sync, no handler
"onPlayerDisconnect": [], # No handler
"onServerStopped": [], # No handler
"onCarSpawned": [], # No handler
"onCarDeleted": [], # No handler
"serverTick": [],
"serverTick_0.5s": [],
"serverTick_1s": [],
@@ -60,8 +62,10 @@ class EventsSystem:
"onCarReset": [],
"onCarChanged": [],
"onCarFocusMove": [],
"onPlayerDisconnect": [],
"onServerStopped": [],
"onPlayerDisconnect": [],
"onCarSpawned": [],
"onCarDeleted": [],
"serverTick": [],
"serverTick_0.5s": [],
"serverTick_1s": [],
@@ -96,17 +100,34 @@ class EventsSystem:
self.log.debug("used builtins_hook")
builtins.ev = self
def unregister(self, func):
self.log.debug(f"unregister {func}")
def unregister_by_id(self, _id):
self.log.debug(f"unregister_by_id '{_id}'")
if not isinstance(_id, int):
return
s = a = 0
for k, funcs in self.__events.items():
for f in funcs:
if f is func:
if id(f) == _id:
s += 1
self.__events[k].remove(f)
for k, funcs in self.__async_events.items():
for f in funcs:
if id(f) == _id:
a += 1
self.__async_events[k].remove(f)
self.log.debug(f"unregister in {s + a} events; S:{s}; A:{a};")
def unregister(self, func):
self.log.debug(f"unregister '{func.__name__}' id: {id(func)}")
s = a = 0
for k, funcs in self.__events.items():
for f in funcs:
if f == func:
s += 1
self.__events[k].remove(func)
for k, funcs in self.__async_events.items():
for f in funcs:
if f is func:
if f == func:
a += 1
self.__async_events[k].remove(func)
self.log.debug(f"unregister in {s + a} events; S:{s}; A:{a};")
@@ -116,8 +137,8 @@ class EventsSystem:
event_name in self.__events.keys() or
event_name in self.__lua_events.keys())
def register(self, event_name, event_func, async_event=False, lua=None):
self.log.debug(f"register(event_name='{event_name}', event_func='{event_func}', "
def register(self, event_name, event_func, async_event=False, lua=None, return_id=True):
self.log.debug(f"register(event_name='{event_name}', event_func='{event_func.__name__}'(id: {id(event_func)}), "
f"async_event={async_event}, lua_event={lua}):")
if lua:
if event_name not in self.__lua_events:
@@ -140,6 +161,8 @@ class EventsSystem:
self.__events[event_name] = []
self.__events[event_name].append(event_func)
self.log.debug("Register ok")
if return_id:
return id(event_func)
async def call_as_events(self, *args, **kwargs):
return await self.call_async_event(*args, **kwargs) + self.call_event(*args, **kwargs)

View File

@@ -3,12 +3,17 @@
```python
class EventsSystem:
@staticmethod
def register(event_name, event_func, async_event: bool = False, lua: bool | object = None): ...
def unregister_by_id(_id: int) -> None: ...
@staticmethod
async def call_async_event(event_name, *args, **kwargs) -> list[Any]: ...
def unregister(func: Callable | Awaitable): ...
@staticmethod
def call_event(event_name, *data, **kwargs) -> list[Any]: ...
def register(event_name: str, event_func: Callable | Awaitable, async_event: bool = False, lua: bool | object = None) -> None | int: ...
@staticmethod
def call_lua_event(event_name, *data) -> list[Any]: ...
class ev(EventsSystem): ...
async def call_as_events(event_name: str, *args, **kwargs) -> list[Any]: ...
@staticmethod
async def call_async_event(event_name: str, *args, **kwargs) -> list[Any]: ...
@staticmethod
def call_event(event_name: str, *data, **kwargs) -> list[Any]: ...
@staticmethod
def call_lua_event(event_name: str, *data) -> list[Any]: ...
```

View File

@@ -11,12 +11,22 @@ import inspect
import os
import subprocess
import sys
import textwrap
import time
import types
from contextlib import contextmanager
from pathlib import Path
from threading import Thread
from prompt_toolkit import PromptSession, HTML
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.history import FileHistory
from prompt_toolkit.lexers import PygmentsLexer
try:
from pygments.lexers.python import Python3Lexer
except ImportError:
print("ImportError: Python3Lexer")
exit(1)
from core import get_logger
@@ -67,13 +77,13 @@ class KuiToi:
def register(self, event_name, event_func):
self.log.debug(f"Registering event {event_name}")
self.__funcs.append(event_func)
ev.register(event_name, event_func)
_id = ev.register(event_name, event_func)
self.__funcs.append(_id)
def _unload(self):
for f in self.__funcs:
console.del_command(f)
ev.unregister(f)
ev.unregister_by_id(f)
def call_event(self, event_name, *args, **kwargs):
self.log.debug(f"Called event {event_name}")
@@ -121,17 +131,88 @@ class PluginsLoader:
self.plugins_dir = plugins_dir
self.log = get_logger("PluginsLoader")
self.loaded = []
self.pl_completer = Completer({})
self.pl_files_completer = Completer({})
self._scan_dir(None)
ev.register("serverTick_5s", self._scan_dir)
ev.register("_plugins_start", self.start)
ev.register("_plugins_unload", self.unload)
ev.register("_plugins_get", lambda _: "Plugins: " + ", ".join(f"{i[0]}:{'on' if i[1] else 'off'}" for i in self.loaded))
console.add_command("plugins", self._parse_console, None, "Plugins manipulations", {"plugins": {"reload", "load", "unload", "list"}})
console.add_command("pl", lambda _: ev.call_event("_plugins_get")[0])
ev.register("_plugins_get",
lambda _: "Plugins: " + ", ".join(f"{i[0]}:{'on' if i[1] else 'off'}" for i in self.loaded))
console.add_command("plugins", self._parse_console, None, "Plugins manipulations",
{"plugins": {
"reload": self.pl_completer,
"load": self.pl_files_completer,
"unload": self.pl_completer,
"list": None,
}})
console.add_command("plugin", self._plugin_console, None, "plugin console", {"plugin": self.pl_completer})
sys.path.append(self._pip_dir)
os.makedirs(self._pip_dir, exist_ok=True)
console.add_command("install", self._pip_install)
def _scan_dir(self, _):
_load = {}
for file in os.listdir(self.plugins_dir):
file_path = os.path.join(self.plugins_dir, file)
if os.path.isfile(file_path) and file.endswith(".py"):
_load[file] = None
self.pl_files_completer.load(_load)
async def _plugin_console(self, x):
usage = 'Usage: plugin <name>'
if not x:
return usage
if x[0] in self.plugins:
plugin = self.plugins[x[0]]['plugin']
kt: KuiToi = plugin.kt
work = True
session = None
if not console.legacy_mode:
session = PromptSession(history=FileHistory(f'{kt.dir}/.cmdhistory'), lexer=PygmentsLexer(Python3Lexer))
def bottom_toolbar():
x = lambda x: f'<b><style bg="ansired">{x}</style></b>'
c = lambda c: f'<style fg="#b3d6f4">{x(c)}</style>'
return HTML(f'[PluginConsole KuiToi@{x(kt.name)}] {c("^D")} Return to the main console {c("^C")} Exit ')
while work:
try:
if session:
inp = await session.prompt_async(">> ", auto_suggest=AutoSuggestFromHistory(), bottom_toolbar=bottom_toolbar)
else:
inp = input(f"@{kt.name} > ")
self.log.debug(f"[_plugin_console] {inp=}")
if inp == "exit":
return "Exited"
if not inp:
continue
if inp.split(' ')[0] in ['import', 'from']:
kt.log.warning("Imports not allowed here... Sorry bro.")
continue
code = textwrap.dedent(f"""\
async def _console():
try:
i = {inp}
if i:
print(f"{{i!r}}")
except Exception as e:
kt.log.exception(e)""")
exec(code, plugin.__dict__)
kt.log.debug(await plugin._console())
except SyntaxError as e:
kt.log.error(f"SyntaxError: {e.msg}")
except EOFError:
return
except KeyboardInterrupt as e:
raise e
except UnicodeDecodeError as e:
raise e
except Exception as e:
kt.log.exception(e)
return "Plugin not found"
async def _parse_console(self, x):
usage = 'Usage: plugin [reload <name> | load <file.py> | unload <name> | list]'
usage = 'Usage: plugins [reload <name> | load <file.py> | unload <name> | list]'
if not x:
return usage
match x[0]:
@@ -244,6 +325,7 @@ class PluginsLoader:
th = Thread(target=plugin.load, name=f"{pl_name}.load()")
th.start()
th.join()
self.pl_completer.options[pl_name] = None
self.loaded.append((pl_name, True))
self.log.debug(f"Plugin loaded: {file}. Settings: {self.plugins[pl_name]}")
return pl_name
@@ -283,22 +365,20 @@ class PluginsLoader:
async def start(self, _):
for pl_name, pl_data in self.plugins.items():
try:
func = pl_data['start']['func']
if pl_data['start']['async']:
self.log.debug(f"Start async plugin: {pl_name}")
t = self.loop.create_task(pl_data['start']['func']())
self.plugins_tasks.append(t)
t = self.loop.create_task(func())
else:
self.log.debug(f"Start sync plugin: {pl_name}")
th = Thread(target=pl_data['start']['func'], name=f"Thread {pl_name}")
th.start()
self.plugins_tasks.append(th)
t = self.loop.create_task(asyncio.to_thread(func))
self.plugins_tasks.append(t)
except Exception as e:
self.log.exception(e)
async def unload(self, _):
t = []
for n in self.plugins.keys():
await asyncio.sleep(0.01)
t.append(self._unload_by_name(n))
self.log.debug(await asyncio.gather(*t))
self.log.debug("Plugins unloaded")