From a4940f4634ab43568dedecacee2f31b5d2dc3907 Mon Sep 17 00:00:00 2001 From: fufesou Date: Tue, 26 Jul 2022 21:54:18 +0800 Subject: [PATCH 1/2] peer_online_state: serve online state Signed-off-by: fufesou --- .gitignore | 1 + libs/hbb_common/protos/rendezvous.proto | 12 ++++++ src/rendezvous_server.rs | 57 +++++++++++++++++++++---- 3 files changed, 62 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index 4565963..4c16996 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ debian-build debian/.debhelper debian/debhelper-build-stamp .DS_Store +.vscode diff --git a/libs/hbb_common/protos/rendezvous.proto b/libs/hbb_common/protos/rendezvous.proto index 2c5f1b3..894eba1 100644 --- a/libs/hbb_common/protos/rendezvous.proto +++ b/libs/hbb_common/protos/rendezvous.proto @@ -148,6 +148,16 @@ message PeerDiscovery { string misc = 7; } +message OnlineRequest { + string id = 1; + repeated string peers = 2; +} + +message OnlineResponse { + repeated string onlines = 1; + repeated string offlines = 2; +} + message RendezvousMessage { oneof union { RegisterPeer register_peer = 6; @@ -167,5 +177,7 @@ message RendezvousMessage { TestNatRequest test_nat_request = 20; TestNatResponse test_nat_response = 21; PeerDiscovery peer_discovery = 22; + OnlineRequest online_request = 23; + OnlineResponse online_response = 24; } } diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index 60f95d9..9650602 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -756,6 +756,40 @@ impl RendezvousServer { } } + #[inline] + async fn handle_online_request( + &mut self, + stream: &mut FramedStream, + peers: Vec, + ) -> ResultType<()> { + let mut onlines = Vec::new(); + for peer_id in &peers { + if let Some(peer) = self.pm.get_in_memory(&peer_id).await { + let (elapsed, _) = { + let r = peer.read().await; + (r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr) + }; + if elapsed < REG_TIMEOUT { + onlines.push(peer_id.clone()); + } + } + } + let offlines = peers + .into_iter() + .filter(|p| onlines.iter().position(|r| r == p).is_none()) + .collect::>(); + + let mut msg_out = RendezvousMessage::new(); + msg_out.set_online_response(OnlineResponse { + onlines, + offlines, + ..Default::default() + }); + stream.send(&msg_out).await?; + + Ok(()) + } + #[inline] async fn send_to_tcp(&mut self, msg: RendezvousMessage, addr: SocketAddr) { let mut tcp = self.tcp_punch.lock().await.remove(&addr); @@ -1014,8 +1048,8 @@ impl RendezvousServer { } async fn handle_listener2(&self, stream: TcpStream, addr: SocketAddr) { + let mut rs = self.clone(); if addr.ip().to_string() == "127.0.0.1" { - let rs = self.clone(); tokio::spawn(async move { let mut stream = stream; let mut buffer = [0; 64]; @@ -1033,13 +1067,20 @@ impl RendezvousServer { let mut stream = stream; if let Some(Ok(bytes)) = stream.next_timeout(30_000).await { if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) { - if let Some(rendezvous_message::Union::TestNatRequest(_)) = msg_in.union { - let mut msg_out = RendezvousMessage::new(); - msg_out.set_test_nat_response(TestNatResponse { - port: addr.port() as _, - ..Default::default() - }); - stream.send(&msg_out).await.ok(); + match msg_in.union { + Some(rendezvous_message::Union::TestNatRequest(_)) => { + let mut msg_out = RendezvousMessage::new(); + msg_out.set_test_nat_response(TestNatResponse { + port: addr.port() as _, + ..Default::default() + }); + stream.send(&msg_out).await.ok(); + } + Some(rendezvous_message::Union::OnlineRequest(or)) => { + allow_err!(rs.handle_online_request(&mut stream, or.peers).await); + } + _ => { + } } } } From f7fc45a3d2280a9026858a8be4157eb3a1b301c8 Mon Sep 17 00:00:00 2001 From: fufesou Date: Wed, 27 Jul 2022 00:35:40 +0800 Subject: [PATCH 2/2] peer_online_state: response online state bits Signed-off-by: fufesou --- libs/hbb_common/protos/rendezvous.proto | 3 +- src/rendezvous_server.rs | 50 +++++++------------------ 2 files changed, 15 insertions(+), 38 deletions(-) diff --git a/libs/hbb_common/protos/rendezvous.proto b/libs/hbb_common/protos/rendezvous.proto index 894eba1..1ac60f3 100644 --- a/libs/hbb_common/protos/rendezvous.proto +++ b/libs/hbb_common/protos/rendezvous.proto @@ -154,8 +154,7 @@ message OnlineRequest { } message OnlineResponse { - repeated string onlines = 1; - repeated string offlines = 2; + bytes states = 1; } message RendezvousMessage { diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index 9650602..b93e04f 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -89,12 +89,7 @@ enum LoopFailure { impl RendezvousServer { #[tokio::main(flavor = "multi_thread")] - pub async fn start( - port: i32, - serial: i32, - key: &str, - rmem: usize, - ) -> ResultType<()> { + pub async fn start(port: i32, serial: i32, key: &str, rmem: usize) -> ResultType<()> { let (key, sk) = Self::get_server_sk(key); let addr = format!("0.0.0.0:{}", port); let addr2 = format!("0.0.0.0:{}", port - 1); @@ -762,27 +757,26 @@ impl RendezvousServer { stream: &mut FramedStream, peers: Vec, ) -> ResultType<()> { - let mut onlines = Vec::new(); - for peer_id in &peers { + let mut states = BytesMut::zeroed((peers.len() + 7) / 8); + for i in 0..peers.len() { + let peer_id = &peers[i]; + // bytes index from left to right + let states_idx = i / 8; + let bit_idx = 7 - i % 8; if let Some(peer) = self.pm.get_in_memory(&peer_id).await { let (elapsed, _) = { let r = peer.read().await; (r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr) }; if elapsed < REG_TIMEOUT { - onlines.push(peer_id.clone()); + states[states_idx] |= 0x01 << bit_idx; } } } - let offlines = peers - .into_iter() - .filter(|p| onlines.iter().position(|r| r == p).is_none()) - .collect::>(); let mut msg_out = RendezvousMessage::new(); msg_out.set_online_response(OnlineResponse { - onlines, - offlines, + states: states.into(), ..Default::default() }); stream.send(&msg_out).await?; @@ -1079,29 +1073,19 @@ impl RendezvousServer { Some(rendezvous_message::Union::OnlineRequest(or)) => { allow_err!(rs.handle_online_request(&mut stream, or.peers).await); } - _ => { - } + _ => {} } } } }); } - async fn handle_listener( - &self, - stream: TcpStream, - addr: SocketAddr, - key: &str, - ws: bool, - ) { + async fn handle_listener(&self, stream: TcpStream, addr: SocketAddr, key: &str, ws: bool) { log::debug!("Tcp connection from {:?}, ws: {}", addr, ws); let mut rs = self.clone(); let key = key.to_owned(); tokio::spawn(async move { - allow_err!( - rs.handle_listener_inner(stream, addr, &key, ws) - .await - ); + allow_err!(rs.handle_listener_inner(stream, addr, &key, ws).await); }); } @@ -1121,10 +1105,7 @@ impl RendezvousServer { while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await { match msg { tungstenite::Message::Binary(bytes) => { - if !self - .handle_tcp(&bytes, &mut sink, addr, key, ws) - .await - { + if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await { break; } } @@ -1135,10 +1116,7 @@ impl RendezvousServer { let (a, mut b) = Framed::new(stream, BytesCodec::new()).split(); sink = Some(Sink::TcpStream(a)); while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await { - if !self - .handle_tcp(&bytes, &mut sink, addr, key, ws) - .await - { + if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await { break; } }