#[cfg(windows)]
use std::os::windows::prelude::*;
use std::{
    fmt::{Debug, Display},
    io::Cursor,
    path::{Path, PathBuf},
    sync::atomic::{AtomicI32, Ordering},
    time::{Duration, SystemTime, UNIX_EPOCH},
};

use serde_derive::{Deserialize, Serialize};
use serde_json::json;
use tokio::{
    fs::{File, OpenOptions},
    io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufStream as TokioBufStream},
};

use crate::{anyhow::anyhow, bail, get_version_number, message_proto::*, ResultType, Stream};
// https://doc.rust-lang.org/std/os/windows/fs/trait.MetadataExt.html
use crate::{
    compress::{compress, decompress},
    config::Config,
};

static NEXT_JOB_ID: AtomicI32 = AtomicI32::new(1);

pub fn get_next_job_id() -> i32 {
    NEXT_JOB_ID.fetch_add(1, Ordering::SeqCst)
}

pub fn update_next_job_id(id: i32) {
    NEXT_JOB_ID.store(id, Ordering::SeqCst);
}

pub fn read_dir(path: &Path, include_hidden: bool) -> ResultType<FileDirectory> {
    let mut dir = FileDirectory {
        path: get_string(path),
        ..Default::default()
    };
    #[cfg(windows)]
    if "/" == &get_string(path) {
        let drives = unsafe { winapi::um::fileapi::GetLogicalDrives() };
        for i in 0..32 {
            if drives & (1 << i) != 0 {
                let name = format!(
                    "{}:",
                    std::char::from_u32('A' as u32 + i as u32).unwrap_or('A')
                );
                dir.entries.push(FileEntry {
                    name,
                    entry_type: FileType::DirDrive.into(),
                    ..Default::default()
                });
            }
        }
        return Ok(dir);
    }
    for entry in path.read_dir()?.flatten() {
        let p = entry.path();
        let name = p
            .file_name()
            .map(|p| p.to_str().unwrap_or(""))
            .unwrap_or("")
            .to_owned();
        if name.is_empty() {
            continue;
        }
        let mut is_hidden = false;
        let meta;
        if let Ok(tmp) = std::fs::symlink_metadata(&p) {
            meta = tmp;
        } else {
            continue;
        }
        // docs.microsoft.com/en-us/windows/win32/fileio/file-attribute-constants
        #[cfg(windows)]
        if meta.file_attributes() & 0x2 != 0 {
            is_hidden = true;
        }
        #[cfg(not(windows))]
        if name.find('.').unwrap_or(usize::MAX) == 0 {
            is_hidden = true;
        }
        if is_hidden && !include_hidden {
            continue;
        }
        let (entry_type, size) = {
            if p.is_dir() {
                if meta.file_type().is_symlink() {
                    (FileType::DirLink.into(), 0)
                } else {
                    (FileType::Dir.into(), 0)
                }
            } else if meta.file_type().is_symlink() {
                (FileType::FileLink.into(), 0)
            } else {
                (FileType::File.into(), meta.len())
            }
        };
        let modified_time = meta
            .modified()
            .map(|x| {
                x.duration_since(std::time::SystemTime::UNIX_EPOCH)
                    .map(|x| x.as_secs())
                    .unwrap_or(0)
            })
            .unwrap_or(0);
        dir.entries.push(FileEntry {
            name: get_file_name(&p),
            entry_type,
            is_hidden,
            size,
            modified_time,
            ..Default::default()
        });
    }
    Ok(dir)
}

#[inline]
pub fn get_file_name(p: &Path) -> String {
    p.file_name()
        .map(|p| p.to_str().unwrap_or(""))
        .unwrap_or("")
        .to_owned()
}

#[inline]
pub fn get_string(path: &Path) -> String {
    path.to_str().unwrap_or("").to_owned()
}

#[inline]
pub fn get_path(path: &str) -> PathBuf {
    Path::new(path).to_path_buf()
}

#[inline]
pub fn get_home_as_string() -> String {
    get_string(&Config::get_home())
}
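// Illustrative sketch (not part of the original file): `update_next_job_id`
// seeds the global counter, e.g. when restoring persisted jobs, and each
// subsequent `get_next_job_id` call hands out a fresh, increasing id.
#[cfg(test)]
mod job_id_tests {
    use super::*;

    #[test]
    fn job_ids_are_monotonic() {
        update_next_job_id(100);
        assert_eq!(get_next_job_id(), 100);
        assert_eq!(get_next_job_id(), 101);
    }
}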
fn read_dir_recursive(
    path: &Path,
    prefix: &Path,
    include_hidden: bool,
) -> ResultType<Vec<FileEntry>> {
    let mut files = Vec::new();
    if path.is_dir() {
        // to-do: symlink handling; copy the link itself rather than its target
        // to-do: file mode, for unix
        let fd = read_dir(path, include_hidden)?;
        for entry in fd.entries.iter() {
            match entry.entry_type.enum_value() {
                Ok(FileType::File) => {
                    let mut entry = entry.clone();
                    entry.name = get_string(&prefix.join(entry.name));
                    files.push(entry);
                }
                Ok(FileType::Dir) => {
                    if let Ok(mut tmp) = read_dir_recursive(
                        &path.join(&entry.name),
                        &prefix.join(&entry.name),
                        include_hidden,
                    ) {
                        for entry in tmp.drain(..) {
                            files.push(entry);
                        }
                    }
                }
                _ => {}
            }
        }
        Ok(files)
    } else if path.is_file() {
        let (size, modified_time) = if let Ok(meta) = std::fs::metadata(path) {
            (
                meta.len(),
                meta.modified()
                    .map(|x| {
                        x.duration_since(std::time::SystemTime::UNIX_EPOCH)
                            .map(|x| x.as_secs())
                            .unwrap_or(0)
                    })
                    .unwrap_or(0),
            )
        } else {
            (0, 0)
        };
        files.push(FileEntry {
            entry_type: FileType::File.into(),
            size,
            modified_time,
            ..Default::default()
        });
        Ok(files)
    } else {
        bail!("Path does not exist");
    }
}

pub fn get_recursive_files(path: &str, include_hidden: bool) -> ResultType<Vec<FileEntry>> {
    read_dir_recursive(&get_path(path), &get_path(""), include_hidden)
}

fn read_empty_dirs_recursive(
    path: &Path,
    prefix: &Path,
    include_hidden: bool,
) -> ResultType<Vec<FileDirectory>> {
    let mut dirs = Vec::new();
    if path.is_dir() {
        // to-do: symlink handling; copy the link itself rather than its target
        // to-do: file mode, for unix
        let fd = read_dir(path, include_hidden)?;
        if fd.entries.is_empty() {
            dirs.push(fd);
        } else {
            for entry in fd.entries.iter() {
                match entry.entry_type.enum_value() {
                    Ok(FileType::Dir) => {
                        if let Ok(mut tmp) = read_empty_dirs_recursive(
                            &path.join(&entry.name),
                            &prefix.join(&entry.name),
                            include_hidden,
                        ) {
                            for entry in tmp.drain(..) {
                                dirs.push(entry);
                            }
                        }
                    }
                    _ => {}
                }
            }
        }
        Ok(dirs)
    } else if path.is_file() {
        Ok(dirs)
    } else {
        bail!("Path does not exist");
    }
}

pub fn get_empty_dirs_recursive(
    path: &str,
    include_hidden: bool,
) -> ResultType<Vec<FileDirectory>> {
    read_empty_dirs_recursive(&get_path(path), &get_path(""), include_hidden)
}

#[inline]
pub fn is_file_exists(file_path: &str) -> bool {
    Path::new(file_path).exists()
}

#[inline]
pub fn can_enable_overwrite_detection(version: i64) -> bool {
    version >= get_version_number("1.1.10")
}

#[repr(i32)]
#[derive(Copy, Clone, Serialize, Debug, PartialEq)]
pub enum JobType {
    Generic = 0,
    Printer = 1,
}

impl Default for JobType {
    fn default() -> Self {
        JobType::Generic
    }
}

impl From<JobType> for file_transfer_send_request::FileType {
    fn from(t: JobType) -> Self {
        match t {
            JobType::Generic => file_transfer_send_request::FileType::Generic,
            JobType::Printer => file_transfer_send_request::FileType::Printer,
        }
    }
}

impl From<i32> for JobType {
    fn from(value: i32) -> Self {
        match value {
            0 => JobType::Generic,
            1 => JobType::Printer,
            _ => JobType::Generic,
        }
    }
}

impl Into<i32> for JobType {
    fn into(self) -> i32 {
        self as i32
    }
}

impl JobType {
    pub fn from_proto(t: ::protobuf::EnumOrUnknown<file_transfer_send_request::FileType>) -> Self {
        match t.enum_value() {
            Ok(file_transfer_send_request::FileType::Generic) => JobType::Generic,
            Ok(file_transfer_send_request::FileType::Printer) => JobType::Printer,
            _ => JobType::Generic,
        }
    }
}

#[derive(Debug)]
pub enum DataSource {
    FilePath(PathBuf),
    MemoryCursor(Cursor<Vec<u8>>),
}

impl Default for DataSource {
    fn default() -> Self {
        DataSource::FilePath(PathBuf::new())
    }
}

impl serde::Serialize for DataSource {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            DataSource::FilePath(p) => serializer.serialize_str(p.to_str().unwrap_or("")),
            DataSource::MemoryCursor(_) => serializer.serialize_str(""),
        }
    }
}

impl Display for DataSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DataSource::FilePath(p) => write!(f, "File: {}", p.to_string_lossy()),
            DataSource::MemoryCursor(_) => write!(f, "Bytes"),
        }
    }
}

impl DataSource {
    fn to_meta(&self) -> String {
        match self {
            DataSource::FilePath(p) => p.to_string_lossy().to_string(),
            DataSource::MemoryCursor(_) => "".to_string(),
        }
    }
}

enum DataStream {
    FileStream(File),
    BufStream(TokioBufStream<Cursor<Vec<u8>>>),
}
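// Quick sanity sketch (added, not in the original): the `From`/`Into` pair
// above round-trips job types through their wire representation, with unknown
// values falling back to `Generic`.
#[cfg(test)]
mod job_type_tests {
    use super::*;

    #[test]
    fn roundtrip_and_fallback() {
        assert_eq!(JobType::from(1), JobType::Printer);
        let raw: i32 = JobType::Printer.into();
        assert_eq!(raw, 1);
        assert_eq!(JobType::from(42), JobType::Generic);
    }
}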
impl Debug for DataStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DataStream::FileStream(fs) => write!(f, "{:?}", fs),
            DataStream::BufStream(_) => write!(f, "BufStream"),
        }
    }
}

impl DataStream {
    async fn write_all(&mut self, buf: &[u8]) -> ResultType<()> {
        match self {
            DataStream::FileStream(fs) => fs.write_all(buf).await?,
            DataStream::BufStream(bs) => bs.write_all(buf).await?,
        }
        Ok(())
    }

    async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        match self {
            DataStream::FileStream(fs) => fs.read(buf).await,
            DataStream::BufStream(bs) => bs.read(buf).await,
        }
    }
}

#[derive(Default, Serialize, Deserialize, Debug)]
pub struct FileDigest {
    pub size: u64,
    pub modified: u64,
}

#[derive(Default, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct TransferJob {
    pub id: i32,
    pub r#type: JobType,
    pub remote: String,
    pub data_source: DataSource,
    pub show_hidden: bool,
    pub is_remote: bool,
    pub is_last_job: bool,
    pub is_resume: bool,
    pub file_num: i32,
    #[serde(skip_serializing)]
    pub files: Vec<FileEntry>,
    pub conn_id: i32, // server only
    #[serde(skip_serializing)]
    data_stream: Option<DataStream>,
    pub total_size: u64,
    finished_size: u64,
    transferred: u64,
    enable_overwrite_detection: bool,
    file_confirmed: bool,
    // indicates whether the last file was skipped
    file_skipped: bool,
    file_is_waiting: bool,
    default_overwrite_strategy: Option<bool>,
    #[serde(skip_serializing)]
    digest: FileDigest,
}

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct TransferJobMeta {
    #[serde(default)]
    pub id: i32,
    #[serde(default)]
    pub remote: String,
    #[serde(default)]
    pub to: String,
    #[serde(default)]
    pub show_hidden: bool,
    #[serde(default)]
    pub file_num: i32,
    #[serde(default)]
    pub is_remote: bool,
}

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct RemoveJobMeta {
    #[serde(default)]
    pub path: String,
    #[serde(default)]
    pub is_remote: bool,
    #[serde(default)]
    pub no_confirm: bool,
}

#[inline]
fn get_ext(name: &str) -> &str {
    if let Some(i) = name.rfind('.') {
        return &name[i + 1..];
    }
    ""
}

#[inline]
fn is_compressed_file(name: &str) -> bool {
    let compressed_exts = ["xz", "gz", "zip", "7z", "rar", "bz2", "tgz", "png", "jpg"];
    let ext = get_ext(name);
    compressed_exts.contains(&ext)
}
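// Sanity sketch for the compression heuristic above (added for illustration):
// already-compressed formats are sent as-is, everything else goes through
// `compress` in `TransferJob::read`.
#[cfg(test)]
mod compression_heuristic_tests {
    use super::*;

    #[test]
    fn extension_detection() {
        assert_eq!(get_ext("archive.tar.gz"), "gz");
        assert_eq!(get_ext("no_extension"), "");
        assert!(is_compressed_file("photo.jpg"));
        assert!(!is_compressed_file("notes.txt"));
    }
}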
impl TransferJob {
    #[allow(clippy::too_many_arguments)]
    pub fn new_write(
        id: i32,
        r#type: JobType,
        remote: String,
        data_source: DataSource,
        file_num: i32,
        show_hidden: bool,
        is_remote: bool,
        files: Vec<FileEntry>,
        enable_overwrite_detection: bool,
    ) -> Self {
        log::info!("new write {}", data_source);
        let total_size = files.iter().map(|x| x.size).sum();
        Self {
            id,
            r#type,
            remote,
            data_source,
            file_num,
            show_hidden,
            is_remote,
            files,
            total_size,
            enable_overwrite_detection,
            ..Default::default()
        }
    }

    pub fn new_read(
        id: i32,
        r#type: JobType,
        remote: String,
        data_source: DataSource,
        file_num: i32,
        show_hidden: bool,
        is_remote: bool,
        enable_overwrite_detection: bool,
    ) -> ResultType<Self> {
        log::info!("new read {}", data_source);
        let (files, total_size) = match &data_source {
            DataSource::FilePath(p) => {
                let p = p.to_str().ok_or(anyhow!("Invalid path"))?;
                let files = get_recursive_files(p, show_hidden)?;
                let total_size = files.iter().map(|x| x.size).sum();
                (files, total_size)
            }
            DataSource::MemoryCursor(c) => (Vec::new(), c.get_ref().len() as u64),
        };
        Ok(Self {
            id,
            r#type,
            remote,
            data_source,
            file_num,
            show_hidden,
            is_remote,
            files,
            total_size,
            enable_overwrite_detection,
            ..Default::default()
        })
    }

    pub async fn get_buf_data(self) -> ResultType<Option<Vec<u8>>> {
        match self.data_stream {
            Some(DataStream::BufStream(mut bs)) => {
                bs.flush().await?;
                Ok(Some(bs.into_inner().into_inner()))
            }
            _ => Ok(None),
        }
    }

    #[inline]
    pub fn files(&self) -> &Vec<FileEntry> {
        &self.files
    }

    #[inline]
    pub fn set_files(&mut self, files: Vec<FileEntry>) {
        self.files = files;
    }

    #[inline]
    pub fn set_digest(&mut self, size: u64, modified: u64) {
        self.digest.size = size;
        self.digest.modified = modified;
    }

    #[inline]
    pub fn id(&self) -> i32 {
        self.id
    }

    #[inline]
    pub fn total_size(&self) -> u64 {
        self.total_size
    }

    #[inline]
    pub fn finished_size(&self) -> u64 {
        self.finished_size
    }

    #[inline]
    pub fn transferred(&self) -> u64 {
        self.transferred
    }

    #[inline]
    pub fn file_num(&self) -> i32 {
        self.file_num
    }

    pub fn modify_time(&self) {
        if self.r#type == JobType::Printer {
            return;
        }
        if let DataSource::FilePath(p) = &self.data_source {
            let file_num = self.file_num as usize;
            if file_num < self.files.len() {
                let entry = &self.files[file_num];
                let path = Self::join(p, &entry.name);
                let download_path = format!("{}.download", get_string(&path));
                let digest_path = format!("{}.digest", get_string(&path));
                std::fs::remove_file(digest_path).ok();
                std::fs::rename(download_path, &path).ok();
                filetime::set_file_mtime(
                    &path,
                    filetime::FileTime::from_unix_time(entry.modified_time as _, 0),
                )
                .ok();
            }
        }
    }

    pub fn remove_download_file(&self) {
        if self.r#type == JobType::Printer {
            return;
        }
        if let DataSource::FilePath(p) = &self.data_source {
            let file_num = self.file_num as usize;
            if file_num < self.files.len() {
                let entry = &self.files[file_num];
                let path = Self::join(p, &entry.name);
                let download_path = format!("{}.download", get_string(&path));
                let digest_path = format!("{}.digest", get_string(&path));
                std::fs::remove_file(download_path).ok();
                std::fs::remove_file(digest_path).ok();
            }
        }
    }

    #[inline]
    pub fn set_finished_size_on_resume(&mut self) {
        if self.is_resume && self.file_num > 0 {
            let finished_size: u64 = self
                .files
                .iter()
                .take(self.file_num as usize)
                .map(|file| file.size)
                .sum();
            self.finished_size = finished_size;
        }
    }
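    // How the temp-file convention used by `modify_time` / `remove_download_file`
    // plays out, as a sketch with a hypothetical entry "a.txt" under target `p`:
    //
    //   p.join("a.txt.download")  // data is written here while transferring
    //   p.join("a.txt.digest")    // expected size/mtime, used for resuming
    //
    // On success the `.download` file is renamed to "a.txt" and the `.digest`
    // file is removed; on cancellation both temp files are deleted.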
    pub async fn write(&mut self, block: FileTransferBlock) -> ResultType<()> {
        if block.id != self.id {
            bail!("Wrong id");
        }
        match &self.data_source {
            DataSource::FilePath(p) => {
                let file_num = block.file_num as usize;
                if file_num >= self.files.len() {
                    bail!("Wrong file number");
                }
                if file_num != self.file_num as usize || self.data_stream.is_none() {
                    self.modify_time();
                    if let Some(DataStream::FileStream(file)) = self.data_stream.as_mut() {
                        file.sync_all().await?;
                    }
                    self.file_num = block.file_num;
                    let entry = &self.files[file_num];
                    let (path, digest_path) = if self.r#type == JobType::Printer {
                        (p.to_string_lossy().to_string(), None)
                    } else {
                        let path = Self::join(p, &entry.name);
                        if let Some(pp) = path.parent() {
                            std::fs::create_dir_all(pp).ok();
                        }
                        let file_path = get_string(&path);
                        (
                            format!("{}.download", &file_path),
                            Some(format!("{}.digest", &file_path)),
                        )
                    };
                    if let Some(dp) = digest_path.as_ref() {
                        if Path::new(dp).exists() {
                            std::fs::remove_file(dp)?;
                        }
                    }
                    self.data_stream = Some(DataStream::FileStream(File::create(&path).await?));
                    if let Some(dp) = digest_path.as_ref() {
                        std::fs::write(dp, json!(self.digest).to_string()).ok();
                    }
                }
            }
            DataSource::MemoryCursor(c) => {
                if self.data_stream.is_none() {
                    self.data_stream = Some(DataStream::BufStream(TokioBufStream::new(c.clone())));
                }
            }
        }
        if block.compressed {
            let tmp = decompress(&block.data);
            self.data_stream
                .as_mut()
                .ok_or(anyhow!("data stream is None"))?
                .write_all(&tmp)
                .await?;
            self.finished_size += tmp.len() as u64;
        } else {
            self.data_stream
                .as_mut()
                .ok_or(anyhow!("data stream is None"))?
                .write_all(&block.data)
                .await?;
            self.finished_size += block.data.len() as u64;
        }
        self.transferred += block.data.len() as u64;
        Ok(())
    }

    #[inline]
    pub fn join(p: &PathBuf, name: &str) -> PathBuf {
        if name.is_empty() {
            p.clone()
        } else {
            p.join(name)
        }
    }

    async fn init_data_stream(&mut self, stream: &mut crate::Stream) -> ResultType<()> {
        let file_num = self.file_num as usize;
        match &mut self.data_source {
            DataSource::FilePath(p) => {
                if file_num >= self.files.len() {
                    // job done
                    self.data_stream.take();
                    return Ok(());
                };
                if self.data_stream.is_none() {
                    match File::open(Self::join(p, &self.files[file_num].name)).await {
                        Ok(file) => {
                            self.data_stream = Some(DataStream::FileStream(file));
                            self.file_confirmed = false;
                            self.file_is_waiting = false;
                        }
                        Err(err) => {
                            self.file_num += 1;
                            self.file_confirmed = false;
                            self.file_is_waiting = false;
                            return Err(err.into());
                        }
                    }
                }
            }
            DataSource::MemoryCursor(c) => {
                if self.data_stream.is_none() {
                    let mut t = std::io::Cursor::new(Vec::new());
                    std::mem::swap(&mut t, c);
                    self.data_stream = Some(DataStream::BufStream(TokioBufStream::new(t)));
                }
            }
        }
        if self.r#type == JobType::Generic {
            if self.enable_overwrite_detection && !self.file_confirmed() {
                if !self.file_is_waiting() {
                    self.send_current_digest(stream).await?;
                    self.set_file_is_waiting(true);
                }
            }
        }
        Ok(())
    }

    pub async fn read(&mut self) -> ResultType<Option<FileTransferBlock>> {
        if self.r#type == JobType::Generic {
            if self.enable_overwrite_detection && !self.file_confirmed() {
                return Ok(None);
            }
        }
        let file_num = self.file_num as usize;
        let name: &str;
        match &mut self.data_source {
            DataSource::FilePath(..) => {
                if file_num >= self.files.len() {
                    self.data_stream.take();
                    return Ok(None);
                };
                name = &self.files[file_num].name;
            }
            DataSource::MemoryCursor(..) => {
                name = "";
            }
        }
        const BUF_SIZE: usize = 128 * 1024;
        let mut buf: Vec<u8> = vec![0; BUF_SIZE];
        let mut compressed = false;
        let mut offset: usize = 0;
        loop {
            match self
                .data_stream
                .as_mut()
                .ok_or(anyhow!("data stream is None"))?
                .read(&mut buf[offset..])
                .await
            {
                Err(err) => {
                    self.file_num += 1;
                    self.data_stream = None;
                    self.file_confirmed = false;
                    self.file_is_waiting = false;
                    return Err(err.into());
                }
                Ok(n) => {
                    offset += n;
                    if n == 0 || offset == BUF_SIZE {
                        break;
                    }
                }
            }
        }
        buf.truncate(offset);
        if offset == 0 {
            if matches!(self.data_source, DataSource::MemoryCursor(_)) {
                self.data_stream.take();
                return Ok(None);
            }
            self.file_num += 1;
            self.data_stream = None;
            self.file_confirmed = false;
            self.file_is_waiting = false;
        } else {
            self.finished_size += offset as u64;
            if matches!(self.data_source, DataSource::FilePath(_)) && !is_compressed_file(name) {
                let tmp = compress(&buf);
                if tmp.len() < buf.len() {
                    buf = tmp;
                    compressed = true;
                }
            }
            self.transferred += buf.len() as u64;
        }
        Ok(Some(FileTransferBlock {
            id: self.id,
            file_num: file_num as _,
            data: buf.into(),
            compressed,
            ..Default::default()
        }))
    }
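    // A minimal sketch (illustrative; assumes a connected `Stream` and
    // overwrite detection disabled, since `read` yields `None` while a digest
    // confirmation is pending) of how `read` is pumped by callers such as
    // `handle_read_jobs` below:
    //
    // async fn pump(job: &mut TransferJob, stream: &mut Stream) -> ResultType<()> {
    //     while let Some(block) = job.read().await? {
    //         stream.send(&new_block(block)).await?;
    //     }
    //     stream.send(&new_done(job.id(), job.file_num())).await?;
    //     Ok(())
    // }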
    // Only for generic jobs backed by a file stream.
    async fn send_current_digest(&mut self, stream: &mut Stream) -> ResultType<()> {
        let mut msg = Message::new();
        let mut resp = FileResponse::new();
        let meta = match self
            .data_stream
            .as_ref()
            .ok_or(anyhow!("data stream is None"))?
        {
            DataStream::FileStream(file) => file.metadata().await?,
            DataStream::BufStream(_) => bail!("No need to send digest for buf stream"),
        };
        let last_modified = meta
            .modified()?
            .duration_since(SystemTime::UNIX_EPOCH)?
            .as_secs();
        resp.set_digest(FileTransferDigest {
            id: self.id,
            file_num: self.file_num,
            last_modified,
            file_size: meta.len(),
            is_resume: self.is_resume,
            ..Default::default()
        });
        msg.set_file_response(resp);
        stream.send(&msg).await?;
        log::info!(
            "id: {}, file_num: {}, digest message is sent. waiting for confirm. msg: {:?}",
            self.id,
            self.file_num,
            msg
        );
        Ok(())
    }

    pub fn set_overwrite_strategy(&mut self, overwrite_strategy: Option<bool>) {
        self.default_overwrite_strategy = overwrite_strategy;
    }

    pub fn default_overwrite_strategy(&self) -> Option<bool> {
        self.default_overwrite_strategy
    }

    pub fn set_file_confirmed(&mut self, file_confirmed: bool) {
        log::info!("id: {}, file_confirmed: {}", self.id, file_confirmed);
        self.file_confirmed = file_confirmed;
        self.file_skipped = false;
    }

    pub fn set_file_is_waiting(&mut self, file_is_waiting: bool) {
        self.file_is_waiting = file_is_waiting;
    }

    #[inline]
    pub fn file_is_waiting(&self) -> bool {
        self.file_is_waiting
    }

    #[inline]
    pub fn file_confirmed(&self) -> bool {
        self.file_confirmed
    }

    /// Whether the last file was skipped.
    #[inline]
    pub fn file_skipped(&self) -> bool {
        self.file_skipped
    }

    /// Whether the whole job was skipped.
    #[inline]
    pub fn job_skipped(&self) -> bool {
        self.file_skipped() && self.files.len() == 1
    }

    /// Check whether the job is completed after `read` returns `None`.
    /// This is a helper that gives the job an extra lifecycle step once `read` yields `None`.
    /// If it returns `true`, the job can be removed automatically; `false` otherwise.
    ///
    /// [`Note`]
    /// Conditions:
    /// 1. Overwrite detection is disabled, or
    /// 2. no file is confirmed or waiting for confirmation by the peer.
    #[inline]
    pub fn job_completed(&self) -> bool {
        !self.enable_overwrite_detection || (!self.file_confirmed && !self.file_is_waiting)
    }

    /// Get the job error message; useful for reporting status after the job has finished.
    pub fn job_error(&self) -> Option<String> {
        if self.job_skipped() {
            return Some("skipped".to_string());
        }
        None
    }

    pub fn set_file_skipped(&mut self) -> bool {
        log::debug!("skip file {} in job {}", self.file_num, self.id);
        self.data_stream.take();
        self.set_file_confirmed(false);
        self.set_file_is_waiting(false);
        self.file_num += 1;
        self.file_skipped = true;
        true
    }

    async fn set_stream_offset(&mut self, file_num: usize, offset: u64) {
        if let DataSource::FilePath(p) = &self.data_source {
            let entry = &self.files[file_num];
            let path = Self::join(p, &entry.name);
            let file_path = get_string(&path);
            let download_path = format!("{}.download", &file_path);
            let digest_path = format!("{}.digest", &file_path);
            let mut f = if Path::new(&download_path).exists() && Path::new(&digest_path).exists() {
                // If both download and digest files exist, seek (writer) to the offset.
                match OpenOptions::new()
                    .create(true)
                    .write(true)
                    .open(&download_path)
                    .await
                {
                    Ok(f) => f,
                    Err(e) => {
                        log::warn!("Failed to open file {}: {}", download_path, e);
                        return;
                    }
                }
            } else if Path::new(&file_path).exists() {
                // If `file_path` exists, seek (reader) to the offset.
                match File::open(&file_path).await {
                    Ok(f) => f,
                    Err(e) => {
                        log::warn!("Failed to open file {}: {}", file_path, e);
                        return;
                    }
                }
            } else {
                log::warn!(
                    "File {} not found, cannot seek to offset {}",
                    file_path,
                    offset
                );
                return;
            };
            if f.seek(std::io::SeekFrom::Start(offset)).await.is_ok() {
                self.data_stream = Some(DataStream::FileStream(f));
                self.transferred += offset;
                self.finished_size += offset;
            }
        }
    }
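    // The digest handshake, sketched (field values and the `job`/`stream`
    // bindings are hypothetical): after `send_current_digest` parks the job,
    // the receiver answers with a `FileTransferSendConfirmRequest`, which
    // `confirm` below translates into skip / overwrite / resume-at-offset:
    //
    // let mut req = FileTransferSendConfirmRequest::new();
    // req.id = job.id();
    // req.file_num = job.file_num();
    // req.union = Some(file_transfer_send_confirm_request::Union::OffsetBlk(0)); // 0 = overwrite from the start
    // stream.send(&new_send_confirm(req)).await?;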
    pub async fn confirm(&mut self, r: &FileTransferSendConfirmRequest) -> bool {
        if self.file_num() != r.file_num {
            // This branch is always hit when:
            // 1. `confirm()` is called in `ui_cm_interface.rs`, and
            // 2. the job is not resuming.
            //
            // That is fine, because `confirm()` in `ui_cm_interface.rs` is only used for resuming.
            log::info!("file num truncated, ignoring");
        } else {
            match r.union {
                Some(file_transfer_send_confirm_request::Union::Skip(s)) => {
                    if s {
                        self.set_file_skipped();
                    } else {
                        self.set_file_confirmed(true);
                    }
                }
                Some(file_transfer_send_confirm_request::Union::OffsetBlk(offset)) => {
                    self.set_file_confirmed(true);
                    // If the offset is greater than 0, seek to it to resume the transfer.
                    if offset > 0 {
                        self.set_stream_offset(r.file_num as usize, offset as u64)
                            .await;
                    }
                }
                _ => {}
            }
        }
        true
    }

    #[inline]
    pub fn gen_meta(&self) -> TransferJobMeta {
        TransferJobMeta {
            id: self.id,
            remote: self.remote.to_string(),
            to: self.data_source.to_meta(),
            file_num: self.file_num,
            show_hidden: self.show_hidden,
            is_remote: self.is_remote,
        }
    }
}

#[inline]
pub fn new_error<T: std::string::ToString>(id: i32, err: T, file_num: i32) -> Message {
    let mut resp = FileResponse::new();
    resp.set_error(FileTransferError {
        id,
        error: err.to_string(),
        file_num,
        ..Default::default()
    });
    let mut msg_out = Message::new();
    msg_out.set_file_response(resp);
    msg_out
}

#[inline]
pub fn new_dir(id: i32, path: String, files: Vec<FileEntry>) -> Message {
    let mut resp = FileResponse::new();
    resp.set_dir(FileDirectory {
        id,
        path,
        entries: files,
        ..Default::default()
    });
    let mut msg_out = Message::new();
    msg_out.set_file_response(resp);
    msg_out
}

#[inline]
pub fn new_block(block: FileTransferBlock) -> Message {
    let mut resp = FileResponse::new();
    resp.set_block(block);
    let mut msg_out = Message::new();
    msg_out.set_file_response(resp);
    msg_out
}

#[inline]
pub fn new_send_confirm(r: FileTransferSendConfirmRequest) -> Message {
    let mut msg_out = Message::new();
    let mut action = FileAction::new();
    action.set_send_confirm(r);
    msg_out.set_file_action(action);
    msg_out
}

#[inline]
pub fn new_receive(
    id: i32,
    path: String,
    file_num: i32,
    files: Vec<FileEntry>,
    total_size: u64,
) -> Message {
    let mut action = FileAction::new();
    action.set_receive(FileTransferReceiveRequest {
        id,
        path,
        files,
        file_num,
        total_size,
        ..Default::default()
    });
    let mut msg_out = Message::new();
    msg_out.set_file_action(action);
    msg_out
}

#[inline]
pub fn new_send(
    id: i32,
    r#type: JobType,
    path: String,
    file_num: i32,
    include_hidden: bool,
) -> Message {
    log::info!("new send: {}, id: {}", path, id);
    let mut action = FileAction::new();
    let t: file_transfer_send_request::FileType = r#type.into();
    action.set_send(FileTransferSendRequest {
        id,
        path,
        include_hidden,
        file_num,
        file_type: t.into(),
        ..Default::default()
    });
    let mut msg_out = Message::new();
    msg_out.set_file_action(action);
    msg_out
}

#[inline]
pub fn new_done(id: i32, file_num: i32) -> Message {
    let mut resp = FileResponse::new();
    resp.set_done(FileTransferDone {
        id,
        file_num,
        ..Default::default()
    });
    let mut msg_out = Message::new();
    msg_out.set_file_response(resp);
    msg_out
}

#[inline]
pub fn remove_job(id: i32, jobs: &mut Vec<TransferJob>) -> Option<TransferJob> {
    jobs.iter()
        .position(|x| x.id() == id)
        .map(|index| jobs.remove(index))
}

#[inline]
pub fn get_job(id: i32, jobs: &mut [TransferJob]) -> Option<&mut TransferJob> {
    jobs.iter_mut().find(|x| x.id() == id)
}

#[inline]
pub fn get_job_immutable(id: i32, jobs: &[TransferJob]) -> Option<&TransferJob> {
    jobs.iter().find(|x| x.id() == id)
}
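// Illustrative sketch of the id-based bookkeeping above (not in the original):
#[cfg(test)]
mod job_lookup_tests {
    use super::*;

    #[test]
    fn find_and_remove_by_id() {
        let mut jobs = vec![
            TransferJob { id: 1, ..Default::default() },
            TransferJob { id: 2, ..Default::default() },
        ];
        assert!(get_job(2, &mut jobs).is_some());
        assert!(remove_job(1, &mut jobs).is_some());
        assert_eq!(jobs.len(), 1);
        assert!(get_job_immutable(1, &jobs).is_none());
    }
}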
async fn init_jobs(jobs: &mut Vec<TransferJob>, stream: &mut crate::Stream) -> ResultType<()> {
    for job in jobs.iter_mut() {
        if job.is_last_job {
            continue;
        }
        if let Err(err) = job.init_data_stream(stream).await {
            stream
                .send(&new_error(job.id(), err, job.file_num()))
                .await?;
        }
    }
    Ok(())
}

pub async fn handle_read_jobs(
    jobs: &mut Vec<TransferJob>,
    stream: &mut crate::Stream,
) -> ResultType<String> {
    init_jobs(jobs, stream).await?;
    let mut job_log = Default::default();
    let mut finished = Vec::new();
    for job in jobs.iter_mut() {
        if job.is_last_job {
            continue;
        }
        match job.read().await {
            Err(err) => {
                stream
                    .send(&new_error(job.id(), err, job.file_num()))
                    .await?;
            }
            Ok(Some(block)) => {
                stream.send(&new_block(block)).await?;
            }
            Ok(None) => {
                if job.job_completed() {
                    job_log = serialize_transfer_job(job, true, false, "");
                    finished.push(job.id());
                    match job.job_error() {
                        Some(err) => {
                            job_log = serialize_transfer_job(job, false, false, &err);
                            stream
                                .send(&new_error(job.id(), err, job.file_num()))
                                .await?
                        }
                        None => stream.send(&new_done(job.id(), job.file_num())).await?,
                    }
                } else {
                    // Waiting for confirmation; keep the job alive.
                }
            }
        }
        // Break to handle jobs one by one.
        break;
    }
    for id in finished {
        let _ = remove_job(id, jobs);
    }
    Ok(job_log)
}

pub fn remove_all_empty_dir(path: &Path) -> ResultType<()> {
    let fd = read_dir(path, true)?;
    for entry in fd.entries.iter() {
        match entry.entry_type.enum_value() {
            Ok(FileType::Dir) => {
                remove_all_empty_dir(&path.join(&entry.name)).ok();
            }
            Ok(FileType::DirLink) | Ok(FileType::FileLink) => {
                std::fs::remove_file(path.join(&entry.name)).ok();
            }
            _ => {}
        }
    }
    std::fs::remove_dir(path).ok();
    Ok(())
}

#[inline]
pub fn remove_file(file: &str) -> ResultType<()> {
    std::fs::remove_file(get_path(file))?;
    Ok(())
}

#[inline]
pub fn create_dir(dir: &str) -> ResultType<()> {
    std::fs::create_dir_all(get_path(dir))?;
    Ok(())
}

#[inline]
pub fn rename_file(path: &str, new_name: &str) -> ResultType<()> {
    let path = std::path::Path::new(&path);
    if path.exists() {
        let dir = path
            .parent()
            .ok_or(anyhow!("Parent directory of {path:?} does not exist"))?;
        let new_path = dir.join(new_name);
        std::fs::rename(path, &new_path)?;
        Ok(())
    } else {
        bail!("{path:?} does not exist");
    }
}

#[inline]
pub fn transform_windows_path(entries: &mut Vec<FileEntry>) {
    for entry in entries {
        entry.name = entry.name.replace('\\', "/");
    }
}

pub enum DigestCheckResult {
    IsSame,
    NeedConfirm(FileTransferDigest),
    NoSuchFile,
}
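// Small added check: `transform_windows_path` normalizes backslash-separated
// entry names so the receiving side can treat paths uniformly.
#[cfg(test)]
mod path_transform_tests {
    use super::*;

    #[test]
    fn backslashes_become_slashes() {
        let mut entries = vec![FileEntry {
            name: r"dir\sub\file.txt".to_string(),
            ..Default::default()
        }];
        transform_windows_path(&mut entries);
        assert_eq!(entries[0].name, "dir/sub/file.txt");
    }
}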
#[inline]
pub fn is_write_need_confirmation(
    is_resume: bool,
    file_path: &str,
    digest: &FileTransferDigest,
) -> ResultType<DigestCheckResult> {
    let path = Path::new(file_path);
    let digest_file = format!("{}.digest", file_path);
    let download_file = format!("{}.download", file_path);
    if is_resume && Path::new(&digest_file).exists() && Path::new(&download_file).exists() {
        // If the digest file exists, the file was partially transferred before.
        // Use the recorded digest to check whether it is the same file.
        if let Ok(content) = std::fs::read_to_string(digest_file) {
            if let Ok(local_digest) = serde_json::from_str::<FileDigest>(&content) {
                let is_identical = local_digest.modified == digest.last_modified
                    && local_digest.size == digest.file_size;
                if is_identical {
                    if let Ok(download_metadata) = std::fs::metadata(download_file) {
                        // Only ask for confirmation if some data was already received.
                        let transferred_size = download_metadata.len();
                        if transferred_size > 0 {
                            return Ok(DigestCheckResult::NeedConfirm(FileTransferDigest {
                                id: digest.id,
                                file_num: digest.file_num,
                                last_modified: digest.last_modified,
                                file_size: digest.file_size,
                                is_identical,
                                transferred_size,
                                ..Default::default()
                            }));
                        }
                    }
                }
            }
        }
    }
    if path.exists() && path.is_file() {
        let metadata = std::fs::metadata(path)?;
        let modified_time = metadata.modified()?;
        let remote_mt = Duration::from_secs(digest.last_modified);
        let local_mt = modified_time.duration_since(UNIX_EPOCH)?;
        // [Note]
        // The decision whether to overwrite an existing file is left to the user,
        // matching the behavior of the system file manager.
        let mut is_identical = false;
        if remote_mt == local_mt && digest.file_size == metadata.len() {
            is_identical = true;
        }
        Ok(DigestCheckResult::NeedConfirm(FileTransferDigest {
            id: digest.id,
            file_num: digest.file_num,
            last_modified: local_mt.as_secs(),
            file_size: metadata.len(),
            is_identical,
            ..Default::default()
        }))
    } else {
        // Neither the final file nor the resumable temp files exist.
        Ok(DigestCheckResult::NoSuchFile)
    }
}

pub fn serialize_transfer_jobs(jobs: &[TransferJob]) -> String {
    let mut v = vec![];
    for job in jobs {
        let value = serde_json::to_value(job).unwrap_or_default();
        v.push(value);
    }
    serde_json::to_string(&v).unwrap_or_default()
}

pub fn serialize_transfer_job(job: &TransferJob, done: bool, cancel: bool, error: &str) -> String {
    let mut value = serde_json::to_value(job).unwrap_or_default();
    value["done"] = json!(done);
    value["cancel"] = json!(cancel);
    value["error"] = json!(error);
    serde_json::to_string(&value).unwrap_or_default()
}
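// Added sketch: the shape of the job log emitted by `serialize_transfer_job`,
// which front ends parse to display progress and status.
#[cfg(test)]
mod serialize_tests {
    use super::*;

    #[test]
    fn job_log_carries_status_fields() {
        let job = TransferJob { id: 5, ..Default::default() };
        let log = serialize_transfer_job(&job, true, false, "");
        let v: serde_json::Value = serde_json::from_str(&log).unwrap();
        assert_eq!(v["done"], json!(true));
        assert_eq!(v["cancel"], json!(false));
        assert_eq!(v["id"], json!(5));
    }
}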