haggis-rs/src/stream.rs

205 lines
6.0 KiB
Rust

#![allow(clippy::similar_names)]
#[cfg(feature = "parallel")]
use {
crate::{ FileType, Listing },
rayon::{iter::ParallelBridge, prelude::ParallelIterator},
std::sync::mpsc::Sender,
};
use {
crate::{Error, Node, MAGIC},
std::{
fs::File,
io::{self, BufReader, ErrorKind, Read},
iter::Iterator,
os::fd::{AsRawFd, FromRawFd},
},
};
/// An iterator over a series of archive `Node`s. This struct is generic over any
/// type which implements `Read`, such as a file or a network stream.
///
/// Iteration ends when an `Eof` node is decoded or when the underlying reader
/// runs out of data.
#[derive(Debug)]
pub struct Stream<R: Read> {
/// The little-endian `u32` stored in the archive header directly after the
/// magic number (presumably the number of nodes in the archive - confirm
/// against the format specification).
pub length: u32,
/// The source from which the header was read and from which each `Node` is
/// decoded.
reader: R,
}
impl<R: Read> Iterator for Stream<R> {
type Item = Result<Node, Error>;
fn next(&mut self) -> Option<Self::Item> {
match Node::read(&mut self.reader) {
Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => None,
Ok(f) => match f.filetype {
crate::FileType::Eof => None,
_ => Some(Ok(f)),
},
x => Some(x),
}
}
}
/// A message to be passed from a background thread to a foreground thread when
/// a `Node` extraction has finished, has failed, or when the archive has been
/// fully processed
#[cfg(feature = "parallel")]
#[derive(Debug)]
pub enum Message {
/// A `Node` has been successfully extracted; carries the `Listing` built from
/// that node
NodeExtracted(Listing),
/// An error occurred while extracting this `Node`
Err {
/// The pathname of the `Node` being extracted
name: String,
/// The error which occurred
error: Error,
},
/// The reader has reached the end of the archive
Eof,
}
impl Stream<BufReader<File>> {
    /// Creates a new `Stream` which reads the archive from stdin.
    ///
    /// The stdin descriptor is duplicated and the duplicate is wrapped in a
    /// buffered `File`. Dropping the returned `Stream` therefore closes only
    /// the duplicate and leaves the process's stdin open.
    ///
    /// # Errors
    /// Returns `crate::Error` if duplicating the descriptor or reading the
    /// header fails, or if the header does not begin with the correct Haggis
    /// *magic* number
    pub fn new_from_stdin() -> Result<Self, Error> {
        let raw = io::stdin().as_raw_fd();
        // SAFETY: `raw` is the descriptor of the process's own stdin handle. The
        // temporary `File` is wrapped in `ManuallyDrop`, so its destructor never
        // runs and the descriptor is only borrowed for the `try_clone` call
        // below; it is never closed through this value.
        let borrowed = std::mem::ManuallyDrop::new(unsafe { File::from_raw_fd(raw) });
        // The duplicate is an independently owned descriptor that is safe to close.
        let owned = borrowed.try_clone()?;
        Stream::new(BufReader::new(owned))
    }
}
impl<R: Read> Stream<R> {
/// Creates a new `Stream`
/// # Errors
/// Returns `crate::Error` if io fails or the first 11 bytes are not the
/// correct Haggis *magic* number
pub fn new(mut reader: R) -> Result<Self, Error> {
let mut buf = [0; 11];
reader.read_exact(&mut buf)?;
let length = u32::from_le_bytes(buf[7..].try_into()?);
if buf[0..7] == MAGIC {
Ok(Self { length, reader })
} else {
Err(Error::InvalidMagic)
}
}
/// Extracts an archive
/// # Errors
/// Returns `crate::Error` if io fails or several other error conditions
pub fn extract(
&mut self,
prefix: Option<&str>,
uid: Option<u32>,
gid: Option<u32>,
) -> Result<(), Error> {
for node in self {
node?.extract(prefix, uid, gid)?;
}
Ok(())
}
/// Extracts an archive, running the function `f` for each node
/// # Errors
/// Returns `crate::Error` if io fails or several other error conditions
pub fn extract_with<F>(
&mut self,
prefix: Option<&str>,
uid: Option<u32>,
gid: Option<u32>,
f: F,
) -> Result<(), Error>
where
F: FnOnce(&Node, Option<u32>, Option<u32>) + Copy,
{
for node in self {
let node = node?;
node.extract(prefix, uid, gid)?;
f(&node, uid, gid);
}
Ok(())
}
}
#[cfg(feature = "parallel")]
impl<R: Read + Send> Stream<R> {
    /// Extracts an archive in parallel, reporting progress over `sender`.
    ///
    /// Behaves exactly like [`Stream::par_extract_with`] with a callback that
    /// does nothing.
    /// # Errors
    /// Returns `crate::Error` if decoding or extracting a node fails, or
    /// `Error::SenderError` if a message cannot be sent
    pub fn par_extract(
        &mut self,
        prefix: Option<&str>,
        uid: Option<u32>,
        gid: Option<u32>,
        sender: &Sender<Message>,
    ) -> Result<(), Error> {
        // The two entry points differ only in the per-node callback, so share one body.
        self.par_extract_with(prefix, uid, gid, sender, |_, _, _| {})
    }

    /// Extracts an archive in parallel and runs the passed in function for
    /// each successfully extracted `Node`.
    ///
    /// For every node a `Message` is sent over `sender`:
    /// `Message::NodeExtracted` after a successful extraction (and after `f`
    /// has run), or `Message::Err` when extraction fails. A final
    /// `Message::Eof` is sent once all nodes have been processed; it is not
    /// sent when this method returns an error.
    /// # Errors
    /// Returns `crate::Error` if decoding or extracting a node fails, or
    /// `Error::SenderError` if a message cannot be sent
    pub fn par_extract_with<F>(
        &mut self,
        prefix: Option<&str>,
        uid: Option<u32>,
        gid: Option<u32>,
        sender: &Sender<Message>,
        f: F,
    ) -> Result<(), Error>
    where
        F: FnOnce(&Node, Option<u32>, Option<u32>) + Copy + Send + Sync,
    {
        let worker_sender = sender.clone();
        self.into_iter()
            .par_bridge()
            .try_for_each_with(worker_sender, |tx, node| {
                let node = node?;
                if let Err(error) = node.extract(prefix, uid, gid) {
                    // Report the failure to the receiver before aborting the whole run.
                    tx.send(Message::Err {
                        name: node.name.clone(),
                        error: error.clone(),
                    })
                    .map_err(|_| Error::SenderError)?;
                    return Err(error);
                }
                match node.filetype {
                    // `Stream`'s iterator stops at the `Eof` node instead of
                    // yielding it, so this arm is purely defensive.
                    FileType::Eof => {
                        tx.send(Message::Eof).map_err(|_| Error::SenderError)?;
                    }
                    _ => {
                        f(&node, uid, gid);
                        tx.send(Message::NodeExtracted(node.into()))
                            .map_err(|_| Error::SenderError)?;
                    }
                }
                Ok::<(), Error>(())
            })?;
        // Only reached when every node was processed without error.
        sender.send(Message::Eof).map_err(|_| Error::SenderError)?;
        Ok(())
    }
}