make webrtc-rs optional feature

This commit is contained in:
lc
2025-11-13 16:53:04 +08:00
parent 442160d704
commit 8ae4651bc7
6 changed files with 223 additions and 232 deletions

View File

@@ -1,51 +1,37 @@
use std::io::Write;
use std::sync::Arc;
extern crate hbb_common;
use bytes::{Bytes, BytesMut};
use std::io::Write;
use bytes::Bytes;
use clap::{Arg, Command};
use anyhow::Result;
use tokio::time::Duration;
use webrtc::api::APIBuilder;
use webrtc::api::setting_engine::SettingEngine;
use webrtc::data_channel::RTCDataChannel;
use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::peer_connection::math_rand_alpha;
use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc_signal::{self as signal};
// example from https://github.com/webrtc-rs/webrtc/tree/master/examples/examples/data-channels
#[tokio::main]
async fn main() -> Result<()> {
let mut app = Command::new("data-channels")
.version("0.1.0")
.author("Rain Liu <yliu@webrtc.rs>")
.about("An example of Data-Channels.")
.arg(
Arg::new("FULLHELP")
.help("Prints more detailed help information")
.long("fullhelp"),
)
let app = Command::new("webrtc-stream")
.about("An example of webrtc stream using hbb_common and webrtc-rs")
.arg(
Arg::new("debug")
.long("debug")
.short('d')
.action(clap::ArgAction::SetTrue)
.help("Prints debug log information"),
)
.arg(
Arg::new("offer")
.long("offer")
.short('o')
.help("set offer from other endpoint"),
);
let matches = app.clone().get_matches();
if matches.contains_id("FULLHELP") {
app.print_long_help().unwrap();
std::process::exit(0);
}
let debug = matches.contains_id("debug");
if debug {
println!("Debug log enabled");
env_logger::Builder::new()
.format(|buf, record| {
writeln!(
@@ -58,173 +44,67 @@ async fn main() -> Result<()> {
record.args()
)
})
.filter(None, log::LevelFilter::Trace)
.filter(None, log::LevelFilter::Debug)
.init();
}
// Everything below is the WebRTC-rs API! Thanks for using it ❤️.
// Create a SettingEngine and enable Detach
let mut s = SettingEngine::default();
s.detach_data_channels();
// Create the API object
let api = APIBuilder::new()
.with_setting_engine(s)
.build();
// Prepare the configuration
let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}],
..Default::default()
let remote_endpoint = if let Some(endpoint) = matches.get_one::<String>("offer") {
endpoint.to_string()
} else {
"".to_string()
};
// Create a new RTCPeerConnection
let peer_connection = Arc::new(api.new_peer_connection(config).await?);
let webrtc_stream = hbb_common::webrtc::WebRTCStream::new(&remote_endpoint, 30000).await?;
// Print the offer to be sent to the other peer
webrtc_stream.get_local_endpoint().await;
let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
let bootstrap = peer_connection.create_data_channel("bootstrap", None).await?;
let bootstrap_clone = Arc::clone(&bootstrap);
bootstrap.on_open(Box::new(move || {
println!("Data channel bootstrap open.");
Box::pin(async move {
let _raw = match bootstrap_clone.detach().await {
Ok(raw) => raw,
Err(err) => {
println!("data channel detach got err: {err}");
return;
}
};
})
}));
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
println!("Peer Connection State has changed: {s}");
if s == RTCPeerConnectionState::Failed {
// Wait until PeerConnection has had no network activity for 30 seconds or another failure.
// It may be reconnected using an ICE Restart.
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
println!("Peer Connection has gone to failed exiting");
let _ = done_tx.try_send(());
}
Box::pin(async {})
}));
// Register data channel creation handling
peer_connection.on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
let d_label = d.label().to_owned();
let d_id = d.id();
println!("New DataChannel {d_label} {d_id}");
// Register channel opening handling
Box::pin(async move {
let d2 = Arc::clone(&d);
let d3 = Arc::clone(&d);
let d_label2 = d_label.clone();
let d_id2 = d_id;
d.on_open(Box::new(move || {
println!("Data channel '{d_label2}'-'{d_id2}' open.");
Box::pin(async move {
tokio::spawn(async move {
let _ = read_loop(d2).await;
});
// Handle writing to the data channel
tokio::spawn(async move {
let _ = write_loop(d3).await;
});
})
}));
})
}));
// Wait for the offer to be pasted
println!("Wait for the offer to be pasted");
let line = signal::must_read_stdin()?;
let desc_data = signal::decode(line.as_str())?;
let offer = serde_json::from_str::<RTCSessionDescription>(&desc_data)?;
// Set the remote SessionDescription
peer_connection.set_remote_description(offer).await?;
// Create an answer
let answer = peer_connection.create_answer(None).await?;
// Create channel that is blocked until ICE Gathering is complete
let mut gather_complete = peer_connection.gathering_complete_promise().await;
// Sets the LocalDescription, and starts our UDP listeners
peer_connection.set_local_description(answer).await?;
// Block until ICE Gathering is complete, disabling trickle ICE
// we do this because we only can exchange one signaling message
// in a production application you should exchange ICE Candidates via OnICECandidate
let _ = gather_complete.recv().await;
// Output the answer in base64 so we can paste it in browser
if let Some(local_desc) = peer_connection.local_description().await {
let json_str = serde_json::to_string(&local_desc)?;
println!("{json_str}");
let b64 = signal::encode(&json_str);
println!("--------------------- Copy the below base64 to browser --------------------");
println!("{b64}");
} else {
println!("generate local_description failed!");
if remote_endpoint.is_empty() {
// Wait for the answer to be pasted
println!("Wait for the answer to be pasted");
// readline blocking
let line = std::io::stdin()
.lines()
.next()
.ok_or_else(|| anyhow::anyhow!("No input received"))??;
webrtc_stream.set_remote_endpoint(&line).await?;
}
let s1 = hbb_common::Stream::WebRTC(webrtc_stream.clone());
tokio::spawn(async move {
let _ = read_loop(s1).await;
});
let s2 = hbb_common::Stream::WebRTC(webrtc_stream.clone());
tokio::spawn(async move {
let _ = write_loop(s2).await;
});
println!("Press ctrl-c to stop");
tokio::select! {
_ = done_rx.recv() => {
println!("received done signal!");
}
_ = tokio::signal::ctrl_c() => {
println!();
}
};
peer_connection.close().await?;
Ok(())
}
// read_loop shows how to read from the datachannel directly
async fn read_loop(dc: Arc<RTCDataChannel>) -> Result<()> {
let mut buffer = BytesMut::zeroed(4096);
async fn read_loop(mut stream: hbb_common::Stream) -> Result<()> {
loop {
let d = dc.detach().await?;
println!("RTCDatachannel detach ok");
let n = match d.read(&mut buffer).await {
Ok(n) => n,
Err(err) => {
println!("Datachannel closed; Exit the read_loop: {err}");
return Ok(());
}
};
if n == 0 {
println!("Datachannel read 0 byte; Exit the read_loop");
let Some(res) = stream.next().await else {
println!("Datachannel closed; Exit the read_loop");
return Ok(());
}
println!(
"Message from DataChannel: {}",
String::from_utf8(buffer[..n].to_vec())?
};
println!("Message from DataChannel: {}",
String::from_utf8(res.unwrap().to_vec())?
);
}
}
// write_loop shows how to write to the datachannel directly
async fn write_loop(d: Arc<RTCDataChannel>) -> Result<()> {
let mut result = Result::<usize>::Ok(0);
async fn write_loop(mut stream: hbb_common::Stream) -> Result<()> {
let mut result = Result::<()>::Ok(());
while result.is_ok() {
let timeout = tokio::time::sleep(Duration::from_secs(5));
tokio::pin!(timeout);
@@ -233,7 +113,7 @@ async fn write_loop(d: Arc<RTCDataChannel>) -> Result<()> {
_ = timeout.as_mut() =>{
let message = math_rand_alpha(15);
println!("Sending '{message}'");
result = d.send(&Bytes::from(message)).await.map_err(Into::into);
result = stream.send_bytes(Bytes::from(message)).await;
}
};
}