[!] FIX tcp. Againg?...
This commit is contained in:
2024-07-27 17:00:27 +03:00
parent b0303f3e6d
commit 2ee74c310d
4 changed files with 165 additions and 37 deletions

View File

@@ -8,7 +8,9 @@ import asyncio
import math
import os
import random
import statistics
import time
from collections import deque
import aiohttp
@@ -19,13 +21,22 @@ from core.udp_server import UDPServer
from modules import PluginsLoader
def calc_ticks(ticks, duration):
while ticks and ticks[0] < time.monotonic() - duration:
ticks.popleft()
return len(ticks) / duration
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# noinspection PyProtectedMember
class Core:
def __init__(self):
self.tick_counter = 0
self.log = utils.get_logger("core")
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop = asyncio.get_event_loop()
self.start_time = time.monotonic()
self.run = False
self.direct = False
@@ -39,6 +50,8 @@ class Core:
self.tcp = TCPServer
self.udp = UDPServer
self.tps = 10
self.lock_upload = False
self.client_major_version = "2.0"
@@ -97,24 +110,30 @@ class Core:
out = out[:-1]
return out
async def check_alive(self):
self.log.debug("Starting alive checker.")
maxp = config.Game['players']
async def _check_alive(self, _):
# self.log.debug("alive checker.")
try:
while self.run:
await asyncio.sleep(1)
ca = f"Ss{len(self.clients_by_id)}/{maxp}:{self.get_clients_list()}"
for client in self.clients:
if not client:
continue
if not client.ready:
client.is_disconnected()
continue
if not client.alive:
await client.kick("You are not alive!")
await client._send(ca)
for client in self.clients:
if not client:
continue
if not client.ready:
client.is_disconnected()
continue
if not client.alive:
await client.kick("You are not alive!")
except Exception as e:
self.log.error("Error in check_alive.")
self.log.error("Error in _check_alive.")
self.log.exception(e)
async def _send_online(self, _):
try:
for client in self.clients:
ca = f"Ss{len(self.clients_by_id)}/{config.Game['players']}:{self.get_clients_list()}"
if not client or not client.alive:
continue
await client._send(ca)
except Exception as e:
self.log.error("Error in _send_online.")
self.log.exception(e)
async def __gracefully_kick(self):
@@ -217,7 +236,7 @@ class Core:
if test:
return bool(body)
await asyncio.sleep(5)
await asyncio.sleep(15)
except Exception as e:
self.log.error(f"Error in heartbeat: {e}")
@@ -237,6 +256,89 @@ class Core:
else:
return "Client not found."
async def _useful_ticks(self, _):
target_tps = 20
tasks = []
self.tick_counter += 1
events = {
0.5: "serverTick_0.5s",
1: "serverTick_1s",
2: "serverTick_2s",
3: "serverTick_3s",
4: "serverTick_4s",
5: "serverTick_5s",
10: "serverTick_10s",
30: "serverTick_30s",
60: "serverTick_60s"
}
for interval in sorted(events.keys(), reverse=True):
if self.tick_counter % (interval * target_tps) == 0:
ev.call_event(events[interval])
tasks.append(ev.call_async_event(events[interval]))
await asyncio.gather(*tasks)
if self.tick_counter == (60 * target_tps):
self.tick_counter = 0
async def _tick(self):
try:
ticks = 0
target_tps = 20
target_interval = 1 / target_tps
last_tick_time = time.monotonic()
ev.register("serverTick", self._useful_ticks)
ticks_2s = deque(maxlen=2 * int(target_tps) + 1)
ticks_5s = deque(maxlen=5 * int(target_tps) + 1)
ticks_30s = deque(maxlen=30 * int(target_tps) + 1)
ticks_60s = deque(maxlen=60 * int(target_tps) + 1)
console.add_command("tps", lambda
_: f"{calc_ticks(ticks_2s, 2):.2f}TPS; last: {calc_ticks(ticks_5s, 5):.2f}TPS/5s; {calc_ticks(ticks_30s, 30):.2f}TPS/30s; {calc_ticks(ticks_60s, 60):.2f}TPS/60s;",
None, "Print TPS", {"tps": None})
_add_to_sleep = deque(maxlen=13 * int(target_tps))
_add_to_sleep.append(0.013)
self.log.debug("tick system started")
while self.run:
start_time = time.monotonic()
ev.call_event("serverTick")
await ev.call_async_event("serverTick")
# Calculate the time taken for this tick
end_time = time.monotonic()
tick_duration = end_time - start_time
# Calculate the time to sleep to maintain target TPS
sleep_time = target_interval - tick_duration - statistics.fmean(_add_to_sleep)
if sleep_time > 0:
await asyncio.sleep(sleep_time)
# Update tick count and time
ticks += 1
current_time = time.monotonic()
ticks_2s.append(current_time)
ticks_5s.append(current_time)
ticks_30s.append(current_time)
ticks_60s.append(current_time)
# Calculate TPS
elapsed_time = current_time - last_tick_time
if elapsed_time >= 1:
self.tps = ticks / elapsed_time
# if self.tps < 5:
# self.log.warning(f"Low TPS: {self.tps:.2f}")
# Reset for next calculation
last_tick_time = current_time
ticks = 0
tw = time.monotonic() - start_time - sleep_time
_add_to_sleep.append(tw)
# if elapsed_time >= 1:
# self.log.debug(
# f"ts: {sleep_time}; tw: {tw}; tw-ts: {tw - sleep_time} ({statistics.fmean(_add_to_sleep)});")
self.log.debug("tick system stopped")
except Exception as e:
self.log.exception(e)
async def main(self):
self.tcp = self.tcp(self, self.server_ip, self.server_port)
self.udp = self.udp(self, self.server_ip, self.server_port)
@@ -278,8 +380,10 @@ class Core:
for i in range(int(config.Game["players"] * 2.3)): # * 2.3 For down sock and buffer.
self.clients.append(None)
tasks = []
# self.udp.start
f_tasks = [self.tcp.start, self.udp._start, console.start, self.heartbeat, self.check_alive]
ev.register("serverTick_1s", self._check_alive)
ev.register("serverTick_1s", self._send_online)
# ev.register("serverTick_5s", self.heartbeat)
f_tasks = [self.tcp.start, self.udp._start, console.start, self._tick, self.heartbeat]
if config.RCON['enabled']:
console.rcon.version = f"KuiToi {__version__}"
rcon = console.rcon(config.RCON['password'], config.RCON['server_ip'], config.RCON['server_port'])
@@ -301,19 +405,19 @@ class Core:
self.log.error(f"Exception: {e}")
self.log.exception(e)
finally:
self.run = False
self.tcp.stop()
self.udp._stop()
await self.stop()
def start(self):
asyncio.run(self.main())
async def stop(self):
self.run = False
ev.call_lua_event("onShutdown")
await self.__gracefully_kick()
self.tcp.stop()
self.udp._stop()
ev.call_event("onServerStopped")
await ev.call_async_event("onServerStopped")
await self.__gracefully_kick()
if config.Options['use_lua']:
await ev.call_async_event("_lua_plugins_unload")
await ev.call_async_event("_plugins_unload")

View File

@@ -17,6 +17,8 @@ from .udp_server import UDPServer
class Core:
def __init__(self):
self.tick_counter = 0
self.tps = 10
self.start_time = time.monotonic()
self.log = utils.get_logger("core")
self.loop = asyncio.get_event_loop()
@@ -41,11 +43,11 @@ class Core:
async def insert_client(self, client: Client) -> None: ...
def create_client(self, *args, **kwargs) -> Client: ...
def get_clients_list(self, need_cid=False) -> str: ...
async def check_alive(self) -> None: ...
async def _check_alive(self) -> None: ...
async def _send_online(self) -> None: ...
async def _useful_ticks(self, _) -> None: ...
async def __gracefully_kick(self): ...
@staticmethod
def start_web() -> None: ...
def stop_me(self) -> None: ...
def _tick(self) -> None: ...
async def heartbeat(self, test=False) -> None: ...
async def kick_cmd(self, args: list) -> None | str: ...
async def main(self) -> None: ...

View File

@@ -184,14 +184,13 @@ class TCPServer:
self.run = False
self.Core.run = False
async def stop(self):
def stop(self):
self.log.debug("Stopping TCP server")
try:
self.server.close()
for conn in self._connections:
conn.close()
await conn.wait_closed()
await self.server.wait_closed()
# await conn.wait_closed()
# await self.server.wait_closed()
except Exception as e:
self.log.exception(e)
self.log.debug("Stopped")

View File

@@ -38,6 +38,16 @@ class EventsSystem:
"onChangePosition": [], # Only sync, no handler
"onPlayerDisconnect": [], # No handler
"onServerStopped": [], # No handler
"serverTick": [],
"serverTick_0.5s": [],
"serverTick_1s": [],
"serverTick_2s": [],
"serverTick_3s": [],
"serverTick_4s": [],
"serverTick_5s": [],
"serverTick_10s": [],
"serverTick_30s": [],
"serverTick_60s": [],
}
self.__async_events = {
"onServerStarted": [],
@@ -51,7 +61,17 @@ class EventsSystem:
"onCarChanged": [],
"onCarFocusMove": [],
"onPlayerDisconnect": [],
"onServerStopped": []
"onServerStopped": [],
"serverTick": [],
"serverTick_0.5s": [],
"serverTick_1s": [],
"serverTick_2s": [],
"serverTick_3s": [],
"serverTick_4s": [],
"serverTick_5s": [],
"serverTick_10s": [],
"serverTick_30s": [],
"serverTick_60s": [],
}
self.__lua_events = {
@@ -109,7 +129,8 @@ class EventsSystem:
self.log.debug("Register ok")
async def call_async_event(self, event_name, *args, **kwargs):
self.log.debug(f"Calling async event: '{event_name}'")
if not event_name.startswith("serverTick"):
self.log.debug(f"Calling async event: '{event_name}'")
funcs_data = []
if event_name in self.__async_events.keys():
for func in self.__async_events[event_name]:
@@ -125,8 +146,10 @@ class EventsSystem:
return funcs_data
def call_event(self, event_name, *args, **kwargs):
if event_name not in ["onChangePosition", "onSentPing"]: # UDP events
def call_event(self, event_name: str, *args, **kwargs):
if event_name not in (
"onChangePosition", "onSentPing", # UDP events
) and not event_name.startswith("serverTick"):
self.log.debug(f"Calling sync event: '{event_name}'")
funcs_data = []