[fallback] remove pingpong

This commit is contained in:
YinMo19 2025-04-23 11:42:53 +08:00
parent 5c6b12c438
commit 836dbbc144

View File

@ -5,16 +5,12 @@ use crate::{
ResultType,
};
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::SplitSink;
use futures::{SinkExt, StreamExt};
use std::sync::Arc;
use std::{
io::{Error, ErrorKind},
net::SocketAddr,
time::Duration,
};
use tokio::sync::Mutex;
use tokio::time::Instant;
use tokio::{net::TcpStream, time::timeout};
use tokio_tungstenite::{
connect_async, tungstenite::protocol::Message as WsMessage, MaybeTlsStream, WebSocketStream,
@ -26,9 +22,7 @@ use tungstenite::protocol::Role;
pub struct Encrypt(Key, u64, u64);
pub struct WsFramedStream {
// stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
writer: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, WsMessage>>>,
reader: futures::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
addr: SocketAddr,
encrypt: Option<Encrypt>,
send_timeout: u64,
@ -71,17 +65,14 @@ impl WsFramedStream {
_ => return Err(Error::new(ErrorKind::Other, "Unsupported stream type").into()),
};
let (writer, reader) = ws_stream.split();
let mut ws = Self {
writer: Arc::new(Mutex::new(writer)),
reader,
let ws = Self {
stream: ws_stream,
addr,
encrypt: None,
send_timeout: ms_timeout,
};
ws.start_heartbeat();
Ok(ws)
} else {
log::info!("{:?}", url_str);
@ -104,47 +95,17 @@ impl WsFramedStream {
_ => return Err(Error::new(ErrorKind::Other, "Unsupported stream type").into()),
};
let (writer, reader) = stream.split();
let mut ws = Self {
writer: Arc::new(Mutex::new(writer)),
reader,
stream,
addr,
encrypt: None,
send_timeout: ms_timeout,
};
ws.start_heartbeat();
Ok(ws)
}
}
fn start_heartbeat(&self) {
let writer = Arc::clone(&self.writer);
tokio::spawn(async move {
let mut last_pong = Instant::now();
let mut interval = tokio::time::interval(HEARTBEAT_INTERVAL);
loop {
tokio::select! {
_ = interval.tick() => {
let mut lock = writer.lock().await;
if let Err(e) = lock.send(WsMessage::Ping(Bytes::new())).await {
log::error!("Heartbeat failed: {}", e);
break;
}
log::debug!("Sent ping");
}
_ = tokio::time::sleep(HEARTBEAT_TIMEOUT) => {
if last_pong.elapsed() > HEARTBEAT_TIMEOUT {
log::error!("Heartbeat timeout");
break;
}
}
}
}
});
}
pub fn set_raw(&mut self) {}
pub async fn from_tcp_stream(stream: TcpStream, addr: SocketAddr) -> ResultType<Self> {
@ -152,11 +113,9 @@ impl WsFramedStream {
WebSocketStream::from_raw_socket(MaybeTlsStream::Plain(stream), Role::Client, None)
.await;
let (writer, reader) = ws_stream.split();
Ok(Self {
writer: Arc::new(Mutex::new(writer)),
reader,
stream: ws_stream,
addr,
encrypt: None,
send_timeout: 0,
@ -168,11 +127,9 @@ impl WsFramedStream {
WebSocketStream::from_raw_socket(MaybeTlsStream::Plain(stream), Role::Client, None)
.await;
let (writer, reader) = ws_stream.split();
Self {
writer: Arc::new(Mutex::new(writer)),
reader,
stream: ws_stream,
addr,
encrypt: None,
send_timeout: 0,
@ -208,11 +165,14 @@ impl WsFramedStream {
#[inline]
pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> {
let msg = WsMessage::Binary(bytes);
let mut writer = self.writer.lock().await;
if self.send_timeout > 0 {
timeout(Duration::from_millis(self.send_timeout), writer.send(msg)).await??
timeout(
Duration::from_millis(self.send_timeout),
self.stream.send(msg),
)
.await??
} else {
writer.send(msg).await?
self.stream.send(msg).await?
};
Ok(())
}
@ -223,7 +183,7 @@ impl WsFramedStream {
let start = std::time::Instant::now();
loop {
match self.reader.next().await {
match self.stream.next().await {
Some(Ok(msg)) => {
log::debug!("Received message: {:?}", &msg);
match msg {
@ -245,25 +205,25 @@ impl WsFramedStream {
}
return Some(Ok(bytes));
}
WsMessage::Ping(ping) => {
log::info!("Received ping ({} bytes)", ping.len());
let mut writer = self.writer.lock().await;
if let Err(e) = writer.send(WsMessage::Pong(ping)).await {
log::error!("Failed to send pong: {}", e);
return Some(Err(Error::new(
ErrorKind::Other,
format!("Failed to send pong: {}", e),
)));
}
log::debug!("Pong sent");
}
WsMessage::Pong(_) => {
log::debug!("Received pong");
}
WsMessage::Close(frame) => {
log::info!("Connection closed: {:?}", frame);
return None;
}
// WsMessage::Ping(ping) => {
// log::info!("Received ping ({} bytes)", ping.len());
// let mut writer = self.writer.lock().await;
// if let Err(e) = writer.send(WsMessage::Pong(ping)).await {
// log::error!("Failed to send pong: {}", e);
// return Some(Err(Error::new(
// ErrorKind::Other,
// format!("Failed to send pong: {}", e),
// )));
// }
// log::debug!("Pong sent");
// }
// WsMessage::Pong(_) => {
// log::debug!("Received pong");
// }
// WsMessage::Close(frame) => {
// log::info!("Connection closed: {:?}", frame);
// return None;
// }
_ => {
log::warn!("Unhandled message :{}", &msg);
}