major refactor of Client and Server

this refactor includes changes to TClient:

- all member fields are now public, but protected with Sync (an alias
  for boost::synchronized_value
- removed all (now) obsolete getters and setters

changes to TServer and TNetwork:

- thread-safe ID generation, previously it was possible for there to be
  ID duplicates. this is now solved by moving id generation and
  assignment into the same mutex locked context.
- deprecated ForEachClientWeak and replaced some usages of it with
  ForEachClient, getting rid of the weak_ptr shit in most places
- implemented a bunch of new functions for getting rid of more weak_ptr
  everywhere
This commit is contained in:
Lion Kortlepel 2024-01-08 14:55:53 +01:00
parent c6aa7776fc
commit b9f73f77c3
No known key found for this signature in database
GPG Key ID: 4322FF2B4C71259B
13 changed files with 375 additions and 388 deletions

View File

@ -10,6 +10,8 @@
#include "BoostAliases.h"
#include "Common.h"
#include "Compat.h"
#include "RWMutex.h"
#include "Sync.h"
#include "VehicleData.h"
class TServer;
@ -41,76 +43,47 @@ public:
void AddNewCar(int Ident, const std::string& Data);
void SetCarData(int Ident, const std::string& Data);
void SetCarPosition(int Ident, const std::string& Data);
TVehicleDataLockPair GetAllCars();
void SetName(const std::string& Name) { mName = Name; }
void SetRoles(const std::string& Role) { mRole = Role; }
void SetIdentifier(const std::string& key, const std::string& value) { mIdentifiers[key] = value; }
void SetName(const std::string& NewName) { Name = NewName; }
void SetRoles(const std::string& NewRole) { Role = NewRole; }
void SetIdentifier(const std::string& key, const std::string& value);
std::string GetCarData(int Ident);
std::string GetCarPositionRaw(int Ident);
void SetUDPAddr(const ip::udp::endpoint& Addr) { mUDPAddress = Addr; }
void SetDownSock(ip::tcp::socket&& CSock) { mDownSocket = std::move(CSock); }
void SetTCPSock(ip::tcp::socket&& CSock) { mSocket = std::move(CSock); }
void Disconnect(std::string_view Reason);
bool IsDisconnected() const { return !mSocket.is_open(); }
bool IsDisconnected() const { return !TCPSocket->is_open(); }
// locks
void DeleteCar(int Ident);
[[nodiscard]] const std::unordered_map<std::string, std::string>& GetIdentifiers() const { return mIdentifiers; }
[[nodiscard]] const ip::udp::endpoint& GetUDPAddr() const { return mUDPAddress; }
[[nodiscard]] ip::udp::endpoint& GetUDPAddr() { return mUDPAddress; }
[[nodiscard]] ip::tcp::socket& GetDownSock() { return mDownSocket; }
[[nodiscard]] const ip::tcp::socket& GetDownSock() const { return mDownSocket; }
[[nodiscard]] ip::tcp::socket& GetTCPSock() { return mSocket; }
[[nodiscard]] const ip::tcp::socket& GetTCPSock() const { return mSocket; }
[[nodiscard]] std::string GetRoles() const { return mRole; }
[[nodiscard]] std::string GetName() const { return mName; }
void SetUnicycleID(int ID) { mUnicycleID = ID; }
void SetID(int ID) { mID = ID; }
[[nodiscard]] int GetOpenCarID() const;
[[nodiscard]] int GetCarCount() const;
void ClearCars();
[[nodiscard]] int GetID() const { return mID; }
[[nodiscard]] int GetUnicycleID() const { return mUnicycleID; }
[[nodiscard]] bool IsConnected() const { return mIsConnected; }
[[nodiscard]] bool IsSynced() const { return mIsSynced; }
[[nodiscard]] bool IsSyncing() const { return mIsSyncing; }
[[nodiscard]] bool IsGuest() const { return mIsGuest; }
void SetIsGuest(bool NewIsGuest) { mIsGuest = NewIsGuest; }
void SetIsSynced(bool NewIsSynced) { mIsSynced = NewIsSynced; }
void SetIsSyncing(bool NewIsSyncing) { mIsSyncing = NewIsSyncing; }
void EnqueuePacket(const std::vector<uint8_t>& Packet);
[[nodiscard]] std::queue<std::vector<uint8_t>>& MissedPacketQueue() { return mPacketsSync; }
[[nodiscard]] const std::queue<std::vector<uint8_t>>& MissedPacketQueue() const { return mPacketsSync; }
[[nodiscard]] size_t MissedPacketQueueSize() const { return mPacketsSync.size(); }
[[nodiscard]] std::mutex& MissedPacketQueueMutex() const { return mMissedPacketsMutex; }
void SetIsConnected(bool NewIsConnected) { mIsConnected = NewIsConnected; }
void SetIsConnected(bool NewIsConnected) { IsConnected = NewIsConnected; }
[[nodiscard]] TServer& Server() const;
void UpdatePingTime();
int SecondsSinceLastPing();
Sync<bool> IsConnected = false;
Sync<bool> IsSynced = false;
Sync<bool> IsSyncing = false;
Sync<std::unordered_map<std::string, std::string>> Identifiers;
Sync<ip::tcp::socket> TCPSocket;
Sync<ip::tcp::socket> DownSocket;
Sync<ip::udp::endpoint> UDPAddress {};
Sync<int> UnicycleID = -1;
Sync<std::string> Role;
Sync<std::string> DID;
Sync<int> ID = -1;
Sync<bool> IsGuest = false;
Sync<std::string> Name = std::string("Unknown Client");
Sync<TSetOfVehicleData> VehicleData;
Sync<SparseArray<std::string>> VehiclePosition;
Sync<std::queue<std::vector<uint8_t>>> MissedPacketsQueue;
Sync<std::chrono::time_point<std::chrono::high_resolution_clock>> LastPingTime;
private:
void InsertVehicle(int ID, const std::string& Data);
TServer& mServer;
bool mIsConnected = false;
bool mIsSynced = false;
bool mIsSyncing = false;
mutable std::mutex mMissedPacketsMutex;
std::queue<std::vector<uint8_t>> mPacketsSync;
std::unordered_map<std::string, std::string> mIdentifiers;
bool mIsGuest = false;
mutable std::mutex mVehicleDataMutex;
mutable std::mutex mVehiclePositionMutex;
TSetOfVehicleData mVehicleData;
SparseArray<std::string> mVehiclePosition;
std::string mName = "Unknown Client";
ip::tcp::socket mSocket;
ip::tcp::socket mDownSocket;
ip::udp::endpoint mUDPAddress {};
int mUnicycleID = -1;
std::string mRole;
std::string mDID;
int mID = -1;
std::chrono::time_point<std::chrono::high_resolution_clock> mLastPingTime;
};
std::optional<std::weak_ptr<TClient>> GetClient(class TServer& Server, int ID);
std::optional<std::shared_ptr<TClient>> GetClient(class TServer& Server, int ID);

9
include/Sync.h Normal file
View File

@ -0,0 +1,9 @@
#pragma once
#include <boost/thread/synchronized_value.hpp>
/// This header provides convenience aliases for synchronization primitives.
template<typename T>
using Sync = boost::synchronized_value<T>;

View File

@ -220,6 +220,8 @@ private:
// Debug functions, slow
std::queue<std::pair<TLuaChunk, std::shared_ptr<TLuaResult>>> Debug_GetStateExecuteQueue();
std::vector<TLuaEngine::QueuedFunction> Debug_GetStateFunctionQueue();
sol::table Lua_JsonDecode(const std::string& str);
private:
sol::table Lua_TriggerGlobalEvent(const std::string& EventName, sol::variadic_args EventArgs);
@ -230,7 +232,6 @@ private:
sol::table Lua_GetPlayerVehicles(int ID);
std::pair<sol::table, std::string> Lua_GetPositionRaw(int PID, int VID);
sol::table Lua_HttpCreateConnection(const std::string& host, uint16_t port);
sol::table Lua_JsonDecode(const std::string& str);
int Lua_GetPlayerIDByName(const std::string& Name);
sol::table Lua_FS_ListFiles(const std::string& Path);
sol::table Lua_FS_ListDirectories(const std::string& Path);

View File

@ -43,8 +43,9 @@ private:
void OnConnect(const std::weak_ptr<TClient>& c);
void TCPClient(const std::weak_ptr<TClient>& c);
void Looper(const std::weak_ptr<TClient>& c);
int OpenID();
void OnDisconnect(const std::shared_ptr<TClient>& ClientPtr);
void OnDisconnect(const std::weak_ptr<TClient>& ClientPtr);
void OnDisconnect(TClient& Client);
void Parse(TClient& c, const std::vector<uint8_t>& Packet);
void SendFile(TClient& c, const std::string& Name);
static bool TCPSendRaw(TClient& C, ip::tcp::socket& socket, const uint8_t* Data, size_t Size);

View File

@ -22,14 +22,19 @@ public:
void InsertClient(const std::shared_ptr<TClient>& Ptr);
void RemoveClient(const std::weak_ptr<TClient>&);
void RemoveClient(TClient&);
// in Fn, return true to continue, return false to break
void ForEachClient(const std::function<bool(std::weak_ptr<TClient>)>& Fn);
[[deprecated("Use ForEachClient instead")]] void ForEachClientWeak(const std::function<bool(std::weak_ptr<TClient>)>& Fn);
void ForEachClient(const std::function<bool(const std::shared_ptr<TClient>&)> Fn);
size_t ClientCount() const;
void GlobalParser(const std::weak_ptr<TClient>& Client, std::vector<uint8_t>&& Packet, TPPSMonitor& PPSMonitor, TNetwork& Network);
static void HandleEvent(TClient& c, const std::string& Data);
RWMutex& GetClientMutex() const { return mClientsMutex; }
// thread-safe ID lookup & claim
void ClaimFreeIDFor(TClient& Client);
const TScopedTimer UptimeTimer;
// asio io context

View File

@ -7,29 +7,28 @@
void TClient::DeleteCar(int Ident) {
// TODO: Send delete packets
std::unique_lock lock(mVehicleDataMutex);
auto iter = std::find_if(mVehicleData.begin(), mVehicleData.end(), [&](auto& elem) {
auto VehData = VehicleData.synchronize();
auto iter = std::find_if(VehData->begin(), VehData->end(), [&](auto& elem) {
return Ident == elem.ID();
});
if (iter != mVehicleData.end()) {
mVehicleData.erase(iter);
if (iter != VehData->end()) {
VehData->erase(iter);
} else {
beammp_debug("tried to erase a vehicle that doesn't exist (not an error)");
}
}
void TClient::ClearCars() {
std::unique_lock lock(mVehicleDataMutex);
mVehicleData.clear();
VehicleData->clear();
}
int TClient::GetOpenCarID() const {
int OpenID = 0;
bool found;
std::unique_lock lock(mVehicleDataMutex);
auto VehData = VehicleData.synchronize();
do {
found = true;
for (auto& v : mVehicleData) {
for (auto& v : *VehData) {
if (v.ID() == OpenID) {
OpenID++;
found = false;
@ -40,46 +39,41 @@ int TClient::GetOpenCarID() const {
}
void TClient::AddNewCar(int Ident, const std::string& Data) {
std::unique_lock lock(mVehicleDataMutex);
mVehicleData.emplace_back(Ident, Data);
}
TClient::TVehicleDataLockPair TClient::GetAllCars() {
return { &mVehicleData, std::unique_lock(mVehicleDataMutex) };
VehicleData->emplace_back(Ident, Data);
}
std::string TClient::GetCarPositionRaw(int Ident) {
std::unique_lock lock(mVehiclePositionMutex);
try {
return mVehiclePosition.at(size_t(Ident));
return VehiclePosition->at(size_t(Ident));
} catch (const std::out_of_range& oor) {
beammp_debugf("Failed to get vehicle position for {}: {}", Ident, oor.what());
beammp_debugf("GetCarPositionRaw failed for id {}, as that car doesn't exist on client id {}: {}", Ident, int(ID), oor.what());
return "";
}
}
void TClient::Disconnect(std::string_view Reason) {
beammp_debugf("Disconnecting client {} for reason: {}", GetID(), Reason);
auto LockedSocket = TCPSocket.synchronize();
beammp_debugf("Disconnecting client {} for reason: {}", int(ID), Reason);
boost::system::error_code ec;
mSocket.shutdown(socket_base::shutdown_both, ec);
LockedSocket->shutdown(socket_base::shutdown_both, ec);
if (ec) {
beammp_debugf("Failed to shutdown client socket: {}", ec.message());
}
mSocket.close(ec);
LockedSocket->close(ec);
if (ec) {
beammp_debugf("Failed to close client socket: {}", ec.message());
}
}
void TClient::SetCarPosition(int Ident, const std::string& Data) {
std::unique_lock lock(mVehiclePositionMutex);
mVehiclePosition[size_t(Ident)] = Data;
// ugly but this is c++ so
VehiclePosition->operator[](size_t(Ident)) = Data;
}
std::string TClient::GetCarData(int Ident) {
{ // lock
std::unique_lock lock(mVehicleDataMutex);
for (auto& v : mVehicleData) {
auto Lock = VehicleData.synchronize();
for (auto& v : *Lock) {
if (v.ID() == Ident) {
return v.Data();
}
@ -91,8 +85,8 @@ std::string TClient::GetCarData(int Ident) {
void TClient::SetCarData(int Ident, const std::string& Data) {
{ // lock
std::unique_lock lock(mVehicleDataMutex);
for (auto& v : mVehicleData) {
auto Lock = VehicleData.synchronize();
for (auto& v : *Lock) {
if (v.ID() == Ident) {
v.SetData(Data);
return;
@ -103,7 +97,7 @@ void TClient::SetCarData(int Ident, const std::string& Data) {
}
int TClient::GetCarCount() const {
return int(mVehicleData.size());
return int(VehicleData->size());
}
TServer& TClient::Server() const {
@ -111,45 +105,44 @@ TServer& TClient::Server() const {
}
void TClient::EnqueuePacket(const std::vector<uint8_t>& Packet) {
std::unique_lock Lock(mMissedPacketsMutex);
mPacketsSync.push(Packet);
MissedPacketsQueue->push(Packet);
}
TClient::TClient(TServer& Server, ip::tcp::socket&& Socket)
: mServer(Server)
, mSocket(std::move(Socket))
, mDownSocket(ip::tcp::socket(Server.IoCtx()))
, mLastPingTime(std::chrono::high_resolution_clock::now()) {
: TCPSocket(std::move(Socket))
, DownSocket(ip::tcp::socket(Server.IoCtx()))
, LastPingTime(std::chrono::high_resolution_clock::now())
, mServer(Server) {
}
TClient::~TClient() {
beammp_debugf("client destroyed: {} ('{}')", this->GetID(), this->GetName());
beammp_debugf("client destroyed: {} ('{}')", ID.get(), Name.get());
}
void TClient::UpdatePingTime() {
mLastPingTime = std::chrono::high_resolution_clock::now();
LastPingTime = std::chrono::high_resolution_clock::now();
}
int TClient::SecondsSinceLastPing() {
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::high_resolution_clock::now() - mLastPingTime)
std::chrono::high_resolution_clock::now() - LastPingTime.get())
.count();
return int(seconds);
}
std::optional<std::weak_ptr<TClient>> GetClient(TServer& Server, int ID) {
std::optional<std::weak_ptr<TClient>> MaybeClient { std::nullopt };
Server.ForEachClient([&](std::weak_ptr<TClient> CPtr) -> bool {
ReadLock Lock(Server.GetClientMutex());
if (!CPtr.expired()) {
auto C = CPtr.lock();
if (C->GetID() == ID) {
MaybeClient = CPtr;
return false;
}
std::optional<std::shared_ptr<TClient>> GetClient(TServer& Server, int ID) {
std::optional<std::shared_ptr<TClient>> MaybeClient { std::nullopt };
Server.ForEachClient([ID, &MaybeClient](const auto& Client) -> bool {
if (Client->ID.get() == ID) {
MaybeClient = Client;
return false;
}
} else {
beammp_debugf("Found an expired client while looking for id {}", ID);
}
return true;
});
return MaybeClient;
}
void TClient::SetIdentifier(const std::string& key, const std::string& value) {
// I know this is bad, but what can ya do
Identifiers->operator[](key) = value;
}

View File

@ -120,11 +120,11 @@ static inline std::pair<bool, std::string> InternalTriggerClientEvent(int Player
return { true, "" };
} else {
auto MaybeClient = GetClient(LuaAPI::MP::Engine->Server(), PlayerID);
if (!MaybeClient || MaybeClient.value().expired()) {
if (!MaybeClient) {
beammp_lua_errorf("TriggerClientEvent invalid Player ID '{}'", PlayerID);
return { false, "Invalid Player ID" };
}
auto c = MaybeClient.value().lock();
auto c = MaybeClient.value();
if (!LuaAPI::MP::Engine->Network().Respond(*c, StringToVector(Packet), true)) {
beammp_lua_errorf("Respond failed, dropping client {}", PlayerID);
LuaAPI::MP::Engine->Network().ClientKick(*c, "Disconnected after failing to receive packets");
@ -141,11 +141,11 @@ std::pair<bool, std::string> LuaAPI::MP::TriggerClientEvent(int PlayerID, const
std::pair<bool, std::string> LuaAPI::MP::DropPlayer(int ID, std::optional<std::string> MaybeReason) {
auto MaybeClient = GetClient(Engine->Server(), ID);
if (!MaybeClient || MaybeClient.value().expired()) {
if (!MaybeClient) {
beammp_lua_errorf("Tried to drop client with id {}, who doesn't exist", ID);
return { false, "Player does not exist" };
}
auto c = MaybeClient.value().lock();
auto c = MaybeClient.value();
LuaAPI::MP::Engine->Network().ClientKick(*c, MaybeReason.value_or("No reason"));
return { true, "" };
}
@ -159,14 +159,14 @@ std::pair<bool, std::string> LuaAPI::MP::SendChatMessage(int ID, const std::stri
Result.first = true;
} else {
auto MaybeClient = GetClient(Engine->Server(), ID);
if (MaybeClient && !MaybeClient.value().expired()) {
auto c = MaybeClient.value().lock();
if (!c->IsSynced()) {
if (MaybeClient) {
auto c = MaybeClient.value();
if (!c->IsSynced) {
Result.first = false;
Result.second = "Player still syncing data";
return Result;
}
LogChatMessage("<Server> (to \"" + c->GetName() + "\")", -1, Message);
LogChatMessage("<Server> (to \"" + c->Name.get() + "\")", -1, Message);
if (!Engine->Network().Respond(*c, StringToVector(Packet), true)) {
beammp_errorf("Failed to send chat message back to sender (id {}) - did the sender disconnect?", ID);
// TODO: should we return an error here?
@ -185,13 +185,13 @@ std::pair<bool, std::string> LuaAPI::MP::SendChatMessage(int ID, const std::stri
std::pair<bool, std::string> LuaAPI::MP::RemoveVehicle(int PID, int VID) {
std::pair<bool, std::string> Result;
auto MaybeClient = GetClient(Engine->Server(), PID);
if (!MaybeClient || MaybeClient.value().expired()) {
if (!MaybeClient) {
beammp_lua_error("RemoveVehicle invalid Player ID");
Result.first = false;
Result.second = "Invalid Player ID";
return Result;
}
auto c = MaybeClient.value().lock();
auto c = MaybeClient.value();
if (!c->GetCarData(VID).empty()) {
std::string Destroy = "Od:" + std::to_string(PID) + "-" + std::to_string(VID);
Engine->Network().SendToAll(nullptr, StringToVector(Destroy), true, true);
@ -274,8 +274,8 @@ void LuaAPI::MP::Sleep(size_t Ms) {
bool LuaAPI::MP::IsPlayerConnected(int ID) {
auto MaybeClient = GetClient(Engine->Server(), ID);
if (MaybeClient && !MaybeClient.value().expired()) {
return MaybeClient.value().lock()->IsConnected();
if (MaybeClient) {
return MaybeClient.value()->IsConnected.get();
} else {
return false;
}
@ -283,8 +283,8 @@ bool LuaAPI::MP::IsPlayerConnected(int ID) {
bool LuaAPI::MP::IsPlayerGuest(int ID) {
auto MaybeClient = GetClient(Engine->Server(), ID);
if (MaybeClient && !MaybeClient.value().expired()) {
return MaybeClient.value().lock()->IsGuest();
if (MaybeClient) {
return MaybeClient.value()->IsGuest.get();
} else {
return false;
}

View File

@ -266,10 +266,10 @@ void TConsole::Command_Kick(const std::string&, const std::vector<std::string>&
std::for_each(Name2.begin(), Name2.end(), [](char& c) { c = char(std::tolower(char(c))); });
return StringStartsWith(Name1, Name2) || StringStartsWith(Name2, Name1);
};
mLuaEngine->Server().ForEachClient([&](std::weak_ptr<TClient> Client) -> bool {
mLuaEngine->Server().ForEachClientWeak([&](std::weak_ptr<TClient> Client) -> bool {
if (!Client.expired()) {
auto locked = Client.lock();
if (NameCompare(locked->GetName(), Name)) {
if (NameCompare(locked->Name.get(), Name)) {
mLuaEngine->Network().ClientKick(*locked, Reason);
Kicked = true;
return false;
@ -364,11 +364,11 @@ void TConsole::Command_List(const std::string&, const std::vector<std::string>&
} else {
std::stringstream ss;
ss << std::left << std::setw(25) << "Name" << std::setw(6) << "ID" << std::setw(6) << "Cars" << std::endl;
mLuaEngine->Server().ForEachClient([&](std::weak_ptr<TClient> Client) -> bool {
mLuaEngine->Server().ForEachClientWeak([&](std::weak_ptr<TClient> Client) -> bool {
if (!Client.expired()) {
auto locked = Client.lock();
ss << std::left << std::setw(25) << locked->GetName()
<< std::setw(6) << locked->GetID()
ss << std::left << std::setw(25) << locked->Name.get()
<< std::setw(6) << locked->ID.get()
<< std::setw(6) << locked->GetCarCount() << "\n";
}
return true;
@ -391,18 +391,15 @@ void TConsole::Command_Status(const std::string&, const std::vector<std::string>
size_t SyncingCount = 0;
size_t MissedPacketQueueSum = 0;
int LargestSecondsSinceLastPing = 0;
mLuaEngine->Server().ForEachClient([&](std::weak_ptr<TClient> Client) -> bool {
if (!Client.expired()) {
auto Locked = Client.lock();
CarCount += Locked->GetCarCount();
ConnectedCount += Locked->IsConnected() ? 1 : 0;
GuestCount += Locked->IsGuest() ? 1 : 0;
SyncedCount += Locked->IsSynced() ? 1 : 0;
SyncingCount += Locked->IsSyncing() ? 1 : 0;
MissedPacketQueueSum += Locked->MissedPacketQueueSize();
if (Locked->SecondsSinceLastPing() < LargestSecondsSinceLastPing) {
LargestSecondsSinceLastPing = Locked->SecondsSinceLastPing();
}
mLuaEngine->Server().ForEachClient([&](std::shared_ptr<TClient> Client) -> bool {
CarCount += size_t(Client->GetCarCount());
ConnectedCount += Client->IsConnected ? 1 : 0;
GuestCount += Client->IsGuest ? 1 : 0;
SyncedCount += Client->IsSynced ? 1 : 0;
SyncingCount += Client->IsSyncing ? 1 : 0;
MissedPacketQueueSum += Client->MissedPacketsQueue->size();
if (Client->SecondsSinceLastPing() < LargestSecondsSinceLastPing) {
LargestSecondsSinceLastPing = Client->SecondsSinceLastPing();
}
return true;
});

View File

@ -153,11 +153,8 @@ THeartbeatThread::THeartbeatThread(TResourceManager& ResourceManager, TServer& S
}
std::string THeartbeatThread::GetPlayers() {
std::string Return;
mServer.ForEachClient([&](const std::weak_ptr<TClient>& ClientPtr) -> bool {
ReadLock Lock(mServer.GetClientMutex());
if (!ClientPtr.expired()) {
Return += ClientPtr.lock()->GetName() + ";";
}
mServer.ForEachClient([&](const std::shared_ptr<TClient>& ClientPtr) -> bool {
Return += ClientPtr->Name.get() + ";";
return true;
});
return Return;

View File

@ -480,13 +480,13 @@ sol::table TLuaEngine::StateThreadData::Lua_TriggerLocalEvent(const std::string&
sol::table TLuaEngine::StateThreadData::Lua_GetPlayerIdentifiers(int ID) {
auto MaybeClient = GetClient(mEngine->Server(), ID);
if (MaybeClient && !MaybeClient.value().expired()) {
auto IDs = MaybeClient.value().lock()->GetIdentifiers();
if (IDs.empty()) {
if (MaybeClient) {
auto IDs = MaybeClient.value()->Identifiers.synchronize();
if (IDs->empty()) {
return sol::lua_nil;
}
sol::table Result = mStateView.create_table();
for (const auto& Pair : IDs) {
for (const auto& Pair : *IDs) {
Result[Pair.first] = Pair.second;
}
return Result;
@ -497,10 +497,10 @@ sol::table TLuaEngine::StateThreadData::Lua_GetPlayerIdentifiers(int ID) {
sol::table TLuaEngine::StateThreadData::Lua_GetPlayers() {
sol::table Result = mStateView.create_table();
mEngine->Server().ForEachClient([&](std::weak_ptr<TClient> Client) -> bool {
mEngine->Server().ForEachClientWeak([&](std::weak_ptr<TClient> Client) -> bool {
if (!Client.expired()) {
auto locked = Client.lock();
Result[locked->GetID()] = locked->GetName();
Result[locked->ID.get()] = locked->Name.get();
}
return true;
});
@ -509,11 +509,11 @@ sol::table TLuaEngine::StateThreadData::Lua_GetPlayers() {
int TLuaEngine::StateThreadData::Lua_GetPlayerIDByName(const std::string& Name) {
int Id = -1;
mEngine->mServer->ForEachClient([&Id, &Name](std::weak_ptr<TClient> Client) -> bool {
mEngine->mServer->ForEachClientWeak([&Id, &Name](std::weak_ptr<TClient> Client) -> bool {
if (!Client.expired()) {
auto locked = Client.lock();
if (locked->GetName() == Name) {
Id = locked->GetID();
if (locked->Name.get() == Name) {
Id = locked->ID.get();
return false;
}
}
@ -550,8 +550,8 @@ sol::table TLuaEngine::StateThreadData::Lua_FS_ListDirectories(const std::string
std::string TLuaEngine::StateThreadData::Lua_GetPlayerName(int ID) {
auto MaybeClient = GetClient(mEngine->Server(), ID);
if (MaybeClient && !MaybeClient.value().expired()) {
return MaybeClient.value().lock()->GetName();
if (MaybeClient) {
return MaybeClient.value()->Name.get();
} else {
return "";
}
@ -559,19 +559,15 @@ std::string TLuaEngine::StateThreadData::Lua_GetPlayerName(int ID) {
sol::table TLuaEngine::StateThreadData::Lua_GetPlayerVehicles(int ID) {
auto MaybeClient = GetClient(mEngine->Server(), ID);
if (MaybeClient && !MaybeClient.value().expired()) {
auto Client = MaybeClient.value().lock();
TClient::TSetOfVehicleData VehicleData;
{ // Vehicle Data Lock Scope
auto LockedData = Client->GetAllCars();
VehicleData = *LockedData.VehicleData;
} // End Vehicle Data Lock Scope
if (VehicleData.empty()) {
if (MaybeClient) {
auto Client = MaybeClient.value();
auto VehicleData = Client->VehicleData.synchronize();
if (VehicleData->empty()) {
return sol::lua_nil;
}
sol::state_view StateView(mState);
sol::table Result = StateView.create_table();
for (const auto& v : VehicleData) {
for (const auto& v : *VehicleData) {
Result[v.ID()] = v.Data().substr(3);
}
return Result;
@ -582,8 +578,8 @@ sol::table TLuaEngine::StateThreadData::Lua_GetPlayerVehicles(int ID) {
std::pair<sol::table, std::string> TLuaEngine::StateThreadData::Lua_GetPositionRaw(int PID, int VID) {
std::pair<sol::table, std::string> Result;
auto MaybeClient = GetClient(mEngine->Server(), PID);
if (MaybeClient && !MaybeClient.value().expired()) {
auto Client = MaybeClient.value().lock();
if (MaybeClient) {
auto Client = MaybeClient.value();
std::string VehiclePos = Client->GetCarPositionRaw(VID);
if (VehiclePos.empty()) {
@ -1083,7 +1079,7 @@ void TLuaResult::MarkAsReady() {
void TLuaResult::WaitUntilReady() {
std::unique_lock readyLock(*this->ReadyMutex);
// wait if not ready yet
if(!this->Ready)
if (!this->Ready)
this->ReadyCondition->wait(readyLock);
}

View File

@ -9,6 +9,7 @@
#include <array>
#include <boost/asio/ip/address.hpp>
#include <boost/asio/ip/address_v4.hpp>
#include <boost/thread/synchronized_value.hpp>
#include <cstring>
typedef boost::asio::detail::socket_option::integer<SOL_SOCKET, SO_RCVTIMEO> rcv_timeout_option;
@ -35,10 +36,8 @@ TNetwork::TNetwork(TServer& Server, TPPSMonitor& PPSMonitor, TResourceManager& R
Application::SetSubsystemStatus("UDPNetwork", Application::Status::Starting);
Application::RegisterShutdownHandler([&] {
beammp_debug("Kicking all players due to shutdown");
Server.ForEachClient([&](std::weak_ptr<TClient> client) -> bool {
if (!client.expired()) {
ClientKick(*client.lock(), "Server shutdown");
}
Server.ForEachClient([&](std::shared_ptr<TClient> Client) -> bool {
ClientKick(*Client, "Server shutdown");
return true;
});
});
@ -87,19 +86,10 @@ void TNetwork::UDPServerMain() {
if (Data.empty() || Pos > Data.begin() + 2)
continue;
uint8_t ID = uint8_t(Data.at(0)) - 1;
mServer.ForEachClient([&](std::weak_ptr<TClient> ClientPtr) -> bool {
std::shared_ptr<TClient> Client;
{
ReadLock Lock(mServer.GetClientMutex());
if (!ClientPtr.expired()) {
Client = ClientPtr.lock();
} else
return true;
}
if (Client->GetID() == ID) {
Client->SetUDPAddr(client);
Client->SetIsConnected(true);
mServer.ForEachClient([&](std::shared_ptr<TClient> Client) -> bool {
if (Client->ID == ID) {
Client->UDPAddress = client;
Client->IsConnected = true;
Data.erase(Data.begin(), Data.begin() + 2);
mServer.GlobalParser(ClientPtr, std::move(Data), mPPSMonitor, *this);
}
@ -191,8 +181,8 @@ void TNetwork::Identify(TConnection&& RawConnection) {
} else {
beammp_errorf("Invalid code got in Identify: '{}'", Code);
}
} catch(const std::exception& e) {
beammp_errorf("Error during handling of code {} - client left in invalid state, closing socket", Code);
} catch (const std::exception& e) {
beammp_errorf("Error during handling of code {}->client left in invalid state, closing socket", Code);
boost::system::error_code ec;
RawConnection.Socket.shutdown(socket_base::shutdown_both, ec);
if (ec) {
@ -215,16 +205,11 @@ void TNetwork::HandleDownload(TConnection&& Conn) {
return;
}
auto ID = uint8_t(D);
mServer.ForEachClient([&](const std::weak_ptr<TClient>& ClientPtr) -> bool {
ReadLock Lock(mServer.GetClientMutex());
if (!ClientPtr.expired()) {
auto c = ClientPtr.lock();
if (c->GetID() == ID) {
c->SetDownSock(std::move(Conn.Socket));
}
}
return true;
});
auto Client = GetClient(mServer, ID);
if (Client.has_value()) {
auto New = Sync<ip::tcp::socket>(std::move(Conn.Socket));
Client.value()->DownSocket.swap(New);
}
}
std::string HashPassword(const std::string& str) {
@ -260,8 +245,9 @@ std::shared_ptr<TClient> TNetwork::Authentication(TConnection&& RawConnection) {
return nullptr;
}
if (!TCPSend(*Client, StringToVector("A"))) { //changed to A for Accepted version
// TODO: handle
if (!TCPSend(*Client, StringToVector("A"))) { // changed to A for Accepted version
OnDisconnect(Client);
return nullptr;
}
Data = TCPRcv(*Client);
@ -273,8 +259,8 @@ std::shared_ptr<TClient> TNetwork::Authentication(TConnection&& RawConnection) {
std::string key(reinterpret_cast<const char*>(Data.data()), Data.size());
nlohmann::json AuthReq{};
std::string AuthResStr{};
nlohmann::json AuthReq {};
std::string AuthResStr {};
try {
AuthReq = nlohmann::json {
{ "key", key }
@ -298,7 +284,7 @@ std::shared_ptr<TClient> TNetwork::Authentication(TConnection&& RawConnection) {
Client->SetName(AuthRes["username"]);
Client->SetRoles(AuthRes["roles"]);
Client->SetIsGuest(AuthRes["guest"]);
Client->IsGuest = AuthRes["guest"];
for (const auto& ID : AuthRes["identifiers"]) {
auto Raw = std::string(ID);
auto SepIndex = Raw.find(':');
@ -316,24 +302,25 @@ std::shared_ptr<TClient> TNetwork::Authentication(TConnection&& RawConnection) {
return nullptr;
}
if(!Application::Settings.Password.empty()) { // ask password
if(!TCPSend(*Client, StringToVector("S"))) {
// TODO: handle
if (!Application::Settings.Password.empty()) { // ask password
if (!TCPSend(*Client, StringToVector("S"))) {
OnDisconnect(Client);
return {};
}
beammp_info("Waiting for password");
Data = TCPRcv(*Client);
std::string Pass = std::string(reinterpret_cast<const char*>(Data.data()), Data.size());
if(Pass != HashPassword(Application::Settings.Password)) {
beammp_debug(Client->GetName() + " attempted to connect with a wrong password");
if (Pass != HashPassword(Application::Settings.Password)) {
beammp_debug(Client->Name.get() + " attempted to connect with a wrong password");
ClientKick(*Client, "Wrong password!");
return {};
} else {
beammp_debug(Client->GetName() + " used the correct password");
beammp_debug(Client->Name.get() + " used the correct password");
}
}
beammp_debug("Name -> " + Client->GetName() + ", Guest -> " + std::to_string(Client->IsGuest()) + ", Roles -> " + Client->GetRoles());
mServer.ForEachClient([&](const std::weak_ptr<TClient>& ClientPtr) -> bool {
beammp_debug("Name-> " + Client->Name.get() + ", Guest-> " + std::to_string(Client->IsGuest.get()) + ", Roles-> " + Client->Role.get());
mServer.ForEachClientWeak([&](const std::weak_ptr<TClient>& ClientPtr) -> bool {
std::shared_ptr<TClient> Cl;
{
ReadLock Lock(mServer.GetClientMutex());
@ -342,7 +329,7 @@ std::shared_ptr<TClient> TNetwork::Authentication(TConnection&& RawConnection) {
} else
return true;
}
if (Cl->GetName() == Client->GetName() && Cl->IsGuest() == Client->IsGuest()) {
if (Cl->Name.get() == Client->Name.get() && Cl->IsGuest == Client->IsGuest) {
Cl->Disconnect("Stale Client (not a real player)");
return false;
}
@ -350,7 +337,7 @@ std::shared_ptr<TClient> TNetwork::Authentication(TConnection&& RawConnection) {
return true;
});
auto Futures = LuaAPI::MP::Engine->TriggerEvent("onPlayerAuth", "", Client->GetName(), Client->GetRoles(), Client->IsGuest(), Client->GetIdentifiers());
auto Futures = LuaAPI::MP::Engine->TriggerEvent("onPlayerAuth", "", Client->Name.get(), Client->Role.get(), Client->IsGuest.get(), Client->Identifiers.get());
TLuaEngine::WaitForAll(Futures);
bool NotAllowed = std::any_of(Futures.begin(), Futures.end(),
[](const std::shared_ptr<TLuaResult>& Result) {
@ -377,7 +364,13 @@ std::shared_ptr<TClient> TNetwork::Authentication(TConnection&& RawConnection) {
if (mServer.ClientCount() < size_t(Application::Settings.MaxPlayers)) {
beammp_info("Identification success");
mServer.InsertClient(Client);
TCPClient(Client);
try {
TCPClient(Client);
} catch (const std::exception& e) {
beammp_infof("Client {} disconnected: {}", Client->ID.get(), e.what());
OnDisconnect(Client);
return {};
}
} else {
ClientKick(*Client, "Server full!");
}
@ -392,7 +385,7 @@ std::shared_ptr<TClient> TNetwork::CreateClient(ip::tcp::socket&& TCPSock) {
bool TNetwork::TCPSend(TClient& c, const std::vector<uint8_t>& Data, bool IsSync) {
if (!IsSync) {
if (c.IsSyncing()) {
if (c.IsSyncing) {
if (!Data.empty()) {
if (Data.at(0) == 'O' || Data.at(0) == 'A' || Data.at(0) == 'C' || Data.at(0) == 'E') {
c.EnqueuePacket(Data);
@ -402,8 +395,6 @@ bool TNetwork::TCPSend(TClient& c, const std::vector<uint8_t>& Data, bool IsSync
}
}
auto& Sock = c.GetTCPSock();
/*
* our TCP protocol sends a header of 4 bytes, followed by the data.
*
@ -418,7 +409,7 @@ bool TNetwork::TCPSend(TClient& c, const std::vector<uint8_t>& Data, bool IsSync
std::memcpy(ToSend.data(), &Size, sizeof(Size));
std::memcpy(ToSend.data() + sizeof(Size), Data.data(), Data.size());
boost::system::error_code ec;
write(Sock, buffer(ToSend), ec);
write(*c.TCPSocket.synchronize(), buffer(ToSend), ec);
if (ec) {
beammp_debugf("write(): {}", ec.message());
c.Disconnect("write() failed");
@ -435,11 +426,10 @@ std::vector<uint8_t> TNetwork::TCPRcv(TClient& c) {
}
int32_t Header {};
auto& Sock = c.GetTCPSock();
boost::system::error_code ec;
std::array<uint8_t, sizeof(Header)> HeaderData;
read(Sock, buffer(HeaderData), ec);
read(*c.TCPSocket.synchronize(), buffer(HeaderData), ec);
if (ec) {
// TODO: handle this case (read failed)
beammp_debugf("TCPRcv: Reading header failed: {}", ec.message());
@ -448,8 +438,8 @@ std::vector<uint8_t> TNetwork::TCPRcv(TClient& c) {
Header = *reinterpret_cast<int32_t*>(HeaderData.data());
if (Header < 0) {
ClientKick(c, "Invalid packet - header negative");
beammp_errorf("Client {} send negative TCP header, ignoring packet", c.GetID());
ClientKick(c, "Invalid packet->header negative");
beammp_errorf("Client {} send negative TCP header, ignoring packet", c.ID.get());
return {};
}
@ -459,10 +449,10 @@ std::vector<uint8_t> TNetwork::TCPRcv(TClient& c) {
Data.resize(Header);
} else {
ClientKick(c, "Header size limit exceeded");
beammp_warn("Client " + c.GetName() + " (" + std::to_string(c.GetID()) + ") sent header of >100MB - assuming malicious intent and disconnecting the client.");
beammp_warn("Client " + c.Name.get() + " (" + std::to_string(c.ID.get()) + ") sent header of >100MB->assuming malicious intent and disconnecting the client.");
return {};
}
auto N = read(Sock, buffer(Data), ec);
auto N = read(*c.TCPSocket.synchronize(), buffer(Data), ec);
if (ec) {
// TODO: handle this case properly
beammp_debugf("TCPRcv: Reading data failed: {}", ec.message());
@ -485,7 +475,7 @@ std::vector<uint8_t> TNetwork::TCPRcv(TClient& c) {
void TNetwork::ClientKick(TClient& c, const std::string& R) {
beammp_info("Client kicked: " + R);
if (!TCPSend(c, StringToVector("K" + R))) {
beammp_debugf("tried to kick player '{}' (id {}), but was already disconnected", c.GetName(), c.GetID());
beammp_debugf("tried to kick player '{}' (id {}), but was already disconnected", c.Name.get(), c.ID.get());
}
c.Disconnect("Kicked");
}
@ -498,24 +488,24 @@ void TNetwork::Looper(const std::weak_ptr<TClient>& c) {
beammp_debug("client is disconnected, breaking client loop");
break;
}
if (!Client->IsSyncing() && Client->IsSynced() && Client->MissedPacketQueueSize() != 0) {
if (!Client->IsSyncing.get() && Client->IsSynced.get() && Client->MissedPacketsQueue->size() != 0) {
// debug("sending " + std::to_string(Client->MissedPacketQueueSize()) + " queued packets");
while (Client->MissedPacketQueueSize() > 0) {
while (Client->MissedPacketsQueue->size() > 0) {
std::vector<uint8_t> QData {};
{ // locked context
std::unique_lock lock(Client->MissedPacketQueueMutex());
if (Client->MissedPacketQueueSize() <= 0) {
auto Lock = Client->MissedPacketsQueue;
if (Lock->size() <= 0) {
break;
}
QData = Client->MissedPacketQueue().front();
Client->MissedPacketQueue().pop();
QData = Lock->front();
Lock->pop();
} // end locked context
// beammp_debug("sending a missed packet: " + QData);
if (!TCPSend(*Client, QData, true)) {
Client->Disconnect("Failed to TCPSend while clearing the missed packet queue");
std::unique_lock lock(Client->MissedPacketQueueMutex());
while (!Client->MissedPacketQueue().empty()) {
Client->MissedPacketQueue().pop();
auto Lock = Client->MissedPacketsQueue;
while (!Lock->empty()) {
Lock->pop();
}
break;
}
@ -528,12 +518,12 @@ void TNetwork::Looper(const std::weak_ptr<TClient>& c) {
void TNetwork::TCPClient(const std::weak_ptr<TClient>& c) {
// TODO: the c.expired() might cause issues here, remove if you end up here with your debugger
if (c.expired() || !c.lock()->GetTCPSock().is_open()) {
if (c.expired() || !c.lock()->TCPSocket->is_open()) {
mServer.RemoveClient(c);
return;
}
OnConnect(c);
RegisterThread("(" + std::to_string(c.lock()->GetID()) + ") \"" + c.lock()->GetName() + "\"");
RegisterThread("(" + std::to_string(c.lock()->ID.get()) + ") \"" + c.lock()->Name.get() + "\"");
std::thread QueueSync(&TNetwork::Looper, this, c);
@ -561,6 +551,7 @@ void TNetwork::TCPClient(const std::weak_ptr<TClient>& c) {
if (!c.expired()) {
auto Client = c.lock();
OnDisconnect(c);
return;
} else {
beammp_warn("client expired in TCPClient, should never happen");
}
@ -568,12 +559,9 @@ void TNetwork::TCPClient(const std::weak_ptr<TClient>& c) {
void TNetwork::UpdatePlayer(TClient& Client) {
std::string Packet = ("Ss") + std::to_string(mServer.ClientCount()) + "/" + std::to_string(Application::Settings.MaxPlayers) + ":";
mServer.ForEachClient([&](const std::weak_ptr<TClient>& ClientPtr) -> bool {
mServer.ForEachClient([&](const std::shared_ptr<TClient>& Client) -> bool {
ReadLock Lock(mServer.GetClientMutex());
if (!ClientPtr.expired()) {
auto c = ClientPtr.lock();
Packet += c->GetName() + ",";
}
Packet += Client->Name.get() + ",";
return true;
});
Packet = Packet.substr(0, Packet.length() - 1);
@ -582,6 +570,10 @@ void TNetwork::UpdatePlayer(TClient& Client) {
}
void TNetwork::OnDisconnect(const std::weak_ptr<TClient>& ClientPtr) {
// this is how one checks that the ClientPtr is not empty (as opposed to expired)
if (ClientPtr.owner_before(std::weak_ptr<TClient> {})) {
return;
}
std::shared_ptr<TClient> LockedClientPtr { nullptr };
try {
LockedClientPtr = ClientPtr.lock();
@ -591,64 +583,52 @@ void TNetwork::OnDisconnect(const std::weak_ptr<TClient>& ClientPtr) {
}
beammp_assert(LockedClientPtr != nullptr);
TClient& c = *LockedClientPtr;
beammp_info(c.GetName() + (" Connection Terminated"));
OnDisconnect(c);
}
void TNetwork::OnDisconnect(TClient& c) {
beammp_info(c.Name.get() + (" Connection Terminated"));
std::string Packet;
TClient::TSetOfVehicleData VehicleData;
{ // Vehicle Data Lock Scope
auto LockedData = c.GetAllCars();
VehicleData = *LockedData.VehicleData;
} // End Vehicle Data Lock Scope
for (auto& v : VehicleData) {
Packet = "Od:" + std::to_string(c.GetID()) + "-" + std::to_string(v.ID());
SendToAll(&c, StringToVector(Packet), false, true);
{
auto Locked = c.VehicleData.synchronize();
for (auto& v : *Locked) {
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onVehicleDeleted", "", c.ID.get(), v.ID()));
Packet = "Od:" + std::to_string(c.ID.get()) + "-" + std::to_string(v.ID());
SendToAll(&c, StringToVector(Packet), false, true);
}
}
Packet = ("L") + c.GetName() + (" left the server!");
Packet = ("L") + c.Name.get() + (" left the server!");
SendToAll(&c, StringToVector(Packet), false, true);
Packet.clear();
auto Futures = LuaAPI::MP::Engine->TriggerEvent("onPlayerDisconnect", "", c.GetID());
LuaAPI::MP::Engine->WaitForAll(Futures);
auto Futures = LuaAPI::MP::Engine->TriggerEvent("onPlayerDisconnect", "", c.ID.get());
TLuaEngine::WaitForAll(Futures);
c.Disconnect("Already Disconnected (OnDisconnect)");
mServer.RemoveClient(ClientPtr);
mServer.RemoveClient(c);
}
int TNetwork::OpenID() {
int ID = 0;
bool found;
do {
found = true;
mServer.ForEachClient([&](const std::weak_ptr<TClient>& ClientPtr) -> bool {
ReadLock Lock(mServer.GetClientMutex());
if (!ClientPtr.expired()) {
auto c = ClientPtr.lock();
if (c->GetID() == ID) {
found = false;
ID++;
}
}
return true;
});
} while (!found);
return ID;
void TNetwork::OnDisconnect(const std::shared_ptr<TClient>& ClientPtr) {
if (ClientPtr == nullptr) {
return;
}
OnDisconnect(*ClientPtr);
}
void TNetwork::OnConnect(const std::weak_ptr<TClient>& c) {
beammp_assert(!c.expired());
beammp_info("Client connected");
auto LockedClient = c.lock();
LockedClient->SetID(OpenID());
beammp_info("Assigned ID " + std::to_string(LockedClient->GetID()) + " to " + LockedClient->GetName());
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onPlayerConnecting", "", LockedClient->GetID()));
mServer.ClaimFreeIDFor(*LockedClient);
beammp_info("Assigned ID " + std::to_string(LockedClient->ID.get()) + " to " + LockedClient->Name.get());
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onPlayerConnecting", "", LockedClient->ID.get()));
SyncResources(*LockedClient);
if (LockedClient->IsDisconnected())
return;
(void)Respond(*LockedClient, StringToVector("M" + Application::Settings.MapName), true); // Send the Map on connect
beammp_info(LockedClient->GetName() + " : Connected");
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onPlayerJoining", "", LockedClient->GetID()));
beammp_info(LockedClient->Name.get() + " : Connected");
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onPlayerJoining", "", LockedClient->ID.get()));
}
void TNetwork::SyncResources(TClient& c) {
if (!TCPSend(c, StringToVector("P" + std::to_string(c.GetID())))) {
// TODO handle
if (!TCPSend(c, StringToVector("P" + std::to_string(c.ID.get())))) {
throw std::runtime_error("Failed to send 'P' to client");
}
std::vector<uint8_t> Data;
while (!c.IsDisconnected()) {
@ -680,7 +660,7 @@ void TNetwork::Parse(TClient& c, const std::vector<uint8_t>& Packet) {
if (ToSend.empty())
ToSend = "-";
if (!TCPSend(c, StringToVector(ToSend))) {
// TODO: error
throw std::runtime_error("Failed to send packet to client");
}
}
return;
@ -690,11 +670,11 @@ void TNetwork::Parse(TClient& c, const std::vector<uint8_t>& Packet) {
}
void TNetwork::SendFile(TClient& c, const std::string& UnsafeName) {
beammp_info(c.GetName() + " requesting : " + UnsafeName.substr(UnsafeName.find_last_of('/')));
beammp_info(c.Name.get() + " requesting : " + UnsafeName.substr(UnsafeName.find_last_of('/')));
if (!fs::path(UnsafeName).has_filename()) {
if (!TCPSend(c, StringToVector("CO"))) {
// TODO: handle
OnDisconnect(c);
}
beammp_warn("File " + UnsafeName + " is not a file!");
return;
@ -704,24 +684,24 @@ void TNetwork::SendFile(TClient& c, const std::string& UnsafeName) {
if (!std::filesystem::exists(FileName)) {
if (!TCPSend(c, StringToVector("CO"))) {
// TODO: handle
OnDisconnect(c);
}
beammp_warn("File " + UnsafeName + " could not be accessed!");
return;
}
if (!TCPSend(c, StringToVector("AG"))) {
// TODO: handle
OnDisconnect(c);
}
/// Wait for connections
int T = 0;
while (!c.GetDownSock().is_open() && T < 50) {
while (!c.DownSocket->is_open() && T < 50) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
T++;
}
if (!c.GetDownSock().is_open()) {
if (!c.DownSocket->is_open()) {
beammp_error("Client doesn't have a download socket!");
if (!c.IsDisconnected())
c.Disconnect("Missing download socket");
@ -804,31 +784,54 @@ void TNetwork::SplitLoad(TClient& c, size_t Sent, size_t Size, bool D, const std
Data.resize(Split);
else
Data.resize(Size);
ip::tcp::socket* TCPSock { nullptr };
if (D)
TCPSock = &c.GetDownSock();
else
TCPSock = &c.GetTCPSock();
while (!c.IsDisconnected() && Sent < Size) {
size_t Diff = Size - Sent;
if (Diff > Split) {
f.seekg(Sent, std::ios_base::beg);
f.read(reinterpret_cast<char*>(Data.data()), Split);
if (!TCPSendRaw(c, *TCPSock, Data.data(), Split)) {
if (!c.IsDisconnected())
c.Disconnect("TCPSendRaw failed in mod download (1)");
break;
if (D) {
auto TCPSock = c.DownSocket.synchronize();
while (!c.IsDisconnected() && Sent < Size) {
size_t Diff = Size - Sent;
if (Diff > Split) {
f.seekg(Sent, std::ios_base::beg);
f.read(reinterpret_cast<char*>(Data.data()), Split);
if (!TCPSendRaw(c, *TCPSock, Data.data(), Split)) {
if (!c.IsDisconnected())
c.Disconnect("TCPSendRaw failed in mod download (1)");
break;
}
Sent += Split;
} else {
f.seekg(Sent, std::ios_base::beg);
f.read(reinterpret_cast<char*>(Data.data()), Diff);
if (!TCPSendRaw(c, *TCPSock, Data.data(), int32_t(Diff))) {
if (!c.IsDisconnected())
c.Disconnect("TCPSendRaw failed in mod download (2)");
break;
}
Sent += Diff;
}
Sent += Split;
} else {
f.seekg(Sent, std::ios_base::beg);
f.read(reinterpret_cast<char*>(Data.data()), Diff);
if (!TCPSendRaw(c, *TCPSock, Data.data(), int32_t(Diff))) {
if (!c.IsDisconnected())
c.Disconnect("TCPSendRaw failed in mod download (2)");
break;
}
} else {
auto TCPSock = c.TCPSocket.synchronize();
while (!c.IsDisconnected() && Sent < Size) {
size_t Diff = Size - Sent;
if (Diff > Split) {
f.seekg(Sent, std::ios_base::beg);
f.read(reinterpret_cast<char*>(Data.data()), Split);
if (!TCPSendRaw(c, *TCPSock, Data.data(), Split)) {
if (!c.IsDisconnected())
c.Disconnect("TCPSendRaw failed in mod download (1)");
break;
}
Sent += Split;
} else {
f.seekg(Sent, std::ios_base::beg);
f.read(reinterpret_cast<char*>(Data.data()), Diff);
if (!TCPSendRaw(c, *TCPSock, Data.data(), int32_t(Diff))) {
if (!c.IsDisconnected())
c.Disconnect("TCPSendRaw failed in mod download (2)");
break;
}
Sent += Diff;
}
Sent += Diff;
}
}
}
@ -869,36 +872,24 @@ bool TNetwork::SyncClient(const std::weak_ptr<TClient>& c) {
return false;
}
auto LockedClient = c.lock();
if (LockedClient->IsSynced())
if (LockedClient->IsSynced.get())
return true;
// Syncing, later set isSynced
// after syncing is done, we apply all packets they missed
if (!Respond(*LockedClient, StringToVector("Sn" + LockedClient->GetName()), true)) {
if (!Respond(*LockedClient, StringToVector("Sn" + LockedClient->Name.get()), true)) {
return false;
}
// ignore error
(void)SendToAll(LockedClient.get(), StringToVector("JWelcome " + LockedClient->GetName() + "!"), false, true);
(void)SendToAll(LockedClient.get(), StringToVector("JWelcome " + LockedClient->Name.get() + "!"), false, true);
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onPlayerJoin", "", LockedClient->GetID()));
LockedClient->SetIsSyncing(true);
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onPlayerJoin", "", LockedClient->ID.get()));
LockedClient->IsSyncing = true;
bool Return = false;
bool res = true;
mServer.ForEachClient([&](const std::weak_ptr<TClient>& ClientPtr) -> bool {
std::shared_ptr<TClient> client;
{
ReadLock Lock(mServer.GetClientMutex());
if (!ClientPtr.expired()) {
client = ClientPtr.lock();
} else
return true;
}
TClient::TSetOfVehicleData VehicleData;
{ // Vehicle Data Lock Scope
auto LockedData = client->GetAllCars();
VehicleData = *LockedData.VehicleData;
} // End Vehicle Data Lock Scope
if (client != LockedClient) {
for (auto& v : VehicleData) {
mServer.ForEachClient([&](const std::shared_ptr<TClient>& Client) -> bool {
auto VehicleData = Client->VehicleData.synchronize();
if (Client != LockedClient) {
for (auto& v : *VehicleData) {
if (LockedClient->IsDisconnected()) {
Return = true;
res = false;
@ -910,32 +901,22 @@ bool TNetwork::SyncClient(const std::weak_ptr<TClient>& c) {
return true;
});
LockedClient->SetIsSyncing(false);
LockedClient->IsSyncing = false;
if (Return) {
return res;
}
LockedClient->SetIsSynced(true);
beammp_info(LockedClient->GetName() + (" is now synced!"));
LockedClient->IsSynced = true;
beammp_info(LockedClient->Name.get() + (" is now synced!"));
return true;
}
void TNetwork::SendToAll(TClient* c, const std::vector<uint8_t>& Data, bool Self, bool Rel) {
if (!Self)
beammp_assert(c);
char C = Data.at(0);
bool ret = true;
mServer.ForEachClient([&](std::weak_ptr<TClient> ClientPtr) -> bool {
std::shared_ptr<TClient> Client;
try {
ReadLock Lock(mServer.GetClientMutex());
Client = ClientPtr.lock();
} catch (const std::exception&) {
// continue
beammp_warn("Client expired, shouldn't happen - if a client disconnected recently, you can ignore this");
return true;
}
char C = static_cast<char>(Data.at(0));
mServer.ForEachClient([&](const std::shared_ptr<TClient>& Client) -> bool {
if (Self || Client.get() != c) {
if (Client->IsSynced() || Client->IsSyncing()) {
if (Client->IsSynced || Client->IsSyncing) {
if (Rel || C == 'W' || C == 'Y' || C == 'V' || C == 'E') {
if (C == 'O' || C == 'T' || Data.size() > 1000) {
if (Data.size() > 400) {
@ -945,38 +926,34 @@ void TNetwork::SendToAll(TClient* c, const std::vector<uint8_t>& Data, bool Self
} else {
Client->EnqueuePacket(Data);
}
// ret = SendLarge(*Client, Data);
} else {
Client->EnqueuePacket(Data);
// ret = TCPSend(*Client, Data);
}
} else {
ret = UDPSend(*Client, Data);
if (!UDPSend(*Client, Data)) {
OnDisconnect(Client);
}
}
}
}
return true;
});
if (!ret) {
// TODO: handle
}
return;
}
bool TNetwork::UDPSend(TClient& Client, std::vector<uint8_t> Data) {
if (!Client.IsConnected() || Client.IsDisconnected()) {
if (!Client.IsConnected || Client.IsDisconnected()) {
// this can happen if we try to send a packet to a client that is either
// 1. not yet fully connected, or
// 2. disconnected and not yet fully removed
// this is fine can can be ignored :^)
return true;
}
const auto Addr = Client.GetUDPAddr();
const auto Addr = Client.UDPAddress;
if (Data.size() > 400) {
CompressProperly(Data);
}
boost::system::error_code ec;
mUDPSock.send_to(buffer(Data), Addr, 0, ec);
mUDPSock.send_to(buffer(Data), *Addr, 0, ec);
if (ec) {
beammp_debugf("UDP sendto() failed: {}", ec.message());
if (!Client.IsDisconnected())

View File

@ -33,7 +33,7 @@ void TPPSMonitor::operator()() {
Application::SetPPS("-");
continue;
}
mServer.ForEachClient([&](const std::weak_ptr<TClient>& ClientPtr) -> bool {
mServer.ForEachClientWeak([&](const std::weak_ptr<TClient>& ClientPtr) -> bool {
std::shared_ptr<TClient> c;
{
ReadLock Lock(mServer.GetClientMutex());
@ -48,7 +48,7 @@ void TPPSMonitor::operator()() {
}
// kick on "no ping"
if (c->SecondsSinceLastPing() > (20 * 60)) {
beammp_debug("client " + std::string("(") + std::to_string(c->GetID()) + ")" + c->GetName() + " timing out: " + std::to_string(c->SecondsSinceLastPing()) + ", pps: " + Application::PPS());
beammp_debug("client " + std::string("(") + std::to_string(c->ID.get()) + ")" + c->Name.get() + " timing out: " + std::to_string(c->SecondsSinceLastPing()) + ", pps: " + Application::PPS());
TimedOutClients.push_back(c);
}

View File

@ -118,7 +118,17 @@ TServer::TServer(const std::vector<std::string_view>& Arguments) {
}
Application::SetSubsystemStatus("Server", Application::Status::Good);
}
void TServer::RemoveClient(TClient& Client) {
beammp_debug("removing client " + Client.Name.get() + " (" + std::to_string(ClientCount()) + ")");
Client.ClearCars();
WriteLock Lock(mClientsMutex);
// erase all clients whose id matches, and those who have expired
std::erase_if(mClients,
[&](const std::weak_ptr<TClient>& C) {
return C.expired()
|| C.lock()->ID.get() == Client.ID.get();
});
}
void TServer::RemoveClient(const std::weak_ptr<TClient>& WeakClientPtr) {
std::shared_ptr<TClient> LockedClientPtr { nullptr };
try {
@ -129,14 +139,39 @@ void TServer::RemoveClient(const std::weak_ptr<TClient>& WeakClientPtr) {
}
beammp_assert(LockedClientPtr != nullptr);
TClient& Client = *LockedClientPtr;
beammp_debug("removing client " + Client.GetName() + " (" + std::to_string(ClientCount()) + ")");
// TODO: Send delete packets for all cars
Client.ClearCars();
WriteLock Lock(mClientsMutex);
mClients.erase(WeakClientPtr.lock());
RemoveClient(Client);
}
void TServer::ForEachClient(const std::function<bool(std::weak_ptr<TClient>)>& Fn) {
void TServer::ClaimFreeIDFor(TClient& ClientToChange) {
ReadLock lock(mClientsMutex);
int ID = 0;
bool found = true;
do {
found = true;
for (const auto& Client : mClients) {
if (Client->ID.get() == ID) {
found = false;
++ID;
}
}
} while (!found);
ClientToChange.ID = ID;
}
void TServer::ForEachClient(const std::function<bool(const std::shared_ptr<TClient>&)> Fn) {
decltype(mClients) Clients;
{
ReadLock lock(mClientsMutex);
Clients = mClients;
}
for (auto& Client : Clients) {
if (!Fn(Client)) {
break;
}
}
}
void TServer::ForEachClientWeak(const std::function<bool(std::weak_ptr<TClient>)>& Fn) {
decltype(mClients) Clients;
{
ReadLock lock(mClientsMutex);
@ -196,7 +231,7 @@ void TServer::GlobalParser(const std::weak_ptr<TClient>& Client, std::vector<uin
return;
case 'O':
if (Packet.size() > 1000) {
beammp_debug(("Received data from: ") + LockedClient->GetName() + (" Size: ") + std::to_string(Packet.size()));
beammp_debug(("Received data from: ") + LockedClient->Name.get() + (" Size: ") + std::to_string(Packet.size()));
}
ParseVehicle(*LockedClient, StringPacket, Network);
return;
@ -210,12 +245,12 @@ void TServer::GlobalParser(const std::weak_ptr<TClient>& Client, std::vector<uin
Message = PacketAsString.substr(ColonPos + 2);
}
if (Message.empty()) {
beammp_debugf("Empty chat message received from '{}' ({}), ignoring it", LockedClient->GetName(), LockedClient->GetID());
beammp_debugf("Empty chat message received from '{}' ({}), ignoring it", LockedClient->Name.get(), LockedClient->ID.get());
return;
}
auto Futures = LuaAPI::MP::Engine->TriggerEvent("onChatMessage", "", LockedClient->GetID(), LockedClient->GetName(), Message);
auto Futures = LuaAPI::MP::Engine->TriggerEvent("onChatMessage", "", LockedClient->ID.get(), LockedClient->Name.get(), Message);
TLuaEngine::WaitForAll(Futures);
LogChatMessage(LockedClient->GetName(), LockedClient->GetID(), PacketAsString.substr(PacketAsString.find(':', 3) + 1));
LogChatMessage(LockedClient->Name.get(), LockedClient->ID.get(), PacketAsString.substr(PacketAsString.find(':', 3) + 1));
if (std::any_of(Futures.begin(), Futures.end(),
[](const std::shared_ptr<TLuaResult>& Elem) {
return !Elem->Error
@ -224,7 +259,7 @@ void TServer::GlobalParser(const std::weak_ptr<TClient>& Client, std::vector<uin
})) {
break;
}
std::string SanitizedPacket = fmt::format("C:{}: {}", LockedClient->GetName(), Message);
std::string SanitizedPacket = fmt::format("C:{}: {}", LockedClient->Name.get(), Message);
Network.SendToAll(nullptr, StringToVector(SanitizedPacket), true, true);
return;
}
@ -249,7 +284,7 @@ void TServer::HandleEvent(TClient& c, const std::string& RawData) {
// E:Name:Data
// Data is allowed to have ':'
if (RawData.size() < 2) {
beammp_debugf("Client '{}' ({}) tried to send an empty event, ignoring", c.GetName(), c.GetID());
beammp_debugf("Client '{}' ({}) tried to send an empty event, ignoring", c.Name.get(), c.ID.get());
return;
}
auto NameDataSep = RawData.find(':', 2);
@ -258,7 +293,7 @@ void TServer::HandleEvent(TClient& c, const std::string& RawData) {
}
std::string Name = RawData.substr(2, NameDataSep - 2);
std::string Data = RawData.substr(NameDataSep + 1);
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent(Name, "", c.GetID(), Data));
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent(Name, "", c.ID.get(), Data));
}
bool TServer::IsUnicycle(TClient& c, const std::string& CarJson) {
@ -269,14 +304,15 @@ bool TServer::IsUnicycle(TClient& c, const std::string& CarJson) {
return true;
}
} catch (const std::exception& e) {
beammp_warn("Failed to parse vehicle data as json for client " + std::to_string(c.GetID()) + ": '" + CarJson + "'.");
beammp_warn("Failed to parse vehicle data as json for client " + std::to_string(c.ID.get()) + ": '" + CarJson + "'.");
}
return false;
}
bool TServer::ShouldSpawn(TClient& c, const std::string& CarJson, int ID) {
if (IsUnicycle(c, CarJson) && c.GetUnicycleID() < 0) {
c.SetUnicycleID(ID);
auto UnicycleID = c.UnicycleID.synchronize();
if (IsUnicycle(c, CarJson) && *UnicycleID < 0) {
*UnicycleID = ID;
return true;
} else {
return c.GetCarCount() < Application::Settings.MaxCars;
@ -296,11 +332,11 @@ void TServer::ParseVehicle(TClient& c, const std::string& Pckt, TNetwork& Networ
beammp_tracef("got 'Os' packet: '{}' ({})", Packet, Packet.size());
if (Data.at(0) == '0') {
int CarID = c.GetOpenCarID();
beammp_debugf("'{}' created a car with ID {}", c.GetName(), CarID);
beammp_debugf("'{}' created a car with ID {}", c.Name.get(), CarID);
std::string CarJson = Packet.substr(5);
Packet = "Os:" + c.GetRoles() + ":" + c.GetName() + ":" + std::to_string(c.GetID()) + "-" + std::to_string(CarID) + ":" + CarJson;
auto Futures = LuaAPI::MP::Engine->TriggerEvent("onVehicleSpawn", "", c.GetID(), CarID, Packet.substr(3));
Packet = "Os:" + c.Role.get() + ":" + c.Name.get() + ":" + std::to_string(c.ID.get()) + "-" + std::to_string(CarID) + ":" + CarJson;
auto Futures = LuaAPI::MP::Engine->TriggerEvent("onVehicleSpawn", "", c.ID.get(), CarID, Packet.substr(3));
TLuaEngine::WaitForAll(Futures);
bool ShouldntSpawn = std::any_of(Futures.begin(), Futures.end(),
[](const std::shared_ptr<TLuaResult>& Result) {
@ -314,11 +350,11 @@ void TServer::ParseVehicle(TClient& c, const std::string& Pckt, TNetwork& Networ
if (!Network.Respond(c, StringToVector(Packet), true)) {
// TODO: handle
}
std::string Destroy = "Od:" + std::to_string(c.GetID()) + "-" + std::to_string(CarID);
std::string Destroy = "Od:" + std::to_string(c.ID.get()) + "-" + std::to_string(CarID);
if (!Network.Respond(c, StringToVector(Destroy), true)) {
// TODO: handle
}
beammp_debugf("{} (force : car limit/lua) removed ID {}", c.GetName(), CarID);
beammp_debugf("{} (force : car limit/lua) removed ID {}", c.Name.get(), CarID);
}
}
return;
@ -328,8 +364,8 @@ void TServer::ParseVehicle(TClient& c, const std::string& Pckt, TNetwork& Networ
if (MaybePidVid) {
std::tie(PID, VID) = MaybePidVid.value();
}
if (PID != -1 && VID != -1 && PID == c.GetID()) {
auto Futures = LuaAPI::MP::Engine->TriggerEvent("onVehicleEdited", "", c.GetID(), VID, Packet.substr(3));
if (PID != -1 && VID != -1 && PID == c.ID.get()) {
auto Futures = LuaAPI::MP::Engine->TriggerEvent("onVehicleEdited", "", c.ID.get(), VID, Packet.substr(3));
TLuaEngine::WaitForAll(Futures);
bool ShouldntAllow = std::any_of(Futures.begin(), Futures.end(),
[](const std::shared_ptr<TLuaResult>& Result) {
@ -338,15 +374,16 @@ void TServer::ParseVehicle(TClient& c, const std::string& Pckt, TNetwork& Networ
auto FoundPos = Packet.find('{');
FoundPos = FoundPos == std::string::npos ? 0 : FoundPos; // attempt at sanitizing this
if ((c.GetUnicycleID() != VID || IsUnicycle(c, Packet.substr(FoundPos)))
auto UnicycleID = c.UnicycleID.synchronize();
if ((*UnicycleID != VID || IsUnicycle(c, Packet.substr(FoundPos)))
&& !ShouldntAllow) {
Network.SendToAll(&c, StringToVector(Packet), false, true);
Apply(c, VID, Packet);
} else {
if (c.GetUnicycleID() == VID) {
c.SetUnicycleID(-1);
if (*UnicycleID == VID) {
*UnicycleID = -1;
}
std::string Destroy = "Od:" + std::to_string(c.GetID()) + "-" + std::to_string(VID);
std::string Destroy = "Od:" + std::to_string(c.ID.get()) + "-" + std::to_string(VID);
Network.SendToAll(nullptr, StringToVector(Destroy), true, true);
c.DeleteCar(VID);
}
@ -359,15 +396,16 @@ void TServer::ParseVehicle(TClient& c, const std::string& Pckt, TNetwork& Networ
if (MaybePidVid) {
std::tie(PID, VID) = MaybePidVid.value();
}
if (PID != -1 && VID != -1 && PID == c.GetID()) {
if (c.GetUnicycleID() == VID) {
c.SetUnicycleID(-1);
if (PID != -1 && VID != -1 && PID == c.ID.get()) {
auto UnicycleID = c.UnicycleID.synchronize();
if (*UnicycleID == VID) {
*UnicycleID = -1;
}
Network.SendToAll(nullptr, StringToVector(Packet), true, true);
// TODO: should this trigger on all vehicle deletions?
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onVehicleDeleted", "", c.GetID(), VID));
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onVehicleDeleted", "", c.ID.get(), VID));
c.DeleteCar(VID);
beammp_debug(c.GetName() + (" deleted car with ID ") + std::to_string(VID));
beammp_debug(c.Name.get() + (" deleted car with ID ") + std::to_string(VID));
}
return;
}
@ -378,9 +416,9 @@ void TServer::ParseVehicle(TClient& c, const std::string& Pckt, TNetwork& Networ
std::tie(PID, VID) = MaybePidVid.value();
}
if (PID != -1 && VID != -1 && PID == c.GetID()) {
if (PID != -1 && VID != -1 && PID == c.ID.get()) {
Data = Data.substr(Data.find('{'));
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onVehicleReset", "", c.GetID(), VID, Data));
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onVehicleReset", "", c.ID.get(), VID, Data));
Network.SendToAll(&c, StringToVector(Packet), false, true);
}
return;