From f2b93844475f737837b63ce187e011619dea109a Mon Sep 17 00:00:00 2001 From: Luuk van Oijen Date: Mon, 13 Nov 2023 12:33:15 +0100 Subject: [PATCH] first step towards less cpu usage --- Cargo.lock | 51 +++++++++++++++++ Cargo.toml | 1 + src/main.rs | 5 -- src/server/client.rs | 7 +++ src/server/mod.rs | 133 +++++++++++++++++++++++++++++-------------- 5 files changed, 149 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 747ffb2..863d9ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,6 +97,7 @@ dependencies = [ "anyhow", "async-trait", "flate2", + "futures", "lazy_static", "log", "mlua", @@ -280,6 +281,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.29" @@ -287,6 +303,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -295,6 +312,34 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +[[package]] +name = "futures-executor" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" + +[[package]] +name = "futures-macro" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "futures-sink" version = "0.3.29" @@ -313,10 +358,16 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 89a12bf..aaa1880 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ num_enum = "0.5.7" async-trait = "0.1.58" tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "net", "io-util", "sync"] } +futures = "0.3.29" reqwest = { version = "0.11", features = ["json"] } serde = { version = "1.0", features = ["derive"] } diff --git a/src/main.rs b/src/main.rs index 907f2fc..81a1702 100644 --- a/src/main.rs +++ b/src/main.rs @@ -40,14 +40,9 @@ async fn main() { .map_err(|e| error!("{:?}", e)) .expect("Failed to start server!"); - // TODO: It'd be nicer if we didn't have to rely on this interval to limit the amount of times - // the loop can run per second. It'd be much better if it idled until one of the connections - // had a packet ready. - let mut interval = tokio::time::interval(tokio::time::Duration::from_nanos(1000)); // 5 ms = max 200 ticks per second loop { if let Err(e) = server.process().await { error!("{:?}", e); } - interval.tick().await; } } diff --git a/src/server/client.rs b/src/server/client.rs index c679144..b0c54da 100644 --- a/src/server/client.rs +++ b/src/server/client.rs @@ -324,6 +324,13 @@ impl Client { Ok(None) } + pub async fn process_blocking(&mut self) -> anyhow::Result> { + if let Some(packet) = self.read_packet_waiting().await? { + return Ok(Some(packet)); + } + Ok(None) + } + pub fn disconnect(&mut self) { self.state = ClientState::Disconnect; } diff --git a/src/server/mod.rs b/src/server/mod.rs index dbf131e..269575b 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -192,6 +192,91 @@ impl Server { }) } + async fn process_tcp(&mut self, joined_names: Vec) -> anyhow::Result<()> { + // 'packet_wait: loop { + // // Process all the clients (TCP) + // let mut packets = Vec::new(); + // for i in 0..self.clients.len() { + // if let Some(client) = self.clients.get_mut(i) { + // match client.process().await { + // Ok(packet_opt) => { + // if let Some(raw_packet) = packet_opt { + // packets.push((i, raw_packet)); + // } + // } + // Err(e) => client.kick(&format!("Kicked: {:?}", e)).await, + // } + // + // // More efficient than broadcasting as we are already looping + // for name in joined_names.iter() { + // self.clients[i] + // .queue_packet(Packet::Notification(NotificationPacket::new(format!( + // "Welcome {}!", + // name.to_string() + // )))) + // .await; + // } + // } + // } + // + // if packets.len() > 0 { + // for (i, raw_packet) in packets { + // self.parse_packet(i, raw_packet).await?; + // } + // + // break 'packet_wait; + // } + // } + + if self.clients.len() > 0 { + let (result, index, _) = futures::future::select_all( + self.clients.iter_mut().map(|client| Box::pin(client.process_blocking())) + ).await; + + match result { + Ok(packet_opt) => { + if let Some(raw_packet) = packet_opt { + self.parse_packet(index, raw_packet).await?; + } + }, + Err(e) => { + if let Some(client) = self.clients.get_mut(index) { + client.kick(&format!("Kicked: {:?}", e)).await; + } + } + } + } else { + // TODO: Find a better solution than this lol + tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; + } + + Ok(()) + } + + async fn process_udp(&mut self) -> anyhow::Result<()> { + // Process UDP packets + // TODO: Use a UDP addr -> client ID look up table + for (addr, packet) in self.read_udp_packets().await { + if packet.data.len() == 0 { + continue; + } + let id = packet.data[0] - 1; // Offset by 1 + let data = packet.data[2..].to_vec(); + let packet_processed = RawPacket { + header: data.len() as u32, + data, + }; + 'search: for i in 0..self.clients.len() { + if self.clients[i].id == id { + self.parse_packet_udp(i, addr, packet_processed).await?; + break 'search; + } + } + } + + Ok(()) + } + pub async fn process(&mut self) -> anyhow::Result<()> { // Bit weird, but this is all to avoid deadlocking the server if anything goes wrong // with the client acception runtime. If that one locks, the server won't accept @@ -218,49 +303,11 @@ impl Server { } } - // Process UDP packets - // TODO: Use a UDP addr -> client ID look up table - for (addr, packet) in self.read_udp_packets().await { - if packet.data.len() == 0 { - continue; - } - let id = packet.data[0] - 1; // Offset by 1 - let data = packet.data[2..].to_vec(); - let packet_processed = RawPacket { - header: data.len() as u32, - data, - }; - 'search: for i in 0..self.clients.len() { - if self.clients[i].id == id { - self.parse_packet_udp(i, addr, packet_processed).await?; - break 'search; - } - } - } - - // Process all the clients (TCP) - for i in 0..self.clients.len() { - if let Some(client) = self.clients.get_mut(i) { - match client.process().await { - Ok(packet_opt) => { - if let Some(raw_packet) = packet_opt { - self.parse_packet(i, raw_packet.clone()).await?;; - } - } - Err(e) => client.kick(&format!("Kicked: {:?}", e)).await, - } - - // More efficient than broadcasting as we are already looping - for name in joined_names.iter() { - self.clients[i] - .queue_packet(Packet::Notification(NotificationPacket::new(format!( - "Welcome {}!", - name.to_string() - )))) - .await; - } - } - } + self.process_udp().await; + tokio::select! { + _ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {}, + _ = self.process_tcp(joined_names) => {}, + }; // I'm sorry for this code :( for i in 0..self.clients.len() {