rework UDP recv loop, add packet statistics tracking

The UDPServerMain was badly designed and had multiple potential bugs and
unexpected & silent failures. The changes simplify the code here, and
tracks those unexpected failures in app-wide statistics. Also added,
while at that, some more tracking of how many packets and bytes are
sent & received per client
This commit is contained in:
Lion Kortlepel
2022-10-26 14:13:05 +02:00
parent e77dfd5a57
commit 6842dccfc3
4 changed files with 64 additions and 21 deletions

View File

@@ -81,29 +81,39 @@ void TNetwork::UDPServerMain() {
try {
ip::udp::endpoint client {};
std::vector<uint8_t> Data = UDPRcvFromClient(client); // Receives any data from Socket
auto Pos = std::find(Data.begin(), Data.end(), ':');
if (Data.empty() || Pos > Data.begin() + 2)
// It has to be size>=2, because each UDP packet has a header of
// <id+1>:
// where id+1 is one byte.
// We discard a UDP packet if it doesn't start like this.
if (Data.size() < 2 || Data.at(1) != ':') {
++Application::MalformedUdpPackets;
continue;
}
// We do -1 here, because the launcher does +1
// because it uses (or used to use) null-terminated strings
// to represent packets. This would mean that player 0 would
// cause empty packets.
uint8_t ID = uint8_t(Data.at(0)) - 1;
mServer.ForEachClient([&](std::weak_ptr<TClient> ClientPtr) -> bool {
std::shared_ptr<TClient> Client;
{
ReadLock Lock(mServer.GetClientMutex());
if (!ClientPtr.expired()) {
Client = ClientPtr.lock();
} else
return true;
auto MaybeClient = GetClient(mServer, ID);
if (MaybeClient) {
try {
auto Locked = MaybeClient.value().lock();
if (Locked->IsConnected() && Locked->GetUDPAddr() != client) {
beammp_debugf("Client at {}:{} tried to send UDP for client {}", client.address().to_string(), client.port(), Locked->GetID());
++Application::InvalidUdpPackets;
continue;
} else {
Locked->SetUDPAddr(client);
Locked->SetIsConnected(true);
}
Locked->UdpReceived += Data.size();
++Locked->UdpPacketsReceived;
Data.erase(Data.begin() + 0, Data.begin() + 2);
TServer::GlobalParser(Locked, std::move(Data), mPPSMonitor, *this);
} catch (const std::exception&) {
++Application::InvalidUdpPackets;
}
if (Client->GetID() == ID) {
Client->SetUDPAddr(client);
Client->SetIsConnected(true);
Data.erase(Data.begin(), Data.begin() + 2);
TServer::GlobalParser(ClientPtr, std::move(Data), mPPSMonitor, *this);
}
return true;
});
}
} catch (const std::exception& e) {
beammp_error(("fatal: ") + std::string(e.what()));
}
@@ -214,6 +224,7 @@ void TNetwork::HandleDownload(TConnection&& Conn) {
std::shared_ptr<TClient> TNetwork::Authentication(TConnection&& RawConnection) {
auto Client = CreateClient(std::move(RawConnection.Socket));
Client->ConnectionTime = std::chrono::high_resolution_clock::now();
Client->SetIdentifier("ip", RawConnection.SockAddr.address().to_string());
beammp_tracef("This thread is ip {}", RawConnection.SockAddr.address().to_string());
@@ -371,6 +382,7 @@ bool TNetwork::TCPSend(TClient& c, const std::vector<uint8_t>& Data, bool IsSync
return false;
}
c.UpdatePingTime();
c.TcpSent += ToSend.size();
return true;
}
@@ -419,6 +431,7 @@ std::vector<uint8_t> TNetwork::TCPRcv(TClient& c) {
beammp_errorf("Expected to read {} bytes, instead got {}", Header, N);
}
c.TcpReceived += N + HeaderData.size();
constexpr std::string_view ABG = "ABG:";
if (Data.size() >= ABG.size() && std::equal(Data.begin(), Data.begin() + ABG.size(), ABG.begin(), ABG.end())) {
Data.erase(Data.begin(), Data.begin() + ABG.size());
@@ -787,6 +800,7 @@ bool TNetwork::TCPSendRaw(TClient& C, ip::tcp::socket& socket, const uint8_t* Da
return false;
}
C.UpdatePingTime();
C.TcpSent += Size;
return true;
}
@@ -929,6 +943,8 @@ bool TNetwork::UDPSend(TClient& Client, std::vector<uint8_t> Data) {
Client.Disconnect("UDP send failed");
return false;
}
Client.UdpSent += Data.size();
++Client.UdpPacketsSent;
return true;
}