Basic implementation of parallel extract; Todo: Message passing

This commit is contained in:
Nathan Fisher 2023-07-05 23:50:28 -04:00
parent 02b9e4d1db
commit 05bec9ae2f

View File

@ -5,21 +5,26 @@ use {
iter::Iterator,
},
};
#[cfg(feature = "parallel")]
use {
rayon::{iter::ParallelBridge, prelude::ParallelIterator},
std::{ops::DerefMut, sync::mpsc::Sender},
};
/// An iterator over a series of archive `Node`'s. This struct is generic over any
/// type which implements `Read`, such as a file or a network stream.
#[derive(Debug)]
pub struct Stream<T: Read> {
pub struct Stream<T: Read + Send> {
reader: T,
}
impl<T: Read> From<T> for Stream<T> {
impl<T: Read + Send> From<T> for Stream<T> {
fn from(value: T) -> Self {
Self { reader: value }
}
}
impl<T: Read> Iterator for Stream<T> {
impl<T: Read + Send> Iterator for Stream<T> {
type Item = Result<Node, Error>;
fn next(&mut self) -> Option<Self::Item> {
@ -34,13 +39,36 @@ impl<T: Read> Iterator for Stream<T> {
}
}
impl<T: Read> Stream<T> {
#[cfg(feature = "parallel")]
#[derive(Debug)]
pub enum Message {
FileExtracted { name: String, size: u64 },
LinkCreated { name: String, target: String },
DirectoryCreated { name: String },
DeviceCreated { name: String },
Err(Error),
Eof,
}
impl<T: Read + Send> Stream<T> {
pub fn extract(&mut self, prefix: Option<&str>) -> Result<(), Error> {
todo!()
for node in self {
node?.extract(prefix)?;
}
Ok(())
}
#[cfg(feature = "parallel")]
pub fn par_extract(&mut self, prefix: Option<&str>) -> Result<(), Error> {
todo!()
pub fn par_extract(
&mut self,
prefix: Option<&str>,
sender: &Sender<Message>,
) -> Result<(), Error> {
let s = sender.clone();
self.into_iter().par_bridge().try_for_each_with(s, |s, n| {
n?.extract(prefix)?;
Ok::<(), Error>(())
})?;
Ok(())
}
}