mirror of
https://github.com/rustdesk/hbb_common.git
synced 2025-08-16 00:25:39 +00:00
Merge pull request #313 from fufesou/feat/file_transfer_resume
feat: file transfer, resume
This commit is contained in:
commit
32fed54062
@ -437,6 +437,7 @@ message FileTransferDigest {
|
||||
uint64 file_size = 4;
|
||||
bool is_upload = 5;
|
||||
bool is_identical = 6;
|
||||
uint64 transferred_size = 7; // for resume transfer, indicates the size of the file already transferred
|
||||
}
|
||||
|
||||
message FileTransferBlock {
|
||||
|
131
src/fs.rs
131
src/fs.rs
@ -11,8 +11,8 @@ use std::{
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use tokio::{
|
||||
fs::File,
|
||||
io::{AsyncReadExt, AsyncWriteExt, BufStream as TokioBufStream},
|
||||
fs::{File, OpenOptions},
|
||||
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufStream as TokioBufStream},
|
||||
};
|
||||
|
||||
use crate::{anyhow::anyhow, bail, get_version_number, message_proto::*, ResultType, Stream};
|
||||
@ -379,6 +379,12 @@ impl DataStream {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Serialize, Deserialize, Debug)]
|
||||
pub struct FileDigest {
|
||||
pub size: u64,
|
||||
pub modified: u64,
|
||||
}
|
||||
|
||||
#[derive(Default, Serialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct TransferJob {
|
||||
@ -389,6 +395,7 @@ pub struct TransferJob {
|
||||
pub show_hidden: bool,
|
||||
pub is_remote: bool,
|
||||
pub is_last_job: bool,
|
||||
pub is_resume: bool,
|
||||
pub file_num: i32,
|
||||
#[serde(skip_serializing)]
|
||||
pub files: Vec<FileEntry>,
|
||||
@ -405,6 +412,8 @@ pub struct TransferJob {
|
||||
file_skipped: bool,
|
||||
file_is_waiting: bool,
|
||||
default_overwrite_strategy: Option<bool>,
|
||||
#[serde(skip_serializing)]
|
||||
digest: FileDigest,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
|
||||
@ -533,6 +542,12 @@ impl TransferJob {
|
||||
self.files = files;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn set_digest(&mut self, size: u64, modified: u64) {
|
||||
self.digest.size = size;
|
||||
self.digest.modified = modified;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn id(&self) -> i32 {
|
||||
self.id
|
||||
@ -568,6 +583,8 @@ impl TransferJob {
|
||||
let entry = &self.files[file_num];
|
||||
let path = Self::join(p, &entry.name);
|
||||
let download_path = format!("{}.download", get_string(&path));
|
||||
let digest_path = format!("{}.digest", get_string(&path));
|
||||
std::fs::remove_file(digest_path).ok();
|
||||
std::fs::rename(download_path, &path).ok();
|
||||
filetime::set_file_mtime(
|
||||
&path,
|
||||
@ -588,7 +605,9 @@ impl TransferJob {
|
||||
let entry = &self.files[file_num];
|
||||
let path = Self::join(p, &entry.name);
|
||||
let download_path = format!("{}.download", get_string(&path));
|
||||
let digest_path = format!("{}.digest", get_string(&path));
|
||||
std::fs::remove_file(download_path).ok();
|
||||
std::fs::remove_file(digest_path).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -610,16 +629,28 @@ impl TransferJob {
|
||||
}
|
||||
self.file_num = block.file_num;
|
||||
let entry = &self.files[file_num];
|
||||
let path = if self.r#type == JobType::Printer {
|
||||
p.to_string_lossy().to_string()
|
||||
let (path, digest_path) = if self.r#type == JobType::Printer {
|
||||
(p.to_string_lossy().to_string(), None)
|
||||
} else {
|
||||
let path = Self::join(p, &entry.name);
|
||||
if let Some(pp) = path.parent() {
|
||||
std::fs::create_dir_all(pp).ok();
|
||||
}
|
||||
format!("{}.download", get_string(&path))
|
||||
let file_path = get_string(&path);
|
||||
(
|
||||
format!("{}.download", &file_path),
|
||||
Some(format!("{}.digest", &file_path)),
|
||||
)
|
||||
};
|
||||
if let Some(dp) = digest_path.as_ref() {
|
||||
if Path::new(dp).exists() {
|
||||
std::fs::remove_file(dp)?;
|
||||
}
|
||||
}
|
||||
self.data_stream = Some(DataStream::FileStream(File::create(&path).await?));
|
||||
if let Some(dp) = digest_path.as_ref() {
|
||||
std::fs::write(dp, json!(self.digest).to_string()).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
DataSource::MemoryCursor(c) => {
|
||||
@ -859,7 +890,54 @@ impl TransferJob {
|
||||
true
|
||||
}
|
||||
|
||||
pub fn confirm(&mut self, r: &FileTransferSendConfirmRequest) -> bool {
|
||||
async fn set_stream_offset(&mut self, file_num: usize, offset: u64) {
|
||||
if let DataSource::FilePath(p) = &self.data_source {
|
||||
let entry = &self.files[file_num];
|
||||
let path = Self::join(p, &entry.name);
|
||||
let file_path = get_string(&path);
|
||||
let download_path = format!("{}.download", &file_path);
|
||||
let digest_path = format!("{}.digest", &file_path);
|
||||
|
||||
let mut f = if Path::new(&download_path).exists() && Path::new(&digest_path).exists() {
|
||||
// If both download and digest files exist, seek (writer) to the offset
|
||||
match OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.open(&download_path)
|
||||
.await
|
||||
{
|
||||
Ok(f) => f,
|
||||
Err(e) => {
|
||||
log::warn!("Failed to open file {}: {}", download_path, e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else if Path::new(&file_path).exists() {
|
||||
// If `file_path` exists, seek (reader) to the offset
|
||||
match File::open(&file_path).await {
|
||||
Ok(f) => f,
|
||||
Err(e) => {
|
||||
log::warn!("Failed to open file {}: {}", file_path, e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log::warn!(
|
||||
"File {} not found, cannot seek to offset {}",
|
||||
file_path,
|
||||
offset
|
||||
);
|
||||
return;
|
||||
};
|
||||
if f.seek(std::io::SeekFrom::Start(offset)).await.is_ok() {
|
||||
self.data_stream = Some(DataStream::FileStream(f));
|
||||
self.transferred += offset;
|
||||
self.finished_size += offset;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn confirm(&mut self, r: &FileTransferSendConfirmRequest) -> bool {
|
||||
if self.file_num() != r.file_num {
|
||||
log::info!("file num truncated, ignoring");
|
||||
} else {
|
||||
@ -871,8 +949,13 @@ impl TransferJob {
|
||||
self.set_file_confirmed(true);
|
||||
}
|
||||
}
|
||||
Some(file_transfer_send_confirm_request::Union::OffsetBlk(_offset)) => {
|
||||
Some(file_transfer_send_confirm_request::Union::OffsetBlk(offset)) => {
|
||||
self.set_file_confirmed(true);
|
||||
// If offset is greater than 0, we need to seek to the offset
|
||||
if offset > 0 {
|
||||
self.set_stream_offset(r.file_num as usize, offset as u64)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
@ -1052,6 +1135,8 @@ pub async fn handle_read_jobs(
|
||||
}
|
||||
}
|
||||
}
|
||||
// Break to handle jobs one by one.
|
||||
break;
|
||||
}
|
||||
for id in finished {
|
||||
let _ = remove_job(id, jobs);
|
||||
@ -1122,6 +1207,37 @@ pub fn is_write_need_confirmation(
|
||||
digest: &FileTransferDigest,
|
||||
) -> ResultType<DigestCheckResult> {
|
||||
let path = Path::new(file_path);
|
||||
let digest_file = format!("{}.digest", file_path);
|
||||
let download_file = format!("{}.download", file_path);
|
||||
if Path::new(&digest_file).exists() && Path::new(&download_file).exists() {
|
||||
// If the digest file exists, it means the file was transferred before.
|
||||
// We can use the digest file to check whether the file is the same.
|
||||
if let Ok(content) = std::fs::read_to_string(digest_file) {
|
||||
if let Ok(local_digest) = serde_json::from_str::<FileDigest>(&content) {
|
||||
let is_identical = local_digest.modified == digest.last_modified
|
||||
&& local_digest.size == digest.file_size;
|
||||
if is_identical {
|
||||
if let Ok(download_metadata) = std::fs::metadata(download_file) {
|
||||
// Get the file size of the local file
|
||||
// Only send confirmation if the file is not empty.
|
||||
let transferred_size = download_metadata.len();
|
||||
if transferred_size > 0 {
|
||||
return Ok(DigestCheckResult::NeedConfirm(FileTransferDigest {
|
||||
id: digest.id,
|
||||
file_num: digest.file_num,
|
||||
last_modified: digest.last_modified,
|
||||
file_size: digest.file_size,
|
||||
is_identical,
|
||||
transferred_size,
|
||||
..Default::default()
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if path.exists() && path.is_file() {
|
||||
let metadata = std::fs::metadata(path)?;
|
||||
let modified_time = metadata.modified()?;
|
||||
@ -1143,6 +1259,7 @@ pub fn is_write_need_confirmation(
|
||||
..Default::default()
|
||||
}))
|
||||
} else {
|
||||
// If the file does not exist, or the digest file and download file do not exist, we return NoSuchFile.
|
||||
Ok(DigestCheckResult::NoSuchFile)
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user