From e60acbbda299181b8391d73aafaaacf87fa98d1a Mon Sep 17 00:00:00 2001 From: Nathan Fisher Date: Tue, 23 Jan 2024 15:02:38 -0500 Subject: [PATCH] Add pathname to message when there is an error creating, writing or extracting an archive node when we are operating in parallel --- README.md | 2 +- src/error.rs | 9 +++++++++ src/lib.rs | 43 ++++++++++++++++++++++++++++++++++--------- src/stream.rs | 50 ++++++++++++++++++++++++++++++++++++++++++-------- 4 files changed, 86 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index d7d6b1e..b292f01 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,6 @@ to see Haggis implemented in other languages. - [x] Override user/group when creating archives - [x] Override user/group when extracting archives - [x] Automatically detect zstd compressed archives -- [ ] Add path to error message when passing between threads +- [x] Add path to error message when passing between threads - [x] Add ability to write archives to stdout - [ ] Add ability to read archives from stdin diff --git a/src/error.rs b/src/error.rs index 8132dba..d9b2236 100644 --- a/src/error.rs +++ b/src/error.rs @@ -21,6 +21,15 @@ pub enum Error { Other(String), } +impl Clone for Error { + fn clone(&self) -> Self { + match self { + Self::Io(e) => Self::Io(io::Error::other(format!("{e}"))), + _ => self.clone(), + } + } +} + impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { diff --git a/src/lib.rs b/src/lib.rs index 4e9fdf8..d8dab3d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,6 +39,7 @@ pub use { #[cfg(feature = "parallel")] pub use stream::Message as StreamMessage; +/// The *magic* number for a Haggis archive pub static MAGIC: [u8; 7] = [0x89, b'h', b'a', b'g', b'g', b'i', b's']; static ZSTD_MAGIC: [u8; 4] = [0x28, 0xb5, 0x2f, 0xfd]; @@ -121,9 +122,25 @@ pub fn stream_archive( #[cfg(feature = "parallel")] #[derive(Debug)] pub enum Message { - NodeCreated(String), - NodeSaved { name: String, size: u64 }, - Err(Error), + /// A `Node` has successfully been created from this path + NodeCreated( + /// The path of the created `Node` + String, + ), + /// The `Node` has successfully been written into the writer + NodeSaved { + /// The path of the saved `Node` + name: String, + /// The size in bytes of the created `Node` + size: u64, + }, + /// An error occurred creating or writing out the node + Err { + /// The pathname of the node + name: String, + /// The error which occurred + error: Error, + }, Eof, } @@ -157,9 +174,9 @@ pub fn par_create_archive_stdout( uid: Option, gid: Option, ) -> Result<(), Error> { - let stdout = io::stdout().lock(); + let stdout = io::stdout(); let writer = BufWriter::new(stdout); - par_stream_archive(writer, files, algorithm, uid, gid)?; + par_stream_archive(writer, files, algorithm, sender, uid, gid)?; Ok(()) } @@ -184,8 +201,12 @@ pub fn par_stream_archive( files.par_iter().try_for_each_with(s, |s, f| { let mut node = match Node::from_path(f, algorithm, &links) { Ok(n) => n, - Err(e) => { - s.send(Message::Err(e)).map_err(|_| Error::SenderError)?; + Err(error) => { + s.send(Message::Err { + name: f.to_string(), + error, + }) + .map_err(|_| Error::SenderError)?; return Ok(()); } }; @@ -197,8 +218,12 @@ pub fn par_stream_archive( } if let Ok(mut writer) = writer.lock() { let mut writer = &mut *writer; - if let Err(e) = node.write(&mut writer) { - s.send(Message::Err(e)).map_err(|_| Error::SenderError)?; + if let Err(error) = node.write(&mut writer) { + s.send(Message::Err { + name: node.name.clone(), + error, + }) + .map_err(|_| Error::SenderError)?; } match node.filetype { FileType::Normal(n) => s diff --git a/src/stream.rs b/src/stream.rs index a88a3f1..35eb9c5 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -41,11 +41,38 @@ impl Iterator for Stream { #[cfg(feature = "parallel")] #[derive(Debug)] pub enum Message { - FileExtracted { name: String, size: u64 }, - LinkCreated { name: String, target: String }, - DirectoryCreated { name: String }, - DeviceCreated { name: String }, - Err(Error), + /// A File node has been successfully extracted + FileExtracted { + /// The path of the extracted `Node` + name: String, + /// The size in bytes of the created file + size: u64, + }, + /// A link has been created from a `Node` + LinkCreated { + /// The path of the link + name: String, + /// The path to this link's target + target: String, + }, + /// A directory has been created from this `Node` + DirectoryCreated { + /// The directory's path + name: String, + }, + /// A device file has been created from this `Node` + DeviceCreated { + /// The path of the device file + name: String, + }, + /// An error occurred while extracting this `Node` + Err { + /// The pathame of the `Node` being extracted + name: String, + /// The error which occurred + error: Error, + }, + /// The reader has reached the end of the archive Eof, } @@ -90,7 +117,7 @@ impl Stream { f: F, ) -> Result<(), Error> where - F: FnOnce(Node, Option, Option) + Copy, + F: FnOnce(Node, Option, Option) + Copy, { for node in self { let node = node?; @@ -114,7 +141,14 @@ impl Stream { let s = sender.clone(); self.into_iter().par_bridge().try_for_each_with(s, |s, n| { let n = n?; - n.extract(prefix, uid, gid)?; + if let Err(error) = n.extract(prefix, uid, gid) { + s.send(Message::Err { + name: n.name.clone(), + error: error.clone(), + }) + .map_err(|_| Error::SenderError)?; + return Err(error); + } match n.filetype { FileType::Normal(f) => { s.send(Message::FileExtracted { @@ -166,7 +200,7 @@ impl Stream { f: F, ) -> Result<(), Error> where - F: FnOnce(Node, Option, Option) + Copy + Send + Sync, + F: FnOnce(Node, Option, Option) + Copy + Send + Sync, { let s = sender.clone(); self.into_iter().par_bridge().try_for_each_with(s, |s, n| {