mirror of
https://github.com/rustdesk/hbb_common.git
synced 2026-04-09 01:16:03 +00:00
more udp punch
This commit is contained in:
@@ -1,128 +0,0 @@
|
||||
use crate::tcp::{DynTcpStream, FramedStream};
|
||||
use kcp_sys::{
|
||||
endpoint::*,
|
||||
packet_def::{Bytes, BytesMut, KcpPacket},
|
||||
stream,
|
||||
};
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use tokio::{net::UdpSocket, sync::mpsc};
|
||||
|
||||
pub struct KcpStream {
|
||||
pub endpoint: Arc<KcpEndpoint>,
|
||||
pub stream: FramedStream,
|
||||
}
|
||||
|
||||
impl KcpStream {
|
||||
fn create_framed(stream: stream::KcpStream, local_addr: Option<SocketAddr>) -> FramedStream {
|
||||
FramedStream(
|
||||
tokio_util::codec::Framed::new(
|
||||
DynTcpStream(Box::new(stream)),
|
||||
crate::bytes_codec::BytesCodec::new(),
|
||||
),
|
||||
local_addr.unwrap_or(crate::config::Config::get_any_listen_addr(true)),
|
||||
None,
|
||||
0,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn accept(
|
||||
udp_socket: Arc<UdpSocket>,
|
||||
from_addr: SocketAddr,
|
||||
) -> crate::ResultType<Self> {
|
||||
let mut endpoint = KcpEndpoint::new();
|
||||
endpoint.run().await;
|
||||
|
||||
let (input, output) = (endpoint.input_sender(), endpoint.output_receiver().unwrap());
|
||||
udp_socket.connect(&[from_addr][..]).await?;
|
||||
Self::kcp_io(udp_socket.clone(), input, output).await;
|
||||
|
||||
let conn_id = endpoint.accept().await?;
|
||||
if let Some(stream) = stream::KcpStream::new(&endpoint, conn_id) {
|
||||
Ok(Self {
|
||||
endpoint: Arc::new(endpoint),
|
||||
stream: Self::create_framed(stream, udp_socket.local_addr().ok()),
|
||||
})
|
||||
} else {
|
||||
Err(anyhow::anyhow!("Failed to create KcpStream"))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn connect(
|
||||
udp_socket: Arc<UdpSocket>,
|
||||
to_addr: SocketAddr,
|
||||
timeout: std::time::Duration,
|
||||
) -> crate::ResultType<Self> {
|
||||
let mut endpoint = KcpEndpoint::new();
|
||||
endpoint.run().await;
|
||||
|
||||
let (input, output) = (endpoint.input_sender(), endpoint.output_receiver().unwrap());
|
||||
udp_socket.connect(&[to_addr][..]).await?;
|
||||
Self::kcp_io(udp_socket.clone(), input, output).await;
|
||||
|
||||
let conn_id = endpoint.connect(timeout, 0, 0, Bytes::new()).await.unwrap();
|
||||
if let Some(stream) = stream::KcpStream::new(&endpoint, conn_id) {
|
||||
Ok(Self {
|
||||
endpoint: Arc::new(endpoint),
|
||||
stream: Self::create_framed(stream, udp_socket.local_addr().ok()),
|
||||
})
|
||||
} else {
|
||||
Err(anyhow::anyhow!("Failed to create KcpStream"))
|
||||
}
|
||||
}
|
||||
|
||||
async fn kcp_io(
|
||||
udp_socket: Arc<UdpSocket>,
|
||||
input: mpsc::Sender<KcpPacket>,
|
||||
mut output: mpsc::Receiver<KcpPacket>,
|
||||
) {
|
||||
let udp = udp_socket.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(data) = output.recv() => {
|
||||
if let Err(e) = udp.send(&data.inner()).await {
|
||||
// Break on fatal errors, but ignore WouldBlock or Interrupted
|
||||
if e.kind() != std::io::ErrorKind::WouldBlock && e.kind() != std::io::ErrorKind::Interrupted {
|
||||
log::error!("kcp send error: {:?}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
else => {
|
||||
log::debug!("kcp endpoint output closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let udp = udp_socket.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut buf = vec![0; 10240];
|
||||
loop {
|
||||
tokio::select! {
|
||||
result = udp.recv_from(&mut buf) => {
|
||||
match result {
|
||||
Ok((size, _)) => {
|
||||
input
|
||||
.send(BytesMut::from(&buf[..size]).into())
|
||||
.await.ok();
|
||||
}
|
||||
Err(e) => {
|
||||
// Break on fatal errors, but ignore WouldBlock or Interrupted
|
||||
if e.kind() != std::io::ErrorKind::WouldBlock && e.kind() != std::io::ErrorKind::Interrupted {
|
||||
log::error!("kcp recv_from error: {:?}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else => {
|
||||
log::debug!("kcp endpoint input closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -61,7 +61,6 @@ pub mod websocket;
|
||||
pub mod stream;
|
||||
pub use stream::Stream;
|
||||
pub use whoami;
|
||||
pub mod kcp_stream;
|
||||
|
||||
pub type SessionID = uuid::Uuid;
|
||||
|
||||
|
||||
@@ -6,8 +6,8 @@ use crate::{
|
||||
ResultType, Stream,
|
||||
};
|
||||
use anyhow::Context;
|
||||
use std::net::SocketAddr;
|
||||
use tokio::net::ToSocketAddrs;
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use tokio::net::{ToSocketAddrs, UdpSocket};
|
||||
use tokio_socks::{IntoTargetAddr, TargetAddr};
|
||||
|
||||
#[inline]
|
||||
@@ -207,6 +207,14 @@ async fn test_target(target: &str) -> ResultType<SocketAddr> {
|
||||
.context(format!("Failed to look up host for {target}"))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn new_direct_udp_for(target: &str) -> ResultType<(Arc<UdpSocket>, SocketAddr)> {
|
||||
let peer_addr = test_target(target).await?;
|
||||
let local_addr = Config::get_any_listen_addr(peer_addr.is_ipv4());
|
||||
let socket = UdpSocket::bind(local_addr).await?;
|
||||
Ok((Arc::new(socket), peer_addr))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn new_udp_for(
|
||||
target: &str,
|
||||
|
||||
10
src/tcp.rs
10
src/tcp.rs
@@ -22,16 +22,16 @@ use tokio_socks::IntoTargetAddr;
|
||||
use tokio_util::codec::Framed;
|
||||
|
||||
pub trait TcpStreamTrait: AsyncRead + AsyncWrite + Unpin {}
|
||||
pub struct DynTcpStream(pub(crate) Box<dyn TcpStreamTrait + Send + Sync>);
|
||||
pub struct DynTcpStream(pub Box<dyn TcpStreamTrait + Send + Sync>);
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Encrypt(pub Key, pub u64, pub u64);
|
||||
|
||||
pub struct FramedStream(
|
||||
pub(crate) Framed<DynTcpStream, BytesCodec>,
|
||||
pub(crate) SocketAddr,
|
||||
pub(crate) Option<Encrypt>,
|
||||
pub(crate) u64,
|
||||
pub Framed<DynTcpStream, BytesCodec>,
|
||||
pub SocketAddr,
|
||||
pub Option<Encrypt>,
|
||||
pub u64,
|
||||
);
|
||||
|
||||
impl Deref for FramedStream {
|
||||
|
||||
Reference in New Issue
Block a user