peer_online_state: serve online state

Signed-off-by: fufesou <shuanglongchen@yeah.net>
This commit is contained in:
fufesou 2022-07-26 21:54:18 +08:00
parent 8c477c8cd0
commit a4940f4634
3 changed files with 62 additions and 8 deletions

1
.gitignore vendored
View File

@ -5,3 +5,4 @@ debian-build
debian/.debhelper
debian/debhelper-build-stamp
.DS_Store
.vscode

View File

@ -148,6 +148,16 @@ message PeerDiscovery {
string misc = 7;
}
message OnlineRequest {
string id = 1;
repeated string peers = 2;
}
message OnlineResponse {
repeated string onlines = 1;
repeated string offlines = 2;
}
message RendezvousMessage {
oneof union {
RegisterPeer register_peer = 6;
@ -167,5 +177,7 @@ message RendezvousMessage {
TestNatRequest test_nat_request = 20;
TestNatResponse test_nat_response = 21;
PeerDiscovery peer_discovery = 22;
OnlineRequest online_request = 23;
OnlineResponse online_response = 24;
}
}

View File

@ -756,6 +756,40 @@ impl RendezvousServer {
}
}
#[inline]
async fn handle_online_request(
&mut self,
stream: &mut FramedStream,
peers: Vec<String>,
) -> ResultType<()> {
let mut onlines = Vec::new();
for peer_id in &peers {
if let Some(peer) = self.pm.get_in_memory(&peer_id).await {
let (elapsed, _) = {
let r = peer.read().await;
(r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr)
};
if elapsed < REG_TIMEOUT {
onlines.push(peer_id.clone());
}
}
}
let offlines = peers
.into_iter()
.filter(|p| onlines.iter().position(|r| r == p).is_none())
.collect::<Vec<_>>();
let mut msg_out = RendezvousMessage::new();
msg_out.set_online_response(OnlineResponse {
onlines,
offlines,
..Default::default()
});
stream.send(&msg_out).await?;
Ok(())
}
#[inline]
async fn send_to_tcp(&mut self, msg: RendezvousMessage, addr: SocketAddr) {
let mut tcp = self.tcp_punch.lock().await.remove(&addr);
@ -1014,8 +1048,8 @@ impl RendezvousServer {
}
async fn handle_listener2(&self, stream: TcpStream, addr: SocketAddr) {
let mut rs = self.clone();
if addr.ip().to_string() == "127.0.0.1" {
let rs = self.clone();
tokio::spawn(async move {
let mut stream = stream;
let mut buffer = [0; 64];
@ -1033,7 +1067,8 @@ impl RendezvousServer {
let mut stream = stream;
if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
if let Some(rendezvous_message::Union::TestNatRequest(_)) = msg_in.union {
match msg_in.union {
Some(rendezvous_message::Union::TestNatRequest(_)) => {
let mut msg_out = RendezvousMessage::new();
msg_out.set_test_nat_response(TestNatResponse {
port: addr.port() as _,
@ -1041,6 +1076,12 @@ impl RendezvousServer {
});
stream.send(&msg_out).await.ok();
}
Some(rendezvous_message::Union::OnlineRequest(or)) => {
allow_err!(rs.handle_online_request(&mut stream, or.peers).await);
}
_ => {
}
}
}
}
});