From 4f57680a671284dbcc57187e547baaa1f4966a39 Mon Sep 17 00:00:00 2001 From: Luuk van Oijen Date: Tue, 14 Nov 2023 14:06:05 +0100 Subject: [PATCH] more lua stuff working now + cleanup client code --- Resources/Server/TestPlugin/main.lua | 7 +- src/server/client.rs | 160 +++++++++++++------------- src/server/mod.rs | 163 +++++++++++++++++++-------- src/server/plugins/backend_lua.rs | 42 ++++--- 4 files changed, 228 insertions(+), 144 deletions(-) diff --git a/Resources/Server/TestPlugin/main.lua b/Resources/Server/TestPlugin/main.lua index e0f5eac..030a57d 100644 --- a/Resources/Server/TestPlugin/main.lua +++ b/Resources/Server/TestPlugin/main.lua @@ -5,13 +5,16 @@ function onPluginLoaded() print("HI!") end -function onPlayerAuthenticated(name) - print("hi welcome mista " .. name) +function onPlayerAuthenticated(joined_name) + print("hi welcome mista " .. joined_name) print("current players:") for id, name in pairs(MP.GetPlayers()) do print("- [" .. id .. "]" .. name) end + print("yipee") + + return 0 -- 0 = do not block end MP.RegisterEventHandler("onPluginLoaded", "onPluginLoaded") diff --git a/src/server/client.rs b/src/server/client.rs index 616657a..26e15aa 100644 --- a/src/server/client.rs +++ b/src/server/client.rs @@ -107,88 +107,88 @@ impl Client { pub async fn authenticate(&mut self, config: &super::Config) -> anyhow::Result { debug!("Authenticating client {}...", self.id); - 'waiting_for_c: loop { - self.socket.readable().await?; - let mut tmp = vec![0u8; 1]; - while self.socket.peek(&mut tmp).await? == 0 {} - // Authentication works a little differently than normal - // Not sure why, but the BeamMP source code shows they - // also only read a single byte during authentication - let code = self.read_raw(1).await?[0]; - debug!("code: '{}' / {}", code as char, code); - match code as char { - 'C' => { - // We now delete existing data for this client ID, just in case. - // TODO: This seems like a recipe for disaster - let mut lock = CLIENT_MOD_PROGRESS.lock().await; - lock.remove(&(self.id as usize)); + self.socket.readable().await?; + // let mut tmp = vec![0u8; 1]; + // while self.socket.peek(&mut tmp).await? == 0 {} + // // Authentication works a little differently than normal + // // Not sure why, but the BeamMP source code shows they + // // also only read a single byte during authentication + // let code = self.read_raw(1).await?[0]; + // debug!("code: '{}' / {}", code as char, code); + // match code as char { + // 'C' => { + // + // break 'waiting_for_c; + // } + // 'D' => { + // // The download sequence is so awful + // // It currently requires us to track what the next file is that + // // we need to provide, which is hard to do with the current + // // server design. + // // I think I will simply keep a counter around that will + // // track what the next mod is per client. + // // TODO: Clean this up. It also needs to be moved out of the client code IMO + // + // let id = self.read_raw(1).await?[0] as usize; + // debug!("HandleDownload connection for client id: {}", id); + // + // let mut mod_name = { + // let mut lock = CLIENT_MOD_PROGRESS.lock().await; + // if lock.get(&id).is_none() { lock.insert(id, 0); } + // let next_id = lock.get_mut(&id).unwrap(); + // + // let bmod = &config.mods[*next_id]; // TODO: This is a bit uhh yeah + // debug!("Mod name: {}", bmod.0); + // + // *next_id += 1; + // + // if *next_id >= config.mods.len() { + // // I think this is where the connection should be closed, instead of after + // // just 1 mod. + // } + // + // bmod.0.clone() + // }; + // + // if mod_name.starts_with("/") == false { + // mod_name = format!("/{mod_name}"); + // } + // + // let mod_path = format!("Resources/Client{mod_name}"); + // let file_data = std::fs::read(mod_path)?; + // + // let packet = RawPacket::from_data(file_data[(file_data.len()/2)..].to_vec()); + // + // { + // let mut lock = self.write_half.lock().await; + // lock.writable().await?; + // trace!("Sending packets!"); + // if let Err(e) = tcp_write_raw(lock.deref_mut(), Packet::Raw(packet)).await { + // error!("{:?}", e); + // } + // trace!("Packets sent!"); + // drop(lock); + // } + // + // // return Err(ClientError::IsDownloader.into()); + // return Ok(false); + // } + // _ => { + // error!("Unknown code: {}", code); + // return Err(ClientError::AuthenticateError.into()); + // } + // } - // TODO: Check client version - trace!("Client version packet"); - self.socket.readable().await?; - let packet = self.read_packet_waiting().await?; - debug!("{:?}", packet); - break 'waiting_for_c; - } - 'D' => { - // The download sequence is so awful - // It currently requires us to track what the next file is that - // we need to provide, which is hard to do with the current - // server design. - // I think I will simply keep a counter around that will - // track what the next mod is per client. - // TODO: Clean this up. It also needs to be moved out of the client code IMO + // We now delete existing data for this client ID, just in case. + // TODO: This seems like a recipe for disaster + let mut lock = CLIENT_MOD_PROGRESS.lock().await; + lock.remove(&(self.id as usize)); - let id = self.read_raw(1).await?[0] as usize; - debug!("HandleDownload connection for client id: {}", id); - - let mut mod_name = { - let mut lock = CLIENT_MOD_PROGRESS.lock().await; - if lock.get(&id).is_none() { lock.insert(id, 0); } - let next_id = lock.get_mut(&id).unwrap(); - - let bmod = &config.mods[*next_id]; // TODO: This is a bit uhh yeah - debug!("Mod name: {}", bmod.0); - - *next_id += 1; - - if *next_id >= config.mods.len() { - // I think this is where the connection should be closed, instead of after - // just 1 mod. - } - - bmod.0.clone() - }; - - if mod_name.starts_with("/") == false { - mod_name = format!("/{mod_name}"); - } - - let mod_path = format!("Resources/Client{mod_name}"); - let file_data = std::fs::read(mod_path)?; - - let packet = RawPacket::from_data(file_data[(file_data.len()/2)..].to_vec()); - - { - let mut lock = self.write_half.lock().await; - lock.writable().await?; - trace!("Sending packets!"); - if let Err(e) = tcp_write_raw(lock.deref_mut(), Packet::Raw(packet)).await { - error!("{:?}", e); - } - trace!("Packets sent!"); - drop(lock); - } - - // return Err(ClientError::IsDownloader.into()); - return Ok(false); - } - _ => { - error!("Unknown code: {}", code); - return Err(ClientError::AuthenticateError.into()); - } - } - } + // TODO: Check client version + trace!("Client version packet"); + self.socket.readable().await?; + let packet = self.read_packet_waiting().await?; + debug!("{:?}", packet); self.write_packet(Packet::Raw(RawPacket::from_code('S'))) .await?; diff --git a/src/server/mod.rs b/src/server/mod.rs index 094917b..73a1f5e 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -3,6 +3,7 @@ use std::sync::{Arc, Mutex, atomic::{AtomicBool, Ordering}}; use std::time::Instant; use std::collections::HashMap; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, UdpSocket}; use tokio::task::{JoinHandle, JoinSet}; @@ -88,6 +89,7 @@ pub struct Server { udp_socket: Arc, clients_incoming: Arc>>, + clients_queue: Vec<(Client, Vec>, Vec)>, pub clients: Vec, @@ -128,7 +130,7 @@ impl Server { let mut set = JoinSet::new(); loop { match tcp_listener_ref.accept().await { - Ok((socket, addr)) => { + Ok((mut socket, addr)) => { info!("New client connected: {:?}", addr); let cfg_ref = config_ref.clone(); @@ -137,26 +139,40 @@ impl Server { set.spawn(async move { socket.set_nodelay(true); // TODO: Is this good? - let mut client = Client::new(socket); - match client.authenticate(&cfg_ref).await { - Ok(is_client) if is_client => { - let mut lock = ci_ref - .lock() - .map_err(|e| error!("{:?}", e)) - .expect("Failed to acquire lock on mutex!"); - lock.push(client); - drop(lock); + socket.readable().await.expect("Failed to wait for socket to become readable!"); + let mut tmp = vec![0u8; 1]; + while socket.peek(&mut tmp).await.expect("Failed to peek socket!") == 0 {} + // Authentication works a little differently than normal + // Not sure why, but the BeamMP source code shows they + // also only read a single byte during authentication + socket.read_exact(&mut tmp).await.expect("Failed to read from socket!"); + let code = tmp[0]; + + match code as char { + 'C' => { + let mut client = Client::new(socket); + match client.authenticate(&cfg_ref).await { + Ok(is_client) if is_client => { + let mut lock = ci_ref + .lock() + .map_err(|e| error!("{:?}", e)) + .expect("Failed to acquire lock on mutex!"); + lock.push(client); + drop(lock); + }, + Ok(_is_client) => { + debug!("Downloader?"); + }, + Err(e) => { + error!("Authentication error occured, kicking player..."); + error!("{:?}", e); + client.kick("Failed to authenticate player!").await; + // client.disconnect(); + } + } }, - Ok(_is_client) => { - debug!("Downloader?"); - }, - Err(e) => { - error!("Authentication error occured, kicking player..."); - error!("{:?}", e); - client.kick("Failed to authenticate player!").await; - // client.disconnect(); - } - } + _ => {}, + }; }); } Err(e) => error!("Failed to accept incoming connection: {:?}", e), @@ -179,6 +195,7 @@ impl Server { udp_socket: udp_socket, clients_incoming: clients_incoming, + clients_queue: Vec::new(), clients: Vec::new(), @@ -277,7 +294,7 @@ impl Server { Ok(()) } - pub async fn process(&mut self) -> anyhow::Result<()> { + async fn process_authenticated_clients(&mut self) -> anyhow::Result<()> { // Bit weird, but this is all to avoid deadlocking the server if anything goes wrong // with the client acception runtime. If that one locks, the server won't accept // more clients, but it will at least still process all other clients @@ -298,23 +315,96 @@ impl Server { info!("Welcome {name}!"); joined_names.push(name.clone()); let arg = Argument::String(name); + let mut vrx = Vec::new(); for plugin in &self.plugins { let (tx, rx) = tokio::sync::oneshot::channel(); plugin.send_event(PluginBoundPluginEvent::CallEventHandler((ScriptEvent::OnPlayerAuthenticated, vec![arg.clone()], Some(tx)))).await; - // TODO: This never returns?? - let res = rx.await.unwrap_or(Argument::Number(-1f32)); - println!("res: {:?}", res); + // TODO: This never returns, because it blocks the entire process function + // from running, so it never manages to run the function correctly. + // let res = rx.await.unwrap_or(Argument::Number(-1f32)); + // debug!("res: {:?}", res); + vrx.push(rx); } - self.clients.push(clients_incoming_lock.swap_remove(i)); + self.clients_queue.push((clients_incoming_lock.swap_remove(i), vrx, Vec::new())); } trace!("Accepted incoming clients!"); } } + // Bit scuffed but it just polls the return values until all lua plugins have returned + // If this blocks, the server is stuck! + // TODO: Reduce allocations needed here (use swap_remove) + let mut not_done_clients = Vec::new(); + for (mut client, mut vrx, mut res) in self.clients_queue.drain(..) { + let mut not_done = Vec::new(); + for mut rx in vrx.drain(..) { + match rx.try_recv() { + Ok(v) => { debug!("hi: {:?}", v); res.push(v); }, + Err(tokio::sync::oneshot::error::TryRecvError::Empty) => not_done.push(rx), + Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {}, + } + } + vrx = not_done; + + if vrx.len() == 0 { + let mut allowed = true; + for v in res { + match v { + Argument::Number(n) => if n == 1f32 || n == -1f32 { allowed = false; }, + Argument::Boolean(b) => if b { allowed = false; }, + _ => {}, // TODO: Handle this somehow? + } + } + if allowed { + self.clients.push(client); + } else { + // TODO: Custom kick message defined from within lua somehow? + // TODO: Kicking the client and then immediately dropping them results in the + // kick message not showing up, instead displaying that the socket closed. + client.kick("You are not allowed to join this server!").await; + } + } else { + not_done_clients.push((client, vrx, res)); + } + } + self.clients_queue = not_done_clients; + + Ok(()) + } + + async fn process_lua_events(&mut self) -> anyhow::Result<()> { + // Receive plugin events and process them + for plugin in &mut self.plugins { + for event in plugin.get_events() { + debug!("event: {:?}", event); + // TODO: Error handling (?) + match event { + ServerBoundPluginEvent::PluginLoaded => plugin.send_event(PluginBoundPluginEvent::CallEventHandler((ScriptEvent::OnPluginLoaded, Vec::new(), None))).await, + ServerBoundPluginEvent::RequestPlayerCount(responder) => { let _ = responder.send(PluginBoundPluginEvent::PlayerCount(self.clients.len())); } + ServerBoundPluginEvent::RequestPlayers(responder) => { + trace!("request players received"); + let mut players = HashMap::new(); + for client in &self.clients { + players.insert(client.id, client.get_name().to_string()); + } + trace!("sending player list..."); + let _ = responder.send(PluginBoundPluginEvent::Players(players)); + trace!("player list sent"); + } + _ => {}, + } + } + } + + Ok(()) + } + + pub async fn process(&mut self) -> anyhow::Result<()> { // In the future, we should find a way to race process_tcp and process_udp // because this introduces some latency and isn't great! // But technically it works, and keeping the latency low should really make // it a non-issue I think. + // TODO: Handle result tokio::select! { _ = self.process_udp() => {}, _ = tokio::time::sleep(tokio::time::Duration::from_nanos(1_000)) => {}, @@ -325,6 +415,9 @@ impl Server { _ = self.process_tcp() => {}, }; + self.process_authenticated_clients().await?; + self.process_lua_events().await?; + // I'm sorry for this code :( for i in 0..self.clients.len() { if self.clients.get(i).ok_or(ServerError::ClientDoesntExist)?.state == ClientState::Disconnect { @@ -363,26 +456,6 @@ impl Server { self.broadcast(Packet::Raw(RawPacket::from_str(&data)), None).await; } - // Receive plugin events and process them - for plugin in &mut self.plugins { - for event in plugin.get_events() { - debug!("event: {:?}", event); - // TODO: Error handling (?) - match event { - ServerBoundPluginEvent::PluginLoaded => plugin.send_event(PluginBoundPluginEvent::CallEventHandler((ScriptEvent::OnPluginLoaded, Vec::new(), None))).await, - ServerBoundPluginEvent::RequestPlayerCount(responder) => { let _ = responder.send(PluginBoundPluginEvent::PlayerCount(self.clients.len())); } - ServerBoundPluginEvent::RequestPlayers(responder) => { - let mut players = HashMap::new(); - for client in &self.clients { - players.insert(client.id, client.get_name().to_string()); - } - let _ = responder.send(PluginBoundPluginEvent::Players(players)); - } - _ => {}, - } - } - } - Ok(()) } diff --git a/src/server/plugins/backend_lua.rs b/src/server/plugins/backend_lua.rs index 0a8c2a5..1aab9c9 100644 --- a/src/server/plugins/backend_lua.rs +++ b/src/server/plugins/backend_lua.rs @@ -81,6 +81,7 @@ impl UserData for Context { error!("Failed to send packet: {:?}", e); } let message = rx.blocking_recv(); + trace!("received player info"); if let Ok(message) = message { if let PluginBoundPluginEvent::Players(players) = message { let table = lua.create_table()?; @@ -139,27 +140,34 @@ impl Backend for BackendLua { ScriptEvent::OnPlayerAuthenticated => "onPlayerAuthenticated", }; + let mut ret = -1f32; // TODO: Error handling - let ctx: Context = self.lua.globals().get("MP").expect("MP is missing!"); - let lock = ctx.handlers.lock().expect("Mutex is poisoned!"); - if let Some(handler_name) = lock.get(event_name) { - let func: LuaResult = self.lua.globals().get(handler_name.clone()); - if let Ok(func) = func { - let mapped_args = args.into_iter().map(|arg| { - match arg { - Argument::String(s) => if let Ok(lua_str) = self.lua.create_string(&s) { Some(Value::String(lua_str)) } else { None }, - Argument::Boolean(b) => Some(Value::Boolean(b)), - Argument::Number(f) => Some(Value::Number(f as f64)), + { + let ctx: Context = self.lua.globals().get("MP").expect("MP is missing!"); + let lock = ctx.handlers.lock().expect("Mutex is poisoned!"); + if let Some(handler_name) = lock.get(event_name) { + let func: LuaResult = self.lua.globals().get(handler_name.clone()); + if let Ok(func) = func { + let mapped_args = args.into_iter().map(|arg| { + match arg { + Argument::String(s) => if let Ok(lua_str) = self.lua.create_string(&s) { Some(Value::String(lua_str)) } else { None }, + Argument::Boolean(b) => Some(Value::Boolean(b)), + Argument::Number(f) => Some(Value::Number(f as f64)), + } + }).filter(|v| v.is_some()); + match func.call::<_, Option>(Variadic::from_iter(mapped_args)) { + Ok(res) => { trace!("fn ret: {:?}", ret); ret = res.unwrap_or(-1f32); } + Err(e) => { + error!("[LUA] {}", e); + ret = -1f32; + }, } - }).filter(|v| v.is_some()); - match func.call::<_, Option>(Variadic::from_iter(mapped_args)) { - Ok(res) => if let Some(resp) = resp { resp.send(Argument::Number(res.unwrap_or(-1f32))).expect("Failed to send!"); } else {} - Err(e) => { - error!("[LUA] {}", e); - if let Some(resp) = resp { resp.send(Argument::Number(-1f32)).expect("Failed to send!"); } else {} - }, } } } + + debug!("sending result..."); + if let Some(resp) = resp { resp.send(Argument::Number(ret)).expect("Failed to send!"); } + debug!("call_event_handler done"); } }