prototype...

This commit is contained in:
open-trade
2020-03-06 17:18:22 +08:00
parent d188a5f5ea
commit 1695b5e357
11 changed files with 1045 additions and 10 deletions

5
src/lib.rs Normal file
View File

@@ -0,0 +1,5 @@
mod rendezvous_server;
pub use rendezvous_server::*;
#[path = "./protos/message.rs"]
mod message_proto;
pub use message_proto::*;

View File

@@ -1,3 +1,12 @@
fn main() {
println!("Hello, world!");
}
// https://tools.ietf.org/rfc/rfc5128.txt
// https://blog.csdn.net/bytxl/article/details/44344855
use hbbs::*;
use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
RendezvousServer::start("0.0.0.0:21116").await?;
Ok(())
}

301
src/protos/message.rs Normal file
View File

@@ -0,0 +1,301 @@
// This file is generated by rust-protobuf 2.10.2. Do not edit
// @generated
// https://github.com/rust-lang/rust-clippy/issues/702
#![allow(unknown_lints)]
#![allow(clippy::all)]
#![cfg_attr(rustfmt, rustfmt_skip)]
#![allow(box_pointers)]
#![allow(dead_code)]
#![allow(missing_docs)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
#![allow(trivial_casts)]
#![allow(unsafe_code)]
#![allow(unused_imports)]
#![allow(unused_results)]
//! Generated file from `message.proto`
use protobuf::Message as Message_imported_for_functions;
use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions;
/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_10_2;
#[derive(PartialEq,Clone,Default)]
pub struct Message {
// message fields
pub addr: ::std::string::String,
// message oneof groups
pub union: ::std::option::Option<Message_oneof_union>,
// special fields
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
}
impl<'a> ::std::default::Default for &'a Message {
fn default() -> &'a Message {
<Message as ::protobuf::Message>::default_instance()
}
}
#[derive(Clone,PartialEq,Debug)]
pub enum Message_oneof_union {
socket_addr(::std::vec::Vec<u8>),
}
impl Message {
pub fn new() -> Message {
::std::default::Default::default()
}
// string addr = 1;
pub fn get_addr(&self) -> &str {
&self.addr
}
pub fn clear_addr(&mut self) {
self.addr.clear();
}
// Param is passed by value, moved
pub fn set_addr(&mut self, v: ::std::string::String) {
self.addr = v;
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_addr(&mut self) -> &mut ::std::string::String {
&mut self.addr
}
// Take field
pub fn take_addr(&mut self) -> ::std::string::String {
::std::mem::replace(&mut self.addr, ::std::string::String::new())
}
// bytes socket_addr = 6;
pub fn get_socket_addr(&self) -> &[u8] {
match self.union {
::std::option::Option::Some(Message_oneof_union::socket_addr(ref v)) => v,
_ => &[],
}
}
pub fn clear_socket_addr(&mut self) {
self.union = ::std::option::Option::None;
}
pub fn has_socket_addr(&self) -> bool {
match self.union {
::std::option::Option::Some(Message_oneof_union::socket_addr(..)) => true,
_ => false,
}
}
// Param is passed by value, moved
pub fn set_socket_addr(&mut self, v: ::std::vec::Vec<u8>) {
self.union = ::std::option::Option::Some(Message_oneof_union::socket_addr(v))
}
// Mutable pointer to the field.
pub fn mut_socket_addr(&mut self) -> &mut ::std::vec::Vec<u8> {
if let ::std::option::Option::Some(Message_oneof_union::socket_addr(_)) = self.union {
} else {
self.union = ::std::option::Option::Some(Message_oneof_union::socket_addr(::std::vec::Vec::new()));
}
match self.union {
::std::option::Option::Some(Message_oneof_union::socket_addr(ref mut v)) => v,
_ => panic!(),
}
}
// Take field
pub fn take_socket_addr(&mut self) -> ::std::vec::Vec<u8> {
if self.has_socket_addr() {
match self.union.take() {
::std::option::Option::Some(Message_oneof_union::socket_addr(v)) => v,
_ => panic!(),
}
} else {
::std::vec::Vec::new()
}
}
}
impl ::protobuf::Message for Message {
fn is_initialized(&self) -> bool {
true
}
fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
while !is.eof()? {
let (field_number, wire_type) = is.read_tag_unpack()?;
match field_number {
1 => {
::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.addr)?;
},
6 => {
if wire_type != ::protobuf::wire_format::WireTypeLengthDelimited {
return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
}
self.union = ::std::option::Option::Some(Message_oneof_union::socket_addr(is.read_bytes()?));
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
};
}
::std::result::Result::Ok(())
}
// Compute sizes of nested messages
#[allow(unused_variables)]
fn compute_size(&self) -> u32 {
let mut my_size = 0;
if !self.addr.is_empty() {
my_size += ::protobuf::rt::string_size(1, &self.addr);
}
if let ::std::option::Option::Some(ref v) = self.union {
match v {
&Message_oneof_union::socket_addr(ref v) => {
my_size += ::protobuf::rt::bytes_size(6, &v);
},
};
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
}
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
if !self.addr.is_empty() {
os.write_string(1, &self.addr)?;
}
if let ::std::option::Option::Some(ref v) = self.union {
match v {
&Message_oneof_union::socket_addr(ref v) => {
os.write_bytes(6, v)?;
},
};
}
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
fn get_cached_size(&self) -> u32 {
self.cached_size.get()
}
fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
&self.unknown_fields
}
fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
&mut self.unknown_fields
}
fn as_any(&self) -> &dyn (::std::any::Any) {
self as &dyn (::std::any::Any)
}
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}
fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
Self::descriptor_static()
}
fn new() -> Message {
Message::new()
}
fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const ::protobuf::reflect::MessageDescriptor,
};
unsafe {
descriptor.get(|| {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"addr",
|m: &Message| { &m.addr },
|m: &mut Message| { &mut m.addr },
));
fields.push(::protobuf::reflect::accessor::make_singular_bytes_accessor::<_>(
"socket_addr",
Message::has_socket_addr,
Message::get_socket_addr,
));
::protobuf::reflect::MessageDescriptor::new::<Message>(
"Message",
fields,
file_descriptor_proto()
)
})
}
}
fn default_instance() -> &'static Message {
static mut instance: ::protobuf::lazy::Lazy<Message> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const Message,
};
unsafe {
instance.get(Message::new)
}
}
}
impl ::protobuf::Clear for Message {
fn clear(&mut self) {
self.addr.clear();
self.union = ::std::option::Option::None;
self.unknown_fields.clear();
}
}
impl ::std::fmt::Debug for Message {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for Message {
fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
::protobuf::reflect::ProtobufValueRef::Message(self)
}
}
static file_descriptor_proto_data: &'static [u8] = b"\
\n\rmessage.proto\x12\x03hbb\"=\n\x07Message\x12\x0e\n\x04addr\x18\x01\
\x20\x01(\tB\0\x12\x17\n\x0bsocket_addr\x18\x06\x20\x01(\x0cH\0B\0B\x07\
\n\x05union:\0B\0b\x06proto3\
";
static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const ::protobuf::descriptor::FileDescriptorProto,
};
fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
::protobuf::parse_from_bytes(file_descriptor_proto_data).unwrap()
}
pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
unsafe {
file_descriptor_proto_lazy.get(|| {
parse_descriptor_proto()
})
}
}

93
src/rendezvous_server.rs Normal file
View File

@@ -0,0 +1,93 @@
use super::message_proto::Message;
use bytes::Bytes;
use futures::{FutureExt, SinkExt};
use protobuf::{parse_from_bytes, Message as _};
use std::{
collections::HashMap,
error::Error,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
time::{SystemTime, UNIX_EPOCH},
};
use tokio::net::UdpSocket;
use tokio::stream::StreamExt;
use tokio_util::{codec::BytesCodec, udp::UdpFramed};
/// Certain router and firewalls scan the packet and if they
/// find an IP address belonging to their pool that they use to do the NAT mapping/translation, so here we mangle the ip address
pub struct V4AddrMangle(Vec<u8>);
impl V4AddrMangle {
pub fn encode(addr: SocketAddrV4) -> Self {
let tm = (SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_micros() as u32) as u128;
let ip = u32::from_ne_bytes(addr.ip().octets()) as u128;
let port = addr.port() as u128;
let v = ((ip + tm) << 49) | (tm << 17) | (port + (tm & 0xFFFF));
let bytes = v.to_ne_bytes();
let mut n_padding = 0;
for i in bytes.iter().rev() {
if i == &0u8 {
n_padding += 1;
} else {
break;
}
}
Self(bytes[..(16 - n_padding)].to_vec())
}
pub fn decode(&self) -> SocketAddrV4 {
let mut padded = [0u8; 16];
padded[..self.0.len()].copy_from_slice(&self.0);
let number = u128::from_ne_bytes(padded);
let tm = (number >> 17) & (u32::max_value() as u128);
let ip = (((number >> 49) - tm) as u32).to_ne_bytes();
let port = (number & 0xFFFFFF) - (tm & 0xFFFF);
SocketAddrV4::new(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]), port as u16)
}
}
pub struct Peer {
socket_addr: SocketAddr,
}
type PeerMap = HashMap<String, Peer>;
pub struct RendezvousServer {
peer_map: PeerMap,
}
impl RendezvousServer {
pub async fn start(addr: &str) -> Result<Self, Box<dyn Error>> {
let socket = UdpSocket::bind(addr).await?;
let mut socket = UdpFramed::new(socket, BytesCodec::new());
let rs = Self {
peer_map: PeerMap::new(),
};
while let Some(Ok((bytes, addr))) = socket.next().await {
if let SocketAddr::V4(addr_v4) = addr {
if let Ok(msg_in) = parse_from_bytes::<Message>(&bytes) {
let msg_out = Message::new();
socket
.send((Bytes::from(msg_out.write_to_bytes().unwrap()), addr))
.await?;
}
}
}
Ok(rs)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_mangle() {
let addr = SocketAddrV4::new(Ipv4Addr::new(192, 168, 16, 32), 21116);
assert_eq!(addr, V4AddrMangle::encode(addr).decode());
println!("{:?}", V4AddrMangle::encode(addr).0);
}
}