diff --git a/.gitignore b/.gitignore index 4565963..4c16996 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ debian-build debian/.debhelper debian/debhelper-build-stamp .DS_Store +.vscode diff --git a/libs/hbb_common/protos/rendezvous.proto b/libs/hbb_common/protos/rendezvous.proto index 2c5f1b3..894eba1 100644 --- a/libs/hbb_common/protos/rendezvous.proto +++ b/libs/hbb_common/protos/rendezvous.proto @@ -148,6 +148,16 @@ message PeerDiscovery { string misc = 7; } +message OnlineRequest { + string id = 1; + repeated string peers = 2; +} + +message OnlineResponse { + repeated string onlines = 1; + repeated string offlines = 2; +} + message RendezvousMessage { oneof union { RegisterPeer register_peer = 6; @@ -167,5 +177,7 @@ message RendezvousMessage { TestNatRequest test_nat_request = 20; TestNatResponse test_nat_response = 21; PeerDiscovery peer_discovery = 22; + OnlineRequest online_request = 23; + OnlineResponse online_response = 24; } } diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index 60f95d9..9650602 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -756,6 +756,40 @@ impl RendezvousServer { } } + #[inline] + async fn handle_online_request( + &mut self, + stream: &mut FramedStream, + peers: Vec, + ) -> ResultType<()> { + let mut onlines = Vec::new(); + for peer_id in &peers { + if let Some(peer) = self.pm.get_in_memory(&peer_id).await { + let (elapsed, _) = { + let r = peer.read().await; + (r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr) + }; + if elapsed < REG_TIMEOUT { + onlines.push(peer_id.clone()); + } + } + } + let offlines = peers + .into_iter() + .filter(|p| onlines.iter().position(|r| r == p).is_none()) + .collect::>(); + + let mut msg_out = RendezvousMessage::new(); + msg_out.set_online_response(OnlineResponse { + onlines, + offlines, + ..Default::default() + }); + stream.send(&msg_out).await?; + + Ok(()) + } + #[inline] async fn send_to_tcp(&mut self, msg: RendezvousMessage, addr: SocketAddr) { let mut tcp = self.tcp_punch.lock().await.remove(&addr); @@ -1014,8 +1048,8 @@ impl RendezvousServer { } async fn handle_listener2(&self, stream: TcpStream, addr: SocketAddr) { + let mut rs = self.clone(); if addr.ip().to_string() == "127.0.0.1" { - let rs = self.clone(); tokio::spawn(async move { let mut stream = stream; let mut buffer = [0; 64]; @@ -1033,13 +1067,20 @@ impl RendezvousServer { let mut stream = stream; if let Some(Ok(bytes)) = stream.next_timeout(30_000).await { if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) { - if let Some(rendezvous_message::Union::TestNatRequest(_)) = msg_in.union { - let mut msg_out = RendezvousMessage::new(); - msg_out.set_test_nat_response(TestNatResponse { - port: addr.port() as _, - ..Default::default() - }); - stream.send(&msg_out).await.ok(); + match msg_in.union { + Some(rendezvous_message::Union::TestNatRequest(_)) => { + let mut msg_out = RendezvousMessage::new(); + msg_out.set_test_nat_response(TestNatResponse { + port: addr.port() as _, + ..Default::default() + }); + stream.send(&msg_out).await.ok(); + } + Some(rendezvous_message::Union::OnlineRequest(or)) => { + allow_err!(rs.handle_online_request(&mut stream, or.peers).await); + } + _ => { + } } } }