diff --git a/src/relay_server.rs b/src/relay_server.rs index 15f630f..1158338 100644 --- a/src/relay_server.rs +++ b/src/relay_server.rs @@ -6,6 +6,7 @@ use hbb_common::{ tcp::{new_listener, FramedStream}, tokio::{ self, + net::TcpListener, time::{interval, Duration}, }, ResultType, @@ -26,8 +27,19 @@ pub const DEFAULT_PORT: &'static str = "21117"; pub async fn start(port: &str, license: &str, stop: Arc>) -> ResultType<()> { let addr = format!("0.0.0.0:{}", port); log::info!("Listening on tcp {}", addr); - let mut timer = interval(Duration::from_millis(300)); let mut listener = new_listener(addr, false).await?; + loop { + if *stop.lock().unwrap() { + sleep(0.1).await; + continue; + } + log::info!("Start"); + io_loop(&mut listener, license, stop.clone()).await; + } +} + +async fn io_loop(listener: &mut TcpListener, license: &str, stop: Arc>) { + let mut timer = interval(Duration::from_millis(100)); loop { tokio::select! { Ok((stream, addr)) = listener.accept() => { @@ -44,7 +56,6 @@ pub async fn start(port: &str, license: &str, stop: Arc>) -> ResultT } } } - Ok(()) } async fn make_pair(stream: FramedStream, addr: SocketAddr, license: &str) -> ResultType<()> { diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index b229b92..98e5a63 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -9,11 +9,12 @@ use hbb_common::{ log, protobuf::{Message as _, MessageField}, rendezvous_proto::*, + sleep, tcp::{new_listener, FramedStream}, timeout, tokio::{ self, - net::TcpStream, + net::{TcpListener, TcpStream}, sync::mpsc, time::{interval, Duration}, }, @@ -137,6 +138,7 @@ impl PeerMap { const REG_TIMEOUT: i32 = 30_000; type Sink = SplitSink, Bytes>; type Sender = mpsc::UnboundedSender<(RendezvousMessage, SocketAddr)>; +type Receiver = mpsc::UnboundedReceiver<(RendezvousMessage, SocketAddr)>; static mut ROTATION_RELAY_SERVER: usize = 0; #[derive(Clone)] @@ -184,7 +186,34 @@ impl RendezvousServer { }; let mut listener = new_listener(addr, false).await?; let mut listener2 = new_listener(addr2, false).await?; - let mut timer = interval(Duration::from_millis(300)); + loop { + if *stop.lock().unwrap() { + sleep(0.1).await; + continue; + } + log::info!("Start"); + rs.io_loop( + &mut rx, + &mut listener, + &mut listener2, + &mut socket, + license, + stop.clone(), + ) + .await; + } + } + + async fn io_loop( + &mut self, + rx: &mut Receiver, + listener: &mut TcpListener, + listener2: &mut TcpListener, + socket: &mut FramedSocket, + license: &str, + stop: Arc>, + ) { + let mut timer = interval(Duration::from_millis(100)); loop { tokio::select! { _ = timer.tick() => { @@ -197,7 +226,7 @@ impl RendezvousServer { allow_err!(socket.send(&msg, addr).await); } Some(Ok((bytes, addr))) = socket.next() => { - allow_err!(rs.handle_msg(&bytes, addr, &mut socket, license).await); + allow_err!(self.handle_msg(&bytes, addr, socket, license).await); } Ok((stream, addr)) = listener2.accept() => { let stream = FramedStream::from(stream); @@ -220,8 +249,8 @@ impl RendezvousServer { Ok((stream, addr)) = listener.accept() => { log::debug!("Tcp connection from {:?}", addr); let (a, mut b) = Framed::new(stream, BytesCodec::new()).split(); - let tcp_punch = rs.tcp_punch.clone(); - let mut rs = rs.clone(); + let tcp_punch = self.tcp_punch.clone(); + let mut rs = self.clone(); let license = license.to_owned(); tokio::spawn(async move { let mut sender = Some(a); @@ -305,7 +334,6 @@ impl RendezvousServer { } } } - Ok(()) } #[inline]