mirror of
https://github.com/BeamMP/BeamMP-Server.git
synced 2025-07-04 00:36:14 +00:00
implement async accept, async read for tcp
This commit is contained in:
parent
24723c01da
commit
303a619ece
@ -6,6 +6,8 @@
|
|||||||
#include "Sync.h"
|
#include "Sync.h"
|
||||||
#include "Transport.h"
|
#include "Transport.h"
|
||||||
#include <boost/asio.hpp>
|
#include <boost/asio.hpp>
|
||||||
|
#include <boost/asio/execution_context.hpp>
|
||||||
|
#include <boost/asio/thread_pool.hpp>
|
||||||
#include <boost/thread/scoped_thread.hpp>
|
#include <boost/thread/scoped_thread.hpp>
|
||||||
#include <boost/thread/synchronized_value.hpp>
|
#include <boost/thread/synchronized_value.hpp>
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
@ -43,7 +45,7 @@ struct Client {
|
|||||||
/// conjunction with something else. Blocks other writes.
|
/// conjunction with something else. Blocks other writes.
|
||||||
void tcp_write_file_raw(const std::filesystem::path& path);
|
void tcp_write_file_raw(const std::filesystem::path& path);
|
||||||
|
|
||||||
Client(ClientID id, class Network& network, ip::tcp::socket&& tcp_sockem_udp_endpointst);
|
Client(ClientID id, class Network& network, ip::tcp::socket&& tcp_socket);
|
||||||
~Client();
|
~Client();
|
||||||
|
|
||||||
ip::tcp::socket& tcp_socket() { return m_tcp_socket; }
|
ip::tcp::socket& tcp_socket() { return m_tcp_socket; }
|
||||||
@ -56,6 +58,15 @@ struct Client {
|
|||||||
const uint64_t udp_magic;
|
const uint64_t udp_magic;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
/// Timeout used for typical tcp reads.
|
||||||
|
boost::chrono::seconds m_read_timeout { 5 };
|
||||||
|
/// Timeout used for typical tcp writes.
|
||||||
|
boost::chrono::seconds m_write_timeout { 5 };
|
||||||
|
/// Timeout used for mod download tcp writes.
|
||||||
|
/// This is typically orders of magnitude larger
|
||||||
|
/// to allow for slow downloads.
|
||||||
|
boost::chrono::seconds m_download_write_timeout { 5 };
|
||||||
|
|
||||||
void tcp_main();
|
void tcp_main();
|
||||||
|
|
||||||
std::mutex m_tcp_read_mtx;
|
std::mutex m_tcp_read_mtx;
|
||||||
@ -66,6 +77,9 @@ private:
|
|||||||
|
|
||||||
boost::scoped_thread<> m_tcp_thread;
|
boost::scoped_thread<> m_tcp_thread;
|
||||||
|
|
||||||
|
std::vector<uint8_t> m_header { bmp::Header::SERIALIZED_SIZE };
|
||||||
|
bmp::Packet m_packet{};
|
||||||
|
|
||||||
class Network& m_network;
|
class Network& m_network;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -160,6 +174,14 @@ public:
|
|||||||
|
|
||||||
size_t vehicle_count() const;
|
size_t vehicle_count() const;
|
||||||
|
|
||||||
|
/// To be called by accept() async handler once an accept() is completed.
|
||||||
|
void accept();
|
||||||
|
|
||||||
|
/// Gets the async i/o context of the network - can be used to "schedule" tasks on it.
|
||||||
|
boost::asio::thread_pool& context() {
|
||||||
|
return m_threadpool;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void handle_packet(ClientID id, const bmp::Packet& packet);
|
void handle_packet(ClientID id, const bmp::Packet& packet);
|
||||||
|
|
||||||
@ -171,6 +193,16 @@ private:
|
|||||||
void udp_read_main();
|
void udp_read_main();
|
||||||
void tcp_listen_main();
|
void tcp_listen_main();
|
||||||
|
|
||||||
|
void handle_identification(ClientID id, const bmp::Packet& packet, std::shared_ptr<Client>& client);
|
||||||
|
|
||||||
|
void handle_authentication(ClientID id, const bmp::Packet& packet, std::shared_ptr<Client>& client);
|
||||||
|
|
||||||
|
/// On failure, throws an exception with the error for the client.
|
||||||
|
static void authenticate_user(const std::string& public_key, std::shared_ptr<Client>& client);
|
||||||
|
|
||||||
|
/// Called by accept() once completed (completion handler).
|
||||||
|
void handle_accept(const boost::system::error_code& ec);
|
||||||
|
|
||||||
Sync<std::unordered_map<ClientID, Client::Ptr>> m_clients {};
|
Sync<std::unordered_map<ClientID, Client::Ptr>> m_clients {};
|
||||||
Sync<std::unordered_map<VehicleID, Vehicle::Ptr>> m_vehicles {};
|
Sync<std::unordered_map<VehicleID, Vehicle::Ptr>> m_vehicles {};
|
||||||
Sync<std::unordered_map<uint64_t, ClientID>> m_client_magics {};
|
Sync<std::unordered_map<uint64_t, ClientID>> m_client_magics {};
|
||||||
@ -184,18 +216,16 @@ private:
|
|||||||
return new_id;
|
return new_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
thread_pool m_threadpool { std::thread::hardware_concurrency() };
|
||||||
|
Sync<bool> m_shutdown { false };
|
||||||
|
|
||||||
|
ip::udp::socket m_udp_socket { m_threadpool };
|
||||||
|
|
||||||
|
ip::tcp::socket m_tcp_listener { m_threadpool };
|
||||||
|
ip::tcp::acceptor m_tcp_acceptor { m_threadpool };
|
||||||
|
/// This socket gets accepted into, and is then moved.
|
||||||
|
ip::tcp::socket m_temp_socket { m_threadpool };
|
||||||
|
|
||||||
boost::scoped_thread<> m_tcp_listen_thread;
|
boost::scoped_thread<> m_tcp_listen_thread;
|
||||||
boost::scoped_thread<> m_udp_read_thread;
|
boost::scoped_thread<> m_udp_read_thread;
|
||||||
|
|
||||||
io_context m_io {};
|
|
||||||
thread_pool m_threadpool {};
|
|
||||||
Sync<bool> m_shutdown { false };
|
|
||||||
ip::udp::socket m_udp_socket { m_io };
|
|
||||||
|
|
||||||
void handle_identification(ClientID id, const bmp::Packet& packet, std::shared_ptr<Client>& client);
|
|
||||||
|
|
||||||
void handle_authentication(ClientID id, const bmp::Packet& packet, std::shared_ptr<Client>& client);
|
|
||||||
|
|
||||||
/// On failure, throws an exception with the error for the client.
|
|
||||||
static void authenticate_user(const std::string& public_key, std::shared_ptr<Client>& client);
|
|
||||||
};
|
};
|
||||||
|
@ -10,6 +10,7 @@
|
|||||||
#include "ServerInfo.h"
|
#include "ServerInfo.h"
|
||||||
#include "TLuaEngine.h"
|
#include "TLuaEngine.h"
|
||||||
#include "Util.h"
|
#include "Util.h"
|
||||||
|
#include <boost/asio/read.hpp>
|
||||||
#include <boost/thread/synchronized_value.hpp>
|
#include <boost/thread/synchronized_value.hpp>
|
||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
#include <nlohmann/json.hpp>
|
#include <nlohmann/json.hpp>
|
||||||
@ -88,14 +89,45 @@ Client::~Client() {
|
|||||||
|
|
||||||
Client::Client(ClientID id, Network& network, ip::tcp::socket&& tcp_socket)
|
Client::Client(ClientID id, Network& network, ip::tcp::socket&& tcp_socket)
|
||||||
: id(id)
|
: id(id)
|
||||||
, udp_magic(id ^ uint64_t(std::rand()) ^ uint64_t(this))
|
, udp_magic(id ^ (uint64_t(std::rand()) << 32) ^ uint64_t(this))
|
||||||
, m_tcp_socket(std::forward<ip::tcp::socket&&>(tcp_socket))
|
, m_tcp_socket(std::forward<ip::tcp::socket&&>(tcp_socket))
|
||||||
, m_network(network) {
|
, m_network(network) {
|
||||||
beammp_debugf("Client {} created", id);
|
beammp_debugf("Client {} created", id);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Client::start_tcp() {
|
void Client::start_tcp() {
|
||||||
m_tcp_thread = boost::scoped_thread<>(&Client::tcp_main, this);
|
beammp_tracef("{}", __func__);
|
||||||
|
async_read(m_tcp_socket, buffer(m_header), [this](const auto& ec, size_t) {
|
||||||
|
if (ec) {
|
||||||
|
beammp_errorf("TCP read() failed: {}", ec.message());
|
||||||
|
m_network.disconnect(id, "read() failed");
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
bmp::Header hdr {};
|
||||||
|
hdr.deserialize_from(m_header);
|
||||||
|
beammp_tracef("Got header with purpose {}, size {} from {}", int(hdr.purpose), hdr.size, id);
|
||||||
|
// delete previous packet if any exists
|
||||||
|
m_packet = {};
|
||||||
|
m_packet.purpose = hdr.purpose;
|
||||||
|
m_packet.flags = hdr.flags;
|
||||||
|
m_packet.raw_data.resize(hdr.size);
|
||||||
|
async_read(m_tcp_socket, buffer(m_packet.raw_data), [this](const auto& ec, size_t bytes) {
|
||||||
|
if (ec) {
|
||||||
|
beammp_errorf("TCP read() failed: {}", ec.message());
|
||||||
|
m_network.disconnect(id, "read() failed");
|
||||||
|
} else {
|
||||||
|
beammp_tracef("Got body of size {} from {}", bytes, id);
|
||||||
|
m_network.handle_packet(id, m_packet);
|
||||||
|
// recv another packet!
|
||||||
|
start_tcp();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch(const std::exception& e) {
|
||||||
|
beammp_errorf("Error while processing TCP packet from client {}: {}", id, e.what());
|
||||||
|
m_network.disconnect(id, "Failed to understand");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void Client::tcp_main() {
|
void Client::tcp_main() {
|
||||||
@ -134,8 +166,7 @@ void Network::udp_write(bmp::Packet& packet, const ip::udp::endpoint& to_ep) {
|
|||||||
|
|
||||||
Network::Network()
|
Network::Network()
|
||||||
: m_tcp_listen_thread(&Network::tcp_listen_main, this)
|
: m_tcp_listen_thread(&Network::tcp_listen_main, this)
|
||||||
, m_udp_read_thread(&Network::udp_read_main, this)
|
, m_udp_read_thread(&Network::udp_read_main, this) {
|
||||||
, m_threadpool(std::thread::hardware_concurrency()) {
|
|
||||||
Application::RegisterShutdownHandler([this] {
|
Application::RegisterShutdownHandler([this] {
|
||||||
*m_shutdown = true;
|
*m_shutdown = true;
|
||||||
m_tcp_listen_thread.interrupt();
|
m_tcp_listen_thread.interrupt();
|
||||||
@ -161,24 +192,23 @@ Network::~Network() {
|
|||||||
void Network::tcp_listen_main() {
|
void Network::tcp_listen_main() {
|
||||||
Application::SetSubsystemStatus("TCP", Application::Status::Starting);
|
Application::SetSubsystemStatus("TCP", Application::Status::Starting);
|
||||||
ip::tcp::endpoint listen_ep(ip::address::from_string("0.0.0.0"), static_cast<uint16_t>(Application::Settings.Port));
|
ip::tcp::endpoint listen_ep(ip::address::from_string("0.0.0.0"), static_cast<uint16_t>(Application::Settings.Port));
|
||||||
ip::tcp::socket listener(m_threadpool);
|
|
||||||
boost::system::error_code ec;
|
boost::system::error_code ec;
|
||||||
listener.open(listen_ep.protocol(), ec);
|
m_tcp_listener.open(listen_ep.protocol(), ec);
|
||||||
if (ec) {
|
if (ec) {
|
||||||
beammp_errorf("Failed to open socket: {}", ec.message());
|
beammp_errorf("Failed to open socket: {}", ec.message());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
socket_base::linger linger_opt {};
|
socket_base::linger linger_opt {};
|
||||||
linger_opt.enabled(false);
|
linger_opt.enabled(false);
|
||||||
listener.set_option(linger_opt, ec);
|
m_tcp_listener.set_option(linger_opt, ec);
|
||||||
if (ec) {
|
if (ec) {
|
||||||
beammp_errorf("Failed to set up listening socket to not linger / reuse address. "
|
beammp_errorf("Failed to set up listening socket to not linger / reuse address. "
|
||||||
"This may cause the socket to refuse to bind(). Error: {}",
|
"This may cause the socket to refuse to bind(). Error: {}",
|
||||||
ec.message());
|
ec.message());
|
||||||
}
|
}
|
||||||
|
|
||||||
ip::tcp::acceptor acceptor(m_threadpool, listen_ep);
|
m_tcp_acceptor = { m_threadpool, listen_ep };
|
||||||
acceptor.listen(socket_base::max_listen_connections, ec);
|
m_tcp_acceptor.listen(socket_base::max_listen_connections, ec);
|
||||||
if (ec) {
|
if (ec) {
|
||||||
beammp_errorf("listen() failed, which is needed for the server to operate. "
|
beammp_errorf("listen() failed, which is needed for the server to operate. "
|
||||||
"Shutting down. Error: {}",
|
"Shutting down. Error: {}",
|
||||||
@ -186,24 +216,38 @@ void Network::tcp_listen_main() {
|
|||||||
Application::GracefullyShutdown();
|
Application::GracefullyShutdown();
|
||||||
}
|
}
|
||||||
Application::SetSubsystemStatus("TCP", Application::Status::Good);
|
Application::SetSubsystemStatus("TCP", Application::Status::Good);
|
||||||
while (!*m_shutdown) {
|
|
||||||
auto new_socket = acceptor.accept();
|
// start an accept. this is async and will call itself repeatedly.
|
||||||
if (ec) {
|
accept();
|
||||||
beammp_errorf("Failed to accept client: {}", ec.message());
|
|
||||||
continue;
|
// wait for all tasks to complete
|
||||||
}
|
m_threadpool.join();
|
||||||
// TODO: Remove log
|
|
||||||
beammp_debugf("New connection from {}", new_socket.remote_endpoint().address().to_string(), new_socket.remote_endpoint().port());
|
|
||||||
auto new_id = new_client_id();
|
|
||||||
std::shared_ptr<Client> new_client(std::make_shared<Client>(new_id, *this, std::move(new_socket)));
|
|
||||||
m_clients->emplace(new_id, new_client);
|
|
||||||
}
|
|
||||||
Application::SetSubsystemStatus("TCP", Application::Status::Shutdown);
|
Application::SetSubsystemStatus("TCP", Application::Status::Shutdown);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Network::accept() {
|
||||||
|
// first create a socket!
|
||||||
|
m_temp_socket = ip::tcp::socket(m_threadpool);
|
||||||
|
// then use that client's tcp socket to accept into
|
||||||
|
m_tcp_acceptor.async_accept(m_temp_socket, [this](const auto& ec) { handle_accept(ec); });
|
||||||
|
}
|
||||||
|
|
||||||
|
void Network::handle_accept(const boost::system::error_code& ec) {
|
||||||
|
if (ec) {
|
||||||
|
beammp_errorf("Failed accepting new client: {}", ec.message());
|
||||||
|
} else {
|
||||||
|
auto new_id = new_client_id();
|
||||||
|
beammp_debugf("New connection from {}", m_temp_socket.remote_endpoint().address().to_string(), m_temp_socket.remote_endpoint().port());
|
||||||
|
std::shared_ptr<Client> new_client(std::make_shared<Client>(new_id, *this, std::move(m_temp_socket)));
|
||||||
|
m_clients->emplace(new_id, new_client);
|
||||||
|
new_client->start_tcp();
|
||||||
|
}
|
||||||
|
accept();
|
||||||
|
}
|
||||||
|
|
||||||
void Network::udp_read_main() {
|
void Network::udp_read_main() {
|
||||||
Application::SetSubsystemStatus("UDP", Application::Status::Starting);
|
Application::SetSubsystemStatus("UDP", Application::Status::Starting);
|
||||||
m_udp_socket = ip::udp::socket(m_io, ip::udp::endpoint(ip::udp::v4(), Application::Settings.Port));
|
m_udp_socket = ip::udp::socket(m_threadpool, ip::udp::endpoint(ip::udp::v4(), Application::Settings.Port));
|
||||||
Application::SetSubsystemStatus("UDP", Application::Status::Good);
|
Application::SetSubsystemStatus("UDP", Application::Status::Good);
|
||||||
while (!*m_shutdown) {
|
while (!*m_shutdown) {
|
||||||
try {
|
try {
|
||||||
@ -260,6 +304,9 @@ void Network::udp_read_main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void Network::disconnect(ClientID id, const std::string& msg) {
|
void Network::disconnect(ClientID id, const std::string& msg) {
|
||||||
|
// this has to be scheduled, because the thread which did this cannot!!! do it itself.
|
||||||
|
beammp_debugf("Scheduling disconnect for {}", id);
|
||||||
|
post(context(), [id, msg, this] {
|
||||||
beammp_infof("Disconnecting client {}: {}", id, msg);
|
beammp_infof("Disconnecting client {}: {}", id, msg);
|
||||||
// deadlock-free algorithm to acquire a lock on all these
|
// deadlock-free algorithm to acquire a lock on all these
|
||||||
// this is a little ugly but saves a headache here in the future
|
// this is a little ugly but saves a headache here in the future
|
||||||
@ -279,6 +326,7 @@ void Network::disconnect(ClientID id, const std::string& msg) {
|
|||||||
});
|
});
|
||||||
// TODO: Despawn vehicles owned by this player
|
// TODO: Despawn vehicles owned by this player
|
||||||
clients->erase(id);
|
clients->erase(id);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unordered_map<ClientID, Client::Ptr> Network::playing_clients() const {
|
std::unordered_map<ClientID, Client::Ptr> Network::playing_clients() const {
|
||||||
@ -323,6 +371,7 @@ void Network::handle_packet(ClientID id, const bmp::Packet& packet) {
|
|||||||
}
|
}
|
||||||
client = clients->at(id);
|
client = clients->at(id);
|
||||||
}
|
}
|
||||||
|
beammp_tracef("Client {} is in state {}", int(id), int(client->state.get()));
|
||||||
switch (*client->state) {
|
switch (*client->state) {
|
||||||
case bmp::State::None:
|
case bmp::State::None:
|
||||||
// move to identification
|
// move to identification
|
||||||
|
Loading…
x
Reference in New Issue
Block a user