start rewriting networking

This commit is contained in:
Lion Kortlepel
2024-01-15 20:39:32 +01:00
parent 443871ec0f
commit 7e9bb0cbf2
18 changed files with 150 additions and 2046 deletions

3
.gitmodules vendored
View File

@@ -4,3 +4,6 @@
[submodule "vcpkg"]
path = vcpkg
url = https://github.com/Microsoft/vcpkg.git
[submodule "deps/BeamMP-Protocol"]
path = deps/BeamMP-Protocol
url = https://github.com/BeamMP/BeamMP-Protocol

View File

@@ -15,6 +15,8 @@ include(cmake/StandardSettings.cmake)
include(cmake/StaticAnalyzers.cmake)
include(cmake/Git.cmake)
add_subdirectory(deps/BeamMP-Protocol)
# below are options which should be changed
### SETTINGS ###
@@ -23,7 +25,6 @@ include(cmake/Git.cmake)
set(PRJ_HEADERS
include/ArgsParser.h
include/BoostAliases.h
include/Client.h
include/Common.h
include/Compat.h
include/Cryptography.h
@@ -41,19 +42,15 @@ set(PRJ_HEADERS
include/THeartbeatThread.h
include/TLuaEngine.h
include/TLuaPlugin.h
include/TNetwork.h
include/TPluginMonitor.h
include/TPPSMonitor.h
include/TResourceManager.h
include/TScopedTimer.h
include/TServer.h
include/VehicleData.h
include/Env.h
include/Network.h
)
# add all source files (.cpp) to this, except the one with main()
set(PRJ_SOURCES
src/ArgsParser.cpp
src/Client.cpp
src/Common.cpp
src/Compat.cpp
src/Http.cpp
@@ -64,14 +61,11 @@ set(PRJ_SOURCES
src/THeartbeatThread.cpp
src/TLuaEngine.cpp
src/TLuaPlugin.cpp
src/TNetwork.cpp
src/TPluginMonitor.cpp
src/TPPSMonitor.cpp
src/TResourceManager.cpp
src/TScopedTimer.cpp
src/TServer.cpp
src/VehicleData.cpp
src/Env.cpp
src/Network.cpp
)
find_package(Lua REQUIRED)
@@ -98,7 +92,9 @@ set(PRJ_LIBRARIES
httplib::httplib
libzip::zip
OpenSSL::SSL OpenSSL::Crypto
protocol
${LUA_LIBRARIES}
zstd::libzstd_static
)
# add dependency find_package calls and similar here
@@ -111,6 +107,7 @@ find_package(libzip CONFIG REQUIRED)
find_package(RapidJSON CONFIG REQUIRED)
find_package(sol2 CONFIG REQUIRED)
find_package(toml11 CONFIG REQUIRED)
find_package(zstd CONFIG REQUIRED)
include_directories(include)

1
deps/BeamMP-Protocol vendored Submodule

Submodule deps/BeamMP-Protocol added at 7ed05d3bae

View File

@@ -1,88 +0,0 @@
#pragma once
#include <chrono>
#include <memory>
#include <optional>
#include <queue>
#include <string>
#include <unordered_set>
#include "BoostAliases.h"
#include "Common.h"
#include "Compat.h"
#include "RWMutex.h"
#include "Sync.h"
#include "VehicleData.h"
class TServer;
#ifdef BEAMMP_WINDOWS
// for socklen_t
#include <WS2tcpip.h>
#endif // WINDOWS
struct TConnection final {
ip::tcp::socket Socket;
ip::tcp::endpoint SockAddr;
};
class TClient final {
public:
using TSetOfVehicleData = std::vector<TVehicleData>;
TClient(TServer& Server, ip::tcp::socket&& Socket);
TClient(const TClient&) = delete;
~TClient();
TClient& operator=(const TClient&) = delete;
void AddNewCar(int Ident, const std::string& Data);
void SetCarData(int Ident, const std::string& Data);
void SetCarPosition(int Ident, const std::string& Data);
void SetName(const std::string& NewName) { Name = NewName; }
void SetRoles(const std::string& NewRole) { Role = NewRole; }
void SetIdentifier(const std::string& key, const std::string& value);
std::string GetCarData(int Ident);
std::string GetCarPositionRaw(int Ident);
bool IsDisconnected() const { return !TCPSocket->is_open(); }
// locks
void DeleteCar(int Ident);
[[nodiscard]] int GetOpenCarID() const;
[[nodiscard]] int GetCarCount() const;
void ClearCars();
void EnqueuePacket(const std::vector<uint8_t>& Packet);
void SetIsConnected(bool NewIsConnected) { IsConnected = NewIsConnected; }
[[nodiscard]] TServer& Server() const;
void UpdatePingTime();
int SecondsSinceLastPing();
Sync<bool> IsConnected = false;
Sync<bool> IsSynced = false;
Sync<bool> IsSyncing = false;
Sync<std::unordered_map<std::string, std::string>> Identifiers;
Sync<ip::tcp::socket> TCPSocket;
Sync<ip::tcp::socket> DownSocket;
Sync<ip::udp::endpoint> UDPAddress {};
Sync<int> UnicycleID = -1;
Sync<std::string> Role;
Sync<std::string> DID;
Sync<int> ID = -1;
Sync<bool> IsGuest = false;
Sync<std::string> Name = std::string("Unknown Client");
Sync<TSetOfVehicleData> VehicleData;
Sync<SparseArray<std::string>> VehiclePosition;
Sync<std::queue<std::vector<uint8_t>>> MissedPacketsQueue;
Sync<std::chrono::time_point<std::chrono::high_resolution_clock>> LastPingTime;
friend class TNetwork;
private:
/// ONLY call after the client has been cleaned up, all cars deleted, etc.
void CloseSockets(std::string_view Reason);
void InsertVehicle(int ID, const std::string& Data);
TServer& mServer;
};
std::optional<std::shared_ptr<TClient>> GetClient(class TServer& Server, int ID);

View File

@@ -265,57 +265,5 @@ void RegisterThread(const std::string& str);
void LogChatMessage(const std::string& name, int id, const std::string& msg);
#define Biggest 30000
template <typename T>
inline T Comp(const T& Data) {
std::array<char, Biggest> C {};
// obsolete
C.fill(0);
z_stream defstream;
defstream.zalloc = nullptr;
defstream.zfree = nullptr;
defstream.opaque = nullptr;
defstream.avail_in = uInt(Data.size());
defstream.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(&Data[0]));
defstream.avail_out = Biggest;
defstream.next_out = reinterpret_cast<Bytef*>(C.data());
deflateInit(&defstream, Z_BEST_COMPRESSION);
deflate(&defstream, Z_SYNC_FLUSH);
deflate(&defstream, Z_FINISH);
deflateEnd(&defstream);
size_t TotalOut = defstream.total_out;
T Ret;
Ret.resize(TotalOut);
std::fill(Ret.begin(), Ret.end(), 0);
std::copy_n(C.begin(), TotalOut, Ret.begin());
return Ret;
}
template <typename T>
inline T DeComp(const T& Compressed) {
std::array<char, Biggest> C {};
// not needed
C.fill(0);
z_stream infstream;
infstream.zalloc = nullptr;
infstream.zfree = nullptr;
infstream.opaque = nullptr;
infstream.avail_in = Biggest;
infstream.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(&Compressed[0]));
infstream.avail_out = Biggest;
infstream.next_out = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(C.data()));
inflateInit(&infstream);
inflate(&infstream, Z_SYNC_FLUSH);
inflate(&infstream, Z_FINISH);
inflateEnd(&infstream);
size_t TotalOut = infstream.total_out;
T Ret;
Ret.resize(TotalOut);
std::fill(Ret.begin(), Ret.end(), 0);
std::copy_n(C.begin(), TotalOut, Ret.begin());
return Ret;
}
std::string GetPlatformAgnosticErrorString();
#define S_DSN SU_RAW

64
include/Network.h Normal file
View File

@@ -0,0 +1,64 @@
#pragma once
#include "State.h"
#include "Sync.h"
#include "Transport.h"
#include <boost/asio.hpp>
#include <cstdint>
#include <memory>
#include <filesystem>
#include <unordered_map>
#include <vector>
using ClientID = uint32_t;
using VehicleID = uint16_t;
using namespace boost::asio;
struct Packet {
bmp::Purpose purpose;
bmp::Flags flags;
std::vector<uint8_t> data;
bmp::Header header() const;
};
struct Client {
using Ptr = std::shared_ptr<Client>;
ClientID id;
bmp::State state;
Packet tcp_read(boost::system::error_code& ec);
void tcp_write(const Packet& packet, boost::system::error_code& ec);
void tcp_write_file_raw(const std::filesystem::path& path, boost::system::error_code& ec);
Packet udp_read(boost::system::error_code& ec, ip::udp::socket& socket);
void udp_write(const Packet& packet, ip::udp::socket& socket, boost::system::error_code& ec);
Client(ip::udp::endpoint& ep, ip::tcp::socket&& socket);
~Client();
private:
std::mutex m_tcp_read_mtx;
std::mutex m_tcp_write_mtx;
std::mutex m_udp_read_mtx;
ip::udp::endpoint m_udp_ep;
ip::tcp::socket m_tcp_socket;
};
struct Vehicle {
using Ptr = std::shared_ptr<Vehicle>;
ClientID owner;
std::vector<uint8_t> data;
};
class Network {
public:
private:
Sync<std::unordered_map<ClientID, Client::Ptr>> m_clients;
Sync<std::unordered_map<VehicleID, Vehicle::Ptr>> m_vehicles;
boost::asio::io_context m_io;
};

View File

@@ -1,9 +1,10 @@
#pragma once
#include <boost/thread/synchronized_value.hpp>
#include <mutex>
/// This header provides convenience aliases for synchronization primitives.
template<typename T>
using Sync = boost::synchronized_value<T>;
using Sync = boost::synchronized_value<T, std::recursive_mutex>;

View File

@@ -1,57 +0,0 @@
#pragma once
#include "BoostAliases.h"
#include "Compat.h"
#include "TResourceManager.h"
#include "TServer.h"
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/udp.hpp>
struct TConnection;
class TNetwork {
public:
TNetwork(TServer& Server, TPPSMonitor& PPSMonitor, TResourceManager& ResourceManager);
[[nodiscard]] bool TCPSend(TClient& c, const std::vector<uint8_t>& Data, bool IsSync = false);
[[nodiscard]] bool SendLarge(TClient& c, std::vector<uint8_t> Data, bool isSync = false);
[[nodiscard]] bool Respond(TClient& c, const std::vector<uint8_t>& MSG, bool Rel, bool isSync = false);
std::shared_ptr<TClient> CreateClient(ip::tcp::socket&& TCPSock);
std::vector<uint8_t> TCPRcv(TClient& c);
void ClientKick(TClient& c, const std::string& R);
void Disconnect(const std::shared_ptr<TClient>& ClientPtr);
void Disconnect(const std::weak_ptr<TClient>& ClientPtr);
void Disconnect(TClient& Client);
[[nodiscard]] bool SyncClient(const std::weak_ptr<TClient>& c);
void Identify(TConnection&& client);
std::shared_ptr<TClient> Authentication(TConnection&& ClientConnection);
void SyncResources(TClient& c);
[[nodiscard]] bool UDPSend(TClient& Client, std::vector<uint8_t> Data);
void SendToAll(TClient* c, const std::vector<uint8_t>& Data, bool Self, bool Rel);
void UpdatePlayer(TClient& Client);
private:
void UDPServerMain();
void TCPServerMain();
TServer& mServer;
TPPSMonitor& mPPSMonitor;
ip::udp::socket mUDPSock;
TResourceManager& mResourceManager;
std::thread mUDPThread;
std::thread mTCPThread;
std::vector<uint8_t> UDPRcvFromClient(ip::udp::endpoint& ClientEndpoint);
void HandleDownload(TConnection&& TCPSock);
void OnConnect(const std::weak_ptr<TClient>& c);
void TCPClient(const std::shared_ptr<TClient>& c);
void Looper(const std::shared_ptr<TClient>& c);
void Parse(TClient& c, const std::vector<uint8_t>& Packet);
void SendFile(TClient& c, const std::string& Name);
bool TCPSendRaw(TClient& C, ip::tcp::socket& socket, const uint8_t* Data, size_t Size);
void SplitLoad(TClient& c, size_t Sent, size_t Size, bool D, const std::string& Name);
const uint8_t* SendSplit(TClient& c, ip::tcp::socket& Socket, const uint8_t* DataPtr, size_t Size);
};
std::string HashPassword(const std::string& str);
std::vector<uint8_t> StringToVector(const std::string& Str);

View File

@@ -1,27 +0,0 @@
#pragma once
#include "Common.h"
#include "TServer.h"
#include <optional>
class TNetwork;
class TPPSMonitor : public IThreaded {
public:
explicit TPPSMonitor(TServer& Server);
virtual ~TPPSMonitor() {}
void operator()() override;
void SetInternalPPS(int NewPPS) { mInternalPPS = NewPPS; }
void IncrementInternalPPS() { ++mInternalPPS; }
[[nodiscard]] int InternalPPS() const { return mInternalPPS; }
void SetNetwork(TNetwork& Server) { mNetwork = std::ref(Server); }
private:
TNetwork& Network() { return mNetwork->get(); }
TServer& mServer;
std::optional<std::reference_wrapper<TNetwork>> mNetwork { std::nullopt };
int mInternalPPS { 0 };
};

View File

@@ -1,21 +0,0 @@
#pragma once
#include "Common.h"
class TResourceManager {
public:
TResourceManager();
[[nodiscard]] size_t MaxModSize() const { return mMaxModSize; }
[[nodiscard]] std::string FileList() const { return mFileList; }
[[nodiscard]] std::string TrimmedList() const { return mTrimmedList; }
[[nodiscard]] std::string FileSizes() const { return mFileSizes; }
[[nodiscard]] int ModsLoaded() const { return mModsLoaded; }
private:
size_t mMaxModSize = 0;
std::string mFileSizes;
std::string mFileList;
std::string mTrimmedList;
int mModsLoaded = 0;
};

View File

@@ -1,59 +0,0 @@
#pragma once
#include "IThreaded.h"
#include "RWMutex.h"
#include "TScopedTimer.h"
#include <functional>
#include <memory>
#include <mutex>
#include <unordered_set>
#include "BoostAliases.h"
class TClient;
class TNetwork;
class TPPSMonitor;
class TServer final {
public:
using TClientSet = std::unordered_set<std::shared_ptr<TClient>>;
TServer(const std::vector<std::string_view>& Arguments);
void InsertClient(const std::shared_ptr<TClient>& Ptr);
void RemoveClient(const std::weak_ptr<TClient>&);
void RemoveClient(TClient&);
// in Fn, return true to continue, return false to break
void ForEachClient(const std::function<bool(const std::shared_ptr<TClient>&)> Fn);
size_t ClientCount() const;
void GlobalParser(const std::shared_ptr<TClient>& Client, std::vector<uint8_t>&& Packet, TPPSMonitor& PPSMonitor, TNetwork& Network);
static void HandleEvent(TClient& c, const std::string& Data);
RWMutex& GetClientMutex() const { return mClientsMutex; }
// thread-safe ID lookup & claim
void ClaimFreeIDFor(TClient& Client);
const TScopedTimer UptimeTimer;
// asio io context
io_context& IoCtx() { return mIoCtx; }
private:
io_context mIoCtx {};
TClientSet mClients;
mutable RWMutex mClientsMutex;
static void ParseVehicle(TClient& c, const std::string& Pckt, TNetwork& Network);
static bool ShouldSpawn(TClient& c, const std::string& CarJson, int ID);
static bool IsUnicycle(TClient& c, const std::string& CarJson);
static void Apply(TClient& c, int VID, const std::string& pckt);
void HandlePosition(TClient& c, const std::string& Packet);
};
struct BufferView {
uint8_t* Data { nullptr };
size_t Size { 0 };
const uint8_t* data() const { return Data; }
uint8_t* data() { return Data; }
size_t size() const { return Size; }
};

View File

@@ -1,154 +0,0 @@
#include "Client.h"
#include "CustomAssert.h"
#include "TServer.h"
#include <memory>
#include <optional>
void TClient::DeleteCar(int Ident) {
// TODO: Send delete packets
auto VehData = VehicleData.synchronize();
auto iter = std::find_if(VehData->begin(), VehData->end(), [&](auto& elem) {
return Ident == elem.ID();
});
if (iter != VehData->end()) {
VehData->erase(iter);
} else {
beammp_debug("tried to erase a vehicle that doesn't exist (not an error)");
}
}
void TClient::ClearCars() {
VehicleData->clear();
}
int TClient::GetOpenCarID() const {
int OpenID = 0;
bool found;
auto VehData = VehicleData.synchronize();
do {
found = true;
for (auto& v : *VehData) {
if (v.ID() == OpenID) {
OpenID++;
found = false;
}
}
} while (!found);
return OpenID;
}
void TClient::AddNewCar(int Ident, const std::string& Data) {
VehicleData->emplace_back(Ident, Data);
}
std::string TClient::GetCarPositionRaw(int Ident) {
try {
return VehiclePosition->at(size_t(Ident));
} catch (const std::out_of_range& oor) {
beammp_debugf("GetCarPositionRaw failed for id {}, as that car doesn't exist on client id {}: {}", Ident, int(ID), oor.what());
return "";
}
}
void TClient::CloseSockets(std::string_view Reason) {
auto LockedSocket = TCPSocket.synchronize();
beammp_debugf("Disconnecting client {} for reason: {}", int(ID), Reason);
boost::system::error_code ec;
LockedSocket->shutdown(socket_base::shutdown_both, ec);
if (ec) {
beammp_debugf("Failed to shutdown client socket of client {}: {}", ID.get(), ec.message());
}
LockedSocket->close(ec);
if (ec) {
beammp_debugf("Failed to close client socket of client {}: {}", ID.get(), ec.message());
}
DownSocket->shutdown(socket_base::shutdown_both, ec);
if (ec) {
beammp_debugf("Failed to shutdown client download socket of client {}: {}", ID.get(), ec.message());
}
DownSocket->close(ec);
if (ec) {
beammp_debugf("Failed to close client download socket of client {}: {}", ID.get(), ec.message());
}
}
void TClient::SetCarPosition(int Ident, const std::string& Data) {
// ugly but this is c++ so
VehiclePosition->operator[](size_t(Ident)) = Data;
}
std::string TClient::GetCarData(int Ident) {
{ // lock
auto Lock = VehicleData.synchronize();
for (auto& v : *Lock) {
if (v.ID() == Ident) {
return v.Data();
}
}
} // unlock
DeleteCar(Ident);
return "";
}
void TClient::SetCarData(int Ident, const std::string& Data) {
{ // lock
auto Lock = VehicleData.synchronize();
for (auto& v : *Lock) {
if (v.ID() == Ident) {
v.SetData(Data);
return;
}
}
} // unlock
DeleteCar(Ident);
}
int TClient::GetCarCount() const {
return int(VehicleData->size());
}
TServer& TClient::Server() const {
return mServer;
}
void TClient::EnqueuePacket(const std::vector<uint8_t>& Packet) {
MissedPacketsQueue->push(Packet);
}
TClient::TClient(TServer& Server, ip::tcp::socket&& Socket)
: TCPSocket(std::move(Socket))
, DownSocket(ip::tcp::socket(Server.IoCtx()))
, LastPingTime(std::chrono::high_resolution_clock::now())
, mServer(Server) {
}
TClient::~TClient() {
beammp_debugf("client destroyed: {} ('{}')", ID.get(), Name.get());
}
void TClient::UpdatePingTime() {
LastPingTime = std::chrono::high_resolution_clock::now();
}
int TClient::SecondsSinceLastPing() {
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::high_resolution_clock::now() - LastPingTime.get())
.count();
return int(seconds);
}
std::optional<std::shared_ptr<TClient>> GetClient(TServer& Server, int ID) {
std::optional<std::shared_ptr<TClient>> MaybeClient { std::nullopt };
Server.ForEachClient([ID, &MaybeClient](const auto& Client) -> bool {
if (Client->ID.get() == ID) {
MaybeClient = Client;
return false;
}
return true;
});
return MaybeClient;
}
void TClient::SetIdentifier(const std::string& key, const std::string& value) {
// I know this is bad, but what can ya do
Identifiers->operator[](key) = value;
}

70
src/Network.cpp Normal file
View File

@@ -0,0 +1,70 @@
#include "Network.h"
#include "Common.h"
Packet Client::tcp_read(boost::system::error_code& ec) {
std::unique_lock lock(m_tcp_read_mtx);
return Packet();
// CONTINUE
}
void Client::tcp_write(const Packet& packet, boost::system::error_code& ec) {
std::unique_lock lock(m_tcp_write_mtx);
auto header = packet.header();
std::vector<uint8_t> header_data(bmp::Header::SERIALIZED_SIZE);
if (header.flags != bmp::Flags::None) {
beammp_errorf("Flags are not implemented");
}
header.serialize_to(header_data);
write(m_tcp_socket, buffer(header_data), ec);
if (!ec) {
write(m_tcp_socket, buffer(packet.data), ec);
}
}
Packet Client::udp_read(boost::system::error_code& ec, ip::udp::socket& socket) {
// maximum we can ever expect from udp
static thread_local std::vector<uint8_t> s_buffer(std::numeric_limits<uint16_t>::max());
std::unique_lock lock(m_udp_read_mtx);
socket.receive_from(buffer(s_buffer),m_udp_ep, {}, ec);
if (!ec) {
Packet packet;
bmp::Header header{};
auto offset = header.deserialize_from(s_buffer);
if (header.flags != bmp::Flags::None) {
beammp_errorf("Flags are not implemented");
return {};
}
}
return {};
}
void Client::udp_write(const Packet& packet, ip::udp::socket& socket, boost::system::error_code& ec) {
auto header = packet.header();
std::vector<uint8_t> data(header.size + bmp::Header::SERIALIZED_SIZE);
auto offset = header.serialize_to(data);
std::copy(packet.data.begin(), packet.data.end(), data.begin() + static_cast<long>(offset));
socket.send_to(buffer(data), m_udp_ep, {}, ec);
}
Client::~Client() {
m_tcp_socket.shutdown(boost::asio::socket_base::shutdown_receive);
}
Client::Client(ip::udp::endpoint& ep, ip::tcp::socket&& socket)
: m_udp_ep(ep)
, m_tcp_socket(std::forward<ip::tcp::socket&&>(socket)) {
}
void Client::tcp_write_file_raw(const std::filesystem::path& path, boost::system::error_code& ec) {
#if defined(BEAMMP_LINUX)
#elif defined(BEAMMP_WINDOWS)
#else
#endif
}
bmp::Header Packet::header() const {
return {
.purpose = purpose,
.flags = bmp::Flags::None,
.rsv = 0,
.size = static_cast<uint32_t>(data.size()),
};
}

View File

@@ -1,948 +0,0 @@
#include "TNetwork.h"
#include "Client.h"
#include "Common.h"
#include "LuaAPI.h"
#include "TLuaEngine.h"
#include "nlohmann/json.hpp"
#include <CustomAssert.h>
#include <Http.h>
#include <array>
#include <boost/asio/ip/address.hpp>
#include <boost/asio/ip/address_v4.hpp>
#include <boost/thread/synchronized_value.hpp>
#include <cstring>
typedef boost::asio::detail::socket_option::integer<SOL_SOCKET, SO_RCVTIMEO> rcv_timeout_option;
std::vector<uint8_t> StringToVector(const std::string& Str) {
return std::vector<uint8_t>(Str.data(), Str.data() + Str.size());
}
static void CompressProperly(std::vector<uint8_t>& Data) {
constexpr std::string_view ABG = "ABG:";
auto CombinedData = std::vector<uint8_t>(ABG.begin(), ABG.end());
auto CompData = Comp(Data);
CombinedData.resize(ABG.size() + CompData.size());
std::copy(CompData.begin(), CompData.end(), CombinedData.begin() + ABG.size());
Data = CombinedData;
}
TNetwork::TNetwork(TServer& Server, TPPSMonitor& PPSMonitor, TResourceManager& ResourceManager)
: mServer(Server)
, mPPSMonitor(PPSMonitor)
, mUDPSock(Server.IoCtx())
, mResourceManager(ResourceManager) {
Application::SetSubsystemStatus("TCPNetwork", Application::Status::Starting);
Application::SetSubsystemStatus("UDPNetwork", Application::Status::Starting);
Application::RegisterShutdownHandler([&] {
beammp_debug("Kicking all players due to shutdown");
Server.ForEachClient([&](std::shared_ptr<TClient> Client) -> bool {
ClientKick(*Client, "Server shutdown");
return true;
});
});
Application::RegisterShutdownHandler([&] {
Application::SetSubsystemStatus("UDPNetwork", Application::Status::ShuttingDown);
if (mUDPThread.joinable()) {
mUDPThread.detach();
}
Application::SetSubsystemStatus("UDPNetwork", Application::Status::Shutdown);
});
Application::RegisterShutdownHandler([&] {
Application::SetSubsystemStatus("TCPNetwork", Application::Status::ShuttingDown);
if (mTCPThread.joinable()) {
mTCPThread.detach();
}
Application::SetSubsystemStatus("TCPNetwork", Application::Status::Shutdown);
});
mTCPThread = std::thread(&TNetwork::TCPServerMain, this);
mUDPThread = std::thread(&TNetwork::UDPServerMain, this);
}
void TNetwork::UDPServerMain() {
RegisterThread("UDPServer");
ip::udp::endpoint UdpListenEndpoint(ip::address::from_string("0.0.0.0"), Application::Settings.Port);
boost::system::error_code ec;
mUDPSock.open(UdpListenEndpoint.protocol(), ec);
if (ec) {
beammp_error("open() failed: " + ec.message());
std::this_thread::sleep_for(std::chrono::seconds(5));
Application::GracefullyShutdown();
}
mUDPSock.bind(UdpListenEndpoint, ec);
if (ec) {
beammp_error("bind() failed: " + ec.message());
std::this_thread::sleep_for(std::chrono::seconds(5));
Application::GracefullyShutdown();
}
Application::SetSubsystemStatus("UDPNetwork", Application::Status::Good);
beammp_info(("Vehicle data network online on port ") + std::to_string(Application::Settings.Port) + (" with a Max of ")
+ std::to_string(Application::Settings.MaxPlayers) + (" Clients"));
while (!Application::IsShuttingDown()) {
try {
ip::udp::endpoint client {};
std::vector<uint8_t> Data = UDPRcvFromClient(client); // Receives any data from Socket
auto Pos = std::find(Data.begin(), Data.end(), ':');
if (Data.empty() || Pos > Data.begin() + 2)
continue;
uint8_t ID = uint8_t(Data.at(0)) - 1;
mServer.ForEachClient([&](std::shared_ptr<TClient> Client) -> bool {
if (Client->ID == ID) {
Client->UDPAddress = client;
Client->IsConnected = true;
Data.erase(Data.begin(), Data.begin() + 2);
mServer.GlobalParser(Client, std::move(Data), mPPSMonitor, *this);
}
return true;
});
} catch (const std::exception& e) {
beammp_error(("fatal: ") + std::string(e.what()));
}
}
}
void TNetwork::TCPServerMain() {
RegisterThread("TCPServer");
ip::tcp::endpoint ListenEp(ip::address::from_string("0.0.0.0"), Application::Settings.Port);
ip::tcp::socket Listener(mServer.IoCtx());
boost::system::error_code ec;
Listener.open(ListenEp.protocol(), ec);
if (ec) {
beammp_errorf("Failed to open socket: {}", ec.message());
return;
}
socket_base::linger LingerOpt {};
LingerOpt.enabled(false);
Listener.set_option(LingerOpt, ec);
if (ec) {
beammp_errorf("Failed to set up listening socket to not linger / reuse address. "
"This may cause the socket to refuse to bind(). Error: {}",
ec.message());
}
ip::tcp::acceptor Acceptor(mServer.IoCtx(), ListenEp);
Acceptor.listen(socket_base::max_listen_connections, ec);
if (ec) {
beammp_errorf("listen() failed, which is needed for the server to operate. "
"Shutting down. Error: {}",
ec.message());
Application::GracefullyShutdown();
}
Application::SetSubsystemStatus("TCPNetwork", Application::Status::Good);
beammp_info("Vehicle event network online");
do {
try {
if (Application::IsShuttingDown()) {
beammp_debug("shutdown during TCP wait for accept loop");
break;
}
ip::tcp::endpoint ClientEp;
ip::tcp::socket ClientSocket = Acceptor.accept(ClientEp, ec);
if (ec) {
beammp_errorf("failed to accept: {}", ec.message());
}
TConnection Conn { std::move(ClientSocket), ClientEp };
std::thread ID(&TNetwork::Identify, this, std::move(Conn));
ID.detach(); // TODO: Add to a queue and attempt to join periodically
} catch (const std::exception& e) {
beammp_error("fatal: " + std::string(e.what()));
}
} while (!Application::IsShuttingDown());
}
#undef GetObject // Fixes Windows
#include "Json.h"
namespace json = rapidjson;
void TNetwork::Identify(TConnection&& RawConnection) {
RegisterThreadAuto();
char Code;
boost::system::error_code ec;
read(RawConnection.Socket, buffer(&Code, 1), ec);
if (ec) {
// TODO: is this right?!
RawConnection.Socket.shutdown(socket_base::shutdown_both, ec);
return;
}
std::shared_ptr<TClient> Client { nullptr };
try {
if (Code == 'C') {
Client = Authentication(std::move(RawConnection));
} else if (Code == 'D') {
HandleDownload(std::move(RawConnection));
} else if (Code == 'P') {
boost::system::error_code ec;
write(RawConnection.Socket, buffer("P"), ec);
return;
} else {
beammp_errorf("Invalid code got in Identify: '{}'", Code);
}
} catch (const std::exception& e) {
beammp_errorf("Error during handling of code {}->client left in invalid state, closing socket", Code);
boost::system::error_code ec;
RawConnection.Socket.shutdown(socket_base::shutdown_both, ec);
if (ec) {
beammp_debugf("Failed to shutdown client socket: {}", ec.message());
}
RawConnection.Socket.close(ec);
if (ec) {
beammp_debugf("Failed to close client socket: {}", ec.message());
}
}
}
void TNetwork::HandleDownload(TConnection&& Conn) {
char D;
boost::system::error_code ec;
read(Conn.Socket, buffer(&D, 1), ec);
if (ec) {
Conn.Socket.shutdown(socket_base::shutdown_both, ec);
// ignore ec
return;
}
auto ID = uint8_t(D);
auto Client = GetClient(mServer, ID);
if (Client.has_value()) {
auto New = Sync<ip::tcp::socket>(std::move(Conn.Socket));
Client.value()->DownSocket.swap(New);
}
}
std::string HashPassword(const std::string& str) {
std::stringstream ret;
unsigned char* hash = SHA256(reinterpret_cast<const unsigned char*>(str.c_str()), str.length(), nullptr);
for (int i = 0; i < 32; i++) {
ret << std::hex << static_cast<int>(hash[i]);
}
return ret.str();
}
std::shared_ptr<TClient> TNetwork::Authentication(TConnection&& RawConnection) {
auto Client = CreateClient(std::move(RawConnection.Socket));
Client->SetIdentifier("ip", RawConnection.SockAddr.address().to_string());
beammp_tracef("This thread is ip {}", RawConnection.SockAddr.address().to_string());
beammp_info("Identifying new ClientConnection...");
auto Data = TCPRcv(*Client);
constexpr std::string_view VC = "VC";
if (Data.size() > 3 && std::equal(Data.begin(), Data.begin() + VC.size(), VC.begin(), VC.end())) {
std::string ClientVersionStr(reinterpret_cast<const char*>(Data.data() + 2), Data.size() - 2);
Version ClientVersion = Application::VersionStrToInts(ClientVersionStr + ".0");
if (ClientVersion.major != Application::ClientMajorVersion()) {
beammp_errorf("Client tried to connect with version '{}', but only versions '{}.x.x' is allowed",
ClientVersion.AsString(), Application::ClientMajorVersion());
ClientKick(*Client, "Outdated Version!");
return nullptr;
}
} else {
ClientKick(*Client, fmt::format("Invalid version header: '{}' ({})", std::string(reinterpret_cast<const char*>(Data.data()), Data.size()), Data.size()));
return nullptr;
}
if (!TCPSend(*Client, StringToVector("A"))) { // changed to A for Accepted version
Disconnect(Client);
return nullptr;
}
Data = TCPRcv(*Client);
if (Data.size() > 50) {
ClientKick(*Client, "Invalid Key (too long)!");
return nullptr;
}
std::string key(reinterpret_cast<const char*>(Data.data()), Data.size());
nlohmann::json AuthReq {};
std::string AuthResStr {};
try {
AuthReq = nlohmann::json {
{ "key", key }
};
auto Target = "/pkToUser";
unsigned int ResponseCode = 0;
AuthResStr = Http::POST(Application::GetBackendUrlForAuth(), 443, Target, AuthReq.dump(), "application/json", &ResponseCode);
} catch (const std::exception& e) {
beammp_debugf("Invalid json sent by client, kicking: {}", e.what());
ClientKick(*Client, "Invalid Key (invalid UTF8 string)!");
return nullptr;
}
try {
nlohmann::json AuthRes = nlohmann::json::parse(AuthResStr);
if (AuthRes["username"].is_string() && AuthRes["roles"].is_string()
&& AuthRes["guest"].is_boolean() && AuthRes["identifiers"].is_array()) {
Client->SetName(AuthRes["username"]);
Client->SetRoles(AuthRes["roles"]);
Client->IsGuest = AuthRes["guest"];
for (const auto& ID : AuthRes["identifiers"]) {
auto Raw = std::string(ID);
auto SepIndex = Raw.find(':');
Client->SetIdentifier(Raw.substr(0, SepIndex), Raw.substr(SepIndex + 1));
}
} else {
beammp_error("Invalid authentication data received from authentication backend");
ClientKick(*Client, "Invalid authentication data!");
return nullptr;
}
} catch (const std::exception& e) {
beammp_errorf("Client sent invalid key. Error was: {}", e.what());
// TODO: we should really clarify that this was a backend response or parsing error
ClientKick(*Client, "Invalid key! Please restart your game.");
return nullptr;
}
if (!Application::Settings.Password.empty()) { // ask password
if (!TCPSend(*Client, StringToVector("S"))) {
Disconnect(Client);
return {};
}
beammp_info("Waiting for password");
Data = TCPRcv(*Client);
std::string Pass = std::string(reinterpret_cast<const char*>(Data.data()), Data.size());
if (Pass != HashPassword(Application::Settings.Password)) {
beammp_debug(Client->Name.get() + " attempted to connect with a wrong password");
ClientKick(*Client, "Wrong password!");
return {};
} else {
beammp_debug(Client->Name.get() + " used the correct password");
}
}
beammp_debug("Name-> " + Client->Name.get() + ", Guest-> " + std::to_string(Client->IsGuest.get()) + ", Roles-> " + Client->Role.get());
mServer.ForEachClient([&](const std::shared_ptr<TClient>& Cl) -> bool {
if (Cl->Name.get() == Client->Name.get() && Cl->IsGuest == Client->IsGuest) {
Disconnect(Cl);
return false;
}
return true;
});
auto Futures = LuaAPI::MP::Engine->TriggerEvent("onPlayerAuth", "", Client->Name.get(), Client->Role.get(), Client->IsGuest.get(), Client->Identifiers.get());
TLuaEngine::WaitForAll(Futures);
bool NotAllowed = std::any_of(Futures.begin(), Futures.end(),
[](const std::shared_ptr<TLuaResult>& Result) {
return !Result->Error && Result->Result.is<int>() && bool(Result->Result.as<int>());
});
std::string Reason;
bool NotAllowedWithReason = std::any_of(Futures.begin(), Futures.end(),
[&Reason](const std::shared_ptr<TLuaResult>& Result) -> bool {
if (!Result->Error && Result->Result.is<std::string>()) {
Reason = Result->Result.as<std::string>();
return true;
}
return false;
});
if (NotAllowed) {
ClientKick(*Client, "you are not allowed on the server!");
return {};
} else if (NotAllowedWithReason) {
ClientKick(*Client, Reason);
return {};
}
if (mServer.ClientCount() < size_t(Application::Settings.MaxPlayers)) {
beammp_info("Identification success");
mServer.InsertClient(Client);
try {
TCPClient(Client);
} catch (const std::exception& e) {
beammp_infof("Client {} disconnected: {}", Client->ID.get(), e.what());
Disconnect(Client);
return {};
}
} else {
ClientKick(*Client, "Server full!");
}
return Client;
}
std::shared_ptr<TClient> TNetwork::CreateClient(ip::tcp::socket&& TCPSock) {
auto c = std::make_shared<TClient>(mServer, std::move(TCPSock));
return c;
}
bool TNetwork::TCPSend(TClient& c, const std::vector<uint8_t>& Data, bool IsSync) {
if (!IsSync) {
if (c.IsSyncing) {
if (!Data.empty()) {
if (Data.at(0) == 'O' || Data.at(0) == 'A' || Data.at(0) == 'C' || Data.at(0) == 'E') {
c.EnqueuePacket(Data);
}
}
return true;
}
}
/*
* our TCP protocol sends a header of 4 bytes, followed by the data.
*
* [][][][][][]...[]
* ^------^^---...-^
* size data
*/
const auto Size = int32_t(Data.size());
std::vector<uint8_t> ToSend;
ToSend.resize(Data.size() + sizeof(Size));
std::memcpy(ToSend.data(), &Size, sizeof(Size));
std::memcpy(ToSend.data() + sizeof(Size), Data.data(), Data.size());
boost::system::error_code ec;
write(*c.TCPSocket.synchronize(), buffer(ToSend), ec);
if (ec) {
beammp_debugf("write(): {}", ec.message());
Disconnect(c);
return false;
}
c.UpdatePingTime();
return true;
}
std::vector<uint8_t> TNetwork::TCPRcv(TClient& c) {
if (c.IsDisconnected()) {
beammp_error("Client disconnected, cancelling TCPRcv");
return {};
}
int32_t Header {};
boost::system::error_code ec;
std::array<uint8_t, sizeof(Header)> HeaderData;
read(*c.TCPSocket.synchronize(), buffer(HeaderData), ec);
if (ec) {
// TODO: handle this case (read failed)
beammp_debugf("TCPRcv: Reading header failed: {}", ec.message());
return {};
}
Header = *reinterpret_cast<int32_t*>(HeaderData.data());
if (Header < 0) {
ClientKick(c, "Invalid packet->header negative");
beammp_errorf("Client {} send negative TCP header, ignoring packet", c.ID.get());
return {};
}
std::vector<uint8_t> Data;
// TODO: This is arbitrary, this needs to be handled another way
if (Header < int32_t(100 * MB)) {
Data.resize(Header);
} else {
ClientKick(c, "Header size limit exceeded");
beammp_warn("Client " + c.Name.get() + " (" + std::to_string(c.ID.get()) + ") sent header of >100MB->assuming malicious intent and disconnecting the client.");
return {};
}
auto N = read(*c.TCPSocket.synchronize(), buffer(Data), ec);
if (ec) {
// TODO: handle this case properly
beammp_debugf("TCPRcv: Reading data failed: {}", ec.message());
return {};
}
if (N != Header) {
beammp_errorf("Expected to read {} bytes, instead got {}", Header, N);
}
constexpr std::string_view ABG = "ABG:";
if (Data.size() >= ABG.size() && std::equal(Data.begin(), Data.begin() + ABG.size(), ABG.begin(), ABG.end())) {
Data.erase(Data.begin(), Data.begin() + ABG.size());
return DeComp(Data);
} else {
return Data;
}
}
void TNetwork::ClientKick(TClient& c, const std::string& R) {
beammp_info("Client kicked: " + R);
if (!TCPSend(c, StringToVector("K" + R))) {
beammp_debugf("tried to kick player '{}' (id {}), but was already disconnected", c.Name.get(), c.ID.get());
}
Disconnect(c);
}
void TNetwork::Looper(const std::shared_ptr<TClient>& Client) {
RegisterThreadAuto();
while (!Client->IsDisconnected()) {
if (!Client->IsSyncing.get() && Client->IsSynced.get() && Client->MissedPacketsQueue->size() != 0) {
while (Client->MissedPacketsQueue->size() > 0) {
std::vector<uint8_t> QData {};
{ // locked context
auto Lock = Client->MissedPacketsQueue;
if (Lock->size() <= 0) {
break;
}
QData = Lock->front();
Lock->pop();
} // end locked context
if (!TCPSend(*Client, QData, true)) {
Disconnect(Client);
auto Lock = Client->MissedPacketsQueue;
while (!Lock->empty()) {
Lock->pop();
}
break;
}
}
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
}
void TNetwork::TCPClient(const std::shared_ptr<TClient>& c) {
OnConnect(c);
RegisterThread("(" + std::to_string(c->ID.get()) + ") \"" + c->Name.get() + "\"");
std::thread QueueSync(&TNetwork::Looper, this, c);
while (true) {
if (c->IsDisconnected()) {
beammp_debug("client status < 0, breaking client loop");
break;
}
auto res = TCPRcv(*c);
if (res.empty()) {
beammp_debug("TCPRcv empty");
Disconnect(c);
break;
}
mServer.GlobalParser(c, std::move(res), mPPSMonitor, *this);
}
if (QueueSync.joinable())
QueueSync.join();
Disconnect(c);
}
void TNetwork::UpdatePlayer(TClient& Client) {
std::string Packet = ("Ss") + std::to_string(mServer.ClientCount()) + "/" + std::to_string(Application::Settings.MaxPlayers) + ":";
mServer.ForEachClient([&](const std::shared_ptr<TClient>& Client) -> bool {
ReadLock Lock(mServer.GetClientMutex());
Packet += Client->Name.get() + ",";
return true;
});
Packet = Packet.substr(0, Packet.length() - 1);
Client.EnqueuePacket(StringToVector(Packet));
//(void)Respond(Client, Packet, true);
}
void TNetwork::Disconnect(const std::weak_ptr<TClient>& ClientPtr) {
// this is how one checks that the ClientPtr is not empty (as opposed to expired)
if (ClientPtr.owner_before(std::weak_ptr<TClient> {})) {
return;
}
std::shared_ptr<TClient> LockedClientPtr { nullptr };
try {
LockedClientPtr = ClientPtr.lock();
} catch (const std::exception&) {
beammp_warn("Client expired in CloseSockets, this is unexpected");
return;
}
beammp_assert(LockedClientPtr != nullptr);
TClient& c = *LockedClientPtr;
Disconnect(c);
}
void TNetwork::Disconnect(TClient& Client) {
beammp_info(Client.Name.get() + (" Connection Terminated"));
std::string Packet;
{
auto Locked = Client.VehicleData.synchronize();
for (auto& v : *Locked) {
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onVehicleDeleted", "", Client.ID.get(), v.ID()));
Packet = "Od:" + std::to_string(Client.ID.get()) + "-" + std::to_string(v.ID());
SendToAll(&Client, StringToVector(Packet), false, true);
}
}
Packet = ("L") + Client.Name.get() + (" left the server!");
SendToAll(&Client, StringToVector(Packet), false, true);
Packet.clear();
auto Futures = LuaAPI::MP::Engine->TriggerEvent("onPlayerDisconnect", "", Client.ID.get());
TLuaEngine::WaitForAll(Futures);
Client.CloseSockets("Normal disconnect");
mServer.RemoveClient(Client);
}
void TNetwork::Disconnect(const std::shared_ptr<TClient>& ClientPtr) {
if (ClientPtr == nullptr) {
return;
}
Disconnect(*ClientPtr);
}
void TNetwork::OnConnect(const std::weak_ptr<TClient>& c) {
beammp_assert(!c.expired());
beammp_info("Client connected");
auto LockedClient = c.lock();
mServer.ClaimFreeIDFor(*LockedClient);
beammp_info("Assigned ID " + std::to_string(LockedClient->ID.get()) + " to " + LockedClient->Name.get());
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onPlayerConnecting", "", LockedClient->ID.get()));
SyncResources(*LockedClient);
if (LockedClient->IsDisconnected())
return;
(void)Respond(*LockedClient, StringToVector("M" + Application::Settings.MapName), true); // Send the Map on connect
beammp_info(LockedClient->Name.get() + " : Connected");
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onPlayerJoining", "", LockedClient->ID.get()));
}
void TNetwork::SyncResources(TClient& c) {
if (!TCPSend(c, StringToVector("P" + std::to_string(c.ID.get())))) {
throw std::runtime_error("Failed to send 'P' to client");
}
std::vector<uint8_t> Data;
while (!c.IsDisconnected()) {
Data = TCPRcv(c);
if (Data.empty()) {
break;
}
constexpr std::string_view Done = "Done";
if (std::equal(Data.begin(), Data.end(), Done.begin(), Done.end()))
break;
Parse(c, Data);
}
}
void TNetwork::Parse(TClient& c, const std::vector<uint8_t>& Packet) {
if (Packet.empty())
return;
char Code = Packet.at(0), SubCode = 0;
if (Packet.size() > 1)
SubCode = Packet.at(1);
switch (Code) {
case 'f':
SendFile(c, std::string(reinterpret_cast<const char*>(Packet.data() + 1), Packet.size() - 1));
return;
case 'S':
if (SubCode == 'R') {
beammp_debug("Sending Mod Info");
std::string ToSend = mResourceManager.FileList() + mResourceManager.FileSizes();
if (ToSend.empty())
ToSend = "-";
if (!TCPSend(c, StringToVector(ToSend))) {
throw std::runtime_error("Failed to send packet to client");
}
}
return;
default:
return;
}
}
void TNetwork::SendFile(TClient& c, const std::string& UnsafeName) {
beammp_info(c.Name.get() + " requesting : " + UnsafeName.substr(UnsafeName.find_last_of('/')));
if (!fs::path(UnsafeName).has_filename()) {
if (!TCPSend(c, StringToVector("CO"))) {
Disconnect(c);
return;
}
beammp_warn("File " + UnsafeName + " is not a file!");
return;
}
auto FileName = fs::path(UnsafeName).filename().string();
FileName = Application::Settings.Resource + "/Client/" + FileName;
if (!std::filesystem::exists(FileName)) {
if (!TCPSend(c, StringToVector("CO"))) {
Disconnect(c);
return;
}
beammp_warn("File " + UnsafeName + " could not be accessed!");
return;
}
if (!TCPSend(c, StringToVector("AG"))) {
Disconnect(c);
return;
}
/// Wait for connections
int T = 0;
while (!c.DownSocket->is_open() && T < 50) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
T++;
}
if (!c.DownSocket->is_open()) {
beammp_error("Client doesn't have a download socket!");
if (!c.IsDisconnected())
Disconnect(c);
return;
}
size_t Size = size_t(std::filesystem::file_size(FileName)), MSize = Size / 2;
std::thread SplitThreads[2] {
std::thread([&] {
RegisterThread("SplitLoad_0");
SplitLoad(c, 0, MSize, false, FileName);
}),
std::thread([&] {
RegisterThread("SplitLoad_1");
SplitLoad(c, MSize, Size, true, FileName);
})
};
for (auto& SplitThread : SplitThreads) {
if (SplitThread.joinable()) {
SplitThread.join();
}
}
}
static std::pair<size_t /* count */, size_t /* last chunk */> SplitIntoChunks(size_t FullSize, size_t ChunkSize) {
if (FullSize < ChunkSize) {
return { 0, FullSize };
}
size_t Count = FullSize / (FullSize / ChunkSize);
size_t LastChunkSize = FullSize - (Count * ChunkSize);
return { Count, LastChunkSize };
}
TEST_CASE("SplitIntoChunks") {
size_t FullSize;
size_t ChunkSize;
SUBCASE("Normal case") {
FullSize = 1234567;
ChunkSize = 1234;
}
SUBCASE("Zero original size") {
FullSize = 0;
ChunkSize = 100;
}
SUBCASE("Equal full size and chunk size") {
FullSize = 125;
ChunkSize = 125;
}
SUBCASE("Even split") {
FullSize = 10000;
ChunkSize = 100;
}
SUBCASE("Odd split") {
FullSize = 13;
ChunkSize = 2;
}
SUBCASE("Large sizes") {
FullSize = 10 * GB;
ChunkSize = 125 * MB;
}
auto [Count, LastSize] = SplitIntoChunks(FullSize, ChunkSize);
CHECK((Count * ChunkSize) + LastSize == FullSize);
}
const uint8_t* /* end ptr */ TNetwork::SendSplit(TClient& c, ip::tcp::socket& Socket, const uint8_t* DataPtr, size_t Size) {
if (TCPSendRaw(c, Socket, DataPtr, Size)) {
return DataPtr + Size;
} else {
return nullptr;
}
}
void TNetwork::SplitLoad(TClient& c, size_t Sent, size_t Size, bool D, const std::string& Name) {
std::ifstream f(Name.c_str(), std::ios::binary);
uint32_t Split = 125 * MB;
std::vector<uint8_t> Data;
if (Size > Split)
Data.resize(Split);
else
Data.resize(Size);
if (D) {
auto TCPSock = c.DownSocket.synchronize();
while (!c.IsDisconnected() && Sent < Size) {
size_t Diff = Size - Sent;
if (Diff > Split) {
f.seekg(Sent, std::ios_base::beg);
f.read(reinterpret_cast<char*>(Data.data()), Split);
if (!TCPSendRaw(c, *TCPSock, Data.data(), Split)) {
if (!c.IsDisconnected())
Disconnect(c);
break;
}
Sent += Split;
} else {
f.seekg(Sent, std::ios_base::beg);
f.read(reinterpret_cast<char*>(Data.data()), Diff);
if (!TCPSendRaw(c, *TCPSock, Data.data(), int32_t(Diff))) {
if (!c.IsDisconnected())
Disconnect(c);
break;
}
Sent += Diff;
}
}
} else {
auto TCPSock = c.TCPSocket.synchronize();
while (!c.IsDisconnected() && Sent < Size) {
size_t Diff = Size - Sent;
if (Diff > Split) {
f.seekg(Sent, std::ios_base::beg);
f.read(reinterpret_cast<char*>(Data.data()), Split);
if (!TCPSendRaw(c, *TCPSock, Data.data(), Split)) {
if (!c.IsDisconnected())
Disconnect(c);
break;
}
Sent += Split;
} else {
f.seekg(Sent, std::ios_base::beg);
f.read(reinterpret_cast<char*>(Data.data()), Diff);
if (!TCPSendRaw(c, *TCPSock, Data.data(), int32_t(Diff))) {
if (!c.IsDisconnected())
Disconnect(c);
break;
}
Sent += Diff;
}
}
}
}
bool TNetwork::TCPSendRaw(TClient& C, ip::tcp::socket& socket, const uint8_t* Data, size_t Size) {
boost::system::error_code ec;
write(socket, buffer(Data, Size), ec);
if (ec) {
beammp_errorf("Failed to send raw data to client: {}", ec.message());
return false;
}
C.UpdatePingTime();
return true;
}
bool TNetwork::SendLarge(TClient& c, std::vector<uint8_t> Data, bool isSync) {
if (Data.size() > 400) {
CompressProperly(Data);
}
return TCPSend(c, Data, isSync);
}
bool TNetwork::Respond(TClient& c, const std::vector<uint8_t>& MSG, bool Rel, bool isSync) {
char C = MSG.at(0);
if (Rel || C == 'W' || C == 'Y' || C == 'V' || C == 'E') {
if (C == 'O' || C == 'T' || MSG.size() > 1000) {
return SendLarge(c, MSG, isSync);
} else {
return TCPSend(c, MSG, isSync);
}
} else {
return UDPSend(c, MSG);
}
}
bool TNetwork::SyncClient(const std::weak_ptr<TClient>& c) {
if (c.expired()) {
return false;
}
auto LockedClient = c.lock();
if (LockedClient->IsSynced.get())
return true;
// Syncing, later set isSynced
// after syncing is done, we apply all packets they missed
if (!Respond(*LockedClient, StringToVector("Sn" + LockedClient->Name.get()), true)) {
return false;
}
// ignore error
(void)SendToAll(LockedClient.get(), StringToVector("JWelcome " + LockedClient->Name.get() + "!"), false, true);
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onPlayerJoin", "", LockedClient->ID.get()));
LockedClient->IsSyncing = true;
bool Return = false;
bool res = true;
mServer.ForEachClient([&](const std::shared_ptr<TClient>& Client) -> bool {
auto VehicleData = Client->VehicleData.synchronize();
if (Client != LockedClient) {
for (auto& v : *VehicleData) {
if (LockedClient->IsDisconnected()) {
Return = true;
res = false;
return false;
}
res = Respond(*LockedClient, StringToVector(v.Data()), true, true);
}
}
return true;
});
LockedClient->IsSyncing = false;
if (Return) {
return res;
}
LockedClient->IsSynced = true;
beammp_info(LockedClient->Name.get() + (" is now synced!"));
return true;
}
void TNetwork::SendToAll(TClient* c, const std::vector<uint8_t>& Data, bool Self, bool Rel) {
if (!Self)
beammp_assert(c);
char C = static_cast<char>(Data.at(0));
mServer.ForEachClient([&](const std::shared_ptr<TClient>& Client) -> bool {
if (Self || Client.get() != c) {
if (Client->IsSynced || Client->IsSyncing) {
if (Rel || C == 'W' || C == 'Y' || C == 'V' || C == 'E') {
if (C == 'O' || C == 'T' || Data.size() > 1000) {
if (Data.size() > 400) {
auto CompressedData = Data;
CompressProperly(CompressedData);
Client->EnqueuePacket(CompressedData);
} else {
Client->EnqueuePacket(Data);
}
} else {
Client->EnqueuePacket(Data);
}
} else {
if (!UDPSend(*Client, Data)) {
Disconnect(Client);
}
}
}
}
return true;
});
}
bool TNetwork::UDPSend(TClient& Client, std::vector<uint8_t> Data) {
if (!Client.IsConnected || Client.IsDisconnected()) {
// this can happen if we try to send a packet to a client that is either
// 1. not yet fully connected, or
// 2. disconnected and not yet fully removed
// this is fine can can be ignored :^)
return true;
}
const auto Addr = Client.UDPAddress;
if (Data.size() > 400) {
CompressProperly(Data);
}
boost::system::error_code ec;
mUDPSock.send_to(buffer(Data), *Addr, 0, ec);
if (ec) {
beammp_debugf("UDP sendto() failed: {}", ec.message());
if (!Client.IsDisconnected())
Disconnect(Client);
return false;
}
return true;
}
std::vector<uint8_t> TNetwork::UDPRcvFromClient(ip::udp::endpoint& ClientEndpoint) {
std::array<char, 1024> Ret {};
boost::system::error_code ec;
const auto Rcv = mUDPSock.receive_from(mutable_buffer(Ret.data(), Ret.size()), ClientEndpoint, 0, ec);
if (ec) {
beammp_errorf("UDP recvfrom() failed: {}", ec.message());
return {};
}
beammp_assert(Rcv <= Ret.size());
return std::vector<uint8_t>(Ret.begin(), Ret.begin() + Rcv);
}

View File

@@ -1,61 +0,0 @@
#include "TPPSMonitor.h"
#include "Client.h"
#include "TNetwork.h"
TPPSMonitor::TPPSMonitor(TServer& Server)
: mServer(Server) {
Application::SetSubsystemStatus("PPSMonitor", Application::Status::Starting);
Application::SetPPS("-");
Application::RegisterShutdownHandler([&] {
Application::SetSubsystemStatus("PPSMonitor", Application::Status::ShuttingDown);
if (mThread.joinable()) {
beammp_debug("shutting down PPSMonitor");
mThread.join();
beammp_debug("shut down PPSMonitor");
}
Application::SetSubsystemStatus("PPSMonitor", Application::Status::Shutdown);
});
Start();
}
void TPPSMonitor::operator()() {
RegisterThread("PPSMonitor");
while (!mNetwork) {
// hard(-ish) spin
std::this_thread::yield();
}
beammp_debug("PPSMonitor starting");
Application::SetSubsystemStatus("PPSMonitor", Application::Status::Good);
std::vector<std::shared_ptr<TClient>> TimedOutClients;
while (!Application::IsShuttingDown()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
int C = 0, V = 0;
if (mServer.ClientCount() == 0) {
Application::SetPPS("-");
continue;
}
mServer.ForEachClient([&](const std::shared_ptr<TClient>& c) -> bool {
if (c->GetCarCount() > 0) {
C++;
V += c->GetCarCount();
}
// kick on "no ping"
if (c->SecondsSinceLastPing() > (20 * 60)) {
beammp_debug("client " + std::string("(") + std::to_string(c->ID.get()) + ")" + c->Name.get() + " timing out: " + std::to_string(c->SecondsSinceLastPing()) + ", pps: " + Application::PPS());
TimedOutClients.push_back(c);
}
return true;
});
for (auto& ClientToKick : TimedOutClients) {
Network().ClientKick(*ClientToKick, "Timeout (no ping for way too long)");
}
TimedOutClients.clear();
if (C == 0 || mInternalPPS == 0) {
Application::SetPPS("-");
} else {
int R = (mInternalPPS / C) / V;
Application::SetPPS(std::to_string(R));
}
mInternalPPS = 0;
}
}

View File

@@ -1,36 +0,0 @@
#include "TResourceManager.h"
#include <algorithm>
#include <filesystem>
namespace fs = std::filesystem;
TResourceManager::TResourceManager() {
Application::SetSubsystemStatus("ResourceManager", Application::Status::Starting);
std::string Path = Application::Settings.Resource + "/Client";
if (!fs::exists(Path))
fs::create_directories(Path);
for (const auto& entry : fs::directory_iterator(Path)) {
std::string File(entry.path().string());
if (auto pos = File.find(".zip"); pos != std::string::npos) {
if (File.length() - pos == 4) {
std::replace(File.begin(), File.end(), '\\', '/');
mFileList += File + ';';
if (auto i = File.find_last_of('/'); i != std::string::npos) {
++i;
File = File.substr(i, pos - i);
}
mTrimmedList += "/" + fs::path(File).filename().string() + ';';
mFileSizes += std::to_string(size_t(fs::file_size(entry.path()))) + ';';
mMaxModSize += size_t(fs::file_size(entry.path()));
mModsLoaded++;
}
}
}
if (mModsLoaded) {
beammp_info("Loaded " + std::to_string(mModsLoaded) + " Mods");
}
Application::SetSubsystemStatus("ResourceManager", Application::Status::Good);
}

View File

@@ -1,530 +0,0 @@
#include "TServer.h"
#include "Client.h"
#include "Common.h"
#include "CustomAssert.h"
#include "TNetwork.h"
#include "TPPSMonitor.h"
#include <TLuaPlugin.h>
#include <algorithm>
#include <any>
#include <optional>
#include <sstream>
#include <nlohmann/json.hpp>
#include "LuaAPI.h"
#undef GetObject // Fixes Windows
#include "Json.h"
static std::optional<std::pair<int, int>> GetPidVid(const std::string& str) {
auto IDSep = str.find('-');
std::string pid = str.substr(0, IDSep);
std::string vid = str.substr(IDSep + 1);
if (pid.find_first_not_of("0123456789") == std::string::npos && vid.find_first_not_of("0123456789") == std::string::npos) {
try {
int PID = stoi(pid);
int VID = stoi(vid);
return { { PID, VID } };
} catch (const std::exception&) {
return std::nullopt;
}
}
return std::nullopt;
}
TEST_CASE("GetPidVid") {
SUBCASE("Valid singledigit") {
const auto MaybePidVid = GetPidVid("0-1");
CHECK(MaybePidVid);
auto [pid, vid] = MaybePidVid.value();
CHECK_EQ(pid, 0);
CHECK_EQ(vid, 1);
}
SUBCASE("Valid doubledigit") {
const auto MaybePidVid = GetPidVid("10-12");
CHECK(MaybePidVid);
auto [pid, vid] = MaybePidVid.value();
CHECK_EQ(pid, 10);
CHECK_EQ(vid, 12);
}
SUBCASE("Valid doubledigit 2") {
const auto MaybePidVid = GetPidVid("10-2");
CHECK(MaybePidVid);
auto [pid, vid] = MaybePidVid.value();
CHECK_EQ(pid, 10);
CHECK_EQ(vid, 2);
}
SUBCASE("Valid doubledigit 3") {
const auto MaybePidVid = GetPidVid("33-23");
CHECK(MaybePidVid);
auto [pid, vid] = MaybePidVid.value();
CHECK_EQ(pid, 33);
CHECK_EQ(vid, 23);
}
SUBCASE("Valid doubledigit 4") {
const auto MaybePidVid = GetPidVid("3-23");
CHECK(MaybePidVid);
auto [pid, vid] = MaybePidVid.value();
CHECK_EQ(pid, 3);
CHECK_EQ(vid, 23);
}
SUBCASE("Empty string") {
const auto MaybePidVid = GetPidVid("");
CHECK(!MaybePidVid);
}
SUBCASE("Invalid separator") {
const auto MaybePidVid = GetPidVid("0x0");
CHECK(!MaybePidVid);
}
SUBCASE("Missing pid") {
const auto MaybePidVid = GetPidVid("-0");
CHECK(!MaybePidVid);
}
SUBCASE("Missing vid") {
const auto MaybePidVid = GetPidVid("0-");
CHECK(!MaybePidVid);
}
SUBCASE("Invalid pid") {
const auto MaybePidVid = GetPidVid("x-0");
CHECK(!MaybePidVid);
}
SUBCASE("Invalid vid") {
const auto MaybePidVid = GetPidVid("0-x");
CHECK(!MaybePidVid);
}
}
TServer::TServer(const std::vector<std::string_view>& Arguments) {
beammp_info("BeamMP Server v" + Application::ServerVersionString());
Application::SetSubsystemStatus("Server", Application::Status::Starting);
if (Arguments.size() > 1) {
Application::Settings.CustomIP = Arguments[0];
size_t n = std::count(Application::Settings.CustomIP.begin(), Application::Settings.CustomIP.end(), '.');
auto p = Application::Settings.CustomIP.find_first_not_of(".0123456789");
if (p != std::string::npos || n != 3 || Application::Settings.CustomIP.substr(0, 3) == "127") {
Application::Settings.CustomIP.clear();
beammp_warn("IP Specified is invalid! Ignoring");
} else {
beammp_info("server started with custom IP");
}
}
Application::SetSubsystemStatus("Server", Application::Status::Good);
}
void TServer::RemoveClient(TClient& Client) {
beammp_debug("removing client " + Client.Name.get() + " (" + std::to_string(ClientCount()) + ")");
Client.ClearCars();
WriteLock Lock(mClientsMutex);
// erase all clients whose id matches, and those who have expired
std::erase_if(mClients,
[&](const std::weak_ptr<TClient>& C) {
return C.expired()
|| C.lock()->ID.get() == Client.ID.get();
});
}
void TServer::RemoveClient(const std::weak_ptr<TClient>& WeakClientPtr) {
std::shared_ptr<TClient> LockedClientPtr { nullptr };
try {
LockedClientPtr = WeakClientPtr.lock();
} catch (const std::exception&) {
// silently fail, as there's nothing to do
return;
}
beammp_assert(LockedClientPtr != nullptr);
TClient& Client = *LockedClientPtr;
RemoveClient(Client);
}
void TServer::ClaimFreeIDFor(TClient& ClientToChange) {
ReadLock lock(mClientsMutex);
int ID = 0;
bool found = true;
do {
found = true;
for (const auto& Client : mClients) {
if (Client->ID.get() == ID) {
found = false;
++ID;
}
}
} while (!found);
ClientToChange.ID = ID;
}
void TServer::ForEachClient(const std::function<bool(const std::shared_ptr<TClient>&)> Fn) {
decltype(mClients) Clients;
{
ReadLock lock(mClientsMutex);
Clients = mClients;
}
for (auto& Client : Clients) {
if (!Fn(Client)) {
break;
}
}
}
size_t TServer::ClientCount() const {
ReadLock Lock(mClientsMutex);
return mClients.size();
}
void TServer::GlobalParser(const std::shared_ptr<TClient>& Client, std::vector<uint8_t>&& Packet, TPPSMonitor& PPSMonitor, TNetwork& Network) {
constexpr std::string_view ABG = "ABG:";
if (Packet.size() >= ABG.size() && std::equal(Packet.begin(), Packet.begin() + ABG.size(), ABG.begin(), ABG.end())) {
Packet.erase(Packet.begin(), Packet.begin() + ABG.size());
Packet = DeComp(Packet);
}
if (Packet.empty()) {
return;
}
std::any Res;
char Code = Packet.at(0);
std::string StringPacket(reinterpret_cast<const char*>(Packet.data()), Packet.size());
// V to Y
if (Code <= 89 && Code >= 86) {
PPSMonitor.IncrementInternalPPS();
Network.SendToAll(Client.get(), Packet, false, false);
return;
}
switch (Code) {
case 'H': // initial connection
if (!Network.SyncClient(Client)) {
// TODO handle
}
return;
case 'p':
if (!Network.Respond(*Client, StringToVector("p"), false)) {
// failed to send
Network.Disconnect(Client);
} else {
Network.UpdatePlayer(*Client);
}
return;
case 'O':
if (Packet.size() > 1000) {
beammp_debug(("Received data from: ") + Client->Name.get() + (" Size: ") + std::to_string(Packet.size()));
}
ParseVehicle(*Client, StringPacket, Network);
return;
case 'C': {
if (Packet.size() < 4 || std::find(Packet.begin() + 3, Packet.end(), ':') == Packet.end())
break;
const auto PacketAsString = std::string(reinterpret_cast<const char*>(Packet.data()), Packet.size());
std::string Message = "";
const auto ColonPos = PacketAsString.find(':', 3);
if (ColonPos != std::string::npos && ColonPos + 2 < PacketAsString.size()) {
Message = PacketAsString.substr(ColonPos + 2);
}
if (Message.empty()) {
beammp_debugf("Empty chat message received from '{}' ({}), ignoring it", Client->Name.get(), Client->ID.get());
return;
}
auto Futures = LuaAPI::MP::Engine->TriggerEvent("onChatMessage", "", Client->ID.get(), Client->Name.get(), Message);
TLuaEngine::WaitForAll(Futures);
LogChatMessage(Client->Name.get(), Client->ID.get(), PacketAsString.substr(PacketAsString.find(':', 3) + 1));
if (std::any_of(Futures.begin(), Futures.end(),
[](const std::shared_ptr<TLuaResult>& Elem) {
return !Elem->Error
&& Elem->Result.is<int>()
&& bool(Elem->Result.as<int>());
})) {
break;
}
std::string SanitizedPacket = fmt::format("C:{}: {}", Client->Name.get(), Message);
Network.SendToAll(nullptr, StringToVector(SanitizedPacket), true, true);
return;
}
case 'E':
HandleEvent(*Client, StringPacket);
return;
case 'N':
beammp_trace("got 'N' packet (" + std::to_string(Packet.size()) + ")");
Network.SendToAll(Client.get(), Packet, false, true);
return;
case 'Z': // position packet
PPSMonitor.IncrementInternalPPS();
Network.SendToAll(Client.get(), Packet, false, false);
HandlePosition(*Client, StringPacket);
return;
default:
return;
}
}
void TServer::HandleEvent(TClient& c, const std::string& RawData) {
// E:Name:Data
// Data is allowed to have ':'
if (RawData.size() < 2) {
beammp_debugf("Client '{}' ({}) tried to send an empty event, ignoring", c.Name.get(), c.ID.get());
return;
}
auto NameDataSep = RawData.find(':', 2);
if (NameDataSep == std::string::npos) {
beammp_warn("received event in invalid format (missing ':'), got: '" + RawData + "'");
}
std::string Name = RawData.substr(2, NameDataSep - 2);
std::string Data = RawData.substr(NameDataSep + 1);
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent(Name, "", c.ID.get(), Data));
}
bool TServer::IsUnicycle(TClient& c, const std::string& CarJson) {
try {
auto Car = nlohmann::json::parse(CarJson);
const std::string jbm = "jbm";
if (Car.contains(jbm) && Car[jbm].is_string() && Car[jbm] == "unicycle") {
return true;
}
} catch (const std::exception& e) {
beammp_warn("Failed to parse vehicle data as json for client " + std::to_string(c.ID.get()) + ": '" + CarJson + "'.");
}
return false;
}
bool TServer::ShouldSpawn(TClient& c, const std::string& CarJson, int ID) {
auto UnicycleID = c.UnicycleID.synchronize();
if (IsUnicycle(c, CarJson) && *UnicycleID < 0) {
*UnicycleID = ID;
return true;
} else {
return c.GetCarCount() < Application::Settings.MaxCars;
}
}
void TServer::ParseVehicle(TClient& c, const std::string& Pckt, TNetwork& Network) {
if (Pckt.length() < 6)
return;
std::string Packet = Pckt;
char Code = Packet.at(1);
int PID = -1;
int VID = -1;
std::string Data = Packet.substr(3), pid, vid;
switch (Code) { // Spawned Destroyed Switched/Moved NotFound Reset
case 's':
beammp_tracef("got 'Os' packet: '{}' ({})", Packet, Packet.size());
if (Data.at(0) == '0') {
int CarID = c.GetOpenCarID();
beammp_debugf("'{}' created a car with ID {}", c.Name.get(), CarID);
std::string CarJson = Packet.substr(5);
Packet = "Os:" + c.Role.get() + ":" + c.Name.get() + ":" + std::to_string(c.ID.get()) + "-" + std::to_string(CarID) + ":" + CarJson;
auto Futures = LuaAPI::MP::Engine->TriggerEvent("onVehicleSpawn", "", c.ID.get(), CarID, Packet.substr(3));
TLuaEngine::WaitForAll(Futures);
bool ShouldntSpawn = std::any_of(Futures.begin(), Futures.end(),
[](const std::shared_ptr<TLuaResult>& Result) {
return !Result->Error && Result->Result.is<int>() && Result->Result.as<int>() != 0;
});
if (ShouldSpawn(c, CarJson, CarID) && !ShouldntSpawn) {
c.AddNewCar(CarID, Packet);
Network.SendToAll(nullptr, StringToVector(Packet), true, true);
} else {
if (!Network.Respond(c, StringToVector(Packet), true)) {
// TODO: handle
}
std::string Destroy = "Od:" + std::to_string(c.ID.get()) + "-" + std::to_string(CarID);
if (!Network.Respond(c, StringToVector(Destroy), true)) {
// TODO: handle
}
beammp_debugf("{} (force : car limit/lua) removed ID {}", c.Name.get(), CarID);
}
}
return;
case 'c': {
beammp_trace(std::string(("got 'Oc' packet: '")) + Packet + ("' (") + std::to_string(Packet.size()) + (")"));
auto MaybePidVid = GetPidVid(Data.substr(0, Data.find(':', 1)));
if (MaybePidVid) {
std::tie(PID, VID) = MaybePidVid.value();
}
if (PID != -1 && VID != -1 && PID == c.ID.get()) {
auto Futures = LuaAPI::MP::Engine->TriggerEvent("onVehicleEdited", "", c.ID.get(), VID, Packet.substr(3));
TLuaEngine::WaitForAll(Futures);
bool ShouldntAllow = std::any_of(Futures.begin(), Futures.end(),
[](const std::shared_ptr<TLuaResult>& Result) {
return !Result->Error && Result->Result.is<int>() && Result->Result.as<int>() != 0;
});
auto FoundPos = Packet.find('{');
FoundPos = FoundPos == std::string::npos ? 0 : FoundPos; // attempt at sanitizing this
auto UnicycleID = c.UnicycleID.synchronize();
if ((*UnicycleID != VID || IsUnicycle(c, Packet.substr(FoundPos)))
&& !ShouldntAllow) {
Network.SendToAll(&c, StringToVector(Packet), false, true);
Apply(c, VID, Packet);
} else {
if (*UnicycleID == VID) {
*UnicycleID = -1;
}
std::string Destroy = "Od:" + std::to_string(c.ID.get()) + "-" + std::to_string(VID);
Network.SendToAll(nullptr, StringToVector(Destroy), true, true);
c.DeleteCar(VID);
}
}
return;
}
case 'd': {
beammp_trace(std::string(("got 'Od' packet: '")) + Packet + ("' (") + std::to_string(Packet.size()) + (")"));
auto MaybePidVid = GetPidVid(Data.substr(0, Data.find(':', 1)));
if (MaybePidVid) {
std::tie(PID, VID) = MaybePidVid.value();
}
if (PID != -1 && VID != -1 && PID == c.ID.get()) {
auto UnicycleID = c.UnicycleID.synchronize();
if (*UnicycleID == VID) {
*UnicycleID = -1;
}
Network.SendToAll(nullptr, StringToVector(Packet), true, true);
// TODO: should this trigger on all vehicle deletions?
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onVehicleDeleted", "", c.ID.get(), VID));
c.DeleteCar(VID);
beammp_debug(c.Name.get() + (" deleted car with ID ") + std::to_string(VID));
}
return;
}
case 'r': {
beammp_trace(std::string(("got 'Or' packet: '")) + Packet + ("' (") + std::to_string(Packet.size()) + (")"));
auto MaybePidVid = GetPidVid(Data.substr(0, Data.find(':', 1)));
if (MaybePidVid) {
std::tie(PID, VID) = MaybePidVid.value();
}
if (PID != -1 && VID != -1 && PID == c.ID.get()) {
Data = Data.substr(Data.find('{'));
LuaAPI::MP::Engine->ReportErrors(LuaAPI::MP::Engine->TriggerEvent("onVehicleReset", "", c.ID.get(), VID, Data));
Network.SendToAll(&c, StringToVector(Packet), false, true);
}
return;
}
case 't':
beammp_trace(std::string(("got 'Ot' packet: '")) + Packet + ("' (") + std::to_string(Packet.size()) + (")"));
Network.SendToAll(&c, StringToVector(Packet), false, true);
return;
case 'm':
Network.SendToAll(&c, StringToVector(Packet), true, true);
return;
default:
beammp_trace(std::string(("possibly not implemented: '") + Packet + ("' (") + std::to_string(Packet.size()) + (")")));
return;
}
}
void TServer::Apply(TClient& c, int VID, const std::string& pckt) {
auto FoundPos = pckt.find('{');
if (FoundPos == std::string::npos) {
beammp_error("Malformed packet received, no '{' found");
return;
}
std::string Packet = pckt.substr(FoundPos);
std::string VD = c.GetCarData(VID);
if (VD.empty()) {
beammp_error("Tried to apply change to vehicle that does not exist");
return;
}
std::string Header = VD.substr(0, VD.find('{'));
FoundPos = VD.find('{');
if (FoundPos == std::string::npos) {
return;
}
VD = VD.substr(FoundPos);
rapidjson::Document Veh, Pack;
Veh.Parse(VD.c_str());
if (Veh.HasParseError()) {
beammp_error("Could not get vehicle config!");
return;
}
Pack.Parse(Packet.c_str());
if (Pack.HasParseError() || Pack.IsNull()) {
beammp_error("Could not get active vehicle config!");
return;
}
for (auto& M : Pack.GetObject()) {
if (Veh[M.name].IsNull()) {
Veh.AddMember(M.name, M.value, Veh.GetAllocator());
} else {
Veh[M.name] = Pack[M.name];
}
}
rapidjson::StringBuffer Buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(Buffer);
Veh.Accept(writer);
c.SetCarData(VID, Header + Buffer.GetString());
}
void TServer::InsertClient(const std::shared_ptr<TClient>& NewClient) {
beammp_debug("inserting client (" + std::to_string(ClientCount()) + ")");
WriteLock Lock(mClientsMutex); // TODO why is there 30+ threads locked here
(void)mClients.insert(NewClient);
}
struct PidVidData {
int PID;
int VID;
std::string Data;
};
static std::optional<PidVidData> ParsePositionPacket(const std::string& Packet) {
if (Packet.size() < 3) {
// invalid packet
return std::nullopt;
}
// Zp:PID-VID:DATA
std::string withoutCode = Packet.substr(3);
// parse veh ID
if (auto DataBeginPos = withoutCode.find('{'); DataBeginPos != std::string::npos && DataBeginPos != 0) {
// separator is :{, so position of { minus one
auto PidVidOnly = withoutCode.substr(0, DataBeginPos - 1);
auto MaybePidVid = GetPidVid(PidVidOnly);
if (MaybePidVid) {
int PID = -1;
int VID = -1;
// FIXME: check that the VID and PID are valid, so that we don't waste memory
std::tie(PID, VID) = MaybePidVid.value();
std::string Data = withoutCode.substr(DataBeginPos);
return PidVidData {
.PID = PID,
.VID = VID,
.Data = Data,
};
} else {
// invalid packet
return std::nullopt;
}
}
// invalid packet
return std::nullopt;
}
TEST_CASE("ParsePositionPacket") {
const auto TestData = R"({"tim":10.428000331623,"vel":[-2.4171722121385e-05,-9.7184734153252e-06,-7.6420763232237e-06],"rot":[-0.0001296154171915,0.0031575385950029,0.98994906610295,0.14138903660382],"rvel":[5.3640324636461e-05,-9.9824529946024e-05,5.1664064641372e-05],"pos":[-0.27281248907838,-0.20515357944633,0.49695488960431],"ping":0.032999999821186})";
SUBCASE("All the pids and vids") {
for (int pid = 0; pid < 100; ++pid) {
for (int vid = 0; vid < 100; ++vid) {
std::optional<PidVidData> MaybeRes = ParsePositionPacket(fmt::format("Zp:{}-{}:{}", pid, vid, TestData));
CHECK(MaybeRes.has_value());
CHECK_EQ(MaybeRes.value().PID, pid);
CHECK_EQ(MaybeRes.value().VID, vid);
CHECK_EQ(MaybeRes.value().Data, TestData);
}
}
}
}
void TServer::HandlePosition(TClient& c, const std::string& Packet) {
if (auto Parsed = ParsePositionPacket(Packet); Parsed.has_value()) {
c.SetCarPosition(Parsed.value().VID, Parsed.value().Data);
}
}

View File

@@ -6,14 +6,15 @@
"boost-spirit",
"boost-uuid",
"boost-variant",
"boost-iostreams",
"cpp-httplib",
"doctest",
"fmt",
"libzip",
"nlohmann-json",
"openssl",
"rapidjson",
"sol2",
"toml11"
"toml11",
"zstd"
]
}