haggis-rs/src/stream.rs

105 lines
3.1 KiB
Rust
Raw Normal View History

2023-07-07 19:20:21 -04:00
use crate::MAGIC;
use {
2023-07-08 00:39:52 -04:00
crate::{Error, FileType, Node},
std::{
io::{ErrorKind, Read},
iter::Iterator,
},
};
#[cfg(feature = "parallel")]
use {
rayon::{iter::ParallelBridge, prelude::ParallelIterator},
std::{ops::DerefMut, sync::mpsc::Sender},
};
/// An iterator over a series of archive `Node`'s. This struct is generic over any
/// type which implements `Read`, such as a file or a network stream.
#[derive(Debug)]
2023-07-07 19:20:21 -04:00
pub struct Stream<R: Read + Send> {
2023-07-08 00:39:52 -04:00
pub length: u32,
2023-07-07 19:20:21 -04:00
reader: R,
}
2023-07-07 19:20:21 -04:00
impl<R: Read + Send> Iterator for Stream<R> {
type Item = Result<Node, Error>;
fn next(&mut self) -> Option<Self::Item> {
match Node::read(&mut self.reader) {
Err(Error::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => None,
Ok(f) => match f.filetype {
crate::FileType::Eof => None,
_ => Some(Ok(f)),
},
x => Some(x),
}
}
}
#[cfg(feature = "parallel")]
#[derive(Debug)]
pub enum Message {
FileExtracted { name: String, size: u64 },
LinkCreated { name: String, target: String },
DirectoryCreated { name: String },
DeviceCreated { name: String },
Err(Error),
Eof,
}
2023-07-07 19:20:21 -04:00
impl<R: Read + Send> Stream<R> {
pub fn new(mut reader: R) -> Result<Self, Error> {
2023-07-08 00:39:52 -04:00
let mut buf = [0; 11];
2023-07-07 19:20:21 -04:00
reader.read_exact(&mut buf)?;
2023-07-08 00:39:52 -04:00
let length = u32::from_le_bytes(buf[7..].try_into()?);
if buf[0..7] == MAGIC {
Ok(Self { length ,reader })
2023-07-07 19:20:21 -04:00
} else {
Err(Error::InvalidMagic)
}
}
pub fn extract(&mut self, prefix: Option<&str>) -> Result<(), Error> {
for node in self {
node?.extract(prefix)?;
}
Ok(())
}
#[cfg(feature = "parallel")]
pub fn par_extract(
&mut self,
prefix: Option<&str>,
sender: &Sender<Message>,
) -> Result<(), Error> {
let s = sender.clone();
self.into_iter().par_bridge().try_for_each_with(s, |s, n| {
2023-07-08 00:39:52 -04:00
let n = n?;
n.extract(prefix)?;
match n.filetype {
FileType::Normal(f) => {
s.send(Message::FileExtracted { name: n.name.clone(), size: f.len })
.map_err(|_| Error::SenderError)?;
},
FileType::SoftLink(t) | FileType::HardLink(t) => {
s.send(Message::LinkCreated { name: n.name.clone(), target: t.clone() })
.map_err(|_| Error::SenderError)?;
},
FileType::Directory => {
s.send(Message::DirectoryCreated { name: n.name.clone() })
.map_err(|_| Error::SenderError)?;
},
FileType::Block(_) | FileType::Character(_) | FileType::Fifo => {
s.send(Message::DeviceCreated { name: n.name.clone() })
.map_err(|_| Error::SenderError)?;
},
FileType::Eof => {
s.send(Message::Eof).map_err(|_| Error::SenderError)?;
},
}
Ok::<(), Error>(())
})?;
Ok(())
}
}