implement an outbox to ensure only one write() at a time

This commit is contained in:
Lion Kortlepel 2024-03-11 21:49:49 +01:00
parent e836ad5133
commit d11262f38d
No known key found for this signature in database
GPG Key ID: 4322FF2B4C71259B
2 changed files with 42 additions and 8 deletions

View File

@ -363,7 +363,7 @@ void ClientNetwork::handle_server_leaving(bmp::ClientPacket& packet) {
void ClientNetwork::client_tcp_read(std::function<void(bmp::ClientPacket&&)> handler) {
m_tmp_header_buffer.resize(bmp::ClientHeader::SERIALIZED_SIZE);
boost::asio::async_read(m_game_socket, buffer(m_tmp_header_buffer),
bind_executor(m_strand, [this, handler](auto ec, auto) {
bind_executor(m_read_strand, [this, handler](auto ec, auto) {
if (ec) {
disconnect(fmt::format("Failed to read from game: {}", ec.message()));
} else {
@ -379,7 +379,7 @@ void ClientNetwork::client_tcp_read(std::function<void(bmp::ClientPacket&&)> han
m_tmp_packet.pid, m_tmp_packet.vid,
m_tmp_packet.get_readable_data().size());
boost::asio::async_read(m_game_socket, buffer(m_tmp_packet.raw_data),
bind_executor(m_strand, [handler, this](auto ec, auto) {
bind_executor(m_read_strand, [handler, this](auto ec, auto) {
if (ec) {
disconnect(fmt::format("Failed to read from game: {}", ec.message()));
} else {
@ -392,9 +392,27 @@ void ClientNetwork::client_tcp_read(std::function<void(bmp::ClientPacket&&)> han
}
void ClientNetwork::client_tcp_write(bmp::ClientPacket&& packet, std::function<void(boost::system::error_code)> handler) {
auto header = packet.finalize();
boost::asio::post(m_write_strand, [this, packet = std::move(packet), handler = std::move(handler)] {
auto purpose = packet.purpose;
m_outbox.emplace_back(OutPacket { std::move(packet), handler });
if (m_outbox.size() > 1) {
// outstanding async_write
spdlog::debug("Outstanding write, not writing 0x{:x} now", int(purpose));
return;
}
client_tcp_write_impl();
});
}
void ClientNetwork::client_tcp_write_impl() {
OutPacket out_packet = std::move(m_outbox.front());
spdlog::debug("Writing 0x{:x} now", int(out_packet.packet.purpose));
// don't pop just yet
auto header = out_packet.packet.finalize();
// copy packet
auto owned_packet = std::make_shared<bmp::ClientPacket>(std::move(packet));
auto owned_packet = std::make_shared<bmp::ClientPacket>(std::move(out_packet.packet));
// serialize header
auto header_data = std::make_shared<std::vector<uint8_t>>(bmp::ClientHeader::SERIALIZED_SIZE);
header.serialize_to(*header_data);
@ -403,7 +421,8 @@ void ClientNetwork::client_tcp_write(bmp::ClientPacket&& packet, std::function<v
buffer(owned_packet->raw_data)
};
boost::asio::async_write(m_game_socket, buffers,
[this, header_data, owned_packet, handler](auto ec, auto size) {
bind_executor(m_write_strand, [this, header_data, owned_packet, handler = std::move(out_packet.handler)](auto ec, auto size) {
// only now pop the packet we just sent from the outbox
spdlog::trace("Wrote {} bytes for 0x{:x}", size, int(owned_packet->purpose));
if (handler) {
handler(ec);
@ -415,7 +434,13 @@ void ClientNetwork::client_tcp_write(bmp::ClientPacket&& packet, std::function<v
spdlog::debug("Sent packet of type 0x{:x}", int(owned_packet->purpose));
}
}
});
m_outbox.pop_front();
// trigger the next send if there's more to send
if (!m_outbox.empty()) {
// async so this is not recursion!
client_tcp_write_impl();
}
}));
}
std::vector<uint8_t> ClientNetwork::json_to_vec(const nlohmann::json& value) {
@ -508,7 +533,7 @@ void ClientNetwork::handle_server_packet(bmp::Packet&& packet) {
case bmp::MapInfo: {
auto data = packet.get_readable_data();
auto map = std::string(data.begin(), data.end());
spdlog::debug("Map: '{}'", map);
spdlog::debug("Map: '{}'", map);
client_tcp_write(bmp::ClientPacket {
.purpose = bmp::ClientPurpose::MapInfo,
.raw_data = json_to_vec({ "map", map }),

View File

@ -8,6 +8,7 @@
#include "Version.h"
#include <boost/asio.hpp>
#include <deque>
#include <nlohmann/json.hpp>
using namespace boost::asio;
@ -32,6 +33,13 @@ private:
void client_tcp_read(std::function<void(bmp::ClientPacket&&)> handler);
void client_tcp_write(bmp::ClientPacket&& packet, std::function<void(boost::system::error_code)> handler = nullptr);
struct OutPacket {
bmp::ClientPacket packet;
std::function<void(boost::system::error_code)> handler;
};
std::deque<OutPacket> m_outbox {};
void client_tcp_write_impl();
void handle_packet(bmp::ClientPacket& packet);
void handle_client_identification(bmp::ClientPacket& packet);
void handle_login(bmp::ClientPacket& packet);
@ -68,7 +76,8 @@ private:
bmp::ClientState m_client_state;
ip::tcp::acceptor m_acceptor { m_io };
boost::asio::strand<ip::tcp::socket::executor_type> m_strand { m_game_socket.get_executor() };
boost::asio::io_context::strand m_read_strand { m_io };
boost::asio::io_context::strand m_write_strand { m_io };
// temporary packet and header buffer for async reads
bmp::ClientPacket m_tmp_packet {};