diff --git a/examples/webrtc_dummy.rs b/examples/webrtc_dummy.rs index 88ac9be..78d5490 100644 --- a/examples/webrtc_dummy.rs +++ b/examples/webrtc_dummy.rs @@ -12,17 +12,12 @@ pub struct WebRTCStream { impl Clone for WebRTCStream { fn clone(&self) -> Self { - WebRTCStream { - } + WebRTCStream {} } } impl WebRTCStream { - - pub async fn new( - _: &str, - _: u64, - ) -> ResultType { + pub async fn new(_: &str, _: u64) -> ResultType { Ok(Self {}) } @@ -46,3 +41,5 @@ impl WebRTCStream { None } } + +fn main() {} diff --git a/src/webrtc.rs b/src/webrtc.rs index 8ffb91b..7fda077 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -81,17 +81,46 @@ impl WebRTCStream { } #[inline] - async fn get_key_for_peer(pc: &Arc) -> String { - if let Some(local_desc) = pc.local_description().await { - if local_desc.sdp_type != webrtc::peer_connection::sdp::sdp_type::RTCSdpType::Offer { - let Some(remote_desc) = pc.remote_description().await else { - return "".into(); - }; - return serde_json::to_string(&remote_desc).unwrap_or_default(); + fn get_key_for_sdp(sdp: &RTCSessionDescription) -> ResultType { + let binding = sdp.unmarshal()?; + let Some(fingerprint) = binding.attribute("fingerprint") else { + // find fingerprint attribute in media descriptions + for media in &binding.media_descriptions { + if media.media_name.media != "application" { + continue; + } + if let Some(fp) = media + .attributes + .iter() + .find(|x| x.key == "fingerprint") + .and_then(|x| x.value.clone()) + { + return Ok(fp); + } } - return serde_json::to_string(&local_desc).unwrap_or_default(); + return Err(anyhow::anyhow!("SDP fingerprint attribute not found")); + }; + Ok(fingerprint.to_string()) + } + + #[inline] + fn get_key_for_sdp_json(sdp_json: &str) -> ResultType { + if sdp_json.is_empty() { + return Ok("".to_string()); } - "".into() + let sdp = serde_json::from_str::(&sdp_json)?; + Self::get_key_for_sdp(&sdp) + } + + #[inline] + async fn get_key_for_peer(pc: &Arc, is_local: bool) -> ResultType { + let Some(desc) = (match is_local { + true => pc.local_description().await, + false => pc.remote_description().await, + }) else { + return Err(anyhow::anyhow!("PeerConnection description is not set")); + }; + Self::get_key_for_sdp(&desc) } pub async fn new(remote_endpoint: &str, ms_timeout: u64) -> ResultType { @@ -102,7 +131,7 @@ impl WebRTCStream { Self::get_remote_offer(remote_endpoint)? }; - let mut key = remote_offer.clone(); + let mut key = Self::get_key_for_sdp_json(&remote_offer)?; let mut lock = SESSIONS.lock().await; if let Some(cached_stream) = lock.get(&key) { if !key.is_empty() { @@ -128,98 +157,110 @@ impl WebRTCStream { ..Default::default() }; + let start_local_offer = remote_offer.is_empty(); let (notify_tx, notify_rx) = watch::channel(false); - let dc_open_notify = notify_tx.clone(); // Create a new RTCPeerConnection let pc = Arc::new(api.new_peer_connection(config).await?); - let bootstrap_dc = if remote_offer.is_empty() { + let bootstrap_dc = if start_local_offer { + let dc_open_notify = notify_tx.clone(); // Create a data channel with label "bootstrap" - pc.create_data_channel("bootstrap", None).await? + let dc = pc.create_data_channel("bootstrap", None).await?; + dc.on_open(Box::new(move || { + log::debug!("Local data channel bootstrap open."); + let _ = dc_open_notify.send(true); + Box::pin(async {}) + })); + dc } else { // Wait for the data channel to be created by the remote peer // Here we create a dummy data channel to satisfy the type system Arc::new(RTCDataChannel::default()) }; - bootstrap_dc.on_open(Box::new(move || { - log::debug!("Local data channel bootstrap open."); - let _ = dc_open_notify.send(true); - Box::pin(async {}) - })); - let stream = Arc::new(Mutex::new(bootstrap_dc.clone())); + let stream = Arc::new(Mutex::new(bootstrap_dc)); + if !start_local_offer { + // Register data channel creation handling + let dc_open_notify = notify_tx.clone(); + let stream_for_dc = stream.clone(); + pc.on_data_channel(Box::new(move |dc: Arc| { + let d_label = dc.label().to_owned(); + let dc_open_notify2 = dc_open_notify.clone(); + let stream_for_dc_clone = stream_for_dc.clone(); + log::debug!("Remote data channel {} ready", d_label); + Box::pin(async move { + let mut stream_lock = stream_for_dc_clone.lock().await; + *stream_lock = dc.clone(); + drop(stream_lock); + dc.on_open(Box::new(move || { + let _ = dc_open_notify2.send(true); + Box::pin(async {}) + })); + }) + })); + } // This will notify you when the peer has connected/disconnected - let on_connection_notify = notify_tx.clone(); let stream_for_close = stream.clone(); let pc_for_close = pc.clone(); pc.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { let stream_for_close2 = stream_for_close.clone(); - let on_connection_notify2 = on_connection_notify.clone(); + let on_connection_notify = notify_tx.clone(); let pc_for_close2 = pc_for_close.clone(); Box::pin(async move { - log::debug!("Peer connection state : {}", s); + log::debug!("WebRTC session peer connection state: {}", s); match s { RTCPeerConnectionState::Disconnected | RTCPeerConnectionState::Failed | RTCPeerConnectionState::Closed => { - let _ = on_connection_notify2.send(true); + let _ = on_connection_notify.send(true); log::debug!("WebRTC session closing due to disconnected"); let _ = stream_for_close2.lock().await.close().await; log::debug!("WebRTC session stream closed"); let mut lock = SESSIONS.lock().await; - let key = WebRTCStream::get_key_for_peer(&pc_for_close2).await; - lock.remove(&key); - log::debug!( - "WebRTC session removed key from cache: {} current len: {}", - key, - lock.len() - ); + match Self::get_key_for_peer(&pc_for_close2, start_local_offer).await.ok() { + Some(k) => { + lock.remove(&k); + log::debug!( + "WebRTC session removed key from cache: {} current len: {}", + k, + lock.len() + ); + } + None => return, + } } _ => {} } }) })); - // Register data channel creation handling - let remote_dc_open_notify = notify_tx.clone(); - let stream_for_dc = stream.clone(); - pc.on_data_channel(Box::new(move |dc: Arc| { - let d_label = dc.label().to_owned(); - let notify = remote_dc_open_notify.clone(); - let stream_for_dc_clone = stream_for_dc.clone(); - log::debug!("Remote data channel {} ready", d_label); - Box::pin(async move { - let mut stream_lock = stream_for_dc_clone.lock().await; - *stream_lock = dc.clone(); - drop(stream_lock); - dc.on_open(Box::new(move || { - let _ = notify.send(true); - Box::pin(async {}) - })); - }) - })); - // process offer/answer - if remote_offer.is_empty() { + if start_local_offer { let sdp = pc.create_offer(None).await?; let mut gather_complete = pc.gathering_complete_promise().await; pc.set_local_description(sdp.clone()).await?; let _ = gather_complete.recv().await; - key = Self::get_key_for_peer(&pc).await; - log::debug!("Start webrtc with local: {}", key); + log::debug!("local offer:\n{}", sdp.sdp); + // get local sdp key + key = Self::get_key_for_sdp(&sdp)?; + log::debug!("Start webrtc with local key: {}", key); } else { let sdp = serde_json::from_str::(&remote_offer)?; - pc.set_remote_description(sdp).await?; + pc.set_remote_description(sdp.clone()).await?; let answer = pc.create_answer(None).await?; let mut gather_complete = pc.gathering_complete_promise().await; pc.set_local_description(answer).await?; let _ = gather_complete.recv().await; - log::debug!("Start webrtc with remote: {}", remote_offer); + + log::debug!("remote offer:\n{}", sdp.sdp); + // get remote sdp key + key = Self::get_key_for_sdp(&sdp)?; + log::debug!("Start webrtc with remote key: {}", key); } - let webrtc_stream = WebRTCStream { + let webrtc_stream = Self { pc, stream, state_notify: notify_rx, @@ -237,7 +278,7 @@ impl WebRTCStream { let endpoint = Self::sdp_to_endpoint(&sdp); Ok(endpoint) } else { - Err(anyhow::anyhow!("Local description is not set")) + Err(anyhow::anyhow!("Local desc is not set")) } } @@ -366,6 +407,178 @@ pub fn is_webrtc_endpoint(endpoint: &str) -> bool { #[cfg(test)] mod tests { + use crate::webrtc::WebRTCStream; + use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; + #[test] - fn test_dc() {} + fn test_webrtc_session_key() { + let mut sdp_str = "".to_owned(); + assert_eq!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer(sdp_str).unwrap_or_default() + ) + .unwrap_or_default(), + "" + ); + + sdp_str = "v=0 +o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 +s=- +t=0 0 +a=fingerprint:sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88 +a=group:BUNDLE 0 +a=extmap-allow-mixed +m=application 9 UDP/DTLS/SCTP webrtc-datachannel +c=IN IP4 0.0.0.0 +a=setup:actpass +a=mid:0 +a=sendrecv +a=sctp-port:5000 +a=ice-ufrag:RMWjjpXfpXbDPdMz +a=ice-pwd:BtIqlWHfwhsJdFiBROeLuEbNmYfHxRfT".to_owned(); + assert_eq!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer(sdp_str).unwrap_or_default() + ).unwrap_or_default(), + "sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88" + ); + + sdp_str = "v=0 +o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 +s=- +t=0 0 +a=group:BUNDLE 0 +a=extmap-allow-mixed +m=application 9 UDP/DTLS/SCTP webrtc-datachannel +c=IN IP4 0.0.0.0 +a=fingerprint:sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88 +a=setup:actpass +a=mid:0 +a=sendrecv +a=sctp-port:5000 +a=ice-ufrag:RMWjjpXfpXbDPdMz +a=ice-pwd:BtIqlWHfwhsJdFiBROeLuEbNmYfHxRfT".to_owned(); + assert_eq!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer(sdp_str).unwrap_or_default() + ).unwrap_or_default(), + "sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88" + ); + + sdp_str = "v=0 +o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 +s=- +t=0 0 +a=group:BUNDLE 0 +a=extmap-allow-mixed +m=application 9 UDP/DTLS/SCTP webrtc-datachannel +c=IN IP4 0.0.0.0 +a=setup:actpass +a=mid:0 +a=sendrecv +a=sctp-port:5000 +a=ice-ufrag:RMWjjpXfpXbDPdMz +a=ice-pwd:BtIqlWHfwhsJdFiBROeLuEbNmYfHxRfT" + .to_owned(); + assert!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer(sdp_str).unwrap_or_default() + ) + .is_err(), + "can not find fingerprint attribute" + ); + + sdp_str = "v=0 +o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 +s=- +t=0 0 +a=group:BUNDLE 0 +a=extmap-allow-mixed +m=audio 9 UDP/DTLS/SCTP webrtc-datachannel +c=IN IP4 0.0.0.0 +a=fingerprint:sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88 +a=setup:actpass +a=mid:0 +a=sendrecv +a=sctp-port:5000 +a=ice-ufrag:RMWjjpXfpXbDPdMz +a=ice-pwd:BtIqlWHfwhsJdFiBROeLuEbNmYfHxRfT".to_owned(); + assert!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer(sdp_str).unwrap_or_default() + ) + .is_err(), + "can not find datachannel fingerprint attribute" + ); + + assert!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer("".to_owned()).unwrap_or_default() + ) + .is_err(), + "invalid sdp should error" + ); + + assert!( + WebRTCStream::get_key_for_sdp_json("{}").is_err(), + "empty sdp json should error" + ); + + assert!( + WebRTCStream::get_key_for_sdp_json("{ss}").is_err(), + "invalid sdp json should error" + ); + + let endpoint = "webrtc://eyJ0eXBlIjoiYW5zd2VyIiwic2RwIjoidj0wXHJcbm89LSA0MTA1NDk3NTY2NDgyMTQzODEwIDYwMzk1NzQw\ +MCBJTiBJUDQgMC4wLjAuMFxyXG5zPS1cclxudD0wIDBcclxuYT1maW5nZXJwcmludDpzaGEtMjU2IDYxOjYwOjc0OjQwOjI4OkNFOjBCOjBDOjc1OjRCOj\ +EwOjlBOkVFOjc3OkY1OjQ0OjU3Ojg0OjUxOkRCOjA0OjkyOjRBOjEwOjFDOjRFOjVGOjdFOkYxOkIzOjcxOjIyXHJcbmE9Z3JvdXA6QlVORExFIDBcclxu\ +YT1leHRtYXAtYWxsb3ctbWl4ZWRcclxubT1hcHBsaWNhdGlvbiA5IFVEUC9EVExTL1NDVFAgd2VicnRjLWRhdGFjaGFubmVsXHJcbmM9SU4gSVA0IDAuMC\ +4wLjBcclxuYT1zZXR1cDphY3RpdmVcclxuYT1taWQ6MFxyXG5hPXNlbmRyZWN2XHJcbmE9c2N0cC1wb3J0OjUwMDBcclxuYT1pY2UtdWZyYWc6SHlnU1Rr\ +V2RsRlpHRG1XWlxyXG5hPWljZS1wd2Q6SkJneFZWaGZveVhHdHZha1VWcnBQeHVOSVpMU3llS1pcclxuYT1jYW5kaWRhdGU6OTYzOTg4MzQ4IDEgdWRwID\ +IxMzA3MDY0MzEgMTkyLjE2OC4xLjIgNjQwMDcgdHlwIGhvc3RcclxuYT1jYW5kaWRhdGU6OTYzOTg4MzQ4IDIgdWRwIDIxMzA3MDY0MzEgMTkyLjE2OC4x\ +LjIgNjQwMDcgdHlwIGhvc3RcclxuYT1jYW5kaWRhdGU6MTg2MTA0NTE5MCAxIHVkcCAxNjk0NDk4ODE1IDE0LjIxMi42OC4xMiAyNzAwNCB0eXAgc3JmbH\ +ggcmFkZHIgMC4wLjAuMCBycG9ydCA2NDAwOFxyXG5hPWNhbmRpZGF0ZToxODYxMDQ1MTkwIDIgdWRwIDE2OTQ0OTg4MTUgMTQuMjEyLjY4LjEyIDI3MDA0\ +IHR5cCBzcmZseCByYWRkciAwLjAuMC4wIHJwb3J0IDY0MDA4XHJcbmE9ZW5kLW9mLWNhbmRpZGF0ZXNcclxuIn0=".to_owned(); + assert_eq!( + WebRTCStream::get_key_for_sdp_json( + &WebRTCStream::get_remote_offer(&endpoint).unwrap_or_default() + ).unwrap_or_default(), + "sha-256 61:60:74:40:28:CE:0B:0C:75:4B:10:9A:EE:77:F5:44:57:84:51:DB:04:92:4A:10:1C:4E:5F:7E:F1:B3:71:22" + ); + } + + #[tokio::test] + async fn test_webrtc_new_stream() { + let mut endpoint = "webrtc://sdfsdf".to_owned(); + assert!( + WebRTCStream::new(&endpoint, 10000).await.is_err(), + "invalid webrtc endpoint should error" + ); + + endpoint = "wss://sdfsdf".to_owned(); + assert!( + WebRTCStream::new(&endpoint, 10000).await.is_err(), + "invalid webrtc endpoint should error" + ); + + assert!( + WebRTCStream::new("", 10000).await.is_ok(), + "local webrtc endpoint should ok" + ); + + endpoint = "webrtc://eyJ0eXBlIjoiYW5zd2VyIiwic2RwIjoidj0wXHJcbm89LSA0MTA1NDk3NTY2NDgyMTQzODEwIDYwMzk1NzQw\ +MCBJTiBJUDQgMC4wLjAuMFxyXG5zPS1cclxudD0wIDBcclxuYT1maW5nZXJwcmludDpzaGEtMjU2IDYxOjYwOjc0OjQwOjI4OkNFOjBCOjBDOjc1OjRCOj\ +EwOjlBOkVFOjc3OkY1OjQ0OjU3Ojg0OjUxOkRCOjA0OjkyOjRBOjEwOjFDOjRFOjVGOjdFOkYxOkIzOjcxOjIyXHJcbmE9Z3JvdXA6QlVORExFIDBcclxu\ +YT1leHRtYXAtYWxsb3ctbWl4ZWRcclxubT1hcHBsaWNhdGlvbiA5IFVEUC9EVExTL1NDVFAgd2VicnRjLWRhdGFjaGFubmVsXHJcbmM9SU4gSVA0IDAuMC\ +4wLjBcclxuYT1zZXR1cDphY3RpdmVcclxuYT1taWQ6MFxyXG5hPXNlbmRyZWN2XHJcbmE9c2N0cC1wb3J0OjUwMDBcclxuYT1pY2UtdWZyYWc6SHlnU1Rr\ +V2RsRlpHRG1XWlxyXG5hPWljZS1wd2Q6SkJneFZWaGZveVhHdHZha1VWcnBQeHVOSVpMU3llS1pcclxuYT1jYW5kaWRhdGU6OTYzOTg4MzQ4IDEgdWRwID\ +IxMzA3MDY0MzEgMTkyLjE2OC4xLjIgNjQwMDcgdHlwIGhvc3RcclxuYT1jYW5kaWRhdGU6OTYzOTg4MzQ4IDIgdWRwIDIxMzA3MDY0MzEgMTkyLjE2OC4x\ +LjIgNjQwMDcgdHlwIGhvc3RcclxuYT1jYW5kaWRhdGU6MTg2MTA0NTE5MCAxIHVkcCAxNjk0NDk4ODE1IDE0LjIxMi42OC4xMiAyNzAwNCB0eXAgc3JmbH\ +ggcmFkZHIgMC4wLjAuMCBycG9ydCA2NDAwOFxyXG5hPWNhbmRpZGF0ZToxODYxMDQ1MTkwIDIgdWRwIDE2OTQ0OTg4MTUgMTQuMjEyLjY4LjEyIDI3MDA0\ +IHR5cCBzcmZseCByYWRkciAwLjAuMC4wIHJwb3J0IDY0MDA4XHJcbmE9ZW5kLW9mLWNhbmRpZGF0ZXNcclxuIn0=".to_owned(); + assert!( + WebRTCStream::new(&endpoint, 10000).await.is_err(), + "connect to an 'answer' webrtc endpoint should error" + ); + } }