feat(fs): add init_data_stream_for_cm for cm reading

Add new method `init_data_stream_for_cm()` to TransferJob to support
file reading and digest handling in Connection Manager (CM) context.

Signed-off-by: fufesou <linlong1266@gmail.com>
This commit is contained in:
fufesou
2025-11-26 19:52:35 +08:00
parent 8b0e258673
commit 98f58b97f2
2 changed files with 78 additions and 26 deletions

View File

@@ -2548,6 +2548,16 @@ pub mod keys {
pub const OPTION_REGISTER_DEVICE: &str = "register-device"; pub const OPTION_REGISTER_DEVICE: &str = "register-device";
pub const OPTION_RELAY_SERVER: &str = "relay-server"; pub const OPTION_RELAY_SERVER: &str = "relay-server";
pub const OPTION_ICE_SERVERS: &str = "ice-servers"; pub const OPTION_ICE_SERVERS: &str = "ice-servers";
/// Maximum number of files allowed during a single file transfer request.
///
/// Key: `file-transfer-max-files`.
/// Unit: number of files (not bytes).
///
/// Behaviour:
/// - If set to a positive integer N, at most N files are allowed.
/// - If set to 0, a safe built-in default is used (see DEFAULT_MAX_VALIDATED_FILES).
/// - If unset, negative, or non-integer, no explicit limit is enforced for backward compatibility.
pub const OPTION_FILE_TRANSFER_MAX_FILES: &str = "file-transfer-max-files";
pub const OPTION_DISABLE_UDP: &str = "disable-udp"; pub const OPTION_DISABLE_UDP: &str = "disable-udp";
pub const OPTION_ALLOW_INSECURE_TLS_FALLBACK: &str = "allow-insecure-tls-fallback"; pub const OPTION_ALLOW_INSECURE_TLS_FALLBACK: &str = "allow-insecure-tls-fallback";
pub const OPTION_SHOW_VIRTUAL_MOUSE: &str = "show-virtual-mouse"; pub const OPTION_SHOW_VIRTUAL_MOUSE: &str = "show-virtual-mouse";
@@ -2774,6 +2784,7 @@ pub mod keys {
OPTION_REGISTER_DEVICE, OPTION_REGISTER_DEVICE,
OPTION_HIDE_POWERED_BY_ME, OPTION_HIDE_POWERED_BY_ME,
OPTION_MAIN_WINDOW_ALWAYS_ON_TOP, OPTION_MAIN_WINDOW_ALWAYS_ON_TOP,
OPTION_FILE_TRANSFER_MAX_FILES,
]; ];
} }

View File

@@ -701,14 +701,16 @@ impl TransferJob {
} }
} }
async fn init_data_stream(&mut self, stream: &mut crate::Stream) -> ResultType<()> { /// Open the data stream for the current file.
/// Returns Ok(true) if job is done, Ok(false) otherwise.
async fn open_data_stream(&mut self) -> ResultType<bool> {
let file_num = self.file_num as usize; let file_num = self.file_num as usize;
match &mut self.data_source { match &mut self.data_source {
DataSource::FilePath(p) => { DataSource::FilePath(p) => {
if file_num >= self.files.len() { if file_num >= self.files.len() {
// job done // job done
self.data_stream.take(); self.data_stream.take();
return Ok(()); return Ok(true);
}; };
if self.data_stream.is_none() { if self.data_stream.is_none() {
match File::open(Self::join(p, &self.files[file_num].name)).await { match File::open(Self::join(p, &self.files[file_num].name)).await {
@@ -717,6 +719,8 @@ impl TransferJob {
self.file_confirmed = false; self.file_confirmed = false;
self.file_is_waiting = false; self.file_is_waiting = false;
} }
// On open error, behave the same as validation failure: advance
// to next file and return the error.
Err(err) => { Err(err) => {
self.file_num += 1; self.file_num += 1;
self.file_confirmed = false; self.file_confirmed = false;
@@ -734,17 +738,58 @@ impl TransferJob {
} }
} }
} }
if self.r#type == JobType::Generic { Ok(false)
if self.enable_overwrite_detection && !self.file_confirmed() { }
if !self.file_is_waiting() {
self.send_current_digest(stream).await?; /// Get current file's digest (last_modified, file_size) for overwrite detection.
self.set_file_is_waiting(true); async fn get_current_digest(&self) -> ResultType<(u64, u64)> {
} let meta = match self.data_stream.as_ref().ok_or(anyhow!("file is None"))? {
} DataStream::FileStream(file) => file.metadata().await?,
DataStream::BufStream(_) => bail!("No digest for buf stream"),
};
let last_modified = meta
.modified()?
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs();
Ok((last_modified, meta.len()))
}
async fn init_data_stream(&mut self, stream: &mut crate::Stream) -> ResultType<()> {
if self.open_data_stream().await? {
return Ok(());
}
if self.r#type == JobType::Generic
&& self.enable_overwrite_detection
&& !self.file_confirmed()
&& !self.file_is_waiting()
{
self.send_current_digest(stream).await?;
self.set_file_is_waiting(true);
} }
Ok(()) Ok(())
} }
/// Initialize data stream for CM (Connection Manager) scenario.
/// Returns digest info (last_modified, file_size) if overwrite detection is enabled,
/// so caller can send it via IPC instead of network stream.
/// Returns Ok(None) if job is done or already initialized.
pub async fn init_data_stream_for_cm(&mut self) -> ResultType<Option<(u64, u64)>> {
if self.open_data_stream().await? {
return Ok(None);
}
// For overwrite detection, return digest info instead of sending via stream
if self.r#type == JobType::Generic
&& self.enable_overwrite_detection
&& !self.file_confirmed()
&& !self.file_is_waiting()
{
let digest = self.get_current_digest().await?;
self.set_file_is_waiting(true);
return Ok(Some(digest));
}
Ok(None)
}
pub async fn read(&mut self) -> ResultType<Option<FileTransferBlock>> { pub async fn read(&mut self) -> ResultType<Option<FileTransferBlock>> {
if self.r#type == JobType::Generic { if self.r#type == JobType::Generic {
if self.enable_overwrite_detection && !self.file_confirmed() { if self.enable_overwrite_detection && !self.file_confirmed() {
@@ -753,19 +798,22 @@ impl TransferJob {
} }
let file_num = self.file_num as usize; let file_num = self.file_num as usize;
let name: &str; let name = match &self.data_source {
match &mut self.data_source { DataSource::FilePath(p) => {
DataSource::FilePath(..) => {
if file_num >= self.files.len() { if file_num >= self.files.len() {
self.data_stream.take(); self.data_stream.take();
return Ok(None); return Ok(None);
}; };
name = &self.files[file_num].name; if self.files.len() == 1 && self.files[file_num].name.is_empty() {
p.file_name()
.map(|p| p.to_str().unwrap_or(""))
.unwrap_or("")
} else {
&self.files[file_num].name
}
} }
DataSource::MemoryCursor(..) => { DataSource::MemoryCursor(..) => "",
name = ""; };
}
}
const BUF_SIZE: usize = 128 * 1024; const BUF_SIZE: usize = 128 * 1024;
let mut buf: Vec<u8> = vec![0; BUF_SIZE]; let mut buf: Vec<u8> = vec![0; BUF_SIZE];
let mut compressed = false; let mut compressed = false;
@@ -825,21 +873,14 @@ impl TransferJob {
// Only for generic job and file stream // Only for generic job and file stream
async fn send_current_digest(&mut self, stream: &mut Stream) -> ResultType<()> { async fn send_current_digest(&mut self, stream: &mut Stream) -> ResultType<()> {
let (last_modified, file_size) = self.get_current_digest().await?;
let mut msg = Message::new(); let mut msg = Message::new();
let mut resp = FileResponse::new(); let mut resp = FileResponse::new();
let meta = match self.data_stream.as_ref().ok_or(anyhow!("file is None"))? {
DataStream::FileStream(file) => file.metadata().await?,
DataStream::BufStream(_) => bail!("No need to send digest for buf stream"),
};
let last_modified = meta
.modified()?
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs();
resp.set_digest(FileTransferDigest { resp.set_digest(FileTransferDigest {
id: self.id, id: self.id,
file_num: self.file_num, file_num: self.file_num,
last_modified, last_modified,
file_size: meta.len(), file_size,
is_resume: self.is_resume, is_resume: self.is_resume,
..Default::default() ..Default::default()
}); });