diff --git a/src/lib.rs b/src/lib.rs index a5d10de..b1770f2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,103 +58,8 @@ pub use uuid; pub mod fingerprint; pub use flexi_logger; pub mod websocket; -use sodiumoxide::crypto::secretbox::Key; - -// support Websocket and tcp. -pub enum Stream { - WebSocket(websocket::WsFramedStream), - Tcp(tcp::FramedStream), -} - -impl Stream { - pub fn set_send_timeout(&mut self, ms: u64) { - match self { - Stream::WebSocket(s) => s.set_send_timeout(ms), - Stream::Tcp(s) => s.set_send_timeout(ms), - } - } - - pub fn set_raw(&mut self) { - match self { - Stream::WebSocket(s) => s.set_raw(), - Stream::Tcp(s) => s.set_raw(), - } - } - - pub async fn send_bytes(&mut self, bytes: bytes::Bytes) -> ResultType<()> { - match self { - Stream::WebSocket(s) => s.send_bytes(bytes).await, - Stream::Tcp(s) => s.send_bytes(bytes).await, - } - } - - pub async fn send_raw(&mut self, bytes: Vec) -> ResultType<()> { - match self { - Stream::WebSocket(s) => s.send_raw(bytes).await, - Stream::Tcp(s) => s.send_raw(bytes).await, - } - } - - pub fn set_key(&mut self, key: Key) { - match self { - Stream::WebSocket(s) => s.set_key(key), - Stream::Tcp(s) => s.set_key(key), - } - } - - pub fn is_secured(&self) -> bool { - match self { - Stream::WebSocket(s) => s.is_secured(), - Stream::Tcp(s) => s.is_secured(), - } - } - - pub async fn next_timeout( - &mut self, - timeout: u64, - ) -> Option> { - match self { - Stream::WebSocket(s) => s.next_timeout(timeout).await, - Stream::Tcp(s) => s.next_timeout(timeout).await, - } - } - - /// establish connect from websocket - pub async fn connect_websocket( - url: impl AsRef, - local_addr: Option, - proxy_conf: Option<&config::Socks5Server>, - timeout_ms: u64, - ) -> ResultType { - let ws_stream = - websocket::WsFramedStream::new(url, local_addr, proxy_conf, timeout_ms).await?; - log::debug!("WebSocket connection established"); - Ok(Self::WebSocket(ws_stream)) - } - - /// send message - pub async fn send(&mut self, msg: &impl protobuf::Message) -> ResultType<()> { - match self { - Self::WebSocket(ws) => ws.send(msg).await, - Self::Tcp(tcp) => tcp.send(msg).await, - } - } - - /// receive message - pub async fn next(&mut self) -> Option> { - match self { - Self::WebSocket(ws) => ws.next().await, - Self::Tcp(tcp) => tcp.next().await, - } - } - - pub fn local_addr(&self) -> SocketAddr { - match self { - Self::WebSocket(ws) => ws.local_addr(), - Self::Tcp(tcp) => tcp.local_addr(), - } - } -} +pub mod stream; +pub use stream::Stream; pub type SessionID = uuid::Uuid; diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..947fe5a --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,102 @@ +use crate::tcp; +use crate::websocket; +use sodiumoxide::crypto::secretbox::Key; +use crate::config; +use crate::ResultType; +use std::net::SocketAddr; + +// support Websocket and tcp. +pub enum Stream { + WebSocket(websocket::WsFramedStream), + Tcp(tcp::FramedStream), +} + +impl Stream { + pub fn set_send_timeout(&mut self, ms: u64) { + match self { + Stream::WebSocket(s) => s.set_send_timeout(ms), + Stream::Tcp(s) => s.set_send_timeout(ms), + } + } + + pub fn set_raw(&mut self) { + match self { + Stream::WebSocket(s) => s.set_raw(), + Stream::Tcp(s) => s.set_raw(), + } + } + + pub async fn send_bytes(&mut self, bytes: bytes::Bytes) -> ResultType<()> { + match self { + Stream::WebSocket(s) => s.send_bytes(bytes).await, + Stream::Tcp(s) => s.send_bytes(bytes).await, + } + } + + pub async fn send_raw(&mut self, bytes: Vec) -> ResultType<()> { + match self { + Stream::WebSocket(s) => s.send_raw(bytes).await, + Stream::Tcp(s) => s.send_raw(bytes).await, + } + } + + pub fn set_key(&mut self, key: Key) { + match self { + Stream::WebSocket(s) => s.set_key(key), + Stream::Tcp(s) => s.set_key(key), + } + } + + pub fn is_secured(&self) -> bool { + match self { + Stream::WebSocket(s) => s.is_secured(), + Stream::Tcp(s) => s.is_secured(), + } + } + + pub async fn next_timeout( + &mut self, + timeout: u64, + ) -> Option> { + match self { + Stream::WebSocket(s) => s.next_timeout(timeout).await, + Stream::Tcp(s) => s.next_timeout(timeout).await, + } + } + + /// establish connect from websocket + pub async fn connect_websocket( + url: impl AsRef, + local_addr: Option, + proxy_conf: Option<&config::Socks5Server>, + timeout_ms: u64, + ) -> ResultType { + let ws_stream = + websocket::WsFramedStream::new(url, local_addr, proxy_conf, timeout_ms).await?; + log::debug!("WebSocket connection established"); + Ok(Self::WebSocket(ws_stream)) + } + + /// send message + pub async fn send(&mut self, msg: &impl protobuf::Message) -> ResultType<()> { + match self { + Self::WebSocket(ws) => ws.send(msg).await, + Self::Tcp(tcp) => tcp.send(msg).await, + } + } + + /// receive message + pub async fn next(&mut self) -> Option> { + match self { + Self::WebSocket(ws) => ws.next().await, + Self::Tcp(tcp) => tcp.next().await, + } + } + + pub fn local_addr(&self) -> SocketAddr { + match self { + Self::WebSocket(ws) => ws.local_addr(), + Self::Tcp(tcp) => tcp.local_addr(), + } + } +}