more lua stuff working now + cleanup client code

This commit is contained in:
Luuk van Oijen
2023-11-14 14:06:05 +01:00
parent 1c21298351
commit 4f57680a67
4 changed files with 228 additions and 144 deletions

View File

@@ -5,13 +5,16 @@ function onPluginLoaded()
print("HI!")
end
function onPlayerAuthenticated(name)
print("hi welcome mista " .. name)
function onPlayerAuthenticated(joined_name)
print("hi welcome mista " .. joined_name)
print("current players:")
for id, name in pairs(MP.GetPlayers()) do
print("- [" .. id .. "]" .. name)
end
print("yipee")
return 0 -- 0 = do not block
end
MP.RegisterEventHandler("onPluginLoaded", "onPluginLoaded")

View File

@@ -107,88 +107,88 @@ impl Client {
pub async fn authenticate(&mut self, config: &super::Config) -> anyhow::Result<bool> {
debug!("Authenticating client {}...", self.id);
'waiting_for_c: loop {
self.socket.readable().await?;
let mut tmp = vec![0u8; 1];
while self.socket.peek(&mut tmp).await? == 0 {}
// Authentication works a little differently than normal
// Not sure why, but the BeamMP source code shows they
// also only read a single byte during authentication
let code = self.read_raw(1).await?[0];
debug!("code: '{}' / {}", code as char, code);
match code as char {
'C' => {
// We now delete existing data for this client ID, just in case.
// TODO: This seems like a recipe for disaster
let mut lock = CLIENT_MOD_PROGRESS.lock().await;
lock.remove(&(self.id as usize));
self.socket.readable().await?;
// let mut tmp = vec![0u8; 1];
// while self.socket.peek(&mut tmp).await? == 0 {}
// // Authentication works a little differently than normal
// // Not sure why, but the BeamMP source code shows they
// // also only read a single byte during authentication
// let code = self.read_raw(1).await?[0];
// debug!("code: '{}' / {}", code as char, code);
// match code as char {
// 'C' => {
//
// break 'waiting_for_c;
// }
// 'D' => {
// // The download sequence is so awful
// // It currently requires us to track what the next file is that
// // we need to provide, which is hard to do with the current
// // server design.
// // I think I will simply keep a counter around that will
// // track what the next mod is per client.
// // TODO: Clean this up. It also needs to be moved out of the client code IMO
//
// let id = self.read_raw(1).await?[0] as usize;
// debug!("HandleDownload connection for client id: {}", id);
//
// let mut mod_name = {
// let mut lock = CLIENT_MOD_PROGRESS.lock().await;
// if lock.get(&id).is_none() { lock.insert(id, 0); }
// let next_id = lock.get_mut(&id).unwrap();
//
// let bmod = &config.mods[*next_id]; // TODO: This is a bit uhh yeah
// debug!("Mod name: {}", bmod.0);
//
// *next_id += 1;
//
// if *next_id >= config.mods.len() {
// // I think this is where the connection should be closed, instead of after
// // just 1 mod.
// }
//
// bmod.0.clone()
// };
//
// if mod_name.starts_with("/") == false {
// mod_name = format!("/{mod_name}");
// }
//
// let mod_path = format!("Resources/Client{mod_name}");
// let file_data = std::fs::read(mod_path)?;
//
// let packet = RawPacket::from_data(file_data[(file_data.len()/2)..].to_vec());
//
// {
// let mut lock = self.write_half.lock().await;
// lock.writable().await?;
// trace!("Sending packets!");
// if let Err(e) = tcp_write_raw(lock.deref_mut(), Packet::Raw(packet)).await {
// error!("{:?}", e);
// }
// trace!("Packets sent!");
// drop(lock);
// }
//
// // return Err(ClientError::IsDownloader.into());
// return Ok(false);
// }
// _ => {
// error!("Unknown code: {}", code);
// return Err(ClientError::AuthenticateError.into());
// }
// }
// TODO: Check client version
trace!("Client version packet");
self.socket.readable().await?;
let packet = self.read_packet_waiting().await?;
debug!("{:?}", packet);
break 'waiting_for_c;
}
'D' => {
// The download sequence is so awful
// It currently requires us to track what the next file is that
// we need to provide, which is hard to do with the current
// server design.
// I think I will simply keep a counter around that will
// track what the next mod is per client.
// TODO: Clean this up. It also needs to be moved out of the client code IMO
// We now delete existing data for this client ID, just in case.
// TODO: This seems like a recipe for disaster
let mut lock = CLIENT_MOD_PROGRESS.lock().await;
lock.remove(&(self.id as usize));
let id = self.read_raw(1).await?[0] as usize;
debug!("HandleDownload connection for client id: {}", id);
let mut mod_name = {
let mut lock = CLIENT_MOD_PROGRESS.lock().await;
if lock.get(&id).is_none() { lock.insert(id, 0); }
let next_id = lock.get_mut(&id).unwrap();
let bmod = &config.mods[*next_id]; // TODO: This is a bit uhh yeah
debug!("Mod name: {}", bmod.0);
*next_id += 1;
if *next_id >= config.mods.len() {
// I think this is where the connection should be closed, instead of after
// just 1 mod.
}
bmod.0.clone()
};
if mod_name.starts_with("/") == false {
mod_name = format!("/{mod_name}");
}
let mod_path = format!("Resources/Client{mod_name}");
let file_data = std::fs::read(mod_path)?;
let packet = RawPacket::from_data(file_data[(file_data.len()/2)..].to_vec());
{
let mut lock = self.write_half.lock().await;
lock.writable().await?;
trace!("Sending packets!");
if let Err(e) = tcp_write_raw(lock.deref_mut(), Packet::Raw(packet)).await {
error!("{:?}", e);
}
trace!("Packets sent!");
drop(lock);
}
// return Err(ClientError::IsDownloader.into());
return Ok(false);
}
_ => {
error!("Unknown code: {}", code);
return Err(ClientError::AuthenticateError.into());
}
}
}
// TODO: Check client version
trace!("Client version packet");
self.socket.readable().await?;
let packet = self.read_packet_waiting().await?;
debug!("{:?}", packet);
self.write_packet(Packet::Raw(RawPacket::from_code('S')))
.await?;

View File

@@ -3,6 +3,7 @@ use std::sync::{Arc, Mutex, atomic::{AtomicBool, Ordering}};
use std::time::Instant;
use std::collections::HashMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, UdpSocket};
use tokio::task::{JoinHandle, JoinSet};
@@ -88,6 +89,7 @@ pub struct Server {
udp_socket: Arc<UdpSocket>,
clients_incoming: Arc<Mutex<Vec<Client>>>,
clients_queue: Vec<(Client, Vec<tokio::sync::oneshot::Receiver<Argument>>, Vec<Argument>)>,
pub clients: Vec<Client>,
@@ -128,7 +130,7 @@ impl Server {
let mut set = JoinSet::new();
loop {
match tcp_listener_ref.accept().await {
Ok((socket, addr)) => {
Ok((mut socket, addr)) => {
info!("New client connected: {:?}", addr);
let cfg_ref = config_ref.clone();
@@ -137,26 +139,40 @@ impl Server {
set.spawn(async move {
socket.set_nodelay(true); // TODO: Is this good?
let mut client = Client::new(socket);
match client.authenticate(&cfg_ref).await {
Ok(is_client) if is_client => {
let mut lock = ci_ref
.lock()
.map_err(|e| error!("{:?}", e))
.expect("Failed to acquire lock on mutex!");
lock.push(client);
drop(lock);
socket.readable().await.expect("Failed to wait for socket to become readable!");
let mut tmp = vec![0u8; 1];
while socket.peek(&mut tmp).await.expect("Failed to peek socket!") == 0 {}
// Authentication works a little differently than normal
// Not sure why, but the BeamMP source code shows they
// also only read a single byte during authentication
socket.read_exact(&mut tmp).await.expect("Failed to read from socket!");
let code = tmp[0];
match code as char {
'C' => {
let mut client = Client::new(socket);
match client.authenticate(&cfg_ref).await {
Ok(is_client) if is_client => {
let mut lock = ci_ref
.lock()
.map_err(|e| error!("{:?}", e))
.expect("Failed to acquire lock on mutex!");
lock.push(client);
drop(lock);
},
Ok(_is_client) => {
debug!("Downloader?");
},
Err(e) => {
error!("Authentication error occured, kicking player...");
error!("{:?}", e);
client.kick("Failed to authenticate player!").await;
// client.disconnect();
}
}
},
Ok(_is_client) => {
debug!("Downloader?");
},
Err(e) => {
error!("Authentication error occured, kicking player...");
error!("{:?}", e);
client.kick("Failed to authenticate player!").await;
// client.disconnect();
}
}
_ => {},
};
});
}
Err(e) => error!("Failed to accept incoming connection: {:?}", e),
@@ -179,6 +195,7 @@ impl Server {
udp_socket: udp_socket,
clients_incoming: clients_incoming,
clients_queue: Vec::new(),
clients: Vec::new(),
@@ -277,7 +294,7 @@ impl Server {
Ok(())
}
pub async fn process(&mut self) -> anyhow::Result<()> {
async fn process_authenticated_clients(&mut self) -> anyhow::Result<()> {
// Bit weird, but this is all to avoid deadlocking the server if anything goes wrong
// with the client acception runtime. If that one locks, the server won't accept
// more clients, but it will at least still process all other clients
@@ -298,23 +315,96 @@ impl Server {
info!("Welcome {name}!");
joined_names.push(name.clone());
let arg = Argument::String(name);
let mut vrx = Vec::new();
for plugin in &self.plugins {
let (tx, rx) = tokio::sync::oneshot::channel();
plugin.send_event(PluginBoundPluginEvent::CallEventHandler((ScriptEvent::OnPlayerAuthenticated, vec![arg.clone()], Some(tx)))).await;
// TODO: This never returns??
let res = rx.await.unwrap_or(Argument::Number(-1f32));
println!("res: {:?}", res);
// TODO: This never returns, because it blocks the entire process function
// from running, so it never manages to run the function correctly.
// let res = rx.await.unwrap_or(Argument::Number(-1f32));
// debug!("res: {:?}", res);
vrx.push(rx);
}
self.clients.push(clients_incoming_lock.swap_remove(i));
self.clients_queue.push((clients_incoming_lock.swap_remove(i), vrx, Vec::new()));
}
trace!("Accepted incoming clients!");
}
}
// Bit scuffed but it just polls the return values until all lua plugins have returned
// If this blocks, the server is stuck!
// TODO: Reduce allocations needed here (use swap_remove)
let mut not_done_clients = Vec::new();
for (mut client, mut vrx, mut res) in self.clients_queue.drain(..) {
let mut not_done = Vec::new();
for mut rx in vrx.drain(..) {
match rx.try_recv() {
Ok(v) => { debug!("hi: {:?}", v); res.push(v); },
Err(tokio::sync::oneshot::error::TryRecvError::Empty) => not_done.push(rx),
Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {},
}
}
vrx = not_done;
if vrx.len() == 0 {
let mut allowed = true;
for v in res {
match v {
Argument::Number(n) => if n == 1f32 || n == -1f32 { allowed = false; },
Argument::Boolean(b) => if b { allowed = false; },
_ => {}, // TODO: Handle this somehow?
}
}
if allowed {
self.clients.push(client);
} else {
// TODO: Custom kick message defined from within lua somehow?
// TODO: Kicking the client and then immediately dropping them results in the
// kick message not showing up, instead displaying that the socket closed.
client.kick("You are not allowed to join this server!").await;
}
} else {
not_done_clients.push((client, vrx, res));
}
}
self.clients_queue = not_done_clients;
Ok(())
}
async fn process_lua_events(&mut self) -> anyhow::Result<()> {
// Receive plugin events and process them
for plugin in &mut self.plugins {
for event in plugin.get_events() {
debug!("event: {:?}", event);
// TODO: Error handling (?)
match event {
ServerBoundPluginEvent::PluginLoaded => plugin.send_event(PluginBoundPluginEvent::CallEventHandler((ScriptEvent::OnPluginLoaded, Vec::new(), None))).await,
ServerBoundPluginEvent::RequestPlayerCount(responder) => { let _ = responder.send(PluginBoundPluginEvent::PlayerCount(self.clients.len())); }
ServerBoundPluginEvent::RequestPlayers(responder) => {
trace!("request players received");
let mut players = HashMap::new();
for client in &self.clients {
players.insert(client.id, client.get_name().to_string());
}
trace!("sending player list...");
let _ = responder.send(PluginBoundPluginEvent::Players(players));
trace!("player list sent");
}
_ => {},
}
}
}
Ok(())
}
pub async fn process(&mut self) -> anyhow::Result<()> {
// In the future, we should find a way to race process_tcp and process_udp
// because this introduces some latency and isn't great!
// But technically it works, and keeping the latency low should really make
// it a non-issue I think.
// TODO: Handle result
tokio::select! {
_ = self.process_udp() => {},
_ = tokio::time::sleep(tokio::time::Duration::from_nanos(1_000)) => {},
@@ -325,6 +415,9 @@ impl Server {
_ = self.process_tcp() => {},
};
self.process_authenticated_clients().await?;
self.process_lua_events().await?;
// I'm sorry for this code :(
for i in 0..self.clients.len() {
if self.clients.get(i).ok_or(ServerError::ClientDoesntExist)?.state == ClientState::Disconnect {
@@ -363,26 +456,6 @@ impl Server {
self.broadcast(Packet::Raw(RawPacket::from_str(&data)), None).await;
}
// Receive plugin events and process them
for plugin in &mut self.plugins {
for event in plugin.get_events() {
debug!("event: {:?}", event);
// TODO: Error handling (?)
match event {
ServerBoundPluginEvent::PluginLoaded => plugin.send_event(PluginBoundPluginEvent::CallEventHandler((ScriptEvent::OnPluginLoaded, Vec::new(), None))).await,
ServerBoundPluginEvent::RequestPlayerCount(responder) => { let _ = responder.send(PluginBoundPluginEvent::PlayerCount(self.clients.len())); }
ServerBoundPluginEvent::RequestPlayers(responder) => {
let mut players = HashMap::new();
for client in &self.clients {
players.insert(client.id, client.get_name().to_string());
}
let _ = responder.send(PluginBoundPluginEvent::Players(players));
}
_ => {},
}
}
}
Ok(())
}

View File

@@ -81,6 +81,7 @@ impl UserData for Context {
error!("Failed to send packet: {:?}", e);
}
let message = rx.blocking_recv();
trace!("received player info");
if let Ok(message) = message {
if let PluginBoundPluginEvent::Players(players) = message {
let table = lua.create_table()?;
@@ -139,27 +140,34 @@ impl Backend for BackendLua {
ScriptEvent::OnPlayerAuthenticated => "onPlayerAuthenticated",
};
let mut ret = -1f32;
// TODO: Error handling
let ctx: Context = self.lua.globals().get("MP").expect("MP is missing!");
let lock = ctx.handlers.lock().expect("Mutex is poisoned!");
if let Some(handler_name) = lock.get(event_name) {
let func: LuaResult<Function> = self.lua.globals().get(handler_name.clone());
if let Ok(func) = func {
let mapped_args = args.into_iter().map(|arg| {
match arg {
Argument::String(s) => if let Ok(lua_str) = self.lua.create_string(&s) { Some(Value::String(lua_str)) } else { None },
Argument::Boolean(b) => Some(Value::Boolean(b)),
Argument::Number(f) => Some(Value::Number(f as f64)),
{
let ctx: Context = self.lua.globals().get("MP").expect("MP is missing!");
let lock = ctx.handlers.lock().expect("Mutex is poisoned!");
if let Some(handler_name) = lock.get(event_name) {
let func: LuaResult<Function> = self.lua.globals().get(handler_name.clone());
if let Ok(func) = func {
let mapped_args = args.into_iter().map(|arg| {
match arg {
Argument::String(s) => if let Ok(lua_str) = self.lua.create_string(&s) { Some(Value::String(lua_str)) } else { None },
Argument::Boolean(b) => Some(Value::Boolean(b)),
Argument::Number(f) => Some(Value::Number(f as f64)),
}
}).filter(|v| v.is_some());
match func.call::<_, Option<f32>>(Variadic::from_iter(mapped_args)) {
Ok(res) => { trace!("fn ret: {:?}", ret); ret = res.unwrap_or(-1f32); }
Err(e) => {
error!("[LUA] {}", e);
ret = -1f32;
},
}
}).filter(|v| v.is_some());
match func.call::<_, Option<f32>>(Variadic::from_iter(mapped_args)) {
Ok(res) => if let Some(resp) = resp { resp.send(Argument::Number(res.unwrap_or(-1f32))).expect("Failed to send!"); } else {}
Err(e) => {
error!("[LUA] {}", e);
if let Some(resp) = resp { resp.send(Argument::Number(-1f32)).expect("Failed to send!"); } else {}
},
}
}
}
debug!("sending result...");
if let Some(resp) = resp { resp.send(Argument::Number(ret)).expect("Failed to send!"); }
debug!("call_event_handler done");
}
}