From b1fff9263e15dc3aa6b953a73791197e529935aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Lureau?= Date: Fri, 20 Aug 2021 21:26:26 +0400 Subject: [PATCH] usbredir: add free channels stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TODO: watch for chardev ownership changes instead Signed-off-by: Marc-André Lureau --- qemu-display/Cargo.toml | 1 + qemu-display/src/lib.rs | 2 +- qemu-display/src/usbredir.rs | 87 +++++++++++++++++++++++++++++++----- qemu-rdw/Cargo.toml | 1 + qemu-rdw/src/usbredir.rs | 14 +++++- 5 files changed, 93 insertions(+), 12 deletions(-) diff --git a/qemu-display/Cargo.toml b/qemu-display/Cargo.toml index a5d1d6b..07e1303 100644 --- a/qemu-display/Cargo.toml +++ b/qemu-display/Cargo.toml @@ -21,3 +21,4 @@ futures-util = { version = "0.3.8", features = ["async-await-macro"] } once_cell = "1.5" futures = "0.3.13" usbredirhost = "0.0.1" +async-broadcast = "0.3.3" diff --git a/qemu-display/src/lib.rs b/qemu-display/src/lib.rs index 0385dcb..133ecc6 100644 --- a/qemu-display/src/lib.rs +++ b/qemu-display/src/lib.rs @@ -34,7 +34,7 @@ mod display; pub use display::*; mod usbredir; -pub use usbredir::*; +pub use usbredir::UsbRedir; #[cfg(test)] mod tests { diff --git a/qemu-display/src/usbredir.rs b/qemu-display/src/usbredir.rs index d900b66..9f8cf57 100644 --- a/qemu-display/src/usbredir.rs +++ b/qemu-display/src/usbredir.rs @@ -1,3 +1,5 @@ +use async_broadcast::{broadcast, Receiver, Sender}; +use futures::Stream; use std::{ cell::RefCell, collections::HashMap, @@ -7,10 +9,11 @@ use std::{ io::{AsRawFd, RawFd}, net::UnixStream, }, + pin::Pin, sync::{Arc, Mutex}, + task::{Context, Poll}, thread::JoinHandle, }; - use usbredirhost::{ rusb::{self, UsbContext}, Device, DeviceHandler, LogLevel, @@ -172,14 +175,21 @@ impl Key { } } +#[derive(Debug, Clone, Copy)] +enum Event { + NFreeChannels(i32), +} + #[derive(Debug)] struct Inner { chardevs: Vec, handlers: HashMap, + channel: (Sender, Receiver), } impl Inner { - async fn available_chardev(&self) -> Option<&Chardev> { + // could make use of async combinators.. + async fn first_available_chardev(&self) -> Option<&Chardev> { for c in &self.chardevs { if c.proxy.owner().await.unwrap_or_default().is_empty() { return Some(c); @@ -187,6 +197,16 @@ impl Inner { } None } + + async fn n_available_chardev(&self) -> usize { + let mut n = 0; + for c in &self.chardevs { + if c.proxy.owner().await.unwrap_or_default().is_empty() { + n += 1; + } + } + n + } } #[derive(Clone, Debug)] @@ -196,9 +216,12 @@ pub struct UsbRedir { impl UsbRedir { pub fn new(chardevs: Vec) -> Self { + let mut channel = broadcast(1); + channel.0.set_overflow(true); Self { inner: Arc::new(RefCell::new(Inner { chardevs, + channel, handlers: Default::default(), })), } @@ -211,20 +234,30 @@ impl UsbRedir { ) -> Result { let mut inner = self.inner.borrow_mut(); let key = Key::from_device(device); + let mut nfree = inner.n_available_chardev().await as _; if state { if !inner.handlers.contains_key(&key) { let chardev = inner - .available_chardev() + .first_available_chardev() .await .ok_or_else(|| Error::Failed("There are no free USB channels".into()))?; let handler = Handler::new(device, chardev).await?; inner.handlers.insert(key, handler); } + nfree -= 1; } else { inner.handlers.remove(&key); + nfree += 1; } + // We should do better and watch for owner properties changes, but this would require tasks + let _ = inner + .channel + .0 + .broadcast(Event::NFreeChannels(nfree)) + .await; + Ok(state) } @@ -233,30 +266,64 @@ impl UsbRedir { inner.handlers.contains_key(&Key::from_device(device)) } + + pub async fn n_free_channels(&self) -> i32 { + self.inner.borrow().n_available_chardev().await as _ + } + + pub fn receive_n_free_channels(&self) -> Pin>> { + Box::pin(NFreeChannelsStream { + receiver: self.inner.borrow().channel.1.clone(), + }) + } +} + +#[derive(Debug)] +struct NFreeChannelsStream { + receiver: Receiver, +} + +impl Stream for NFreeChannelsStream { + type Item = i32; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = Pin::new(self); + + match Stream::poll_next(Pin::new(&mut this.receiver), cx) { + Poll::Ready(Some(Event::NFreeChannels(n))) => Poll::Ready(Some(n)), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } } fn fd_poll_readable(fd: RawFd, wait: Option) -> std::io::Result { let mut fds = vec![libc::pollfd { fd, - events: libc::POLLIN|libc::POLLHUP, + events: libc::POLLIN | libc::POLLHUP, revents: 0, }]; if let Some(wait) = wait { fds.push(libc::pollfd { fd: wait, - events: libc::POLLIN|libc::POLLHUP, + events: libc::POLLIN | libc::POLLHUP, revents: 0, }); } - let ret = unsafe { libc::poll(fds.as_mut_ptr(), - fds.len() as _, - if wait.is_some() { -1 } else { 0 }) }; + let ret = unsafe { + libc::poll( + fds.as_mut_ptr(), + fds.len() as _, + if wait.is_some() { -1 } else { 0 }, + ) + }; if ret < 0 { Err(std::io::Error::last_os_error()) } else if ret == 0 { Ok(false) - } else if fds[0].revents & libc::POLLHUP != 0 || - (wait.is_some() && fds[1].revents & libc::POLLIN != 0) { + } else if fds[0].revents & libc::POLLHUP != 0 + || (wait.is_some() && fds[1].revents & libc::POLLIN != 0) + { Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "hup")) } else { Ok(fds[0].revents & libc::POLLIN != 0) diff --git a/qemu-rdw/Cargo.toml b/qemu-rdw/Cargo.toml index 989c2bc..451ea04 100644 --- a/qemu-rdw/Cargo.toml +++ b/qemu-rdw/Cargo.toml @@ -16,3 +16,4 @@ keycodemap = { path = "../keycodemap" } gtk = { package = "gtk4", git = "https://github.com/gtk-rs/gtk4-rs" } rdw = { git = "https://gitlab.gnome.org/malureau/rdw.git" } futures-util = "0.3.13" +futures = "0.3.13" diff --git a/qemu-rdw/src/usbredir.rs b/qemu-rdw/src/usbredir.rs index 07b7a96..49682f4 100644 --- a/qemu-rdw/src/usbredir.rs +++ b/qemu-rdw/src/usbredir.rs @@ -42,12 +42,24 @@ impl Handler { if state { item.set_property("active", false).unwrap(); } - widget.emit_by_name("show-error",&[&e.to_string()]).unwrap(); + widget.emit_by_name("show-error", &[&e.to_string()]).unwrap(); }, } })); }); + let usbredir = self.usbredir.clone(); + MainContext::default().spawn_local(clone!(@weak widget => async move { + use futures::stream::StreamExt; // for `next` + widget + .set_property("free-channels", usbredir.n_free_channels().await) + .unwrap(); + let mut n = usbredir.receive_n_free_channels(); + while let Some(n) = n.next().await { + widget.set_property("free-channels", n).unwrap(); + } + })); + widget } }