bunch of changes to prevent deadlocking the server

This commit is contained in:
Luuk van Oijen 2023-11-23 10:56:51 +01:00
parent e15833084a
commit 04c1b76402

View File

@ -1,11 +1,12 @@
use std::net::SocketAddr;
use std::sync::{Arc, Mutex, atomic::{AtomicBool, Ordering}};
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use std::time::Instant;
use std::collections::HashMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, UdpSocket};
use tokio::task::{JoinHandle, JoinSet};
use tokio::sync::{Mutex, mpsc};
use num_enum::IntoPrimitive;
@ -126,7 +127,7 @@ pub struct Server {
tcp_listener: Arc<TcpListener>,
pub udp_socket: Arc<UdpSocket>,
clients_incoming: Arc<Mutex<Vec<Client>>>,
clients_incoming_rx: mpsc::Receiver<Client>,
clients_queue: Vec<(Client, Vec<tokio::sync::oneshot::Receiver<Argument>>, Vec<Argument>)>,
pub clients: Vec<Client>,
@ -161,137 +162,150 @@ impl Server {
let plugins = load_plugins();
// Start client runtime
let clients_incoming = Arc::new(Mutex::new(Vec::new()));
let clients_incoming_ref = Arc::clone(&clients_incoming);
let (clients_incoming_tx, clients_incoming_rx) = mpsc::channel(100);
debug!("Client acception runtime starting...");
let connect_runtime_handle = tokio::spawn(async move {
let mut set = JoinSet::new();
loop {
match tcp_listener_ref.accept().await {
Ok((mut socket, addr)) => {
info!("New client connected: {:?}", addr);
tokio::select! {
new_conn = tcp_listener_ref.accept() => {
match new_conn {
Ok((mut socket, addr)) => {
info!("New client connected: {:?}", addr);
let cfg_ref = config_ref.clone();
let ci_ref = clients_incoming_ref.clone();
let cfg_ref = config_ref.clone();
let ci_ref = clients_incoming_tx.clone();
set.spawn(async move {
socket.set_nodelay(true); // TODO: Is this good?
set.spawn(async move {
socket.set_nodelay(true); // TODO: Is this good?
socket.readable().await.expect("Failed to wait for socket to become readable!");
let mut tmp = vec![0u8; 1];
while socket.peek(&mut tmp).await.expect("Failed to peek socket!") == 0 {}
// Authentication works a little differently than normal
// Not sure why, but the BeamMP source code shows they
// also only read a single byte during authentication
socket.read_exact(&mut tmp).await.expect("Failed to read from socket!");
let code = tmp[0];
match code as char {
'C' => {
let mut client = Client::new(socket).await;
match client.authenticate(&cfg_ref).await {
Ok(is_client) if is_client => {
let mut lock = ci_ref
.lock()
.map_err(|e| error!("{:?}", e))
.expect("Failed to acquire lock on mutex!");
lock.push(client);
drop(lock);
},
Ok(_is_client) => {
debug!("Downloader?");
},
Err(e) => {
error!("Authentication error occured, kicking player...");
error!("{:?}", e);
client.kick("Failed to authenticate player!").await;
// client.disconnect();
}
socket.readable().await.expect("Failed to wait for socket to become readable!");
let mut tmp = vec![0u8; 1];
while socket.peek(&mut tmp).await.expect("Failed to peek socket!") == 0 {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
},
'D' => {
// Download connection (for old protocol)
// This crashes the client after sending over 1 mod.
// I have no idea why, perhaps I'm missing something that I'm supposed to send it.
// TODO: Implement this: https://github.com/BeamMP/BeamMP-Server/blob/master/src/TNetwork.cpp#L775
socket.readable().await;
let mut tmp = [0u8; 1];
socket.read_exact(&mut tmp).await;
let id = tmp[0] as usize;
debug!("[D] HandleDownload connection for client id: {}", id);
let mut sent_mods = Vec::new();
'download: while let Ok(_) = socket.writable().await {
{
let lock = CLIENT_MOD_PROGRESS.lock().await;
if lock.get(&(id as u8)).is_none() { continue; }
}
let mod_id = {
let lock = CLIENT_MOD_PROGRESS.lock().await;
*lock.get(&(id as u8)).unwrap()
};
if sent_mods.contains(&mod_id) { continue; }
debug!("[D] Starting download!");
let mut mod_name = {
if mod_id < 0 {
break 'download;
}
if mod_id as usize >= cfg_ref.mods.len() {
break 'download;
}
let bmod = &cfg_ref.mods[mod_id as usize]; // TODO: This is a bit uhh yeah
debug!("[D] Mod name: {}", bmod.0);
bmod.0.clone()
};
if mod_name.starts_with("/") == false {
mod_name = format!("/{mod_name}");
}
debug!("[D] Starting transfer of mod {mod_name}!");
let mod_path = format!("Resources/Client{mod_name}");
if let Ok(file_data) = std::fs::read(mod_path) {
{
trace!("[D] Sending packets!");
if let Err(e) = socket.write(&file_data[(file_data.len()/2)..]).await {
error!("{:?}", e);
}
trace!("[D] Packets sent!");
}
}
sent_mods.push(mod_id);
}
debug!("[D] Done!");
},
'G' => {
// This is probably an HTTP GET request!
let mut tmp = [0u8; 3];
// Authentication works a little differently than normal
// Not sure why, but the BeamMP source code shows they
// also only read a single byte during authentication
socket.read_exact(&mut tmp).await.expect("Failed to read from socket!");
if tmp[0] as char == 'E' && tmp[1] as char == 'T' && tmp[2] as char == ' ' {
trace!("HTTP GET request found!");
handle_http_get(socket).await;
} else {
trace!("Unknown G packet received, not sure what to do!");
}
},
_ => {},
};
});
}
Err(e) => error!("Failed to accept incoming connection: {:?}", e),
let code = tmp[0];
match code as char {
'C' => {
info!("hi");
let mut client = Client::new(socket).await;
match client.authenticate(&cfg_ref).await {
Ok(is_client) if is_client => {
info!("bye");
ci_ref.send(client).await;
},
Ok(_is_client) => {
debug!("Downloader?");
},
Err(e) => {
error!("Authentication error occured, kicking player...");
error!("{:?}", e);
client.kick("Failed to authenticate player!").await;
// client.disconnect();
}
}
},
'D' => {
// Download connection (for old protocol)
// This crashes the client after sending over 1 mod.
// I have no idea why, perhaps I'm missing something that I'm supposed to send it.
// TODO: Implement this: https://github.com/BeamMP/BeamMP-Server/blob/master/src/TNetwork.cpp#L775
socket.readable().await;
let mut tmp = [0u8; 1];
socket.read_exact(&mut tmp).await;
let id = tmp[0] as usize;
debug!("[D] HandleDownload connection for client id: {}", id);
let mut sent_mods = Vec::new();
'download: while let Ok(_) = socket.writable().await {
{
let lock = CLIENT_MOD_PROGRESS.lock().await;
if lock.get(&(id as u8)).is_none() { continue; }
}
let mod_id = {
let lock = CLIENT_MOD_PROGRESS.lock().await;
*lock.get(&(id as u8)).unwrap()
};
if sent_mods.contains(&mod_id) { continue; }
debug!("[D] Starting download!");
let mut mod_name = {
if mod_id < 0 {
break 'download;
}
if mod_id as usize >= cfg_ref.mods.len() {
break 'download;
}
let bmod = &cfg_ref.mods[mod_id as usize]; // TODO: This is a bit uhh yeah
debug!("[D] Mod name: {}", bmod.0);
bmod.0.clone()
};
if mod_name.starts_with("/") == false {
mod_name = format!("/{mod_name}");
}
debug!("[D] Starting transfer of mod {mod_name}!");
let mod_path = format!("Resources/Client{mod_name}");
if let Ok(file_data) = std::fs::read(mod_path) {
{
trace!("[D] Sending packets!");
if let Err(e) = socket.write(&file_data[(file_data.len()/2)..]).await {
error!("{:?}", e);
}
trace!("[D] Packets sent!");
}
}
sent_mods.push(mod_id);
}
debug!("[D] Done!");
},
'G' => {
// This is probably an HTTP GET request!
let mut tmp = [0u8; 3];
socket.read_exact(&mut tmp).await.expect("Failed to read from socket!");
if tmp[0] as char == 'E' && tmp[1] as char == 'T' && tmp[2] as char == ' ' {
trace!("HTTP GET request found!");
handle_http_get(socket).await;
} else {
trace!("Unknown G packet received, not sure what to do!");
}
},
_ => {
tokio::task::yield_now().await;
},
};
});
info!("Client pushed to joinset!");
}
Err(e) => error!("Failed to accept incoming connection: {:?}", e),
}
},
_ = tokio::time::sleep(tokio::time::Duration::from_millis(50)) => {
error!("time out!");
},
}
if set.is_empty() == false {
info!("what!");
// Because join_next() is cancel safe, we can simply cancel it after N duration
// so at worst this client acceptance loop blocks for N duration
tokio::select!(
_ = set.join_next() => {},
_ = tokio::time::sleep(tokio::time::Duration::from_millis(10)) => {},
)
tokio::select! {
_ = tokio::time::sleep(tokio::time::Duration::from_millis(10)) => {
error!("join_next timed out!");
},
_ = set.join_next() => {
info!("join_next ran!");
},
}
}
}
});
@ -301,7 +315,7 @@ impl Server {
tcp_listener: tcp_listener,
udp_socket: udp_socket,
clients_incoming: clients_incoming,
clients_incoming_rx,
clients_queue: Vec::new(),
clients: Vec::new(),
@ -375,34 +389,29 @@ impl Server {
// with the client acception runtime. If that one locks, the server won't accept
// more clients, but it will at least still process all other clients
let mut joined_names = Vec::new();
if let Ok(mut clients_incoming_lock) = self.clients_incoming.try_lock() { // TODO: Why do I use try_lock here?
if clients_incoming_lock.len() > 0 {
trace!(
"Accepting {} incoming clients...",
clients_incoming_lock.len()
);
for client in clients_incoming_lock.drain(..) {
let userdata = client.get_userdata();
let (name, role, is_guest, beammp_id) = (userdata.username.clone(), userdata.roles.clone(), userdata.guest, userdata.uid.clone());
info!("Welcome {name}!");
joined_names.push(name.clone());
let mut vrx = Vec::new();
for plugin in &self.plugins {
let (tx, rx) = tokio::sync::oneshot::channel();
plugin.send_event(PluginBoundPluginEvent::CallEventHandler((ScriptEvent::OnPlayerAuthenticated { name: name.clone(), role: role.clone(), is_guest, identifiers: PlayerIdentifiers {
ip: String::from("not yet implemented"),
beammp_id: beammp_id.clone(),
} }, Some(tx)))).await;
// TODO: This never returns, because it blocks the entire process function
// from running, so it never manages to run the function correctly.
// let res = rx.await.unwrap_or(Argument::Number(-1f32));
// debug!("res: {:?}", res);
vrx.push(rx);
}
self.clients_queue.push((client, vrx, Vec::new()));
match self.clients_incoming_rx.try_recv() {
Ok(client) => {
let userdata = client.get_userdata();
let (name, role, is_guest, beammp_id) = (userdata.username.clone(), userdata.roles.clone(), userdata.guest, userdata.uid.clone());
info!("Welcome {name}!");
joined_names.push(name.clone());
let mut vrx = Vec::new();
for plugin in &self.plugins {
let (tx, rx) = tokio::sync::oneshot::channel();
plugin.send_event(PluginBoundPluginEvent::CallEventHandler((ScriptEvent::OnPlayerAuthenticated { name: name.clone(), role: role.clone(), is_guest, identifiers: PlayerIdentifiers {
ip: String::from("not yet implemented"),
beammp_id: beammp_id.clone(),
} }, Some(tx)))).await;
// TODO: This never returns, because it blocks the entire process function
// from running, so it never manages to run the function correctly.
// let res = rx.await.unwrap_or(Argument::Number(-1f32));
// debug!("res: {:?}", res);
vrx.push(rx);
}
trace!("Accepted incoming clients!");
}
self.clients_queue.push((client, vrx, Vec::new()));
},
Err(mpsc::error::TryRecvError::Empty) => {},
Err(e) => error!("Error while receiving new clients from acception runtime: {:?}", e),
}
// Bit scuffed but it just polls the return values until all lua plugins have returned