Merge pull request #84 from fufesou/feat/remote_printer_fs

refact: fs, buf stream
RustDesk 2025-03-27 14:25:03 +08:00 committed by GitHub
commit 9ede5d49f6

src/fs.rs: 295 lines changed

@@ -1,6 +1,8 @@
 #[cfg(windows)]
 use std::os::windows::prelude::*;
 use std::{
+    fmt::{Debug, Display},
+    io::Cursor,
     path::{Path, PathBuf},
     sync::atomic::{AtomicI32, Ordering},
     time::{Duration, SystemTime, UNIX_EPOCH},
@@ -8,7 +10,10 @@ use std::{

 use serde_derive::{Deserialize, Serialize};
 use serde_json::json;
-use tokio::{fs::File, io::*};
+use tokio::{
+    fs::File,
+    io::{AsyncReadExt, AsyncWriteExt, BufStream as TokioBufStream},
+};

 use crate::{anyhow::anyhow, bail, get_version_number, message_proto::*, ResultType, Stream};
 // https://doc.rust-lang.org/std/os/windows/fs/trait.MetadataExt.html
@@ -301,13 +306,86 @@ impl JobType {
     }
 }

+#[derive(Debug)]
+pub enum DataSource {
+    FilePath(PathBuf),
+    MemoryCursor(Cursor<Vec<u8>>),
+}
+
+impl Default for DataSource {
+    fn default() -> Self {
+        DataSource::FilePath(PathBuf::new())
+    }
+}
+
+impl serde::Serialize for DataSource {
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        match self {
+            DataSource::FilePath(p) => serializer.serialize_str(p.to_str().unwrap_or("")),
+            DataSource::MemoryCursor(_) => serializer.serialize_str(""),
+        }
+    }
+}
+
+impl Display for DataSource {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            DataSource::FilePath(p) => write!(f, "File: {}", p.to_string_lossy().to_string()),
+            DataSource::MemoryCursor(_) => write!(f, "Bytes"),
+        }
+    }
+}
+
+impl DataSource {
+    fn to_meta(&self) -> String {
+        match self {
+            DataSource::FilePath(p) => p.to_string_lossy().to_string(),
+            DataSource::MemoryCursor(_) => "".to_string(),
+        }
+    }
+}
+
+enum DataStream {
+    FileStream(File),
+    BufStream(TokioBufStream<Cursor<Vec<u8>>>),
+}
+
+impl Debug for DataStream {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            DataStream::FileStream(fs) => write!(f, "{:?}", fs),
+            DataStream::BufStream(_) => write!(f, "BufStream"),
+        }
+    }
+}
+
+impl DataStream {
+    async fn write_all(&mut self, buf: &[u8]) -> ResultType<()> {
+        match self {
+            DataStream::FileStream(fs) => fs.write_all(buf).await?,
+            DataStream::BufStream(bs) => bs.write_all(buf).await?,
+        }
+        Ok(())
+    }
+
+    async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        match self {
+            DataStream::FileStream(fs) => fs.read(buf).await,
+            DataStream::BufStream(bs) => bs.read(buf).await,
+        }
+    }
+}
+
 #[derive(Default, Serialize, Debug)]
 #[serde(rename_all = "camelCase")]
 pub struct TransferJob {
     pub id: i32,
     pub r#type: JobType,
     pub remote: String,
-    pub path: PathBuf,
+    pub data_source: DataSource,
     pub show_hidden: bool,
     pub is_remote: bool,
     pub is_last_job: bool,
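
Aside: a minimal standalone sketch (not part of this commit) of the mechanism that DataStream::BufStream builds on. tokio implements AsyncRead/AsyncWrite for std::io::Cursor<Vec<u8>>, so a BufStream over a cursor accepts the same write_all/read calls as a File, which is what lets the job treat memory and disk uniformly.

use std::io::Cursor;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufStream};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Write side: buffer bytes in memory instead of in a file on disk.
    let mut ws = BufStream::new(Cursor::new(Vec::new()));
    ws.write_all(b"printer data").await?;
    // Flush before into_inner(), otherwise bytes still in the write buffer are dropped.
    ws.flush().await?;
    let bytes: Vec<u8> = ws.into_inner().into_inner(); // BufStream -> Cursor -> Vec<u8>

    // Read side: stream the same bytes back out in chunks.
    let mut rs = BufStream::new(Cursor::new(bytes));
    let mut chunk = [0u8; 6];
    let n = rs.read(&mut chunk).await?;
    println!("read {} bytes: {:?}", n, &chunk[..n]);
    Ok(())
}
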
@@ -317,7 +395,7 @@ pub struct TransferJob {
     pub conn_id: i32, // server only
     #[serde(skip_serializing)]
-    file: Option<File>,
+    data_stream: Option<DataStream>,
     pub total_size: u64,
     finished_size: u64,
     transferred: u64,
@@ -376,20 +454,20 @@ impl TransferJob {
         id: i32,
         r#type: JobType,
         remote: String,
-        path: String,
+        data_source: DataSource,
         file_num: i32,
         show_hidden: bool,
         is_remote: bool,
         files: Vec<FileEntry>,
         enable_overwrite_detection: bool,
     ) -> Self {
-        log::info!("new write {}", path);
+        log::info!("new write {}", data_source);
         let total_size = files.iter().map(|x| x.size).sum();
         Self {
             id,
             r#type,
             remote,
-            path: get_path(&path),
+            data_source,
             file_num,
             show_hidden,
             is_remote,
@@ -404,20 +482,27 @@ impl TransferJob {
         id: i32,
         r#type: JobType,
         remote: String,
-        path: String,
+        data_source: DataSource,
         file_num: i32,
         show_hidden: bool,
         is_remote: bool,
         enable_overwrite_detection: bool,
     ) -> ResultType<Self> {
-        log::info!("new read {}", path);
-        let files = get_recursive_files(&path, show_hidden)?;
-        let total_size = files.iter().map(|x| x.size).sum();
+        log::info!("new read {}", data_source);
+        let (files, total_size) = match &data_source {
+            DataSource::FilePath(p) => {
+                let p = p.to_str().ok_or(anyhow!("Invalid path"))?;
+                let files = get_recursive_files(p, show_hidden)?;
+                let total_size = files.iter().map(|x| x.size).sum();
+                (files, total_size)
+            }
+            DataSource::MemoryCursor(c) => (Vec::new(), c.get_ref().len() as u64),
+        };
         Ok(Self {
             id,
             r#type,
             remote,
-            path: get_path(&path),
+            data_source,
             file_num,
             show_hidden,
             is_remote,
@@ -428,6 +513,13 @@ impl TransferJob {
         })
     }

+    pub fn get_buf_data(self) -> Option<Vec<u8>> {
+        match self.data_stream {
+            Some(DataStream::BufStream(bs)) => Some(bs.into_inner().into_inner()),
+            _ => None,
+        }
+    }
+
     #[inline]
     pub fn files(&self) -> &Vec<FileEntry> {
         &self.files
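
Aside: a hypothetical caller-side sketch against the new_read signature above, showing a transfer fed from memory rather than from a path. The id, remote string, and flag values are placeholders for illustration, not values taken from this commit.

// Inside the rustdesk crate, where TransferJob, JobType and DataSource are in scope.
let bytes: Vec<u8> = vec![0u8; 1024]; // e.g. raw data for a printer job
let job = TransferJob::new_read(
    1,                                                      // id (placeholder)
    JobType::Printer,
    String::new(),                                          // remote (placeholder)
    DataSource::MemoryCursor(std::io::Cursor::new(bytes)),  // in-memory source
    0,                                                      // file_num
    false,                                                  // show_hidden
    false,                                                  // is_remote
    false,                                                  // enable_overwrite_detection
)?;
// For a MemoryCursor source, files() stays empty and total_size is the buffer length.
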
@@ -467,17 +559,19 @@ impl TransferJob {
         if self.r#type == JobType::Printer {
             return;
         }
-        let file_num = self.file_num as usize;
-        if file_num < self.files.len() {
-            let entry = &self.files[file_num];
-            let path = self.join(&entry.name);
-            let download_path = format!("{}.download", get_string(&path));
-            std::fs::rename(download_path, &path).ok();
-            filetime::set_file_mtime(
-                &path,
-                filetime::FileTime::from_unix_time(entry.modified_time as _, 0),
-            )
-            .ok();
+        if let DataSource::FilePath(p) = &self.data_source {
+            let file_num = self.file_num as usize;
+            if file_num < self.files.len() {
+                let entry = &self.files[file_num];
+                let path = Self::join(p, &entry.name);
+                let download_path = format!("{}.download", get_string(&path));
+                std::fs::rename(download_path, &path).ok();
+                filetime::set_file_mtime(
+                    &path,
+                    filetime::FileTime::from_unix_time(entry.modified_time as _, 0),
+                )
+                .ok();
+            }
         }
     }
@@ -485,12 +579,14 @@ impl TransferJob {
         if self.r#type == JobType::Printer {
             return;
         }
-        let file_num = self.file_num as usize;
-        if file_num < self.files.len() {
-            let entry = &self.files[file_num];
-            let path = self.join(&entry.name);
-            let download_path = format!("{}.download", get_string(&path));
-            std::fs::remove_file(download_path).ok();
+        if let DataSource::FilePath(p) = &self.data_source {
+            let file_num = self.file_num as usize;
+            if file_num < self.files.len() {
+                let entry = &self.files[file_num];
+                let path = Self::join(p, &entry.name);
+                let download_path = format!("{}.download", get_string(&path));
+                std::fs::remove_file(download_path).ok();
+            }
         }
     }
@@ -498,38 +594,47 @@ impl TransferJob {
         if block.id != self.id {
             bail!("Wrong id");
         }
-        let file_num = block.file_num as usize;
-        if file_num >= self.files.len() {
-            bail!("Wrong file number");
-        }
-        if file_num != self.file_num as usize || self.file.is_none() {
-            self.modify_time();
-            if let Some(file) = self.file.as_mut() {
-                file.sync_all().await?;
-            }
-            self.file_num = block.file_num;
-            let entry = &self.files[file_num];
-            let path = if self.r#type == JobType::Printer {
-                self.path.to_string_lossy().to_string()
-            } else {
-                let path = self.join(&entry.name);
-                if let Some(p) = path.parent() {
-                    std::fs::create_dir_all(p).ok();
-                }
-                format!("{}.download", get_string(&path))
-            };
-            self.file = Some(File::create(&path).await?);
-        }
+        match &self.data_source {
+            DataSource::FilePath(p) => {
+                let file_num = block.file_num as usize;
+                if file_num >= self.files.len() {
+                    bail!("Wrong file number");
+                }
+                if file_num != self.file_num as usize || self.data_stream.is_none() {
+                    self.modify_time();
+                    if let Some(DataStream::FileStream(file)) = self.data_stream.as_mut() {
+                        file.sync_all().await?;
+                    }
+                    self.file_num = block.file_num;
+                    let entry = &self.files[file_num];
+                    let path = if self.r#type == JobType::Printer {
+                        p.to_string_lossy().to_string()
+                    } else {
+                        let path = Self::join(p, &entry.name);
+                        if let Some(pp) = path.parent() {
+                            std::fs::create_dir_all(pp).ok();
+                        }
+                        format!("{}.download", get_string(&path))
+                    };
+                    self.data_stream = Some(DataStream::FileStream(File::create(&path).await?));
+                }
+            }
+            DataSource::MemoryCursor(c) => {
+                if self.data_stream.is_none() {
+                    self.data_stream = Some(DataStream::BufStream(TokioBufStream::new(c.clone())));
+                }
+            }
+        }
         if block.compressed {
             let tmp = decompress(&block.data);
-            self.file
+            self.data_stream
                 .as_mut()
-                .ok_or(anyhow!("file is None"))?
+                .ok_or(anyhow!("data stream is None"))?
                 .write_all(&tmp)
                 .await?;
             self.finished_size += tmp.len() as u64;
         } else {
-            self.file
+            self.data_stream
                 .as_mut()
                 .ok_or(anyhow!("file is None"))?
                 .write_all(&block.data)
@@ -541,33 +646,46 @@ impl TransferJob {
     }

     #[inline]
-    pub fn join(&self, name: &str) -> PathBuf {
+    pub fn join(p: &PathBuf, name: &str) -> PathBuf {
         if name.is_empty() {
-            self.path.clone()
+            p.clone()
         } else {
-            self.path.join(name)
+            p.join(name)
         }
     }

     pub async fn read(&mut self, stream: &mut Stream) -> ResultType<Option<FileTransferBlock>> {
         let file_num = self.file_num as usize;
-        if file_num >= self.files.len() {
-            self.file.take();
-            return Ok(None);
-        }
-        let name = &self.files[file_num].name;
-        if self.file.is_none() {
-            match File::open(self.join(name)).await {
-                Ok(file) => {
-                    self.file = Some(file);
-                    self.file_confirmed = false;
-                    self.file_is_waiting = false;
-                }
-                Err(err) => {
-                    self.file_num += 1;
-                    self.file_confirmed = false;
-                    self.file_is_waiting = false;
-                    return Err(err.into());
-                }
-            }
-        }
+        let name: &str;
+        match &mut self.data_source {
+            DataSource::FilePath(p) => {
+                if file_num >= self.files.len() {
+                    self.data_stream.take();
+                    return Ok(None);
+                };
+                name = &self.files[file_num].name;
+                if self.data_stream.is_none() {
+                    match File::open(Self::join(p, name)).await {
+                        Ok(file) => {
+                            self.data_stream = Some(DataStream::FileStream(file));
+                            self.file_confirmed = false;
+                            self.file_is_waiting = false;
+                        }
+                        Err(err) => {
+                            self.file_num += 1;
+                            self.file_confirmed = false;
+                            self.file_is_waiting = false;
+                            return Err(err.into());
+                        }
+                    }
+                }
+            }
+            DataSource::MemoryCursor(c) => {
+                name = "";
+                if self.data_stream.is_none() {
+                    let mut t = std::io::Cursor::new(Vec::new());
+                    std::mem::swap(&mut t, c);
+                    self.data_stream = Some(DataStream::BufStream(TokioBufStream::new(t)));
+                }
+            }
+        }
@@ -586,15 +704,15 @@ impl TransferJob {
         let mut offset: usize = 0;
         loop {
             match self
-                .file
+                .data_stream
                 .as_mut()
-                .ok_or(anyhow!("file is None"))?
+                .ok_or(anyhow!("data stream is None"))?
                 .read(&mut buf[offset..])
                 .await
             {
                 Err(err) => {
                     self.file_num += 1;
-                    self.file = None;
+                    self.data_stream = None;
                     self.file_confirmed = false;
                     self.file_is_waiting = false;
                     return Err(err.into());
@@ -609,13 +727,17 @@ impl TransferJob {
         }
         unsafe { buf.set_len(offset) };
         if offset == 0 {
+            if matches!(self.data_source, DataSource::MemoryCursor(_)) {
+                self.data_stream.take();
+                return Ok(None);
+            }
             self.file_num += 1;
-            self.file = None;
+            self.data_stream = None;
             self.file_confirmed = false;
             self.file_is_waiting = false;
         } else {
             self.finished_size += offset as u64;
-            if !is_compressed_file(name) {
+            if matches!(self.data_source, DataSource::FilePath(_)) && !is_compressed_file(name) {
                 let tmp = compress(&buf);
                 if tmp.len() < buf.len() {
                     buf = tmp;
@@ -633,15 +755,14 @@ impl TransferJob {
         }))
     }

+    // Only for generic job and file stream
     async fn send_current_digest(&mut self, stream: &mut Stream) -> ResultType<()> {
         let mut msg = Message::new();
         let mut resp = FileResponse::new();
-        let meta = self
-            .file
-            .as_ref()
-            .ok_or(anyhow!("file is None"))?
-            .metadata()
-            .await?;
+        let meta = match self.data_stream.as_ref().ok_or(anyhow!("file is None"))? {
+            DataStream::FileStream(file) => file.metadata().await?,
+            DataStream::BufStream(_) => bail!("No need to send digest for buf stream"),
+        };
         let last_modified = meta
             .modified()?
             .duration_since(SystemTime::UNIX_EPOCH)?
@@ -727,7 +848,7 @@ impl TransferJob {

     pub fn set_file_skipped(&mut self) -> bool {
         log::debug!("skip file {} in job {}", self.file_num, self.id);
-        self.file.take();
+        self.data_stream.take();
         self.set_file_confirmed(false);
         self.set_file_is_waiting(false);
         self.file_num += 1;
@@ -761,7 +882,7 @@ impl TransferJob {
         TransferJobMeta {
             id: self.id,
             remote: self.remote.to_string(),
-            to: self.path.to_string_lossy().to_string(),
+            to: self.data_source.to_meta(),
             file_num: self.file_num,
             show_hidden: self.show_hidden,
             is_remote: self.is_remote,
@@ -875,8 +996,10 @@ pub fn new_done(id: i32, file_num: i32) -> Message {
 }

 #[inline]
-pub fn remove_job(id: i32, jobs: &mut Vec<TransferJob>) {
-    *jobs = jobs.drain(0..).filter(|x| x.id() != id).collect();
+pub fn remove_job(id: i32, jobs: &mut Vec<TransferJob>) -> Option<TransferJob> {
+    jobs.iter()
+        .position(|x| x.id() == id)
+        .map(|index| jobs.remove(index))
 }

 #[inline]
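
Aside: a hypothetical sketch combining the new remove_job return value with get_buf_data from earlier in this diff. handle_printer_data is a made-up placeholder, not an API introduced by this commit.

// When a job finishes, take it out of the list and recover any in-memory payload.
if let Some(job) = remove_job(id, &mut jobs) {
    if let Some(bytes) = job.get_buf_data() {
        handle_printer_data(bytes); // placeholder consumer for the buffered bytes
    }
}
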
@@ -928,7 +1051,7 @@ pub async fn handle_read_jobs(
         }
     }
     for id in finished {
-        remove_job(id, jobs);
+        let _ = remove_job(id, jobs);
     }
     Ok(job_log)
 }