usbredir: add free channels stream

TODO: watch for chardev ownership changes instead

Signed-off-by: Marc-André Lureau <marcandre.lureau@redhat.com>
This commit is contained in:
Marc-André Lureau 2021-08-20 21:26:26 +04:00
parent 36d9aa07ab
commit b1fff9263e
5 changed files with 93 additions and 12 deletions

View File

@ -21,3 +21,4 @@ futures-util = { version = "0.3.8", features = ["async-await-macro"] }
once_cell = "1.5" once_cell = "1.5"
futures = "0.3.13" futures = "0.3.13"
usbredirhost = "0.0.1" usbredirhost = "0.0.1"
async-broadcast = "0.3.3"

View File

@ -34,7 +34,7 @@ mod display;
pub use display::*; pub use display::*;
mod usbredir; mod usbredir;
pub use usbredir::*; pub use usbredir::UsbRedir;
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {

View File

@ -1,3 +1,5 @@
use async_broadcast::{broadcast, Receiver, Sender};
use futures::Stream;
use std::{ use std::{
cell::RefCell, cell::RefCell,
collections::HashMap, collections::HashMap,
@ -7,10 +9,11 @@ use std::{
io::{AsRawFd, RawFd}, io::{AsRawFd, RawFd},
net::UnixStream, net::UnixStream,
}, },
pin::Pin,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
task::{Context, Poll},
thread::JoinHandle, thread::JoinHandle,
}; };
use usbredirhost::{ use usbredirhost::{
rusb::{self, UsbContext}, rusb::{self, UsbContext},
Device, DeviceHandler, LogLevel, Device, DeviceHandler, LogLevel,
@ -172,14 +175,21 @@ impl Key {
} }
} }
#[derive(Debug, Clone, Copy)]
enum Event {
NFreeChannels(i32),
}
#[derive(Debug)] #[derive(Debug)]
struct Inner { struct Inner {
chardevs: Vec<Chardev>, chardevs: Vec<Chardev>,
handlers: HashMap<Key, Handler>, handlers: HashMap<Key, Handler>,
channel: (Sender<Event>, Receiver<Event>),
} }
impl Inner { impl Inner {
async fn available_chardev(&self) -> Option<&Chardev> { // could make use of async combinators..
async fn first_available_chardev(&self) -> Option<&Chardev> {
for c in &self.chardevs { for c in &self.chardevs {
if c.proxy.owner().await.unwrap_or_default().is_empty() { if c.proxy.owner().await.unwrap_or_default().is_empty() {
return Some(c); return Some(c);
@ -187,6 +197,16 @@ impl Inner {
} }
None None
} }
async fn n_available_chardev(&self) -> usize {
let mut n = 0;
for c in &self.chardevs {
if c.proxy.owner().await.unwrap_or_default().is_empty() {
n += 1;
}
}
n
}
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -196,9 +216,12 @@ pub struct UsbRedir {
impl UsbRedir { impl UsbRedir {
pub fn new(chardevs: Vec<Chardev>) -> Self { pub fn new(chardevs: Vec<Chardev>) -> Self {
let mut channel = broadcast(1);
channel.0.set_overflow(true);
Self { Self {
inner: Arc::new(RefCell::new(Inner { inner: Arc::new(RefCell::new(Inner {
chardevs, chardevs,
channel,
handlers: Default::default(), handlers: Default::default(),
})), })),
} }
@ -211,20 +234,30 @@ impl UsbRedir {
) -> Result<bool> { ) -> Result<bool> {
let mut inner = self.inner.borrow_mut(); let mut inner = self.inner.borrow_mut();
let key = Key::from_device(device); let key = Key::from_device(device);
let mut nfree = inner.n_available_chardev().await as _;
if state { if state {
if !inner.handlers.contains_key(&key) { if !inner.handlers.contains_key(&key) {
let chardev = inner let chardev = inner
.available_chardev() .first_available_chardev()
.await .await
.ok_or_else(|| Error::Failed("There are no free USB channels".into()))?; .ok_or_else(|| Error::Failed("There are no free USB channels".into()))?;
let handler = Handler::new(device, chardev).await?; let handler = Handler::new(device, chardev).await?;
inner.handlers.insert(key, handler); inner.handlers.insert(key, handler);
} }
nfree -= 1;
} else { } else {
inner.handlers.remove(&key); inner.handlers.remove(&key);
nfree += 1;
} }
// We should do better and watch for owner properties changes, but this would require tasks
let _ = inner
.channel
.0
.broadcast(Event::NFreeChannels(nfree))
.await;
Ok(state) Ok(state)
} }
@ -233,6 +266,35 @@ impl UsbRedir {
inner.handlers.contains_key(&Key::from_device(device)) inner.handlers.contains_key(&Key::from_device(device))
} }
pub async fn n_free_channels(&self) -> i32 {
self.inner.borrow().n_available_chardev().await as _
}
pub fn receive_n_free_channels(&self) -> Pin<Box<dyn Stream<Item = i32>>> {
Box::pin(NFreeChannelsStream {
receiver: self.inner.borrow().channel.1.clone(),
})
}
}
#[derive(Debug)]
struct NFreeChannelsStream {
receiver: Receiver<Event>,
}
impl Stream for NFreeChannelsStream {
type Item = i32;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = Pin::new(self);
match Stream::poll_next(Pin::new(&mut this.receiver), cx) {
Poll::Ready(Some(Event::NFreeChannels(n))) => Poll::Ready(Some(n)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
} }
fn fd_poll_readable(fd: RawFd, wait: Option<RawFd>) -> std::io::Result<bool> { fn fd_poll_readable(fd: RawFd, wait: Option<RawFd>) -> std::io::Result<bool> {
@ -248,15 +310,20 @@ fn fd_poll_readable(fd: RawFd, wait: Option<RawFd>) -> std::io::Result<bool> {
revents: 0, revents: 0,
}); });
} }
let ret = unsafe { libc::poll(fds.as_mut_ptr(), let ret = unsafe {
libc::poll(
fds.as_mut_ptr(),
fds.len() as _, fds.len() as _,
if wait.is_some() { -1 } else { 0 }) }; if wait.is_some() { -1 } else { 0 },
)
};
if ret < 0 { if ret < 0 {
Err(std::io::Error::last_os_error()) Err(std::io::Error::last_os_error())
} else if ret == 0 { } else if ret == 0 {
Ok(false) Ok(false)
} else if fds[0].revents & libc::POLLHUP != 0 || } else if fds[0].revents & libc::POLLHUP != 0
(wait.is_some() && fds[1].revents & libc::POLLIN != 0) { || (wait.is_some() && fds[1].revents & libc::POLLIN != 0)
{
Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "hup")) Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "hup"))
} else { } else {
Ok(fds[0].revents & libc::POLLIN != 0) Ok(fds[0].revents & libc::POLLIN != 0)

View File

@ -16,3 +16,4 @@ keycodemap = { path = "../keycodemap" }
gtk = { package = "gtk4", git = "https://github.com/gtk-rs/gtk4-rs" } gtk = { package = "gtk4", git = "https://github.com/gtk-rs/gtk4-rs" }
rdw = { git = "https://gitlab.gnome.org/malureau/rdw.git" } rdw = { git = "https://gitlab.gnome.org/malureau/rdw.git" }
futures-util = "0.3.13" futures-util = "0.3.13"
futures = "0.3.13"

View File

@ -48,6 +48,18 @@ impl Handler {
})); }));
}); });
let usbredir = self.usbredir.clone();
MainContext::default().spawn_local(clone!(@weak widget => async move {
use futures::stream::StreamExt; // for `next`
widget
.set_property("free-channels", usbredir.n_free_channels().await)
.unwrap();
let mut n = usbredir.receive_n_free_channels();
while let Some(n) = n.next().await {
widget.set_property("free-channels", n).unwrap();
}
}));
widget widget
} }
} }