From 63c9515e86718952051a603583a333310c6ca2a8 Mon Sep 17 00:00:00 2001 From: SantaSpeen Date: Tue, 18 Jul 2023 22:08:00 +0300 Subject: [PATCH] Add UDP part --- src/core/Client.py | 16 ++++++------ src/core/Client.pyi | 5 ++-- src/core/core.py | 4 +-- src/core/tcp_server.py | 2 +- src/core/udp_server.py | 55 ++++++++++++++++++++++++++++++----------- src/core/udp_server.pyi | 14 ++++++----- 6 files changed, 63 insertions(+), 33 deletions(-) diff --git a/src/core/Client.py b/src/core/Client.py index 98da2ed..336777b 100644 --- a/src/core/Client.py +++ b/src/core/Client.py @@ -21,9 +21,10 @@ class Client: self.__alive = True self.__packets_queue = [] self.__tasks = [] - self._down_rw = (None, None) + self._down_sock = (None, None) + self._udp_sock = (None, None) self._loop = asyncio.get_event_loop() - self._log = utils.get_logger("client(None:0)") + self._log = utils.get_logger("player(None:0)") self._addr = writer.get_extra_info("sockname") self._cid = -1 self._key = None @@ -177,9 +178,10 @@ class Client: if int_header > 100 * MB: await self.kick("Header size limit exceeded") - self.log.warning(f"Client {self.nick}:{self.cid} sent header of >100MB - " - f"assuming malicious intent and disconnecting the client.") + self.log.warning("Client sent header of >100MB - " + "assuming malicious intent and disconnecting the client.") self.__packets_queue.append(None) + self.log.debug(f"Last recv: {await self.__reader.read(100 * MB)}") continue data = await self.__reader.read(int_header) @@ -206,7 +208,7 @@ class Client: async def _split_load(self, start, end, d_sock, filename): # TODO: Speed limiter real_size = end - start - writer = self._down_rw[1] if d_sock else self.__writer + writer = self._down_sock[1] if d_sock else self.__writer who = 'dwn' if d_sock else 'srv' if config.Server["debug"]: self.log.debug(f"[{who}] Real size: {real_size / MB}mb; {real_size == end}, {real_size * 2 == end}") @@ -245,7 +247,7 @@ class Client: return await self._send(b"AG") t = 0 - while not self._down_rw[0]: + while not self._down_sock[0]: await asyncio.sleep(0.1) t += 1 if t > 50: @@ -523,7 +525,7 @@ class Client: except Exception as e: self.log.debug(f"Error while closing writer: {e}") try: - _, down_w = self._down_rw + _, down_w = self._down_sock if down_w and not down_w.is_closing(): down_w.close() except Exception as e: diff --git a/src/core/Client.pyi b/src/core/Client.pyi index b2d7c2a..db67960 100644 --- a/src/core/Client.pyi +++ b/src/core/Client.pyi @@ -5,7 +5,7 @@ # Licence: FPA # (c) kuitoi.su 2023 import asyncio -from asyncio import StreamReader, StreamWriter +from asyncio import StreamReader, StreamWriter, DatagramTransport from logging import Logger from typing import Tuple, List, Dict @@ -19,7 +19,8 @@ class Client: self.__reader = reader self.__writer = writer self.__packets_queue = [] - self._down_rw: Tuple[StreamReader, StreamWriter] | Tuple[None, None] = (None, None) + self._udp_sock: Tuple[DatagramTransport, tuple] | Tuple[None, None] = (None, None) + self._down_sock: Tuple[StreamReader, StreamWriter] | Tuple[None, None] = (None, None) self._log = utils.get_logger("client(id: )") self._addr = writer.get_extra_info("sockname") self._loop = asyncio.get_event_loop() diff --git a/src/core/core.py b/src/core/core.py index 67d5bac..e669a9d 100644 --- a/src/core/core.py +++ b/src/core/core.py @@ -250,7 +250,7 @@ class Core: self.clients.append(None) tasks = [] # self.udp.start, - f_tasks = [self.tcp.start, console.start, self.stop_me, self.heartbeat, self.check_alive] + f_tasks = [self.tcp.start, self.udp._start, console.start, self.stop_me, self.heartbeat, self.check_alive] for task in f_tasks: tasks.append(asyncio.create_task(task())) t = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) @@ -270,7 +270,7 @@ class Core: finally: self.run = False self.tcp.stop() - # self.udp.stop() + self.udp._stop() await self.stop() def start(self): diff --git a/src/core/tcp_server.py b/src/core/tcp_server.py index 5b9facd..52ac2cf 100644 --- a/src/core/tcp_server.py +++ b/src/core/tcp_server.py @@ -89,7 +89,7 @@ class TCPServer: cid = (await reader.read(1))[0] client = self.Core.get_client(cid=cid) if client: - client._down_rw = (reader, writer) + client._down_sock = (reader, writer) self.log.debug(f"Client: {client.nick}:{cid} - HandleDownload!") else: writer.close() diff --git a/src/core/udp_server.py b/src/core/udp_server.py index a2d5599..305bf35 100644 --- a/src/core/udp_server.py +++ b/src/core/udp_server.py @@ -26,30 +26,55 @@ class UDPServer(asyncio.DatagramTransport): self.transport = transport def datagram_received(self, data, addr): - message = data.decode() - print('Received %r from %s' % (message, addr)) - print('Send %r to %s' % (message, addr)) - self.transport.sendto(data, addr) + cid = data[0] - 1 + code = data[2:3].decode() - async def start(self): + client = self.Core.get_client(cid=cid) + if client and client._udp_sock != (self.transport, addr): + client._udp_sock = (self.transport, addr) + self.log.debug(f"Set UDP Sock for CID: {cid}") + else: + self.log.debug(f"Client not found.") + + match code: + case "p": + self.log.debug(f"[{cid}] Send ping") + # TODO: Call event onSentPing + self.transport.sendto(b"p", addr) # Send ping + case "Z": + # TODO: Positions synchronization + # TODO: Call event onChangePosition + pass + case _: + self.log.debug(f"[{cid}] Unknown code: {code}") + + def connection_lost(self, exc): + if exc is not None and exc != KeyboardInterrupt: + self.log.debug(f'Connection raised: {exc}') + self.log.debug(f'Disconnected.') + self.transport.close() + + async def _start(self): self.log.debug("Starting UDP server.") self.run = True try: self.transport, _ = await self.loop.create_datagram_endpoint( - lambda: self, - local_addr=(self.host, self.port), - reuse_port=True + lambda: UDPServer(self.Core), + local_addr=(self.host, self.port) ) + self.log.debug(f"UDP server started on {self.transport.get_extra_info('sockname')}") + return except OSError as e: - self.log.error("Cannot bind port or other error") - raise e - except BaseException as e: - self.log.error(f"Error: {e}") - raise e - finally: self.run = False self.Core.run = False + self.log.error("Cannot bind port or other error") + self.log.exception(e) + except Exception as e: + self.run = False + self.Core.run = False + self.log.error(f"Error: {e}") + self.log.exception(e) - def stop(self): + def _stop(self): self.log.debug("Stopping UDP server") self.transport.close() diff --git a/src/core/udp_server.pyi b/src/core/udp_server.pyi index 8666508..9d4b22d 100644 --- a/src/core/udp_server.pyi +++ b/src/core/udp_server.pyi @@ -5,22 +5,24 @@ # Licence: FPA # (c) kuitoi.su 2023 import asyncio +from asyncio import DatagramTransport from typing import Tuple from core import utils +from core.core import Core -class UDPServer: +class UDPServer(asyncio.DatagramTransport): - def __init__(self, core, host=None, port=None): + def __init__(self, core: Core, host=None, port=None): self.log = utils.get_logger("UDPServer") self.loop = asyncio.get_event_loop() self.Core = core self.host = host self.port = port self.run = False - self.transport = None - def connection_made(self, transport: asyncio.DatagramTransport): ... + self.transport: DatagramTransport = None + def connection_made(self, transport: DatagramTransport): ... def datagram_received(self, data: bytes, addr: Tuple[str, int]): ... - async def start(self) -> None: ... - async def stop(self) -> None: ... \ No newline at end of file + async def _start(self) -> None: ... + async def _stop(self) -> None: ... \ No newline at end of file