feat: file transfer, resume

Signed-off-by: fufesou <linlong1266@gmail.com>
This commit is contained in:
fufesou 2025-08-06 10:40:35 +08:00
parent 57c8a23ab9
commit 215b0e7700
2 changed files with 125 additions and 7 deletions

View File

@ -437,6 +437,7 @@ message FileTransferDigest {
uint64 file_size = 4; uint64 file_size = 4;
bool is_upload = 5; bool is_upload = 5;
bool is_identical = 6; bool is_identical = 6;
uint64 transferred_size = 7; // for resume transfer, indicates the size of the file already transferred
} }
message FileTransferBlock { message FileTransferBlock {

131
src/fs.rs
View File

@ -11,8 +11,8 @@ use std::{
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use tokio::{ use tokio::{
fs::File, fs::{File, OpenOptions},
io::{AsyncReadExt, AsyncWriteExt, BufStream as TokioBufStream}, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufStream as TokioBufStream},
}; };
use crate::{anyhow::anyhow, bail, get_version_number, message_proto::*, ResultType, Stream}; use crate::{anyhow::anyhow, bail, get_version_number, message_proto::*, ResultType, Stream};
@ -379,6 +379,12 @@ impl DataStream {
} }
} }
#[derive(Default, Serialize, Deserialize, Debug)]
pub struct FileDigest {
pub size: u64,
pub modified: u64,
}
#[derive(Default, Serialize, Debug)] #[derive(Default, Serialize, Debug)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct TransferJob { pub struct TransferJob {
@ -389,6 +395,7 @@ pub struct TransferJob {
pub show_hidden: bool, pub show_hidden: bool,
pub is_remote: bool, pub is_remote: bool,
pub is_last_job: bool, pub is_last_job: bool,
pub is_resume: bool,
pub file_num: i32, pub file_num: i32,
#[serde(skip_serializing)] #[serde(skip_serializing)]
pub files: Vec<FileEntry>, pub files: Vec<FileEntry>,
@ -405,6 +412,8 @@ pub struct TransferJob {
file_skipped: bool, file_skipped: bool,
file_is_waiting: bool, file_is_waiting: bool,
default_overwrite_strategy: Option<bool>, default_overwrite_strategy: Option<bool>,
#[serde(skip_serializing)]
digest: FileDigest,
} }
#[derive(Debug, Default, Serialize, Deserialize, Clone)] #[derive(Debug, Default, Serialize, Deserialize, Clone)]
@ -533,6 +542,12 @@ impl TransferJob {
self.files = files; self.files = files;
} }
#[inline]
pub fn set_digest(&mut self, size: u64, modified: u64) {
self.digest.size = size;
self.digest.modified = modified;
}
#[inline] #[inline]
pub fn id(&self) -> i32 { pub fn id(&self) -> i32 {
self.id self.id
@ -568,6 +583,8 @@ impl TransferJob {
let entry = &self.files[file_num]; let entry = &self.files[file_num];
let path = Self::join(p, &entry.name); let path = Self::join(p, &entry.name);
let download_path = format!("{}.download", get_string(&path)); let download_path = format!("{}.download", get_string(&path));
let digest_path = format!("{}.digest", get_string(&path));
std::fs::remove_file(digest_path).ok();
std::fs::rename(download_path, &path).ok(); std::fs::rename(download_path, &path).ok();
filetime::set_file_mtime( filetime::set_file_mtime(
&path, &path,
@ -588,7 +605,9 @@ impl TransferJob {
let entry = &self.files[file_num]; let entry = &self.files[file_num];
let path = Self::join(p, &entry.name); let path = Self::join(p, &entry.name);
let download_path = format!("{}.download", get_string(&path)); let download_path = format!("{}.download", get_string(&path));
let digest_path = format!("{}.digest", get_string(&path));
std::fs::remove_file(download_path).ok(); std::fs::remove_file(download_path).ok();
std::fs::remove_file(digest_path).ok();
} }
} }
} }
@ -610,16 +629,28 @@ impl TransferJob {
} }
self.file_num = block.file_num; self.file_num = block.file_num;
let entry = &self.files[file_num]; let entry = &self.files[file_num];
let path = if self.r#type == JobType::Printer { let (path, digest_path) = if self.r#type == JobType::Printer {
p.to_string_lossy().to_string() (p.to_string_lossy().to_string(), None)
} else { } else {
let path = Self::join(p, &entry.name); let path = Self::join(p, &entry.name);
if let Some(pp) = path.parent() { if let Some(pp) = path.parent() {
std::fs::create_dir_all(pp).ok(); std::fs::create_dir_all(pp).ok();
} }
format!("{}.download", get_string(&path)) let file_path = get_string(&path);
(
format!("{}.download", &file_path),
Some(format!("{}.digest", &file_path)),
)
}; };
if let Some(dp) = digest_path.as_ref() {
if Path::new(dp).exists() {
std::fs::remove_file(dp)?;
}
}
self.data_stream = Some(DataStream::FileStream(File::create(&path).await?)); self.data_stream = Some(DataStream::FileStream(File::create(&path).await?));
if let Some(dp) = digest_path.as_ref() {
std::fs::write(dp, json!(self.digest).to_string()).ok();
}
} }
} }
DataSource::MemoryCursor(c) => { DataSource::MemoryCursor(c) => {
@ -859,7 +890,54 @@ impl TransferJob {
true true
} }
pub fn confirm(&mut self, r: &FileTransferSendConfirmRequest) -> bool { async fn set_stream_offset(&mut self, file_num: usize, offset: u64) {
if let DataSource::FilePath(p) = &self.data_source {
let entry = &self.files[file_num];
let path = Self::join(p, &entry.name);
let file_path = get_string(&path);
let download_path = format!("{}.download", &file_path);
let digest_path = format!("{}.digest", &file_path);
let mut f = if Path::new(&download_path).exists() && Path::new(&digest_path).exists() {
// If both download and digest files exist, seek (writer) to the offset
match OpenOptions::new()
.create(true)
.write(true)
.open(&download_path)
.await
{
Ok(f) => f,
Err(e) => {
log::warn!("Failed to open file {}: {}", download_path, e);
return;
}
}
} else if Path::new(&file_path).exists() {
// If `file_path` exists, seek (reader) to the offset
match File::open(&file_path).await {
Ok(f) => f,
Err(e) => {
log::warn!("Failed to open file {}: {}", file_path, e);
return;
}
}
} else {
log::warn!(
"File {} not found, cannot seek to offset {}",
file_path,
offset
);
return;
};
if f.seek(std::io::SeekFrom::Start(offset)).await.is_ok() {
self.data_stream = Some(DataStream::FileStream(f));
self.transferred += offset;
self.finished_size += offset;
}
}
}
pub async fn confirm(&mut self, r: &FileTransferSendConfirmRequest) -> bool {
if self.file_num() != r.file_num { if self.file_num() != r.file_num {
log::info!("file num truncated, ignoring"); log::info!("file num truncated, ignoring");
} else { } else {
@ -871,8 +949,13 @@ impl TransferJob {
self.set_file_confirmed(true); self.set_file_confirmed(true);
} }
} }
Some(file_transfer_send_confirm_request::Union::OffsetBlk(_offset)) => { Some(file_transfer_send_confirm_request::Union::OffsetBlk(offset)) => {
self.set_file_confirmed(true); self.set_file_confirmed(true);
// If offset is greater than 0, we need to seek to the offset
if offset > 0 {
self.set_stream_offset(r.file_num as usize, offset as u64)
.await;
}
} }
_ => {} _ => {}
} }
@ -1052,6 +1135,8 @@ pub async fn handle_read_jobs(
} }
} }
} }
// Break to handle jobs one by one.
break;
} }
for id in finished { for id in finished {
let _ = remove_job(id, jobs); let _ = remove_job(id, jobs);
@ -1122,6 +1207,37 @@ pub fn is_write_need_confirmation(
digest: &FileTransferDigest, digest: &FileTransferDigest,
) -> ResultType<DigestCheckResult> { ) -> ResultType<DigestCheckResult> {
let path = Path::new(file_path); let path = Path::new(file_path);
let digest_file = format!("{}.digest", file_path);
let download_file = format!("{}.download", file_path);
if Path::new(&digest_file).exists() && Path::new(&download_file).exists() {
// If the digest file exists, it means the file was transferred before.
// We can use the digest file to check whether the file is the same.
if let Ok(content) = std::fs::read_to_string(digest_file) {
if let Ok(local_digest) = serde_json::from_str::<FileDigest>(&content) {
let is_identical = local_digest.modified == digest.last_modified
&& local_digest.size == digest.file_size;
if is_identical {
if let Ok(download_metadata) = std::fs::metadata(download_file) {
// Get the file size of the local file
// Only send confirmation if the file is not empty.
let transferred_size = download_metadata.len();
if transferred_size > 0 {
return Ok(DigestCheckResult::NeedConfirm(FileTransferDigest {
id: digest.id,
file_num: digest.file_num,
last_modified: digest.last_modified,
file_size: digest.file_size,
is_identical,
transferred_size,
..Default::default()
}));
}
}
}
}
}
}
if path.exists() && path.is_file() { if path.exists() && path.is_file() {
let metadata = std::fs::metadata(path)?; let metadata = std::fs::metadata(path)?;
let modified_time = metadata.modified()?; let modified_time = metadata.modified()?;
@ -1143,6 +1259,7 @@ pub fn is_write_need_confirmation(
..Default::default() ..Default::default()
})) }))
} else { } else {
// If the file does not exist, or the digest file and download file do not exist, we return NoSuchFile.
Ok(DigestCheckResult::NoSuchFile) Ok(DigestCheckResult::NoSuchFile)
} }
} }