From 3bc8b9aa803a4c0cbfb7805bbfb3226f47ead8f9 Mon Sep 17 00:00:00 2001 From: Nathan Fisher Date: Sun, 31 Mar 2024 23:12:07 -0400 Subject: [PATCH] Use `Listing` as message type for parallel actions --- src/haggis.rs | 66 ++++++++++++++++++------------- src/lib.rs | 24 +---------- src/stream.rs | 107 +++++++++++--------------------------------------- 3 files changed, 63 insertions(+), 134 deletions(-) diff --git a/src/haggis.rs b/src/haggis.rs index 1f58158..a4d1f33 100644 --- a/src/haggis.rs +++ b/src/haggis.rs @@ -95,13 +95,13 @@ fn create(matches: &ArgMatches) -> Result<(), haggis::Error> { handle = Some(thread::spawn(move || { for msg in &receiver { match msg { - Message::NodeCreated(s) => { - pb.set_prefix(s.split('/').last().unwrap().to_string()); - pb.inc(1); - } - Message::NodeSaved { name, size } => { - let name = name.split('/').last().unwrap(); - pb.set_prefix(format!("{name} added, {size} bytes")); + Message::NodeSaved(n) => { + let name = n.name.split('/').last().unwrap(); + if let ListingKind::Normal(size) = n.kind { + pb.set_prefix(format!("{name} added, {size} bytes")); + } else { + pb.set_prefix(format!("{name} added")); + } pb.inc(1); } Message::Eof => { @@ -242,27 +242,37 @@ fn progress(file: &str, receiver: &mpsc::Receiver, len: u64) -> u pb.println(format!("Extracting archive {file}")); for msg in receiver { match msg { - StreamMessage::FileExtracted { name, size } => { - let name = name.split('/').last().unwrap(); - pb.set_prefix(format!("{name} extracted, {size} bytes")); - pb.inc(1); - total += size; - } - StreamMessage::LinkCreated { name, target } => { - let name = name.split('/').last().unwrap(); - let target = target.split('/').last().unwrap(); - pb.set_prefix(format!("{name} -> {target}")); - pb.inc(1); - } - StreamMessage::DirectoryCreated { name } => { - let name = name.split('/').last().unwrap(); - pb.set_prefix(format!("mkdir {name}")); - pb.inc(1); - } - StreamMessage::DeviceCreated { name } => { - let name = name.split('/').last().unwrap(); - pb.set_prefix(format!("mknod {name}")); - pb.inc(1); + StreamMessage::NodeExtracted(n) => { + let name = n.name.split('/').last().unwrap(); + match n.kind { + ListingKind::Normal(size) => { + pb.set_prefix(format!("{name} extracted, {size} bytes")); + pb.inc(1); + total += size; + } + ListingKind::SoftLink(t) + | ListingKind::HardLink(t) => { + pb.set_prefix(format!("{name} -> {t}")); + pb.inc(1); + } + ListingKind::Directory => { + pb.set_prefix(format!("mkdir {name}")); + pb.inc(1); + } + ListingKind::Block(_d) + | ListingKind::Character(_d) => { + pb.set_prefix(format!("mknod {name}")); + pb.inc(1); + } + ListingKind::Fifo => { + pb.set_prefix(format!("mkfifo {name}")); + pb.inc(1); + } + ListingKind::Eof => { + pb.finish_and_clear(); + break; + } + } } StreamMessage::Eof => { pb.finish_and_clear(); diff --git a/src/lib.rs b/src/lib.rs index 7706364..597bb1d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -137,18 +137,8 @@ pub fn stream_archive( #[cfg(feature = "parallel")] #[derive(Debug)] pub enum Message { - /// A `Node` has successfully been created from this path - NodeCreated( - /// The path of the created `Node` - String, - ), /// The `Node` has successfully been written into the writer - NodeSaved { - /// The path of the saved `Node` - name: String, - /// The size in bytes of the created `Node` - size: u64, - }, + NodeSaved(Listing), /// An error occurred creating or writing out the node Err { /// The pathname of the node @@ -241,17 +231,7 @@ pub fn par_stream_archive( }) .map_err(|_| Error::SenderError)?; } - match node.filetype { - FileType::Normal(n) => s - .send(Message::NodeSaved { - name: node.name.clone(), - size: n.len, - }) - .map_err(|_| Error::SenderError)?, - _ => s - .send(Message::NodeCreated(node.name.clone())) - .map_err(|_| Error::SenderError)?, - } + s.send(Message::NodeSaved(node.into())).map_err(|_| Error::SenderError)?; Ok(()) } else { Err(Error::MutexError) diff --git a/src/stream.rs b/src/stream.rs index 8281c25..1bb1615 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -2,7 +2,7 @@ #[cfg(feature = "parallel")] use { - crate::FileType, + crate::{ FileType, Listing }, rayon::{iter::ParallelBridge, prelude::ParallelIterator}, std::sync::mpsc::Sender, }; @@ -44,30 +44,8 @@ impl Iterator for Stream { #[cfg(feature = "parallel")] #[derive(Debug)] pub enum Message { - /// A File node has been successfully extracted - FileExtracted { - /// The path of the extracted `Node` - name: String, - /// The size in bytes of the created file - size: u64, - }, - /// A link has been created from a `Node` - LinkCreated { - /// The path of the link - name: String, - /// The path to this link's target - target: String, - }, - /// A directory has been created from this `Node` - DirectoryCreated { - /// The directory's path - name: String, - }, - /// A device file has been created from this `Node` - DeviceCreated { - /// The path of the device file - name: String, - }, + /// A Node has been successfully extracted + NodeExtracted(Listing), /// An error occurred while extracting this `Node` Err { /// The pathame of the `Node` being extracted @@ -135,12 +113,12 @@ impl Stream { f: F, ) -> Result<(), Error> where - F: FnOnce(Node, Option, Option) + Copy, + F: FnOnce(&Node, Option, Option) + Copy, { for node in self { let node = node?; node.extract(prefix, uid, gid)?; - f(node, uid, gid); + f(&node, uid, gid); } Ok(()) } @@ -148,7 +126,7 @@ impl Stream { #[cfg(feature = "parallel")] impl Stream { - /// Extracts and archive in parallel + /// Extracts an archive in parallel /// # Errors /// Returns `crate::Error` if io fails or several other error conditions pub fn par_extract( @@ -170,35 +148,12 @@ impl Stream { return Err(error); } match n.filetype { - FileType::Normal(f) => { - s.send(Message::FileExtracted { - name: n.name.clone(), - size: f.len, - }) - .map_err(|_| Error::SenderError)?; - } - FileType::SoftLink(t) | FileType::HardLink(t) => { - s.send(Message::LinkCreated { - name: n.name.clone(), - target: t.clone(), - }) - .map_err(|_| Error::SenderError)?; - } - FileType::Directory => { - s.send(Message::DirectoryCreated { - name: n.name.clone(), - }) - .map_err(|_| Error::SenderError)?; - } - FileType::Block(_) | FileType::Character(_) | FileType::Fifo => { - s.send(Message::DeviceCreated { - name: n.name.clone(), - }) - .map_err(|_| Error::SenderError)?; - } FileType::Eof => { s.send(Message::Eof).map_err(|_| Error::SenderError)?; } + _ => { + s.send(Message::NodeExtracted(n.into())).map_err(|_| Error::SenderError)?; + } } Ok::<(), Error>(()) })?; @@ -206,7 +161,7 @@ impl Stream { Ok(()) } - /// Extracts and archive in parallel and runs the passed in function for + /// Extracts an archive in parallel and runs the passed in function for /// each `Node` /// # Errors /// Returns `crate::Error` if io fails or several other error conditions @@ -219,44 +174,28 @@ impl Stream { f: F, ) -> Result<(), Error> where - F: FnOnce(Node, Option, Option) + Copy + Send + Sync, + F: FnOnce(&Node, Option, Option) + Copy + Send + Sync, { let s = sender.clone(); self.into_iter().par_bridge().try_for_each_with(s, |s, n| { let n = n?; - n.extract(prefix, uid, gid)?; + if let Err(error) = n.extract(prefix, uid, gid) { + s.send(Message::Err { + name: n.name.clone(), + error: error.clone(), + }) + .map_err(|_| Error::SenderError)?; + return Err(error); + } match n.filetype { - FileType::Normal(ref f) => { - s.send(Message::FileExtracted { - name: n.name.clone(), - size: f.len, - }) - .map_err(|_| Error::SenderError)?; - } - FileType::SoftLink(ref t) | FileType::HardLink(ref t) => { - s.send(Message::LinkCreated { - name: n.name.clone(), - target: t.clone(), - }) - .map_err(|_| Error::SenderError)?; - } - FileType::Directory => { - s.send(Message::DirectoryCreated { - name: n.name.clone(), - }) - .map_err(|_| Error::SenderError)?; - } - FileType::Block(_) | FileType::Character(_) | FileType::Fifo => { - s.send(Message::DeviceCreated { - name: n.name.clone(), - }) - .map_err(|_| Error::SenderError)?; - } FileType::Eof => { s.send(Message::Eof).map_err(|_| Error::SenderError)?; } + _ => { + f(&n, uid, gid); + s.send(Message::NodeExtracted(n.into())).map_err(|_| Error::SenderError)?; + } } - f(n, uid, gid); Ok::<(), Error>(()) })?; sender.send(Message::Eof).map_err(|_| Error::SenderError)?;