diff --git a/src/core/__init__.py b/src/core/__init__.py index 57489f0..ac92989 100644 --- a/src/core/__init__.py +++ b/src/core/__init__.py @@ -10,8 +10,8 @@ __title__ = 'KuiToi-Server' __description__ = 'BeamingDrive Multiplayer server compatible with BeamMP clients.' __url__ = 'https://github.com/kuitoi/kuitoi-Server' -__version__ = '0.2.0' -__build__ = 776 +__version__ = '0.2.1' +__build__ = 874 __author__ = 'SantaSpeen' __author_email__ = 'admin@kuitoi.su' __license__ = "FPA" diff --git a/src/core/core.py b/src/core/core.py index e00e21d..c4ca959 100644 --- a/src/core/core.py +++ b/src/core/core.py @@ -6,6 +6,7 @@ # (c) kuitoi.su 2023 import asyncio import os +import random import zlib from threading import Thread @@ -36,8 +37,8 @@ class Client: self.alive = True def _update_logger(self): - self.log.debug(f"Update logger") self.log = utils.get_logger(f"client({self.nick}:{self.cid})") + self.log.debug(f"Update logger") def is_disconnected(self): if not self.alive: @@ -55,11 +56,10 @@ class Client: async def kick(self, reason): self.log.info(f"Client: \"IP: {self.addr!r}; ID: {self.cid}\" - kicked with reason: \"{reason}\"") await self.tcp_send(b"K" + bytes(reason, "utf-8")) - # self.writer.close() - # await self.writer.wait_closed() self.alive = False + await self.remove_me() - async def tcp_send(self, data): + async def tcp_send(self, data, to_all=False, writer=None): # TNetwork.cpp; Line: 383 # BeamMP TCP protocol sends a header of 4 bytes, followed by the data. @@ -67,63 +67,98 @@ class Client: # ^------^^---...-^ # size data + if writer is None: + writer = self.writer + + if to_all: + for client in self.Core.clients: + if not client: + continue + await client.tcp_send(data) + return + self.log.debug(f"tcp_send({data})") if len(data) == 10: data += b"." header = len(data).to_bytes(4, "little", signed=True) self.log.debug(f'len(data) {len(data)}; send {header + data}') - self.writer.write(header + data) - await self.writer.drain() + try: + writer.write(header + data) + await writer.drain() + except ConnectionError: + self.log.debug('Disconnected') + self.alive = False - async def recv(self): + async def recv(self, kostil=False): # if not self.is_disconnected(): # self.log.debug(f"Client with {self.nick}({self.cid}) disconnected") # return b"" - header = await self.reader.read(4) # header: 4 bytes + try: + header = await self.reader.read(4) # header: 4 bytes - int_header = 0 - for i in range(len(header)): - int_header += header[i] + int_header = 0 + for i in range(len(header)): + int_header += header[i] - if int_header <= 0: - await self.kick("Invalid packet - header negative") - return b"" + if int_header <= 0: + await asyncio.sleep(0.1) + if not self.alive: + self.log.debug(f"Disconnected") + self.writer.close() + return b'' + if kostil: + return + self.log.debug(f"Header: {header}") + await self.kick("Invalid packet - header negative") + return b"" - if int_header > 100 * MB: - await self.kick("Header size limit exceeded") - self.log.warn(f"Client {self.nick}({self.cid}) sent header of >100MB - " - f"assuming malicious intent and disconnecting the client.") - return b"" + if int_header > 100 * MB: + await self.kick("Header size limit exceeded") + self.log.warn(f"Client {self.nick}({self.cid}) sent header of >100MB - " + f"assuming malicious intent and disconnecting the client.") + return b"" - data = await self.reader.read(101 * MB) - self.log.debug(f"header: `{header}`; int_header: `{int_header}`; data: `{data}`;") + data = await self.reader.read(100 * MB) + self.log.debug(f"header: `{header}`; int_header: `{int_header}`; data: `{data}`;") - if len(data) != int_header: - self.log.debug(f"WARN Expected to read {int_header} bytes, instead got {len(data)}") + if len(data) != int_header: + self.log.debug(f"WARN Expected to read {int_header} bytes, instead got {len(data)}") - abg = b"ABG:" - if len(data) > len(abg) and data.startswith(abg): - data = zlib.decompress(data[len(abg):]) - self.log.debug(f"ABG: {data}") + abg = b"ABG:" + if len(data) > len(abg) and data.startswith(abg): + data = zlib.decompress(data[len(abg):]) + self.log.debug(f"ABG: {data}") + return data return data - return data + except ConnectionError: + self.alive = False + await self.remove_me() + return b"" async def sync_resources(self): while True: data = await self.recv() + if not data: + await asyncio.sleep(.1) + continue + self.log.debug(f"Received: {data}") if data.startswith(b"f"): # TODO: SendFile - pass + file = data[1:].decode("utf-8") + self.log.debug(f"Sending File: {file}") + await self.kick(f"TODO: SendFile({file})") elif data.startswith(b"SR"): - # TODO: Create mods list self.log.debug("Sending Mod Info") - mods = [] - mod_list = b'' - # * code * - if len(mods) == 0: + mod_list = '%s;%s;' + for mod in self.Core.mods_list: + if type(mod) == int: + continue + mod_list = (mod_list % (mod['path'], mod['size'])).replace(";", ";%s;") + mod_list = mod_list.replace("%s;", "") + if len(mod_list) == 0: await self.tcp_send(b"-") else: - await self.tcp_send(mod_list) + await self.tcp_send(bytes(mod_list, "utf-8")) data = await self.recv() if data == b"Done": await self.tcp_send(b"M/levels/" + bytes(config.Game['map'], 'utf-8') + b"/info.json") @@ -131,7 +166,6 @@ class Client: async def looper(self): # self.is_disconnected() - self.log.debug(f"Alive: {self.alive}") await self.tcp_send(b"P" + bytes(f"{self.cid}", "utf-8")) await self.sync_resources() while self.alive: @@ -139,19 +173,31 @@ class Client: if data == b"": if not self.alive: break - elif self.is_disconnected(): - break else: + await asyncio.sleep(.2) continue code = data.decode()[0] - self.log.debug(f"Code: {code}, data: {data}") + self.log.debug(f"Received code: {code}, data: {data}") match code: case "H": # Client connected - await self.tcp_send(b"Sn" + bytes(self.nick, "utf-8")) + await self.tcp_send(b"Sn" + bytes(self.nick, "utf-8"), to_all=True) case "C": # Chat - await self.tcp_send(data) + await self.tcp_send(data, to_all=True) + + async def remove_me(self): + self.log.debug(f"Removing client {self.nick}({self.cid})") + await asyncio.sleep(0.3) + if not self.writer.is_closing(): + self.writer.close() + if (self.cid > 0 or self.nick is not None) and self.Core.clients_by_id.get(self.cid): + _, down_w = self.down_rw + if down_w and not down_w.is_closing(): + down_w.close() + self.Core.clients[self.cid] = None + self.Core.clients_by_id.pop(self.cid) + self.Core.clients_by_nick.pop(self.nick) class Core: @@ -177,40 +223,60 @@ class Core: self.client_major_version = "2.0" self.BeamMP_version = "3.2.0" - def get_client(self, sock=None, cid=None, nick=None): + def get_client(self, cid=None, nick=None): if cid: return self.clients_by_id.get(cid) if nick: return self.clients_by_nick.get(nick) - if sock: - return self.clients_by_nick.get(sock.getsockname()) - def insert_client(self, client): - self.log.debug(f"Inserting client: {client.cid}") - self.clients_by_nick.update({client.nick: client}) - self.clients_by_id.update({client.cid: client}) - self.clients[client.cid] = client - - def create_client(self, *args, **kwargs): - client = Client(*args, **kwargs) - cid = 1 - for client in self.clients: - if client.cid == cid: + async def insert_client(self, client): + await asyncio.sleep(random.randint(3, 9) * 0.01) + cid = 0 + for _client in self.clients: + if not _client: + break + if _client.cid == cid: cid += 1 else: break - client.cid = cid - client._update_logger() - self.log.debug(f"Create client; client.cid: {client.cid};") + await asyncio.sleep(random.randint(3, 9) * 0.01) + if not self.clients[cid]: + client.cid = cid + self.clients_by_nick.update({client.nick: client}) + self.log.debug(f"Inserting client: id{client.cid}") + self.clients_by_id.update({client.cid: client}) + self.clients[client.cid] = client + client._update_logger() + return + await self.insert_client(client) + + def create_client(self, *args, **kwargs): + self.log.debug(f"Create client") + client = Client(core=self, *args, **kwargs) return client - async def check_alive(self): - await asyncio.sleep(5) - self.log.debug(f"Checking if clients is alive") + def get_clients_list(self, need_cid=False): + out = "" for client in self.clients: - d = client.is_disconnected() - if d: - self.log.debug(f"Client ID: {client.cid} died...") + if not client: + continue + out += f"{client.nick}" + if need_cid: + out += f":{client.cid}" + out += "," + if out: + out = out[:-1] + return out + + async def check_alive(self): + maxp = config.Game['players'] + while self.run: + await asyncio.sleep(1) + ca = f"Ss{len(self.clients_by_id)}/{maxp}:{self.get_clients_list()}" + for client in self.clients: + if not client: + continue + await client.tcp_send(bytes(ca, "utf-8")) @staticmethod def start_web(): @@ -223,10 +289,10 @@ class Core: webapp.uvserver = uvserver uvserver.run() - @staticmethod - async def stop_me(): + async def stop_me(self): while webapp.data_run[0]: await asyncio.sleep(1) + self.run = False raise KeyboardInterrupt # noinspection SpellCheckingInspection,PyPep8Naming @@ -246,69 +312,76 @@ class Core: modstotalsize = self.mods_list[0] modstotal = len(self.mods_list) - 1 while self.run: - data = {"uuid": config.Auth["key"], "players": len(self.clients), "maxplayers": config.Game["players"], - "port": config.Server["server_port"], "map": f"/levels/{config.Game['map']}/info.json", - "private": config.Auth['private'], "version": self.BeamMP_version, - "clientversion": self.client_major_version, - "name": config.Server["name"], "modlist": modlist, "modstotalsize": modstotalsize, - "modstotal": modstotal, "playerslist": "", "desc": config.Server['description'], "pass": False} - self.log.debug(f"Auth: data {data}") + try: + data = {"uuid": config.Auth["key"], "players": len(self.clients), "maxplayers": config.Game["players"], + "port": config.Server["server_port"], "map": f"/levels/{config.Game['map']}/info.json", + "private": config.Auth['private'], "version": self.BeamMP_version, + "clientversion": self.client_major_version, + "name": config.Server["name"], "modlist": modlist, "modstotalsize": modstotalsize, + "modstotal": modstotal, "playerslist": "", "desc": config.Server['description'], "pass": False} + self.log.debug(f"Auth: data {data}") - # Sentry? - ok = False - body = {} - code = 0 - for server_url in BEAM_backend: - url = "https://" + server_url + "/heartbeat" - try: - async with aiohttp.ClientSession() as session: - async with session.post(url, data=data, headers={"api-v": "2"}) as response: - code = response.status - body = await response.json() - self.log.debug(f"Auth: code {code}, body {body}") - ok = True - break - except Exception as e: - self.log.debug(f"Auth: Error `{e}` while auth with `{server_url}`") - continue + # Sentry? + ok = False + body = {} + code = 0 + for server_url in BEAM_backend: + url = "https://" + server_url + "/heartbeat" + try: + async with aiohttp.ClientSession() as session: + async with session.post(url, data=data, headers={"api-v": "2"}) as response: + code = response.status + body = await response.json() + self.log.debug(f"Auth: code {code}, body {body}") + ok = True + break + except Exception as e: + self.log.debug(f"Auth: Error `{e}` while auth with `{server_url}`") + continue - if ok: - if not (body.get("status") is not None and - body.get("code") is not None and - body.get("msg") is not None): - self.log.error("Missing/invalid json members in backend response") - raise KeyboardInterrupt + if ok: + if not (body.get("status") is not None and + body.get("code") is not None and + body.get("msg") is not None): + self.log.error("Missing/invalid json members in backend response") + raise KeyboardInterrupt - if test: - status = body.get("status") - msg = body.get("msg") - if status == "2000": - self.log.info(f"Authenticated! {msg}") - elif status == "200": - self.log.info(f"Resumed authenticated session. {msg}") - else: - self.log.error(f"Backend REFUSED the auth key. Reason: " - f"{msg or 'Backend did not provide a reason'}") + if test: + status = body.get("status") + msg = body.get("msg") + if status == "2000": + self.log.info(f"Authenticated! {msg}") + elif status == "200": + self.log.info(f"Resumed authenticated session. {msg}") + else: + self.log.error(f"Backend REFUSED the auth key. Reason: " + f"{msg or 'Backend did not provide a reason'}") + self.log.info(f"Server still runnig, but only in Direct connect mode.") + self.direct = True + else: + self.direct = True + if test: + self.log.error("Cannot auth...") + if not config.Auth['private']: + raise KeyboardInterrupt + if test: self.log.info(f"Server still runnig, but only in Direct connect mode.") - self.direct = True - else: - self.direct = True - if test: - self.log.error("Cannot auth...") - if not config.Auth['private']: - raise KeyboardInterrupt - if test: - self.log.info(f"Server still runnig, but only in Direct connect mode.") - if test: - return ok + if test: + return ok - await asyncio.sleep(5) + await asyncio.sleep(5) + except Exception as e: + self.log.error(f"Error in heartbeat: {e}") async def main(self): self.run = True self.tcp = self.tcp(self, self.server_ip, self.server_port) self.udp = self.udp(self, self.server_ip, self.server_port) + console.add_command( + "list", + lambda x: f"Players list: {self.get_clients_list(True)}" + ) try: # WebApi Start if config.WebAPI["enabled"]: @@ -336,9 +409,11 @@ class Core: self.log.info(f"Loaded {lmods} mods: {round(self.mods_list[0] / MB, 2)}mb") await self.heartbeat(True) + for i in range(int(config.Game["players"] * 1.3)): + self.clients.append(None) tasks = [] - # self.check_alive() - nrtasks = [self.tcp.start, self.udp.start, console.start, self.stop_me, self.heartbeat, ] + # self.udp.start, + nrtasks = [self.tcp.start, console.start, self.stop_me, self.heartbeat, self.check_alive] for task in nrtasks: tasks.append(asyncio.create_task(task())) t = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) @@ -354,7 +429,7 @@ class Core: pass finally: self.tcp.stop() - self.udp.stop() + # self.udp.stop() self.run = False def start(self): diff --git a/src/core/core.pyi b/src/core/core.pyi index e2fd4bc..3af42f4 100644 --- a/src/core/core.pyi +++ b/src/core/core.pyi @@ -24,7 +24,7 @@ class Client: self.addr = writer.get_extra_info("sockname") self.loop = asyncio.get_event_loop() self.Core = core - self.cid = 0 + self.cid: int = 0 self.key: str = None self.nick: str = None self.roles: str = None @@ -32,12 +32,12 @@ class Client: self.alive = True def is_disconnected(self) -> bool: ... async def kick(self, reason: str) -> None: ... - async def tcp_send(self, data: bytes) -> None: ... + async def tcp_send(self, data: bytes, to_all:bool = False, writer: StreamWriter = None) -> None: ... async def sync_resources(self) -> None: ... async def recv(self) -> bytes: ... async def looper(self) -> None: ... def _update_logger(self) -> None: ... - + async def remove_me(self) -> None: ... class Core: def __init__(self): @@ -45,7 +45,7 @@ class Core: self.loop = asyncio.get_event_loop() self.run = False self.direct = False - self.clients: List[Client]= [] + self.clients: List[Client | None]= [] self.clients_by_id: Dict[{int: Client}]= {} self.clients_by_nick: Dict[{str: Client}] = {} self.clients_counter: int = 0 @@ -59,13 +59,13 @@ class Core: self.web_stop: Callable = lambda: None self.client_major_version = "2.0" self.BeamMP_version = "3.2.0" - def insert_client(self, client: Client) -> None: ... + async def insert_client(self, client: Client) -> None: ... def create_client(self, *args, **kwargs) -> Client: ... + def get_clients_list(self, need_cid=False) -> str: ... async def check_alive(self) -> None: ... @staticmethod def start_web() -> None: ... - @staticmethod - def stop_me() -> None: ... + def stop_me(self) -> None: ... async def heartbeat(self, test=False) -> None: ... async def main(self) -> None: ... def start(self) -> None: ... diff --git a/src/core/tcp_server.py b/src/core/tcp_server.py index e97ae77..a4aa20b 100644 --- a/src/core/tcp_server.py +++ b/src/core/tcp_server.py @@ -6,6 +6,8 @@ # (c) kuitoi.su 2023 import asyncio import traceback +from asyncio import AbstractEventLoop +from threading import Thread import aiohttp @@ -15,10 +17,11 @@ from core import utils class TCPServer: def __init__(self, core, host, port): self.log = utils.get_logger("TCPServer") + self.loop = asyncio.get_event_loop() self.Core = core self.host = host self.port = port - self.loop = asyncio.get_event_loop() + self.run = False async def auth_client(self, reader, writer): client = self.Core.create_client(reader, writer) @@ -37,49 +40,58 @@ class TCPServer: await client.kick("Invalid Key (too long)!") return False, None client.key = data.decode("utf-8") - async with aiohttp.ClientSession() as session: - url = 'https://auth.beammp.com/pkToUser' - async with session.post(url, data={'key': client.key}) as response: - res = await response.json() - self.log.debug(f"res: {res}") try: + async with aiohttp.ClientSession() as session: + url = 'https://auth.beammp.com/pkToUser' + async with session.post(url, data={'key': client.key}) as response: + res = await response.json() + self.log.debug(f"res: {res}") if res.get("error"): await client.kick('Invalid key! Please restart your game.') - return + return False, None client.nick = res["username"] client.roles = res["roles"] client.guest = res["guest"] client._update_logger() except Exception as e: self.log.error(f"Auth error: {e}") - await client.kick('Invalid authentication data! Try to connect in 5 minutes.') + await client.kick('Invalid authentication data! Try to reconnect in 5 minutes.') + return False, None for _client in self.Core.clients: + if not _client: + continue if _client.nick == client.nick and _client.guest == client.guest: await client.kick('Stale Client (replaced by new client)') + return False, None ev.call_event("on_auth", client) - if len(self.Core.clients) > config.Game["players"]: + if len(self.Core.clients_by_id) > config.Game["players"]: await client.kick("Server full!") + return False, None else: self.log.info("Identification success") - self.Core.insert_client(client) + await self.Core.insert_client(client) return True, client async def set_down_rw(self, reader, writer): try: - cid = (await reader.read(1)).decode() # FIXME: wtf? 1 byte? - self.log.debug(f"Client: \"ID: {cid}\" - HandleDownload!") - if not cid.isdigit(): - return False + cid = (await reader.read(1))[0] + ok = False for _client in self.Core.clients: + if not _client: + continue if _client.cid == cid: _client.down_rw = (reader, writer) - return True + ok = True + self.log.debug(f"Client: {_client.nick}:{cid} - HandleDownload!") + if not ok: + writer.close() + self.log.debug(f"Unknown client - HandleDownload") finally: - return False + return async def handle_code(self, code, reader, writer): match code: @@ -87,17 +99,17 @@ class TCPServer: result, client = await self.auth_client(reader, writer) if result: await client.looper() - return True - return False + return result, client case "D": - return await self.set_down_rw(reader, writer) + await self.set_down_rw(reader, writer) case "P": writer.write(b"P") await writer.drain() - return True + writer.close() case _: self.log.error(f"Unknown code: {code}") - return False + writer.close() + return False, None async def handle_client(self, reader, writer): while True: @@ -107,28 +119,37 @@ class TCPServer: break code = data.decode() self.log.debug(f"Received {code!r} from {writer.get_extra_info('sockname')!r}") - result = await self.handle_code(code, reader, writer) - if not result: - break + # task = asyncio.create_task(self.handle_code(code, reader, writer)) + # await asyncio.wait([task], return_when=asyncio.FIRST_EXCEPTION) + _, cl = await self.handle_code(code, reader, writer) + if cl: + await cl.remove_me() + break except Exception as e: self.log.error("Error while connecting..") - self.log.error(f"Error: {e}") + self.log.exception(e) traceback.print_exc() break async def start(self): self.log.debug("Starting TCP server.") + self.run = True try: server = await asyncio.start_server(self.handle_client, self.host, self.port, backlog=int(config.Game["players"] * 1.3)) + self.log.debug(f"TCP server started on {server.sockets[0].getsockname()!r}") + while True: + async with server: + await server.serve_forever() except OSError as e: - self.log.error(f"Error: {e}") - self.Core.run = False + self.log.error("Cannot bind port") raise e - self.log.debug(f"TCP server started on {server.sockets[0].getsockname()!r}") - while True: - async with server: - await server.serve_forever() + except BaseException as e: + self.log.error(f"Error: {e}") + raise e + finally: + self.run = False + self.Core.run = False def stop(self): self.log.debug("Stopping TCP server") diff --git a/src/core/tcp_server.pyi b/src/core/tcp_server.pyi index df73094..4c768b9 100644 --- a/src/core/tcp_server.pyi +++ b/src/core/tcp_server.pyi @@ -15,13 +15,14 @@ from core.core import Client class TCPServer: def __init__(self, core: Core, host, port): self.log = utils.get_logger("TCPServer") + self.loop = asyncio.get_event_loop() self.Core = core self.host = host self.port = port - self.loop = asyncio.get_event_loop() + self.run = False async def auth_client(self, reader: StreamReader, writer: StreamWriter) -> Tuple[bool, Client]: ... async def set_down_rw(self, reader: StreamReader, writer: StreamWriter) -> bool: ... - async def handle_code(self, code: str, reader: StreamReader, writer: StreamWriter) -> bool: ... + async def handle_code(self, code: str, reader: StreamReader, writer: StreamWriter) -> Tuple[bool, Client]: ... async def handle_client(self, reader: StreamReader, writer: StreamWriter) -> None: ... async def start(self) -> None: ... async def stop(self) -> None: ...