From 4e07acdb6cd096af344cefc4a037ed8c69fd9189 Mon Sep 17 00:00:00 2001 From: open-trade Date: Mon, 11 May 2020 15:47:13 +0800 Subject: [PATCH] handle tcp punch response via tcp connection --- src/rendezvous_server.rs | 92 ++++++++++++++++++++++++++++++---------- 1 file changed, 70 insertions(+), 22 deletions(-) diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index 591693d..a60f16c 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -59,18 +59,30 @@ impl RendezvousServer { let tcp_punch = rs.tcp_punch.clone(); tcp_punch.lock().unwrap().insert(addr, a); let tx = tx.clone(); + let mut rs = Self { + peer_map: PeerMap::new(), + tcp_punch: tcp_punch, + }; tokio::spawn(async move { while let Some(Ok(bytes)) = b.next().await { if let Ok(msg_in) = parse_from_bytes::(&bytes) { match msg_in.union { Some(rendezvous_message::Union::punch_hole_request(ph)) => { - tx.send((addr, ph.id)).ok(); + allow_err!(tx.send((addr, ph.id))); + } + Some(rendezvous_message::Union::punch_hole_sent(phs)) => { + allow_err!(rs.handle_hole_sent(&phs, addr, None).await); + break; + } + Some(rendezvous_message::Union::local_addr(la)) => { + allow_err!(rs.handle_local_addr(&la, addr, None).await); + break; } _ => {} } } } - tcp_punch.lock().unwrap().remove(&addr); + rs.tcp_punch.lock().unwrap().remove(&addr); log::debug!("Tcp connection from {:?} closed", addr); }); } @@ -106,28 +118,10 @@ impl RendezvousServer { self.handle_punch_hole_request(addr, &ph.id, socket).await?; } Some(rendezvous_message::Union::punch_hole_sent(phs)) => { - // punch hole sent from B, tell A that B is ready to be connected - let addr_a = AddrMangle::decode(&phs.socket_addr); - log::debug!("Punch hole response to {:?} from {:?}", &addr_a, &addr); - let mut msg_out = RendezvousMessage::new(); - msg_out.set_punch_hole_response(PunchHoleResponse { - socket_addr: AddrMangle::encode(addr), - ..Default::default() - }); - socket.send(&msg_out, addr_a).await?; - self.send_to_tcp(&msg_out, addr_a).await?; + self.handle_hole_sent(&phs, addr, Some(socket)).await?; } Some(rendezvous_message::Union::local_addr(la)) => { - // forward local addrs of B to A - let addr_a = AddrMangle::decode(&la.socket_addr); - log::debug!("Local addrs response to {:?} from {:?}", &addr_a, &addr); - let mut msg_out = RendezvousMessage::new(); - msg_out.set_punch_hole_response(PunchHoleResponse { - socket_addr: la.local_addr, - ..Default::default() - }); - socket.send(&msg_out, addr_a).await?; - self.send_to_tcp(&msg_out, addr_a).await?; + self.handle_local_addr(&la, addr, Some(socket)).await?; } _ => {} } @@ -135,6 +129,60 @@ impl RendezvousServer { Ok(()) } + async fn handle_hole_sent<'a>( + &mut self, + phs: &PunchHoleSent, + addr: SocketAddr, + socket: Option<&'a mut FramedSocket>, + ) -> ResultType<()> { + // punch hole sent from B, tell A that B is ready to be connected + let addr_a = AddrMangle::decode(&phs.socket_addr); + log::debug!( + "{} punch hole response to {:?} from {:?}", + if socket.is_none() { "TCP" } else { "UDP" }, + &addr_a, + &addr + ); + let mut msg_out = RendezvousMessage::new(); + msg_out.set_punch_hole_response(PunchHoleResponse { + socket_addr: AddrMangle::encode(addr), + ..Default::default() + }); + if let Some(socket) = socket { + socket.send(&msg_out, addr_a).await?; + } else { + self.send_to_tcp(&msg_out, addr_a).await?; + } + Ok(()) + } + + async fn handle_local_addr<'a>( + &mut self, + la: &LocalAddr, + addr: SocketAddr, + socket: Option<&'a mut FramedSocket>, + ) -> ResultType<()> { + // forward local addrs of B to A + let addr_a = AddrMangle::decode(&la.socket_addr); + log::debug!( + "{} local addrs response to {:?} from {:?}", + if socket.is_none() { "TCP" } else { "UDP" }, + &addr_a, + &addr + ); + let mut msg_out = RendezvousMessage::new(); + msg_out.set_punch_hole_response(PunchHoleResponse { + socket_addr: la.local_addr.clone(), + ..Default::default() + }); + if let Some(socket) = socket { + socket.send(&msg_out, addr_a).await?; + } else { + self.send_to_tcp(&msg_out, addr_a).await?; + } + Ok(()) + } + async fn handle_punch_hole_request( &mut self, addr: SocketAddr,