diff --git a/src/main.rs b/src/main.rs index 128c6ab..611e08b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -53,6 +53,28 @@ async fn main() { let mut status = server.get_server_status(); loop { + // TODO: Error handling + if server.clients.len() > 0 { + tokio::select! { + ret = server::read_tcp(&mut server.clients) => { + match ret { + Ok(ret) => if let Some((index, packet)) = ret { + server.process_tcp(index, packet).await; + }, + Err(e) => error!("Error: {e}"), + } + }, + ret = server::read_udp(&mut server.udp_socket) => { + if let Some((addr, packet)) = ret { + server.process_udp(addr, packet).await; + } + }, + } + } else { + // TODO: Scuffed? + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + } + if let Err(e) = server.process().await { error!("{:?}", e); } diff --git a/src/server/mod.rs b/src/server/mod.rs index 0e0ba27..758de8d 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -78,9 +78,54 @@ pub struct ServerStatus { pub max_players: usize, } +pub async fn read_tcp(clients: &mut Vec) -> anyhow::Result> { + let (result, index, _) = futures::future::select_all( + clients.iter_mut().map(|client| Box::pin(client.process_blocking())) + ).await; + + Ok(match result { + Ok(packet_opt) => { + if let Some(raw_packet) = packet_opt { + // self.parse_packet(index, raw_packet).await?; + Some((index, raw_packet)) + } else { + None + } + }, + Err(e) => { + if let Some(client) = clients.get_mut(index) { + client.kick(&format!("Kicked: {:?}", e)).await; + } + None + } + }) +} + +pub async fn read_udp(udp_socket: &UdpSocket) -> Option<(SocketAddr, RawPacket)> { + let mut data = vec![0u8; 4096]; + let data_size; + let data_addr; + + match udp_socket.recv_from(&mut data).await { + Ok((0, _)) => { + error!("UDP socket is readable, yet has 0 bytes to read!"); + return None; + } + Ok((n, addr)) => (data_size, data_addr) = (n, addr), + Err(_) => return None, + } + + let packet = RawPacket { + header: data_size as u32, + data: data[..data_size].to_vec(), + }; + + Some((data_addr, packet)) +} + pub struct Server { tcp_listener: Arc, - udp_socket: Arc, + pub udp_socket: Arc, clients_incoming: Arc>>, clients_queue: Vec<(Client, Vec>, Vec)>, @@ -280,50 +325,50 @@ impl Server { } } - async fn process_tcp(&mut self) -> anyhow::Result<()> { - if self.clients.len() > 0 { - let (result, index, _) = futures::future::select_all( - self.clients.iter_mut().map(|client| Box::pin(client.process_blocking())) - ).await; + pub async fn process_tcp(&mut self, index: usize, raw_packet: RawPacket) -> anyhow::Result<()> { + // if self.clients.len() > 0 { + // let (result, index, _) = futures::future::select_all( + // self.clients.iter_mut().map(|client| Box::pin(client.process_blocking())) + // ).await; + // + // match result { + // Ok(packet_opt) => { + // if let Some(raw_packet) = packet_opt { + // self.parse_packet(index, raw_packet).await?; + // } + // }, + // Err(e) => { + // if let Some(client) = self.clients.get_mut(index) { + // client.kick(&format!("Kicked: {:?}", e)).await; + // } + // } + // } + // } else { + // // TODO: Find a better solution than this lol + // tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; + // } - match result { - Ok(packet_opt) => { - if let Some(raw_packet) = packet_opt { - self.parse_packet(index, raw_packet).await?; - } - }, - Err(e) => { - if let Some(client) = self.clients.get_mut(index) { - client.kick(&format!("Kicked: {:?}", e)).await; - } - } - } - } else { - // TODO: Find a better solution than this lol - tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; - } + self.parse_packet(index, raw_packet).await?; Ok(()) } - async fn process_udp(&mut self) -> anyhow::Result<()> { + pub async fn process_udp(&mut self, addr: SocketAddr, packet: RawPacket) -> anyhow::Result<()> { // Process UDP packets // TODO: Use a UDP addr -> client ID look up table - if let Some((addr, packet)) = self.read_udp_packets_blocking().await { - if packet.data.len() == 0 { - return Ok(()); // what! - } - let id = packet.data[0] - 1; // Offset by 1 - let data = packet.data[2..].to_vec(); - let packet_processed = RawPacket { - header: data.len() as u32, - data, - }; - 'search: for i in 0..self.clients.len() { - if self.clients[i].id == id { - self.parse_packet_udp(i, addr, packet_processed).await?; - break 'search; - } + if packet.data.len() == 0 { + return Ok(()); // what! + } + let id = packet.data[0] - 1; // Offset by 1 + let data = packet.data[2..].to_vec(); + let packet_processed = RawPacket { + header: data.len() as u32, + data, + }; + 'search: for i in 0..self.clients.len() { + if self.clients[i].id == id { + self.parse_packet_udp(i, addr, packet_processed).await?; + break 'search; } } @@ -454,15 +499,15 @@ impl Server { // But technically it works, and keeping the latency low should really make // it a non-issue I think. // TODO: Handle result - tokio::select! { - _ = self.process_udp() => {}, - _ = tokio::time::sleep(tokio::time::Duration::from_nanos(1_000)) => {}, - }; - - tokio::select! { - _ = tokio::time::sleep(tokio::time::Duration::from_nanos(1_000)) => {}, - _ = self.process_tcp() => {}, - }; + // tokio::select! { + // _ = self.process_udp() => {}, + // _ = tokio::time::sleep(tokio::time::Duration::from_nanos(1_000)) => {}, + // }; + // + // tokio::select! { + // _ = tokio::time::sleep(tokio::time::Duration::from_nanos(1_000)) => {}, + // _ = self.process_tcp() => {}, + // }; self.process_authenticated_clients().await?; self.process_lua_events().await?; @@ -479,14 +524,17 @@ impl Server { .await; } - let id = self.clients[i].id; - let name = self.clients[i].get_name().to_string(); + let name = self.clients.get(i).ok_or(ServerError::ClientDoesntExist)?.get_name().to_string(); for plugin in &mut self.plugins { plugin.send_event(PluginBoundPluginEvent::CallEventHandler((ScriptEvent::OnPlayerDisconnect { pid: id, name: name.clone() }, None))).await; } info!("Disconnecting client {}...", id); - self.clients.remove(i); + if i == self.clients.len() - 1 { + self.clients.remove(i); + } else { + self.clients.swap_remove(i); + } info!("Client {} disconnected!", id); } }