#![allow(clippy::similar_names)] #[cfg(feature = "parallel")] use { crate::FileType, rayon::{iter::ParallelBridge, prelude::ParallelIterator}, std::sync::mpsc::Sender, }; use { crate::{Error, Node, MAGIC}, std::{ io::{ErrorKind, Read}, iter::Iterator, }, }; /// An iterator over a series of archive `Node`'s. This struct is generic over any /// type which implements `Read`, such as a file or a network stream. #[derive(Debug)] pub struct Stream { pub length: u32, reader: R, } impl Iterator for Stream { type Item = Result; fn next(&mut self) -> Option { match Node::read(&mut self.reader) { Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => None, Ok(f) => match f.filetype { crate::FileType::Eof => None, _ => Some(Ok(f)), }, x => Some(x), } } } /// A message to be passed from a background thread to a foreground thread when /// a `Node` extraction has finished #[cfg(feature = "parallel")] #[derive(Debug)] pub enum Message { FileExtracted { name: String, size: u64 }, LinkCreated { name: String, target: String }, DirectoryCreated { name: String }, DeviceCreated { name: String }, Err(Error), Eof, } impl Stream { /// Creates a new archive /// # Errors /// Returns `crate::Error` if io fails or several other error conditions pub fn new(mut reader: R) -> Result { let mut buf = [0; 11]; reader.read_exact(&mut buf)?; let length = u32::from_le_bytes(buf[7..].try_into()?); if buf[0..7] == MAGIC { Ok(Self { length, reader }) } else { Err(Error::InvalidMagic) } } /// Extracts an archive /// # Errors /// Returns `crate::Error` if io fails or several other error conditions pub fn extract( &mut self, prefix: Option<&str>, uid: Option, gid: Option, ) -> Result<(), Error> { for node in self { node?.extract(prefix, uid, gid)?; } Ok(()) } /// Extracts an archive, running the function `f` for each node /// # Errors /// Returns `crate::Error` if io fails or several other error conditions pub fn extract_with( &mut self, prefix: Option<&str>, uid: Option, gid: Option, f: F, ) -> Result<(), Error> where F: FnOnce(Node, Option, Option) + Copy, { for node in self { let node = node?; node.extract(prefix, uid, gid)?; f(node, uid, gid); } Ok(()) } #[cfg(feature = "parallel")] /// Extracts and archive in parallel /// # Errors /// Returns `crate::Error` if io fails or several other error conditions pub fn par_extract( &mut self, prefix: Option<&str>, uid: Option, gid: Option, sender: &Sender, ) -> Result<(), Error> { let s = sender.clone(); self.into_iter().par_bridge().try_for_each_with(s, |s, n| { let n = n?; n.extract(prefix, uid, gid)?; match n.filetype { FileType::Normal(f) => { s.send(Message::FileExtracted { name: n.name.clone(), size: f.len, }) .map_err(|_| Error::SenderError)?; } FileType::SoftLink(t) | FileType::HardLink(t) => { s.send(Message::LinkCreated { name: n.name.clone(), target: t.clone(), }) .map_err(|_| Error::SenderError)?; } FileType::Directory => { s.send(Message::DirectoryCreated { name: n.name.clone(), }) .map_err(|_| Error::SenderError)?; } FileType::Block(_) | FileType::Character(_) | FileType::Fifo => { s.send(Message::DeviceCreated { name: n.name.clone(), }) .map_err(|_| Error::SenderError)?; } FileType::Eof => { s.send(Message::Eof).map_err(|_| Error::SenderError)?; } } Ok::<(), Error>(()) })?; sender.send(Message::Eof).map_err(|_| Error::SenderError)?; Ok(()) } #[cfg(feature = "parallel")] /// Extracts and archive in parallel and runs the passed in function for /// each `Node` /// # Errors /// Returns `crate::Error` if io fails or several other error conditions pub fn par_extract_with( &mut self, prefix: Option<&str>, uid: Option, gid: Option, sender: &Sender, f: F, ) -> Result<(), Error> where F: FnOnce(Node, Option, Option) + Copy + Send + Sync, { let s = sender.clone(); self.into_iter().par_bridge().try_for_each_with(s, |s, n| { let n = n?; n.extract(prefix, uid, gid)?; match n.filetype { FileType::Normal(ref f) => { s.send(Message::FileExtracted { name: n.name.clone(), size: f.len, }) .map_err(|_| Error::SenderError)?; } FileType::SoftLink(ref t) | FileType::HardLink(ref t) => { s.send(Message::LinkCreated { name: n.name.clone(), target: t.clone(), }) .map_err(|_| Error::SenderError)?; } FileType::Directory => { s.send(Message::DirectoryCreated { name: n.name.clone(), }) .map_err(|_| Error::SenderError)?; } FileType::Block(_) | FileType::Character(_) | FileType::Fifo => { s.send(Message::DeviceCreated { name: n.name.clone(), }) .map_err(|_| Error::SenderError)?; } FileType::Eof => { s.send(Message::Eof).map_err(|_| Error::SenderError)?; } } f(n, uid, gid); Ok::<(), Error>(()) })?; sender.send(Message::Eof).map_err(|_| Error::SenderError)?; Ok(()) } }