diff --git a/src/fs.rs b/src/fs.rs index 4b672b2..41b8a52 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -701,18 +701,17 @@ impl TransferJob { } } - pub async fn read(&mut self, stream: &mut Stream) -> ResultType> { + async fn init_data_stream(&mut self, stream: &mut crate::Stream) -> ResultType<()> { let file_num = self.file_num as usize; - let name: &str; match &mut self.data_source { DataSource::FilePath(p) => { if file_num >= self.files.len() { + // job done self.data_stream.take(); - return Ok(None); + return Ok(()); }; - name = &self.files[file_num].name; if self.data_stream.is_none() { - match File::open(Self::join(p, name)).await { + match File::open(Self::join(p, &self.files[file_num].name)).await { Ok(file) => { self.data_stream = Some(DataStream::FileStream(file)); self.file_confirmed = false; @@ -728,7 +727,6 @@ impl TransferJob { } } DataSource::MemoryCursor(c) => { - name = ""; if self.data_stream.is_none() { let mut t = std::io::Cursor::new(Vec::new()); std::mem::swap(&mut t, c); @@ -742,9 +740,32 @@ impl TransferJob { self.send_current_digest(stream).await?; self.set_file_is_waiting(true); } + } + } + Ok(()) + } + + pub async fn read(&mut self) -> ResultType> { + if self.r#type == JobType::Generic { + if self.enable_overwrite_detection && !self.file_confirmed() { return Ok(None); } } + + let file_num = self.file_num as usize; + let name: &str; + match &mut self.data_source { + DataSource::FilePath(..) => { + if file_num >= self.files.len() { + self.data_stream.take(); + return Ok(None); + }; + name = &self.files[file_num].name; + } + DataSource::MemoryCursor(..) => { + name = ""; + } + } const BUF_SIZE: usize = 128 * 1024; let mut buf: Vec = vec![0; BUF_SIZE]; let mut compressed = false; @@ -953,6 +974,11 @@ impl TransferJob { pub async fn confirm(&mut self, r: &FileTransferSendConfirmRequest) -> bool { if self.file_num() != r.file_num { + // This branch will always be hit if: + // 1. `confirm()` is called in `ui_cm_interface.rs` + // 2. Not resuming + // + // It is ok. Because `confirm()` in `ui_cm_interface.rs` is only used for resuming. log::info!("file num truncated, ignoring"); } else { match r.union { @@ -1112,17 +1138,33 @@ pub fn get_job_immutable(id: i32, jobs: &[TransferJob]) -> Option<&TransferJob> jobs.iter().find(|x| x.id() == id) } +async fn init_jobs(jobs: &mut Vec, stream: &mut crate::Stream) -> ResultType<()> { + for job in jobs.iter_mut() { + if job.is_last_job { + continue; + } + if let Err(err) = job.init_data_stream(stream).await { + stream + .send(&new_error(job.id(), err, job.file_num())) + .await?; + } + } + Ok(()) +} + pub async fn handle_read_jobs( jobs: &mut Vec, stream: &mut crate::Stream, ) -> ResultType { + init_jobs(jobs, stream).await?; + let mut job_log = Default::default(); let mut finished = Vec::new(); for job in jobs.iter_mut() { if job.is_last_job { continue; } - match job.read(stream).await { + match job.read().await { Err(err) => { stream .send(&new_error(job.id(), err, job.file_num()))