wip racing read_tcp and read_udp

This commit is contained in:
Luuk van Oijen
2023-11-20 14:49:56 +01:00
parent 4551a2da60
commit 5a42fe87fd
2 changed files with 120 additions and 50 deletions

View File

@@ -53,6 +53,28 @@ async fn main() {
let mut status = server.get_server_status();
loop {
// TODO: Error handling
if server.clients.len() > 0 {
tokio::select! {
ret = server::read_tcp(&mut server.clients) => {
match ret {
Ok(ret) => if let Some((index, packet)) = ret {
server.process_tcp(index, packet).await;
},
Err(e) => error!("Error: {e}"),
}
},
ret = server::read_udp(&mut server.udp_socket) => {
if let Some((addr, packet)) = ret {
server.process_udp(addr, packet).await;
}
},
}
} else {
// TODO: Scuffed?
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
if let Err(e) = server.process().await {
error!("{:?}", e);
}

View File

@@ -78,9 +78,54 @@ pub struct ServerStatus {
pub max_players: usize,
}
pub async fn read_tcp(clients: &mut Vec<Client>) -> anyhow::Result<Option<(usize, RawPacket)>> {
let (result, index, _) = futures::future::select_all(
clients.iter_mut().map(|client| Box::pin(client.process_blocking()))
).await;
Ok(match result {
Ok(packet_opt) => {
if let Some(raw_packet) = packet_opt {
// self.parse_packet(index, raw_packet).await?;
Some((index, raw_packet))
} else {
None
}
},
Err(e) => {
if let Some(client) = clients.get_mut(index) {
client.kick(&format!("Kicked: {:?}", e)).await;
}
None
}
})
}
pub async fn read_udp(udp_socket: &UdpSocket) -> Option<(SocketAddr, RawPacket)> {
let mut data = vec![0u8; 4096];
let data_size;
let data_addr;
match udp_socket.recv_from(&mut data).await {
Ok((0, _)) => {
error!("UDP socket is readable, yet has 0 bytes to read!");
return None;
}
Ok((n, addr)) => (data_size, data_addr) = (n, addr),
Err(_) => return None,
}
let packet = RawPacket {
header: data_size as u32,
data: data[..data_size].to_vec(),
};
Some((data_addr, packet))
}
pub struct Server {
tcp_listener: Arc<TcpListener>,
udp_socket: Arc<UdpSocket>,
pub udp_socket: Arc<UdpSocket>,
clients_incoming: Arc<Mutex<Vec<Client>>>,
clients_queue: Vec<(Client, Vec<tokio::sync::oneshot::Receiver<Argument>>, Vec<Argument>)>,
@@ -280,50 +325,50 @@ impl Server {
}
}
async fn process_tcp(&mut self) -> anyhow::Result<()> {
if self.clients.len() > 0 {
let (result, index, _) = futures::future::select_all(
self.clients.iter_mut().map(|client| Box::pin(client.process_blocking()))
).await;
pub async fn process_tcp(&mut self, index: usize, raw_packet: RawPacket) -> anyhow::Result<()> {
// if self.clients.len() > 0 {
// let (result, index, _) = futures::future::select_all(
// self.clients.iter_mut().map(|client| Box::pin(client.process_blocking()))
// ).await;
//
// match result {
// Ok(packet_opt) => {
// if let Some(raw_packet) = packet_opt {
// self.parse_packet(index, raw_packet).await?;
// }
// },
// Err(e) => {
// if let Some(client) = self.clients.get_mut(index) {
// client.kick(&format!("Kicked: {:?}", e)).await;
// }
// }
// }
// } else {
// // TODO: Find a better solution than this lol
// tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
// }
match result {
Ok(packet_opt) => {
if let Some(raw_packet) = packet_opt {
self.parse_packet(index, raw_packet).await?;
}
},
Err(e) => {
if let Some(client) = self.clients.get_mut(index) {
client.kick(&format!("Kicked: {:?}", e)).await;
}
}
}
} else {
// TODO: Find a better solution than this lol
tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
}
self.parse_packet(index, raw_packet).await?;
Ok(())
}
async fn process_udp(&mut self) -> anyhow::Result<()> {
pub async fn process_udp(&mut self, addr: SocketAddr, packet: RawPacket) -> anyhow::Result<()> {
// Process UDP packets
// TODO: Use a UDP addr -> client ID look up table
if let Some((addr, packet)) = self.read_udp_packets_blocking().await {
if packet.data.len() == 0 {
return Ok(()); // what!
}
let id = packet.data[0] - 1; // Offset by 1
let data = packet.data[2..].to_vec();
let packet_processed = RawPacket {
header: data.len() as u32,
data,
};
'search: for i in 0..self.clients.len() {
if self.clients[i].id == id {
self.parse_packet_udp(i, addr, packet_processed).await?;
break 'search;
}
if packet.data.len() == 0 {
return Ok(()); // what!
}
let id = packet.data[0] - 1; // Offset by 1
let data = packet.data[2..].to_vec();
let packet_processed = RawPacket {
header: data.len() as u32,
data,
};
'search: for i in 0..self.clients.len() {
if self.clients[i].id == id {
self.parse_packet_udp(i, addr, packet_processed).await?;
break 'search;
}
}
@@ -454,15 +499,15 @@ impl Server {
// But technically it works, and keeping the latency low should really make
// it a non-issue I think.
// TODO: Handle result
tokio::select! {
_ = self.process_udp() => {},
_ = tokio::time::sleep(tokio::time::Duration::from_nanos(1_000)) => {},
};
tokio::select! {
_ = tokio::time::sleep(tokio::time::Duration::from_nanos(1_000)) => {},
_ = self.process_tcp() => {},
};
// tokio::select! {
// _ = self.process_udp() => {},
// _ = tokio::time::sleep(tokio::time::Duration::from_nanos(1_000)) => {},
// };
//
// tokio::select! {
// _ = tokio::time::sleep(tokio::time::Duration::from_nanos(1_000)) => {},
// _ = self.process_tcp() => {},
// };
self.process_authenticated_clients().await?;
self.process_lua_events().await?;
@@ -479,14 +524,17 @@ impl Server {
.await;
}
let id = self.clients[i].id;
let name = self.clients[i].get_name().to_string();
let name = self.clients.get(i).ok_or(ServerError::ClientDoesntExist)?.get_name().to_string();
for plugin in &mut self.plugins {
plugin.send_event(PluginBoundPluginEvent::CallEventHandler((ScriptEvent::OnPlayerDisconnect { pid: id, name: name.clone() }, None))).await;
}
info!("Disconnecting client {}...", id);
self.clients.remove(i);
if i == self.clients.len() - 1 {
self.clients.remove(i);
} else {
self.clients.swap_remove(i);
}
info!("Client {} disconnected!", id);
}
}