how to await tokio_timer::sleep?

This commit is contained in:
open-trade 2020-03-07 00:37:23 +08:00
parent 1695b5e357
commit f7cb0cfde6
6 changed files with 840 additions and 88 deletions

46
Cargo.lock generated
View File

@ -23,6 +23,11 @@ dependencies = [
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "autocfg"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "1.2.1" version = "1.2.1"
@ -38,6 +43,16 @@ name = "cfg-if"
version = "0.1.10" version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "crossbeam-utils"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "env_logger" name = "env_logger"
version = "0.7.1" version = "0.7.1"
@ -69,6 +84,11 @@ name = "fuchsia-zircon-sys"
version = "0.3.3" version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "futures"
version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.4" version = "0.3.4"
@ -162,6 +182,7 @@ dependencies = [
"protobuf 2.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 2.10.2 (registry+https://github.com/rust-lang/crates.io-index)",
"protobuf-codegen-pure 2.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf-codegen-pure 2.10.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-util 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-util 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
@ -462,6 +483,15 @@ dependencies = [
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "tokio-executor"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "0.2.5" version = "0.2.5"
@ -472,6 +502,17 @@ dependencies = [
"syn 1.0.16 (registry+https://github.com/rust-lang/crates.io-index)", "syn 1.0.16 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "tokio-timer"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.3.0" version = "0.3.0"
@ -541,13 +582,16 @@ dependencies = [
"checksum aho-corasick 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)" = "d5e63fd144e18ba274ae7095c0197a870a7b9468abc801dd62f190d80817d2ec" "checksum aho-corasick 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)" = "d5e63fd144e18ba274ae7095c0197a870a7b9468abc801dd62f190d80817d2ec"
"checksum arc-swap 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d7b8a9123b8027467bce0099fe556c628a53c8d83df0507084c31e9ba2e39aff" "checksum arc-swap 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d7b8a9123b8027467bce0099fe556c628a53c8d83df0507084c31e9ba2e39aff"
"checksum atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" "checksum atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
"checksum autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
"checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" "checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
"checksum bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" "checksum bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1"
"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" "checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
"checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
"checksum env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" "checksum env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
"checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3"
"checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
"checksum futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)" = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef"
"checksum futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5c329ae8753502fb44ae4fc2b622fa2a94652c41e795143765ba0927f92ab780" "checksum futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5c329ae8753502fb44ae4fc2b622fa2a94652c41e795143765ba0927f92ab780"
"checksum futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c77d04ce8edd9cb903932b608268b3fffec4163dc053b3b402bf47eac1f1a8" "checksum futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c77d04ce8edd9cb903932b608268b3fffec4163dc053b3b402bf47eac1f1a8"
"checksum futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a" "checksum futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a"
@ -592,7 +636,9 @@ dependencies = [
"checksum termcolor 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f" "checksum termcolor 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f"
"checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" "checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
"checksum tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "0fa5e81d6bc4e67fe889d5783bd2a128ab2e0cfa487e0be16b6a8d177b101616" "checksum tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "0fa5e81d6bc4e67fe889d5783bd2a128ab2e0cfa487e0be16b6a8d177b101616"
"checksum tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671"
"checksum tokio-macros 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" "checksum tokio-macros 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389"
"checksum tokio-timer 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "93044f2d313c95ff1cb7809ce9a7a05735b012288a888b62d4434fd58c94f296"
"checksum tokio-util 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "af67cdce2b40f8dffb0ee04c853a24217b5d0d3e358f0f5ccc0b5332174ed9a8" "checksum tokio-util 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "af67cdce2b40f8dffb0ee04c853a24217b5d0d3e358f0f5ccc0b5332174ed9a8"
"checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" "checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c"
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"

View File

@ -10,6 +10,7 @@ edition = "2018"
tokio = { version = "0.2", features = ["full"] } tokio = { version = "0.2", features = ["full"] }
protobuf = "2.10" protobuf = "2.10"
tokio-util = { version = "0.3", features = ["full"] } tokio-util = { version = "0.3", features = ["full"] }
tokio-timer = "0.2"
log = "0.4" log = "0.4"
env_logger = "0.7" env_logger = "0.7"
futures = "0.3" futures = "0.3"

View File

@ -1,9 +1,22 @@
syntax = "proto3"; syntax = "proto3";
package hbb; package hbb;
message RegisterPeer {
string hbb_addr = 1;
}
message PeekPeer {
string hbb_addr = 1;
}
message PeekPeerResponse {
bytes socket_addr = 1;
}
message Message { message Message {
string addr = 1; // hbb address
oneof union { oneof union {
bytes socket_addr = 6; RegisterPeer register_peer = 6;
PeekPeer peek_peer = 7;
PeekPeerResponse peek_peer_response = 8;
} }
} }

View File

@ -1,2 +0,0 @@
mod message;
pub use message::*;

View File

@ -27,9 +27,514 @@ use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_10_2; // const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_10_2;
#[derive(PartialEq,Clone,Default)] #[derive(PartialEq,Clone,Default)]
pub struct Message { pub struct RegisterPeer {
// message fields // message fields
pub addr: ::std::string::String, pub hbb_addr: ::std::string::String,
// special fields
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
}
impl<'a> ::std::default::Default for &'a RegisterPeer {
fn default() -> &'a RegisterPeer {
<RegisterPeer as ::protobuf::Message>::default_instance()
}
}
impl RegisterPeer {
pub fn new() -> RegisterPeer {
::std::default::Default::default()
}
// string hbb_addr = 1;
pub fn get_hbb_addr(&self) -> &str {
&self.hbb_addr
}
pub fn clear_hbb_addr(&mut self) {
self.hbb_addr.clear();
}
// Param is passed by value, moved
pub fn set_hbb_addr(&mut self, v: ::std::string::String) {
self.hbb_addr = v;
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_hbb_addr(&mut self) -> &mut ::std::string::String {
&mut self.hbb_addr
}
// Take field
pub fn take_hbb_addr(&mut self) -> ::std::string::String {
::std::mem::replace(&mut self.hbb_addr, ::std::string::String::new())
}
}
impl ::protobuf::Message for RegisterPeer {
fn is_initialized(&self) -> bool {
true
}
fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
while !is.eof()? {
let (field_number, wire_type) = is.read_tag_unpack()?;
match field_number {
1 => {
::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.hbb_addr)?;
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
};
}
::std::result::Result::Ok(())
}
// Compute sizes of nested messages
#[allow(unused_variables)]
fn compute_size(&self) -> u32 {
let mut my_size = 0;
if !self.hbb_addr.is_empty() {
my_size += ::protobuf::rt::string_size(1, &self.hbb_addr);
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
}
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
if !self.hbb_addr.is_empty() {
os.write_string(1, &self.hbb_addr)?;
}
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
fn get_cached_size(&self) -> u32 {
self.cached_size.get()
}
fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
&self.unknown_fields
}
fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
&mut self.unknown_fields
}
fn as_any(&self) -> &dyn (::std::any::Any) {
self as &dyn (::std::any::Any)
}
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}
fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
Self::descriptor_static()
}
fn new() -> RegisterPeer {
RegisterPeer::new()
}
fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const ::protobuf::reflect::MessageDescriptor,
};
unsafe {
descriptor.get(|| {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"hbb_addr",
|m: &RegisterPeer| { &m.hbb_addr },
|m: &mut RegisterPeer| { &mut m.hbb_addr },
));
::protobuf::reflect::MessageDescriptor::new::<RegisterPeer>(
"RegisterPeer",
fields,
file_descriptor_proto()
)
})
}
}
fn default_instance() -> &'static RegisterPeer {
static mut instance: ::protobuf::lazy::Lazy<RegisterPeer> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const RegisterPeer,
};
unsafe {
instance.get(RegisterPeer::new)
}
}
}
impl ::protobuf::Clear for RegisterPeer {
fn clear(&mut self) {
self.hbb_addr.clear();
self.unknown_fields.clear();
}
}
impl ::std::fmt::Debug for RegisterPeer {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for RegisterPeer {
fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
::protobuf::reflect::ProtobufValueRef::Message(self)
}
}
#[derive(PartialEq,Clone,Default)]
pub struct PeekPeer {
// message fields
pub hbb_addr: ::std::string::String,
// special fields
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
}
impl<'a> ::std::default::Default for &'a PeekPeer {
fn default() -> &'a PeekPeer {
<PeekPeer as ::protobuf::Message>::default_instance()
}
}
impl PeekPeer {
pub fn new() -> PeekPeer {
::std::default::Default::default()
}
// string hbb_addr = 1;
pub fn get_hbb_addr(&self) -> &str {
&self.hbb_addr
}
pub fn clear_hbb_addr(&mut self) {
self.hbb_addr.clear();
}
// Param is passed by value, moved
pub fn set_hbb_addr(&mut self, v: ::std::string::String) {
self.hbb_addr = v;
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_hbb_addr(&mut self) -> &mut ::std::string::String {
&mut self.hbb_addr
}
// Take field
pub fn take_hbb_addr(&mut self) -> ::std::string::String {
::std::mem::replace(&mut self.hbb_addr, ::std::string::String::new())
}
}
impl ::protobuf::Message for PeekPeer {
fn is_initialized(&self) -> bool {
true
}
fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
while !is.eof()? {
let (field_number, wire_type) = is.read_tag_unpack()?;
match field_number {
1 => {
::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.hbb_addr)?;
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
};
}
::std::result::Result::Ok(())
}
// Compute sizes of nested messages
#[allow(unused_variables)]
fn compute_size(&self) -> u32 {
let mut my_size = 0;
if !self.hbb_addr.is_empty() {
my_size += ::protobuf::rt::string_size(1, &self.hbb_addr);
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
}
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
if !self.hbb_addr.is_empty() {
os.write_string(1, &self.hbb_addr)?;
}
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
fn get_cached_size(&self) -> u32 {
self.cached_size.get()
}
fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
&self.unknown_fields
}
fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
&mut self.unknown_fields
}
fn as_any(&self) -> &dyn (::std::any::Any) {
self as &dyn (::std::any::Any)
}
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}
fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
Self::descriptor_static()
}
fn new() -> PeekPeer {
PeekPeer::new()
}
fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const ::protobuf::reflect::MessageDescriptor,
};
unsafe {
descriptor.get(|| {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"hbb_addr",
|m: &PeekPeer| { &m.hbb_addr },
|m: &mut PeekPeer| { &mut m.hbb_addr },
));
::protobuf::reflect::MessageDescriptor::new::<PeekPeer>(
"PeekPeer",
fields,
file_descriptor_proto()
)
})
}
}
fn default_instance() -> &'static PeekPeer {
static mut instance: ::protobuf::lazy::Lazy<PeekPeer> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const PeekPeer,
};
unsafe {
instance.get(PeekPeer::new)
}
}
}
impl ::protobuf::Clear for PeekPeer {
fn clear(&mut self) {
self.hbb_addr.clear();
self.unknown_fields.clear();
}
}
impl ::std::fmt::Debug for PeekPeer {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for PeekPeer {
fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
::protobuf::reflect::ProtobufValueRef::Message(self)
}
}
#[derive(PartialEq,Clone,Default)]
pub struct PeekPeerResponse {
// message fields
pub socket_addr: ::std::vec::Vec<u8>,
// special fields
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
}
impl<'a> ::std::default::Default for &'a PeekPeerResponse {
fn default() -> &'a PeekPeerResponse {
<PeekPeerResponse as ::protobuf::Message>::default_instance()
}
}
impl PeekPeerResponse {
pub fn new() -> PeekPeerResponse {
::std::default::Default::default()
}
// bytes socket_addr = 1;
pub fn get_socket_addr(&self) -> &[u8] {
&self.socket_addr
}
pub fn clear_socket_addr(&mut self) {
self.socket_addr.clear();
}
// Param is passed by value, moved
pub fn set_socket_addr(&mut self, v: ::std::vec::Vec<u8>) {
self.socket_addr = v;
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_socket_addr(&mut self) -> &mut ::std::vec::Vec<u8> {
&mut self.socket_addr
}
// Take field
pub fn take_socket_addr(&mut self) -> ::std::vec::Vec<u8> {
::std::mem::replace(&mut self.socket_addr, ::std::vec::Vec::new())
}
}
impl ::protobuf::Message for PeekPeerResponse {
fn is_initialized(&self) -> bool {
true
}
fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
while !is.eof()? {
let (field_number, wire_type) = is.read_tag_unpack()?;
match field_number {
1 => {
::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.socket_addr)?;
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
};
}
::std::result::Result::Ok(())
}
// Compute sizes of nested messages
#[allow(unused_variables)]
fn compute_size(&self) -> u32 {
let mut my_size = 0;
if !self.socket_addr.is_empty() {
my_size += ::protobuf::rt::bytes_size(1, &self.socket_addr);
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
}
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
if !self.socket_addr.is_empty() {
os.write_bytes(1, &self.socket_addr)?;
}
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
fn get_cached_size(&self) -> u32 {
self.cached_size.get()
}
fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
&self.unknown_fields
}
fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
&mut self.unknown_fields
}
fn as_any(&self) -> &dyn (::std::any::Any) {
self as &dyn (::std::any::Any)
}
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}
fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
Self::descriptor_static()
}
fn new() -> PeekPeerResponse {
PeekPeerResponse::new()
}
fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const ::protobuf::reflect::MessageDescriptor,
};
unsafe {
descriptor.get(|| {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
"socket_addr",
|m: &PeekPeerResponse| { &m.socket_addr },
|m: &mut PeekPeerResponse| { &mut m.socket_addr },
));
::protobuf::reflect::MessageDescriptor::new::<PeekPeerResponse>(
"PeekPeerResponse",
fields,
file_descriptor_proto()
)
})
}
}
fn default_instance() -> &'static PeekPeerResponse {
static mut instance: ::protobuf::lazy::Lazy<PeekPeerResponse> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const PeekPeerResponse,
};
unsafe {
instance.get(PeekPeerResponse::new)
}
}
}
impl ::protobuf::Clear for PeekPeerResponse {
fn clear(&mut self) {
self.socket_addr.clear();
self.unknown_fields.clear();
}
}
impl ::std::fmt::Debug for PeekPeerResponse {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for PeekPeerResponse {
fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
::protobuf::reflect::ProtobufValueRef::Message(self)
}
}
#[derive(PartialEq,Clone,Default)]
pub struct Message {
// message oneof groups // message oneof groups
pub union: ::std::option::Option<Message_oneof_union>, pub union: ::std::option::Option<Message_oneof_union>,
// special fields // special fields
@ -45,7 +550,9 @@ impl<'a> ::std::default::Default for &'a Message {
#[derive(Clone,PartialEq,Debug)] #[derive(Clone,PartialEq,Debug)]
pub enum Message_oneof_union { pub enum Message_oneof_union {
socket_addr(::std::vec::Vec<u8>), register_peer(RegisterPeer),
peek_peer(PeekPeer),
peek_peer_response(PeekPeerResponse),
} }
impl Message { impl Message {
@ -53,84 +560,171 @@ impl Message {
::std::default::Default::default() ::std::default::Default::default()
} }
// string addr = 1; // .hbb.RegisterPeer register_peer = 6;
pub fn get_addr(&self) -> &str { pub fn get_register_peer(&self) -> &RegisterPeer {
&self.addr
}
pub fn clear_addr(&mut self) {
self.addr.clear();
}
// Param is passed by value, moved
pub fn set_addr(&mut self, v: ::std::string::String) {
self.addr = v;
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_addr(&mut self) -> &mut ::std::string::String {
&mut self.addr
}
// Take field
pub fn take_addr(&mut self) -> ::std::string::String {
::std::mem::replace(&mut self.addr, ::std::string::String::new())
}
// bytes socket_addr = 6;
pub fn get_socket_addr(&self) -> &[u8] {
match self.union { match self.union {
::std::option::Option::Some(Message_oneof_union::socket_addr(ref v)) => v, ::std::option::Option::Some(Message_oneof_union::register_peer(ref v)) => v,
_ => &[], _ => RegisterPeer::default_instance(),
} }
} }
pub fn clear_socket_addr(&mut self) { pub fn clear_register_peer(&mut self) {
self.union = ::std::option::Option::None; self.union = ::std::option::Option::None;
} }
pub fn has_socket_addr(&self) -> bool { pub fn has_register_peer(&self) -> bool {
match self.union { match self.union {
::std::option::Option::Some(Message_oneof_union::socket_addr(..)) => true, ::std::option::Option::Some(Message_oneof_union::register_peer(..)) => true,
_ => false, _ => false,
} }
} }
// Param is passed by value, moved // Param is passed by value, moved
pub fn set_socket_addr(&mut self, v: ::std::vec::Vec<u8>) { pub fn set_register_peer(&mut self, v: RegisterPeer) {
self.union = ::std::option::Option::Some(Message_oneof_union::socket_addr(v)) self.union = ::std::option::Option::Some(Message_oneof_union::register_peer(v))
} }
// Mutable pointer to the field. // Mutable pointer to the field.
pub fn mut_socket_addr(&mut self) -> &mut ::std::vec::Vec<u8> { pub fn mut_register_peer(&mut self) -> &mut RegisterPeer {
if let ::std::option::Option::Some(Message_oneof_union::socket_addr(_)) = self.union { if let ::std::option::Option::Some(Message_oneof_union::register_peer(_)) = self.union {
} else { } else {
self.union = ::std::option::Option::Some(Message_oneof_union::socket_addr(::std::vec::Vec::new())); self.union = ::std::option::Option::Some(Message_oneof_union::register_peer(RegisterPeer::new()));
} }
match self.union { match self.union {
::std::option::Option::Some(Message_oneof_union::socket_addr(ref mut v)) => v, ::std::option::Option::Some(Message_oneof_union::register_peer(ref mut v)) => v,
_ => panic!(), _ => panic!(),
} }
} }
// Take field // Take field
pub fn take_socket_addr(&mut self) -> ::std::vec::Vec<u8> { pub fn take_register_peer(&mut self) -> RegisterPeer {
if self.has_socket_addr() { if self.has_register_peer() {
match self.union.take() { match self.union.take() {
::std::option::Option::Some(Message_oneof_union::socket_addr(v)) => v, ::std::option::Option::Some(Message_oneof_union::register_peer(v)) => v,
_ => panic!(), _ => panic!(),
} }
} else { } else {
::std::vec::Vec::new() RegisterPeer::new()
}
}
// .hbb.PeekPeer peek_peer = 7;
pub fn get_peek_peer(&self) -> &PeekPeer {
match self.union {
::std::option::Option::Some(Message_oneof_union::peek_peer(ref v)) => v,
_ => PeekPeer::default_instance(),
}
}
pub fn clear_peek_peer(&mut self) {
self.union = ::std::option::Option::None;
}
pub fn has_peek_peer(&self) -> bool {
match self.union {
::std::option::Option::Some(Message_oneof_union::peek_peer(..)) => true,
_ => false,
}
}
// Param is passed by value, moved
pub fn set_peek_peer(&mut self, v: PeekPeer) {
self.union = ::std::option::Option::Some(Message_oneof_union::peek_peer(v))
}
// Mutable pointer to the field.
pub fn mut_peek_peer(&mut self) -> &mut PeekPeer {
if let ::std::option::Option::Some(Message_oneof_union::peek_peer(_)) = self.union {
} else {
self.union = ::std::option::Option::Some(Message_oneof_union::peek_peer(PeekPeer::new()));
}
match self.union {
::std::option::Option::Some(Message_oneof_union::peek_peer(ref mut v)) => v,
_ => panic!(),
}
}
// Take field
pub fn take_peek_peer(&mut self) -> PeekPeer {
if self.has_peek_peer() {
match self.union.take() {
::std::option::Option::Some(Message_oneof_union::peek_peer(v)) => v,
_ => panic!(),
}
} else {
PeekPeer::new()
}
}
// .hbb.PeekPeerResponse peek_peer_response = 8;
pub fn get_peek_peer_response(&self) -> &PeekPeerResponse {
match self.union {
::std::option::Option::Some(Message_oneof_union::peek_peer_response(ref v)) => v,
_ => PeekPeerResponse::default_instance(),
}
}
pub fn clear_peek_peer_response(&mut self) {
self.union = ::std::option::Option::None;
}
pub fn has_peek_peer_response(&self) -> bool {
match self.union {
::std::option::Option::Some(Message_oneof_union::peek_peer_response(..)) => true,
_ => false,
}
}
// Param is passed by value, moved
pub fn set_peek_peer_response(&mut self, v: PeekPeerResponse) {
self.union = ::std::option::Option::Some(Message_oneof_union::peek_peer_response(v))
}
// Mutable pointer to the field.
pub fn mut_peek_peer_response(&mut self) -> &mut PeekPeerResponse {
if let ::std::option::Option::Some(Message_oneof_union::peek_peer_response(_)) = self.union {
} else {
self.union = ::std::option::Option::Some(Message_oneof_union::peek_peer_response(PeekPeerResponse::new()));
}
match self.union {
::std::option::Option::Some(Message_oneof_union::peek_peer_response(ref mut v)) => v,
_ => panic!(),
}
}
// Take field
pub fn take_peek_peer_response(&mut self) -> PeekPeerResponse {
if self.has_peek_peer_response() {
match self.union.take() {
::std::option::Option::Some(Message_oneof_union::peek_peer_response(v)) => v,
_ => panic!(),
}
} else {
PeekPeerResponse::new()
} }
} }
} }
impl ::protobuf::Message for Message { impl ::protobuf::Message for Message {
fn is_initialized(&self) -> bool { fn is_initialized(&self) -> bool {
if let Some(Message_oneof_union::register_peer(ref v)) = self.union {
if !v.is_initialized() {
return false;
}
}
if let Some(Message_oneof_union::peek_peer(ref v)) = self.union {
if !v.is_initialized() {
return false;
}
}
if let Some(Message_oneof_union::peek_peer_response(ref v)) = self.union {
if !v.is_initialized() {
return false;
}
}
true true
} }
@ -138,14 +732,23 @@ impl ::protobuf::Message for Message {
while !is.eof()? { while !is.eof()? {
let (field_number, wire_type) = is.read_tag_unpack()?; let (field_number, wire_type) = is.read_tag_unpack()?;
match field_number { match field_number {
1 => {
::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.addr)?;
},
6 => { 6 => {
if wire_type != ::protobuf::wire_format::WireTypeLengthDelimited { if wire_type != ::protobuf::wire_format::WireTypeLengthDelimited {
return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
} }
self.union = ::std::option::Option::Some(Message_oneof_union::socket_addr(is.read_bytes()?)); self.union = ::std::option::Option::Some(Message_oneof_union::register_peer(is.read_message()?));
},
7 => {
if wire_type != ::protobuf::wire_format::WireTypeLengthDelimited {
return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
}
self.union = ::std::option::Option::Some(Message_oneof_union::peek_peer(is.read_message()?));
},
8 => {
if wire_type != ::protobuf::wire_format::WireTypeLengthDelimited {
return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
}
self.union = ::std::option::Option::Some(Message_oneof_union::peek_peer_response(is.read_message()?));
}, },
_ => { _ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
@ -159,13 +762,19 @@ impl ::protobuf::Message for Message {
#[allow(unused_variables)] #[allow(unused_variables)]
fn compute_size(&self) -> u32 { fn compute_size(&self) -> u32 {
let mut my_size = 0; let mut my_size = 0;
if !self.addr.is_empty() {
my_size += ::protobuf::rt::string_size(1, &self.addr);
}
if let ::std::option::Option::Some(ref v) = self.union { if let ::std::option::Option::Some(ref v) = self.union {
match v { match v {
&Message_oneof_union::socket_addr(ref v) => { &Message_oneof_union::register_peer(ref v) => {
my_size += ::protobuf::rt::bytes_size(6, &v); let len = v.compute_size();
my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
},
&Message_oneof_union::peek_peer(ref v) => {
let len = v.compute_size();
my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
},
&Message_oneof_union::peek_peer_response(ref v) => {
let len = v.compute_size();
my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
}, },
}; };
} }
@ -175,13 +784,22 @@ impl ::protobuf::Message for Message {
} }
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
if !self.addr.is_empty() {
os.write_string(1, &self.addr)?;
}
if let ::std::option::Option::Some(ref v) = self.union { if let ::std::option::Option::Some(ref v) = self.union {
match v { match v {
&Message_oneof_union::socket_addr(ref v) => { &Message_oneof_union::register_peer(ref v) => {
os.write_bytes(6, v)?; os.write_tag(6, ::protobuf::wire_format::WireTypeLengthDelimited)?;
os.write_raw_varint32(v.get_cached_size())?;
v.write_to_with_cached_sizes(os)?;
},
&Message_oneof_union::peek_peer(ref v) => {
os.write_tag(7, ::protobuf::wire_format::WireTypeLengthDelimited)?;
os.write_raw_varint32(v.get_cached_size())?;
v.write_to_with_cached_sizes(os)?;
},
&Message_oneof_union::peek_peer_response(ref v) => {
os.write_tag(8, ::protobuf::wire_format::WireTypeLengthDelimited)?;
os.write_raw_varint32(v.get_cached_size())?;
v.write_to_with_cached_sizes(os)?;
}, },
}; };
} }
@ -227,15 +845,20 @@ impl ::protobuf::Message for Message {
unsafe { unsafe {
descriptor.get(|| { descriptor.get(|| {
let mut fields = ::std::vec::Vec::new(); let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( fields.push(::protobuf::reflect::accessor::make_singular_message_accessor::<_, RegisterPeer>(
"addr", "register_peer",
|m: &Message| { &m.addr }, Message::has_register_peer,
|m: &mut Message| { &mut m.addr }, Message::get_register_peer,
)); ));
fields.push(::protobuf::reflect::accessor::make_singular_bytes_accessor::<_>( fields.push(::protobuf::reflect::accessor::make_singular_message_accessor::<_, PeekPeer>(
"socket_addr", "peek_peer",
Message::has_socket_addr, Message::has_peek_peer,
Message::get_socket_addr, Message::get_peek_peer,
));
fields.push(::protobuf::reflect::accessor::make_singular_message_accessor::<_, PeekPeerResponse>(
"peek_peer_response",
Message::has_peek_peer_response,
Message::get_peek_peer_response,
)); ));
::protobuf::reflect::MessageDescriptor::new::<Message>( ::protobuf::reflect::MessageDescriptor::new::<Message>(
"Message", "Message",
@ -259,7 +882,8 @@ impl ::protobuf::Message for Message {
impl ::protobuf::Clear for Message { impl ::protobuf::Clear for Message {
fn clear(&mut self) { fn clear(&mut self) {
self.addr.clear(); self.union = ::std::option::Option::None;
self.union = ::std::option::Option::None;
self.union = ::std::option::Option::None; self.union = ::std::option::Option::None;
self.unknown_fields.clear(); self.unknown_fields.clear();
} }
@ -278,9 +902,14 @@ impl ::protobuf::reflect::ProtobufValue for Message {
} }
static file_descriptor_proto_data: &'static [u8] = b"\ static file_descriptor_proto_data: &'static [u8] = b"\
\n\rmessage.proto\x12\x03hbb\"=\n\x07Message\x12\x0e\n\x04addr\x18\x01\ \n\rmessage.proto\x12\x03hbb\"$\n\x0cRegisterPeer\x12\x12\n\x08hbb_addr\
\x20\x01(\tB\0\x12\x17\n\x0bsocket_addr\x18\x06\x20\x01(\x0cH\0B\0B\x07\ \x18\x01\x20\x01(\tB\0:\0\"\x20\n\x08PeekPeer\x12\x12\n\x08hbb_addr\x18\
\n\x05union:\0B\0b\x06proto3\ \x01\x20\x01(\tB\0:\0\"+\n\x10PeekPeerResponse\x12\x15\n\x0bsocket_addr\
\x18\x01\x20\x01(\x0cB\0:\0\"\x9f\x01\n\x07Message\x12,\n\rregister_peer\
\x18\x06\x20\x01(\x0b2\x11.hbb.RegisterPeerH\0B\0\x12$\n\tpeek_peer\x18\
\x07\x20\x01(\x0b2\r.hbb.PeekPeerH\0B\0\x125\n\x12peek_peer_response\x18\
\x08\x20\x01(\x0b2\x15.hbb.PeekPeerResponseH\0B\0B\x07\n\x05union:\0B\0b\
\x06proto3\
"; ";
static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy { static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy {

View File

@ -1,4 +1,4 @@
use super::message_proto::Message; use super::message_proto::*;
use bytes::Bytes; use bytes::Bytes;
use futures::{FutureExt, SinkExt}; use futures::{FutureExt, SinkExt};
use protobuf::{parse_from_bytes, Message as _}; use protobuf::{parse_from_bytes, Message as _};
@ -18,7 +18,7 @@ use tokio_util::{codec::BytesCodec, udp::UdpFramed};
pub struct V4AddrMangle(Vec<u8>); pub struct V4AddrMangle(Vec<u8>);
impl V4AddrMangle { impl V4AddrMangle {
pub fn encode(addr: SocketAddrV4) -> Self { pub fn encode(addr: &SocketAddrV4) -> Self {
let tm = (SystemTime::now() let tm = (SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
.unwrap() .unwrap()
@ -50,7 +50,7 @@ impl V4AddrMangle {
} }
pub struct Peer { pub struct Peer {
socket_addr: SocketAddr, socket_addr: SocketAddrV4,
} }
type PeerMap = HashMap<String, Peer>; type PeerMap = HashMap<String, Peer>;
@ -59,26 +59,68 @@ pub struct RendezvousServer {
peer_map: PeerMap, peer_map: PeerMap,
} }
type FramedSocket = UdpFramed<BytesCodec>;
type ResultType = Result<(), Box<dyn Error>>;
impl RendezvousServer { impl RendezvousServer {
pub async fn start(addr: &str) -> Result<Self, Box<dyn Error>> { pub async fn start(addr: &str) -> ResultType {
let socket = UdpSocket::bind(addr).await?; let socket = UdpSocket::bind(addr).await?;
let mut socket = UdpFramed::new(socket, BytesCodec::new()); let mut socket = UdpFramed::new(socket, BytesCodec::new());
let rs = Self { let mut rs = Self {
peer_map: PeerMap::new(), peer_map: PeerMap::new(),
}; };
while let Some(Ok((bytes, addr))) = socket.next().await { while let Some(Ok((bytes, addr))) = socket.next().await {
if let SocketAddr::V4(addr_v4) = addr { if let SocketAddr::V4(addr_v4) = addr {
if let Ok(msg_in) = parse_from_bytes::<Message>(&bytes) { if let Ok(msg_in) = parse_from_bytes::<Message>(&bytes) {
let msg_out = Message::new(); match msg_in.union {
socket Some(Message_oneof_union::register_peer(rp)) => {
.send((Bytes::from(msg_out.write_to_bytes().unwrap()), addr)) if rp.hbb_addr.len() > 0 {
.await?; rs.peer_map.insert(
rp.hbb_addr,
Peer {
socket_addr: addr_v4,
},
);
}
tokio_timer::sleep(std::time::Duration::from_secs(60));
}
Some(Message_oneof_union::peek_peer(pp)) => {
rs.handle_peek_peer(&pp, addr, &mut socket).await?;
tokio_timer::sleep(std::time::Duration::from_secs(60));
}
_ => {}
}
} }
} }
} }
Ok(rs) Ok(())
} }
pub async fn handle_peek_peer(
&self,
pp: &PeekPeer,
addr: SocketAddr,
socket: &mut FramedSocket,
) -> ResultType {
if let Some(peer) = self.peer_map.get(&pp.hbb_addr) {
let mut msg_out = Message::new();
msg_out.set_peek_peer_response(PeekPeerResponse {
socket_addr: V4AddrMangle::encode(&peer.socket_addr).0.to_vec(),
..Default::default()
});
send_to(&msg_out, addr, socket).await?;
}
Ok(())
}
}
#[inline]
pub async fn send_to(msg: &Message, addr: SocketAddr, socket: &mut FramedSocket) -> ResultType {
socket
.send((Bytes::from(msg.write_to_bytes().unwrap()), addr))
.await?;
Ok(())
} }
#[cfg(test)] #[cfg(test)]
@ -87,7 +129,30 @@ mod tests {
#[test] #[test]
fn test_mangle() { fn test_mangle() {
let addr = SocketAddrV4::new(Ipv4Addr::new(192, 168, 16, 32), 21116); let addr = SocketAddrV4::new(Ipv4Addr::new(192, 168, 16, 32), 21116);
assert_eq!(addr, V4AddrMangle::encode(addr).decode()); assert_eq!(addr, V4AddrMangle::encode(&addr).decode());
println!("{:?}", V4AddrMangle::encode(addr).0); }
#[allow(unused_must_use)]
#[tokio::main]
async fn test_rs_async() {
let server_addr = "0.0.0.0:21116";
let f1 = RendezvousServer::start(server_addr);
let to_addr = server_addr.parse().unwrap();
let f2 = async {
let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let mut socket = UdpFramed::new(socket, BytesCodec::new());
let mut msg_out = Message::new();
msg_out.set_peek_peer(PeekPeer {
hbb_addr: "123".to_string(),
..Default::default()
});
send_to(&msg_out, to_addr, &mut socket).await;
};
tokio::join!(f1, f2);
}
#[test]
fn test_rs() {
self::test_rs_async();
} }
} }