diff --git a/src/core/core.py b/src/core/core.py index 3d84d14..5a16919 100644 --- a/src/core/core.py +++ b/src/core/core.py @@ -8,7 +8,9 @@ import asyncio import math import os import random +import statistics import time +from collections import deque import aiohttp @@ -19,13 +21,22 @@ from core.udp_server import UDPServer from modules import PluginsLoader +def calc_ticks(ticks, duration): + while ticks and ticks[0] < time.monotonic() - duration: + ticks.popleft() + return len(ticks) / duration + +loop = asyncio.new_event_loop() +asyncio.set_event_loop(loop) + + # noinspection PyProtectedMember class Core: def __init__(self): + self.tick_counter = 0 self.log = utils.get_logger("core") - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) + self.loop = asyncio.get_event_loop() self.start_time = time.monotonic() self.run = False self.direct = False @@ -39,6 +50,8 @@ class Core: self.tcp = TCPServer self.udp = UDPServer + self.tps = 10 + self.lock_upload = False self.client_major_version = "2.0" @@ -97,24 +110,30 @@ class Core: out = out[:-1] return out - async def check_alive(self): - self.log.debug("Starting alive checker.") - maxp = config.Game['players'] + async def _check_alive(self, _): + # self.log.debug("alive checker.") try: - while self.run: - await asyncio.sleep(1) - ca = f"Ss{len(self.clients_by_id)}/{maxp}:{self.get_clients_list()}" - for client in self.clients: - if not client: - continue - if not client.ready: - client.is_disconnected() - continue - if not client.alive: - await client.kick("You are not alive!") - await client._send(ca) + for client in self.clients: + if not client: + continue + if not client.ready: + client.is_disconnected() + continue + if not client.alive: + await client.kick("You are not alive!") except Exception as e: - self.log.error("Error in check_alive.") + self.log.error("Error in _check_alive.") + self.log.exception(e) + + async def _send_online(self, _): + try: + for client in self.clients: + ca = f"Ss{len(self.clients_by_id)}/{config.Game['players']}:{self.get_clients_list()}" + if not client or not client.alive: + continue + await client._send(ca) + except Exception as e: + self.log.error("Error in _send_online.") self.log.exception(e) async def __gracefully_kick(self): @@ -217,7 +236,7 @@ class Core: if test: return bool(body) - await asyncio.sleep(5) + await asyncio.sleep(15) except Exception as e: self.log.error(f"Error in heartbeat: {e}") @@ -237,6 +256,89 @@ class Core: else: return "Client not found." + async def _useful_ticks(self, _): + target_tps = 20 + tasks = [] + self.tick_counter += 1 + events = { + 0.5: "serverTick_0.5s", + 1: "serverTick_1s", + 2: "serverTick_2s", + 3: "serverTick_3s", + 4: "serverTick_4s", + 5: "serverTick_5s", + 10: "serverTick_10s", + 30: "serverTick_30s", + 60: "serverTick_60s" + } + for interval in sorted(events.keys(), reverse=True): + if self.tick_counter % (interval * target_tps) == 0: + ev.call_event(events[interval]) + tasks.append(ev.call_async_event(events[interval])) + await asyncio.gather(*tasks) + if self.tick_counter == (60 * target_tps): + self.tick_counter = 0 + + async def _tick(self): + try: + ticks = 0 + target_tps = 20 + target_interval = 1 / target_tps + last_tick_time = time.monotonic() + ev.register("serverTick", self._useful_ticks) + ticks_2s = deque(maxlen=2 * int(target_tps) + 1) + ticks_5s = deque(maxlen=5 * int(target_tps) + 1) + ticks_30s = deque(maxlen=30 * int(target_tps) + 1) + ticks_60s = deque(maxlen=60 * int(target_tps) + 1) + console.add_command("tps", lambda + _: f"{calc_ticks(ticks_2s, 2):.2f}TPS; last: {calc_ticks(ticks_5s, 5):.2f}TPS/5s; {calc_ticks(ticks_30s, 30):.2f}TPS/30s; {calc_ticks(ticks_60s, 60):.2f}TPS/60s;", + None, "Print TPS", {"tps": None}) + _add_to_sleep = deque(maxlen=13 * int(target_tps)) + _add_to_sleep.append(0.013) + + self.log.debug("tick system started") + while self.run: + start_time = time.monotonic() + + ev.call_event("serverTick") + await ev.call_async_event("serverTick") + + # Calculate the time taken for this tick + end_time = time.monotonic() + tick_duration = end_time - start_time + + # Calculate the time to sleep to maintain target TPS + sleep_time = target_interval - tick_duration - statistics.fmean(_add_to_sleep) + if sleep_time > 0: + await asyncio.sleep(sleep_time) + + # Update tick count and time + ticks += 1 + current_time = time.monotonic() + ticks_2s.append(current_time) + ticks_5s.append(current_time) + ticks_30s.append(current_time) + ticks_60s.append(current_time) + + # Calculate TPS + elapsed_time = current_time - last_tick_time + if elapsed_time >= 1: + self.tps = ticks / elapsed_time + # if self.tps < 5: + # self.log.warning(f"Low TPS: {self.tps:.2f}") + # Reset for next calculation + last_tick_time = current_time + ticks = 0 + + tw = time.monotonic() - start_time - sleep_time + _add_to_sleep.append(tw) + # if elapsed_time >= 1: + # self.log.debug( + # f"ts: {sleep_time}; tw: {tw}; tw-ts: {tw - sleep_time} ({statistics.fmean(_add_to_sleep)});") + self.log.debug("tick system stopped") + except Exception as e: + self.log.exception(e) + async def main(self): self.tcp = self.tcp(self, self.server_ip, self.server_port) self.udp = self.udp(self, self.server_ip, self.server_port) @@ -278,8 +380,10 @@ class Core: for i in range(int(config.Game["players"] * 2.3)): # * 2.3 For down sock and buffer. self.clients.append(None) tasks = [] - # self.udp.start - f_tasks = [self.tcp.start, self.udp._start, console.start, self.heartbeat, self.check_alive] + ev.register("serverTick_1s", self._check_alive) + ev.register("serverTick_1s", self._send_online) + # ev.register("serverTick_5s", self.heartbeat) + f_tasks = [self.tcp.start, self.udp._start, console.start, self._tick, self.heartbeat] if config.RCON['enabled']: console.rcon.version = f"KuiToi {__version__}" rcon = console.rcon(config.RCON['password'], config.RCON['server_ip'], config.RCON['server_port']) @@ -301,19 +405,19 @@ class Core: self.log.error(f"Exception: {e}") self.log.exception(e) finally: - self.run = False - self.tcp.stop() - self.udp._stop() await self.stop() def start(self): asyncio.run(self.main()) async def stop(self): + self.run = False ev.call_lua_event("onShutdown") + await self.__gracefully_kick() + self.tcp.stop() + self.udp._stop() ev.call_event("onServerStopped") await ev.call_async_event("onServerStopped") - await self.__gracefully_kick() if config.Options['use_lua']: await ev.call_async_event("_lua_plugins_unload") await ev.call_async_event("_plugins_unload") diff --git a/src/core/core.pyi b/src/core/core.pyi index 049abde..cb44da4 100644 --- a/src/core/core.pyi +++ b/src/core/core.pyi @@ -17,6 +17,8 @@ from .udp_server import UDPServer class Core: def __init__(self): + self.tick_counter = 0 + self.tps = 10 self.start_time = time.monotonic() self.log = utils.get_logger("core") self.loop = asyncio.get_event_loop() @@ -41,11 +43,11 @@ class Core: async def insert_client(self, client: Client) -> None: ... def create_client(self, *args, **kwargs) -> Client: ... def get_clients_list(self, need_cid=False) -> str: ... - async def check_alive(self) -> None: ... + async def _check_alive(self) -> None: ... + async def _send_online(self) -> None: ... + async def _useful_ticks(self, _) -> None: ... async def __gracefully_kick(self): ... - @staticmethod - def start_web() -> None: ... - def stop_me(self) -> None: ... + def _tick(self) -> None: ... async def heartbeat(self, test=False) -> None: ... async def kick_cmd(self, args: list) -> None | str: ... async def main(self) -> None: ... diff --git a/src/core/tcp_server.py b/src/core/tcp_server.py index 8f48cb1..86423d5 100644 --- a/src/core/tcp_server.py +++ b/src/core/tcp_server.py @@ -184,14 +184,13 @@ class TCPServer: self.run = False self.Core.run = False - async def stop(self): + def stop(self): self.log.debug("Stopping TCP server") try: self.server.close() for conn in self._connections: conn.close() - await conn.wait_closed() - await self.server.wait_closed() + # await conn.wait_closed() + # await self.server.wait_closed() except Exception as e: self.log.exception(e) - self.log.debug("Stopped") diff --git a/src/modules/EventsSystem/__init__.py b/src/modules/EventsSystem/__init__.py index c9fdf95..9fb69b1 100644 --- a/src/modules/EventsSystem/__init__.py +++ b/src/modules/EventsSystem/__init__.py @@ -38,6 +38,16 @@ class EventsSystem: "onChangePosition": [], # Only sync, no handler "onPlayerDisconnect": [], # No handler "onServerStopped": [], # No handler + "serverTick": [], + "serverTick_0.5s": [], + "serverTick_1s": [], + "serverTick_2s": [], + "serverTick_3s": [], + "serverTick_4s": [], + "serverTick_5s": [], + "serverTick_10s": [], + "serverTick_30s": [], + "serverTick_60s": [], } self.__async_events = { "onServerStarted": [], @@ -51,7 +61,17 @@ class EventsSystem: "onCarChanged": [], "onCarFocusMove": [], "onPlayerDisconnect": [], - "onServerStopped": [] + "onServerStopped": [], + "serverTick": [], + "serverTick_0.5s": [], + "serverTick_1s": [], + "serverTick_2s": [], + "serverTick_3s": [], + "serverTick_4s": [], + "serverTick_5s": [], + "serverTick_10s": [], + "serverTick_30s": [], + "serverTick_60s": [], } self.__lua_events = { @@ -109,7 +129,8 @@ class EventsSystem: self.log.debug("Register ok") async def call_async_event(self, event_name, *args, **kwargs): - self.log.debug(f"Calling async event: '{event_name}'") + if not event_name.startswith("serverTick"): + self.log.debug(f"Calling async event: '{event_name}'") funcs_data = [] if event_name in self.__async_events.keys(): for func in self.__async_events[event_name]: @@ -125,8 +146,10 @@ class EventsSystem: return funcs_data - def call_event(self, event_name, *args, **kwargs): - if event_name not in ["onChangePosition", "onSentPing"]: # UDP events + def call_event(self, event_name: str, *args, **kwargs): + if event_name not in ( + "onChangePosition", "onSentPing", # UDP events + ) and not event_name.startswith("serverTick"): self.log.debug(f"Calling sync event: '{event_name}'") funcs_data = []