diff --git a/src/websocket.rs b/src/websocket.rs index a40b719..8ab25c7 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -5,16 +5,12 @@ use crate::{ ResultType, }; use bytes::{BufMut, Bytes, BytesMut}; -use futures::stream::SplitSink; use futures::{SinkExt, StreamExt}; -use std::sync::Arc; use std::{ io::{Error, ErrorKind}, net::SocketAddr, time::Duration, }; -use tokio::sync::Mutex; -use tokio::time::Instant; use tokio::{net::TcpStream, time::timeout}; use tokio_tungstenite::{ connect_async, tungstenite::protocol::Message as WsMessage, MaybeTlsStream, WebSocketStream, @@ -26,9 +22,7 @@ use tungstenite::protocol::Role; pub struct Encrypt(Key, u64, u64); pub struct WsFramedStream { - // stream: WebSocketStream>, - writer: Arc>, WsMessage>>>, - reader: futures::stream::SplitStream>>, + stream: WebSocketStream>, addr: SocketAddr, encrypt: Option, send_timeout: u64, @@ -71,17 +65,14 @@ impl WsFramedStream { _ => return Err(Error::new(ErrorKind::Other, "Unsupported stream type").into()), }; - let (writer, reader) = ws_stream.split(); - let mut ws = Self { - writer: Arc::new(Mutex::new(writer)), - reader, + let ws = Self { + stream: ws_stream, addr, encrypt: None, send_timeout: ms_timeout, }; - ws.start_heartbeat(); Ok(ws) } else { log::info!("{:?}", url_str); @@ -104,47 +95,17 @@ impl WsFramedStream { _ => return Err(Error::new(ErrorKind::Other, "Unsupported stream type").into()), }; - let (writer, reader) = stream.split(); let mut ws = Self { - writer: Arc::new(Mutex::new(writer)), - reader, + stream, addr, encrypt: None, send_timeout: ms_timeout, }; - ws.start_heartbeat(); Ok(ws) } } - fn start_heartbeat(&self) { - let writer = Arc::clone(&self.writer); - tokio::spawn(async move { - let mut last_pong = Instant::now(); - let mut interval = tokio::time::interval(HEARTBEAT_INTERVAL); - - loop { - tokio::select! { - _ = interval.tick() => { - let mut lock = writer.lock().await; - if let Err(e) = lock.send(WsMessage::Ping(Bytes::new())).await { - log::error!("Heartbeat failed: {}", e); - break; - } - log::debug!("Sent ping"); - } - _ = tokio::time::sleep(HEARTBEAT_TIMEOUT) => { - if last_pong.elapsed() > HEARTBEAT_TIMEOUT { - log::error!("Heartbeat timeout"); - break; - } - } - } - } - }); - } - pub fn set_raw(&mut self) {} pub async fn from_tcp_stream(stream: TcpStream, addr: SocketAddr) -> ResultType { @@ -152,11 +113,9 @@ impl WsFramedStream { WebSocketStream::from_raw_socket(MaybeTlsStream::Plain(stream), Role::Client, None) .await; - let (writer, reader) = ws_stream.split(); Ok(Self { - writer: Arc::new(Mutex::new(writer)), - reader, + stream: ws_stream, addr, encrypt: None, send_timeout: 0, @@ -168,11 +127,9 @@ impl WsFramedStream { WebSocketStream::from_raw_socket(MaybeTlsStream::Plain(stream), Role::Client, None) .await; - let (writer, reader) = ws_stream.split(); Self { - writer: Arc::new(Mutex::new(writer)), - reader, + stream: ws_stream, addr, encrypt: None, send_timeout: 0, @@ -208,11 +165,14 @@ impl WsFramedStream { #[inline] pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> { let msg = WsMessage::Binary(bytes); - let mut writer = self.writer.lock().await; if self.send_timeout > 0 { - timeout(Duration::from_millis(self.send_timeout), writer.send(msg)).await?? + timeout( + Duration::from_millis(self.send_timeout), + self.stream.send(msg), + ) + .await?? } else { - writer.send(msg).await? + self.stream.send(msg).await? }; Ok(()) } @@ -223,7 +183,7 @@ impl WsFramedStream { let start = std::time::Instant::now(); loop { - match self.reader.next().await { + match self.stream.next().await { Some(Ok(msg)) => { log::debug!("Received message: {:?}", &msg); match msg { @@ -245,25 +205,25 @@ impl WsFramedStream { } return Some(Ok(bytes)); } - WsMessage::Ping(ping) => { - log::info!("Received ping ({} bytes)", ping.len()); - let mut writer = self.writer.lock().await; - if let Err(e) = writer.send(WsMessage::Pong(ping)).await { - log::error!("Failed to send pong: {}", e); - return Some(Err(Error::new( - ErrorKind::Other, - format!("Failed to send pong: {}", e), - ))); - } - log::debug!("Pong sent"); - } - WsMessage::Pong(_) => { - log::debug!("Received pong"); - } - WsMessage::Close(frame) => { - log::info!("Connection closed: {:?}", frame); - return None; - } + // WsMessage::Ping(ping) => { + // log::info!("Received ping ({} bytes)", ping.len()); + // let mut writer = self.writer.lock().await; + // if let Err(e) = writer.send(WsMessage::Pong(ping)).await { + // log::error!("Failed to send pong: {}", e); + // return Some(Err(Error::new( + // ErrorKind::Other, + // format!("Failed to send pong: {}", e), + // ))); + // } + // log::debug!("Pong sent"); + // } + // WsMessage::Pong(_) => { + // log::debug!("Received pong"); + // } + // WsMessage::Close(frame) => { + // log::info!("Connection closed: {:?}", frame); + // return None; + // } _ => { log::warn!("Unhandled message :{}", &msg); }