kcp stream

This commit is contained in:
RustDesk 2025-06-03 19:41:30 +08:00 committed by GitHub
parent fa160b2864
commit b69b097c6f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 133 additions and 1 deletions

View File

@ -8,7 +8,7 @@ edition = "2018"
[dependencies] [dependencies]
# new flexi_logger failed on rustc 1.75 # new flexi_logger failed on rustc 1.75
flexi_logger = { version = "0.27", features = ["async"] } flexi_logger = { version = "0.30", features = ["async"] }
protobuf = { version = "3.7", features = ["with-bytes"] } protobuf = { version = "3.7", features = ["with-bytes"] }
tokio = { version = "1.44", features = ["full"] } tokio = { version = "1.44", features = ["full"] }
tokio-util = { version = "0.7", features = ["full"] } tokio-util = { version = "0.7", features = ["full"] }
@ -47,6 +47,7 @@ base64 = "0.22"
url = "2.5" url = "2.5"
sha2 = "0.10" sha2 = "0.10"
whoami = "1.5" whoami = "1.5"
kcp-sys="0.1"
[target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies] [target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies]
mac_address = "1.1" mac_address = "1.1"

View File

@ -53,6 +53,7 @@ message PunchHoleSent {
string relay_server = 3; string relay_server = 3;
NatType nat_type = 4; NatType nat_type = 4;
string version = 5; string version = 5;
bool is_udp = 6;
} }
message RegisterPk { message RegisterPk {
@ -94,6 +95,7 @@ message PunchHoleResponse {
} }
string other_failure = 7; string other_failure = 7;
int32 feedback = 8; int32 feedback = 8;
bool is_udp = 9;
} }
message ConfigUpdate { message ConfigUpdate {

128
src/kcp_stream.rs Normal file
View File

@ -0,0 +1,128 @@
use crate::tcp::{DynTcpStream, FramedStream};
use kcp_sys::{
endpoint::*,
packet_def::{Bytes, BytesMut, KcpPacket},
stream,
};
use std::{net::SocketAddr, sync::Arc};
use tokio::{net::UdpSocket, sync::mpsc};
pub struct KcpStream {
pub endpoint: Arc<KcpEndpoint>,
pub stream: FramedStream,
}
impl KcpStream {
fn create_framed(stream: stream::KcpStream, local_addr: Option<SocketAddr>) -> FramedStream {
FramedStream(
tokio_util::codec::Framed::new(
DynTcpStream(Box::new(stream)),
crate::bytes_codec::BytesCodec::new(),
),
local_addr.unwrap_or(crate::config::Config::get_any_listen_addr(true)),
None,
0,
)
}
pub async fn accept(
udp_socket: Arc<UdpSocket>,
from_addr: SocketAddr,
) -> crate::ResultType<Self> {
let mut endpoint = KcpEndpoint::new();
endpoint.run().await;
let (input, output) = (endpoint.input_sender(), endpoint.output_receiver().unwrap());
udp_socket.connect(&[from_addr][..]).await?;
Self::kcp_io(udp_socket.clone(), input, output).await;
let conn_id = endpoint.accept().await?;
if let Some(stream) = stream::KcpStream::new(&endpoint, conn_id) {
Ok(Self {
endpoint: Arc::new(endpoint),
stream: Self::create_framed(stream, udp_socket.local_addr().ok()),
})
} else {
Err(anyhow::anyhow!("Failed to create KcpStream"))
}
}
pub async fn connect(
udp_socket: Arc<UdpSocket>,
to_addr: SocketAddr,
timeout: std::time::Duration,
) -> crate::ResultType<Self> {
let mut endpoint = KcpEndpoint::new();
endpoint.run().await;
let (input, output) = (endpoint.input_sender(), endpoint.output_receiver().unwrap());
udp_socket.connect(&[to_addr][..]).await?;
Self::kcp_io(udp_socket.clone(), input, output).await;
let conn_id = endpoint.connect(timeout, 0, 0, Bytes::new()).await.unwrap();
if let Some(stream) = stream::KcpStream::new(&endpoint, conn_id) {
Ok(Self {
endpoint: Arc::new(endpoint),
stream: Self::create_framed(stream, udp_socket.local_addr().ok()),
})
} else {
Err(anyhow::anyhow!("Failed to create KcpStream"))
}
}
async fn kcp_io(
udp_socket: Arc<UdpSocket>,
input: mpsc::Sender<KcpPacket>,
mut output: mpsc::Receiver<KcpPacket>,
) {
let udp = udp_socket.clone();
tokio::spawn(async move {
loop {
tokio::select! {
Some(data) = output.recv() => {
if let Err(e) = udp.send(&data.inner()).await {
// Break on fatal errors, but ignore WouldBlock or Interrupted
if e.kind() != std::io::ErrorKind::WouldBlock && e.kind() != std::io::ErrorKind::Interrupted {
log::error!("kcp send error: {:?}", e);
break;
}
}
}
else => {
log::debug!("kcp endpoint output closed");
break;
}
}
}
});
let udp = udp_socket.clone();
tokio::spawn(async move {
let mut buf = vec![0; 10240];
loop {
tokio::select! {
result = udp.recv_from(&mut buf) => {
match result {
Ok((size, _)) => {
input
.send(BytesMut::from(&buf[..size]).into())
.await.ok();
}
Err(e) => {
// Break on fatal errors, but ignore WouldBlock or Interrupted
if e.kind() != std::io::ErrorKind::WouldBlock && e.kind() != std::io::ErrorKind::Interrupted {
log::error!("kcp recv_from error: {:?}", e);
break;
}
}
}
}
else => {
log::debug!("kcp endpoint input closed");
break;
}
}
}
});
}
}

View File

@ -61,6 +61,7 @@ pub mod websocket;
pub mod stream; pub mod stream;
pub use stream::Stream; pub use stream::Stream;
pub use whoami; pub use whoami;
pub mod kcp_stream;
pub type SessionID = uuid::Uuid; pub type SessionID = uuid::Uuid;