DO Speed limiter;

Minor fixes;
This commit is contained in:
Maxim Khomutov 2023-07-19 20:23:16 +03:00
parent 102891c8e8
commit ea2d715cae
2 changed files with 53 additions and 20 deletions

View File

@ -7,6 +7,7 @@
import asyncio import asyncio
import json import json
import math import math
import time
import zlib import zlib
from core import utils from core import utils
@ -92,6 +93,12 @@ class Client:
await self._send(b"K" + bytes(reason, "utf-8")) await self._send(b"K" + bytes(reason, "utf-8"))
self.__alive = False self.__alive = False
async def send_message(self, message, to_all=True):
pass
async def send_event(self, event_name, event_data):
pass
async def _send(self, data, to_all=False, to_self=True, to_udp=False, writer=None): async def _send(self, data, to_all=False, to_self=True, to_udp=False, writer=None):
# TNetwork.cpp; Line: 383 # TNetwork.cpp; Line: 383
@ -198,7 +205,7 @@ class Client:
self.log.warning("Client sent header of >100MB - " self.log.warning("Client sent header of >100MB - "
"assuming malicious intent and disconnecting the client.") "assuming malicious intent and disconnecting the client.")
self.__packets_queue.append(None) self.__packets_queue.append(None)
self.log.debug(f"Last recv: {await self.__reader.read(100 * MB)}") self.log.error(f"Last recv: {await self.__reader.read(100 * MB)}")
continue continue
data = await self.__reader.read(int_header) data = await self.__reader.read(int_header)
@ -222,8 +229,7 @@ class Client:
self.__alive = False self.__alive = False
self.__packets_queue.append(None) self.__packets_queue.append(None)
async def _split_load(self, start, end, d_sock, filename): async def _split_load(self, start, end, d_sock, filename, speed_limit=None):
# TODO: Speed limiter
real_size = end - start real_size = end - start
writer = self._down_sock[1] if d_sock else self.__writer writer = self._down_sock[1] if d_sock else self.__writer
who = 'dwn' if d_sock else 'srv' who = 'dwn' if d_sock else 'srv'
@ -232,17 +238,31 @@ class Client:
with open(filename, 'rb') as f: with open(filename, 'rb') as f:
f.seek(start) f.seek(start)
data = f.read(end) total_sent = 0
try: start_time = time.monotonic()
writer.write(data) while total_sent < real_size:
await writer.drain() data = f.read(min(MB, real_size - total_sent)) # read data in chunks of 1MB or less
self.log.debug(f"[{who}] File sent.") try:
except ConnectionError: writer.write(data)
self.__alive = False await writer.drain()
self.log.debug(f"[{who}] Disconnected.") self.log.debug(f"[{who}] Sent {len(data)} bytes.")
return real_size except ConnectionError:
self.__alive = False
self.log.debug(f"[{who}] Disconnected.")
break
total_sent += len(data)
# Calculate delay based on speed limit
if speed_limit:
elapsed_time = time.monotonic() - start_time
expected_time = total_sent / (speed_limit * MB)
if expected_time > elapsed_time:
await asyncio.sleep(expected_time - elapsed_time)
return total_sent
async def _sync_resources(self): async def _sync_resources(self):
tsr = time.monotonic()
while self.__alive: while self.__alive:
data = await self._recv(True) data = await self._recv(True)
if data.startswith(b"f"): if data.startswith(b"f"):
@ -270,13 +290,22 @@ class Client:
if t > 50: if t > 50:
await self.kick("Missing download socket") await self.kick("Missing download socket")
return return
speed = 10
if speed:
speed = speed / 2
half_size = math.floor(size / 2) half_size = math.floor(size / 2)
t = time.monotonic()
uploads = [ uploads = [
self._split_load(0, half_size, False, file), self._split_load(0, half_size, False, file, speed),
self._split_load(half_size, size, True, file) self._split_load(half_size, size, True, file, speed)
] ]
sl0, sl1 = await asyncio.gather(*uploads) sl0, sl1 = await asyncio.gather(*uploads)
tr = time.monotonic() - t
# TODO: i18n
msg = f"Mod sent: Size {round(size / MB, 3)}mb Speed {int(size / tr / MB)}Mb/s ({int(tr)}s)"
if speed:
msg += f" of limit {int(speed * 2)}Mb/s"
self.log.info(msg)
sent = sl0 + sl1 sent = sl0 + sl1
ok = sent == size ok = sent == size
lost = size - sent lost = size - sent
@ -284,7 +313,7 @@ class Client:
if not ok: if not ok:
self.__alive = False self.__alive = False
# TODO: i18n # TODO: i18n
self.log.error(f"Error while sending.") self.log.error(f"Error while sending: {file!r}")
return return
elif data.startswith(b"SR"): elif data.startswith(b"SR"):
path_list = '' path_list = ''
@ -304,6 +333,7 @@ class Client:
for c in range(int(config.Game['max_cars'] * 2.3)): for c in range(int(config.Game['max_cars'] * 2.3)):
self._cars.append(None) self._cars.append(None)
await self._send(b"M/levels/" + bytes(config.Game['map'], 'utf-8') + b"/info.json") await self._send(b"M/levels/" + bytes(config.Game['map'], 'utf-8') + b"/info.json")
self.log.info(f"Syncing time: {time.monotonic() - tsr}")
break break
return return
@ -312,7 +342,8 @@ class Client:
s = data[sep:sep + 3] s = data[sep:sep + 3]
id_sep = s.find('-') id_sep = s.find('-')
if id_sep == -1: if id_sep == -1:
self.log.debug(f"Invalid packet: Could not parse pid/vid from packet, as there is no '-' separator: '{data}'") self.log.debug(
f"Invalid packet: Could not parse pid/vid from packet, as there is no '-' separator: '{data}'")
return -1, -1 return -1, -1
cid = s[:id_sep] cid = s[:id_sep]
vid = s[id_sep + 1:] vid = s[id_sep + 1:]
@ -462,7 +493,7 @@ class Client:
sup = data.find(":", 2) sup = data.find(":", 2)
if sup == -1: if sup == -1:
await self._send("C:Server: Invalid message.") await self._send("C:Server: Invalid message.")
msg = data[sup+2:] msg = data[sup + 2:]
if not msg: if not msg:
self.log.debug("Tried to send an empty event, ignoring") self.log.debug("Tried to send an empty event, ignoring")
return return

View File

@ -51,11 +51,13 @@ class Client:
def cars(self) -> List[dict | None]: ... def cars(self) -> List[dict | None]: ...
def is_disconnected(self) -> bool: ... def is_disconnected(self) -> bool: ...
async def kick(self, reason: str) -> None: ... async def kick(self, reason: str) -> None: ...
async def send_message(self, message: str | bytes, to_all: bool = True) -> None:...
async def send_event(self, event_name: str, event_data: str) -> None: ...
async def _send(self, data: bytes | str, to_all: bool = False, to_self: bool = True, to_udp: bool = False, writer: StreamWriter = None) -> None: ... async def _send(self, data: bytes | str, to_all: bool = False, to_self: bool = True, to_udp: bool = False, writer: StreamWriter = None) -> None: ...
async def _sync_resources(self) -> None: ... async def _sync_resources(self) -> None: ...
async def __handle_packet(self, data, int_header): ... # async def __handle_packet(self, data, int_header): ...
async def _recv(self, one=False) -> bytes | None: ... async def _recv(self, one=False) -> bytes | None: ...
async def _split_load(self, start: int, end: int, d_sock: bool, filename: str) -> None: ... async def _split_load(self, start: int, end: int, d_sock: bool, filename: str, sl: float) -> None: ...
async def _get_cid_vid(self, s: str) -> Tuple[int, int]: ... async def _get_cid_vid(self, s: str) -> Tuple[int, int]: ...
async def _handle_car_codes(self, data) -> None: ... async def _handle_car_codes(self, data) -> None: ...
async def _handle_codes(self, data) -> None: ... async def _handle_codes(self, data) -> None: ...