diff --git a/.gitignore b/.gitignore index b32c2cd..4565963 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ db* debian-build debian/.debhelper debian/debhelper-build-stamp +.DS_Store diff --git a/Cargo.lock b/Cargo.lock index 56d59da..9c7bc1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -194,9 +194,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +checksum = "f0b3de4a0c5e67e16066a0715723abd91edc2f9001d09c46e1dca929351e130e" [[package]] name = "cc" @@ -749,7 +749,7 @@ dependencies = [ "log", "mac_address", "protobuf", - "protobuf-codegen-pure", + "protobuf-codegen", "quinn", "rand", "regex", @@ -760,7 +760,7 @@ dependencies = [ "sodiumoxide", "tokio", "tokio-socks", - "tokio-util 0.6.9", + "tokio-util 0.7.1", "toml", "winapi", "zstd", @@ -1463,60 +1463,56 @@ dependencies = [ [[package]] name = "protobuf" -version = "3.0.0-alpha.2" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5ef59c35c7472ce5e1b6c5924b87585143d1fc2cf39eae0009bba6c4df62f1" +checksum = "4ee4a7d8b91800c8f167a6268d1a1026607368e1adc84e98fe044aeb905302f7" +dependencies = [ + "bytes", + "once_cell", + "protobuf-support", + "thiserror", +] [[package]] name = "protobuf-codegen" -version = "3.0.0-alpha.2" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89100ee819f69b77a4cab389fec9dd155a305af4c615e6413ec1ef9341f333ef" +checksum = "07b893e5e7d3395545d5244f8c0d33674025bd566b26c03bfda49b82c6dec45e" dependencies = [ "anyhow", + "once_cell", "protobuf", "protobuf-parse", - "thiserror", -] - -[[package]] -name = "protobuf-codegen-pure" -version = "3.0.0-alpha.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79453e74d08190551e821533ee42c447f9e21ca26f83520e120e6e8af27f6879" -dependencies = [ - "anyhow", - "protobuf", - "protobuf-codegen", - "protobuf-parse", - "thiserror", -] - -[[package]] -name = "protobuf-parse" -version = "3.0.0-alpha.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c265ffc69976efc3056955b881641add3186ad0be893ef10622482d80d1d2b68" -dependencies = [ - "anyhow", - "protobuf", - "protoc", + "regex", "tempfile", "thiserror", ] [[package]] -name = "protoc" -version = "3.0.0-alpha.2" +name = "protobuf-parse" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f1f8b318a54d18fbe542513331e058f4f8ce6502e542e057c50c7e5e803fdab" +checksum = "9b1447dd751c434cc1b415579837ebd0411ed7d67d465f38010da5d7cd33af4d" dependencies = [ "anyhow", + "indexmap", "log", + "protobuf", + "protobuf-support", + "tempfile", "thiserror", "which", ] +[[package]] +name = "protobuf-support" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ca157fe12fc7ee2e315f2f735e27df41b3d97cdd70ea112824dac1ffb08ee1c" +dependencies = [ + "thiserror", +] + [[package]] name = "punycode" version = "0.4.1" @@ -2197,10 +2193,11 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.18.2" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4903bf0427cf68dddd5aa6a93220756f8be0c34fcfa9f5e6191e103e15a31395" +checksum = "57aec3cfa4c296db7255446efb4928a6be304b431a806216105542a67b6ca82e" dependencies = [ + "autocfg", "bytes", "libc", "memchr", @@ -2284,11 +2281,9 @@ checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" dependencies = [ "bytes", "futures-core", - "futures-io", "futures-sink", "log", "pin-project-lite", - "slab", "tokio", ] @@ -2300,9 +2295,13 @@ checksum = "0edfdeb067411dba2044da6d1cb2df793dd35add7888d73c16e3381ded401764" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", + "futures-util", "pin-project-lite", + "slab", "tokio", + "tracing", ] [[package]] diff --git a/libs/hbb_common/Cargo.toml b/libs/hbb_common/Cargo.toml index bc31223..4b28fc1 100644 --- a/libs/hbb_common/Cargo.toml +++ b/libs/hbb_common/Cargo.toml @@ -7,11 +7,11 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -protobuf = "3.0.0-alpha.2" -tokio = { version = "1.15", features = ["full"] } -tokio-util = { version = "0.6", features = ["full"] } +protobuf = { version = "3.1", features = ["with-bytes"] } +tokio = { version = "1.20", features = ["full"] } +tokio-util = { version = "0.7", features = ["full"] } futures = "0.3" -bytes = "1.1" +bytes = "1.2" log = "0.4" env_logger = "0.9" socket2 = { version = "0.3", features = ["reuseport"] } @@ -38,7 +38,7 @@ mac_address = "1.1" quic = [] [build-dependencies] -protobuf-codegen-pure = "3.0.0-alpha.2" +protobuf-codegen = "3.1" [target.'cfg(target_os = "windows")'.dependencies] winapi = { version = "0.3", features = ["winuser"] } diff --git a/libs/hbb_common/build.rs b/libs/hbb_common/build.rs index 99dacb7..225ec34 100644 --- a/libs/hbb_common/build.rs +++ b/libs/hbb_common/build.rs @@ -1,9 +1,14 @@ fn main() { std::fs::create_dir_all("src/protos").unwrap(); - protobuf_codegen_pure::Codegen::new() + protobuf_codegen::Codegen::new() + .pure() .out_dir("src/protos") .inputs(&["protos/rendezvous.proto", "protos/message.proto"]) .include("protos") + .customize( + protobuf_codegen::Customize::default() + .tokio_bytes(true) + ) .run() .expect("Codegen failed."); } diff --git a/src/common.rs b/src/common.rs index 74b9e78..253d3d1 100644 --- a/src/common.rs +++ b/src/common.rs @@ -3,10 +3,9 @@ use hbb_common::{anyhow::Context, log, ResultType}; use ini::Ini; use sodiumoxide::crypto::sign; use std::{ - collections::HashMap, io::prelude::*, io::Read, - net::{IpAddr, SocketAddr}, + net::SocketAddr, time::{Instant, SystemTime}, }; diff --git a/src/database.rs b/src/database.rs index cf64faf..41ad5e3 100644 --- a/src/database.rs +++ b/src/database.rs @@ -8,9 +8,7 @@ use std::{ops::DerefMut, str::FromStr}; //use sqlx::postgres::PgPoolOptions; //use sqlx::mysql::MySqlPoolOptions; -pub(crate) type DB = sqlx::Sqlite; pub(crate) type MapValue = serde_json::map::Map; -pub(crate) type MapStr = std::collections::HashMap; type Pool = deadpool::managed::Pool; pub struct DbPool { @@ -107,13 +105,6 @@ impl Database { .await?) } - pub async fn get_peer_id(&self, guid: &[u8]) -> ResultType> { - Ok(sqlx::query!("select id from peer where guid = ?", guid) - .fetch_optional(self.pool.get().await?.deref_mut()) - .await? - .map(|x| x.id)) - } - #[inline] pub async fn get_conn(&self) -> ResultType> { Ok(self.pool.get().await?) @@ -135,8 +126,8 @@ impl Database { pub async fn insert_peer( &self, id: &str, - uuid: &Vec, - pk: &Vec, + uuid: &[u8], + pk: &[u8], info: &str, ) -> ResultType> { let guid = uuid::Uuid::new_v4().as_bytes().to_vec(); @@ -157,7 +148,7 @@ impl Database { &self, guid: &Vec, id: &str, - pk: &Vec, + pk: &[u8], info: &str, ) -> ResultType<()> { sqlx::query!( @@ -209,13 +200,6 @@ mod tests { } } -#[inline] -pub fn guid2str(guid: &Vec) -> String { - let mut bytes = [0u8; 16]; - bytes[..].copy_from_slice(&guid); - uuid::Uuid::from_bytes(bytes).to_string() -} - pub(crate) fn get_str(v: &Value) -> Option<&str> { match v { Value::String(v) => { diff --git a/src/peer.rs b/src/peer.rs index 72999b7..49d440d 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -4,6 +4,7 @@ use hbb_common::{ log, rendezvous_proto::*, tokio::sync::{Mutex, RwLock}, + bytes::Bytes, ResultType, }; use serde_derive::{Deserialize, Serialize}; @@ -25,13 +26,12 @@ pub(crate) struct PeerInfo { pub(crate) ip: String, } -#[derive(Clone, Debug)] pub(crate) struct Peer { pub(crate) socket_addr: SocketAddr, pub(crate) last_reg_time: Instant, pub(crate) guid: Vec, - pub(crate) uuid: Vec, - pub(crate) pk: Vec, + pub(crate) uuid: Bytes, + pub(crate) pk: Bytes, pub(crate) user: Option>, pub(crate) info: PeerInfo, pub(crate) disabled: bool, @@ -44,8 +44,8 @@ impl Default for Peer { socket_addr: "0.0.0.0:0".parse().unwrap(), last_reg_time: get_expired_time(), guid: Vec::new(), - uuid: Vec::new(), - pk: Vec::new(), + uuid: Bytes::new(), + pk: Bytes::new(), info: Default::default(), user: None, disabled: false, @@ -93,8 +93,8 @@ impl PeerMap { id: String, peer: LockPeer, addr: SocketAddr, - uuid: Vec, - pk: Vec, + uuid: Bytes, + pk: Bytes, ip: String, ) -> register_pk_response::Result { log::info!("update_pk {} {:?} {:?} {:?}", id, addr, uuid, pk); @@ -139,8 +139,8 @@ impl PeerMap { if let Ok(Some(v)) = self.db.get_peer(id).await { let peer = Peer { guid: v.guid, - uuid: v.uuid, - pk: v.pk, + uuid: v.uuid.into(), + pk: v.pk.into(), user: v.user, info: serde_json::from_str::(&v.info).unwrap_or_default(), disabled: v.status == Some(0), @@ -177,9 +177,4 @@ impl PeerMap { pub(crate) async fn is_in_memory(&self, id: &str) -> bool { self.map.read().await.contains_key(id) } - - #[inline] - pub(crate) async fn remove(&self, id: &str) { - self.map.write().await.remove(id); - } } diff --git a/src/relay_server.rs b/src/relay_server.rs index b6c677c..7b198d6 100644 --- a/src/relay_server.rs +++ b/src/relay_server.rs @@ -420,7 +420,7 @@ async fn make_pair_(stream: impl StreamTrait, addr: SocketAddr, key: &str, limit let mut stream = stream; if let Ok(Some(Ok(bytes))) = timeout(30_000, stream.recv()).await { if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) { - if let Some(rendezvous_message::Union::request_relay(rf)) = msg_in.union { + if let Some(rendezvous_message::Union::RequestRelay(rf)) = msg_in.union { if !key.is_empty() && rf.licence_key != key { return; } diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index 08c6c85..60f95d9 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -89,7 +89,12 @@ enum LoopFailure { impl RendezvousServer { #[tokio::main(flavor = "multi_thread")] - pub async fn start(port: i32, serial: i32, key: &str, rmem: usize) -> ResultType<()> { + pub async fn start( + port: i32, + serial: i32, + key: &str, + rmem: usize, + ) -> ResultType<()> { let (key, sk) = Self::get_server_sk(key); let addr = format!("0.0.0.0:{}", port); let addr2 = format!("0.0.0.0:{}", port - 1); @@ -138,7 +143,6 @@ impl RendezvousServer { log::info!("local-ip: {:?}", rs.inner.local_ip); std::env::set_var("PORT_FOR_API", port.to_string()); rs.parse_relay_servers(&get_arg("relay-servers")); - let pm = rs.pm.clone(); let mut listener = new_listener(&addr, false).await?; let mut listener2 = new_listener(&addr2, false).await?; let mut listener3 = new_listener(&addr3, false).await?; @@ -299,7 +303,7 @@ impl RendezvousServer { ) -> ResultType<()> { if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) { match msg_in.union { - Some(rendezvous_message::Union::register_peer(rp)) => { + Some(rendezvous_message::Union::RegisterPeer(rp)) => { // B registered if rp.id.len() > 0 { log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr); @@ -315,7 +319,7 @@ impl RendezvousServer { } } } - Some(rendezvous_message::Union::register_pk(rk)) => { + Some(rendezvous_message::Union::RegisterPk(rk)) => { if rk.uuid.is_empty() || rk.pk.is_empty() { return Ok(()); } @@ -402,7 +406,7 @@ impl RendezvousServer { }); socket.send(&msg_out, addr).await? } - Some(rendezvous_message::Union::punch_hole_request(ph)) => { + Some(rendezvous_message::Union::PunchHoleRequest(ph)) => { if self.pm.is_in_memory(&ph.id).await { self.handle_udp_punch_hole_request(addr, ph, key).await?; } else { @@ -414,13 +418,13 @@ impl RendezvousServer { }); } } - Some(rendezvous_message::Union::punch_hole_sent(phs)) => { + Some(rendezvous_message::Union::PunchHoleSent(phs)) => { self.handle_hole_sent(phs, addr, Some(socket)).await?; } - Some(rendezvous_message::Union::local_addr(la)) => { + Some(rendezvous_message::Union::LocalAddr(la)) => { self.handle_local_addr(la, addr, Some(socket)).await?; } - Some(rendezvous_message::Union::configure_update(mut cu)) => { + Some(rendezvous_message::Union::ConfigureUpdate(mut cu)) => { if addr.ip() == ADDR_127 && cu.serial > self.inner.serial { let mut inner: Inner = (*self.inner).clone(); inner.serial = cu.serial; @@ -441,7 +445,7 @@ impl RendezvousServer { ); } } - Some(rendezvous_message::Union::software_update(su)) => { + Some(rendezvous_message::Union::SoftwareUpdate(su)) => { if !self.inner.version.is_empty() && su.url != self.inner.version { let mut msg_out = RendezvousMessage::new(); msg_out.set_software_update(SoftwareUpdate { @@ -468,7 +472,7 @@ impl RendezvousServer { ) -> bool { if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) { match msg_in.union { - Some(rendezvous_message::Union::punch_hole_request(ph)) => { + Some(rendezvous_message::Union::PunchHoleRequest(ph)) => { // there maybe several attempt, so sink can be none if let Some(sink) = sink.take() { self.tcp_punch.lock().await.insert(addr, sink); @@ -476,24 +480,24 @@ impl RendezvousServer { allow_err!(self.handle_tcp_punch_hole_request(addr, ph, &key, ws).await); return true; } - Some(rendezvous_message::Union::request_relay(mut rf)) => { + Some(rendezvous_message::Union::RequestRelay(mut rf)) => { // there maybe several attempt, so sink can be none if let Some(sink) = sink.take() { self.tcp_punch.lock().await.insert(addr, sink); } if let Some(peer) = self.pm.get_in_memory(&rf.id).await { let mut msg_out = RendezvousMessage::new(); - rf.socket_addr = AddrMangle::encode(addr); + rf.socket_addr = AddrMangle::encode(addr).into(); msg_out.set_request_relay(rf); let peer_addr = peer.read().await.socket_addr; self.tx.send(Data::Msg(msg_out, peer_addr)).ok(); } return true; } - Some(rendezvous_message::Union::relay_response(mut rr)) => { + Some(rendezvous_message::Union::RelayResponse(mut rr)) => { let addr_b = AddrMangle::decode(&rr.socket_addr); rr.socket_addr = Default::default(); - let id = rr.get_id(); + let id = rr.id(); if !id.is_empty() { let pk = self.get_pk(&rr.version, id.to_owned()).await; rr.set_pk(pk); @@ -510,13 +514,13 @@ impl RendezvousServer { msg_out.set_relay_response(rr); allow_err!(self.send_to_tcp_sync(msg_out, addr_b).await); } - Some(rendezvous_message::Union::punch_hole_sent(phs)) => { + Some(rendezvous_message::Union::PunchHoleSent(phs)) => { allow_err!(self.handle_hole_sent(phs, addr, None).await); } - Some(rendezvous_message::Union::local_addr(la)) => { + Some(rendezvous_message::Union::LocalAddr(la)) => { allow_err!(self.handle_local_addr(la, addr, None).await); } - Some(rendezvous_message::Union::test_nat_request(tar)) => { + Some(rendezvous_message::Union::TestNatRequest(tar)) => { let mut msg_out = RendezvousMessage::new(); let mut res = TestNatResponse { port: addr.port() as _, @@ -531,7 +535,7 @@ impl RendezvousServer { msg_out.set_test_nat_response(res); Self::send_to_sink(sink, msg_out).await; } - Some(rendezvous_message::Union::register_pk(_rk)) => { + Some(rendezvous_message::Union::RegisterPk(_)) => { let res = register_pk_response::Result::NOT_SUPPORT; let mut msg_out = RendezvousMessage::new(); msg_out.set_register_pk_response(RegisterPkResponse { @@ -607,7 +611,7 @@ impl RendezvousServer { ); let mut msg_out = RendezvousMessage::new(); let mut p = PunchHoleResponse { - socket_addr: AddrMangle::encode(addr), + socket_addr: AddrMangle::encode(addr).into(), pk: self.get_pk(&phs.version, phs.id).await, relay_server: phs.relay_server.clone(), ..Default::default() @@ -714,7 +718,7 @@ impl RendezvousServer { _ => false, }, }; - let socket_addr = AddrMangle::encode(addr); + let socket_addr = AddrMangle::encode(addr).into(); if same_intranet { log::debug!( "Fetch local addr {:?} {:?} request from {:?}", @@ -858,7 +862,7 @@ impl RendezvousServer { self.relay_servers = self.relay_servers0.clone(); } - fn get_relay_server(&self, pa: IpAddr, pb: IpAddr) -> String { + fn get_relay_server(&self, _pa: IpAddr, _pb: IpAddr) -> String { if self.relay_servers.is_empty() { return "".to_owned(); } else if self.relay_servers.len() == 1 { @@ -1029,7 +1033,7 @@ impl RendezvousServer { let mut stream = stream; if let Some(Ok(bytes)) = stream.next_timeout(30_000).await { if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) { - if let Some(rendezvous_message::Union::test_nat_request(_)) = msg_in.union { + if let Some(rendezvous_message::Union::TestNatRequest(_)) = msg_in.union { let mut msg_out = RendezvousMessage::new(); msg_out.set_test_nat_response(TestNatResponse { port: addr.port() as _, @@ -1042,12 +1046,21 @@ impl RendezvousServer { }); } - async fn handle_listener(&self, stream: TcpStream, addr: SocketAddr, key: &str, ws: bool) { + async fn handle_listener( + &self, + stream: TcpStream, + addr: SocketAddr, + key: &str, + ws: bool, + ) { log::debug!("Tcp connection from {:?}, ws: {}", addr, ws); let mut rs = self.clone(); let key = key.to_owned(); tokio::spawn(async move { - allow_err!(rs.handle_listener_inner(stream, addr, &key, ws).await); + allow_err!( + rs.handle_listener_inner(stream, addr, &key, ws) + .await + ); }); } @@ -1067,7 +1080,10 @@ impl RendezvousServer { while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await { match msg { tungstenite::Message::Binary(bytes) => { - if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await { + if !self + .handle_tcp(&bytes, &mut sink, addr, key, ws) + .await + { break; } } @@ -1078,7 +1094,10 @@ impl RendezvousServer { let (a, mut b) = Framed::new(stream, BytesCodec::new()).split(); sink = Some(Sink::TcpStream(a)); while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await { - if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await { + if !self + .handle_tcp(&bytes, &mut sink, addr, key, ws) + .await + { break; } } @@ -1091,13 +1110,13 @@ impl RendezvousServer { } #[inline] - async fn get_pk(&mut self, version: &str, id: String) -> Vec { + async fn get_pk(&mut self, version: &str, id: String) -> Bytes { if version.is_empty() || self.inner.sk.is_none() { - Vec::new() + Bytes::new() } else { match self.pm.get(&id).await { Some(peer) => { - let pk = peer.read().await.pk.clone(); + let pk = peer.read().await.pk.clone().into(); sign::sign( &hbb_common::message_proto::IdPk { id, @@ -1108,8 +1127,9 @@ impl RendezvousServer { .unwrap_or_default(), &self.inner.sk.as_ref().unwrap(), ) + .into() } - _ => Vec::new(), + _ => Bytes::new(), } } } @@ -1213,13 +1233,6 @@ async fn test_hbbs(addr: SocketAddr) -> ResultType<()> { } } -#[inline] -fn distance(a: &(i32, i32), b: &(i32, i32)) -> i32 { - let dx = a.0 - b.0; - let dy = a.1 - b.1; - dx * dx + dy * dy -} - #[inline] async fn send_rk_res( socket: &mut FramedSocket,