Prepare for sync_resources;

Fix async bugs;
Recreate ID system;
Add Ss (Player counter) code;
This commit is contained in:
2023-07-13 17:49:23 +03:00
parent 1f595db700
commit 52893513d0
5 changed files with 260 additions and 163 deletions

View File

@@ -6,6 +6,7 @@
# (c) kuitoi.su 2023
import asyncio
import os
import random
import zlib
from threading import Thread
@@ -36,8 +37,8 @@ class Client:
self.alive = True
def _update_logger(self):
self.log.debug(f"Update logger")
self.log = utils.get_logger(f"client({self.nick}:{self.cid})")
self.log.debug(f"Update logger")
def is_disconnected(self):
if not self.alive:
@@ -55,11 +56,10 @@ class Client:
async def kick(self, reason):
self.log.info(f"Client: \"IP: {self.addr!r}; ID: {self.cid}\" - kicked with reason: \"{reason}\"")
await self.tcp_send(b"K" + bytes(reason, "utf-8"))
# self.writer.close()
# await self.writer.wait_closed()
self.alive = False
await self.remove_me()
async def tcp_send(self, data):
async def tcp_send(self, data, to_all=False, writer=None):
# TNetwork.cpp; Line: 383
# BeamMP TCP protocol sends a header of 4 bytes, followed by the data.
@@ -67,63 +67,98 @@ class Client:
# ^------^^---...-^
# size data
if writer is None:
writer = self.writer
if to_all:
for client in self.Core.clients:
if not client:
continue
await client.tcp_send(data)
return
self.log.debug(f"tcp_send({data})")
if len(data) == 10:
data += b"."
header = len(data).to_bytes(4, "little", signed=True)
self.log.debug(f'len(data) {len(data)}; send {header + data}')
self.writer.write(header + data)
await self.writer.drain()
try:
writer.write(header + data)
await writer.drain()
except ConnectionError:
self.log.debug('Disconnected')
self.alive = False
async def recv(self):
async def recv(self, kostil=False):
# if not self.is_disconnected():
# self.log.debug(f"Client with {self.nick}({self.cid}) disconnected")
# return b""
header = await self.reader.read(4) # header: 4 bytes
try:
header = await self.reader.read(4) # header: 4 bytes
int_header = 0
for i in range(len(header)):
int_header += header[i]
int_header = 0
for i in range(len(header)):
int_header += header[i]
if int_header <= 0:
await self.kick("Invalid packet - header negative")
return b""
if int_header <= 0:
await asyncio.sleep(0.1)
if not self.alive:
self.log.debug(f"Disconnected")
self.writer.close()
return b''
if kostil:
return
self.log.debug(f"Header: {header}")
await self.kick("Invalid packet - header negative")
return b""
if int_header > 100 * MB:
await self.kick("Header size limit exceeded")
self.log.warn(f"Client {self.nick}({self.cid}) sent header of >100MB - "
f"assuming malicious intent and disconnecting the client.")
return b""
if int_header > 100 * MB:
await self.kick("Header size limit exceeded")
self.log.warn(f"Client {self.nick}({self.cid}) sent header of >100MB - "
f"assuming malicious intent and disconnecting the client.")
return b""
data = await self.reader.read(101 * MB)
self.log.debug(f"header: `{header}`; int_header: `{int_header}`; data: `{data}`;")
data = await self.reader.read(100 * MB)
self.log.debug(f"header: `{header}`; int_header: `{int_header}`; data: `{data}`;")
if len(data) != int_header:
self.log.debug(f"WARN Expected to read {int_header} bytes, instead got {len(data)}")
if len(data) != int_header:
self.log.debug(f"WARN Expected to read {int_header} bytes, instead got {len(data)}")
abg = b"ABG:"
if len(data) > len(abg) and data.startswith(abg):
data = zlib.decompress(data[len(abg):])
self.log.debug(f"ABG: {data}")
abg = b"ABG:"
if len(data) > len(abg) and data.startswith(abg):
data = zlib.decompress(data[len(abg):])
self.log.debug(f"ABG: {data}")
return data
return data
return data
except ConnectionError:
self.alive = False
await self.remove_me()
return b""
async def sync_resources(self):
while True:
data = await self.recv()
if not data:
await asyncio.sleep(.1)
continue
self.log.debug(f"Received: {data}")
if data.startswith(b"f"):
# TODO: SendFile
pass
file = data[1:].decode("utf-8")
self.log.debug(f"Sending File: {file}")
await self.kick(f"TODO: SendFile({file})")
elif data.startswith(b"SR"):
# TODO: Create mods list
self.log.debug("Sending Mod Info")
mods = []
mod_list = b''
# * code *
if len(mods) == 0:
mod_list = '%s;%s;'
for mod in self.Core.mods_list:
if type(mod) == int:
continue
mod_list = (mod_list % (mod['path'], mod['size'])).replace(";", ";%s;")
mod_list = mod_list.replace("%s;", "")
if len(mod_list) == 0:
await self.tcp_send(b"-")
else:
await self.tcp_send(mod_list)
await self.tcp_send(bytes(mod_list, "utf-8"))
data = await self.recv()
if data == b"Done":
await self.tcp_send(b"M/levels/" + bytes(config.Game['map'], 'utf-8') + b"/info.json")
@@ -131,7 +166,6 @@ class Client:
async def looper(self):
# self.is_disconnected()
self.log.debug(f"Alive: {self.alive}")
await self.tcp_send(b"P" + bytes(f"{self.cid}", "utf-8"))
await self.sync_resources()
while self.alive:
@@ -139,19 +173,31 @@ class Client:
if data == b"":
if not self.alive:
break
elif self.is_disconnected():
break
else:
await asyncio.sleep(.2)
continue
code = data.decode()[0]
self.log.debug(f"Code: {code}, data: {data}")
self.log.debug(f"Received code: {code}, data: {data}")
match code:
case "H":
# Client connected
await self.tcp_send(b"Sn" + bytes(self.nick, "utf-8"))
await self.tcp_send(b"Sn" + bytes(self.nick, "utf-8"), to_all=True)
case "C":
# Chat
await self.tcp_send(data)
await self.tcp_send(data, to_all=True)
async def remove_me(self):
self.log.debug(f"Removing client {self.nick}({self.cid})")
await asyncio.sleep(0.3)
if not self.writer.is_closing():
self.writer.close()
if (self.cid > 0 or self.nick is not None) and self.Core.clients_by_id.get(self.cid):
_, down_w = self.down_rw
if down_w and not down_w.is_closing():
down_w.close()
self.Core.clients[self.cid] = None
self.Core.clients_by_id.pop(self.cid)
self.Core.clients_by_nick.pop(self.nick)
class Core:
@@ -177,40 +223,60 @@ class Core:
self.client_major_version = "2.0"
self.BeamMP_version = "3.2.0"
def get_client(self, sock=None, cid=None, nick=None):
def get_client(self, cid=None, nick=None):
if cid:
return self.clients_by_id.get(cid)
if nick:
return self.clients_by_nick.get(nick)
if sock:
return self.clients_by_nick.get(sock.getsockname())
def insert_client(self, client):
self.log.debug(f"Inserting client: {client.cid}")
self.clients_by_nick.update({client.nick: client})
self.clients_by_id.update({client.cid: client})
self.clients[client.cid] = client
def create_client(self, *args, **kwargs):
client = Client(*args, **kwargs)
cid = 1
for client in self.clients:
if client.cid == cid:
async def insert_client(self, client):
await asyncio.sleep(random.randint(3, 9) * 0.01)
cid = 0
for _client in self.clients:
if not _client:
break
if _client.cid == cid:
cid += 1
else:
break
client.cid = cid
client._update_logger()
self.log.debug(f"Create client; client.cid: {client.cid};")
await asyncio.sleep(random.randint(3, 9) * 0.01)
if not self.clients[cid]:
client.cid = cid
self.clients_by_nick.update({client.nick: client})
self.log.debug(f"Inserting client: id{client.cid}")
self.clients_by_id.update({client.cid: client})
self.clients[client.cid] = client
client._update_logger()
return
await self.insert_client(client)
def create_client(self, *args, **kwargs):
self.log.debug(f"Create client")
client = Client(core=self, *args, **kwargs)
return client
async def check_alive(self):
await asyncio.sleep(5)
self.log.debug(f"Checking if clients is alive")
def get_clients_list(self, need_cid=False):
out = ""
for client in self.clients:
d = client.is_disconnected()
if d:
self.log.debug(f"Client ID: {client.cid} died...")
if not client:
continue
out += f"{client.nick}"
if need_cid:
out += f":{client.cid}"
out += ","
if out:
out = out[:-1]
return out
async def check_alive(self):
maxp = config.Game['players']
while self.run:
await asyncio.sleep(1)
ca = f"Ss{len(self.clients_by_id)}/{maxp}:{self.get_clients_list()}"
for client in self.clients:
if not client:
continue
await client.tcp_send(bytes(ca, "utf-8"))
@staticmethod
def start_web():
@@ -223,10 +289,10 @@ class Core:
webapp.uvserver = uvserver
uvserver.run()
@staticmethod
async def stop_me():
async def stop_me(self):
while webapp.data_run[0]:
await asyncio.sleep(1)
self.run = False
raise KeyboardInterrupt
# noinspection SpellCheckingInspection,PyPep8Naming
@@ -246,69 +312,76 @@ class Core:
modstotalsize = self.mods_list[0]
modstotal = len(self.mods_list) - 1
while self.run:
data = {"uuid": config.Auth["key"], "players": len(self.clients), "maxplayers": config.Game["players"],
"port": config.Server["server_port"], "map": f"/levels/{config.Game['map']}/info.json",
"private": config.Auth['private'], "version": self.BeamMP_version,
"clientversion": self.client_major_version,
"name": config.Server["name"], "modlist": modlist, "modstotalsize": modstotalsize,
"modstotal": modstotal, "playerslist": "", "desc": config.Server['description'], "pass": False}
self.log.debug(f"Auth: data {data}")
try:
data = {"uuid": config.Auth["key"], "players": len(self.clients), "maxplayers": config.Game["players"],
"port": config.Server["server_port"], "map": f"/levels/{config.Game['map']}/info.json",
"private": config.Auth['private'], "version": self.BeamMP_version,
"clientversion": self.client_major_version,
"name": config.Server["name"], "modlist": modlist, "modstotalsize": modstotalsize,
"modstotal": modstotal, "playerslist": "", "desc": config.Server['description'], "pass": False}
self.log.debug(f"Auth: data {data}")
# Sentry?
ok = False
body = {}
code = 0
for server_url in BEAM_backend:
url = "https://" + server_url + "/heartbeat"
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, data=data, headers={"api-v": "2"}) as response:
code = response.status
body = await response.json()
self.log.debug(f"Auth: code {code}, body {body}")
ok = True
break
except Exception as e:
self.log.debug(f"Auth: Error `{e}` while auth with `{server_url}`")
continue
# Sentry?
ok = False
body = {}
code = 0
for server_url in BEAM_backend:
url = "https://" + server_url + "/heartbeat"
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, data=data, headers={"api-v": "2"}) as response:
code = response.status
body = await response.json()
self.log.debug(f"Auth: code {code}, body {body}")
ok = True
break
except Exception as e:
self.log.debug(f"Auth: Error `{e}` while auth with `{server_url}`")
continue
if ok:
if not (body.get("status") is not None and
body.get("code") is not None and
body.get("msg") is not None):
self.log.error("Missing/invalid json members in backend response")
raise KeyboardInterrupt
if ok:
if not (body.get("status") is not None and
body.get("code") is not None and
body.get("msg") is not None):
self.log.error("Missing/invalid json members in backend response")
raise KeyboardInterrupt
if test:
status = body.get("status")
msg = body.get("msg")
if status == "2000":
self.log.info(f"Authenticated! {msg}")
elif status == "200":
self.log.info(f"Resumed authenticated session. {msg}")
else:
self.log.error(f"Backend REFUSED the auth key. Reason: "
f"{msg or 'Backend did not provide a reason'}")
if test:
status = body.get("status")
msg = body.get("msg")
if status == "2000":
self.log.info(f"Authenticated! {msg}")
elif status == "200":
self.log.info(f"Resumed authenticated session. {msg}")
else:
self.log.error(f"Backend REFUSED the auth key. Reason: "
f"{msg or 'Backend did not provide a reason'}")
self.log.info(f"Server still runnig, but only in Direct connect mode.")
self.direct = True
else:
self.direct = True
if test:
self.log.error("Cannot auth...")
if not config.Auth['private']:
raise KeyboardInterrupt
if test:
self.log.info(f"Server still runnig, but only in Direct connect mode.")
self.direct = True
else:
self.direct = True
if test:
self.log.error("Cannot auth...")
if not config.Auth['private']:
raise KeyboardInterrupt
if test:
self.log.info(f"Server still runnig, but only in Direct connect mode.")
if test:
return ok
if test:
return ok
await asyncio.sleep(5)
await asyncio.sleep(5)
except Exception as e:
self.log.error(f"Error in heartbeat: {e}")
async def main(self):
self.run = True
self.tcp = self.tcp(self, self.server_ip, self.server_port)
self.udp = self.udp(self, self.server_ip, self.server_port)
console.add_command(
"list",
lambda x: f"Players list: {self.get_clients_list(True)}"
)
try:
# WebApi Start
if config.WebAPI["enabled"]:
@@ -336,9 +409,11 @@ class Core:
self.log.info(f"Loaded {lmods} mods: {round(self.mods_list[0] / MB, 2)}mb")
await self.heartbeat(True)
for i in range(int(config.Game["players"] * 1.3)):
self.clients.append(None)
tasks = []
# self.check_alive()
nrtasks = [self.tcp.start, self.udp.start, console.start, self.stop_me, self.heartbeat, ]
# self.udp.start,
nrtasks = [self.tcp.start, console.start, self.stop_me, self.heartbeat, self.check_alive]
for task in nrtasks:
tasks.append(asyncio.create_task(task()))
t = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
@@ -354,7 +429,7 @@ class Core:
pass
finally:
self.tcp.stop()
self.udp.stop()
# self.udp.stop()
self.run = False
def start(self):