Fix usbredir cleanups on stop

We need to kick all the helper threads to avoid "leaking" them.

Signed-off-by: Marc-André Lureau <marcandre.lureau@redhat.com>
This commit is contained in:
Marc-André Lureau 2021-08-20 16:24:35 +04:00
parent b5bb98ad0e
commit 36d9aa07ab

View File

@ -25,6 +25,7 @@ struct InnerHandler {
stream_thread: JoinHandle<()>, stream_thread: JoinHandle<()>,
ctxt: rusb::Context, ctxt: rusb::Context,
ctxt_thread: Option<JoinHandle<()>>, ctxt_thread: Option<JoinHandle<()>>,
event: (UnixStream, UnixStream),
quit: bool, quit: bool,
} }
@ -36,7 +37,7 @@ struct Handler {
impl DeviceHandler for Handler { impl DeviceHandler for Handler {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
let read = match fd_poll_readable(inner.stream.as_raw_fd(), false) { let read = match fd_poll_readable(inner.stream.as_raw_fd(), None) {
Ok(true) => { Ok(true) => {
let read = inner.stream.read(buf); let read = inner.stream.read(buf);
if let Ok(0) = read { if let Ok(0) = read {
@ -99,17 +100,17 @@ impl Handler {
}; };
let (stream, peer) = UnixStream::pair()?; let (stream, peer) = UnixStream::pair()?;
chardev.proxy.register(dbg!(peer.as_raw_fd()).into()).await?; chardev.proxy.register(peer.as_raw_fd().into()).await?;
let c = ctxt.clone(); let c = ctxt.clone();
let stream_fd = stream.as_raw_fd(); let stream_fd = stream.as_raw_fd();
dbg!(stream_fd);
// really annoying libusb/usbredir APIs... // really annoying libusb/usbredir APIs...
let event = UnixStream::pair()?;
let event_fd = event.1.as_raw_fd();
let stream_thread = std::thread::spawn(move || loop { let stream_thread = std::thread::spawn(move || loop {
let ret = fd_poll_readable(stream_fd, true); let ret = fd_poll_readable(stream_fd, Some(event_fd));
c.interrupt_handle_events(); c.interrupt_handle_events();
if ret.is_err() { if ret.is_err() {
dbg!();
break; break;
} }
}); });
@ -119,6 +120,7 @@ impl Handler {
device_fd, device_fd,
stream, stream,
stream_thread, stream_thread,
event,
quit: false, quit: false,
ctxt: ctxt.clone(), ctxt: ctxt.clone(),
ctxt_thread: Default::default(), ctxt_thread: Default::default(),
@ -130,10 +132,9 @@ impl Handler {
let inner = handler.inner.clone(); let inner = handler.inner.clone();
let ctxt_thread = std::thread::spawn(move || loop { let ctxt_thread = std::thread::spawn(move || loop {
if inner.lock().unwrap().quit { if inner.lock().unwrap().quit {
dbg!();
break; break;
} }
if let Ok(true) = fd_poll_readable(stream_fd, false) { if let Ok(true) = fd_poll_readable(stream_fd, None) {
redirdev.read_peer().unwrap(); redirdev.read_peer().unwrap();
} }
if redirdev.has_data_to_write() > 0 { if redirdev.has_data_to_write() > 0 {
@ -152,18 +153,13 @@ impl Handler {
} }
} }
impl Drop for InnerHandler {
fn drop(&mut self) {
//FIXME: for some reason close stream doesn't HUP qemu ??
dbg!()
}
}
impl Drop for Handler { impl Drop for Handler {
fn drop(&mut self) { fn drop(&mut self) {
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
inner.quit = true; inner.quit = true;
inner.ctxt.interrupt_handle_events(); inner.ctxt.interrupt_handle_events();
// stream will be dropped and stream_thread will kick context_thread
inner.event.0.write(&[0]).unwrap();
} }
} }
@ -239,22 +235,30 @@ impl UsbRedir {
} }
} }
fn fd_poll_readable(fd: RawFd, wait: bool) -> std::io::Result<bool> { fn fd_poll_readable(fd: RawFd, wait: Option<RawFd>) -> std::io::Result<bool> {
let mut fds = [libc::pollfd { let mut fds = vec![libc::pollfd {
fd, fd,
events: libc::POLLIN|libc::POLLHUP, events: libc::POLLIN|libc::POLLHUP,
revents: 0, revents: 0,
}]; }];
let ret = unsafe { libc::poll(fds.as_mut_ptr(), 1, if wait { -1 } else { 0 }) }; if let Some(wait) = wait {
if ret > 0 { fds.push(libc::pollfd {
if fds[0].revents & libc::POLLHUP != 0 { fd: wait,
Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "hup")) events: libc::POLLIN|libc::POLLHUP,
} else { revents: 0,
Ok(fds[0].revents & libc::POLLIN != 0) });
} }
let ret = unsafe { libc::poll(fds.as_mut_ptr(),
fds.len() as _,
if wait.is_some() { -1 } else { 0 }) };
if ret < 0 {
Err(std::io::Error::last_os_error())
} else if ret == 0 { } else if ret == 0 {
Ok(false) Ok(false)
} else if fds[0].revents & libc::POLLHUP != 0 ||
(wait.is_some() && fds[1].revents & libc::POLLIN != 0) {
Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "hup"))
} else { } else {
Err(std::io::Error::last_os_error()) Ok(fds[0].revents & libc::POLLIN != 0)
} }
} }