From fc7f790defcf3b5db91d071ba4cf35c8cde3c6c0 Mon Sep 17 00:00:00 2001 From: open-trade Date: Fri, 19 Mar 2021 16:38:10 +0800 Subject: [PATCH] refactor --- src/hbbr/main.rs | 13 ++++++------- src/hbbr/relay_server.rs | 19 +++++++++++++++++-- src/main.rs | 14 ++++++-------- src/rendezvous_server.rs | 19 ++++++++++++++++++- 4 files changed, 47 insertions(+), 18 deletions(-) diff --git a/src/hbbr/main.rs b/src/hbbr/main.rs index e33080f..4225f91 100644 --- a/src/hbbr/main.rs +++ b/src/hbbr/main.rs @@ -1,12 +1,10 @@ use clap::App; mod relay_server; -use hbb_common::{env_logger::*, tokio, ResultType}; -use relay_server::start; +use hbb_common::{env_logger::*, ResultType}; +use relay_server::*; +use std::sync::{Arc, Mutex}; -const DEFAULT_PORT: &'static str = "21117"; - -#[tokio::main] -async fn main() -> ResultType<()> { +fn main() -> ResultType<()> { init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info")); let args = format!( "-p, --port=[NUMBER(default={})] 'Sets the listening port'", @@ -18,6 +16,7 @@ async fn main() -> ResultType<()> { .about("RustDesk Relay Server") .args_from_usage(&args) .get_matches(); - start(matches.value_of("port").unwrap_or(DEFAULT_PORT)).await?; + let stop: Arc> = Default::default(); + start(matches.value_of("port").unwrap_or(DEFAULT_PORT), stop)?; Ok(()) } diff --git a/src/hbbr/relay_server.rs b/src/hbbr/relay_server.rs index 9becb4c..a95f16c 100644 --- a/src/hbbr/relay_server.rs +++ b/src/hbbr/relay_server.rs @@ -4,7 +4,11 @@ use hbb_common::{ rendezvous_proto::*, sleep, tcp::{new_listener, FramedStream}, - tokio, ResultType, + tokio::{ + self, + time::{interval, Duration}, + }, + ResultType, }; use std::{ collections::HashMap, @@ -16,9 +20,13 @@ lazy_static::lazy_static! { static ref PEERS: Arc>> = Arc::new(Mutex::new(HashMap::new())); } -pub async fn start(port: &str) -> ResultType<()> { +pub const DEFAULT_PORT: &'static str = "21117"; + +#[tokio::main(basic_scheduler)] +pub async fn start(port: &str, stop: Arc>) -> ResultType<()> { let addr = format!("0.0.0.0:{}", port); log::info!("Listening on {}", addr); + let mut timer = interval(Duration::from_millis(300)); let mut listener = new_listener(addr, false).await?; loop { tokio::select! { @@ -27,8 +35,15 @@ pub async fn start(port: &str) -> ResultType<()> { make_pair(FramedStream::from(stream), addr).await.ok(); }); } + _ = timer.tick() => { + if *stop.lock().unwrap() { + log::info!("Stopped"); + break; + } + } } } + Ok(()) } async fn make_pair(stream: FramedStream, addr: SocketAddr) -> ResultType<()> { diff --git a/src/main.rs b/src/main.rs index 9c64b71..8978a55 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,13 +2,12 @@ // https://blog.csdn.net/bytxl/article/details/44344855 use clap::App; -use hbb_common::{env_logger::*, log, tokio, ResultType}; +use hbb_common::{env_logger::*, log, ResultType}; use hbbs::*; use ini::Ini; -const DEFAULT_PORT: &'static str = "21116"; +use std::sync::{Arc, Mutex}; -#[tokio::main] -async fn main() -> ResultType<()> { +fn main() -> ResultType<()> { init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info")); let args = format!( "-c --config=[FILE] +takes_value 'Sets a custom config file' @@ -72,12 +71,11 @@ async fn main() -> ResultType<()> { .map(|x| x.to_owned()) .collect(); let addr = format!("0.0.0.0:{}", port); - log::info!("Listening on {}", addr); let addr2 = format!("0.0.0.0:{}", port.parse::().unwrap_or(0) - 1); - log::info!("Listening on {}, extra port for NAT test", addr2); log::info!("relay-servers={:?}", relay_servers); log::info!("serial={}", serial); log::info!("rendezvous-servers={:?}", rendezvous_servers); + let stop: Arc> = Default::default(); RendezvousServer::start( &addr, &addr2, @@ -85,7 +83,7 @@ async fn main() -> ResultType<()> { serial, rendezvous_servers, get_arg("software-url", ""), - ) - .await?; + stop, + )?; Ok(()) } diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index 88320a2..80452d4 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -11,7 +11,12 @@ use hbb_common::{ rendezvous_proto::*, tcp::{new_listener, FramedStream}, timeout, - tokio::{self, net::TcpStream, sync::mpsc}, + tokio::{ + self, + net::TcpStream, + sync::mpsc, + time::{interval, Duration}, + }, tokio_util::codec::Framed, udp::FramedSocket, AddrMangle, ResultType, @@ -61,6 +66,8 @@ struct PeerMap { db: super::SledAsync, } +pub const DEFAULT_PORT: &'static str = "21116"; + impl PeerMap { fn new() -> ResultType { Ok(Self { @@ -135,6 +142,7 @@ pub struct RendezvousServer { } impl RendezvousServer { + #[tokio::main(basic_scheduler)] pub async fn start( addr: &str, addr2: &str, @@ -142,6 +150,7 @@ impl RendezvousServer { serial: i32, rendezvous_servers: Vec, software_url: String, + stop: Arc>, ) -> ResultType<()> { let mut socket = FramedSocket::new(addr).await?; let (tx, mut rx) = mpsc::unbounded_channel::<(RendezvousMessage, SocketAddr)>(); @@ -161,8 +170,15 @@ impl RendezvousServer { }; let mut listener = new_listener(addr, false).await?; let mut listener2 = new_listener(addr2, false).await?; + let mut timer = interval(Duration::from_millis(300)); loop { tokio::select! { + _ = timer.tick() => { + if *stop.lock().unwrap() { + log::info!("Stopped"); + break; + } + } Some((msg, addr)) = rx.recv() => { allow_err!(socket.send(&msg, addr).await); } @@ -274,6 +290,7 @@ impl RendezvousServer { } } } + Ok(()) } #[inline]