diff --git a/src/stream.rs b/src/stream.rs index d55af3d..e8f95d1 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -5,21 +5,26 @@ use { iter::Iterator, }, }; +#[cfg(feature = "parallel")] +use { + rayon::{iter::ParallelBridge, prelude::ParallelIterator}, + std::{ops::DerefMut, sync::mpsc::Sender}, +}; /// An iterator over a series of archive `Node`'s. This struct is generic over any /// type which implements `Read`, such as a file or a network stream. #[derive(Debug)] -pub struct Stream { +pub struct Stream { reader: T, } -impl From for Stream { +impl From for Stream { fn from(value: T) -> Self { Self { reader: value } } } -impl Iterator for Stream { +impl Iterator for Stream { type Item = Result; fn next(&mut self) -> Option { @@ -34,13 +39,36 @@ impl Iterator for Stream { } } -impl Stream { +#[cfg(feature = "parallel")] +#[derive(Debug)] +pub enum Message { + FileExtracted { name: String, size: u64 }, + LinkCreated { name: String, target: String }, + DirectoryCreated { name: String }, + DeviceCreated { name: String }, + Err(Error), + Eof, +} + +impl Stream { pub fn extract(&mut self, prefix: Option<&str>) -> Result<(), Error> { - todo!() + for node in self { + node?.extract(prefix)?; + } + Ok(()) } #[cfg(feature = "parallel")] - pub fn par_extract(&mut self, prefix: Option<&str>) -> Result<(), Error> { - todo!() + pub fn par_extract( + &mut self, + prefix: Option<&str>, + sender: &Sender, + ) -> Result<(), Error> { + let s = sender.clone(); + self.into_iter().par_bridge().try_for_each_with(s, |s, n| { + n?.extract(prefix)?; + Ok::<(), Error>(()) + })?; + Ok(()) } }