[enhance] rewrite websocket next.

This commit is contained in:
YinMo19 2025-04-24 00:16:14 +08:00
parent 13ffda490d
commit bac2ffd31e

View File

@ -63,7 +63,6 @@ impl WsFramedStream {
_ => return Err(Error::new(ErrorKind::Other, "Unsupported stream type").into()),
};
let ws = Self {
stream: ws_stream,
addr,
@ -111,7 +110,6 @@ impl WsFramedStream {
WebSocketStream::from_raw_socket(MaybeTlsStream::Plain(stream), Role::Client, None)
.await;
Ok(Self {
stream: ws_stream,
addr,
@ -125,7 +123,6 @@ impl WsFramedStream {
WebSocketStream::from_raw_socket(MaybeTlsStream::Plain(stream), Role::Client, None)
.await;
Self {
stream: ws_stream,
addr,
@ -179,67 +176,49 @@ impl WsFramedStream {
pub async fn next(&mut self) -> Option<Result<BytesMut, Error>> {
log::debug!("Waiting for next message");
loop {
match self.stream.next().await {
Some(Ok(msg)) => {
log::debug!("Received message: {:?}", &msg);
match msg {
WsMessage::Binary(data) => {
log::info!("Received binary data ({} bytes)", data.len());
let mut bytes = BytesMut::from(&data[..]);
if let Some(key) = self.encrypt.as_mut() {
log::debug!("Decrypting data with seq: {}", key.2);
match key.dec(&mut bytes) {
Ok(_) => {
log::debug!("Decryption successful");
return Some(Ok(bytes));
}
Err(e) => {
log::error!("Decryption failed: {}", e);
return Some(Err(e));
}
}
}
return Some(Ok(bytes));
}
// WsMessage::Ping(ping) => {
// log::info!("Received ping ({} bytes)", ping.len());
// let mut writer = self.writer.lock().await;
// if let Err(e) = writer.send(WsMessage::Pong(ping)).await {
// log::error!("Failed to send pong: {}", e);
// return Some(Err(Error::new(
// ErrorKind::Other,
// format!("Failed to send pong: {}", e),
// )));
// }
// log::debug!("Pong sent");
// }
// WsMessage::Pong(_) => {
// log::debug!("Received pong");
// }
// WsMessage::Close(frame) => {
// log::info!("Connection closed: {:?}", frame);
// return None;
// }
_ => {
log::warn!("Unhandled message :{}", &msg);
}
}
}
Some(Err(e)) => {
log::error!("WebSocket error: {}", e);
while let Some(msg) = self.stream.next().await {
let msg = match msg {
Ok(msg) => msg,
Err(e) => {
return Some(Err(Error::new(
ErrorKind::Other,
format!("Failed to send pong: {}", e),
format!("WebSocket protocol error: {}", e),
)));
}
None => {
log::info!("Connection closed gracefully");
};
log::debug!("Received message type: {}", msg.to_string());
match msg {
WsMessage::Binary(data) => {
log::info!("Received binary data ({} bytes)", data.len());
let mut bytes = BytesMut::from(&data[..]);
if let Some(key) = self.encrypt.as_mut() {
log::debug!("Decrypting data with seq: {}", key.2);
match key.dec(&mut bytes) {
Ok(_) => {
log::debug!("Decryption successful");
return Some(Ok(bytes));
}
Err(e) => {
log::error!("Decryption failed: {}", e);
return Some(Err(e));
}
}
}
return Some(Ok(bytes));
}
WsMessage::Close(_) => {
log::info!("Received close frame");
return None;
}
_ => {
log::debug!("Unhandled message type: {}", msg.to_string());
continue;
}
}
}
None
}
#[inline]