first step towards less cpu usage

This commit is contained in:
Luuk van Oijen
2023-11-13 12:33:15 +01:00
parent b5c6f2c3c7
commit f2b9384447
5 changed files with 149 additions and 48 deletions

51
Cargo.lock generated
View File

@@ -97,6 +97,7 @@ dependencies = [
"anyhow",
"async-trait",
"flate2",
"futures",
"lazy_static",
"log",
"mlua",
@@ -280,6 +281,21 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.29"
@@ -287,6 +303,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@@ -295,6 +312,34 @@ version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c"
[[package]]
name = "futures-executor"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa"
[[package]]
name = "futures-macro"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.39",
]
[[package]]
name = "futures-sink"
version = "0.3.29"
@@ -313,10 +358,16 @@ version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]

View File

@@ -17,6 +17,7 @@ num_enum = "0.5.7"
async-trait = "0.1.58"
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "net", "io-util", "sync"] }
futures = "0.3.29"
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }

View File

@@ -40,14 +40,9 @@ async fn main() {
.map_err(|e| error!("{:?}", e))
.expect("Failed to start server!");
// TODO: It'd be nicer if we didn't have to rely on this interval to limit the amount of times
// the loop can run per second. It'd be much better if it idled until one of the connections
// had a packet ready.
let mut interval = tokio::time::interval(tokio::time::Duration::from_nanos(1000)); // 5 ms = max 200 ticks per second
loop {
if let Err(e) = server.process().await {
error!("{:?}", e);
}
interval.tick().await;
}
}

View File

@@ -324,6 +324,13 @@ impl Client {
Ok(None)
}
pub async fn process_blocking(&mut self) -> anyhow::Result<Option<RawPacket>> {
if let Some(packet) = self.read_packet_waiting().await? {
return Ok(Some(packet));
}
Ok(None)
}
pub fn disconnect(&mut self) {
self.state = ClientState::Disconnect;
}

View File

@@ -192,6 +192,91 @@ impl Server {
})
}
async fn process_tcp(&mut self, joined_names: Vec<String>) -> anyhow::Result<()> {
// 'packet_wait: loop {
// // Process all the clients (TCP)
// let mut packets = Vec::new();
// for i in 0..self.clients.len() {
// if let Some(client) = self.clients.get_mut(i) {
// match client.process().await {
// Ok(packet_opt) => {
// if let Some(raw_packet) = packet_opt {
// packets.push((i, raw_packet));
// }
// }
// Err(e) => client.kick(&format!("Kicked: {:?}", e)).await,
// }
//
// // More efficient than broadcasting as we are already looping
// for name in joined_names.iter() {
// self.clients[i]
// .queue_packet(Packet::Notification(NotificationPacket::new(format!(
// "Welcome {}!",
// name.to_string()
// ))))
// .await;
// }
// }
// }
//
// if packets.len() > 0 {
// for (i, raw_packet) in packets {
// self.parse_packet(i, raw_packet).await?;
// }
//
// break 'packet_wait;
// }
// }
if self.clients.len() > 0 {
let (result, index, _) = futures::future::select_all(
self.clients.iter_mut().map(|client| Box::pin(client.process_blocking()))
).await;
match result {
Ok(packet_opt) => {
if let Some(raw_packet) = packet_opt {
self.parse_packet(index, raw_packet).await?;
}
},
Err(e) => {
if let Some(client) = self.clients.get_mut(index) {
client.kick(&format!("Kicked: {:?}", e)).await;
}
}
}
} else {
// TODO: Find a better solution than this lol
tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
}
Ok(())
}
async fn process_udp(&mut self) -> anyhow::Result<()> {
// Process UDP packets
// TODO: Use a UDP addr -> client ID look up table
for (addr, packet) in self.read_udp_packets().await {
if packet.data.len() == 0 {
continue;
}
let id = packet.data[0] - 1; // Offset by 1
let data = packet.data[2..].to_vec();
let packet_processed = RawPacket {
header: data.len() as u32,
data,
};
'search: for i in 0..self.clients.len() {
if self.clients[i].id == id {
self.parse_packet_udp(i, addr, packet_processed).await?;
break 'search;
}
}
}
Ok(())
}
pub async fn process(&mut self) -> anyhow::Result<()> {
// Bit weird, but this is all to avoid deadlocking the server if anything goes wrong
// with the client acception runtime. If that one locks, the server won't accept
@@ -218,49 +303,11 @@ impl Server {
}
}
// Process UDP packets
// TODO: Use a UDP addr -> client ID look up table
for (addr, packet) in self.read_udp_packets().await {
if packet.data.len() == 0 {
continue;
}
let id = packet.data[0] - 1; // Offset by 1
let data = packet.data[2..].to_vec();
let packet_processed = RawPacket {
header: data.len() as u32,
data,
};
'search: for i in 0..self.clients.len() {
if self.clients[i].id == id {
self.parse_packet_udp(i, addr, packet_processed).await?;
break 'search;
}
}
}
// Process all the clients (TCP)
for i in 0..self.clients.len() {
if let Some(client) = self.clients.get_mut(i) {
match client.process().await {
Ok(packet_opt) => {
if let Some(raw_packet) = packet_opt {
self.parse_packet(i, raw_packet.clone()).await?;;
}
}
Err(e) => client.kick(&format!("Kicked: {:?}", e)).await,
}
// More efficient than broadcasting as we are already looping
for name in joined_names.iter() {
self.clients[i]
.queue_packet(Packet::Notification(NotificationPacket::new(format!(
"Welcome {}!",
name.to_string()
))))
.await;
}
}
}
self.process_udp().await;
tokio::select! {
_ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {},
_ = self.process_tcp(joined_names) => {},
};
// I'm sorry for this code :(
for i in 0..self.clients.len() {