diff --git a/Cargo.lock b/Cargo.lock index b8f553d..b907411 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -497,6 +497,7 @@ name = "hbbs" version = "0.1.0" dependencies = [ "hbb_common 0.1.0", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.115 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.115 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.57 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index cf87eff..c0d888c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,10 @@ version = "0.1.0" authors = ["open-trade "] edition = "2018" +[[bin]] +name = "hbbf" +path = "src/hbbf/main.rs" + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] @@ -12,6 +16,7 @@ sled = "0.31" serde_derive = "1.0" serde = "1.0" serde_json = "1.0" +lazy_static = "1.4" [workspace] members = ["libs/hbb_common"] diff --git a/libs/hbb_common b/libs/hbb_common index dc39de2..8209a1f 160000 --- a/libs/hbb_common +++ b/libs/hbb_common @@ -1 +1 @@ -Subproject commit dc39de26981c2916c053762eef6961f6b659fb0c +Subproject commit 8209a1f2d1d11b7019e224fdc161f138415f6d53 diff --git a/src/hbbf/main.rs b/src/hbbf/main.rs new file mode 100644 index 0000000..c351f73 --- /dev/null +++ b/src/hbbf/main.rs @@ -0,0 +1,83 @@ +use hbb_common::{ + env_logger, log, + protobuf::Message as _, + rendezvous_proto::*, + sleep, + tcp::{new_listener, FramedStream}, + tokio, ResultType, +}; +use std::{ + collections::HashMap, + net::SocketAddr, + sync::{Arc, Mutex}, +}; + +lazy_static::lazy_static! { + static ref PEERS: Arc>> = Arc::new(Mutex::new(HashMap::new())); +} + +#[tokio::main] +async fn main() -> ResultType<()> { + env_logger::init(); + let addr = "0.0.0.0:21117"; + log::info!("Listening on {}", addr); + let mut listener = new_listener(addr, true).await?; + loop { + tokio::select! { + Ok((stream, addr)) = listener.accept() => { + tokio::spawn(async move { + make_pair(FramedStream::from(stream), addr).await.ok(); + }); + } + } + } +} + +async fn make_pair(stream: FramedStream, addr: SocketAddr) -> ResultType<()> { + let mut stream = stream; + if let Some(Ok(bytes)) = stream.next_timeout(30_000).await { + if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) { + if let Some(rendezvous_message::Union::request_forward(rf)) = msg_in.union { + if !rf.uuid.is_empty() { + let peer = PEERS.lock().unwrap().remove(&rf.uuid); + if let Some(peer) = peer { + log::info!("Forward request {} from {} got paired", rf.uuid, addr); + return forward(stream, peer).await; + } else { + log::info!("New forward request {} from {}", rf.uuid, addr); + PEERS.lock().unwrap().insert(rf.uuid.clone(), stream); + sleep(30.).await; + PEERS.lock().unwrap().remove(&rf.uuid); + } + } + } + } + } + Ok(()) +} + +async fn forward(stream: FramedStream, peer: FramedStream) -> ResultType<()> { + let mut peer = peer; + let mut stream = stream; + peer.set_raw(); + stream.set_raw(); + loop { + tokio::select! { + res = peer.next() => { + if let Some(Ok(bytes)) = res { + stream.send_bytes(bytes.into()).await?; + } else { + break; + } + }, + res = stream.next() => { + if let Some(Ok(bytes)) = res { + peer.send_bytes(bytes.into()).await?; + } else { + break; + } + }, + } + } + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 7c6a57a..14c443d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use hbbs::*; async fn main() -> ResultType<()> { env_logger::init(); let addr = "0.0.0.0:21116"; - log::info!("Start Server {}", addr); + log::info!("Listening on {}", addr); RendezvousServer::start(&addr).await?; Ok(()) }