From 7bda3dce290ae8d2396d92355720a5413b707bec Mon Sep 17 00:00:00 2001 From: SantaSpeen Date: Fri, 14 Jul 2023 19:13:13 +0300 Subject: [PATCH] core.py: Change default cid to -1 for auth; Change client logger; Update logic of client.kick(); Remove "kostil" :))); Add client._split_load().....(5h); DO SendFile - sending mods.; Move sync_resources back; Add client.ready for blocking "alive" info; Refactor client.remove_me(); Add name for web Thread; Minor fixes; tcp_server.py: From auth always return client; Add events call in auth; Refactor set_down_rw; Minor fixes; --- src/core/core.py | 190 +++++++++++++++++++++++++++++++---------- src/core/core.pyi | 5 +- src/core/tcp_server.py | 41 ++++----- 3 files changed, 169 insertions(+), 67 deletions(-) diff --git a/src/core/core.py b/src/core/core.py index f36f4dd..ec4ef1e 100644 --- a/src/core/core.py +++ b/src/core/core.py @@ -5,6 +5,7 @@ # Licence: FPA # (c) kuitoi.su 2023 import asyncio +import math import os import random import zlib @@ -29,15 +30,16 @@ class Client: self.addr = writer.get_extra_info("sockname") self.loop = asyncio.get_event_loop() self.Core = core - self.cid = 0 + self.cid = -1 self.key = None self.nick = None self.roles = None self.guest = True self.alive = True + self.ready = False def _update_logger(self): - self.log = utils.get_logger(f"client({self.nick}:{self.cid})") + self.log = utils.get_logger(f"{self.nick}:{self.cid})") self.log.debug(f"Update logger") def is_disconnected(self): @@ -54,10 +56,13 @@ class Client: return False async def kick(self, reason): - self.log.info(f"Client: \"IP: {self.addr!r}; ID: {self.cid}\" - kicked with reason: \"{reason}\"") + if not self.alive: + self.log.debug(f"Kick({reason}) skipped;") + return + self.log.info(f"Kicked with reason: \"{reason}\"") await self.tcp_send(b"K" + bytes(reason, "utf-8")) self.alive = False - await self.remove_me() + # await self.remove_me() async def tcp_send(self, data, to_all=False, writer=None): @@ -81,18 +86,15 @@ class Client: if len(data) == 10: data += b"." header = len(data).to_bytes(4, "little", signed=True) - self.log.debug(f'len(data) {len(data)}; send {header + data}') + self.log.debug(f'len: {len(data)}; send: {header + data}') try: writer.write(header + data) await writer.drain() except ConnectionError: - self.log.debug('Disconnected') + self.log.debug('tcp_send: Disconnected') self.alive = False - async def recv(self, kostil=False): - # if not self.is_disconnected(): - # self.log.debug(f"Client with {self.nick}({self.cid}) disconnected") - # return b"" + async def recv(self): try: header = await self.reader.read(4) # header: 4 bytes @@ -102,19 +104,15 @@ class Client: if int_header <= 0: await asyncio.sleep(0.1) - if not self.alive: - self.log.debug(f"Disconnected") - self.writer.close() - return b'' - if kostil: - return - self.log.debug(f"Header: {header}") - await self.kick("Invalid packet - header negative") + self.is_disconnected() + if self.alive: + self.log.debug(f"Header: {header}") + await self.kick("Invalid packet - header negative") return b"" if int_header > 100 * MB: await self.kick("Header size limit exceeded") - self.log.warn(f"Client {self.nick}({self.cid}) sent header of >100MB - " + self.log.warn(f"Client {self.nick}:{self.cid} sent header of >100MB - " f"assuming malicious intent and disconnecting the client.") return b"" @@ -132,36 +130,130 @@ class Client: return data except ConnectionError: self.alive = False - await self.remove_me() return b"" + async def _split_load(self, start, end, d_sock, filename): + real_size = end - start + writer = self.down_rw[1] if d_sock else self.writer + who = 'dwn' if d_sock else 'srv' + self.log.debug(f"[{who}] Real size: {real_size/MB}mb; {real_size == end}, {real_size*2 == end}") + + with open(filename, 'rb') as f: + f.seek(start) + data = f.read(end) + try: + writer.write(data) + await writer.drain() + self.log.debug(f"[{who}] File sent.") + except ConnectionError: + self.alive = False + self.log.debug(f"[{who}] Disconnected.") + # break + return real_size + + # chunk_size = 125 * MB + # if chunk_size > real_size: + # chunk_size = real_size + # chunks = math.floor(real_size / chunk_size) + # self.log.debug(f"[{who}] s:{start}, e:{end}, c:{chunks}, cz:{chunk_size/MB}mb, rs:{real_size/MB}mb") + # dw = 0 + # for chunk in range(1, chunks + 1): + # chunk_end = start + (chunk_size * chunk) + # chunk_start = chunk_end - chunk_size + # # if chunk_start != 0: + # # chunk_start -= 1 + # real_size -= chunk_size + # if chunk_size > real_size: + # chunk_end = real_size + # self.log.debug(f"[{who}] Chunk: {chunk}; Start: {chunk_start}; End: {chunk_end/MB};") + # with open(filename, 'rb') as f: + # f.seek(chunk_start) + # data = f.read(chunk_end) + # try: + # writer.write(data) + # await writer.drain() + # except ConnectionError: + # self.alive = False + # self.log.debug(f"[{who}] Disconnected") + # break + # dw += len(data) + # del data + # self.log.debug(f"[{who}] File sent.") + # return dw + async def sync_resources(self): - while True: + while self.alive: data = await self.recv() + self.log.debug(f"data: {data!r}") if data.startswith(b"f"): - # TODO: SendFile file = data[1:].decode("utf-8") self.log.debug(f"Sending File: {file}") - await self.kick(f"TODO: SendFile({file})") - elif data.startswith(b"SR"): - self.log.debug("Sending Mod Info") - mod_list = '%s;%s;' + size = -1 for mod in self.Core.mods_list: if type(mod) == int: continue - mod_list = (mod_list % (mod['path'], mod['size'])).replace(";", ";%s;") - mod_list = mod_list.replace("%s;", "") + if mod.get('path') == file: + size = mod['size'] + self.log.debug("File is accept.") + break + if size == -1: + await self.tcp_send(b"CO") + await self.kick(f"Not allowed mod: " + file) + return + await self.tcp_send(b"AG") + t = 0 + while not self.down_rw[0]: + await asyncio.sleep(0.1) + t += 1 + if t > 50: + await self.kick("Missing download socket") + return + self.log.info(f"Requested mode: {file!r}") + self.log.debug(f"Mode size: {size/MB}") + + msize = math.floor(size / 2) + # uploads = [ + # asyncio.create_task(self._split_load(0, msize, False, file)), # SplitLoad_0 + # asyncio.create_task(self._split_load(msize, size, True, file)) # SplitLoad_1 + # ] + # await asyncio.wait(uploads) + uploads = [ + self._split_load(0, msize, False, file), + self._split_load(msize, size, True, file) + ] + sl0, sl1 = await asyncio.gather(*uploads) + sent = sl0 + sl1 + ok = sent == size + lost = size - sent + self.log.debug(f"SplitLoad_0: {sl0}; SplitLoad_1: {sl1}; At all ({ok}): Sent: {sent}; Lost: {lost}") + self.log.debug(f"SplitLoad_0: {sl0/MB}mb; " + f"SplitLoad_1: {sl1/MB}MB; At all ({ok}): Sent: {sent/MB}mb; Lost: {lost/MB}mb") + if not ok: + self.alive = False + self.log.error(f"Error while sending.") + return + elif data.startswith(b"SR"): + path_list = '' + size_list = '' + for mod in self.Core.mods_list: + if type(mod) == int: + continue + path_list += f"{mod['path']};" + size_list += f"{mod['size']};" + mod_list = path_list + size_list + self.log.debug(f"Mods List: {mod_list}") if len(mod_list) == 0: await self.tcp_send(b"-") else: await self.tcp_send(bytes(mod_list, "utf-8")) - data = await self.recv() - if data == b"Done": + elif data == b"Done": await self.tcp_send(b"M/levels/" + bytes(config.Game['map'], 'utf-8') + b"/info.json") break + return async def looper(self): - # self.is_disconnected() + await self.tcp_send(b"P" + bytes(f"{self.cid}", "utf-8")) # Send clientID + await self.sync_resources() while self.alive: data = await self.recv() if data == b"": @@ -169,29 +261,38 @@ class Client: break else: await asyncio.sleep(.2) + self.is_disconnected() continue code = data.decode()[0] self.log.debug(f"Received code: {code}, data: {data}") match code: case "H": # Client connected + self.ready = True await self.tcp_send(b"Sn" + bytes(self.nick, "utf-8"), to_all=True) case "C": # Chat await self.tcp_send(data, to_all=True) async def remove_me(self): - self.log.debug(f"Removing client {self.nick}({self.cid})") await asyncio.sleep(0.3) - if not self.writer.is_closing(): - self.writer.close() - if (self.cid > 0 or self.nick is not None) and self.Core.clients_by_id.get(self.cid): - _, down_w = self.down_rw - if down_w and not down_w.is_closing(): - down_w.close() + self.alive = False + if (self.cid > 0 or self.nick is not None) and \ + self.Core.clients_by_nick.get(self.nick): + # if self.ready: + # await self.tcp_send(b"", to_all=True) # I'm disconnected. + self.log.debug(f"Removing client {self.nick}:{self.cid}") + self.log.info("Disconnected") self.Core.clients[self.cid] = None self.Core.clients_by_id.pop(self.cid) self.Core.clients_by_nick.pop(self.nick) + else: + self.log.debug(f"Removing client; Closing connection...") + if not self.writer.is_closing(): + self.writer.close() + _, down_w = self.down_rw + if down_w and not down_w.is_closing(): + down_w.close() class Core: @@ -218,7 +319,7 @@ class Core: self.BeamMP_version = "3.2.0" def get_client(self, cid=None, nick=None): - if cid: + if cid is not None: return self.clients_by_id.get(cid) if nick: return self.clients_by_nick.get(nick) @@ -237,9 +338,10 @@ class Core: if not self.clients[cid]: client.cid = cid self.clients_by_nick.update({client.nick: client}) - self.log.debug(f"Inserting client: id{client.cid}") + self.log.debug(f"Inserting client: {client.nick}:{client.cid}") self.clients_by_id.update({client.cid: client}) self.clients[client.cid] = client + # noinspection PyProtectedMember client._update_logger() return await self.insert_client(client) @@ -270,11 +372,13 @@ class Core: for client in self.clients: if not client: continue + if not client.ready: + client.is_disconnected() + continue await client.tcp_send(bytes(ca, "utf-8")) @staticmethod def start_web(): - global uvserver uvconfig = uvicorn.Config("modules.WebAPISystem.app:web_app", host=config.WebAPI["server_ip"], port=config.WebAPI["server_port"], @@ -318,7 +422,6 @@ class Core: # Sentry? ok = False body = {} - code = 0 for server_url in BEAM_backend: url = "https://" + server_url + "/heartbeat" try: @@ -380,10 +483,11 @@ class Core: # WebApi Start if config.WebAPI["enabled"]: self.log.debug("Initializing WebAPI...") - web_thread = Thread(target=self.start_web) + web_thread = Thread(target=self.start_web, name="WebApiThread") web_thread.start() self.log.debug(f"WebAPI started at new thread: {web_thread.name}") self.web_thread = web_thread + # noinspection PyProtectedMember self.web_stop = webapp._stop await asyncio.sleep(.3) diff --git a/src/core/core.pyi b/src/core/core.pyi index 3af42f4..8b07224 100644 --- a/src/core/core.pyi +++ b/src/core/core.pyi @@ -24,17 +24,19 @@ class Client: self.addr = writer.get_extra_info("sockname") self.loop = asyncio.get_event_loop() self.Core = core - self.cid: int = 0 + self.cid: int = -1 self.key: str = None self.nick: str = None self.roles: str = None self.guest = True self.alive = True + self.ready = False def is_disconnected(self) -> bool: ... async def kick(self, reason: str) -> None: ... async def tcp_send(self, data: bytes, to_all:bool = False, writer: StreamWriter = None) -> None: ... async def sync_resources(self) -> None: ... async def recv(self) -> bytes: ... + async def _split_load(self, start: int, end: int, d_sock: bool, filename: str) -> None: ... async def looper(self) -> None: ... def _update_logger(self) -> None: ... async def remove_me(self) -> None: ... @@ -59,6 +61,7 @@ class Core: self.web_stop: Callable = lambda: None self.client_major_version = "2.0" self.BeamMP_version = "3.2.0" + def get_client(self, cid=None, nick=None) -> Client | None: ... async def insert_client(self, client: Client) -> None: ... def create_client(self, *args, **kwargs) -> Client: ... def get_clients_list(self, need_cid=False) -> str: ... diff --git a/src/core/tcp_server.py b/src/core/tcp_server.py index c5365f5..6dd4952 100644 --- a/src/core/tcp_server.py +++ b/src/core/tcp_server.py @@ -6,8 +6,6 @@ # (c) kuitoi.su 2023 import asyncio import traceback -from asyncio import AbstractEventLoop -from threading import Thread import aiohttp @@ -27,19 +25,20 @@ class TCPServer: client = self.Core.create_client(reader, writer) self.log.info(f"Identifying new ClientConnection...") data = await client.recv() - self.log.debug(f"recv1 data: {data}") + self.log.debug(f"Version: {data}") if data.decode("utf-8") != f"VC{self.Core.client_major_version}": await client.kick("Outdated Version.") - return False, None + return False, client else: await client.tcp_send(b"S") # Accepted client version data = await client.recv() - self.log.debug(f"recv2 data: {data}") + self.log.debug(f"Key: {data}") if len(data) > 50: await client.kick("Invalid Key (too long)!") - return False, None + return False, client client.key = data.decode("utf-8") + ev.call_event("auth_sent_key", client) try: async with aiohttp.ClientSession() as session: url = 'https://auth.beammp.com/pkToUser' @@ -48,32 +47,31 @@ class TCPServer: self.log.debug(f"res: {res}") if res.get("error"): await client.kick('Invalid key! Please restart your game.') - return False, None + return False, client client.nick = res["username"] client.roles = res["roles"] client.guest = res["guest"] + # noinspection PyProtectedMember client._update_logger() except Exception as e: self.log.error(f"Auth error: {e}") await client.kick('Invalid authentication data! Try to reconnect in 5 minutes.') - return False, None + return False, client for _client in self.Core.clients: if not _client: continue if _client.nick == client.nick and _client.guest == client.guest: await client.kick('Stale Client (replaced by new client)') - return False, None + return False, client - ev.call_event("on_auth", client) + ev.call_event("auth_ok", client) if len(self.Core.clients_by_id) > config.Game["players"]: await client.kick("Server full!") - return False, None + return False, client else: self.log.info("Identification success") - await client.tcp_send(b"P" + bytes(f"{client.cid}", "utf-8")) - await client.sync_resources() await self.Core.insert_client(client) return True, client @@ -81,17 +79,13 @@ class TCPServer: async def set_down_rw(self, reader, writer): try: cid = (await reader.read(1))[0] - ok = False - for _client in self.Core.clients: - if not _client: - continue - if _client.cid == cid: - _client.down_rw = (reader, writer) - ok = True - self.log.debug(f"Client: {_client.nick}:{cid} - HandleDownload!") - if not ok: + client = self.Core.get_client(cid=cid) + if client: + client.down_rw = (reader, writer) + self.log.debug(f"Client: {client.nick}:{cid} - HandleDownload!") + else: writer.close() - self.log.debug(f"Unknown client - HandleDownload") + self.log.debug(f"Unknown client id:{cid} - HandleDownload") finally: return @@ -126,6 +120,7 @@ class TCPServer: _, cl = await self.handle_code(code, reader, writer) if cl: await cl.remove_me() + del cl break except Exception as e: self.log.error("Error while connecting..")