Add Sender to parallel methods for user feedback

This commit is contained in:
Nathan Fisher 2023-07-05 23:21:30 -04:00
parent 9ed4d503d1
commit 02b9e4d1db
3 changed files with 68 additions and 8 deletions

View File

@ -10,7 +10,9 @@ pub enum Error {
Slice(TryFromSliceError), Slice(TryFromSliceError),
InvalidChecksum, InvalidChecksum,
MissingData, MissingData,
MutexError,
NulError, NulError,
SenderError,
UnexpectedData, UnexpectedData,
UnknownFileType, UnknownFileType,
Other(String), Other(String),
@ -25,7 +27,9 @@ impl fmt::Display for Error {
Self::Io(e) => write!(f, "{e}"), Self::Io(e) => write!(f, "{e}"),
Self::InvalidChecksum => write!(f, "invalid checksum"), Self::InvalidChecksum => write!(f, "invalid checksum"),
Self::MissingData => write!(f, "missing data"), Self::MissingData => write!(f, "missing data"),
Self::MutexError => write!(f, "mutex error"),
Self::NulError => write!(f, "nul error"), Self::NulError => write!(f, "nul error"),
Self::SenderError => write!(f, "sender error"),
Self::UnexpectedData => write!(f, "unexpected data"), Self::UnexpectedData => write!(f, "unexpected data"),
Self::UnknownFileType => write!(f, "unknown file type"), Self::UnknownFileType => write!(f, "unknown file type"),
Self::Other(s) => write!(f, "{s}"), Self::Other(s) => write!(f, "{s}"),

View File

@ -1,9 +1,14 @@
#![doc = include_str!("../README.md")] #![doc = include_str!("../README.md")]
use std::{collections::HashMap, fs, io::{BufWriter, Write}, sync::Mutex}; use std::{
collections::HashMap,
fs,
io::{BufWriter, Write},
sync::Mutex,
};
#[cfg(feature = "parallel")] #[cfg(feature = "parallel")]
use { use {
rayon::prelude::{IntoParallelRefIterator, ParallelIterator}, rayon::prelude::{IntoParallelRefIterator, ParallelIterator},
std::ops::DerefMut, std::{ops::DerefMut, sync::mpsc::Sender},
}; };
mod checksum; mod checksum;
@ -47,15 +52,30 @@ pub fn stream_archive<W: Write>(
let node = Node::from_path(f, algorithm, &links)?; let node = Node::from_path(f, algorithm, &links)?;
node.write(&mut writer)?; node.write(&mut writer)?;
} }
writer.write_all(&[0; 8])?;
Ok(()) Ok(())
} }
#[cfg(feature = "parallel")]
#[derive(Debug)]
pub enum Message {
NodeCreated(String),
NodeSaved { name: String, size: u64 },
Err(Error),
Eof,
}
/// Creates a Haggis archive from a list of files, processing each file in parallel /// Creates a Haggis archive from a list of files, processing each file in parallel
#[cfg(feature = "parallel")] #[cfg(feature = "parallel")]
pub fn par_create_archive(path: &str, files: Vec<&str>, algorithm: Algorithm) -> Result<(), Error> { pub fn par_create_archive(
path: &str,
files: Vec<&str>,
algorithm: Algorithm,
sender: &Sender<Message>,
) -> Result<(), Error> {
let fd = fs::File::create(path)?; let fd = fs::File::create(path)?;
let writer = BufWriter::new(fd); let writer = BufWriter::new(fd);
par_stream_archive(writer, files, algorithm)?; par_stream_archive(writer, files, algorithm, sender)?;
Ok(()) Ok(())
} }
@ -65,16 +85,41 @@ pub fn par_stream_archive<W: Write + Send>(
writer: W, writer: W,
files: Vec<&str>, files: Vec<&str>,
algorithm: Algorithm, algorithm: Algorithm,
sender: &Sender<Message>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let links = Mutex::new(HashMap::<u64, String>::new()); let links = Mutex::new(HashMap::<u64, String>::new());
let writer = Mutex::new(writer); let writer = Mutex::new(writer);
files.par_iter().try_for_each(|f| { let s = sender.clone();
let node = Node::from_path(f, algorithm, &links)?; files.par_iter().try_for_each_with(s, |s, f| {
let node = match Node::from_path(f, algorithm, &links) {
Ok(n) => n,
Err(e) => {
s.send(Message::Err(e)).map_err(|_| Error::SenderError)?;
return Ok(());
}
};
if let Ok(mut writer) = writer.lock() { if let Ok(mut writer) = writer.lock() {
let mut writer = writer.deref_mut(); let mut writer = writer.deref_mut();
node.write(&mut writer)?; if let Err(e) = node.write(&mut writer) {
s.send(Message::Err(e)).map_err(|_| Error::SenderError)?;
}
match node.filetype {
FileType::Normal(n) => s
.send(Message::NodeSaved {
name: node.name.clone(),
size: n.len,
})
.map_err(|_| Error::SenderError)?,
_ => s
.send(Message::NodeCreated(node.name.clone()))
.map_err(|_| Error::SenderError)?,
}
Ok(())
} else {
Err(Error::MutexError)
} }
Ok::<(), Error>(())
})?; })?;
let mut writer = writer.into_inner().map_err(|_| Error::MutexError)?;
writer.write_all(&[0; 8])?;
Ok(()) Ok(())
} }

View File

@ -33,3 +33,14 @@ impl<T: Read> Iterator for Stream<T> {
} }
} }
} }
impl<T: Read> Stream<T> {
pub fn extract(&mut self, prefix: Option<&str>) -> Result<(), Error> {
todo!()
}
#[cfg(feature = "parallel")]
pub fn par_extract(&mut self, prefix: Option<&str>) -> Result<(), Error> {
todo!()
}
}