diff --git a/protos/message.proto b/protos/message.proto index 593fbe3..13d9fba 100644 --- a/protos/message.proto +++ b/protos/message.proto @@ -437,6 +437,7 @@ message FileTransferDigest { uint64 file_size = 4; bool is_upload = 5; bool is_identical = 6; + uint64 transferred_size = 7; // for resume transfer, indicates the size of the file already transferred } message FileTransferBlock { diff --git a/src/fs.rs b/src/fs.rs index 4401a97..2e7f69a 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -11,8 +11,8 @@ use std::{ use serde_derive::{Deserialize, Serialize}; use serde_json::json; use tokio::{ - fs::File, - io::{AsyncReadExt, AsyncWriteExt, BufStream as TokioBufStream}, + fs::{File, OpenOptions}, + io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufStream as TokioBufStream}, }; use crate::{anyhow::anyhow, bail, get_version_number, message_proto::*, ResultType, Stream}; @@ -379,6 +379,12 @@ impl DataStream { } } +#[derive(Default, Serialize, Deserialize, Debug)] +pub struct FileDigest { + pub size: u64, + pub modified: u64, +} + #[derive(Default, Serialize, Debug)] #[serde(rename_all = "camelCase")] pub struct TransferJob { @@ -389,6 +395,7 @@ pub struct TransferJob { pub show_hidden: bool, pub is_remote: bool, pub is_last_job: bool, + pub is_resume: bool, pub file_num: i32, #[serde(skip_serializing)] pub files: Vec, @@ -405,6 +412,8 @@ pub struct TransferJob { file_skipped: bool, file_is_waiting: bool, default_overwrite_strategy: Option, + #[serde(skip_serializing)] + digest: FileDigest, } #[derive(Debug, Default, Serialize, Deserialize, Clone)] @@ -533,6 +542,12 @@ impl TransferJob { self.files = files; } + #[inline] + pub fn set_digest(&mut self, size: u64, modified: u64) { + self.digest.size = size; + self.digest.modified = modified; + } + #[inline] pub fn id(&self) -> i32 { self.id @@ -568,6 +583,8 @@ impl TransferJob { let entry = &self.files[file_num]; let path = Self::join(p, &entry.name); let download_path = format!("{}.download", get_string(&path)); + let digest_path = format!("{}.digest", get_string(&path)); + std::fs::remove_file(digest_path).ok(); std::fs::rename(download_path, &path).ok(); filetime::set_file_mtime( &path, @@ -588,7 +605,9 @@ impl TransferJob { let entry = &self.files[file_num]; let path = Self::join(p, &entry.name); let download_path = format!("{}.download", get_string(&path)); + let digest_path = format!("{}.digest", get_string(&path)); std::fs::remove_file(download_path).ok(); + std::fs::remove_file(digest_path).ok(); } } } @@ -610,16 +629,28 @@ impl TransferJob { } self.file_num = block.file_num; let entry = &self.files[file_num]; - let path = if self.r#type == JobType::Printer { - p.to_string_lossy().to_string() + let (path, digest_path) = if self.r#type == JobType::Printer { + (p.to_string_lossy().to_string(), None) } else { let path = Self::join(p, &entry.name); if let Some(pp) = path.parent() { std::fs::create_dir_all(pp).ok(); } - format!("{}.download", get_string(&path)) + let file_path = get_string(&path); + ( + format!("{}.download", &file_path), + Some(format!("{}.digest", &file_path)), + ) }; + if let Some(dp) = digest_path.as_ref() { + if Path::new(dp).exists() { + std::fs::remove_file(dp)?; + } + } self.data_stream = Some(DataStream::FileStream(File::create(&path).await?)); + if let Some(dp) = digest_path.as_ref() { + std::fs::write(dp, json!(self.digest).to_string()).ok(); + } } } DataSource::MemoryCursor(c) => { @@ -859,7 +890,54 @@ impl TransferJob { true } - pub fn confirm(&mut self, r: &FileTransferSendConfirmRequest) -> bool { + async fn set_stream_offset(&mut self, file_num: usize, offset: u64) { + if let DataSource::FilePath(p) = &self.data_source { + let entry = &self.files[file_num]; + let path = Self::join(p, &entry.name); + let file_path = get_string(&path); + let download_path = format!("{}.download", &file_path); + let digest_path = format!("{}.digest", &file_path); + + let mut f = if Path::new(&download_path).exists() && Path::new(&digest_path).exists() { + // If both download and digest files exist, seek (writer) to the offset + match OpenOptions::new() + .create(true) + .write(true) + .open(&download_path) + .await + { + Ok(f) => f, + Err(e) => { + log::warn!("Failed to open file {}: {}", download_path, e); + return; + } + } + } else if Path::new(&file_path).exists() { + // If `file_path` exists, seek (reader) to the offset + match File::open(&file_path).await { + Ok(f) => f, + Err(e) => { + log::warn!("Failed to open file {}: {}", file_path, e); + return; + } + } + } else { + log::warn!( + "File {} not found, cannot seek to offset {}", + file_path, + offset + ); + return; + }; + if f.seek(std::io::SeekFrom::Start(offset)).await.is_ok() { + self.data_stream = Some(DataStream::FileStream(f)); + self.transferred += offset; + self.finished_size += offset; + } + } + } + + pub async fn confirm(&mut self, r: &FileTransferSendConfirmRequest) -> bool { if self.file_num() != r.file_num { log::info!("file num truncated, ignoring"); } else { @@ -871,8 +949,13 @@ impl TransferJob { self.set_file_confirmed(true); } } - Some(file_transfer_send_confirm_request::Union::OffsetBlk(_offset)) => { + Some(file_transfer_send_confirm_request::Union::OffsetBlk(offset)) => { self.set_file_confirmed(true); + // If offset is greater than 0, we need to seek to the offset + if offset > 0 { + self.set_stream_offset(r.file_num as usize, offset as u64) + .await; + } } _ => {} } @@ -1052,6 +1135,8 @@ pub async fn handle_read_jobs( } } } + // Break to handle jobs one by one. + break; } for id in finished { let _ = remove_job(id, jobs); @@ -1122,6 +1207,37 @@ pub fn is_write_need_confirmation( digest: &FileTransferDigest, ) -> ResultType { let path = Path::new(file_path); + let digest_file = format!("{}.digest", file_path); + let download_file = format!("{}.download", file_path); + if Path::new(&digest_file).exists() && Path::new(&download_file).exists() { + // If the digest file exists, it means the file was transferred before. + // We can use the digest file to check whether the file is the same. + if let Ok(content) = std::fs::read_to_string(digest_file) { + if let Ok(local_digest) = serde_json::from_str::(&content) { + let is_identical = local_digest.modified == digest.last_modified + && local_digest.size == digest.file_size; + if is_identical { + if let Ok(download_metadata) = std::fs::metadata(download_file) { + // Get the file size of the local file + // Only send confirmation if the file is not empty. + let transferred_size = download_metadata.len(); + if transferred_size > 0 { + return Ok(DigestCheckResult::NeedConfirm(FileTransferDigest { + id: digest.id, + file_num: digest.file_num, + last_modified: digest.last_modified, + file_size: digest.file_size, + is_identical, + transferred_size, + ..Default::default() + })); + } + } + } + } + } + } + if path.exists() && path.is_file() { let metadata = std::fs::metadata(path)?; let modified_time = metadata.modified()?; @@ -1143,6 +1259,7 @@ pub fn is_write_need_confirmation( ..Default::default() })) } else { + // If the file does not exist, or the digest file and download file do not exist, we return NoSuchFile. Ok(DigestCheckResult::NoSuchFile) } }