Compare commits

..

10 Commits

Author SHA1 Message Date
7dd3faac12 Update TODOs 2023-07-18 22:43:49 +03:00
ef69df10d6 Compress DATA 2023-07-18 22:40:35 +03:00
a226b17612 Compress DATA 2023-07-18 22:40:27 +03:00
69348e9339 Some fixes in onChatReceive 2023-07-18 22:33:49 +03:00
31d8cf7842 try to check_alive 2023-07-18 22:33:15 +03:00
45d45a820c Update TODOs 2023-07-18 22:31:41 +03:00
aa440a1e3d Update TODOs 2023-07-18 22:31:14 +03:00
63c9515e86 Add UDP part 2023-07-18 22:08:00 +03:00
cfeb2e9823 heartbeat fix 2023-07-18 19:53:02 +03:00
85b85114b5 heartbeat fix 2023-07-18 19:52:44 +03:00
7 changed files with 92 additions and 62 deletions

View File

@ -21,16 +21,17 @@ BeamingDrive Multiplayer (BeamMP) server compatible with BeamMP clients.
- [x] Packets handled (Recursive finding second packet) - [x] Packets handled (Recursive finding second packet)
- [x] Car synchronizations: - [x] Car synchronizations:
- [x] State packets - [x] State packets
- [ ] Debug (gear?)
- [x] Spawn cars - [x] Spawn cars
- [x] Delete cars - [x] Delete cars
- [x] Edit cars - [x] Edit cars
- [x] Reset cars - [x] Reset cars
- [x] "ABG:" (compressed data) - [x] "ABG:" (compressed data)
- [x] Decompress data - [x] Decompress data
- [ ] Compress data - [x] Compress data
- [ ] UDP Server part: - [ ] UDP Server part:
- [ ] Players synchronizations _(Code: Zp)_ - [x] Ping
- [ ] Ping _(Code: p)_ - [ ] Position synchronizations _(Code: Zp)_
- [x] Additional: - [x] Additional:
- [ ] KuiToi System - [ ] KuiToi System
- [ ] Servers counter - [ ] Servers counter
@ -51,6 +52,7 @@ BeamingDrive Multiplayer (BeamMP) server compatible with BeamMP clients.
- [x] Async support - [x] Async support
- [x] Plugins support - [x] Plugins support
- [ ] KuiToi class - [ ] KuiToi class
- [ ] Client class
- [x] Load Python plugins - [x] Load Python plugins
- [x] Async support - [x] Async support
- [ ] Load Lua plugins (Original BeamMP compatibility) - [ ] Load Lua plugins (Original BeamMP compatibility)
@ -58,7 +60,7 @@ BeamingDrive Multiplayer (BeamMP) server compatible with BeamMP clients.
- [x] Core - [x] Core
- [x] Console - [x] Console
- [x] WebAPI - [x] WebAPI
- [x] HTTP API Server (fastapi) - [ ] HTTP API Server (fastapi)
- [x] Stop and Start with core - [x] Stop and Start with core
- [x] Configure FastAPI logger - [x] Configure FastAPI logger
- [ ] Sync with event system - [ ] Sync with event system

View File

@ -21,9 +21,10 @@ class Client:
self.__alive = True self.__alive = True
self.__packets_queue = [] self.__packets_queue = []
self.__tasks = [] self.__tasks = []
self._down_rw = (None, None) self._down_sock = (None, None)
self._udp_sock = (None, None)
self._loop = asyncio.get_event_loop() self._loop = asyncio.get_event_loop()
self._log = utils.get_logger("client(None:0)") self._log = utils.get_logger("player(None:0)")
self._addr = writer.get_extra_info("sockname") self._addr = writer.get_extra_info("sockname")
self._cid = -1 self._cid = -1
self._key = None self._key = None
@ -112,11 +113,7 @@ class Client:
continue continue
if not to_udp or code in ['V', 'W', 'Y', 'E']: if not to_udp or code in ['V', 'W', 'Y', 'E']:
if code in ['O', 'T'] or len(data) > 1000: if code in ['O', 'T'] or len(data) > 1000:
if len(data) > 400: await client._send(data)
# TODO: Compress data
await client._send(data)
else:
await client._send(data)
else: else:
await client._send(data) await client._send(data)
else: else:
@ -124,6 +121,9 @@ class Client:
self.log.debug(f"UDP Part not ready: {code}") self.log.debug(f"UDP Part not ready: {code}")
return return
if len(data) > 400:
data = b"ABG:" + zlib.compress(data, level=zlib.Z_BEST_COMPRESSION)
header = len(data).to_bytes(4, "little", signed=True) header = len(data).to_bytes(4, "little", signed=True)
self.log.debug(f'len: {len(data)}; send: {header + data!r}') self.log.debug(f'len: {len(data)}; send: {header + data!r}')
try: try:
@ -177,9 +177,10 @@ class Client:
if int_header > 100 * MB: if int_header > 100 * MB:
await self.kick("Header size limit exceeded") await self.kick("Header size limit exceeded")
self.log.warning(f"Client {self.nick}:{self.cid} sent header of >100MB - " self.log.warning("Client sent header of >100MB - "
f"assuming malicious intent and disconnecting the client.") "assuming malicious intent and disconnecting the client.")
self.__packets_queue.append(None) self.__packets_queue.append(None)
self.log.debug(f"Last recv: {await self.__reader.read(100 * MB)}")
continue continue
data = await self.__reader.read(int_header) data = await self.__reader.read(int_header)
@ -206,7 +207,7 @@ class Client:
async def _split_load(self, start, end, d_sock, filename): async def _split_load(self, start, end, d_sock, filename):
# TODO: Speed limiter # TODO: Speed limiter
real_size = end - start real_size = end - start
writer = self._down_rw[1] if d_sock else self.__writer writer = self._down_sock[1] if d_sock else self.__writer
who = 'dwn' if d_sock else 'srv' who = 'dwn' if d_sock else 'srv'
if config.Server["debug"]: if config.Server["debug"]:
self.log.debug(f"[{who}] Real size: {real_size / MB}mb; {real_size == end}, {real_size * 2 == end}") self.log.debug(f"[{who}] Real size: {real_size / MB}mb; {real_size == end}, {real_size * 2 == end}")
@ -245,7 +246,7 @@ class Client:
return return
await self._send(b"AG") await self._send(b"AG")
t = 0 t = 0
while not self._down_rw[0]: while not self._down_sock[0]:
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
t += 1 t += 1
if t > 50: if t > 50:
@ -451,12 +452,7 @@ class Client:
message = ev_data["message"] message = ev_data["message"]
to_all = ev_data.get("to_all") to_all = ev_data.get("to_all")
if to_all is None: if to_all is None:
if need_send:
need_send = False
to_all = True to_all = True
if to_all:
if need_send:
need_send = False
to_self = ev_data.get("to_self") to_self = ev_data.get("to_self")
if to_self is None: if to_self is None:
to_self = True to_self = True
@ -465,6 +461,7 @@ class Client:
if to_client: if to_client:
writer = to_client._writer writer = to_client._writer
await self._send(f"C:{message}", to_all=to_all, to_self=to_self, writer=writer) await self._send(f"C:{message}", to_all=to_all, to_self=to_self, writer=writer)
need_send = False
except KeyError | AttributeError: except KeyError | AttributeError:
self.log.error(f"Returns invalid data: {ev_data}") self.log.error(f"Returns invalid data: {ev_data}")
if need_send: if need_send:
@ -523,7 +520,7 @@ class Client:
except Exception as e: except Exception as e:
self.log.debug(f"Error while closing writer: {e}") self.log.debug(f"Error while closing writer: {e}")
try: try:
_, down_w = self._down_rw _, down_w = self._down_sock
if down_w and not down_w.is_closing(): if down_w and not down_w.is_closing():
down_w.close() down_w.close()
except Exception as e: except Exception as e:

View File

@ -5,7 +5,7 @@
# Licence: FPA # Licence: FPA
# (c) kuitoi.su 2023 # (c) kuitoi.su 2023
import asyncio import asyncio
from asyncio import StreamReader, StreamWriter from asyncio import StreamReader, StreamWriter, DatagramTransport
from logging import Logger from logging import Logger
from typing import Tuple, List, Dict from typing import Tuple, List, Dict
@ -19,7 +19,8 @@ class Client:
self.__reader = reader self.__reader = reader
self.__writer = writer self.__writer = writer
self.__packets_queue = [] self.__packets_queue = []
self._down_rw: Tuple[StreamReader, StreamWriter] | Tuple[None, None] = (None, None) self._udp_sock: Tuple[DatagramTransport, tuple] | Tuple[None, None] = (None, None)
self._down_sock: Tuple[StreamReader, StreamWriter] | Tuple[None, None] = (None, None)
self._log = utils.get_logger("client(id: )") self._log = utils.get_logger("client(id: )")
self._addr = writer.get_extra_info("sockname") self._addr = writer.get_extra_info("sockname")
self._loop = asyncio.get_event_loop() self._loop = asyncio.get_event_loop()

View File

@ -95,16 +95,20 @@ class Core:
async def check_alive(self): async def check_alive(self):
maxp = config.Game['players'] maxp = config.Game['players']
while self.run: try:
await asyncio.sleep(1) while self.run:
ca = f"Ss{len(self.clients_by_id)}/{maxp}:{self.get_clients_list()}" await asyncio.sleep(1)
for client in self.clients: ca = f"Ss{len(self.clients_by_id)}/{maxp}:{self.get_clients_list()}"
if not client: for client in self.clients:
continue if not client:
if not client.ready: continue
client.is_disconnected() if not client.ready:
continue client.is_disconnected()
await client._send(bytes(ca, "utf-8")) continue
await client._send(bytes(ca, "utf-8"))
except Exception as e:
self.log.error("Error in check_alive.")
self.log.exception(e)
@staticmethod @staticmethod
def start_web(): def start_web():
@ -141,7 +145,7 @@ class Core:
modstotal = len(self.mods_list) - 1 modstotal = len(self.mods_list) - 1
while self.run: while self.run:
try: try:
data = {"uuid": config.Auth["key"], "players": len(self.clients), "maxplayers": config.Game["players"], data = {"uuid": config.Auth["key"], "players": len(self.clients_by_id), "maxplayers": config.Game["players"],
"port": config.Server["server_port"], "map": f"/levels/{config.Game['map']}/info.json", "port": config.Server["server_port"], "map": f"/levels/{config.Game['map']}/info.json",
"private": config.Auth['private'], "version": self.BeamMP_version, "private": config.Auth['private'], "version": self.BeamMP_version,
"clientversion": self.client_major_version, "clientversion": self.client_major_version,
@ -188,13 +192,12 @@ class Core:
self.direct = True self.direct = True
else: else:
self.direct = True self.direct = True
if test:
self.log.error("Cannot auth...")
if not config.Auth['private']:
raise KeyboardInterrupt
if test: if test:
# TODO: i18n # TODO: i18n
self.log.error("Cannot authenticate server.")
self.log.info(f"Server still runnig, but only in Direct connect mode.") self.log.info(f"Server still runnig, but only in Direct connect mode.")
# if not config.Auth['private']:
# raise KeyboardInterrupt
if test: if test:
return ok return ok
@ -251,7 +254,7 @@ class Core:
self.clients.append(None) self.clients.append(None)
tasks = [] tasks = []
# self.udp.start, # self.udp.start,
f_tasks = [self.tcp.start, console.start, self.stop_me, self.heartbeat, self.check_alive] f_tasks = [self.tcp.start, self.udp._start, console.start, self.stop_me, self.heartbeat, self.check_alive]
for task in f_tasks: for task in f_tasks:
tasks.append(asyncio.create_task(task())) tasks.append(asyncio.create_task(task()))
t = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) t = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
@ -271,7 +274,7 @@ class Core:
finally: finally:
self.run = False self.run = False
self.tcp.stop() self.tcp.stop()
# self.udp.stop() self.udp._stop()
await self.stop() await self.stop()
def start(self): def start(self):

View File

@ -89,7 +89,7 @@ class TCPServer:
cid = (await reader.read(1))[0] cid = (await reader.read(1))[0]
client = self.Core.get_client(cid=cid) client = self.Core.get_client(cid=cid)
if client: if client:
client._down_rw = (reader, writer) client._down_sock = (reader, writer)
self.log.debug(f"Client: {client.nick}:{cid} - HandleDownload!") self.log.debug(f"Client: {client.nick}:{cid} - HandleDownload!")
else: else:
writer.close() writer.close()

View File

@ -26,30 +26,55 @@ class UDPServer(asyncio.DatagramTransport):
self.transport = transport self.transport = transport
def datagram_received(self, data, addr): def datagram_received(self, data, addr):
message = data.decode() cid = data[0] - 1
print('Received %r from %s' % (message, addr)) code = data[2:3].decode()
print('Send %r to %s' % (message, addr))
self.transport.sendto(data, addr)
async def start(self): client = self.Core.get_client(cid=cid)
if client and client._udp_sock != (self.transport, addr):
client._udp_sock = (self.transport, addr)
self.log.debug(f"Set UDP Sock for CID: {cid}")
else:
self.log.debug(f"Client not found.")
match code:
case "p":
self.log.debug(f"[{cid}] Send ping")
# TODO: Call event onSentPing
self.transport.sendto(b"p", addr) # Send ping
case "Z":
# TODO: Positions synchronization
# TODO: Call event onChangePosition
pass
case _:
self.log.debug(f"[{cid}] Unknown code: {code}")
def connection_lost(self, exc):
if exc is not None and exc != KeyboardInterrupt:
self.log.debug(f'Connection raised: {exc}')
self.log.debug(f'Disconnected.')
self.transport.close()
async def _start(self):
self.log.debug("Starting UDP server.") self.log.debug("Starting UDP server.")
self.run = True self.run = True
try: try:
self.transport, _ = await self.loop.create_datagram_endpoint( self.transport, _ = await self.loop.create_datagram_endpoint(
lambda: self, lambda: UDPServer(self.Core),
local_addr=(self.host, self.port), local_addr=(self.host, self.port)
reuse_port=True
) )
self.log.debug(f"UDP server started on {self.transport.get_extra_info('sockname')}")
return
except OSError as e: except OSError as e:
self.log.error("Cannot bind port or other error")
raise e
except BaseException as e:
self.log.error(f"Error: {e}")
raise e
finally:
self.run = False self.run = False
self.Core.run = False self.Core.run = False
self.log.error("Cannot bind port or other error")
self.log.exception(e)
except Exception as e:
self.run = False
self.Core.run = False
self.log.error(f"Error: {e}")
self.log.exception(e)
def stop(self): def _stop(self):
self.log.debug("Stopping UDP server") self.log.debug("Stopping UDP server")
self.transport.close() self.transport.close()

View File

@ -5,22 +5,24 @@
# Licence: FPA # Licence: FPA
# (c) kuitoi.su 2023 # (c) kuitoi.su 2023
import asyncio import asyncio
from asyncio import DatagramTransport
from typing import Tuple from typing import Tuple
from core import utils from core import utils
from core.core import Core
class UDPServer: class UDPServer(asyncio.DatagramTransport):
def __init__(self, core, host=None, port=None): def __init__(self, core: Core, host=None, port=None):
self.log = utils.get_logger("UDPServer") self.log = utils.get_logger("UDPServer")
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
self.Core = core self.Core = core
self.host = host self.host = host
self.port = port self.port = port
self.run = False self.run = False
self.transport = None self.transport: DatagramTransport = None
def connection_made(self, transport: asyncio.DatagramTransport): ... def connection_made(self, transport: DatagramTransport): ...
def datagram_received(self, data: bytes, addr: Tuple[str, int]): ... def datagram_received(self, data: bytes, addr: Tuple[str, int]): ...
async def start(self) -> None: ... async def _start(self) -> None: ...
async def stop(self) -> None: ... async def _stop(self) -> None: ...