Refactor downloading (#116)

The way it was done was so horrid, it was not only impossible to debug,
with TODO comments saying it sucks, and other shit like that, but it was
also just full of data races. You can rest easy however - I left most of
the data races in there <3 For nostalgia (totally not because it's a
massive pain to fix that).

We now do single-threaded download, which can not only saturate my 100
Mbit/s line without any hickups, it can also go up to ~600000 Mbit/s for
localhost transfers :) So I think it's fine.
This commit is contained in:
Lion 2024-09-22 20:04:45 +02:00 committed by GitHub
commit 007cd6573e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 74 additions and 54 deletions

View File

@ -57,7 +57,6 @@ std::string GetGamePath() {
std::string Ver = CheckVer(GetGameDir());
Ver = Ver.substr(0, Ver.find('.', Ver.find('.') + 1));
Path += Ver + "\\";
info("Game user path: '" + Path + "'");
return Path;
}
#elif defined(__linux__)
@ -70,7 +69,6 @@ std::string GetGamePath() {
std::string Ver = CheckVer(GetGameDir());
Ver = Ver.substr(0, Ver.find('.', Ver.find('.') + 1));
Path += Ver + "/";
info("Game user path: '" + Path + "'");
return Path;
}
#endif

View File

@ -7,6 +7,8 @@
///
#include "Network/network.hpp"
#include <chrono>
#include <mutex>
#if defined(_WIN32)
#include <ws2tcpip.h>
@ -129,29 +131,44 @@ void AsyncUpdate(uint64_t& Rcv, uint64_t Size, const std::string& Name) {
} while (!Terminate && Rcv < Size);
}
char* TCPRcvRaw(SOCKET Sock, uint64_t& GRcv, uint64_t Size) {
// MICROSOFT, I DONT CARE, WRITE BETTER CODE
#undef min
std::vector<char> TCPRcvRaw(SOCKET Sock, uint64_t& GRcv, uint64_t Size) {
if (Sock == -1) {
Terminate = true;
UUl("Invalid Socket");
return nullptr;
return {};
}
char* File = new char[Size];
std::vector<char> File(Size);
uint64_t Rcv = 0;
int i = 0;
do {
int Len = int(Size - Rcv);
if (Len > 1000000)
Len = 1000000;
auto start = std::chrono::high_resolution_clock::now();
// receive at most some MB at a time
int Len = std::min(int(Size - Rcv), 2 * 1024 * 1024);
int32_t Temp = recv(Sock, &File[Rcv], Len, MSG_WAITALL);
if (Temp < 1) {
info(std::to_string(Temp));
UUl("Socket Closed Code 1");
KillSocket(Sock);
Terminate = true;
delete[] File;
return nullptr;
return {};
}
Rcv += Temp;
GRcv += Temp;
// every 8th iteration calculate download speed for that iteration
if (i % 8 == 0) {
auto end = std::chrono::high_resolution_clock::now();
auto difference = end - start;
float bits_per_s = float(Temp * 8) / float(std::chrono::duration_cast<std::chrono::milliseconds>(difference).count());
float megabits_per_s = bits_per_s / 1000;
debug("Download speed: " + std::to_string(uint32_t(megabits_per_s)) + "Mbit/s");
}
++i;
} while (Rcv < Size && !Terminate);
return File;
}
@ -185,44 +202,41 @@ SOCKET InitDSock() {
return DSock;
}
std::string MultiDownload(SOCKET MSock, SOCKET DSock, uint64_t Size, const std::string& Name) {
std::vector<char> MultiDownload(SOCKET MSock, SOCKET DSock, uint64_t Size, const std::string& Name) {
uint64_t GRcv = 0;
uint64_t GRcv = 0, MSize = Size / 2, DSize = Size - MSize;
uint64_t MSize = Size / 2;
uint64_t DSize = Size - MSize;
std::thread Au(AsyncUpdate, std::ref(GRcv), Size, Name);
std::thread Au([&] { AsyncUpdate(GRcv, Size, Name); });
std::packaged_task<char*()> task([&] { return TCPRcvRaw(MSock, GRcv, MSize); });
std::future<char*> f1 = task.get_future();
std::thread Dt(std::move(task));
Dt.detach();
const std::vector<char> MData = TCPRcvRaw(MSock, GRcv, MSize);
char* DData = TCPRcvRaw(DSock, GRcv, DSize);
if (!DData) {
if (MData.empty()) {
MultiKill(MSock, DSock);
return "";
return {};
}
f1.wait();
char* MData = f1.get();
const std::vector<char> DData = TCPRcvRaw(DSock, GRcv, DSize);
if (!MData) {
if (DData.empty()) {
MultiKill(MSock, DSock);
return "";
return {};
}
if (Au.joinable())
Au.join();
// ensure that GRcv is good before joining the async update thread
GRcv = MData.size() + DData.size();
if (GRcv != Size) {
error("Something went wrong during download; didn't get enough data. Expected " + std::to_string(Size) + " bytes, got " + std::to_string(GRcv) + " bytes instead");
return {};
}
/// omg yes very ugly my god but i was in a rush will revisit
std::string Ret(Size, 0);
memcpy(&Ret[0], MData, MSize);
delete[] MData;
Au.join();
memcpy(&Ret[MSize], DData, DSize);
delete[] DData;
return Ret;
std::vector<char> Result{};
Result.insert(Result.begin(), MData.begin(), MData.end());
Result.insert(Result.end(), DData.begin(), DData.end());
return Result;
}
void InvalidResource(const std::string& File) {
@ -249,7 +263,7 @@ void SyncResources(SOCKET Sock) {
Ret.clear();
int Amount = 0, Pos = 0;
std::string a, t;
std::string PathToSaveTo, t;
for (const std::string& name : FNames) {
if (!name.empty()) {
t += name.substr(name.find_last_of('/') + 1) + ";";
@ -277,21 +291,23 @@ void SyncResources(SOCKET Sock) {
for (auto FN = FNames.begin(), FS = FSizes.begin(); FN != FNames.end() && !Terminate; ++FN, ++FS) {
auto pos = FN->find_last_of('/');
if (pos != std::string::npos) {
a = "Resources" + FN->substr(pos);
} else
PathToSaveTo = "Resources" + FN->substr(pos);
} else {
continue;
}
Pos++;
if (fs::exists(a)) {
auto FileSize = std::stoull(*FS);
if (fs::exists(PathToSaveTo)) {
if (FS->find_first_not_of("0123456789") != std::string::npos)
continue;
if (fs::file_size(a) == std::stoull(*FS)) {
UpdateUl(false, std::to_string(Pos) + "/" + std::to_string(Amount) + ": " + a.substr(a.find_last_of('/')));
if (fs::file_size(PathToSaveTo) == FileSize) {
UpdateUl(false, std::to_string(Pos) + "/" + std::to_string(Amount) + ": " + PathToSaveTo.substr(PathToSaveTo.find_last_of('/')));
std::this_thread::sleep_for(std::chrono::milliseconds(50));
try {
if (!fs::exists(GetGamePath() + "mods/multiplayer")) {
fs::create_directories(GetGamePath() + "mods/multiplayer");
}
auto modname = a.substr(a.find_last_of('/'));
auto modname = PathToSaveTo.substr(PathToSaveTo.find_last_of('/'));
#if defined(__linux__)
// Linux version of the game doesnt support uppercase letters in mod names
for (char& c : modname) {
@ -300,7 +316,7 @@ void SyncResources(SOCKET Sock) {
#endif
auto name = GetGamePath() + "mods/multiplayer" + modname;
auto tmp_name = name + ".tmp";
fs::copy_file(a, tmp_name, fs::copy_options::overwrite_existing);
fs::copy_file(PathToSaveTo, tmp_name, fs::copy_options::overwrite_existing);
fs::rename(tmp_name, name);
} catch (std::exception& e) {
error("Failed copy to the mods folder! " + std::string(e.what()));
@ -310,11 +326,12 @@ void SyncResources(SOCKET Sock) {
WaitForConfirm();
continue;
} else
remove(a.c_str());
remove(PathToSaveTo.c_str());
}
CheckForDir();
std::string FName = a.substr(a.find_last_of('/'));
std::string FName = PathToSaveTo.substr(PathToSaveTo.find_last_of('/'));
do {
debug("Loading file '" + FName + "' to '" + PathToSaveTo + "'");
TCPSend("f" + *FN, Sock);
std::string Data = TCPRcv(Sock);
@ -326,19 +343,23 @@ void SyncResources(SOCKET Sock) {
std::string Name = std::to_string(Pos) + "/" + std::to_string(Amount) + ": " + FName;
Data = MultiDownload(Sock, DSock, std::stoull(*FS), Name);
std::vector<char> DownloadedFile = MultiDownload(Sock, DSock, FileSize, Name);
if (Terminate)
break;
UpdateUl(false, std::to_string(Pos) + "/" + std::to_string(Amount) + ": " + FName);
std::ofstream LFS;
LFS.open(a.c_str(), std::ios_base::app | std::ios::binary);
if (LFS.is_open()) {
LFS.write(&Data[0], Data.size());
LFS.close();
}
} while (fs::file_size(a) != std::stoull(*FS) && !Terminate);
// 1. write downloaded file to disk
{
std::ofstream OutFile(PathToSaveTo, std::ios::binary | std::ios::trunc);
OutFile.write(DownloadedFile.data(), DownloadedFile.size());
}
// 2. verify size
if (std::filesystem::file_size(PathToSaveTo) != DownloadedFile.size()) {
error("Failed to write the entire file '" + PathToSaveTo + "' correctly (file size mismatch)");
Terminate = true;
}
} while (fs::file_size(PathToSaveTo) != std::stoull(*FS) && !Terminate);
if (!Terminate) {
if (!fs::exists(GetGamePath() + "mods/multiplayer")) {
fs::create_directories(GetGamePath() + "mods/multiplayer");
@ -351,7 +372,7 @@ void SyncResources(SOCKET Sock) {
}
#endif
fs::copy_file(a, GetGamePath() + "mods/multiplayer" + FName, fs::copy_options::overwrite_existing);
fs::copy_file(PathToSaveTo, GetGamePath() + "mods/multiplayer" + FName, fs::copy_options::overwrite_existing);
}
WaitForConfirm();
}

View File

@ -317,6 +317,7 @@ void PreGame(const std::string& GamePath) {
info("Game Version : " + GameVer);
CheckMP(GetGamePath() + "mods/multiplayer");
info("Game user path: " + GetGamePath());
if (!Dev) {
std::string LatestHash = HTTP::Get("https://backend.beammp.com/sha/mod?branch=" + Branch + "&pk=" + PublicKey);