From 02b9e4d1dbeb39f5e5410ace70035b51f3252156 Mon Sep 17 00:00:00 2001
From: Nathan Fisher
Date: Wed, 5 Jul 2023 23:21:30 -0400
Subject: [PATCH] Add Sender to parallel methods for user feedback

---
 src/error.rs  |  4 ++++
 src/lib.rs    | 61 ++++++++++++++++++++++++++++++++++++++++++++-------
 src/stream.rs | 11 ++++++++++
 3 files changed, 68 insertions(+), 8 deletions(-)

diff --git a/src/error.rs b/src/error.rs
index bffaefd..6fbfce3 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -10,7 +10,9 @@ pub enum Error {
     Slice(TryFromSliceError),
     InvalidChecksum,
     MissingData,
+    MutexError,
     NulError,
+    SenderError,
     UnexpectedData,
     UnknownFileType,
     Other(String),
@@ -25,7 +27,9 @@ impl fmt::Display for Error {
             Self::Io(e) => write!(f, "{e}"),
             Self::InvalidChecksum => write!(f, "invalid checksum"),
             Self::MissingData => write!(f, "missing data"),
+            Self::MutexError => write!(f, "mutex error"),
             Self::NulError => write!(f, "nul error"),
+            Self::SenderError => write!(f, "sender error"),
             Self::UnexpectedData => write!(f, "unexpected data"),
             Self::UnknownFileType => write!(f, "unknown file type"),
             Self::Other(s) => write!(f, "{s}"),
diff --git a/src/lib.rs b/src/lib.rs
index 85ef7e5..31eb208 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,9 +1,14 @@
 #![doc = include_str!("../README.md")]
-use std::{collections::HashMap, fs, io::{BufWriter, Write}, sync::Mutex};
+use std::{
+    collections::HashMap,
+    fs,
+    io::{BufWriter, Write},
+    sync::Mutex,
+};
 #[cfg(feature = "parallel")]
 use {
     rayon::prelude::{IntoParallelRefIterator, ParallelIterator},
-    std::ops::DerefMut,
+    std::{ops::DerefMut, sync::mpsc::Sender},
 };
 
 mod checksum;
@@ -47,15 +52,30 @@ pub fn stream_archive<W: Write>(
         let node = Node::from_path(f, algorithm, &links)?;
         node.write(&mut writer)?;
     }
+    writer.write_all(&[0; 8])?;
     Ok(())
 }
 
+#[cfg(feature = "parallel")]
+#[derive(Debug)]
+pub enum Message {
+    NodeCreated(String),
+    NodeSaved { name: String, size: u64 },
+    Err(Error),
+    Eof,
+}
+
 /// Creates a Haggis archive from a list of files, processing each file in parallel
 #[cfg(feature = "parallel")]
-pub fn par_create_archive(path: &str, files: Vec<&str>, algorithm: Algorithm) -> Result<(), Error> {
+pub fn par_create_archive(
+    path: &str,
+    files: Vec<&str>,
+    algorithm: Algorithm,
+    sender: &Sender<Message>,
+) -> Result<(), Error> {
     let fd = fs::File::create(path)?;
     let writer = BufWriter::new(fd);
-    par_stream_archive(writer, files, algorithm)?;
+    par_stream_archive(writer, files, algorithm, sender)?;
     Ok(())
 }
 
@@ -65,16 +85,41 @@ pub fn par_stream_archive<W: Write + Send>(
     writer: W,
     files: Vec<&str>,
     algorithm: Algorithm,
+    sender: &Sender<Message>,
 ) -> Result<(), Error> {
     let links = Mutex::new(HashMap::<u64, String>::new());
     let writer = Mutex::new(writer);
-    files.par_iter().try_for_each(|f| {
-        let node = Node::from_path(f, algorithm, &links)?;
+    let s = sender.clone();
+    files.par_iter().try_for_each_with(s, |s, f| {
+        let node = match Node::from_path(f, algorithm, &links) {
+            Ok(n) => n,
+            Err(e) => {
+                s.send(Message::Err(e)).map_err(|_| Error::SenderError)?;
+                return Ok(());
+            }
+        };
         if let Ok(mut writer) = writer.lock() {
             let mut writer = writer.deref_mut();
-            node.write(&mut writer)?;
+            if let Err(e) = node.write(&mut writer) {
+                s.send(Message::Err(e)).map_err(|_| Error::SenderError)?;
+            }
+            match node.filetype {
+                FileType::Normal(n) => s
+                    .send(Message::NodeSaved {
+                        name: node.name.clone(),
+                        size: n.len,
+                    })
+                    .map_err(|_| Error::SenderError)?,
+                _ => s
+                    .send(Message::NodeCreated(node.name.clone()))
+                    .map_err(|_| Error::SenderError)?,
+            }
+            Ok(())
+        } else {
+            Err(Error::MutexError)
         }
-        Ok::<(), Error>(())
     })?;
+    let mut writer = writer.into_inner().map_err(|_| Error::MutexError)?;
+    writer.write_all(&[0; 8])?;
     Ok(())
 }
diff --git a/src/stream.rs b/src/stream.rs
index 27ebfcc..d55af3d 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -33,3 +33,14 @@ impl Iterator for Stream {
         }
     }
 }
+
+impl Stream {
+    pub fn extract(&mut self, prefix: Option<&str>) -> Result<(), Error> {
+        todo!()
+    }
+
+    #[cfg(feature = "parallel")]
+    pub fn par_extract(&mut self, prefix: Option<&str>) -> Result<(), Error> {
+        todo!()
+    }
+}