Merge pull request #458 from fufesou/feat/fs_read_cm

feat(fs): add init_data_stream_for_cm for cm reading
This commit is contained in:
RustDesk
2025-12-18 15:41:59 +08:00
committed by GitHub
2 changed files with 78 additions and 26 deletions

View File

@@ -2550,6 +2550,16 @@ pub mod keys {
pub const OPTION_REGISTER_DEVICE: &str = "register-device";
pub const OPTION_RELAY_SERVER: &str = "relay-server";
pub const OPTION_ICE_SERVERS: &str = "ice-servers";
/// Maximum number of files allowed during a single file transfer request.
///
/// Key: `file-transfer-max-files`.
/// Unit: number of files (not bytes).
///
/// Behaviour:
/// - If set to a positive integer N, at most N files are allowed.
/// - If set to 0, a safe built-in default is used (see DEFAULT_MAX_VALIDATED_FILES).
/// - If unset, negative, or non-integer, no explicit limit is enforced for backward compatibility.
pub const OPTION_FILE_TRANSFER_MAX_FILES: &str = "file-transfer-max-files";
pub const OPTION_DISABLE_UDP: &str = "disable-udp";
pub const OPTION_ALLOW_INSECURE_TLS_FALLBACK: &str = "allow-insecure-tls-fallback";
pub const OPTION_SHOW_VIRTUAL_MOUSE: &str = "show-virtual-mouse";
@@ -2776,6 +2786,7 @@ pub mod keys {
OPTION_REGISTER_DEVICE,
OPTION_HIDE_POWERED_BY_ME,
OPTION_MAIN_WINDOW_ALWAYS_ON_TOP,
OPTION_FILE_TRANSFER_MAX_FILES,
];
}

View File

@@ -701,14 +701,16 @@ impl TransferJob {
}
}
async fn init_data_stream(&mut self, stream: &mut crate::Stream) -> ResultType<()> {
/// Open the data stream for the current file.
/// Returns Ok(true) if job is done, Ok(false) otherwise.
async fn open_data_stream(&mut self) -> ResultType<bool> {
let file_num = self.file_num as usize;
match &mut self.data_source {
DataSource::FilePath(p) => {
if file_num >= self.files.len() {
// job done
self.data_stream.take();
return Ok(());
return Ok(true);
};
if self.data_stream.is_none() {
match File::open(Self::join(p, &self.files[file_num].name)).await {
@@ -717,6 +719,8 @@ impl TransferJob {
self.file_confirmed = false;
self.file_is_waiting = false;
}
// On open error, behave the same as validation failure: advance
// to next file and return the error.
Err(err) => {
self.file_num += 1;
self.file_confirmed = false;
@@ -734,17 +738,58 @@ impl TransferJob {
}
}
}
if self.r#type == JobType::Generic {
if self.enable_overwrite_detection && !self.file_confirmed() {
if !self.file_is_waiting() {
self.send_current_digest(stream).await?;
self.set_file_is_waiting(true);
}
}
Ok(false)
}
/// Get current file's digest (last_modified, file_size) for overwrite detection.
async fn get_current_digest(&self) -> ResultType<(u64, u64)> {
let meta = match self.data_stream.as_ref().ok_or(anyhow!("file is None"))? {
DataStream::FileStream(file) => file.metadata().await?,
DataStream::BufStream(_) => bail!("No digest for buf stream"),
};
let last_modified = meta
.modified()?
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs();
Ok((last_modified, meta.len()))
}
async fn init_data_stream(&mut self, stream: &mut crate::Stream) -> ResultType<()> {
if self.open_data_stream().await? {
return Ok(());
}
if self.r#type == JobType::Generic
&& self.enable_overwrite_detection
&& !self.file_confirmed()
&& !self.file_is_waiting()
{
self.send_current_digest(stream).await?;
self.set_file_is_waiting(true);
}
Ok(())
}
/// Initialize data stream for CM (Connection Manager) scenario.
/// Returns digest info (last_modified, file_size) if overwrite detection is enabled,
/// so caller can send it via IPC instead of network stream.
/// Returns Ok(None) if job is done or already initialized.
pub async fn init_data_stream_for_cm(&mut self) -> ResultType<Option<(u64, u64)>> {
if self.open_data_stream().await? {
return Ok(None);
}
// For overwrite detection, return digest info instead of sending via stream
if self.r#type == JobType::Generic
&& self.enable_overwrite_detection
&& !self.file_confirmed()
&& !self.file_is_waiting()
{
let digest = self.get_current_digest().await?;
self.set_file_is_waiting(true);
return Ok(Some(digest));
}
Ok(None)
}
pub async fn read(&mut self) -> ResultType<Option<FileTransferBlock>> {
if self.r#type == JobType::Generic {
if self.enable_overwrite_detection && !self.file_confirmed() {
@@ -753,19 +798,22 @@ impl TransferJob {
}
let file_num = self.file_num as usize;
let name: &str;
match &mut self.data_source {
DataSource::FilePath(..) => {
let name = match &self.data_source {
DataSource::FilePath(p) => {
if file_num >= self.files.len() {
self.data_stream.take();
return Ok(None);
};
name = &self.files[file_num].name;
if self.files.len() == 1 && self.files[file_num].name.is_empty() {
p.file_name()
.map(|p| p.to_str().unwrap_or(""))
.unwrap_or("")
} else {
&self.files[file_num].name
}
}
DataSource::MemoryCursor(..) => {
name = "";
}
}
DataSource::MemoryCursor(..) => "",
};
const BUF_SIZE: usize = 128 * 1024;
let mut buf: Vec<u8> = vec![0; BUF_SIZE];
let mut compressed = false;
@@ -825,21 +873,14 @@ impl TransferJob {
// Only for generic job and file stream
async fn send_current_digest(&mut self, stream: &mut Stream) -> ResultType<()> {
let (last_modified, file_size) = self.get_current_digest().await?;
let mut msg = Message::new();
let mut resp = FileResponse::new();
let meta = match self.data_stream.as_ref().ok_or(anyhow!("file is None"))? {
DataStream::FileStream(file) => file.metadata().await?,
DataStream::BufStream(_) => bail!("No need to send digest for buf stream"),
};
let last_modified = meta
.modified()?
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs();
resp.set_digest(FileTransferDigest {
id: self.id,
file_num: self.file_num,
last_modified,
file_size: meta.len(),
file_size,
is_resume: self.is_resume,
..Default::default()
});