From ea2d715cae5520c9964502cb85a0335c4bd78c8e Mon Sep 17 00:00:00 2001 From: SantaSpeen Date: Wed, 19 Jul 2023 20:23:16 +0300 Subject: [PATCH] DO Speed limiter; Minor fixes; --- src/core/Client.py | 67 +++++++++++++++++++++++++++++++++------------ src/core/Client.pyi | 6 ++-- 2 files changed, 53 insertions(+), 20 deletions(-) diff --git a/src/core/Client.py b/src/core/Client.py index c9a90f4..41f6a74 100644 --- a/src/core/Client.py +++ b/src/core/Client.py @@ -7,6 +7,7 @@ import asyncio import json import math +import time import zlib from core import utils @@ -92,6 +93,12 @@ class Client: await self._send(b"K" + bytes(reason, "utf-8")) self.__alive = False + async def send_message(self, message, to_all=True): + pass + + async def send_event(self, event_name, event_data): + pass + async def _send(self, data, to_all=False, to_self=True, to_udp=False, writer=None): # TNetwork.cpp; Line: 383 @@ -198,7 +205,7 @@ class Client: self.log.warning("Client sent header of >100MB - " "assuming malicious intent and disconnecting the client.") self.__packets_queue.append(None) - self.log.debug(f"Last recv: {await self.__reader.read(100 * MB)}") + self.log.error(f"Last recv: {await self.__reader.read(100 * MB)}") continue data = await self.__reader.read(int_header) @@ -222,8 +229,7 @@ class Client: self.__alive = False self.__packets_queue.append(None) - async def _split_load(self, start, end, d_sock, filename): - # TODO: Speed limiter + async def _split_load(self, start, end, d_sock, filename, speed_limit=None): real_size = end - start writer = self._down_sock[1] if d_sock else self.__writer who = 'dwn' if d_sock else 'srv' @@ -232,17 +238,31 @@ class Client: with open(filename, 'rb') as f: f.seek(start) - data = f.read(end) - try: - writer.write(data) - await writer.drain() - self.log.debug(f"[{who}] File sent.") - except ConnectionError: - self.__alive = False - self.log.debug(f"[{who}] Disconnected.") - return real_size + total_sent = 0 + start_time = time.monotonic() + while total_sent < real_size: + data = f.read(min(MB, real_size - total_sent)) # read data in chunks of 1MB or less + try: + writer.write(data) + await writer.drain() + self.log.debug(f"[{who}] Sent {len(data)} bytes.") + except ConnectionError: + self.__alive = False + self.log.debug(f"[{who}] Disconnected.") + break + total_sent += len(data) + + # Calculate delay based on speed limit + if speed_limit: + elapsed_time = time.monotonic() - start_time + expected_time = total_sent / (speed_limit * MB) + if expected_time > elapsed_time: + await asyncio.sleep(expected_time - elapsed_time) + + return total_sent async def _sync_resources(self): + tsr = time.monotonic() while self.__alive: data = await self._recv(True) if data.startswith(b"f"): @@ -270,13 +290,22 @@ class Client: if t > 50: await self.kick("Missing download socket") return - + speed = 10 + if speed: + speed = speed / 2 half_size = math.floor(size / 2) + t = time.monotonic() uploads = [ - self._split_load(0, half_size, False, file), - self._split_load(half_size, size, True, file) + self._split_load(0, half_size, False, file, speed), + self._split_load(half_size, size, True, file, speed) ] sl0, sl1 = await asyncio.gather(*uploads) + tr = time.monotonic() - t + # TODO: i18n + msg = f"Mod sent: Size {round(size / MB, 3)}mb Speed {int(size / tr / MB)}Mb/s ({int(tr)}s)" + if speed: + msg += f" of limit {int(speed * 2)}Mb/s" + self.log.info(msg) sent = sl0 + sl1 ok = sent == size lost = size - sent @@ -284,7 +313,7 @@ class Client: if not ok: self.__alive = False # TODO: i18n - self.log.error(f"Error while sending.") + self.log.error(f"Error while sending: {file!r}") return elif data.startswith(b"SR"): path_list = '' @@ -304,6 +333,7 @@ class Client: for c in range(int(config.Game['max_cars'] * 2.3)): self._cars.append(None) await self._send(b"M/levels/" + bytes(config.Game['map'], 'utf-8') + b"/info.json") + self.log.info(f"Syncing time: {time.monotonic() - tsr}") break return @@ -312,7 +342,8 @@ class Client: s = data[sep:sep + 3] id_sep = s.find('-') if id_sep == -1: - self.log.debug(f"Invalid packet: Could not parse pid/vid from packet, as there is no '-' separator: '{data}'") + self.log.debug( + f"Invalid packet: Could not parse pid/vid from packet, as there is no '-' separator: '{data}'") return -1, -1 cid = s[:id_sep] vid = s[id_sep + 1:] @@ -462,7 +493,7 @@ class Client: sup = data.find(":", 2) if sup == -1: await self._send("C:Server: Invalid message.") - msg = data[sup+2:] + msg = data[sup + 2:] if not msg: self.log.debug("Tried to send an empty event, ignoring") return diff --git a/src/core/Client.pyi b/src/core/Client.pyi index c532e17..75aa7c8 100644 --- a/src/core/Client.pyi +++ b/src/core/Client.pyi @@ -51,11 +51,13 @@ class Client: def cars(self) -> List[dict | None]: ... def is_disconnected(self) -> bool: ... async def kick(self, reason: str) -> None: ... + async def send_message(self, message: str | bytes, to_all: bool = True) -> None:... + async def send_event(self, event_name: str, event_data: str) -> None: ... async def _send(self, data: bytes | str, to_all: bool = False, to_self: bool = True, to_udp: bool = False, writer: StreamWriter = None) -> None: ... async def _sync_resources(self) -> None: ... - async def __handle_packet(self, data, int_header): ... + # async def __handle_packet(self, data, int_header): ... async def _recv(self, one=False) -> bytes | None: ... - async def _split_load(self, start: int, end: int, d_sock: bool, filename: str) -> None: ... + async def _split_load(self, start: int, end: int, d_sock: bool, filename: str, sl: float) -> None: ... async def _get_cid_vid(self, s: str) -> Tuple[int, int]: ... async def _handle_car_codes(self, data) -> None: ... async def _handle_codes(self, data) -> None: ...