diff --git a/src/server/mod.rs b/src/server/mod.rs index 2499bff..69c5278 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,11 +1,12 @@ use std::net::SocketAddr; -use std::sync::{Arc, Mutex, atomic::{AtomicBool, Ordering}}; +use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; use std::time::Instant; use std::collections::HashMap; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, UdpSocket}; use tokio::task::{JoinHandle, JoinSet}; +use tokio::sync::{Mutex, mpsc}; use num_enum::IntoPrimitive; @@ -126,7 +127,7 @@ pub struct Server { tcp_listener: Arc, pub udp_socket: Arc, - clients_incoming: Arc>>, + clients_incoming_rx: mpsc::Receiver, clients_queue: Vec<(Client, Vec>, Vec)>, pub clients: Vec, @@ -161,137 +162,150 @@ impl Server { let plugins = load_plugins(); // Start client runtime - let clients_incoming = Arc::new(Mutex::new(Vec::new())); - let clients_incoming_ref = Arc::clone(&clients_incoming); + let (clients_incoming_tx, clients_incoming_rx) = mpsc::channel(100); debug!("Client acception runtime starting..."); let connect_runtime_handle = tokio::spawn(async move { let mut set = JoinSet::new(); loop { - match tcp_listener_ref.accept().await { - Ok((mut socket, addr)) => { - info!("New client connected: {:?}", addr); + tokio::select! { + new_conn = tcp_listener_ref.accept() => { + match new_conn { + Ok((mut socket, addr)) => { + info!("New client connected: {:?}", addr); - let cfg_ref = config_ref.clone(); - let ci_ref = clients_incoming_ref.clone(); + let cfg_ref = config_ref.clone(); + let ci_ref = clients_incoming_tx.clone(); - set.spawn(async move { - socket.set_nodelay(true); // TODO: Is this good? + set.spawn(async move { + socket.set_nodelay(true); // TODO: Is this good? - socket.readable().await.expect("Failed to wait for socket to become readable!"); - let mut tmp = vec![0u8; 1]; - while socket.peek(&mut tmp).await.expect("Failed to peek socket!") == 0 {} - // Authentication works a little differently than normal - // Not sure why, but the BeamMP source code shows they - // also only read a single byte during authentication - socket.read_exact(&mut tmp).await.expect("Failed to read from socket!"); - let code = tmp[0]; - - match code as char { - 'C' => { - let mut client = Client::new(socket).await; - match client.authenticate(&cfg_ref).await { - Ok(is_client) if is_client => { - let mut lock = ci_ref - .lock() - .map_err(|e| error!("{:?}", e)) - .expect("Failed to acquire lock on mutex!"); - lock.push(client); - drop(lock); - }, - Ok(_is_client) => { - debug!("Downloader?"); - }, - Err(e) => { - error!("Authentication error occured, kicking player..."); - error!("{:?}", e); - client.kick("Failed to authenticate player!").await; - // client.disconnect(); - } + socket.readable().await.expect("Failed to wait for socket to become readable!"); + let mut tmp = vec![0u8; 1]; + while socket.peek(&mut tmp).await.expect("Failed to peek socket!") == 0 { + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; } - }, - 'D' => { - // Download connection (for old protocol) - // This crashes the client after sending over 1 mod. - // I have no idea why, perhaps I'm missing something that I'm supposed to send it. - // TODO: Implement this: https://github.com/BeamMP/BeamMP-Server/blob/master/src/TNetwork.cpp#L775 - - socket.readable().await; - let mut tmp = [0u8; 1]; - socket.read_exact(&mut tmp).await; - let id = tmp[0] as usize; - debug!("[D] HandleDownload connection for client id: {}", id); - let mut sent_mods = Vec::new(); - 'download: while let Ok(_) = socket.writable().await { - { - let lock = CLIENT_MOD_PROGRESS.lock().await; - if lock.get(&(id as u8)).is_none() { continue; } - } - let mod_id = { - let lock = CLIENT_MOD_PROGRESS.lock().await; - *lock.get(&(id as u8)).unwrap() - }; - if sent_mods.contains(&mod_id) { continue; } - debug!("[D] Starting download!"); - let mut mod_name = { - if mod_id < 0 { - break 'download; - } - if mod_id as usize >= cfg_ref.mods.len() { - break 'download; - } - - let bmod = &cfg_ref.mods[mod_id as usize]; // TODO: This is a bit uhh yeah - debug!("[D] Mod name: {}", bmod.0); - - bmod.0.clone() - }; - - if mod_name.starts_with("/") == false { - mod_name = format!("/{mod_name}"); - } - - debug!("[D] Starting transfer of mod {mod_name}!"); - - let mod_path = format!("Resources/Client{mod_name}"); - if let Ok(file_data) = std::fs::read(mod_path) { - { - trace!("[D] Sending packets!"); - if let Err(e) = socket.write(&file_data[(file_data.len()/2)..]).await { - error!("{:?}", e); - } - trace!("[D] Packets sent!"); - } - } - - sent_mods.push(mod_id); - } - debug!("[D] Done!"); - }, - 'G' => { - // This is probably an HTTP GET request! - let mut tmp = [0u8; 3]; + // Authentication works a little differently than normal + // Not sure why, but the BeamMP source code shows they + // also only read a single byte during authentication socket.read_exact(&mut tmp).await.expect("Failed to read from socket!"); - if tmp[0] as char == 'E' && tmp[1] as char == 'T' && tmp[2] as char == ' ' { - trace!("HTTP GET request found!"); - handle_http_get(socket).await; - } else { - trace!("Unknown G packet received, not sure what to do!"); - } - }, - _ => {}, - }; - }); - } - Err(e) => error!("Failed to accept incoming connection: {:?}", e), + let code = tmp[0]; + + match code as char { + 'C' => { + info!("hi"); + let mut client = Client::new(socket).await; + match client.authenticate(&cfg_ref).await { + Ok(is_client) if is_client => { + info!("bye"); + ci_ref.send(client).await; + }, + Ok(_is_client) => { + debug!("Downloader?"); + }, + Err(e) => { + error!("Authentication error occured, kicking player..."); + error!("{:?}", e); + client.kick("Failed to authenticate player!").await; + // client.disconnect(); + } + } + }, + 'D' => { + // Download connection (for old protocol) + // This crashes the client after sending over 1 mod. + // I have no idea why, perhaps I'm missing something that I'm supposed to send it. + // TODO: Implement this: https://github.com/BeamMP/BeamMP-Server/blob/master/src/TNetwork.cpp#L775 + + socket.readable().await; + let mut tmp = [0u8; 1]; + socket.read_exact(&mut tmp).await; + let id = tmp[0] as usize; + debug!("[D] HandleDownload connection for client id: {}", id); + let mut sent_mods = Vec::new(); + 'download: while let Ok(_) = socket.writable().await { + { + let lock = CLIENT_MOD_PROGRESS.lock().await; + if lock.get(&(id as u8)).is_none() { continue; } + } + let mod_id = { + let lock = CLIENT_MOD_PROGRESS.lock().await; + *lock.get(&(id as u8)).unwrap() + }; + if sent_mods.contains(&mod_id) { continue; } + debug!("[D] Starting download!"); + let mut mod_name = { + if mod_id < 0 { + break 'download; + } + if mod_id as usize >= cfg_ref.mods.len() { + break 'download; + } + + let bmod = &cfg_ref.mods[mod_id as usize]; // TODO: This is a bit uhh yeah + debug!("[D] Mod name: {}", bmod.0); + + bmod.0.clone() + }; + + if mod_name.starts_with("/") == false { + mod_name = format!("/{mod_name}"); + } + + debug!("[D] Starting transfer of mod {mod_name}!"); + + let mod_path = format!("Resources/Client{mod_name}"); + if let Ok(file_data) = std::fs::read(mod_path) { + { + trace!("[D] Sending packets!"); + if let Err(e) = socket.write(&file_data[(file_data.len()/2)..]).await { + error!("{:?}", e); + } + trace!("[D] Packets sent!"); + } + } + + sent_mods.push(mod_id); + } + debug!("[D] Done!"); + }, + 'G' => { + // This is probably an HTTP GET request! + let mut tmp = [0u8; 3]; + socket.read_exact(&mut tmp).await.expect("Failed to read from socket!"); + if tmp[0] as char == 'E' && tmp[1] as char == 'T' && tmp[2] as char == ' ' { + trace!("HTTP GET request found!"); + handle_http_get(socket).await; + } else { + trace!("Unknown G packet received, not sure what to do!"); + } + }, + _ => { + tokio::task::yield_now().await; + }, + }; + }); + info!("Client pushed to joinset!"); + } + Err(e) => error!("Failed to accept incoming connection: {:?}", e), + } + }, + _ = tokio::time::sleep(tokio::time::Duration::from_millis(50)) => { + error!("time out!"); + }, } if set.is_empty() == false { + info!("what!"); // Because join_next() is cancel safe, we can simply cancel it after N duration // so at worst this client acceptance loop blocks for N duration - tokio::select!( - _ = set.join_next() => {}, - _ = tokio::time::sleep(tokio::time::Duration::from_millis(10)) => {}, - ) + tokio::select! { + _ = tokio::time::sleep(tokio::time::Duration::from_millis(10)) => { + error!("join_next timed out!"); + }, + _ = set.join_next() => { + info!("join_next ran!"); + }, + } } } }); @@ -301,7 +315,7 @@ impl Server { tcp_listener: tcp_listener, udp_socket: udp_socket, - clients_incoming: clients_incoming, + clients_incoming_rx, clients_queue: Vec::new(), clients: Vec::new(), @@ -375,34 +389,29 @@ impl Server { // with the client acception runtime. If that one locks, the server won't accept // more clients, but it will at least still process all other clients let mut joined_names = Vec::new(); - if let Ok(mut clients_incoming_lock) = self.clients_incoming.try_lock() { // TODO: Why do I use try_lock here? - if clients_incoming_lock.len() > 0 { - trace!( - "Accepting {} incoming clients...", - clients_incoming_lock.len() - ); - for client in clients_incoming_lock.drain(..) { - let userdata = client.get_userdata(); - let (name, role, is_guest, beammp_id) = (userdata.username.clone(), userdata.roles.clone(), userdata.guest, userdata.uid.clone()); - info!("Welcome {name}!"); - joined_names.push(name.clone()); - let mut vrx = Vec::new(); - for plugin in &self.plugins { - let (tx, rx) = tokio::sync::oneshot::channel(); - plugin.send_event(PluginBoundPluginEvent::CallEventHandler((ScriptEvent::OnPlayerAuthenticated { name: name.clone(), role: role.clone(), is_guest, identifiers: PlayerIdentifiers { - ip: String::from("not yet implemented"), - beammp_id: beammp_id.clone(), - } }, Some(tx)))).await; - // TODO: This never returns, because it blocks the entire process function - // from running, so it never manages to run the function correctly. - // let res = rx.await.unwrap_or(Argument::Number(-1f32)); - // debug!("res: {:?}", res); - vrx.push(rx); - } - self.clients_queue.push((client, vrx, Vec::new())); + match self.clients_incoming_rx.try_recv() { + Ok(client) => { + let userdata = client.get_userdata(); + let (name, role, is_guest, beammp_id) = (userdata.username.clone(), userdata.roles.clone(), userdata.guest, userdata.uid.clone()); + info!("Welcome {name}!"); + joined_names.push(name.clone()); + let mut vrx = Vec::new(); + for plugin in &self.plugins { + let (tx, rx) = tokio::sync::oneshot::channel(); + plugin.send_event(PluginBoundPluginEvent::CallEventHandler((ScriptEvent::OnPlayerAuthenticated { name: name.clone(), role: role.clone(), is_guest, identifiers: PlayerIdentifiers { + ip: String::from("not yet implemented"), + beammp_id: beammp_id.clone(), + } }, Some(tx)))).await; + // TODO: This never returns, because it blocks the entire process function + // from running, so it never manages to run the function correctly. + // let res = rx.await.unwrap_or(Argument::Number(-1f32)); + // debug!("res: {:?}", res); + vrx.push(rx); } - trace!("Accepted incoming clients!"); - } + self.clients_queue.push((client, vrx, Vec::new())); + }, + Err(mpsc::error::TryRecvError::Empty) => {}, + Err(e) => error!("Error while receiving new clients from acception runtime: {:?}", e), } // Bit scuffed but it just polls the return values until all lua plugins have returned