From 69f60499dbef16a7976d9eff737254509bf7fc0b Mon Sep 17 00:00:00 2001 From: open-trade Date: Mon, 9 Mar 2020 23:54:36 +0800 Subject: [PATCH] refactored --- Cargo.lock | 7 -- Cargo.toml | 1 - libs/hbb_common | 2 +- src/rendezvous_server.rs | 143 +++++++++++++++++++-------------------- 4 files changed, 69 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e182b4c..b25168f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -170,7 +170,6 @@ name = "hbbs" version = "0.1.0" dependencies = [ "hbb_common 0.1.0", - "simple-error 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -405,11 +404,6 @@ dependencies = [ "libc 0.2.67 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "simple-error" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "slab" version = "0.4.2" @@ -599,7 +593,6 @@ dependencies = [ "checksum regex 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "322cf97724bea3ee221b78fe25ac9c46114ebb51747ad5babd51a2fc6a8235a8" "checksum regex-syntax 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)" = "1132f845907680735a84409c3bebc64d1364a5683ffbce899550cd09d5eaefc1" "checksum signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41" -"checksum simple-error 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "339844c9af2d844b9230bb28e8f819a7790cbf20a29b5cbd2b59916a03a1ef51" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" "checksum socket2 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "e8b74de517221a2cb01a53349cf54182acdc31a074727d3079068448c0676d85" "checksum syn 1.0.16 (registry+https://github.com/rust-lang/crates.io-index)" = "123bd9499cfb380418d509322d7a6d52e5315f064fe4b3ad18a53d6b92c07859" diff --git a/Cargo.toml b/Cargo.toml index 2f3c748..51acc4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,6 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -simple-error = "0.2" hbb_common = { path = "libs/hbb_common" } [workspace] diff --git a/libs/hbb_common b/libs/hbb_common index 7ba274f..4a2f7f1 160000 --- a/libs/hbb_common +++ b/libs/hbb_common @@ -1 +1 @@ -Subproject commit 7ba274fdaba5057fdefe6a3b47dc3255e78da3fe +Subproject commit 4a2f7f1edd9f0e49018ea4989ed8b7c3adffd50b diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index 3f75e77..cbd5355 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -1,11 +1,6 @@ use hbb_common::{ - bytes::BytesMut, - log, - message_proto::*, - protobuf::parse_from_bytes, - tokio::{net::UdpSocket, stream::StreamExt}, - udp::FramedSocket, - AddrMangle, ResultType, + bytes::BytesMut, log, message_proto::*, protobuf::parse_from_bytes, tokio::net::UdpSocket, + udp::FramedSocket, AddrMangle, ResultType, }; use std::{collections::HashMap, net::SocketAddr}; @@ -86,7 +81,7 @@ impl RendezvousServer { mod tests { use super::*; use hbb_common::tokio; - use std::time::Duration; + use std::io::{Error, ErrorKind}; #[allow(unused_must_use)] #[tokio::main] @@ -100,76 +95,74 @@ mod tests { let addr_server = format!("127.0.0.1:{}", port_server); let f1 = RendezvousServer::start(&addr_server); let addr_server = addr_server.parse().unwrap(); - let f2 = async { - // B register it to server - let socket_b = UdpSocket::bind("127.0.0.1:0").await.unwrap(); - let local_addr_b = socket_b.local_addr().unwrap(); - let mut socket_b = FramedSocket::new(socket_b); - let mut msg_out = Message::new(); - msg_out.set_register_peer(RegisterPeer { - hbb_addr: "123".to_string(), - ..Default::default() - }); - socket_b.send(&msg_out, addr_server).await; - - // A send punch request to server - let socket_a = UdpSocket::bind("127.0.0.1:0").await.unwrap(); - let local_addr_a = socket_a.local_addr().unwrap(); - let mut socket_a = FramedSocket::new(socket_a); - msg_out.set_punch_hole_request(PunchHoleRequest { - hbb_addr: "123".to_string(), - ..Default::default() - }); - socket_a.send(&msg_out, addr_server).await; - - println!( - "A {:?} request punch hole to B {:?} via server {:?}", - local_addr_a, local_addr_b, addr_server, - ); - - // on B side, responsed to A's punch request forwarded from server - if let Ok(Some(Ok((bytes, addr)))) = - tokio::time::timeout(Duration::from_millis(1000), socket_b.next()).await - { - assert_eq!(addr_server, addr); - let msg_in = parse_from_bytes::(&bytes).unwrap(); - let remote_addr_a = AddrMangle::decode(&msg_in.get_punch_hole().socket_addr[..]); - assert_eq!(local_addr_a, remote_addr_a); - - // B punch A - socket_b - .get_mut() - .send_to(&b"SYN"[..], &remote_addr_a) - .await; - - msg_out.set_punch_hole_sent(PunchHoleSent { - socket_addr: AddrMangle::encode(&remote_addr_a), - ..Default::default() - }); - socket_b.send(&msg_out, addr_server).await; - } - - // on A side - socket_a.next().await; // skip "SYN" - if let Ok(Some(Ok((bytes, addr)))) = - tokio::time::timeout(Duration::from_millis(1000), socket_a.next()).await - { - assert_eq!(addr_server, addr); - let msg_in = parse_from_bytes::(&bytes).unwrap(); - let remote_addr_b = - AddrMangle::decode(&msg_in.get_punch_hole_response().socket_addr[..]); - assert_eq!(local_addr_b, remote_addr_b); - } - - if true { - Err(Box::new(simple_error::SimpleError::new("done"))) - } else { - Ok(()) - } - }; + let f2 = punch_hole(addr_server); tokio::try_join!(f1, f2); } + async fn punch_hole(addr_server: SocketAddr) -> ResultType<()> { + // B register it to server + let socket_b = UdpSocket::bind("127.0.0.1:0").await?; + let local_addr_b = socket_b.local_addr().unwrap(); + let mut socket_b = FramedSocket::new(socket_b); + let mut msg_out = Message::new(); + msg_out.set_register_peer(RegisterPeer { + hbb_addr: "123".to_string(), + ..Default::default() + }); + socket_b.send(&msg_out, addr_server).await?; + + // A send punch request to server + let socket_a = UdpSocket::bind("127.0.0.1:0").await?; + let local_addr_a = socket_a.local_addr().unwrap(); + let mut socket_a = FramedSocket::new(socket_a); + msg_out.set_punch_hole_request(PunchHoleRequest { + hbb_addr: "123".to_string(), + ..Default::default() + }); + socket_a.send(&msg_out, addr_server).await?; + + println!( + "A {:?} request punch hole to B {:?} via server {:?}", + local_addr_a, local_addr_b, addr_server, + ); + + // on B side, responsed to A's punch request forwarded from server + if let Some(Ok((bytes, addr))) = socket_b.next_timeout(1000).await { + assert_eq!(addr_server, addr); + let msg_in = parse_from_bytes::(&bytes)?; + let remote_addr_a = AddrMangle::decode(&msg_in.get_punch_hole().socket_addr[..]); + assert_eq!(local_addr_a, remote_addr_a); + + // B punch A + socket_b + .get_mut() + .send_to(&b"SYN"[..], &remote_addr_a) + .await?; + + msg_out.set_punch_hole_sent(PunchHoleSent { + socket_addr: AddrMangle::encode(&remote_addr_a), + ..Default::default() + }); + socket_b.send(&msg_out, addr_server).await?; + } else { + panic!("failed"); + } + + // on A side + socket_a.next().await; // skip "SYN" + if let Some(Ok((bytes, addr))) = socket_a.next_timeout(1000).await { + assert_eq!(addr_server, addr); + let msg_in = parse_from_bytes::(&bytes)?; + let remote_addr_b = + AddrMangle::decode(&msg_in.get_punch_hole_response().socket_addr[..]); + assert_eq!(local_addr_b, remote_addr_b); + } else { + panic!("failed"); + } + + Err(Box::new(Error::new(ErrorKind::Other, "done"))) + } + #[test] fn test_rs() { self::test_rs_async();