diff --git a/Cargo.toml b/Cargo.toml index fc0e7bb..7e74c7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,10 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = [] +webrtc = ["dep:webrtc"] + [dependencies] # new flexi_logger failed on rustc 1.75 flexi_logger = { version = "0.27", features = ["async"] } @@ -61,6 +65,7 @@ rustls-pki-types = "1.11" rustls-native-certs = "0.8" webpki-roots = "1.0.4" async-recursion = "1.1" +webrtc = { version = "0.14.0", optional = true } [target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies] mac_address = "1.1" @@ -70,6 +75,10 @@ machine-uid = { git = "https://github.com/rustdesk-org/machine-uid" } [build-dependencies] protobuf-codegen = { version = "3.7" } +[dev-dependencies] +clap = "4.5.51" +webrtc = "0.14.0" + [target.'cfg(target_os = "windows")'.dependencies] winapi = { version = "0.3", features = [ "winuser", diff --git a/examples/webrtc.rs b/examples/webrtc.rs new file mode 100644 index 0000000..2c993ca --- /dev/null +++ b/examples/webrtc.rs @@ -0,0 +1,154 @@ +extern crate hbb_common; + +#[cfg(feature = "webrtc")] +use hbb_common::webrtc::WebRTCStream; + +use std::io::Write; +use anyhow::Result; +use bytes::Bytes; +use clap::{Arg, Command}; +use tokio::time::Duration; + +#[cfg(not(feature = "webrtc"))] +#[tokio::main] +async fn main() -> Result<()> { + println!( + "The webrtc feature is not enabled. \ + Please enable the webrtc feature to run this example." + ); + Ok(()) +} + +#[cfg(feature = "webrtc")] +#[tokio::main] +async fn main() -> Result<()> { + let app = Command::new("webrtc-stream") + .about("An example of webrtc stream using hbb_common and webrtc-rs") + .arg( + Arg::new("debug") + .long("debug") + .short('d') + .action(clap::ArgAction::SetTrue) + .help("Prints debug log information"), + ) + .arg( + Arg::new("offer") + .long("offer") + .short('o') + .help("set offer from other endpoint"), + ); + + let matches = app.clone().get_matches(); + + let debug = matches.contains_id("debug"); + if debug { + println!("Debug log enabled"); + env_logger::Builder::new() + .format(|buf, record| { + writeln!( + buf, + "{}:{} [{}] {} - {}", + record.file().unwrap_or("unknown"), + record.line().unwrap_or(0), + record.level(), + chrono::Local::now().format("%H:%M:%S.%6f"), + record.args() + ) + }) + .filter(Some("hbb_common"), log::LevelFilter::Debug) + .init(); + } + + let remote_endpoint = if let Some(endpoint) = matches.get_one::("offer") { + endpoint.to_string() + } else { + "".to_string() + }; + + let webrtc_stream = WebRTCStream::new(&remote_endpoint, false, 30000).await?; + // Print the offer to be sent to the other peer + let local_endpoint = webrtc_stream.get_local_endpoint().await?; + + if remote_endpoint.is_empty() { + println!(); + // Wait for the answer to be pasted + println!( + "Start new terminal run: \n{} \ncopy remote endpoint and paste here", + format!( + "cargo r --features webrtc --example webrtc -- --offer {}", + local_endpoint + ) + ); + // readline blocking + let line = std::io::stdin() + .lines() + .next() + .ok_or_else(|| anyhow::anyhow!("No input received"))??; + webrtc_stream.set_remote_endpoint(&line).await?; + } else { + println!( + "Copy local endpoint and paste to the other peer: \n{}", + local_endpoint + ); + } + + let s1 = webrtc_stream.clone(); + tokio::spawn(async move { + let _ = read_loop(s1).await; + }); + + let s2 = webrtc_stream.clone(); + tokio::spawn(async move { + let _ = write_loop(s2).await; + }); + + println!("Press ctrl-c to stop"); + tokio::select! { + _ = tokio::signal::ctrl_c() => { + println!(); + } + }; + + Ok(()) +} + +// read_loop shows how to read from the datachannel directly +#[cfg(feature = "webrtc")] +async fn read_loop(mut stream: WebRTCStream) -> Result<()> { + loop { + let Some(res) = stream.next().await else { + println!("WebRTC stream closed; Exit the read_loop"); + return Ok(()); + }; + match res { + Err(e) => { + println!("WebRTC stream read error: {}; Exit the read_loop", e); + return Ok(()); + } + Ok(data) => { + println!("Message from stream: {}", String::from_utf8(data.to_vec())?); + } + } + } +} + +// write_loop shows how to write to the webrtc stream directly +#[cfg(feature = "webrtc")] +async fn write_loop(mut stream: WebRTCStream) -> Result<()> { + let mut result = Result::<()>::Ok(()); + while result.is_ok() { + let timeout = tokio::time::sleep(Duration::from_secs(5)); + tokio::pin!(timeout); + + tokio::select! { + _ = timeout.as_mut() =>{ + let message = webrtc::peer_connection::math_rand_alpha(15); + result = stream.send_bytes(Bytes::from(message.clone())).await; + println!("Sent '{message}' {}", result.is_ok()); + } + }; + } + println!("WebRTC stream write failed; Exit the write_loop"); + + Ok(()) +} diff --git a/src/config.rs b/src/config.rs index 84456f4..e99a90c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2547,6 +2547,7 @@ pub mod keys { pub const OPTION_TRACKPAD_SPEED: &str = "trackpad-speed"; pub const OPTION_REGISTER_DEVICE: &str = "register-device"; pub const OPTION_RELAY_SERVER: &str = "relay-server"; + pub const OPTION_ICE_SERVERS: &str = "ice-servers"; pub const OPTION_DISABLE_UDP: &str = "disable-udp"; pub const OPTION_ALLOW_INSECURE_TLS_FALLBACK: &str = "allow-insecure-tls-fallback"; pub const OPTION_SHOW_VIRTUAL_MOUSE: &str = "show-virtual-mouse"; @@ -2743,6 +2744,7 @@ pub mod keys { OPTION_ENABLE_ANDROID_SOFTWARE_ENCODING_HALF_SCALE, OPTION_ENABLE_TRUSTED_DEVICES, OPTION_RELAY_SERVER, + OPTION_ICE_SERVERS, OPTION_DISABLE_UDP, OPTION_ALLOW_INSECURE_TLS_FALLBACK, ]; diff --git a/src/lib.rs b/src/lib.rs index 851e4b1..372f4ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,6 +59,8 @@ pub mod fingerprint; pub use flexi_logger; pub mod stream; pub mod websocket; +#[cfg(feature = "webrtc")] +pub mod webrtc; #[cfg(any(target_os = "android", target_os = "ios"))] pub use rustls_platform_verifier; pub use stream::Stream; diff --git a/src/socket_client.rs b/src/socket_client.rs index f0e5b05..9178b74 100644 --- a/src/socket_client.rs +++ b/src/socket_client.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "webrtc")] +use crate::webrtc::{self, is_webrtc_endpoint}; use crate::{ config::{Config, NetworkType}, tcp::FramedStream, @@ -129,6 +131,12 @@ pub async fn connect_tcp< target: T, ms_timeout: u64, ) -> ResultType { + #[cfg(feature = "webrtc")] + if is_webrtc_endpoint(&target.to_string()) { + return Ok(Stream::WebRTC( + webrtc::WebRTCStream::new(&target.to_string(), false, ms_timeout).await?, + )); + } let target_str = check_ws(&target.to_string()); if is_ws_endpoint(&target_str) { return Ok(Stream::WebSocket( diff --git a/src/stream.rs b/src/stream.rs index 987d9be..a8e6b6c 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,10 +1,14 @@ use crate::{config, tcp, websocket, ResultType}; +#[cfg(feature = "webrtc")] +use crate::webrtc; use sodiumoxide::crypto::secretbox::Key; use std::net::SocketAddr; use tokio::net::TcpStream; // support Websocket and tcp. pub enum Stream { + #[cfg(feature = "webrtc")] + WebRTC(webrtc::WebRTCStream), WebSocket(websocket::WsFramedStream), Tcp(tcp::FramedStream), } @@ -13,6 +17,8 @@ impl Stream { #[inline] pub fn set_send_timeout(&mut self, ms: u64) { match self { + #[cfg(feature = "webrtc")] + Stream::WebRTC(s) => s.set_send_timeout(ms), Stream::WebSocket(s) => s.set_send_timeout(ms), Stream::Tcp(s) => s.set_send_timeout(ms), } @@ -21,6 +27,8 @@ impl Stream { #[inline] pub fn set_raw(&mut self) { match self { + #[cfg(feature = "webrtc")] + Stream::WebRTC(s) => s.set_raw(), Stream::WebSocket(s) => s.set_raw(), Stream::Tcp(s) => s.set_raw(), } @@ -29,6 +37,8 @@ impl Stream { #[inline] pub async fn send_bytes(&mut self, bytes: bytes::Bytes) -> ResultType<()> { match self { + #[cfg(feature = "webrtc")] + Stream::WebRTC(s) => s.send_bytes(bytes).await, Stream::WebSocket(s) => s.send_bytes(bytes).await, Stream::Tcp(s) => s.send_bytes(bytes).await, } @@ -37,6 +47,8 @@ impl Stream { #[inline] pub async fn send_raw(&mut self, bytes: Vec) -> ResultType<()> { match self { + #[cfg(feature = "webrtc")] + Stream::WebRTC(s) => s.send_raw(bytes).await, Stream::WebSocket(s) => s.send_raw(bytes).await, Stream::Tcp(s) => s.send_raw(bytes).await, } @@ -45,6 +57,8 @@ impl Stream { #[inline] pub fn set_key(&mut self, key: Key) { match self { + #[cfg(feature = "webrtc")] + Stream::WebRTC(s) => s.set_key(key), Stream::WebSocket(s) => s.set_key(key), Stream::Tcp(s) => s.set_key(key), } @@ -53,6 +67,8 @@ impl Stream { #[inline] pub fn is_secured(&self) -> bool { match self { + #[cfg(feature = "webrtc")] + Stream::WebRTC(s) => s.is_secured(), Stream::WebSocket(s) => s.is_secured(), Stream::Tcp(s) => s.is_secured(), } @@ -64,6 +80,8 @@ impl Stream { timeout: u64, ) -> Option> { match self { + #[cfg(feature = "webrtc")] + Stream::WebRTC(s) => s.next_timeout(timeout).await, Stream::WebSocket(s) => s.next_timeout(timeout).await, Stream::Tcp(s) => s.next_timeout(timeout).await, } @@ -87,6 +105,8 @@ impl Stream { #[inline] pub async fn send(&mut self, msg: &impl protobuf::Message) -> ResultType<()> { match self { + #[cfg(feature = "webrtc")] + Self::WebRTC(s) => s.send(msg).await, Self::WebSocket(ws) => ws.send(msg).await, Self::Tcp(tcp) => tcp.send(msg).await, } @@ -96,6 +116,8 @@ impl Stream { #[inline] pub async fn next(&mut self) -> Option> { match self { + #[cfg(feature = "webrtc")] + Self::WebRTC(s) => s.next().await, Self::WebSocket(ws) => ws.next().await, Self::Tcp(tcp) => tcp.next().await, } @@ -104,6 +126,8 @@ impl Stream { #[inline] pub fn local_addr(&self) -> SocketAddr { match self { + #[cfg(feature = "webrtc")] + Self::WebRTC(s) => s.local_addr(), Self::WebSocket(ws) => ws.local_addr(), Self::Tcp(tcp) => tcp.local_addr(), } @@ -113,4 +137,13 @@ impl Stream { pub fn from(stream: TcpStream, stream_addr: SocketAddr) -> Self { Self::Tcp(tcp::FramedStream::from(stream, stream_addr)) } + + #[inline] + #[cfg(feature = "webrtc")] + pub fn get_webrtc_stream(&self) -> Option { + match self { + Self::WebRTC(s) => Some(s.clone()), + _ => None, + } + } } diff --git a/src/webrtc.rs b/src/webrtc.rs new file mode 100644 index 0000000..8f3c410 --- /dev/null +++ b/src/webrtc.rs @@ -0,0 +1,770 @@ +use std::collections::HashMap; +use std::io::{Error, ErrorKind}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; +use std::time::Duration; + +use webrtc::api::setting_engine::SettingEngine; +use webrtc::api::APIBuilder; +use webrtc::data_channel::RTCDataChannel; +use webrtc::ice::mdns::MulticastDnsMode; +use webrtc::ice_transport::ice_server::RTCIceServer; +use webrtc::peer_connection::configuration::RTCConfiguration; +use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState; +use webrtc::peer_connection::policy::ice_transport_policy::RTCIceTransportPolicy; +use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; +use webrtc::peer_connection::RTCPeerConnection; + +use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; +use base64::Engine; +use bytes::{Bytes, BytesMut}; +use tokio::sync::watch; +use tokio::sync::Mutex; +use tokio::time::timeout; +use url::Url; + +use crate::config; +use crate::protobuf::Message; +use crate::sodiumoxide::crypto::secretbox::Key; +use crate::ResultType; + +pub struct WebRTCStream { + pc: Arc, + stream: Arc>>, + state_notify: watch::Receiver, + send_timeout: u64, +} + +/// Standard maximum message size for WebRTC data channels (RFC 8831, 65535 bytes). +/// Most browsers, including Chromium, enforce this protocol limit. +const DATA_CHANNEL_BUFFER_SIZE: u16 = u16::MAX; + +// use 3 public STUN servers to find out the NAT type, 2 must be the same address but different ports +// https://stackoverflow.com/questions/72805316/determine-nat-mapping-behaviour-using-two-stun-servers +// luckily nextcloud supports two ports for STUN +// unluckily webrtc-rs does not use the same port to do the STUN request +static DEFAULT_ICE_SERVERS: [&str; 3] = [ + "stun:stun.cloudflare.com:3478", + "stun:stun.nextcloud.com:3478", + "stun:stun.nextcloud.com:443", +]; + +lazy_static::lazy_static! { + static ref SESSIONS: Arc::>> = Default::default(); +} + +impl Clone for WebRTCStream { + fn clone(&self) -> Self { + WebRTCStream { + pc: self.pc.clone(), + stream: self.stream.clone(), + state_notify: self.state_notify.clone(), + send_timeout: self.send_timeout, + } + } +} + +impl WebRTCStream { + #[inline] + fn get_remote_offer(endpoint: &str) -> ResultType { + // Ensure the endpoint starts with the "webrtc://" prefix + if !endpoint.starts_with("webrtc://") { + return Err( + Error::new(ErrorKind::InvalidInput, "Invalid WebRTC endpoint format").into(), + ); + } + + // Extract the Base64-encoded SDP part + let encoded_sdp = &endpoint["webrtc://".len()..]; + // Decode the Base64 string + let decoded_bytes = BASE64_STANDARD + .decode(encoded_sdp) + .map_err(|_| Error::new(ErrorKind::InvalidInput, "Failed to decode Base64 SDP"))?; + Ok(String::from_utf8(decoded_bytes).map_err(|_| { + Error::new( + ErrorKind::InvalidInput, + "Failed to convert decoded bytes to UTF-8", + ) + })?) + } + + #[inline] + fn sdp_to_endpoint(sdp: &str) -> String { + let encoded_sdp = BASE64_STANDARD.encode(sdp); + format!("webrtc://{}", encoded_sdp) + } + + #[inline] + fn get_key_for_sdp(sdp: &RTCSessionDescription) -> ResultType { + let binding = sdp.unmarshal()?; + let Some(fingerprint) = binding.attribute("fingerprint") else { + // find fingerprint attribute in media descriptions + for media in &binding.media_descriptions { + if media.media_name.media != "application" { + continue; + } + if let Some(fp) = media + .attributes + .iter() + .find(|x| x.key == "fingerprint") + .and_then(|x| x.value.clone()) + { + return Ok(fp); + } + } + return Err(anyhow::anyhow!("SDP fingerprint attribute not found")); + }; + Ok(fingerprint.to_string()) + } + + #[inline] + fn get_key_for_sdp_json(sdp_json: &str) -> ResultType { + if sdp_json.is_empty() { + return Ok("".to_string()); + } + let sdp = serde_json::from_str::(&sdp_json)?; + Self::get_key_for_sdp(&sdp) + } + + #[inline] + async fn get_key_for_peer(pc: &Arc, is_local: bool) -> ResultType { + let Some(desc) = (match is_local { + true => pc.local_description().await, + false => pc.remote_description().await, + }) else { + return Err(anyhow::anyhow!("PeerConnection description is not set")); + }; + Self::get_key_for_sdp(&desc) + } + + #[inline] + fn get_ice_server_from_url(url: &str) -> Option { + // standard url format with turn scheme: turn://user:pass@host:port + match Url::parse(url) { + Ok(u) => { + if u.scheme() == "turn" + || u.scheme() == "turns" + || u.scheme() == "stun" + || u.scheme() == "stuns" + { + Some(RTCIceServer { + urls: vec![format!( + "{}:{}:{}", + u.scheme(), + u.host_str().unwrap_or_default(), + u.port().unwrap_or(3478) + )], + username: u.username().to_string(), + credential: u.password().unwrap_or_default().to_string(), + ..Default::default() + }) + } else { + None + } + } + Err(_) => None, + } + } + + #[inline] + fn get_ice_servers() -> Vec { + let mut ice_servers = Vec::new(); + let cfg = config::Config::get_option(config::keys::OPTION_ICE_SERVERS); + + let mut has_stun = false; + + for url in cfg.split(',').map(str::trim) { + if let Some(ice_server) = Self::get_ice_server_from_url(url) { + // Detect STUN in user config + if ice_server + .urls + .iter() + .any(|u| u.starts_with("stun:") || u.starts_with("stuns:")) + { + has_stun = true; + } + + ice_servers.push(ice_server); + } + } + + // If there is no STUN (either TURN-only or empty config) → prepend defaults + if !has_stun { + ice_servers.insert( + 0, + RTCIceServer { + urls: DEFAULT_ICE_SERVERS.iter().map(|s| s.to_string()).collect(), + ..Default::default() + }, + ); + } + ice_servers + } + + pub async fn new( + remote_endpoint: &str, + force_relay: bool, + ms_timeout: u64, + ) -> ResultType { + log::debug!("New webrtc stream to endpoint: {}", remote_endpoint); + let remote_offer = if remote_endpoint.is_empty() { + "".into() + } else { + Self::get_remote_offer(remote_endpoint)? + }; + + let mut key = Self::get_key_for_sdp_json(&remote_offer)?; + let sessions_lock = SESSIONS.lock().await; + if let Some(cached_stream) = sessions_lock.get(&key) { + if !key.is_empty() { + log::debug!("Start webrtc with cached peer"); + return Ok(cached_stream.clone()); + } + } + drop(sessions_lock); + + let start_local_offer = remote_offer.is_empty(); + // Create a SettingEngine and enable Detach + let mut s = SettingEngine::default(); + s.detach_data_channels(); + s.set_ice_multicast_dns_mode(MulticastDnsMode::Disabled); + + // Create the API object + let api = APIBuilder::new().with_setting_engine(s).build(); + + // Prepare the configuration, get ICE servers from config + let config = RTCConfiguration { + ice_servers: Self::get_ice_servers(), + ice_transport_policy: if force_relay { + RTCIceTransportPolicy::Relay + } else { + RTCIceTransportPolicy::All + }, + ..Default::default() + }; + + let (notify_tx, notify_rx) = watch::channel(false); + // Create a new RTCPeerConnection + let pc = Arc::new(api.new_peer_connection(config).await?); + let bootstrap_dc = if start_local_offer { + let dc_open_notify = notify_tx.clone(); + // Create a data channel with label "bootstrap" + let dc = pc.create_data_channel("bootstrap", None).await?; + dc.on_open(Box::new(move || { + log::debug!("Local data channel bootstrap open."); + let _ = dc_open_notify.send(true); + Box::pin(async {}) + })); + dc + } else { + // Wait for the data channel to be created by the remote peer + // Here we create a dummy data channel to satisfy the type system + Arc::new(RTCDataChannel::default()) + }; + + let stream = Arc::new(Mutex::new(bootstrap_dc)); + if !start_local_offer { + // Register data channel creation handling + let dc_open_notify = notify_tx.clone(); + let stream_for_dc = stream.clone(); + pc.on_data_channel(Box::new(move |dc: Arc| { + let d_label = dc.label().to_owned(); + let dc_open_notify2 = dc_open_notify.clone(); + let stream_for_dc_clone = stream_for_dc.clone(); + log::debug!("Remote data channel {} ready", d_label); + Box::pin(async move { + let mut stream_lock = stream_for_dc_clone.lock().await; + *stream_lock = dc.clone(); + drop(stream_lock); + dc.on_open(Box::new(move || { + let _ = dc_open_notify2.send(true); + Box::pin(async {}) + })); + }) + })); + } + + // This will notify you when the peer has connected/disconnected + let stream_for_close = stream.clone(); + let pc_for_close = pc.clone(); + pc.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { + let stream_for_close2 = stream_for_close.clone(); + let on_connection_notify = notify_tx.clone(); + let pc_for_close2 = pc_for_close.clone(); + Box::pin(async move { + log::debug!("WebRTC session peer connection state: {}", s); + match s { + RTCPeerConnectionState::Disconnected + | RTCPeerConnectionState::Failed + | RTCPeerConnectionState::Closed => { + let _ = on_connection_notify.send(true); + log::debug!("WebRTC session closing due to disconnected"); + let _ = stream_for_close2.lock().await.close().await; + log::debug!("WebRTC session stream closed"); + + let mut sessions_lock = SESSIONS.lock().await; + match Self::get_key_for_peer(&pc_for_close2, start_local_offer).await { + Ok(k) => { + sessions_lock.remove(&k); + log::debug!("WebRTC session removed key: {}", k); + } + Err(e) => { + log::error!( + "Failed to extract key for peer during session cleanup: {:?}", + e + ); + // Fallback: try to remove any session associated with this peer connection + let keys_to_remove: Vec = sessions_lock + .iter() + .filter_map(|(key, session)| { + if Arc::ptr_eq(&session.pc, &pc_for_close2) { + Some(key.clone()) + } else { + None + } + }) + .collect(); + for k in keys_to_remove { + sessions_lock.remove(&k); + log::debug!("WebRTC session removed by fallback key: {}", k); + } + } + } + } + _ => {} + } + }) + })); + + // process offer/answer + if start_local_offer { + let sdp = pc.create_offer(None).await?; + let mut gather_complete = pc.gathering_complete_promise().await; + pc.set_local_description(sdp.clone()).await?; + let _ = gather_complete.recv().await; + + log::debug!("local offer:\n{}", sdp.sdp); + // get local sdp key + key = Self::get_key_for_sdp(&sdp)?; + log::debug!("Start webrtc with local key: {}", key); + } else { + let sdp = serde_json::from_str::(&remote_offer)?; + pc.set_remote_description(sdp.clone()).await?; + let answer = pc.create_answer(None).await?; + let mut gather_complete = pc.gathering_complete_promise().await; + pc.set_local_description(answer).await?; + let _ = gather_complete.recv().await; + + log::debug!("remote offer:\n{}", sdp.sdp); + // get remote sdp key + key = Self::get_key_for_sdp(&sdp)?; + log::debug!("Start webrtc with remote key: {}", key); + } + + let mut final_lock = SESSIONS.lock().await; + if let Some(session) = final_lock.get(&key) { + pc.close().await.ok(); + return Ok(session.clone()); + } + + let webrtc_stream = Self { + pc, + stream, + state_notify: notify_rx, + send_timeout: ms_timeout, + }; + final_lock.insert(key, webrtc_stream.clone()); + Ok(webrtc_stream) + } + + #[inline] + pub async fn get_local_endpoint(&self) -> ResultType { + if let Some(local_desc) = self.pc.local_description().await { + let sdp = serde_json::to_string(&local_desc)?; + let endpoint = Self::sdp_to_endpoint(&sdp); + Ok(endpoint) + } else { + Err(anyhow::anyhow!("Local desc is not set")) + } + } + + #[inline] + pub async fn set_remote_endpoint(&self, endpoint: &str) -> ResultType<()> { + let offer = Self::get_remote_offer(endpoint)?; + log::debug!("WebRTC set remote sdp: {}", offer); + let sdp = serde_json::from_str::(&offer)?; + self.pc.set_remote_description(sdp).await?; + Ok(()) + } + + #[inline] + pub fn set_raw(&mut self) { + // not-supported + } + + #[inline] + pub fn local_addr(&self) -> SocketAddr { + SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0) + } + + #[inline] + pub fn set_send_timeout(&mut self, ms: u64) { + self.send_timeout = ms; + } + + #[inline] + pub fn set_key(&mut self, _key: Key) { + // not-supported + // WebRTC uses built-in DTLS encryption for secure communication. + // DTLS handles key exchange and encryption automatically, so explicit key management is not required. + } + + #[inline] + pub fn is_secured(&self) -> bool { + true + } + + #[inline] + pub async fn send(&mut self, msg: &impl Message) -> ResultType<()> { + self.send_raw(msg.write_to_bytes()?).await + } + + #[inline] + pub async fn send_raw(&mut self, msg: Vec) -> ResultType<()> { + self.send_bytes(Bytes::from(msg)).await + } + + #[inline] + async fn wait_for_connect_result(&mut self) { + if *self.state_notify.borrow() { + return; + } + let _ = self.state_notify.changed().await; + } + + pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> { + if self.send_timeout > 0 { + match timeout( + Duration::from_millis(self.send_timeout), + self.wait_for_connect_result(), + ) + .await + { + Ok(_) => {} + Err(_) => { + self.pc.close().await.ok(); + return Err(Error::new( + ErrorKind::TimedOut, + "WebRTC send wait for connect timeout", + ) + .into()); + } + } + } else { + self.wait_for_connect_result().await; + } + let stream = self.stream.lock().await.clone(); + stream.send(&bytes).await?; + Ok(()) + } + + #[inline] + pub async fn next(&mut self) -> Option> { + self.wait_for_connect_result().await; + let stream = self.stream.lock().await.clone(); + + // TODO reuse buffer? + let mut buffer = BytesMut::zeroed(DATA_CHANNEL_BUFFER_SIZE as usize); + let dc = stream.detach().await.ok()?; + let n = match dc.read(&mut buffer).await { + Ok(n) => n, + Err(err) => { + self.pc.close().await.ok(); + return Some(Err(Error::new( + ErrorKind::Other, + format!("data channel read error: {}", err), + ))); + } + }; + if n == 0 { + self.pc.close().await.ok(); + return Some(Err(Error::new( + ErrorKind::Other, + "data channel read exited with 0 bytes", + ))); + } + buffer.truncate(n); + Some(Ok(buffer)) + } + + #[inline] + pub async fn next_timeout(&mut self, ms: u64) -> Option> { + match timeout(Duration::from_millis(ms), self.next()).await { + Ok(res) => res, + Err(_) => None, + } + } +} + +pub fn is_webrtc_endpoint(endpoint: &str) -> bool { + // use sdp base64 json string as endpoint, or prefix webrtc: + endpoint.starts_with("webrtc://") +} + +#[cfg(test)] +mod tests { + use crate::config; + use crate::webrtc::WebRTCStream; + use crate::webrtc::DEFAULT_ICE_SERVERS; + use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; + + #[test] + fn test_webrtc_ice_url() { + assert_eq!( + WebRTCStream::get_ice_server_from_url("turn://example.com:3478") + .unwrap_or_default() + .urls[0], + "turn:example.com:3478" + ); + + assert_eq!( + WebRTCStream::get_ice_server_from_url("turn://example.com") + .unwrap_or_default() + .urls[0], + "turn:example.com:3478" + ); + + assert_eq!( + WebRTCStream::get_ice_server_from_url("turn://123@example.com") + .unwrap_or_default() + .username, + "123" + ); + + assert_eq!( + WebRTCStream::get_ice_server_from_url("turn://123@example.com") + .unwrap_or_default() + .credential, + "" + ); + + assert_eq!( + WebRTCStream::get_ice_server_from_url("turn://123:321@example.com") + .unwrap_or_default() + .credential, + "321" + ); + + assert_eq!( + WebRTCStream::get_ice_server_from_url("stun://example.com:3478") + .unwrap_or_default() + .urls[0], + "stun:example.com:3478" + ); + + assert_eq!( + WebRTCStream::get_ice_server_from_url("http://123:123@example.com:3478"), + None + ); + + config::Config::set_option("ice-servers".to_string(), "".to_string()); + assert_eq!( + WebRTCStream::get_ice_servers()[0].urls[0], + DEFAULT_ICE_SERVERS[0].to_string() + ); + + config::Config::set_option( + "ice-servers".to_string(), + ",stun://example.com,turn://example.com,sdf".to_string(), + ); + assert_eq!( + WebRTCStream::get_ice_servers()[0].urls[0], + "stun:example.com:3478" + ); + assert_eq!( + WebRTCStream::get_ice_servers()[1].urls[0], + "turn:example.com:3478" + ); + assert_eq!(WebRTCStream::get_ice_servers().len(), 2); + config::Config::set_option( + "ice-servers".to_string(), + "".to_string(), + ); + } + + #[test] + fn test_webrtc_session_key() { + let mut sdp_str = "".to_owned(); + assert_eq!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer(sdp_str).unwrap_or_default() + ) + .unwrap_or_default(), + "" + ); + + sdp_str = "\ +v=0 +o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 +s=- +t=0 0 +a=fingerprint:sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88 +a=group:BUNDLE 0 +a=extmap-allow-mixed +m=application 9 UDP/DTLS/SCTP webrtc-datachannel +c=IN IP4 0.0.0.0 +a=setup:actpass +a=mid:0 +a=sendrecv +a=sctp-port:5000 +a=ice-ufrag:RMWjjpXfpXbDPdMz +a=ice-pwd:BtIqlWHfwhsJdFiBROeLuEbNmYfHxRfT".to_owned(); + assert_eq!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer(sdp_str).unwrap_or_default() + ).unwrap_or_default(), + "sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88" + ); + + sdp_str = "\ +v=0 +o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 +s=- +t=0 0 +a=group:BUNDLE 0 +a=extmap-allow-mixed +m=application 9 UDP/DTLS/SCTP webrtc-datachannel +c=IN IP4 0.0.0.0 +a=fingerprint:sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88 +a=setup:actpass +a=mid:0 +a=sendrecv +a=sctp-port:5000 +a=ice-ufrag:RMWjjpXfpXbDPdMz +a=ice-pwd:BtIqlWHfwhsJdFiBROeLuEbNmYfHxRfT".to_owned(); + assert_eq!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer(sdp_str).unwrap_or_default() + ).unwrap_or_default(), + "sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88" + ); + + sdp_str = "\ +v=0 +o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 +s=- +t=0 0 +a=group:BUNDLE 0 +a=extmap-allow-mixed +m=application 9 UDP/DTLS/SCTP webrtc-datachannel +c=IN IP4 0.0.0.0 +a=setup:actpass +a=mid:0 +a=sendrecv +a=sctp-port:5000 +a=ice-ufrag:RMWjjpXfpXbDPdMz +a=ice-pwd:BtIqlWHfwhsJdFiBROeLuEbNmYfHxRfT" + .to_owned(); + assert!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer(sdp_str).unwrap_or_default() + ) + .is_err(), + "can not find fingerprint attribute" + ); + + sdp_str = "\ +v=0 +o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 +s=- +t=0 0 +a=group:BUNDLE 0 +a=extmap-allow-mixed +m=audio 9 UDP/DTLS/SCTP webrtc-datachannel +c=IN IP4 0.0.0.0 +a=fingerprint:sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88 +a=setup:actpass +a=mid:0 +a=sendrecv +a=sctp-port:5000 +a=ice-ufrag:RMWjjpXfpXbDPdMz +a=ice-pwd:BtIqlWHfwhsJdFiBROeLuEbNmYfHxRfT".to_owned(); + assert!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer(sdp_str).unwrap_or_default() + ) + .is_err(), + "can not find datachannel fingerprint attribute" + ); + + assert!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer("".to_owned()).unwrap_or_default() + ) + .is_err(), + "invalid sdp should error" + ); + + assert!( + WebRTCStream::get_key_for_sdp_json("{}").is_err(), + "empty sdp json should error" + ); + + assert!( + WebRTCStream::get_key_for_sdp_json("{ss}").is_err(), + "invalid sdp json should error" + ); + + let endpoint = "webrtc://eyJ0eXBlIjoiYW5zd2VyIiwic2RwIjoidj0wXHJcbm89LSA0MTA1NDk3NTY2NDgyMTQzODEwIDYwMzk1NzQw\ +MCBJTiBJUDQgMC4wLjAuMFxyXG5zPS1cclxudD0wIDBcclxuYT1maW5nZXJwcmludDpzaGEtMjU2IDYxOjYwOjc0OjQwOjI4OkNFOjBCOjBDOjc1OjRCOj\ +EwOjlBOkVFOjc3OkY1OjQ0OjU3Ojg0OjUxOkRCOjA0OjkyOjRBOjEwOjFDOjRFOjVGOjdFOkYxOkIzOjcxOjIyXHJcbmE9Z3JvdXA6QlVORExFIDBcclxu\ +YT1leHRtYXAtYWxsb3ctbWl4ZWRcclxubT1hcHBsaWNhdGlvbiA5IFVEUC9EVExTL1NDVFAgd2VicnRjLWRhdGFjaGFubmVsXHJcbmM9SU4gSVA0IDAuMC\ +4wLjBcclxuYT1zZXR1cDphY3RpdmVcclxuYT1taWQ6MFxyXG5hPXNlbmRyZWN2XHJcbmE9c2N0cC1wb3J0OjUwMDBcclxuYT1pY2UtdWZyYWc6SHlnU1Rr\ +V2RsRlpHRG1XWlxyXG5hPWljZS1wd2Q6SkJneFZWaGZveVhHdHZha1VWcnBQeHVOSVpMU3llS1pcclxuYT1jYW5kaWRhdGU6OTYzOTg4MzQ4IDEgdWRwID\ +IxMzA3MDY0MzEgMTkyLjE2OC4xLjIgNjQwMDcgdHlwIGhvc3RcclxuYT1jYW5kaWRhdGU6OTYzOTg4MzQ4IDIgdWRwIDIxMzA3MDY0MzEgMTkyLjE2OC4x\ +LjIgNjQwMDcgdHlwIGhvc3RcclxuYT1jYW5kaWRhdGU6MTg2MTA0NTE5MCAxIHVkcCAxNjk0NDk4ODE1IDE0LjIxMi42OC4xMiAyNzAwNCB0eXAgc3JmbH\ +ggcmFkZHIgMC4wLjAuMCBycG9ydCA2NDAwOFxyXG5hPWNhbmRpZGF0ZToxODYxMDQ1MTkwIDIgdWRwIDE2OTQ0OTg4MTUgMTQuMjEyLjY4LjEyIDI3MDA0\ +IHR5cCBzcmZseCByYWRkciAwLjAuMC4wIHJwb3J0IDY0MDA4XHJcbmE9ZW5kLW9mLWNhbmRpZGF0ZXNcclxuIn0=".to_owned(); + assert_eq!( + WebRTCStream::get_key_for_sdp_json( + &WebRTCStream::get_remote_offer(&endpoint).unwrap_or_default() + ).unwrap_or_default(), + "sha-256 61:60:74:40:28:CE:0B:0C:75:4B:10:9A:EE:77:F5:44:57:84:51:DB:04:92:4A:10:1C:4E:5F:7E:F1:B3:71:22" + ); + } + + #[tokio::test] + async fn test_webrtc_new_stream() { + let mut endpoint = "webrtc://sdfsdf".to_owned(); + assert!( + WebRTCStream::new(&endpoint, false, 10000).await.is_err(), + "invalid webrtc endpoint should error" + ); + + endpoint = "wss://sdfsdf".to_owned(); + assert!( + WebRTCStream::new(&endpoint, false, 10000).await.is_err(), + "invalid webrtc endpoint should error" + ); + + assert!( + WebRTCStream::new("", false, 10000).await.is_ok(), + "local webrtc endpoint should ok" + ); + + endpoint = "webrtc://eyJ0eXBlIjoiYW5zd2VyIiwic2RwIjoidj0wXHJcbm89LSA0MTA1NDk3NTY2NDgyMTQzODEwIDYwMzk1NzQw\ +MCBJTiBJUDQgMC4wLjAuMFxyXG5zPS1cclxudD0wIDBcclxuYT1maW5nZXJwcmludDpzaGEtMjU2IDYxOjYwOjc0OjQwOjI4OkNFOjBCOjBDOjc1OjRCOj\ +EwOjlBOkVFOjc3OkY1OjQ0OjU3Ojg0OjUxOkRCOjA0OjkyOjRBOjEwOjFDOjRFOjVGOjdFOkYxOkIzOjcxOjIyXHJcbmE9Z3JvdXA6QlVORExFIDBcclxu\ +YT1leHRtYXAtYWxsb3ctbWl4ZWRcclxubT1hcHBsaWNhdGlvbiA5IFVEUC9EVExTL1NDVFAgd2VicnRjLWRhdGFjaGFubmVsXHJcbmM9SU4gSVA0IDAuMC\ +4wLjBcclxuYT1zZXR1cDphY3RpdmVcclxuYT1taWQ6MFxyXG5hPXNlbmRyZWN2XHJcbmE9c2N0cC1wb3J0OjUwMDBcclxuYT1pY2UtdWZyYWc6SHlnU1Rr\ +V2RsRlpHRG1XWlxyXG5hPWljZS1wd2Q6SkJneFZWaGZveVhHdHZha1VWcnBQeHVOSVpMU3llS1pcclxuYT1jYW5kaWRhdGU6OTYzOTg4MzQ4IDEgdWRwID\ +IxMzA3MDY0MzEgMTkyLjE2OC4xLjIgNjQwMDcgdHlwIGhvc3RcclxuYT1jYW5kaWRhdGU6OTYzOTg4MzQ4IDIgdWRwIDIxMzA3MDY0MzEgMTkyLjE2OC4x\ +LjIgNjQwMDcgdHlwIGhvc3RcclxuYT1jYW5kaWRhdGU6MTg2MTA0NTE5MCAxIHVkcCAxNjk0NDk4ODE1IDE0LjIxMi42OC4xMiAyNzAwNCB0eXAgc3JmbH\ +ggcmFkZHIgMC4wLjAuMCBycG9ydCA2NDAwOFxyXG5hPWNhbmRpZGF0ZToxODYxMDQ1MTkwIDIgdWRwIDE2OTQ0OTg4MTUgMTQuMjEyLjY4LjEyIDI3MDA0\ +IHR5cCBzcmZseCByYWRkciAwLjAuMC4wIHJwb3J0IDY0MDA4XHJcbmE9ZW5kLW9mLWNhbmRpZGF0ZXNcclxuIn0=".to_owned(); + assert!( + WebRTCStream::new(&endpoint, false, 10000).await.is_err(), + "connect to an 'answer' webrtc endpoint should error" + ); + } +}