Add UDP part

This commit is contained in:
Maxim Khomutov 2023-07-18 22:08:00 +03:00
parent cfeb2e9823
commit 63c9515e86
6 changed files with 63 additions and 33 deletions

View File

@ -21,9 +21,10 @@ class Client:
self.__alive = True self.__alive = True
self.__packets_queue = [] self.__packets_queue = []
self.__tasks = [] self.__tasks = []
self._down_rw = (None, None) self._down_sock = (None, None)
self._udp_sock = (None, None)
self._loop = asyncio.get_event_loop() self._loop = asyncio.get_event_loop()
self._log = utils.get_logger("client(None:0)") self._log = utils.get_logger("player(None:0)")
self._addr = writer.get_extra_info("sockname") self._addr = writer.get_extra_info("sockname")
self._cid = -1 self._cid = -1
self._key = None self._key = None
@ -177,9 +178,10 @@ class Client:
if int_header > 100 * MB: if int_header > 100 * MB:
await self.kick("Header size limit exceeded") await self.kick("Header size limit exceeded")
self.log.warning(f"Client {self.nick}:{self.cid} sent header of >100MB - " self.log.warning("Client sent header of >100MB - "
f"assuming malicious intent and disconnecting the client.") "assuming malicious intent and disconnecting the client.")
self.__packets_queue.append(None) self.__packets_queue.append(None)
self.log.debug(f"Last recv: {await self.__reader.read(100 * MB)}")
continue continue
data = await self.__reader.read(int_header) data = await self.__reader.read(int_header)
@ -206,7 +208,7 @@ class Client:
async def _split_load(self, start, end, d_sock, filename): async def _split_load(self, start, end, d_sock, filename):
# TODO: Speed limiter # TODO: Speed limiter
real_size = end - start real_size = end - start
writer = self._down_rw[1] if d_sock else self.__writer writer = self._down_sock[1] if d_sock else self.__writer
who = 'dwn' if d_sock else 'srv' who = 'dwn' if d_sock else 'srv'
if config.Server["debug"]: if config.Server["debug"]:
self.log.debug(f"[{who}] Real size: {real_size / MB}mb; {real_size == end}, {real_size * 2 == end}") self.log.debug(f"[{who}] Real size: {real_size / MB}mb; {real_size == end}, {real_size * 2 == end}")
@ -245,7 +247,7 @@ class Client:
return return
await self._send(b"AG") await self._send(b"AG")
t = 0 t = 0
while not self._down_rw[0]: while not self._down_sock[0]:
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
t += 1 t += 1
if t > 50: if t > 50:
@ -523,7 +525,7 @@ class Client:
except Exception as e: except Exception as e:
self.log.debug(f"Error while closing writer: {e}") self.log.debug(f"Error while closing writer: {e}")
try: try:
_, down_w = self._down_rw _, down_w = self._down_sock
if down_w and not down_w.is_closing(): if down_w and not down_w.is_closing():
down_w.close() down_w.close()
except Exception as e: except Exception as e:

View File

@ -5,7 +5,7 @@
# Licence: FPA # Licence: FPA
# (c) kuitoi.su 2023 # (c) kuitoi.su 2023
import asyncio import asyncio
from asyncio import StreamReader, StreamWriter from asyncio import StreamReader, StreamWriter, DatagramTransport
from logging import Logger from logging import Logger
from typing import Tuple, List, Dict from typing import Tuple, List, Dict
@ -19,7 +19,8 @@ class Client:
self.__reader = reader self.__reader = reader
self.__writer = writer self.__writer = writer
self.__packets_queue = [] self.__packets_queue = []
self._down_rw: Tuple[StreamReader, StreamWriter] | Tuple[None, None] = (None, None) self._udp_sock: Tuple[DatagramTransport, tuple] | Tuple[None, None] = (None, None)
self._down_sock: Tuple[StreamReader, StreamWriter] | Tuple[None, None] = (None, None)
self._log = utils.get_logger("client(id: )") self._log = utils.get_logger("client(id: )")
self._addr = writer.get_extra_info("sockname") self._addr = writer.get_extra_info("sockname")
self._loop = asyncio.get_event_loop() self._loop = asyncio.get_event_loop()

View File

@ -250,7 +250,7 @@ class Core:
self.clients.append(None) self.clients.append(None)
tasks = [] tasks = []
# self.udp.start, # self.udp.start,
f_tasks = [self.tcp.start, console.start, self.stop_me, self.heartbeat, self.check_alive] f_tasks = [self.tcp.start, self.udp._start, console.start, self.stop_me, self.heartbeat, self.check_alive]
for task in f_tasks: for task in f_tasks:
tasks.append(asyncio.create_task(task())) tasks.append(asyncio.create_task(task()))
t = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) t = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
@ -270,7 +270,7 @@ class Core:
finally: finally:
self.run = False self.run = False
self.tcp.stop() self.tcp.stop()
# self.udp.stop() self.udp._stop()
await self.stop() await self.stop()
def start(self): def start(self):

View File

@ -89,7 +89,7 @@ class TCPServer:
cid = (await reader.read(1))[0] cid = (await reader.read(1))[0]
client = self.Core.get_client(cid=cid) client = self.Core.get_client(cid=cid)
if client: if client:
client._down_rw = (reader, writer) client._down_sock = (reader, writer)
self.log.debug(f"Client: {client.nick}:{cid} - HandleDownload!") self.log.debug(f"Client: {client.nick}:{cid} - HandleDownload!")
else: else:
writer.close() writer.close()

View File

@ -26,30 +26,55 @@ class UDPServer(asyncio.DatagramTransport):
self.transport = transport self.transport = transport
def datagram_received(self, data, addr): def datagram_received(self, data, addr):
message = data.decode() cid = data[0] - 1
print('Received %r from %s' % (message, addr)) code = data[2:3].decode()
print('Send %r to %s' % (message, addr))
self.transport.sendto(data, addr)
async def start(self): client = self.Core.get_client(cid=cid)
if client and client._udp_sock != (self.transport, addr):
client._udp_sock = (self.transport, addr)
self.log.debug(f"Set UDP Sock for CID: {cid}")
else:
self.log.debug(f"Client not found.")
match code:
case "p":
self.log.debug(f"[{cid}] Send ping")
# TODO: Call event onSentPing
self.transport.sendto(b"p", addr) # Send ping
case "Z":
# TODO: Positions synchronization
# TODO: Call event onChangePosition
pass
case _:
self.log.debug(f"[{cid}] Unknown code: {code}")
def connection_lost(self, exc):
if exc is not None and exc != KeyboardInterrupt:
self.log.debug(f'Connection raised: {exc}')
self.log.debug(f'Disconnected.')
self.transport.close()
async def _start(self):
self.log.debug("Starting UDP server.") self.log.debug("Starting UDP server.")
self.run = True self.run = True
try: try:
self.transport, _ = await self.loop.create_datagram_endpoint( self.transport, _ = await self.loop.create_datagram_endpoint(
lambda: self, lambda: UDPServer(self.Core),
local_addr=(self.host, self.port), local_addr=(self.host, self.port)
reuse_port=True
) )
self.log.debug(f"UDP server started on {self.transport.get_extra_info('sockname')}")
return
except OSError as e: except OSError as e:
self.log.error("Cannot bind port or other error")
raise e
except BaseException as e:
self.log.error(f"Error: {e}")
raise e
finally:
self.run = False self.run = False
self.Core.run = False self.Core.run = False
self.log.error("Cannot bind port or other error")
self.log.exception(e)
except Exception as e:
self.run = False
self.Core.run = False
self.log.error(f"Error: {e}")
self.log.exception(e)
def stop(self): def _stop(self):
self.log.debug("Stopping UDP server") self.log.debug("Stopping UDP server")
self.transport.close() self.transport.close()

View File

@ -5,22 +5,24 @@
# Licence: FPA # Licence: FPA
# (c) kuitoi.su 2023 # (c) kuitoi.su 2023
import asyncio import asyncio
from asyncio import DatagramTransport
from typing import Tuple from typing import Tuple
from core import utils from core import utils
from core.core import Core
class UDPServer: class UDPServer(asyncio.DatagramTransport):
def __init__(self, core, host=None, port=None): def __init__(self, core: Core, host=None, port=None):
self.log = utils.get_logger("UDPServer") self.log = utils.get_logger("UDPServer")
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
self.Core = core self.Core = core
self.host = host self.host = host
self.port = port self.port = port
self.run = False self.run = False
self.transport = None self.transport: DatagramTransport = None
def connection_made(self, transport: asyncio.DatagramTransport): ... def connection_made(self, transport: DatagramTransport): ...
def datagram_received(self, data: bytes, addr: Tuple[str, int]): ... def datagram_received(self, data: bytes, addr: Tuple[str, int]): ...
async def start(self) -> None: ... async def _start(self) -> None: ...
async def stop(self) -> None: ... async def _stop(self) -> None: ...