Add pathname to message when there is an error creating, writing or extracting an archive node when we are operating in parallel

This commit is contained in:
Nathan Fisher 2024-01-23 15:02:38 -05:00
parent ce1c383f44
commit e60acbbda2
4 changed files with 86 additions and 18 deletions

View File

@ -39,6 +39,6 @@ to see Haggis implemented in other languages.
- [x] Override user/group when creating archives - [x] Override user/group when creating archives
- [x] Override user/group when extracting archives - [x] Override user/group when extracting archives
- [x] Automatically detect zstd compressed archives - [x] Automatically detect zstd compressed archives
- [ ] Add path to error message when passing between threads - [x] Add path to error message when passing between threads
- [x] Add ability to write archives to stdout - [x] Add ability to write archives to stdout
- [ ] Add ability to read archives from stdin - [ ] Add ability to read archives from stdin

View File

@ -21,6 +21,15 @@ pub enum Error {
Other(String), Other(String),
} }
impl Clone for Error {
fn clone(&self) -> Self {
match self {
Self::Io(e) => Self::Io(io::Error::other(format!("{e}"))),
_ => self.clone(),
}
}
}
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {

View File

@ -39,6 +39,7 @@ pub use {
#[cfg(feature = "parallel")] #[cfg(feature = "parallel")]
pub use stream::Message as StreamMessage; pub use stream::Message as StreamMessage;
/// The *magic* number for a Haggis archive
pub static MAGIC: [u8; 7] = [0x89, b'h', b'a', b'g', b'g', b'i', b's']; pub static MAGIC: [u8; 7] = [0x89, b'h', b'a', b'g', b'g', b'i', b's'];
static ZSTD_MAGIC: [u8; 4] = [0x28, 0xb5, 0x2f, 0xfd]; static ZSTD_MAGIC: [u8; 4] = [0x28, 0xb5, 0x2f, 0xfd];
@ -121,9 +122,25 @@ pub fn stream_archive<W: Write>(
#[cfg(feature = "parallel")] #[cfg(feature = "parallel")]
#[derive(Debug)] #[derive(Debug)]
pub enum Message { pub enum Message {
NodeCreated(String), /// A `Node` has successfully been created from this path
NodeSaved { name: String, size: u64 }, NodeCreated(
Err(Error), /// The path of the created `Node`
String,
),
/// The `Node` has successfully been written into the writer
NodeSaved {
/// The path of the saved `Node`
name: String,
/// The size in bytes of the created `Node`
size: u64,
},
/// An error occurred creating or writing out the node
Err {
/// The pathname of the node
name: String,
/// The error which occurred
error: Error,
},
Eof, Eof,
} }
@ -157,9 +174,9 @@ pub fn par_create_archive_stdout(
uid: Option<u32>, uid: Option<u32>,
gid: Option<u32>, gid: Option<u32>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let stdout = io::stdout().lock(); let stdout = io::stdout();
let writer = BufWriter::new(stdout); let writer = BufWriter::new(stdout);
par_stream_archive(writer, files, algorithm, uid, gid)?; par_stream_archive(writer, files, algorithm, sender, uid, gid)?;
Ok(()) Ok(())
} }
@ -184,8 +201,12 @@ pub fn par_stream_archive<W: Write + Send>(
files.par_iter().try_for_each_with(s, |s, f| { files.par_iter().try_for_each_with(s, |s, f| {
let mut node = match Node::from_path(f, algorithm, &links) { let mut node = match Node::from_path(f, algorithm, &links) {
Ok(n) => n, Ok(n) => n,
Err(e) => { Err(error) => {
s.send(Message::Err(e)).map_err(|_| Error::SenderError)?; s.send(Message::Err {
name: f.to_string(),
error,
})
.map_err(|_| Error::SenderError)?;
return Ok(()); return Ok(());
} }
}; };
@ -197,8 +218,12 @@ pub fn par_stream_archive<W: Write + Send>(
} }
if let Ok(mut writer) = writer.lock() { if let Ok(mut writer) = writer.lock() {
let mut writer = &mut *writer; let mut writer = &mut *writer;
if let Err(e) = node.write(&mut writer) { if let Err(error) = node.write(&mut writer) {
s.send(Message::Err(e)).map_err(|_| Error::SenderError)?; s.send(Message::Err {
name: node.name.clone(),
error,
})
.map_err(|_| Error::SenderError)?;
} }
match node.filetype { match node.filetype {
FileType::Normal(n) => s FileType::Normal(n) => s

View File

@ -41,11 +41,38 @@ impl<R: Read + Send> Iterator for Stream<R> {
#[cfg(feature = "parallel")] #[cfg(feature = "parallel")]
#[derive(Debug)] #[derive(Debug)]
pub enum Message { pub enum Message {
FileExtracted { name: String, size: u64 }, /// A File node has been successfully extracted
LinkCreated { name: String, target: String }, FileExtracted {
DirectoryCreated { name: String }, /// The path of the extracted `Node`
DeviceCreated { name: String }, name: String,
Err(Error), /// The size in bytes of the created file
size: u64,
},
/// A link has been created from a `Node`
LinkCreated {
/// The path of the link
name: String,
/// The path to this link's target
target: String,
},
/// A directory has been created from this `Node`
DirectoryCreated {
/// The directory's path
name: String,
},
/// A device file has been created from this `Node`
DeviceCreated {
/// The path of the device file
name: String,
},
/// An error occurred while extracting this `Node`
Err {
/// The pathame of the `Node` being extracted
name: String,
/// The error which occurred
error: Error,
},
/// The reader has reached the end of the archive
Eof, Eof,
} }
@ -90,7 +117,7 @@ impl<R: Read + Send> Stream<R> {
f: F, f: F,
) -> Result<(), Error> ) -> Result<(), Error>
where where
F: FnOnce(Node, Option<u32>, Option<u32>) + Copy, F: FnOnce(Node, Option<u32>, Option<u32>) + Copy,
{ {
for node in self { for node in self {
let node = node?; let node = node?;
@ -114,7 +141,14 @@ impl<R: Read + Send> Stream<R> {
let s = sender.clone(); let s = sender.clone();
self.into_iter().par_bridge().try_for_each_with(s, |s, n| { self.into_iter().par_bridge().try_for_each_with(s, |s, n| {
let n = n?; let n = n?;
n.extract(prefix, uid, gid)?; if let Err(error) = n.extract(prefix, uid, gid) {
s.send(Message::Err {
name: n.name.clone(),
error: error.clone(),
})
.map_err(|_| Error::SenderError)?;
return Err(error);
}
match n.filetype { match n.filetype {
FileType::Normal(f) => { FileType::Normal(f) => {
s.send(Message::FileExtracted { s.send(Message::FileExtracted {
@ -166,7 +200,7 @@ impl<R: Read + Send> Stream<R> {
f: F, f: F,
) -> Result<(), Error> ) -> Result<(), Error>
where where
F: FnOnce(Node, Option<u32>, Option<u32>) + Copy + Send + Sync, F: FnOnce(Node, Option<u32>, Option<u32>) + Copy + Send + Sync,
{ {
let s = sender.clone(); let s = sender.clone();
self.into_iter().par_bridge().try_for_each_with(s, |s, n| { self.into_iter().par_bridge().try_for_each_with(s, |s, n| {