refacor relay_response to support initiate relay from server side

This commit is contained in:
open-trade 2020-09-27 08:54:43 +08:00
parent 8d4949232a
commit 77b1072c7d
2 changed files with 26 additions and 25 deletions

@ -1 +1 @@
Subproject commit ef0bf8e43ad9017d5b7851ad0f261b6503554e65 Subproject commit 661e7303cc4ce14cc2a8e812200e2464a4da1c32

View File

@ -181,7 +181,7 @@ impl RendezvousServer {
} else { } else {
break; break;
} }
allow_err!(rs.handle_tcp_punch_hole_request(addr, ph.id).await); allow_err!(rs.handle_tcp_punch_hole_request(addr, ph).await);
} }
Some(rendezvous_message::Union::request_relay(mut rf)) => { Some(rendezvous_message::Union::request_relay(mut rf)) => {
if let Some(sender) = sender.take() { if let Some(sender) = sender.take() {
@ -196,11 +196,17 @@ impl RendezvousServer {
rs.tx.send((msg_out, peer.socket_addr)).ok(); rs.tx.send((msg_out, peer.socket_addr)).ok();
} }
} }
Some(rendezvous_message::Union::request_relay_response(mut rfr)) => { Some(rendezvous_message::Union::relay_response(mut rr)) => {
let addr_b = AddrMangle::decode(&rfr.socket_addr); let addr_b = AddrMangle::decode(&rr.socket_addr);
rfr.socket_addr = Default::default(); rr.socket_addr = Default::default();
let id = rr.get_id();
if !id.is_empty() {
if let Some(peer) = rs.pm.get(&id).await {
rr.set_pk(peer.pk.clone());
}
}
let mut msg_out = RendezvousMessage::new(); let mut msg_out = RendezvousMessage::new();
msg_out.set_request_relay_response(rfr); msg_out.set_relay_response(rr);
allow_err!(rs.send_to_tcp_sync(&msg_out, addr_b).await); allow_err!(rs.send_to_tcp_sync(&msg_out, addr_b).await);
break; break;
} }
@ -297,14 +303,13 @@ impl RendezvousServer {
socket.send(&msg_out, addr).await? socket.send(&msg_out, addr).await?
} }
Some(rendezvous_message::Union::punch_hole_request(ph)) => { Some(rendezvous_message::Union::punch_hole_request(ph)) => {
let id = ph.id; if self.pm.is_in_memory(&ph.id) {
if self.pm.is_in_memory(&id) { self.handle_udp_punch_hole_request(addr, ph).await?;
self.handle_udp_punch_hole_request(addr, id).await?;
} else { } else {
// not in memory, fetch from db with spawn in case blocking me // not in memory, fetch from db with spawn in case blocking me
let mut me = self.clone(); let mut me = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
allow_err!(me.handle_udp_punch_hole_request(addr, id).await); allow_err!(me.handle_udp_punch_hole_request(addr, ph).await);
}); });
} }
} }
@ -420,14 +425,10 @@ impl RendezvousServer {
Some(peer) => peer.pk, Some(peer) => peer.pk,
_ => Vec::new(), _ => Vec::new(),
}; };
let mut relay_server = phs.relay_server;
if relay_server.is_empty() {
relay_server = self.relay_server.clone();
}
let mut p = PunchHoleResponse { let mut p = PunchHoleResponse {
socket_addr: AddrMangle::encode(addr), socket_addr: AddrMangle::encode(addr),
pk, pk,
relay_server, relay_server: phs.relay_server.clone(),
..Default::default() ..Default::default()
}; };
if let Ok(t) = phs.nat_type.enum_value() { if let Ok(t) = phs.nat_type.enum_value() {
@ -458,13 +459,9 @@ impl RendezvousServer {
&addr &addr
); );
let mut msg_out = RendezvousMessage::new(); let mut msg_out = RendezvousMessage::new();
let mut relay_server = la.relay_server;
if relay_server.is_empty() {
relay_server = self.relay_server.clone();
}
let mut p = PunchHoleResponse { let mut p = PunchHoleResponse {
socket_addr: la.local_addr.clone(), socket_addr: la.local_addr.clone(),
relay_server, relay_server: la.relay_server,
..Default::default() ..Default::default()
}; };
p.set_is_local(true); p.set_is_local(true);
@ -481,8 +478,9 @@ impl RendezvousServer {
async fn handle_punch_hole_request( async fn handle_punch_hole_request(
&mut self, &mut self,
addr: SocketAddr, addr: SocketAddr,
id: String, ph: PunchHoleRequest,
) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> { ) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
let id = ph.id;
// punch hole request from A, relay to B, // punch hole request from A, relay to B,
// check if in same intranet first, // check if in same intranet first,
// fetch local addrs if in same intranet. // fetch local addrs if in same intranet.
@ -518,6 +516,7 @@ impl RendezvousServer {
); );
msg_out.set_fetch_local_addr(FetchLocalAddr { msg_out.set_fetch_local_addr(FetchLocalAddr {
socket_addr, socket_addr,
relay_server: self.relay_server.clone(),
..Default::default() ..Default::default()
}); });
} else { } else {
@ -529,6 +528,8 @@ impl RendezvousServer {
); );
msg_out.set_punch_hole(PunchHole { msg_out.set_punch_hole(PunchHole {
socket_addr, socket_addr,
nat_type: ph.nat_type,
relay_server: self.relay_server.clone(),
..Default::default() ..Default::default()
}); });
} }
@ -574,9 +575,9 @@ impl RendezvousServer {
async fn handle_tcp_punch_hole_request( async fn handle_tcp_punch_hole_request(
&mut self, &mut self,
addr: SocketAddr, addr: SocketAddr,
id: String, ph: PunchHoleRequest,
) -> ResultType<()> { ) -> ResultType<()> {
let (msg, to_addr) = self.handle_punch_hole_request(addr, id).await?; let (msg, to_addr) = self.handle_punch_hole_request(addr, ph).await?;
if let Some(addr) = to_addr { if let Some(addr) = to_addr {
self.tx.send((msg, addr))?; self.tx.send((msg, addr))?;
} else { } else {
@ -589,9 +590,9 @@ impl RendezvousServer {
async fn handle_udp_punch_hole_request( async fn handle_udp_punch_hole_request(
&mut self, &mut self,
addr: SocketAddr, addr: SocketAddr,
id: String, ph: PunchHoleRequest,
) -> ResultType<()> { ) -> ResultType<()> {
let (msg, to_addr) = self.handle_punch_hole_request(addr, id).await?; let (msg, to_addr) = self.handle_punch_hole_request(addr, ph).await?;
self.tx.send(( self.tx.send((
msg, msg,
match to_addr { match to_addr {