partially implement queuing while sycing client

This commit is contained in:
Lion Kortlepel 2021-02-28 14:39:48 +01:00 committed by Anonymous275
parent 4edd1ac100
commit 7cd420a1a5
4 changed files with 28 additions and 12 deletions

View File

@ -2,6 +2,7 @@
#include <chrono> #include <chrono>
#include <memory> #include <memory>
#include <queue>
#include <string> #include <string>
#include <unordered_set> #include <unordered_set>
@ -46,9 +47,13 @@ public:
int GetID() const { return mID; } int GetID() const { return mID; }
bool IsConnected() const { return mIsConnected; } bool IsConnected() const { return mIsConnected; }
bool IsSynced() const { return mIsSynced; } bool IsSynced() const { return mIsSynced; }
bool IsSyncing() const { return mIsSyncing; }
bool IsGuest() const { return mIsGuest; } bool IsGuest() const { return mIsGuest; }
void SetIsGuest(bool NewIsGuest) { mIsGuest = NewIsGuest; } void SetIsGuest(bool NewIsGuest) { mIsGuest = NewIsGuest; }
void SetIsSynced(bool NewIsSynced) { mIsSynced = NewIsSynced; } void SetIsSynced(bool NewIsSynced) { mIsSynced = NewIsSynced; }
void SetIsSyncing(bool NewIsSyncing) { mIsSyncing = NewIsSyncing; }
void EnqueueMissedPacketDuringSyncing(const std::string& Packet) { mMissedPacketsDuringSyncing.push(Packet); }
size_t MissedPacketQueueSize() const { return mMissedPacketsDuringSyncing.size(); }
void SetIsConnected(bool NewIsConnected) { mIsConnected = NewIsConnected; } void SetIsConnected(bool NewIsConnected) { mIsConnected = NewIsConnected; }
TServer& Server() const; TServer& Server() const;
void UpdatePingTime(); void UpdatePingTime();
@ -58,6 +63,8 @@ private:
TServer& mServer; TServer& mServer;
bool mIsConnected = false; bool mIsConnected = false;
bool mIsSynced = false; bool mIsSynced = false;
bool mIsSyncing = false;
std::queue<std::string> mMissedPacketsDuringSyncing;
bool mIsGuest = false; bool mIsGuest = false;
std::mutex mVehicleDataMutex; std::mutex mVehicleDataMutex;
TSetOfVehicleData mVehicleData; TSetOfVehicleData mVehicleData;

View File

@ -16,7 +16,7 @@ public:
void operator()() override; void operator()() override;
bool TCPSend(TClient& c, const std::string& Data); bool TCPSend(TClient& c, const std::string& Data, bool IsSync = false);
void SendLarge(TClient& c, std::string Data); void SendLarge(TClient& c, std::string Data);
void Respond(TClient& c, const std::string& MSG, bool Rel); void Respond(TClient& c, const std::string& MSG, bool Rel);
std::shared_ptr<TClient> CreateClient(SOCKET TCPSock); std::shared_ptr<TClient> CreateClient(SOCKET TCPSock);

View File

@ -80,7 +80,7 @@ void TTCPServer::Authentication(SOCKET TCPSock) {
ClientKick(*Client, "Invalid version header!"); ClientKick(*Client, "Invalid version header!");
return; return;
} }
TCPSend(*Client, "S"); TCPSend(*Client, "S", false);
Rc = TCPRcv(*Client); Rc = TCPRcv(*Client);
@ -153,7 +153,12 @@ std::shared_ptr<TClient> TTCPServer::CreateClient(SOCKET TCPSock) {
return c; return c;
} }
bool TTCPServer::TCPSend(TClient& c, const std::string& Data) { bool TTCPServer::TCPSend(TClient& c, const std::string& Data, bool IsSync) {
if (c.IsSyncing() && !IsSync) {
c.EnqueueMissedPacketDuringSyncing(Data);
return true;
} else if (!c.IsSyncing() && c.IsSynced() && c.MissedPacketQueueSize() != 0) {
}
int32_t Size, Sent; int32_t Size, Sent;
std::string Send(4, 0); std::string Send(4, 0);
Size = int32_t(Data.size()); Size = int32_t(Data.size());
@ -265,7 +270,7 @@ std::string TTCPServer::TCPRcv(TClient& c) {
void TTCPServer::ClientKick(TClient& c, const std::string& R) { void TTCPServer::ClientKick(TClient& c, const std::string& R) {
info("Client kicked: " + R); info("Client kicked: " + R);
TCPSend(c, "E" + R); TCPSend(c, "E" + R, false);
c.SetStatus(-2); c.SetStatus(-2);
CloseSocketProper(c.GetTCPSock()); CloseSocketProper(c.GetTCPSock());
} }
@ -374,7 +379,7 @@ void TTCPServer::SyncResources(TClient& c) {
#ifndef DEBUG #ifndef DEBUG
try { try {
#endif #endif
TCPSend(c, "P" + std::to_string(c.GetID())); TCPSend(c, "P" + std::to_string(c.GetID()), false);
std::string Data; std::string Data;
while (c.GetStatus() > -1) { while (c.GetStatus() > -1) {
Data = TCPRcv(c); Data = TCPRcv(c);
@ -406,7 +411,7 @@ void TTCPServer::Parse(TClient& c, const std::string& Packet) {
std::string ToSend = mResourceManager.FileList() + mResourceManager.FileSizes(); std::string ToSend = mResourceManager.FileList() + mResourceManager.FileSizes();
if (ToSend.empty()) if (ToSend.empty())
ToSend = "-"; ToSend = "-";
TCPSend(c, ToSend); TCPSend(c, ToSend, false);
} }
return; return;
default: default:
@ -418,11 +423,11 @@ void TTCPServer::SendFile(TClient& c, const std::string& Name) {
info(c.GetName() + " requesting : " + Name.substr(Name.find_last_of('/'))); info(c.GetName() + " requesting : " + Name.substr(Name.find_last_of('/')));
if (!std::filesystem::exists(Name)) { if (!std::filesystem::exists(Name)) {
TCPSend(c, "CO"); TCPSend(c, "CO", false);
warn("File " + Name + " could not be accessed!"); warn("File " + Name + " could not be accessed!");
return; return;
} else } else
TCPSend(c, "AG"); TCPSend(c, "AG", false);
///Wait for connections ///Wait for connections
int T = 0; int T = 0;
@ -515,7 +520,7 @@ void TTCPServer::SendLarge(TClient& c, std::string Data) {
std::string CMP(Comp(Data)); std::string CMP(Comp(Data));
Data = "ABG:" + CMP; Data = "ABG:" + CMP;
} }
TCPSend(c, Data); TCPSend(c, Data, false);
} }
void TTCPServer::Respond(TClient& c, const std::string& MSG, bool Rel) { void TTCPServer::Respond(TClient& c, const std::string& MSG, bool Rel) {
@ -524,7 +529,7 @@ void TTCPServer::Respond(TClient& c, const std::string& MSG, bool Rel) {
if (C == 'O' || C == 'T' || MSG.length() > 1000) { if (C == 'O' || C == 'T' || MSG.length() > 1000) {
SendLarge(c, MSG); SendLarge(c, MSG);
} else { } else {
TCPSend(c, MSG); TCPSend(c, MSG, false);
} }
} else { } else {
UDPServer().UDPSend(c, MSG); UDPServer().UDPSend(c, MSG);
@ -538,12 +543,14 @@ void TTCPServer::SyncClient(const std::weak_ptr<TClient>& c) {
auto LockedClient = c.lock(); auto LockedClient = c.lock();
if (LockedClient->IsSynced()) if (LockedClient->IsSynced())
return; return;
LockedClient->SetIsSynced(true); // Syncing, later set isSynced
// after syncing is done, we apply all packets they missed
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
Respond(*LockedClient, ("Sn") + LockedClient->GetName(), true); Respond(*LockedClient, ("Sn") + LockedClient->GetName(), true);
UDPServer().SendToAll(LockedClient.get(), ("JWelcome ") + LockedClient->GetName() + "!", false, true); UDPServer().SendToAll(LockedClient.get(), ("JWelcome ") + LockedClient->GetName() + "!", false, true);
TriggerLuaEvent(("onPlayerJoin"), false, nullptr, std::make_unique<TLuaArg>(TLuaArg { { LockedClient->GetID() } }), false); TriggerLuaEvent(("onPlayerJoin"), false, nullptr, std::make_unique<TLuaArg>(TLuaArg { { LockedClient->GetID() } }), false);
bool Return = false; bool Return = false;
LockedClient->SetIsSyncing(true);
mServer.ForEachClient([&](const std::weak_ptr<TClient>& ClientPtr) -> bool { mServer.ForEachClient([&](const std::weak_ptr<TClient>& ClientPtr) -> bool {
if (!ClientPtr.expired()) { if (!ClientPtr.expired()) {
auto client = ClientPtr.lock(); auto client = ClientPtr.lock();
@ -565,9 +572,11 @@ void TTCPServer::SyncClient(const std::weak_ptr<TClient>& c) {
} }
return true; return true;
}); });
LockedClient->SetIsSyncing(false);
if (Return) { if (Return) {
return; return;
} }
LockedClient->SetIsSynced(true);
info(LockedClient->GetName() + (" is now synced!")); info(LockedClient->GetName() + (" is now synced!"));
} }

View File

@ -102,7 +102,7 @@ void TUDPServer::SendToAll(TClient* c, const std::string& Data, bool Self, bool
if (C == 'O' || C == 'T' || Data.length() > 1000) if (C == 'O' || C == 'T' || Data.length() > 1000)
mTCPServer.SendLarge(*Client, Data); mTCPServer.SendLarge(*Client, Data);
else else
mTCPServer.TCPSend(*Client, Data); mTCPServer.TCPSend(*Client, Data, false);
} else } else
UDPSend(*Client, Data); UDPSend(*Client, Data);
} }