From 303a619ece67f33a6c12dec4f644ca706de0541a Mon Sep 17 00:00:00 2001 From: Lion Kortlepel Date: Sat, 20 Jan 2024 19:20:33 +0100 Subject: [PATCH] implement async accept, async read for tcp --- include/Network.h | 56 +++++++++++++++----- src/Network.cpp | 127 ++++++++++++++++++++++++++++++++-------------- 2 files changed, 131 insertions(+), 52 deletions(-) diff --git a/include/Network.h b/include/Network.h index bbb03da..4b90e4d 100644 --- a/include/Network.h +++ b/include/Network.h @@ -6,6 +6,8 @@ #include "Sync.h" #include "Transport.h" #include +#include +#include #include #include #include @@ -43,7 +45,7 @@ struct Client { /// conjunction with something else. Blocks other writes. void tcp_write_file_raw(const std::filesystem::path& path); - Client(ClientID id, class Network& network, ip::tcp::socket&& tcp_sockem_udp_endpointst); + Client(ClientID id, class Network& network, ip::tcp::socket&& tcp_socket); ~Client(); ip::tcp::socket& tcp_socket() { return m_tcp_socket; } @@ -56,6 +58,15 @@ struct Client { const uint64_t udp_magic; private: + /// Timeout used for typical tcp reads. + boost::chrono::seconds m_read_timeout { 5 }; + /// Timeout used for typical tcp writes. + boost::chrono::seconds m_write_timeout { 5 }; + /// Timeout used for mod download tcp writes. + /// This is typically orders of magnitude larger + /// to allow for slow downloads. + boost::chrono::seconds m_download_write_timeout { 5 }; + void tcp_main(); std::mutex m_tcp_read_mtx; @@ -65,6 +76,9 @@ private: ip::tcp::socket m_tcp_socket; boost::scoped_thread<> m_tcp_thread; + + std::vector m_header { bmp::Header::SERIALIZED_SIZE }; + bmp::Packet m_packet{}; class Network& m_network; }; @@ -160,6 +174,14 @@ public: size_t vehicle_count() const; + /// To be called by accept() async handler once an accept() is completed. + void accept(); + + /// Gets the async i/o context of the network - can be used to "schedule" tasks on it. + boost::asio::thread_pool& context() { + return m_threadpool; + } + private: void handle_packet(ClientID id, const bmp::Packet& packet); @@ -171,6 +193,16 @@ private: void udp_read_main(); void tcp_listen_main(); + void handle_identification(ClientID id, const bmp::Packet& packet, std::shared_ptr& client); + + void handle_authentication(ClientID id, const bmp::Packet& packet, std::shared_ptr& client); + + /// On failure, throws an exception with the error for the client. + static void authenticate_user(const std::string& public_key, std::shared_ptr& client); + + /// Called by accept() once completed (completion handler). + void handle_accept(const boost::system::error_code& ec); + Sync> m_clients {}; Sync> m_vehicles {}; Sync> m_client_magics {}; @@ -184,18 +216,16 @@ private: return new_id; } + thread_pool m_threadpool { std::thread::hardware_concurrency() }; + Sync m_shutdown { false }; + + ip::udp::socket m_udp_socket { m_threadpool }; + + ip::tcp::socket m_tcp_listener { m_threadpool }; + ip::tcp::acceptor m_tcp_acceptor { m_threadpool }; + /// This socket gets accepted into, and is then moved. + ip::tcp::socket m_temp_socket { m_threadpool }; + boost::scoped_thread<> m_tcp_listen_thread; boost::scoped_thread<> m_udp_read_thread; - - io_context m_io {}; - thread_pool m_threadpool {}; - Sync m_shutdown { false }; - ip::udp::socket m_udp_socket { m_io }; - - void handle_identification(ClientID id, const bmp::Packet& packet, std::shared_ptr& client); - - void handle_authentication(ClientID id, const bmp::Packet& packet, std::shared_ptr& client); - - /// On failure, throws an exception with the error for the client. - static void authenticate_user(const std::string& public_key, std::shared_ptr& client); }; diff --git a/src/Network.cpp b/src/Network.cpp index 0715b9e..813849a 100644 --- a/src/Network.cpp +++ b/src/Network.cpp @@ -10,6 +10,7 @@ #include "ServerInfo.h" #include "TLuaEngine.h" #include "Util.h" +#include #include #include #include @@ -88,14 +89,45 @@ Client::~Client() { Client::Client(ClientID id, Network& network, ip::tcp::socket&& tcp_socket) : id(id) - , udp_magic(id ^ uint64_t(std::rand()) ^ uint64_t(this)) + , udp_magic(id ^ (uint64_t(std::rand()) << 32) ^ uint64_t(this)) , m_tcp_socket(std::forward(tcp_socket)) , m_network(network) { beammp_debugf("Client {} created", id); } void Client::start_tcp() { - m_tcp_thread = boost::scoped_thread<>(&Client::tcp_main, this); + beammp_tracef("{}", __func__); + async_read(m_tcp_socket, buffer(m_header), [this](const auto& ec, size_t) { + if (ec) { + beammp_errorf("TCP read() failed: {}", ec.message()); + m_network.disconnect(id, "read() failed"); + } else { + try { + bmp::Header hdr {}; + hdr.deserialize_from(m_header); + beammp_tracef("Got header with purpose {}, size {} from {}", int(hdr.purpose), hdr.size, id); + // delete previous packet if any exists + m_packet = {}; + m_packet.purpose = hdr.purpose; + m_packet.flags = hdr.flags; + m_packet.raw_data.resize(hdr.size); + async_read(m_tcp_socket, buffer(m_packet.raw_data), [this](const auto& ec, size_t bytes) { + if (ec) { + beammp_errorf("TCP read() failed: {}", ec.message()); + m_network.disconnect(id, "read() failed"); + } else { + beammp_tracef("Got body of size {} from {}", bytes, id); + m_network.handle_packet(id, m_packet); + // recv another packet! + start_tcp(); + } + }); + } catch(const std::exception& e) { + beammp_errorf("Error while processing TCP packet from client {}: {}", id, e.what()); + m_network.disconnect(id, "Failed to understand"); + } + } + }); } void Client::tcp_main() { @@ -134,8 +166,7 @@ void Network::udp_write(bmp::Packet& packet, const ip::udp::endpoint& to_ep) { Network::Network() : m_tcp_listen_thread(&Network::tcp_listen_main, this) - , m_udp_read_thread(&Network::udp_read_main, this) - , m_threadpool(std::thread::hardware_concurrency()) { + , m_udp_read_thread(&Network::udp_read_main, this) { Application::RegisterShutdownHandler([this] { *m_shutdown = true; m_tcp_listen_thread.interrupt(); @@ -161,24 +192,23 @@ Network::~Network() { void Network::tcp_listen_main() { Application::SetSubsystemStatus("TCP", Application::Status::Starting); ip::tcp::endpoint listen_ep(ip::address::from_string("0.0.0.0"), static_cast(Application::Settings.Port)); - ip::tcp::socket listener(m_threadpool); boost::system::error_code ec; - listener.open(listen_ep.protocol(), ec); + m_tcp_listener.open(listen_ep.protocol(), ec); if (ec) { beammp_errorf("Failed to open socket: {}", ec.message()); return; } socket_base::linger linger_opt {}; linger_opt.enabled(false); - listener.set_option(linger_opt, ec); + m_tcp_listener.set_option(linger_opt, ec); if (ec) { beammp_errorf("Failed to set up listening socket to not linger / reuse address. " "This may cause the socket to refuse to bind(). Error: {}", ec.message()); } - ip::tcp::acceptor acceptor(m_threadpool, listen_ep); - acceptor.listen(socket_base::max_listen_connections, ec); + m_tcp_acceptor = { m_threadpool, listen_ep }; + m_tcp_acceptor.listen(socket_base::max_listen_connections, ec); if (ec) { beammp_errorf("listen() failed, which is needed for the server to operate. " "Shutting down. Error: {}", @@ -186,24 +216,38 @@ void Network::tcp_listen_main() { Application::GracefullyShutdown(); } Application::SetSubsystemStatus("TCP", Application::Status::Good); - while (!*m_shutdown) { - auto new_socket = acceptor.accept(); - if (ec) { - beammp_errorf("Failed to accept client: {}", ec.message()); - continue; - } - // TODO: Remove log - beammp_debugf("New connection from {}", new_socket.remote_endpoint().address().to_string(), new_socket.remote_endpoint().port()); - auto new_id = new_client_id(); - std::shared_ptr new_client(std::make_shared(new_id, *this, std::move(new_socket))); - m_clients->emplace(new_id, new_client); - } + + // start an accept. this is async and will call itself repeatedly. + accept(); + + // wait for all tasks to complete + m_threadpool.join(); Application::SetSubsystemStatus("TCP", Application::Status::Shutdown); } +void Network::accept() { + // first create a socket! + m_temp_socket = ip::tcp::socket(m_threadpool); + // then use that client's tcp socket to accept into + m_tcp_acceptor.async_accept(m_temp_socket, [this](const auto& ec) { handle_accept(ec); }); +} + +void Network::handle_accept(const boost::system::error_code& ec) { + if (ec) { + beammp_errorf("Failed accepting new client: {}", ec.message()); + } else { + auto new_id = new_client_id(); + beammp_debugf("New connection from {}", m_temp_socket.remote_endpoint().address().to_string(), m_temp_socket.remote_endpoint().port()); + std::shared_ptr new_client(std::make_shared(new_id, *this, std::move(m_temp_socket))); + m_clients->emplace(new_id, new_client); + new_client->start_tcp(); + } + accept(); +} + void Network::udp_read_main() { Application::SetSubsystemStatus("UDP", Application::Status::Starting); - m_udp_socket = ip::udp::socket(m_io, ip::udp::endpoint(ip::udp::v4(), Application::Settings.Port)); + m_udp_socket = ip::udp::socket(m_threadpool, ip::udp::endpoint(ip::udp::v4(), Application::Settings.Port)); Application::SetSubsystemStatus("UDP", Application::Status::Good); while (!*m_shutdown) { try { @@ -260,25 +304,29 @@ void Network::udp_read_main() { } void Network::disconnect(ClientID id, const std::string& msg) { - beammp_infof("Disconnecting client {}: {}", id, msg); - // deadlock-free algorithm to acquire a lock on all these - // this is a little ugly but saves a headache here in the future - auto all = boost::synchronize(m_clients, m_udp_endpoints, m_client_magics); - auto& clients = std::get<0>(all); - auto& endpoints = std::get<1>(all); - auto& magics = std::get<2>(all); + // this has to be scheduled, because the thread which did this cannot!!! do it itself. + beammp_debugf("Scheduling disconnect for {}", id); + post(context(), [id, msg, this] { + beammp_infof("Disconnecting client {}: {}", id, msg); + // deadlock-free algorithm to acquire a lock on all these + // this is a little ugly but saves a headache here in the future + auto all = boost::synchronize(m_clients, m_udp_endpoints, m_client_magics); + auto& clients = std::get<0>(all); + auto& endpoints = std::get<1>(all); + auto& magics = std::get<2>(all); - if (clients->contains(id)) { - auto client = clients->at(id); - beammp_debugf("Removing client udp magic {}", client->udp_magic); - magics->erase(client->udp_magic); - } - std::erase_if(*endpoints, [&](const auto& item) { - const auto& [key, value] = item; - return value == id; + if (clients->contains(id)) { + auto client = clients->at(id); + beammp_debugf("Removing client udp magic {}", client->udp_magic); + magics->erase(client->udp_magic); + } + std::erase_if(*endpoints, [&](const auto& item) { + const auto& [key, value] = item; + return value == id; + }); + // TODO: Despawn vehicles owned by this player + clients->erase(id); }); - // TODO: Despawn vehicles owned by this player - clients->erase(id); } std::unordered_map Network::playing_clients() const { @@ -323,6 +371,7 @@ void Network::handle_packet(ClientID id, const bmp::Packet& packet) { } client = clients->at(id); } + beammp_tracef("Client {} is in state {}", int(id), int(client->state.get())); switch (*client->state) { case bmp::State::None: // move to identification