From fabfa85adc83bfeabc8d018e13b0a96fc1773f09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Lureau?= Date: Mon, 20 Sep 2021 16:52:03 +0400 Subject: [PATCH] Follow zbus API break, everything async /o\ MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit for better or worse... Signed-off-by: Marc-André Lureau --- Cargo.toml | 20 +- qemu-display/Cargo.toml | 2 +- qemu-display/src/audio.rs | 309 +++++++++++---------------- qemu-display/src/chardev.rs | 8 +- qemu-display/src/clipboard.rs | 140 ++++-------- qemu-display/src/console.rs | 113 +++------- qemu-display/src/console_listener.rs | 204 +++++++++--------- qemu-display/src/display.rs | 34 +-- qemu-display/src/error.rs | 5 +- qemu-display/src/event_sender.rs | 25 --- qemu-display/src/lib.rs | 3 - qemu-display/src/usbredir.rs | 29 +-- qemu-rdw/Cargo.toml | 6 +- qemu-rdw/src/audio.rs | 101 +++++---- qemu-rdw/src/clipboard.rs | 276 ++++++++++++++---------- qemu-rdw/src/display.rs | 147 ++++++++----- qemu-rdw/src/main.rs | 73 ++++--- qemu-rdw/src/usbredir.rs | 3 +- qemu-vnc/Cargo.toml | 1 + qemu-vnc/src/main.rs | 151 ++++++------- qemu-vte/Cargo.toml | 4 +- qemu-vte/src/main.rs | 9 +- 22 files changed, 769 insertions(+), 894 deletions(-) delete mode 100644 qemu-display/src/event_sender.rs diff --git a/Cargo.toml b/Cargo.toml index c59b9a2..3afd5e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,18 +11,8 @@ members = [ default-members = ["qemu-rdw"] [patch.crates-io] -vnc = { git = 'https://github.com/elmarco/rust-vnc', branch = 'server' } -usbredirhost = { path = "../usbredir-rs/usbredirhost" } -libusb1-sys = { path = "../rusb/libusb1-sys" } -rusb = { path = "../rusb" } -zbus = { path = "../zbus/zbus" } -zvariant = { path = "../zbus/zvariant" } - -[patch."https://gitlab.gnome.org/malureau/rdw.git"] -rdw = { path = '../rdw/rdw' } - -[patch."https://github.com/gtk-rs/gtk4-rs"] -gdk4-wayland = { path = '../gtk4-rs/gdk4-wayland' } -gdk4-x11 = { path = '../gtk4-rs/gdk4-x11' } -gtk4 = { path = '../gtk4-rs/gtk4' } -gtk4-sys = { path = '../gtk4-rs/gtk4/sys' } +vnc = { git = "https://github.com/elmarco/rust-vnc", branch = "server" } +zbus = { git = "https://gitlab.freedesktop.org/dbus/zbus.git" } +zvariant = { git = "https://gitlab.freedesktop.org/dbus/zbus.git" } +#zbus = { path = "../zbus/zbus" } +#zvariant = { path = "../zbus/zvariant" } diff --git a/qemu-display/Cargo.toml b/qemu-display/Cargo.toml index 07e1303..2aa3d36 100644 --- a/qemu-display/Cargo.toml +++ b/qemu-display/Cargo.toml @@ -12,7 +12,6 @@ derivative = "2.2.0" zbus = { version = "2.0.0-beta", features = ["xml"] } zvariant = { version = "2.4.0", features = ["serde_bytes"] } libc = "0.2.86" -glib = { git = "https://github.com/gtk-rs/gtk-rs-core", optional = true } enumflags2 = { version = "0.6.4", features = ["serde"] } serde = { version = "1.0.123", features = ["derive"] } serde_repr = "0.1.6" @@ -22,3 +21,4 @@ once_cell = "1.5" futures = "0.3.13" usbredirhost = "0.0.1" async-broadcast = "0.3.3" +async-trait = "0.1.48" diff --git a/qemu-display/src/audio.rs b/qemu-display/src/audio.rs index db611a9..b0790b6 100644 --- a/qemu-display/src/audio.rs +++ b/qemu-display/src/audio.rs @@ -1,13 +1,7 @@ -use once_cell::sync::OnceCell; -use std::default::Default; -use std::os::unix::net::UnixStream; -use std::sync::mpsc::{self, Receiver, SendError}; -use std::sync::{Arc, Mutex}; -use std::{os::unix::io::AsRawFd, thread}; +use std::os::unix::{io::AsRawFd, net::UnixStream}; +use zbus::{dbus_interface, dbus_proxy, zvariant::Fd, Connection}; -use zbus::{dbus_interface, dbus_proxy, zvariant::Fd}; - -use crate::{EventSender, Result}; +use crate::Result; #[derive(Debug)] pub struct PCMInfo { @@ -50,24 +44,6 @@ pub struct Volume { pub volume: Vec, } -#[derive(Debug)] -pub enum AudioOutEvent { - Init { id: u64, info: PCMInfo }, - Fini { id: u64 }, - SetEnabled { id: u64, enabled: bool }, - SetVolume { id: u64, volume: Volume }, - Write { id: u64, data: Vec }, -} - -#[derive(Debug)] -pub enum AudioInEvent { - Init { id: u64, info: PCMInfo }, - Fini { id: u64 }, - SetEnabled { id: u64, enabled: bool }, - SetVolume { id: u64, volume: Volume }, - Read { id: u64 }, -} - #[dbus_proxy( default_service = "org.qemu", default_path = "/org/qemu/Display1/Audio", @@ -86,37 +62,31 @@ trait Audio { pub struct Audio { #[derivative(Debug = "ignore")] pub proxy: AsyncAudioProxy<'static>, + out_listener: Option, + in_listener: Option, } -#[derive(Debug)] -pub(crate) struct AudioOutListener> { - tx: E, - err: Arc>>, +#[async_trait::async_trait] +pub trait AudioOutHandler: 'static + Send + Sync { + async fn init(&mut self, id: u64, info: PCMInfo); + + async fn fini(&mut self, id: u64); + + async fn set_enabled(&mut self, id: u64, enabled: bool); + + async fn set_volume(&mut self, id: u64, volume: Volume); + + async fn write(&mut self, id: u64, data: Vec); } -impl> AudioOutListener { - pub(crate) fn new(tx: E) -> Self { - AudioOutListener { - tx, - err: Default::default(), - } - } - - fn send(&mut self, event: AudioOutEvent) { - if let Err(e) = self.tx.send_event(event) { - let _ = self.err.set(e); - } - } - - pub fn err(&self) -> Arc>> { - self.err.clone() - } +struct AudioOutListener { + handler: H, } #[dbus_interface(name = "org.qemu.Display1.AudioOutListener")] -impl> AudioOutListener { +impl AudioOutListener { /// Init method - fn init( + async fn init( &mut self, id: u64, bits: u8, @@ -128,80 +98,73 @@ impl> AudioOutListener { bytes_per_second: u32, be: bool, ) { - self.send(AudioOutEvent::Init { - id, - info: PCMInfo { - bits, - is_signed, - is_float, - freq, - nchannels, - bytes_per_frame, - bytes_per_second, - be, - }, - }) + self.handler + .init( + id, + PCMInfo { + bits, + is_signed, + is_float, + freq, + nchannels, + bytes_per_frame, + bytes_per_second, + be, + }, + ) + .await } /// Fini method - fn fini(&mut self, id: u64) { - self.send(AudioOutEvent::Fini { id }) + async fn fini(&mut self, id: u64) { + self.handler.fini(id).await } /// SetEnabled method - fn set_enabled(&mut self, id: u64, enabled: bool) { - self.send(AudioOutEvent::SetEnabled { id, enabled }) + async fn set_enabled(&mut self, id: u64, enabled: bool) { + self.handler.set_enabled(id, enabled).await } /// SetVolume method - fn set_volume(&mut self, id: u64, mute: bool, volume: serde_bytes::ByteBuf) { - self.send(AudioOutEvent::SetVolume { - id, - volume: Volume { - mute, - volume: volume.into_vec(), - }, - }); + async fn set_volume(&mut self, id: u64, mute: bool, volume: serde_bytes::ByteBuf) { + self.handler + .set_volume( + id, + Volume { + mute, + volume: volume.into_vec(), + }, + ) + .await } /// Write method - fn write(&mut self, id: u64, data: serde_bytes::ByteBuf) { - self.send(AudioOutEvent::Write { - id, - data: data.into_vec(), - }) + async fn write(&mut self, id: u64, data: serde_bytes::ByteBuf) { + self.handler.write(id, data.into_vec()).await } } -#[derive(Debug)] -pub(crate) struct AudioInListener> { - tx: E, - err: Arc>>, +#[async_trait::async_trait] +pub trait AudioInHandler: 'static + Send + Sync { + async fn init(&mut self, id: u64, info: PCMInfo); + + async fn fini(&mut self, id: u64); + + async fn set_enabled(&mut self, id: u64, enabled: bool); + + async fn set_volume(&mut self, id: u64, volume: Volume); + + async fn read(&mut self, id: u64, size: u64) -> Vec; } -impl> AudioInListener { - pub(crate) fn new(tx: E) -> Self { - AudioInListener { - tx, - err: Default::default(), - } - } - - fn send(&mut self, event: AudioInEvent) { - if let Err(e) = self.tx.send_event(event) { - let _ = self.err.set(e); - } - } - - pub fn err(&self) -> Arc>> { - self.err.clone() - } +struct AudioInListener { + handler: H, } #[dbus_interface(name = "org.qemu.Display1.AudioInListener")] -impl> AudioInListener { +impl AudioInListener { /// Init method - fn init( + async fn init( &mut self, id: u64, bits: u8, @@ -213,116 +176,84 @@ impl> AudioInListener { bytes_per_second: u32, be: bool, ) { - self.send(AudioInEvent::Init { - id, - info: PCMInfo { - bits, - is_signed, - is_float, - freq, - nchannels, - bytes_per_frame, - bytes_per_second, - be, - }, - }) + self.handler + .init( + id, + PCMInfo { + bits, + is_signed, + is_float, + freq, + nchannels, + bytes_per_frame, + bytes_per_second, + be, + }, + ) + .await } /// Fini method - fn fini(&mut self, id: u64) { - self.send(AudioInEvent::Fini { id }) + async fn fini(&mut self, id: u64) { + self.handler.fini(id).await } /// SetEnabled method - fn set_enabled(&mut self, id: u64, enabled: bool) { - self.send(AudioInEvent::SetEnabled { id, enabled }) + async fn set_enabled(&mut self, id: u64, enabled: bool) { + self.handler.set_enabled(id, enabled).await } /// SetVolume method - fn set_volume(&mut self, id: u64, mute: bool, volume: serde_bytes::ByteBuf) { - self.send(AudioInEvent::SetVolume { - id, - volume: Volume { - mute, - volume: volume.into_vec(), - }, - }); + async fn set_volume(&mut self, id: u64, mute: bool, volume: serde_bytes::ByteBuf) { + self.handler + .set_volume( + id, + Volume { + mute, + volume: volume.into_vec(), + }, + ) + .await } /// Read method - fn read(&mut self, id: u64, size: u64) -> Vec { - dbg!((id, size)); - vec![0; size as usize] + async fn read(&mut self, id: u64, size: u64) -> Vec { + self.handler.read(id, size).await + // dbg!((id, size)); + // vec![0; size as usize] } } impl Audio { - pub async fn new(conn: &zbus::azync::Connection) -> Result { + pub async fn new(conn: &zbus::Connection) -> Result { let proxy = AsyncAudioProxy::new(conn).await?; - Ok(Self { proxy }) + Ok(Self { + proxy, + in_listener: None, + out_listener: None, + }) } - pub async fn listen_out(&self) -> Result> { + pub async fn register_out_listener(&mut self, handler: H) -> Result<()> { let (p0, p1) = UnixStream::pair()?; - let (tx, rx) = mpsc::channel(); self.proxy .register_out_listener(p0.as_raw_fd().into()) .await?; - - let _thread = thread::spawn(move || { - let c = zbus::ConnectionBuilder::unix_stream(p1) - .p2p() - .build() - .unwrap(); - let mut s = zbus::ObjectServer::new(&c); - let listener = AudioOutListener::new(Mutex::new(tx)); - let err = listener.err(); - s.at("/org/qemu/Display1/AudioOutListener", listener) - .unwrap(); - loop { - if let Err(e) = s.try_handle_next() { - eprintln!("Listener DBus error: {}", e); - return; - } - if let Some(e) = err.get() { - eprintln!("Listener channel error: {}", e); - return; - } - } - }); - - Ok(rx) - } - - pub async fn listen_in(&self) -> Result> { - let (p0, p1) = UnixStream::pair()?; - let (tx, rx) = mpsc::channel(); - self.proxy - .register_in_listener(p0.as_raw_fd().into()) + let c = zbus::ConnectionBuilder::unix_stream(p1) + .p2p() + .build() .await?; - - let _thread = thread::spawn(move || { - let c = zbus::ConnectionBuilder::unix_stream(p1) - .p2p() - .build() + { + let mut server = c.object_server_mut().await; + server + .at( + "/org/qemu/Display1/AudioOutListener", + AudioOutListener { handler }, + ) .unwrap(); - let mut s = zbus::ObjectServer::new(&c); - let listener = AudioInListener::new(Mutex::new(tx)); - let err = listener.err(); - s.at("/org/qemu/Display1/AudioInListener", listener) - .unwrap(); - loop { - if let Err(e) = s.try_handle_next() { - eprintln!("Listener DBus error: {}", e); - return; - } - if let Some(e) = err.get() { - eprintln!("Listener channel error: {}", e); - return; - } - } - }); - - Ok(rx) + server.start_dispatch(); + } + self.out_listener.replace(c); + Ok(()) } } diff --git a/qemu-display/src/chardev.rs b/qemu-display/src/chardev.rs index 7129782..f639e37 100644 --- a/qemu-display/src/chardev.rs +++ b/qemu-display/src/chardev.rs @@ -1,6 +1,8 @@ use std::convert::TryFrom; -use zbus::dbus_proxy; -use zbus::zvariant::{Fd, ObjectPath}; +use zbus::{ + dbus_proxy, + zvariant::{Fd, ObjectPath}, +}; use crate::Result; @@ -36,7 +38,7 @@ pub struct Chardev { } impl Chardev { - pub async fn new(conn: &zbus::azync::Connection, id: &str) -> Result { + pub async fn new(conn: &zbus::Connection, id: &str) -> Result { let obj_path = ObjectPath::try_from(format!("/org/qemu/Display1/Chardev_{}", id))?; let proxy = AsyncChardevProxy::builder(conn) .path(&obj_path)? diff --git a/qemu-display/src/clipboard.rs b/qemu-display/src/clipboard.rs index 97183de..cfdd1eb 100644 --- a/qemu-display/src/clipboard.rs +++ b/qemu-display/src/clipboard.rs @@ -1,12 +1,9 @@ -use once_cell::sync::OnceCell; use serde_repr::{Deserialize_repr, Serialize_repr}; use std::convert::TryFrom; -use std::sync::mpsc::{channel, Sender}; -use std::sync::{Arc, Mutex}; use zbus::{dbus_interface, dbus_proxy, zvariant::ObjectPath}; use zvariant::derive::Type; -use crate::{EventSender, Result}; +use crate::Result; #[repr(u32)] #[derive(Deserialize_repr, Serialize_repr, Type, Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -37,141 +34,88 @@ pub trait Clipboard { ) -> zbus::Result<(String, Vec)>; } -pub type ClipboardReplyTx = Sender)>>; +#[async_trait::async_trait] +pub trait ClipboardHandler: 'static + Send + Sync { + async fn register(&mut self); -// TODO: replace events mpsc with async traits -#[derive(Debug)] -pub enum ClipboardEvent { - Register, - Unregister, - Grab { - selection: ClipboardSelection, - serial: u32, - mimes: Vec, - }, - Release { - selection: ClipboardSelection, - }, - Request { + async fn unregister(&mut self); + + async fn grab(&mut self, selection: ClipboardSelection, serial: u32, mimes: Vec); + + async fn release(&mut self, selection: ClipboardSelection); + + async fn request( + &mut self, selection: ClipboardSelection, mimes: Vec, - tx: Mutex, - }, + ) -> Result<(String, Vec)>; } #[derive(Debug)] -pub(crate) struct ClipboardListener> { - tx: E, - err: Arc>, +pub(crate) struct ClipboardListener { + handler: H, } #[dbus_interface(name = "org.qemu.Display1.Clipboard")] -impl> ClipboardListener { - fn register(&mut self) { - self.send(ClipboardEvent::Register) +impl ClipboardListener { + async fn register(&mut self) { + self.handler.register().await; } - fn unregister(&mut self) { - self.send(ClipboardEvent::Unregister) + async fn unregister(&mut self) { + self.handler.unregister().await; } - fn grab(&mut self, selection: ClipboardSelection, serial: u32, mimes: Vec) { - self.send(ClipboardEvent::Grab { - selection, - serial, - mimes, - }) + async fn grab(&mut self, selection: ClipboardSelection, serial: u32, mimes: Vec) { + self.handler.grab(selection, serial, mimes).await; } - fn release(&mut self, selection: ClipboardSelection) { - self.send(ClipboardEvent::Release { selection }) + async fn release(&mut self, selection: ClipboardSelection) { + self.handler.release(selection).await; } - fn request( + async fn request( &mut self, selection: ClipboardSelection, mimes: Vec, ) -> zbus::fdo::Result<(String, Vec)> { - let (tx, rx) = channel(); - self.send(ClipboardEvent::Request { - selection, - mimes, - tx: Mutex::new(tx), - }); - rx.recv() - .map_err(|e| zbus::fdo::Error::Failed(format!("Request recv failed: {}", e)))? + self.handler + .request(selection, mimes) + .await .map_err(|e| zbus::fdo::Error::Failed(format!("Request failed: {}", e))) } } -impl> ClipboardListener { - pub fn new(tx: E) -> Self { - Self { - tx, - err: Default::default(), - } - } - - fn send(&mut self, event: ClipboardEvent) { - if let Err(e) = self.tx.send_event(event) { - let _ = self.err.set(e.to_string()); - } - } - - pub fn err(&self) -> Arc> { - self.err.clone() - } -} - #[derive(derivative::Derivative)] #[derivative(Debug)] pub struct Clipboard { - conn: zbus::azync::Connection, #[derivative(Debug = "ignore")] pub proxy: AsyncClipboardProxy<'static>, + conn: zbus::Connection, } impl Clipboard { - pub async fn new(conn: &zbus::azync::Connection) -> Result { - let obj_path = ObjectPath::try_from("/org/qemu/Display1/Clipboard")?; + pub async fn new(conn: &zbus::Connection) -> Result { + let obj_path = ObjectPath::try_from("/org/qemu/Display1/Clipboard").unwrap(); let proxy = AsyncClipboardProxy::builder(conn) .path(&obj_path)? .build() .await?; Ok(Self { - conn: conn.clone(), proxy, + conn: conn.clone(), }) } - pub async fn register(&self) -> Result<()> { - self.proxy.register().await?; - Ok(()) - } -} - -#[cfg(feature = "glib")] -impl Clipboard { - pub async fn glib_listen(&self) -> Result> { - let (tx, rx) = glib::MainContext::channel(glib::source::Priority::default()); - let c = self.conn.clone().into(); - let _thread = std::thread::spawn(move || { - let mut s = zbus::ObjectServer::new(&c); - let listener = ClipboardListener::new(tx); - let err = listener.err(); - s.at("/org/qemu/Display1/Clipboard", listener).unwrap(); - loop { - if let Err(e) = s.try_handle_next() { - eprintln!("Listener DBus error: {}", e); - break; - } - if let Some(e) = err.get() { - eprintln!("Listener channel error: {}", e); - break; - } - } - }); - - Ok(rx) + pub async fn register(&self, handler: H) -> Result<()> { + self.conn + .object_server_mut() + .await + .at( + "/org/qemu/Display1/Clipboard", + ClipboardListener { handler }, + ) + .unwrap(); + Ok(self.proxy.register().await?) } } diff --git a/qemu-display/src/console.rs b/qemu-display/src/console.rs index 984adf7..8929ce5 100644 --- a/qemu-display/src/console.rs +++ b/qemu-display/src/console.rs @@ -1,16 +1,15 @@ -use std::convert::TryFrom; -use std::os::unix::net::UnixStream; -use std::sync::mpsc::{self, Receiver, Sender}; -use std::sync::Mutex; -use std::{os::unix::io::AsRawFd, thread}; - +use std::{ + cell::RefCell, + convert::TryFrom, + os::unix::{io::AsRawFd, net::UnixStream}, +}; use zbus::{ dbus_proxy, zvariant::{Fd, ObjectPath}, + Connection, }; -use crate::Result; -use crate::{AsyncKeyboardProxy, AsyncMouseProxy, ConsoleEvent, ConsoleListener}; +use crate::{AsyncKeyboardProxy, AsyncMouseProxy, ConsoleListener, ConsoleListenerHandler, Result}; #[dbus_proxy(default_service = "org.qemu", interface = "org.qemu.Display1.Console")] pub trait Console { @@ -54,10 +53,11 @@ pub struct Console { pub keyboard: AsyncKeyboardProxy<'static>, #[derivative(Debug = "ignore")] pub mouse: AsyncMouseProxy<'static>, + listener: RefCell>, } impl Console { - pub async fn new(conn: &zbus::azync::Connection, idx: u32) -> Result { + pub async fn new(conn: &Connection, idx: u32) -> Result { let obj_path = ObjectPath::try_from(format!("/org/qemu/Display1/Console_{}", idx))?; let proxy = AsyncConsoleProxy::builder(conn) .path(&obj_path)? @@ -75,30 +75,10 @@ impl Console { proxy, keyboard, mouse, + listener: RefCell::new(None), }) } - pub async fn dispatch_signals(&self) -> Result<()> { - use futures_util::{future::FutureExt, select}; - - if let Some(msg) = select!( - msg = self.proxy.next_signal().fuse() => { - msg? - }, - msg = self.keyboard.next_signal().fuse() => { - msg? - }, - msg = self.mouse.next_signal().fuse() => { - msg? - } - ) { - if msg.primary_header().msg_type() == zbus::MessageType::Signal { - log::debug!("Ignoring {:?}", msg); - } - } - Ok(()) - } - pub async fn label(&self) -> Result { Ok(self.proxy.label().await?) } @@ -111,66 +91,25 @@ impl Console { Ok(self.proxy.height().await?) } - pub async fn listen(&self) -> Result<(Receiver, Sender<()>)> { + pub async fn register_listener(&self, handler: H) -> Result<()> { let (p0, p1) = UnixStream::pair()?; - let (tx, rx) = mpsc::channel(); self.proxy.register_listener(p0.as_raw_fd().into()).await?; - - let (wait_tx, wait_rx) = mpsc::channel(); - let _thread = thread::spawn(move || { - let c = zbus::ConnectionBuilder::unix_stream(p1) - .p2p() - .build() + let c = zbus::ConnectionBuilder::unix_stream(p1) + .p2p() + .build() + .await?; + { + let mut server = c.object_server_mut().await; + server + .at("/org/qemu/Display1/Listener", ConsoleListener::new(handler)) .unwrap(); - let mut s = zbus::ObjectServer::new(&c); - let listener = ConsoleListener::new(Mutex::new(tx), wait_rx); - let err = listener.err(); - s.at("/org/qemu/Display1/Listener", listener).unwrap(); - loop { - if let Err(e) = s.try_handle_next() { - eprintln!("Listener DBus error: {}", e); - return; - } - if let Some(e) = err.get() { - eprintln!("Listener channel error: {}", e); - return; - } - } - }); + server.start_dispatch(); + } + self.listener.replace(Some(c)); + Ok(()) + } - Ok((rx, wait_tx)) - } -} - -#[cfg(feature = "glib")] -impl Console { - pub async fn glib_listen(&self) -> Result<(glib::Receiver, Sender<()>)> { - let (p0, p1) = UnixStream::pair()?; - let (tx, rx) = glib::MainContext::channel(glib::source::Priority::default()); - self.proxy.register_listener(p0.as_raw_fd().into()).await?; - - let (wait_tx, wait_rx) = mpsc::channel(); - let _thread = thread::spawn(move || { - let c = zbus::ConnectionBuilder::unix_stream(p1) - .p2p() - .build() - .unwrap(); - let mut s = zbus::ObjectServer::new(&c); - let listener = ConsoleListener::new(tx, wait_rx); - let err = listener.err(); - s.at("/org/qemu/Display1/Listener", listener).unwrap(); - loop { - if let Err(e) = s.try_handle_next() { - eprintln!("Listener DBus error: {}", e); - break; - } - if let Some(e) = err.get() { - eprintln!("Listener channel error: {}", e); - break; - } - } - }); - - Ok((rx, wait_tx)) + pub fn unregister_listener(&mut self) { + self.listener.replace(None); } } diff --git a/qemu-display/src/console_listener.rs b/qemu-display/src/console_listener.rs index 738cd91..9b389c6 100644 --- a/qemu-display/src/console_listener.rs +++ b/qemu-display/src/console_listener.rs @@ -1,15 +1,10 @@ -use once_cell::sync::OnceCell; -use std::ops::Drop; -use std::os::unix::io::IntoRawFd; -use std::os::unix::io::{AsRawFd, RawFd}; -use std::sync::mpsc::{Receiver, RecvError, SendError}; -use std::sync::{Arc, Mutex}; - use derivative::Derivative; +use std::{ + ops::Drop, + os::unix::io::{AsRawFd, IntoRawFd, RawFd}, +}; use zbus::{dbus_interface, zvariant::Fd}; -use crate::EventSender; - #[derive(Derivative)] #[derivative(Debug)] pub struct Scanout { @@ -45,6 +40,17 @@ pub struct ScanoutDMABUF { pub y0_top: bool, } +#[derive(Derivative)] +#[derivative(Debug)] +pub struct Cursor { + pub width: i32, + pub height: i32, + pub hot_x: i32, + pub hot_y: i32, + #[derivative(Debug = "ignore")] + pub data: Vec, +} + impl Drop for ScanoutDMABUF { fn drop(&mut self) { if self.fd >= 0 { @@ -68,39 +74,39 @@ pub struct MouseSet { pub on: i32, } -// TODO: replace events mpsc with async traits -#[derive(Debug)] -pub enum ConsoleEvent { - Scanout(Scanout), - Update(Update), - ScanoutDMABUF(ScanoutDMABUF), - UpdateDMABUF { - x: i32, - y: i32, - w: i32, - h: i32, - }, - MouseSet(MouseSet), - CursorDefine { - width: i32, - height: i32, - hot_x: i32, - hot_y: i32, - data: Vec, - }, - Disconnected, +#[derive(Debug, Copy, Clone)] +pub struct UpdateDMABUF { + pub x: i32, + pub y: i32, + pub w: i32, + pub h: i32, +} + +#[async_trait::async_trait] +pub trait ConsoleListenerHandler: 'static + Send + Sync { + async fn scanout(&mut self, scanout: Scanout); + + async fn update(&mut self, update: Update); + + async fn scanout_dmabuf(&mut self, scanout: ScanoutDMABUF); + + async fn update_dmabuf(&mut self, update: UpdateDMABUF); + + async fn mouse_set(&mut self, set: MouseSet); + + async fn cursor_define(&mut self, cursor: Cursor); + + fn disconnected(&mut self); } #[derive(Debug)] -pub(crate) struct ConsoleListener> { - tx: E, - wait_rx: Mutex>, - err: Arc>>, +pub(crate) struct ConsoleListener { + handler: H, } #[dbus_interface(name = "org.qemu.Display1.Listener")] -impl> ConsoleListener { - fn scanout( +impl ConsoleListener { + async fn scanout( &mut self, width: u32, height: u32, @@ -108,16 +114,18 @@ impl> ConsoleListener { format: u32, data: serde_bytes::ByteBuf, ) { - self.send(ConsoleEvent::Scanout(Scanout { - width, - height, - stride, - format, - data: data.into_vec(), - })) + self.handler + .scanout(Scanout { + width, + height, + stride, + format, + data: data.into_vec(), + }) + .await; } - fn update( + async fn update( &mut self, x: i32, y: i32, @@ -127,19 +135,21 @@ impl> ConsoleListener { format: u32, data: serde_bytes::ByteBuf, ) { - self.send(ConsoleEvent::Update(Update { - x, - y, - w, - h, - stride, - format, - data: data.into_vec(), - })) + self.handler + .update(Update { + x, + y, + w, + h, + stride, + format, + data: data.into_vec(), + }) + .await; } #[dbus_interface(name = "ScanoutDMABUF")] - fn scanout_dmabuf( + async fn scanout_dmabuf( &mut self, fd: Fd, width: u32, @@ -150,66 +160,58 @@ impl> ConsoleListener { y0_top: bool, ) { let fd = unsafe { libc::dup(fd.as_raw_fd()) }; - self.send(ConsoleEvent::ScanoutDMABUF(ScanoutDMABUF { - fd, - width, - height, - stride, - fourcc, - modifier, - y0_top, - })) + self.handler + .scanout_dmabuf(ScanoutDMABUF { + fd, + width, + height, + stride, + fourcc, + modifier, + y0_top, + }) + .await; } #[dbus_interface(name = "UpdateDMABUF")] - fn update_dmabuf(&mut self, x: i32, y: i32, w: i32, h: i32) { - self.send(ConsoleEvent::UpdateDMABUF { x, y, w, h }); - if let Err(e) = self.wait() { - eprintln!("update returned error: {}", e) - } + async fn update_dmabuf(&mut self, x: i32, y: i32, w: i32, h: i32) { + self.handler + .update_dmabuf(UpdateDMABUF { x, y, w, h }) + .await; } - fn mouse_set(&mut self, x: i32, y: i32, on: i32) { - self.send(ConsoleEvent::MouseSet(MouseSet { x, y, on })) + async fn mouse_set(&mut self, x: i32, y: i32, on: i32) { + self.handler.mouse_set(MouseSet { x, y, on }).await; } - fn cursor_define(&mut self, width: i32, height: i32, hot_x: i32, hot_y: i32, data: Vec) { - self.send(ConsoleEvent::CursorDefine { - width, - height, - hot_x, - hot_y, - data, - }) + async fn cursor_define( + &mut self, + width: i32, + height: i32, + hot_x: i32, + hot_y: i32, + data: Vec, + ) { + self.handler + .cursor_define(Cursor { + width, + height, + hot_x, + hot_y, + data, + }) + .await; } } -impl> ConsoleListener { - pub(crate) fn new(tx: E, wait_rx: Receiver<()>) -> Self { - ConsoleListener { - tx, - wait_rx: Mutex::new(wait_rx), - err: Default::default(), - } - } - - fn send(&mut self, event: ConsoleEvent) { - if let Err(e) = self.tx.send_event(event) { - let _ = self.err.set(e); - } - } - - fn wait(&mut self) -> Result<(), RecvError> { - self.wait_rx.lock().unwrap().recv() - } - - pub fn err(&self) -> Arc>> { - self.err.clone() +impl ConsoleListener { + pub(crate) fn new(handler: H) -> Self { + Self { handler } } } -impl> Drop for ConsoleListener { +impl Drop for ConsoleListener { fn drop(&mut self) { - self.send(ConsoleEvent::Disconnected) + self.handler.disconnected(); } } diff --git a/qemu-display/src/display.rs b/qemu-display/src/display.rs index a17224c..736cee5 100644 --- a/qemu-display/src/display.rs +++ b/qemu-display/src/display.rs @@ -1,14 +1,14 @@ use futures::stream::{self, StreamExt}; -use std::collections::HashMap; -use std::convert::TryFrom; -use std::convert::TryInto; -use zbus::azync::Connection; -use zbus::fdo; -use zbus::fdo::ManagedObjects; -use zbus::names::BusName; -use zbus::names::OwnedUniqueName; -use zbus::names::UniqueName; -use zbus::names::WellKnownName; +use std::{ + collections::HashMap, + convert::{TryFrom, TryInto}, +}; +use zbus::{ + fdo, + fdo::ManagedObjects, + names::{BusName, OwnedUniqueName, UniqueName, WellKnownName}, + Connection, +}; use zvariant::OwnedObjectPath; use crate::{AsyncVMProxy, Audio, Chardev, Clipboard, Error, Result, UsbRedir}; @@ -21,10 +21,15 @@ pub struct Display { impl Display { pub async fn by_name(conn: &Connection) -> Result> { let mut hm = HashMap::new(); - let list = fdo::AsyncDBusProxy::new(conn) + let list = match fdo::AsyncDBusProxy::new(conn) .await? .list_queued_owners(WellKnownName::from_str_unchecked("org.qemu")) - .await?; + .await + { + Ok(list) => list, + Err(zbus::fdo::Error::NameHasNoOwner(_)) => vec![], + Err(e) => return Err(e.into()), + }; for dest in list.into_iter() { let name = AsyncVMProxy::builder(conn) .destination(UniqueName::from(&dest))? @@ -47,7 +52,7 @@ impl Display { } else { "org.qemu".try_into().unwrap() }; - let objects = fdo::AsyncObjectManagerProxy::builder(&conn) + let objects = fdo::AsyncObjectManagerProxy::builder(conn) .destination(dest)? .path("/org/qemu/Display1")? .build() @@ -107,7 +112,6 @@ impl Display { .collect() .await; - let redir = UsbRedir::new(chardevs); - redir + UsbRedir::new(chardevs) } } diff --git a/qemu-display/src/error.rs b/qemu-display/src/error.rs index 5ba9ccd..c844d63 100644 --- a/qemu-display/src/error.rs +++ b/qemu-display/src/error.rs @@ -1,9 +1,6 @@ use usbredirhost::rusb; -use std::convert::Infallible; -use std::error; -use std::fmt; -use std::io; +use std::{convert::Infallible, error, fmt, io}; #[derive(Debug)] pub enum Error { diff --git a/qemu-display/src/event_sender.rs b/qemu-display/src/event_sender.rs deleted file mode 100644 index c74797f..0000000 --- a/qemu-display/src/event_sender.rs +++ /dev/null @@ -1,25 +0,0 @@ -use std::sync::mpsc::{SendError, Sender}; -use std::sync::Mutex; - -pub(crate) trait EventSender: Send + Sync { - type Event; - - fn send_event(&self, t: Self::Event) -> Result<(), SendError>; -} - -impl EventSender for Mutex> { - type Event = T; - - fn send_event(&self, t: Self::Event) -> Result<(), SendError> { - self.lock().unwrap().send(t) - } -} - -#[cfg(feature = "glib")] -impl EventSender for glib::Sender { - type Event = T; - - fn send_event(&self, t: Self::Event) -> Result<(), SendError> { - self.send(t) - } -} diff --git a/qemu-display/src/lib.rs b/qemu-display/src/lib.rs index 133ecc6..597ae2e 100644 --- a/qemu-display/src/lib.rs +++ b/qemu-display/src/lib.rs @@ -3,9 +3,6 @@ mod error; pub use error::*; -mod event_sender; -use event_sender::*; - mod vm; pub use vm::*; diff --git a/qemu-display/src/usbredir.rs b/qemu-display/src/usbredir.rs index e5fafb3..773efe2 100644 --- a/qemu-display/src/usbredir.rs +++ b/qemu-display/src/usbredir.rs @@ -23,9 +23,9 @@ use crate::{Chardev, Error, Result}; #[derive(Debug)] struct InnerHandler { + #[allow(unused)] // keep the device opened, as rusb doesn't take it device_fd: Option, stream: UnixStream, - stream_thread: JoinHandle<()>, ctxt: rusb::Context, ctxt_thread: Option>, event: (UnixStream, UnixStream), @@ -90,7 +90,7 @@ impl Handler { Ok(it) => (it, None), Err(rusb::Error::Access) => { let (bus, dev) = (device.bus_number(), device.address()); - let sysbus = zbus::azync::Connection::system().await?; + let sysbus = zbus::Connection::system().await?; let fd = AsyncSystemHelperProxy::new(&sysbus) .await? .open_bus_dev(bus, dev) @@ -110,7 +110,7 @@ impl Handler { // really annoying libusb/usbredir APIs... let event = UnixStream::pair()?; let event_fd = event.1.as_raw_fd(); - let stream_thread = std::thread::spawn(move || loop { + std::thread::spawn(move || loop { let ret = fd_poll_readable(stream_fd, Some(event_fd)); c.interrupt_handle_events(); if ret.is_err() { @@ -122,7 +122,6 @@ impl Handler { inner: Arc::new(Mutex::new(InnerHandler { device_fd, stream, - stream_thread, event, quit: false, ctxt: ctxt.clone(), @@ -162,7 +161,7 @@ impl Drop for Handler { inner.quit = true; inner.ctxt.interrupt_handle_events(); // stream will be dropped and stream_thread will kick context_thread - inner.event.0.write(&[0]).unwrap(); + inner.event.0.write_all(&[0]).unwrap(); } } @@ -234,24 +233,30 @@ impl UsbRedir { ) -> Result { let mut inner = self.inner.borrow_mut(); let key = Key::from_device(device); + let handled = inner.handlers.contains_key(&key); + // We should do better and watch for owner properties changes, but this would require tasks + // anticipate result let mut nfree = inner.n_available_chardev().await as _; - if state { - if !inner.handlers.contains_key(&key) { + match (state, handled) { + (true, false) => { let chardev = inner .first_available_chardev() .await .ok_or_else(|| Error::Failed("There are no free USB channels".into()))?; let handler = Handler::new(device, chardev).await?; inner.handlers.insert(key, handler); + nfree -= 1; + } + (false, true) => { + inner.handlers.remove(&key); + nfree += 1; + } + _ => { + return Ok(state); } - nfree -= 1; - } else { - inner.handlers.remove(&key); - nfree += 1; } - // We should do better and watch for owner properties changes, but this would require tasks let _ = inner.channel.0.broadcast(Event::NFreeChannels(nfree)).await; Ok(state) diff --git a/qemu-rdw/Cargo.toml b/qemu-rdw/Cargo.toml index 451ea04..47eac25 100644 --- a/qemu-rdw/Cargo.toml +++ b/qemu-rdw/Cargo.toml @@ -11,9 +11,9 @@ log = "0.4" pretty_env_logger = "0.4" once_cell = "1.5" zbus = { version = "2.0.0-beta" } -qemu-display = { path = "../qemu-display", features = ["glib"] } +qemu-display = { path = "../qemu-display" } keycodemap = { path = "../keycodemap" } -gtk = { package = "gtk4", git = "https://github.com/gtk-rs/gtk4-rs" } -rdw = { git = "https://gitlab.gnome.org/malureau/rdw.git" } +rdw = { package = "rdw4", git = "https://gitlab.gnome.org/malureau/rdw.git" } futures-util = "0.3.13" futures = "0.3.13" +async-trait = "0.1.48" diff --git a/qemu-rdw/src/audio.rs b/qemu-rdw/src/audio.rs index 40a7bdc..bdf0e54 100644 --- a/qemu-rdw/src/audio.rs +++ b/qemu-rdw/src/audio.rs @@ -1,60 +1,57 @@ -use std::error::Error; -use std::result::Result; -use std::thread; +use std::{error::Error, result::Result}; -use qemu_display::Audio; +use qemu_display::{Audio, AudioOutHandler}; + +#[derive(Debug)] +pub struct Handler { + #[allow(unused)] + audio: Audio, +} #[derive(Debug, Default)] -pub struct Handler { - thread: Option>, +pub struct OutListener { + gst: rdw::GstAudio, +} + +#[async_trait::async_trait] +impl AudioOutHandler for OutListener { + async fn init(&mut self, id: u64, info: qemu_display::PCMInfo) { + if let Err(e) = self.gst.init_out(id, &info.gst_caps()) { + log::warn!("Failed to initialize audio stream: {}", e); + } + } + + async fn fini(&mut self, id: u64) { + self.gst.fini_out(id); + } + + async fn set_enabled(&mut self, id: u64, enabled: bool) { + if let Err(e) = self.gst.set_enabled_out(id, enabled) { + log::warn!("Failed to set enabled audio stream: {}", e); + } + } + + async fn set_volume(&mut self, id: u64, volume: qemu_display::Volume) { + if let Err(e) = self.gst.set_volume_out( + id, + volume.mute, + volume.volume.first().map(|v| *v as f64 / 255f64), + ) { + log::warn!("Failed to set volume: {}", e); + } + } + + async fn write(&mut self, id: u64, data: Vec) { + if let Err(e) = self.gst.write_out(id, data) { + log::warn!("Failed to output stream: {}", e); + } + } } impl Handler { - pub async fn new(audio: Audio) -> Result> { - let rx = audio.listen_out().await?; - let mut gst = rdw::GstAudio::new()?; - - let thread = thread::spawn(move || loop { - match rx.recv() { - Ok(event) => { - use qemu_display::AudioOutEvent::*; - - match event { - Init { id, info } => { - if let Err(e) = gst.init_out(id, &info.gst_caps()) { - log::warn!("Failed to initialize audio stream: {}", e); - } - } - Fini { id } => { - gst.fini_out(id); - } - SetEnabled { id, enabled } => { - if let Err(e) = gst.set_enabled_out(id, enabled) { - log::warn!("Failed to set enabled audio stream: {}", e); - } - } - SetVolume { id, volume } => { - if let Err(e) = gst.set_volume_out( - id, - volume.mute, - volume.volume.first().map(|v| *v as f64 / 255f64), - ) { - log::warn!("Failed to set volume: {}", e); - } - } - Write { id, data } => { - if let Err(e) = gst.write_out(id, data) { - log::warn!("Failed to output stream: {}", e); - } - } - } - } - Err(e) => log::warn!("Audio thread error: {}", e), - } - }); - - Ok(Self { - thread: Some(thread), - }) + pub async fn new(mut audio: Audio) -> Result> { + let gst = rdw::GstAudio::new()?; + audio.register_out_listener(OutListener { gst }).await?; + Ok(Handler { audio }) } } diff --git a/qemu-rdw/src/clipboard.rs b/qemu-rdw/src/clipboard.rs index 5dc9a26..688eeb2 100644 --- a/qemu-rdw/src/clipboard.rs +++ b/qemu-rdw/src/clipboard.rs @@ -1,145 +1,195 @@ -use std::cell::Cell; -use std::error::Error; -use std::rc::Rc; -use std::result::Result; - -use crate::glib::{self, clone, prelude::*, SignalHandlerId, SourceId}; -use gtk::{gdk, gio, prelude::DisplayExt, prelude::*}; -use qemu_display::{ - self as qdl, AsyncClipboardProxy, Clipboard, ClipboardEvent, ClipboardSelection, +use std::{ + error::Error, + result::Result, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, }; +use glib::{clone, SignalHandlerId}; +use gtk::{ + gdk, gio, glib, + prelude::{DisplayExt, *}, +}; +use qemu_display::{AsyncClipboardProxy, Clipboard, ClipboardHandler, ClipboardSelection}; +use rdw::gtk; + #[derive(Debug)] pub struct Handler { - rx: SourceId, + #[allow(unused)] + clipboard: Clipboard, cb_handler: Option, cb_primary_handler: Option, } -impl Handler { - pub async fn new(ctxt: Clipboard) -> Result> { - let rx = ctxt - .glib_listen() - .await - .expect("Failed to listen to the clipboard"); - let proxy = ctxt.proxy.clone(); - let serials = Rc::new([Cell::new(0), Cell::new(0)]); - let current_serials = serials.clone(); - let rx = rx.attach(None, move |evt| { - use ClipboardEvent::*; +#[derive(Debug)] +struct InnerHandler { + proxy: AsyncClipboardProxy<'static>, + serials: Arc<[AtomicU32; 2]>, +} - log::debug!("Clipboard event: {:?}", evt); - match evt { - Register | Unregister => { - current_serials[0].set(0); - current_serials[1].set(0); - } - Grab { - selection, - serial, - mimes, - } => { - if let Some((clipboard, idx)) = clipboard_from_selection(selection) { - if serial < current_serials[idx].get() { - log::debug!("Ignored peer grab: {} < {}", serial, current_serials[idx].get()); - return Continue(true); - } +impl InnerHandler { + fn reset_serials(&mut self) { + self.serials[0].store(0, Ordering::SeqCst); + self.serials[1].store(0, Ordering::SeqCst); + } +} - current_serials[idx].set(serial); - let m: Vec<_> = mimes.iter().map(|s|s.as_str()).collect(); - let p = proxy.clone(); - let content = rdw::ContentProvider::new(&m, move |mime, stream, prio| { - log::debug!("content-provider-write: {:?}", (mime, stream)); +#[async_trait::async_trait] +impl ClipboardHandler for InnerHandler { + async fn register(&mut self) { + self.reset_serials(); + } - let p = p.clone(); - let mime = mime.to_string(); - Some(Box::pin(clone!(@strong stream => @default-return panic!(), async move { - match p.request(selection, &[&mime]).await { - Ok((_, data)) => { - let bytes = glib::Bytes::from(&data); - stream.write_bytes_async_future(&bytes, prio).await.map(|_| ()) - } - Err(e) => { - let err = format!("failed to request clipboard data: {}", e); - log::warn!("{}", err); - Err(glib::Error::new(gio::IOErrorEnum::Failed, &err)) - } - } - }))) - }); + async fn unregister(&mut self) { + self.reset_serials(); + } - if let Err(e) = clipboard.set_content(Some(&content)) { - log::warn!("Failed to set clipboard grab: {}", e); - } - } - } - Release { selection } => { - if let Some((clipboard, _)) = clipboard_from_selection(selection) { - // TODO: track if the outside/app changed the clipboard - if let Err(e) = clipboard.set_content(gdk::NONE_CONTENT_PROVIDER) { - log::warn!("Failed to release clipboard: {}", e); - } - } - } - Request { selection, mimes, tx } => { - if let Some((clipboard, _)) = clipboard_from_selection(selection) { - glib::MainContext::default().spawn_local(async move { - let m: Vec<_> = mimes.iter().map(|s|s.as_str()).collect(); - let res = clipboard.read_async_future(&m, glib::Priority::default()).await; - log::debug!("clipboard-read: {}", res.is_ok()); - let reply = match res { - Ok((stream, mime)) => { - let out = gio::MemoryOutputStream::new_resizable(); - let res = out.splice_async_future( - &stream, - gio::OutputStreamSpliceFlags::CLOSE_SOURCE | gio::OutputStreamSpliceFlags::CLOSE_TARGET, - glib::Priority::default()).await; - match res { - Ok(_) => { - let data = out.steal_as_bytes(); - Ok((mime.to_string(), data.as_ref().to_vec())) - } - Err(e) => { - Err(qdl::Error::Failed(format!("{}", e))) - } - } - } - Err(e) => { - Err(qdl::Error::Failed(format!("{}", e))) - } - }; - let _ = tx.lock().unwrap().send(reply); - }); - } - } + async fn grab(&mut self, selection: ClipboardSelection, serial: u32, mimes: Vec) { + if let Some((clipboard, idx)) = clipboard_from_selection(selection) { + let cur_serial = self.serials[idx].load(Ordering::SeqCst); + if serial < cur_serial { + log::debug!("Ignored peer grab: {} < {}", serial, cur_serial); + return; } - Continue(true) - }); + self.serials[idx].store(serial, Ordering::SeqCst); + let m: Vec<_> = mimes.iter().map(|s| s.as_str()).collect(); + let p = self.proxy.clone(); + let content = rdw::ContentProvider::new(&m, move |mime, stream, prio| { + log::debug!("content-provider-write: {:?}", (mime, stream)); + + let p = p.clone(); + let mime = mime.to_string(); + Some(Box::pin( + clone!(@strong stream => @default-return panic!(), async move { + match p.request(selection, &[&mime]).await { + Ok((_, data)) => { + let bytes = glib::Bytes::from(&data); + stream.write_bytes_async_future(&bytes, prio).await.map(|_| ()) + } + Err(e) => { + let err = format!("failed to request clipboard data: {}", e); + log::warn!("{}", err); + Err(glib::Error::new(gio::IOErrorEnum::Failed, &err)) + } + } + }), + )) + }); + + if let Err(e) = clipboard.set_content(Some(&content)) { + log::warn!("Failed to set clipboard grab: {}", e); + } + } + } + + async fn release(&mut self, selection: ClipboardSelection) { + if let Some((clipboard, _)) = clipboard_from_selection(selection) { + // TODO: track if the outside/app changed the clipboard + if let Err(e) = clipboard.set_content(gdk::NONE_CONTENT_PROVIDER) { + log::warn!("Failed to release clipboard: {}", e); + } + } + } + + async fn request( + &mut self, + selection: ClipboardSelection, + mimes: Vec, + ) -> qemu_display::Result<(String, Vec)> { + // we have to spawn a local future, because clipboard is not Send + let (sender, receiver) = futures::channel::oneshot::channel(); + glib::MainContext::default().spawn_local(async move { + let res = if let Some((clipboard, _)) = clipboard_from_selection(selection) { + let m: Vec<_> = mimes.iter().map(|s| s.as_str()).collect(); + let res = clipboard + .read_async_future(&m, glib::Priority::default()) + .await; + log::debug!("clipboard-read: {}", res.is_ok()); + match res { + Ok((stream, mime)) => { + let out = gio::MemoryOutputStream::new_resizable(); + let res = out + .splice_async_future( + &stream, + gio::OutputStreamSpliceFlags::CLOSE_SOURCE + | gio::OutputStreamSpliceFlags::CLOSE_TARGET, + glib::Priority::default(), + ) + .await; + match res { + Ok(_) => { + let data = out.steal_as_bytes(); + Ok((mime.to_string(), data.as_ref().to_vec())) + } + Err(e) => Err(qemu_display::Error::Failed(format!("{}", e))), + } + } + Err(e) => Err(qemu_display::Error::Failed(format!("{}", e))), + } + } else { + Err(qemu_display::Error::Failed( + "Clipboard request failed".into(), + )) + }; + sender.send(res).unwrap() + }); + match receiver.await { + Ok(res) => res, + Err(e) => Err(qemu_display::Error::Failed(format!( + "Clipboard request failed: {}", + e + ))), + } + } +} + +impl Handler { + pub async fn new(clipboard: Clipboard) -> Result> { + let proxy = clipboard.proxy.clone(); + let serials = Arc::new([AtomicU32::new(0), AtomicU32::new(0)]); let cb_handler = watch_clipboard( - ctxt.proxy.clone(), + clipboard.proxy.clone(), ClipboardSelection::Clipboard, serials.clone(), ); let cb_primary_handler = watch_clipboard( - ctxt.proxy.clone(), + clipboard.proxy.clone(), ClipboardSelection::Primary, serials.clone(), ); - - ctxt.register().await?; - Ok(Self { - rx, + clipboard.register(InnerHandler { proxy, serials }).await?; + Ok(Handler { + clipboard, cb_handler, cb_primary_handler, }) } } +impl Drop for Handler { + fn drop(&mut self) { + if let Some(id) = self.cb_primary_handler.take() { + clipboard_from_selection(ClipboardSelection::Primary) + .unwrap() + .0 + .disconnect(id); + } + if let Some(id) = self.cb_handler.take() { + clipboard_from_selection(ClipboardSelection::Clipboard) + .unwrap() + .0 + .disconnect(id); + } + } +} + fn watch_clipboard( proxy: AsyncClipboardProxy<'static>, selection: ClipboardSelection, - serials: Rc<[Cell; 2]>, + serials: Arc<[AtomicU32; 2]>, ) -> Option { let (clipboard, idx) = match clipboard_from_selection(selection) { Some(it) => it, @@ -161,9 +211,9 @@ fn watch_clipboard( let _ = proxy.release(selection).await; } else { let mimes: Vec<_> = types.iter().map(|s| s.as_str()).collect(); - let ser = serials[idx].get(); + let ser = serials[idx].load(Ordering::SeqCst); let _ = proxy.grab(selection, ser, &mimes).await; - serials[idx].set(ser + 1); + serials[idx].store(ser + 1, Ordering::SeqCst); } }); } diff --git a/qemu-rdw/src/display.rs b/qemu-rdw/src/display.rs index 2a0eb5d..86cebe0 100644 --- a/qemu-rdw/src/display.rs +++ b/qemu-rdw/src/display.rs @@ -1,15 +1,15 @@ +use futures_util::StreamExt; use glib::{clone, subclass::prelude::*, MainContext}; -use gtk::{glib, prelude::*}; -use once_cell::sync::OnceCell; - +use gtk::glib; use keycodemap::KEYMAP_XORGEVDEV2QNUM; -use qemu_display::Console; -use rdw::DisplayExt; +use once_cell::sync::OnceCell; +use qemu_display::{Console, ConsoleListenerHandler}; +use rdw::{gtk, DisplayExt}; +use std::os::unix::io::IntoRawFd; mod imp { use super::*; use gtk::subclass::prelude::*; - use std::{convert::TryInto, os::unix::io::IntoRawFd}; #[repr(C)] pub struct RdwDisplayQemuClass { @@ -136,22 +136,17 @@ mod imp { MainContext::default().spawn_local(clone!(@weak widget => async move { let self_ = Self::from_instance(&widget); let console = self_.console.get().unwrap(); - - let (rx, wait_tx) = console - .glib_listen() - .await - .expect("Failed to listen to the console"); - rx.attach( - None, - clone!(@weak widget => @default-panic, move |evt| { - use qemu_display::ConsoleEvent::*; - - log::debug!("Console event: {:?}", evt); - match evt { + // we have to use a channel, because widget is not Send.. + let (sender, mut receiver) = futures::channel::mpsc::unbounded(); + console.register_listener(ConsoleHandler { sender }).await.unwrap(); + MainContext::default().spawn_local(clone!(@weak widget => async move { + while let Some(e) = receiver.next().await { + use ConsoleEvent::*; + match e { Scanout(s) => { if s.format != 0x20020888 { log::warn!("Format not yet supported: {:X}", s.format); - return Continue(true); + continue; } widget.set_display_size(Some((s.width as _, s.height as _))); widget.update_area(0, 0, s.width as _, s.height as _, s.stride as _, &s.data); @@ -159,7 +154,7 @@ mod imp { Update(u) => { if u.format != 0x20020888 { log::warn!("Format not yet supported: {:X}", u.format); - return Continue(true); + continue; } widget.update_area(u.x as _, u.y as _, u.w as _, u.h as _, u.stride as _, &u.data); } @@ -175,19 +170,20 @@ mod imp { fd: s.into_raw_fd(), }); } - UpdateDMABUF { .. } => { + UpdateDMABUF { wait_tx, .. } => { widget.render(); let _ = wait_tx.send(()); } Disconnected => { + log::warn!("Console disconnected"); } - CursorDefine { width, height, hot_x, hot_y, data }=> { + CursorDefine(c) => { let cursor = rdw::Display::make_cursor( - &data, - width, - height, - hot_x, - hot_y, + &c.data, + c.width, + c.height, + c.hot_x, + c.hot_y, 1, ); widget.define_cursor(Some(cursor)); @@ -200,41 +196,21 @@ mod imp { } } } - Continue(true) - }) - ); - - let mut abs_changed = console.mouse.receive_is_absolute_changed().await; - MainContext::default().spawn_local(clone!(@weak widget => async move { - use futures_util::StreamExt; - - while let Some(abs) = abs_changed.next().await { - let abs = if let Some(abs) = abs { - abs.try_into().unwrap_or(false) - } else { - continue; - }; - widget.set_mouse_absolute(abs); } })); - - loop { - if let Err(e) = console.dispatch_signals().await { - log::warn!("Console dispatching error: {}", e); - break; + let mut abs_changed = console.mouse.receive_is_absolute_changed().await; + MainContext::default().spawn_local(clone!(@weak widget => async move { + while let Some(abs) = abs_changed.next().await { + if let Some(abs) = abs { + widget.set_mouse_absolute(abs); + } } - } + })); })); } } impl rdw::DisplayImpl for Display {} - - impl Display { - pub(crate) fn set_console(&self, console: Console) { - self.console.set(console).unwrap(); - } - } } glib::wrapper! { @@ -245,7 +221,7 @@ impl Display { pub fn new(console: Console) -> Self { let obj = glib::Object::new::(&[]).unwrap(); let self_ = imp::Display::from_instance(&obj); - self_.set_console(console); + self_.console.set(console).unwrap(); obj } @@ -255,6 +231,67 @@ impl Display { } } +#[derive(Debug)] +enum ConsoleEvent { + Scanout(qemu_display::Scanout), + Update(qemu_display::Update), + ScanoutDMABUF(qemu_display::ScanoutDMABUF), + UpdateDMABUF { + _update: qemu_display::UpdateDMABUF, + wait_tx: futures::channel::oneshot::Sender<()>, + }, + MouseSet(qemu_display::MouseSet), + CursorDefine(qemu_display::Cursor), + Disconnected, +} + +struct ConsoleHandler { + sender: futures::channel::mpsc::UnboundedSender, +} + +impl ConsoleHandler { + fn send(&self, event: ConsoleEvent) { + if let Err(e) = self.sender.unbounded_send(event) { + log::warn!("failed to send console event: {}", e); + } + } +} + +#[async_trait::async_trait] +impl ConsoleListenerHandler for ConsoleHandler { + async fn scanout(&mut self, scanout: qemu_display::Scanout) { + self.send(ConsoleEvent::Scanout(scanout)); + } + + async fn update(&mut self, update: qemu_display::Update) { + self.send(ConsoleEvent::Update(update)); + } + + async fn scanout_dmabuf(&mut self, scanout: qemu_display::ScanoutDMABUF) { + self.send(ConsoleEvent::ScanoutDMABUF(scanout)); + } + + async fn update_dmabuf(&mut self, _update: qemu_display::UpdateDMABUF) { + let (wait_tx, wait_rx) = futures::channel::oneshot::channel(); + self.send(ConsoleEvent::UpdateDMABUF { _update, wait_tx }); + if let Err(e) = wait_rx.await { + log::warn!("wait update dmabuf failed: {}", e); + } + } + + async fn mouse_set(&mut self, set: qemu_display::MouseSet) { + self.send(ConsoleEvent::MouseSet(set)); + } + + async fn cursor_define(&mut self, cursor: qemu_display::Cursor) { + self.send(ConsoleEvent::CursorDefine(cursor)); + } + + fn disconnected(&mut self) { + self.send(ConsoleEvent::Disconnected); + } +} + fn from_gdk_button(button: u32) -> qemu_display::MouseButton { use qemu_display::MouseButton::*; diff --git a/qemu-rdw/src/main.rs b/qemu-rdw/src/main.rs index ea21141..9314a3f 100644 --- a/qemu-rdw/src/main.rs +++ b/qemu-rdw/src/main.rs @@ -2,9 +2,8 @@ use gio::ApplicationFlags; use glib::MainContext; use gtk::{gio, glib, prelude::*}; use qemu_display::{Chardev, Console, Display}; -use std::cell::RefCell; -use std::sync::Arc; -use zbus::Connection; +use rdw::gtk; +use std::{cell::RefCell, sync::Arc}; mod audio; mod clipboard; @@ -13,7 +12,6 @@ mod usbredir; struct Inner { app: gtk::Application, - conn: zbus::azync::Connection, usbredir: RefCell>, audio: RefCell>, clipboard: RefCell>, @@ -69,20 +67,15 @@ impl App { if opt.lookup_value("list", None).is_some() { app_opt.list = true; } - app_opt.vm_name = - opt.lookup_value(&glib::OPTION_REMAINING, None) - .and_then(|args| args.child_value(0).get::()); + app_opt.vm_name = opt + .lookup_value(&glib::OPTION_REMAINING, None) + .and_then(|args| args.child_value(0).get::()); -1 }); - let conn = Connection::session() - .expect("Failed to connect to DBus") - .into(); - let app = App { inner: Arc::new(Inner { app, - conn, usbredir: Default::default(), audio: Default::default(), clipboard: Default::default(), @@ -90,7 +83,6 @@ impl App { }; let app_clone = app.clone(); - let opt_clone = opt.clone(); app.inner.app.connect_activate(move |app| { let ui_src = include_str!("main.ui"); let builder = gtk::Builder::new(); @@ -102,11 +94,24 @@ impl App { window.set_application(Some(app)); let app_clone = app_clone.clone(); - let opt_clone = opt_clone.clone(); + let opt_clone = opt.clone(); MainContext::default().spawn_local(async move { - // let opt = opt_clone.borrow(); + let conn = zbus::ConnectionBuilder::session() + .unwrap() + .internal_executor(false) + .build() + .await + .expect("Failed to connect to DBus"); + + let conn_clone = conn.clone(); + MainContext::default().spawn_local(async move { + loop { + conn_clone.executor().tick().await; + } + }); + if opt_clone.borrow().list { - let list = Display::by_name(app_clone.connection()).await.unwrap(); + let list = Display::by_name(&conn).await.unwrap(); for (name, dest) in list { println!("{} (at {})", name, dest); } @@ -114,20 +119,18 @@ impl App { return; } let dest = if let Some(name) = opt_clone.borrow().vm_name.as_ref() { - let list = Display::by_name(app_clone.connection()).await.unwrap(); + let list = Display::by_name(&conn).await.unwrap(); Some( list.get(name) - .expect(&format!("Can't find VM name: {}", name)) + .unwrap_or_else(|| panic!("Can't find VM name: {}", name)) .clone(), ) } else { None }; - let display = Display::new(app_clone.connection(), dest.as_ref()) - .await - .unwrap(); + let display = Display::new(&conn, dest.as_ref()).await.unwrap(); - let console = Console::new(app_clone.connection(), 0) + let console = Console::new(&conn, 0) .await .expect("Failed to get the QEMU console"); let rdw = display::Display::new(console); @@ -143,22 +146,26 @@ impl App { if let Ok(Some(audio)) = display.audio().await { match audio::Handler::new(audio).await { Ok(handler) => app_clone.set_audio(handler), - Err(e) => log::warn!("Failed to setup audio: {}", e), + Err(e) => { + log::warn!("Failed to setup audio handler: {}", e); + } } } if let Ok(Some(clipboard)) = display.clipboard().await { match clipboard::Handler::new(clipboard).await { Ok(handler) => app_clone.set_clipboard(handler), - Err(e) => log::warn!("Failed to setup clipboard: {}", e), + Err(e) => { + log::warn!("Failed to setup clipboard handler: {}", e); + } } } - if let Ok(c) = Chardev::new(app_clone.connection(), "qmp").await { - use std::io::prelude::*; - use std::io::BufReader; - use std::os::unix::io::AsRawFd; - use std::os::unix::net::UnixStream; + if let Ok(c) = Chardev::new(&conn, "qmp").await { + use std::{ + io::{prelude::*, BufReader}, + os::unix::{io::AsRawFd, net::UnixStream}, + }; let (p0, p1) = UnixStream::pair().unwrap(); if c.proxy.register(p1.as_raw_fd().into()).await.is_ok() { @@ -192,10 +199,6 @@ impl App { app } - fn connection(&self) -> &zbus::azync::Connection { - &self.inner.conn - } - fn set_usbredir(&self, usbredir: usbredir::Handler) { self.inner.usbredir.replace(Some(usbredir)); } @@ -204,8 +207,8 @@ impl App { self.inner.audio.replace(Some(audio)); } - fn set_clipboard(&self, clipboard: clipboard::Handler) { - self.inner.clipboard.replace(Some(clipboard)); + fn set_clipboard(&self, cb: clipboard::Handler) { + self.inner.clipboard.replace(Some(cb)); } fn run(&self) -> i32 { diff --git a/qemu-rdw/src/usbredir.rs b/qemu-rdw/src/usbredir.rs index 49682f4..bc01802 100644 --- a/qemu-rdw/src/usbredir.rs +++ b/qemu-rdw/src/usbredir.rs @@ -1,6 +1,7 @@ use glib::{clone, MainContext}; use gtk::{glib, prelude::*}; use qemu_display::UsbRedir; +use rdw::gtk; #[derive(Clone, Debug)] pub struct Handler { @@ -30,7 +31,7 @@ impl Handler { let usbredir = self.usbredir.clone(); widget.connect_device_state_set(move |widget, item, state| { let device = match item.device() { - Some(it) => it.clone(), + Some(it) => it, _ => return, }; diff --git a/qemu-vnc/Cargo.toml b/qemu-vnc/Cargo.toml index 386a13e..c2118a2 100644 --- a/qemu-vnc/Cargo.toml +++ b/qemu-vnc/Cargo.toml @@ -16,3 +16,4 @@ libc = "0.2.86" image = "0.23.14" derivative = "2.2.0" async-io = "1.3.1" +async-trait = "0.1.48" diff --git a/qemu-vnc/src/main.rs b/qemu-vnc/src/main.rs index 510c15e..946ce7b 100644 --- a/qemu-vnc/src/main.rs +++ b/qemu-vnc/src/main.rs @@ -1,20 +1,22 @@ -use std::iter::FromIterator; -use std::net::{TcpListener, TcpStream}; -use std::sync::mpsc; -use std::sync::{Arc, Mutex}; -use std::{collections::HashSet, convert::TryInto}; -use std::{error::Error, thread::JoinHandle}; -use std::{io, thread, time}; +use std::{ + borrow::Borrow, + collections::HashSet, + error::Error, + io, + iter::FromIterator, + net::{TcpListener, TcpStream}, + sync::{mpsc, Arc, Mutex}, + thread, time, +}; use clap::Clap; use image::GenericImage; use keycodemap::*; -use qemu_display::{Console, ConsoleEvent, MouseButton, VMProxy}; +use qemu_display::{AsyncVMProxy, Console, ConsoleListenerHandler, MouseButton}; use vnc::{ - server::Event as VncEvent, server::FramebufferUpdate, Encoding, Error as VncError, PixelFormat, - Rect, Screen, Server as VncServer, + server::{Event as VncEvent, FramebufferUpdate}, + Encoding, Error as VncError, PixelFormat, Rect, Screen, Server as VncServer, }; -use zbus::Connection; #[derive(Clap, Debug)] pub struct SocketAddrArgs { @@ -233,10 +235,59 @@ impl Client { } } +#[derive(Debug)] +struct ConsoleListener { + server: Server, +} + +#[async_trait::async_trait] +impl ConsoleListenerHandler for ConsoleListener { + async fn scanout(&mut self, s: qemu_display::Scanout) { + let mut inner = self.server.inner.lock().unwrap(); + inner.image = image_from_vec(s.format, s.width, s.height, s.stride, s.data); + } + + async fn update(&mut self, u: qemu_display::Update) { + let mut inner = self.server.inner.lock().unwrap(); + let update = image_from_vec(u.format, u.w as _, u.h as _, u.stride, u.data); + if (u.x, u.y) == (0, 0) && update.dimensions() == inner.image.dimensions() { + inner.image = update; + } else { + inner.image.copy_from(&update, u.x as _, u.y as _).unwrap(); + } + let rect = Rect { + left: u.x as _, + top: u.y as _, + width: u.w as _, + height: u.h as _, + }; + inner.tx.send(Event::ConsoleUpdate(rect)).unwrap(); + } + + async fn scanout_dmabuf(&mut self, _scanout: qemu_display::ScanoutDMABUF) { + unimplemented!() + } + + async fn update_dmabuf(&mut self, _update: qemu_display::UpdateDMABUF) { + unimplemented!() + } + + async fn mouse_set(&mut self, set: qemu_display::MouseSet) { + dbg!(set); + } + + async fn cursor_define(&mut self, cursor: qemu_display::Cursor) { + dbg!(cursor); + } + + fn disconnected(&mut self) { + dbg!(); + } +} + #[derive(Debug)] struct ServerInner { console: Console, - console_thread: Option>, image: BgraImage, tx: mpsc::Sender, } @@ -257,76 +308,24 @@ impl Server { Ok(Self { vm_name, rx: Arc::new(Mutex::new(rx)), - inner: Arc::new(Mutex::new(ServerInner { - console, - console_thread: None, - image, - tx, - })), + inner: Arc::new(Mutex::new(ServerInner { console, image, tx })), }) } fn stop_console(&self) -> Result<(), Box> { let mut inner = self.inner.lock().unwrap(); - if let Some(_thread) = inner.console_thread.take() { - todo!("join console thread"); - //thread.join().unwrap(); - } + inner.console.unregister_listener(); Ok(()) } async fn run_console(&self) -> Result<(), Box> { let mut inner = self.inner.lock().unwrap(); - if inner.console_thread.is_some() { - return Ok(()); - } - - let server = self.clone(); - let (console_rx, _ack) = inner.console.listen().await?; - - let thread = thread::spawn(move || loop { - match console_rx.recv().unwrap() { - ConsoleEvent::ScanoutDMABUF(_) | ConsoleEvent::UpdateDMABUF { .. } => { - unimplemented!(); - } - ConsoleEvent::Scanout(s) => { - let mut inner = server.inner.lock().unwrap(); - inner.image = image_from_vec(s.format, s.width, s.height, s.stride, s.data); - } - ConsoleEvent::Update(u) => { - let mut inner = server.inner.lock().unwrap(); - let update = image_from_vec( - u.format, - u.w.try_into().unwrap(), - u.h.try_into().unwrap(), - u.stride, - u.data, - ); - if (u.x, u.y) == (0, 0) && update.dimensions() == inner.image.dimensions() { - inner.image = update; - } else { - inner - .image - .copy_from(&update, u.x.try_into().unwrap(), u.y.try_into().unwrap()) - .unwrap(); - } - let rect = Rect { - left: u.x.try_into().unwrap(), - top: u.y.try_into().unwrap(), - width: u.w.try_into().unwrap(), - height: u.h.try_into().unwrap(), - }; - inner.tx.send(Event::ConsoleUpdate(rect)).unwrap(); - } - ConsoleEvent::CursorDefine { .. } => {} - ConsoleEvent::MouseSet(_) => {} - e => { - dbg!(e); - } - } - }); - - inner.console_thread = Some(thread); + inner + .console + .register_listener(ConsoleListener { + server: self.clone(), + }) + .await?; Ok(()) } @@ -471,13 +470,15 @@ async fn run() -> Result<(), Box> { let listener = TcpListener::bind::(args.address.into()).unwrap(); let dbus = if let Some(addr) = args.dbus_address { - Connection::new_for_address(&addr, true) + zbus::ConnectionBuilder::address(addr.borrow())? + .build() + .await } else { - Connection::new_session() + zbus::Connection::session().await } .expect("Failed to connect to DBus"); - let vm_name = VMProxy::new(&dbus)?.name()?; + let vm_name = AsyncVMProxy::new(&dbus).await?.name().await?; let console = Console::new(&dbus.into(), 0) .await diff --git a/qemu-vte/Cargo.toml b/qemu-vte/Cargo.toml index 283ad16..2b0f949 100644 --- a/qemu-vte/Cargo.toml +++ b/qemu-vte/Cargo.toml @@ -8,9 +8,9 @@ log = "0.4" pretty_env_logger = "0.4" once_cell = "1.5" zbus = { version = "2.0.0-beta" } -qemu-display = { path = "../qemu-display", features = ["glib"] } +qemu-display = { path = "../qemu-display" } futures = "0.3.13" [dependencies.vte] package = "vte4" -git = "https://gitlab.gnome.org/malureau/vte4-rs" +version = "0.0.1" diff --git a/qemu-vte/src/main.rs b/qemu-vte/src/main.rs index 1cf95ad..5023973 100644 --- a/qemu-vte/src/main.rs +++ b/qemu-vte/src/main.rs @@ -1,11 +1,10 @@ use futures::prelude::*; use glib::{clone, MainContext}; +use gtk::{gio, glib}; use qemu_display::Chardev; -use std::os::unix::io::AsRawFd; -use std::os::unix::net::UnixStream; -use vte::prelude::*; -use vte::{gio, glib, gtk}; -use zbus::azync::Connection; +use std::os::unix::{io::AsRawFd, net::UnixStream}; +use vte::{gtk, prelude::*}; +use zbus::Connection; fn main() { pretty_env_logger::init();