Implement creating a Stream of Nodes over stdin

This commit is contained in:
Nathan Fisher 2024-01-23 16:55:35 -05:00
parent e60acbbda2
commit 0f4ca019d2
3 changed files with 31 additions and 9 deletions

View File

@ -41,4 +41,4 @@ to see Haggis implemented in other languages.
- [x] Automatically detect zstd compressed archives - [x] Automatically detect zstd compressed archives
- [x] Add path to error message when passing between threads - [x] Add path to error message when passing between threads
- [x] Add ability to write archives to stdout - [x] Add ability to write archives to stdout
- [ ] Add ability to read archives from stdin - [x] Add ability to read archives from stdin

View File

@ -54,6 +54,7 @@ pub fn detect_zstd<R: Read + Seek>(reader: &mut R) -> Result<bool, Error> {
Ok(buf == ZSTD_MAGIC) Ok(buf == ZSTD_MAGIC)
} }
#[allow(clippy::similar_names)]
/// Creates a haggis archive from a list of files /// Creates a haggis archive from a list of files
/// # Errors /// # Errors
/// Returns `crate::Error` if io fails or several other error conditions /// Returns `crate::Error` if io fails or several other error conditions
@ -73,6 +74,7 @@ pub fn create_archive(
Ok(()) Ok(())
} }
#[allow(clippy::similar_names)]
/// Creates a haggis archive and writes it to stdout /// Creates a haggis archive and writes it to stdout
/// # Errors /// # Errors
/// Returns `crate::Error` if io fails /// Returns `crate::Error` if io fails
@ -88,6 +90,7 @@ pub fn create_archive_stdout(
Ok(()) Ok(())
} }
#[allow(clippy::similar_names)]
/// Streams a haggis archive over something which implements `Write` /// Streams a haggis archive over something which implements `Write`
/// # Errors /// # Errors
/// Returns `crate::Error` if io fails or several other error conditions /// Returns `crate::Error` if io fails or several other error conditions

View File

@ -1,4 +1,5 @@
#![allow(clippy::similar_names)] #![allow(clippy::similar_names)]
#[cfg(feature = "parallel")] #[cfg(feature = "parallel")]
use { use {
crate::FileType, crate::FileType,
@ -8,20 +9,22 @@ use {
use { use {
crate::{Error, Node, MAGIC}, crate::{Error, Node, MAGIC},
std::{ std::{
io::{ErrorKind, Read}, fs::File,
io::{self, BufReader, ErrorKind, Read},
iter::Iterator, iter::Iterator,
os::fd::{AsRawFd, FromRawFd},
}, },
}; };
/// An iterator over a series of archive `Node`'s. This struct is generic over any /// An iterator over a series of archive `Node`'s. This struct is generic over any
/// type which implements `Read`, such as a file or a network stream. /// type which implements `Read`, such as a file or a network stream.
#[derive(Debug)] #[derive(Debug)]
pub struct Stream<R: Read + Send> { pub struct Stream<R: Read> {
pub length: u32, pub length: u32,
reader: R, reader: R,
} }
impl<R: Read + Send> Iterator for Stream<R> { impl<R: Read> Iterator for Stream<R> {
type Item = Result<Node, Error>; type Item = Result<Node, Error>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
@ -76,10 +79,25 @@ pub enum Message {
Eof, Eof,
} }
impl<R: Read + Send> Stream<R> { impl Stream<BufReader<File>> {
/// Creates a new archive /// Creates a new `Stream` from stdin
/// # Errors /// # Errors
/// Returns `crate::Error` if io fails or several other error conditions /// Returns `crate::Error` if io fails or the first 11 bytes are not
/// the correct Haggis *magic* number
pub fn new_from_stdin() -> Result<Self, Error> {
let stdin = io::stdin();
let raw = stdin.as_raw_fd();
let fd = unsafe { File::from_raw_fd(raw) };
let reader = BufReader::new(fd);
Stream::new(reader)
}
}
impl<R: Read> Stream<R> {
/// Creates a new `Stream`
/// # Errors
/// Returns `crate::Error` if io fails or the first 11 bytes are not the
/// correct Haggis *magic* number
pub fn new(mut reader: R) -> Result<Self, Error> { pub fn new(mut reader: R) -> Result<Self, Error> {
let mut buf = [0; 11]; let mut buf = [0; 11];
reader.read_exact(&mut buf)?; reader.read_exact(&mut buf)?;
@ -126,8 +144,10 @@ impl<R: Read + Send> Stream<R> {
} }
Ok(()) Ok(())
} }
}
#[cfg(feature = "parallel")] #[cfg(feature = "parallel")]
impl<R: Read + Send> Stream<R> {
/// Extracts and archive in parallel /// Extracts and archive in parallel
/// # Errors /// # Errors
/// Returns `crate::Error` if io fails or several other error conditions /// Returns `crate::Error` if io fails or several other error conditions
@ -186,7 +206,6 @@ impl<R: Read + Send> Stream<R> {
Ok(()) Ok(())
} }
#[cfg(feature = "parallel")]
/// Extracts and archive in parallel and runs the passed in function for /// Extracts and archive in parallel and runs the passed in function for
/// each `Node` /// each `Node`
/// # Errors /// # Errors