local addrs

This commit is contained in:
open-trade 2020-03-12 19:25:37 +08:00
parent 5e5d8927e2
commit 983486d0ed
2 changed files with 63 additions and 23 deletions

@ -1 +1 @@
Subproject commit 489ff100300235ef3b9e7820f1b13dadc75be5f8
Subproject commit 56d07b3457150de0caa0d08d17b99ec3d267ef61

View File

@ -1,5 +1,5 @@
use hbb_common::{
bytes::BytesMut, log, message_proto::*, protobuf::parse_from_bytes, udp::FramedSocket,
bytes::BytesMut, log, protobuf::parse_from_bytes, rendezvous_proto::*, udp::FramedSocket,
AddrMangle, ResultType,
};
use std::{collections::HashMap, net::SocketAddr};
@ -41,43 +41,83 @@ impl RendezvousServer {
addr: SocketAddr,
socket: &mut FramedSocket,
) -> ResultType<()> {
if let Ok(msg_in) = parse_from_bytes::<Message>(&bytes) {
if let Ok(msg_in) = parse_from_bytes::<RendezvousMessage>(&bytes) {
match msg_in.union {
Some(Message_oneof_union::register_peer(rp)) => {
Some(RendezvousMessage_oneof_union::register_peer(rp)) => {
// B registered
if rp.hbb_addr.len() > 0 {
log::debug!("New peer registered: {:?} {:?}", &rp.hbb_addr, &addr);
self.peer_map
.insert(rp.hbb_addr, Peer { socket_addr: addr });
}
}
Some(Message_oneof_union::punch_hole_request(ph)) => {
// punch hole request from A, forward to B
Some(RendezvousMessage_oneof_union::punch_hole_request(ph)) => {
// punch hole request from A, forward to B,
// check if in same intranet first,
// fetch local addrs if in same intranet.
// because punch hole won't work if in the same intranet,
// all routers will drop such self-connections.
if let Some(peer) = self.peer_map.get(&ph.hbb_addr) {
log::debug!(
"Punch hole {:?} {:?} request from {:?}",
&ph.hbb_addr,
&peer.socket_addr,
&addr
);
let mut msg_out = Message::new();
msg_out.set_punch_hole(PunchHole {
socket_addr: AddrMangle::encode(&addr),
..Default::default()
});
let mut msg_out = RendezvousMessage::new();
let same_intranet = match peer.socket_addr {
SocketAddr::V4(a) => match addr {
SocketAddr::V4(b) => a.ip() == b.ip(),
_ => false,
},
SocketAddr::V6(a) => match addr {
SocketAddr::V6(b) => a.ip() == b.ip(),
_ => false,
},
};
let socket_addr = AddrMangle::encode(&addr);
if same_intranet {
log::debug!(
"Fetch local addrs {:?} {:?} request from {:?}",
&ph.hbb_addr,
&peer.socket_addr,
&addr
);
msg_out.set_fetch_local_addrs(FetchLocalAddrs {
socket_addr,
..Default::default()
});
} else {
log::debug!(
"Punch hole {:?} {:?} request from {:?}",
&ph.hbb_addr,
&peer.socket_addr,
&addr
);
msg_out.set_punch_hole(PunchHole {
socket_addr,
..Default::default()
});
}
socket.send(&msg_out, peer.socket_addr).await?;
}
}
Some(Message_oneof_union::punch_hole_sent(phs)) => {
// punch hole sent from B, tell A that B ready
Some(RendezvousMessage_oneof_union::punch_hole_sent(phs)) => {
// punch hole sent from B, tell A that B is ready to be connected
let addr_a = AddrMangle::decode(&phs.socket_addr);
log::debug!("Punch hole response to {:?} from {:?}", &addr_a, &addr);
let mut msg_out = Message::new();
let mut msg_out = RendezvousMessage::new();
msg_out.set_punch_hole_response(PunchHoleResponse {
socket_addr: AddrMangle::encode(&addr),
..Default::default()
});
socket.send(&msg_out, addr_a).await?;
}
Some(RendezvousMessage_oneof_union::local_addrs(la)) => {
// forward local addrs of B to A
let addr_a = AddrMangle::decode(&la.socket_addr);
log::debug!("Local addrs response to {:?} from {:?}", &addr_a, &addr);
let mut msg_out = RendezvousMessage::new();
msg_out.set_local_addrs_response(LocalAddrsResponse {
socket_addrs: la.local_addrs,
..Default::default()
});
socket.send(&msg_out, addr_a).await?;
}
_ => {}
}
}
@ -110,7 +150,7 @@ mod tests {
// B register it to server
let mut socket_b = FramedSocket::new("127.0.0.1:0").await?;
let local_addr_b = socket_b.get_ref().local_addr().unwrap();
let mut msg_out = Message::new();
let mut msg_out = RendezvousMessage::new();
msg_out.set_register_peer(RegisterPeer {
hbb_addr: "123".to_string(),
..Default::default()
@ -134,7 +174,7 @@ mod tests {
// on B side, responsed to A's punch request forwarded from server
if let Some(Ok((bytes, addr))) = socket_b.next_timeout(1000).await {
assert_eq!(addr_server, addr);
let msg_in = parse_from_bytes::<Message>(&bytes)?;
let msg_in = parse_from_bytes::<RendezvousMessage>(&bytes)?;
let remote_addr_a = AddrMangle::decode(&msg_in.get_punch_hole().socket_addr);
assert_eq!(local_addr_a, remote_addr_a);
@ -157,7 +197,7 @@ mod tests {
socket_a.next().await; // skip "SYN"
if let Some(Ok((bytes, addr))) = socket_a.next_timeout(1000).await {
assert_eq!(addr_server, addr);
let msg_in = parse_from_bytes::<Message>(&bytes)?;
let msg_in = parse_from_bytes::<RendezvousMessage>(&bytes)?;
let remote_addr_b = AddrMangle::decode(&msg_in.get_punch_hole_response().socket_addr);
assert_eq!(local_addr_b, remote_addr_b);
} else {