From d8b829818e1c7fa2ec8c3a2ac64364977ce69882 Mon Sep 17 00:00:00 2001 From: open-trade Date: Sat, 7 Mar 2020 22:45:22 +0800 Subject: [PATCH] refactor --- src/rendezvous_server.rs | 70 +++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index cd44fbe..bc36a06 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -1,5 +1,5 @@ use super::message_proto::*; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use futures::SinkExt; use protobuf::{parse_from_bytes, Message as _}; use std::{ @@ -8,11 +8,7 @@ use std::{ net::{Ipv4Addr, SocketAddr, SocketAddrV4}, time::{Duration, SystemTime, UNIX_EPOCH}, }; -use tokio::{ - net::UdpSocket, - stream::StreamExt, - time::{self, delay_for}, -}; +use tokio::{net::UdpSocket, stream::StreamExt, time::delay_for}; use tokio_util::{codec::BytesCodec, udp::UdpFramed}; /// Certain router and firewalls scan the packet and if they @@ -74,43 +70,43 @@ impl RendezvousServer { peer_map: PeerMap::new(), }; while let Some(Ok((bytes, addr))) = socket.next().await { - if let SocketAddr::V4(addr_v4) = addr { - if let Ok(msg_in) = parse_from_bytes::(&bytes) { - match msg_in.union { - Some(Message_oneof_union::register_peer(rp)) => { - if rp.hbb_addr.len() > 0 { - rs.peer_map.insert( - rp.hbb_addr, - Peer { - socket_addr: addr_v4, - }, - ); - } - } - Some(Message_oneof_union::peek_peer(pp)) => { - rs.handle_peek_peer(&pp, addr, &mut socket).await?; - } - _ => {} - } - } - } + rs.handle_msg(&bytes, addr, &mut socket).await?; } Ok(()) } - pub async fn handle_peek_peer( - &self, - pp: &PeekPeer, + pub async fn handle_msg( + &mut self, + bytes: &BytesMut, addr: SocketAddr, socket: &mut FramedSocket, ) -> ResultType { - if let Some(peer) = self.peer_map.get(&pp.hbb_addr) { - let mut msg_out = Message::new(); - msg_out.set_peek_peer_response(PeekPeerResponse { - socket_addr: V4AddrMangle::encode(&peer.socket_addr), - ..Default::default() - }); - send_to(&msg_out, addr, socket).await?; + if let Ok(msg_in) = parse_from_bytes::(&bytes) { + if let SocketAddr::V4(addr_v4) = addr { + match msg_in.union { + Some(Message_oneof_union::register_peer(rp)) => { + if rp.hbb_addr.len() > 0 { + self.peer_map.insert( + rp.hbb_addr, + Peer { + socket_addr: addr_v4, + }, + ); + } + } + Some(Message_oneof_union::peek_peer(pp)) => { + if let Some(peer) = self.peer_map.get(&pp.hbb_addr) { + let mut msg_out = Message::new(); + msg_out.set_peek_peer_response(PeekPeerResponse { + socket_addr: V4AddrMangle::encode(&peer.socket_addr), + ..Default::default() + }); + send_to(&msg_out, addr, socket).await?; + } + } + _ => {} + } + } } Ok(()) } @@ -159,7 +155,7 @@ mod tests { }); send_to(&msg_out, to_addr, &mut socket).await; if let Ok(Some(Ok((bytes, _)))) = - time::timeout(Duration::from_millis(1), socket.next()).await + tokio::time::timeout(Duration::from_millis(1), socket.next()).await { if let Ok(msg_in) = parse_from_bytes::(&bytes) { assert_eq!(