Fix parallel archive creation, add archive streaming functions

This commit is contained in:
Nathan Fisher 2023-07-05 17:28:29 -04:00
parent 797aa16a0d
commit 9ed4d503d1

View File

@ -1,7 +1,10 @@
#![doc = include_str!("../README.md")] #![doc = include_str!("../README.md")]
use std::{collections::HashMap, fs, io::{BufWriter, Write}, sync::Mutex};
#[cfg(feature = "parallel")] #[cfg(feature = "parallel")]
use rayon::prelude::{IntoParallelRefIterator, ParallelIterator}; use {
use std::{collections::HashMap, fs, io::BufWriter, sync::Mutex}; rayon::prelude::{IntoParallelRefIterator, ParallelIterator},
std::ops::DerefMut,
};
mod checksum; mod checksum;
mod error; mod error;
@ -22,39 +25,55 @@ pub use {
stream::Stream, stream::Stream,
}; };
/// Creates a haggis archive from a list of files
pub fn create_archive(path: &str, files: Vec<&str>, algorithm: Algorithm) -> Result<(), Error> { pub fn create_archive(path: &str, files: Vec<&str>, algorithm: Algorithm) -> Result<(), Error> {
let fd = fs::OpenOptions::new() let fd = fs::OpenOptions::new()
.create(true) .create(true)
.truncate(true) .truncate(true)
.open(path)?; .open(path)?;
let mut writer = BufWriter::new(fd); let mut writer = BufWriter::new(fd);
stream_archive(&mut writer, files, algorithm)?;
Ok(())
}
/// Streams a haggis archive over something which implements `Write`
pub fn stream_archive<W: Write>(
mut writer: W,
files: Vec<&str>,
algorithm: Algorithm,
) -> Result<(), Error> {
let links = Mutex::new(HashMap::new()); let links = Mutex::new(HashMap::new());
for f in files.iter() { for f in &files {
let node = Node::from_path(f, algorithm, &links)?; let node = Node::from_path(f, algorithm, &links)?;
node.write(&mut writer)?; node.write(&mut writer)?;
} }
Ok(()) Ok(())
} }
/// Creates a Haggis archive from a list of files, processing each file in parallel
#[cfg(feature = "parallel")] #[cfg(feature = "parallel")]
pub fn par_create_archive( pub fn par_create_archive(path: &str, files: Vec<&str>, algorithm: Algorithm) -> Result<(), Error> {
path: &str, let fd = fs::File::create(path)?;
files: Vec<String>, let writer = BufWriter::new(fd);
par_stream_archive(writer, files, algorithm)?;
Ok(())
}
/// Streams a Haggis archive from a list of files, processing each file in parallel
#[cfg(feature = "parallel")]
pub fn par_stream_archive<W: Write + Send>(
writer: W,
files: Vec<&str>,
algorithm: Algorithm, algorithm: Algorithm,
) -> Result<(), Error> { ) -> Result<(), Error> {
let links = Mutex::new(HashMap::<u64, String>::new()); let links = Mutex::new(HashMap::<u64, String>::new());
{ let writer = Mutex::new(writer);
let _fd = fs::File::create(path)?;
}
files.par_iter().try_for_each(|f| { files.par_iter().try_for_each(|f| {
let node = Node::from_path(f, algorithm, &links)?; let node = Node::from_path(f, algorithm, &links)?;
let fd = fs::OpenOptions::new() if let Ok(mut writer) = writer.lock() {
.create(false) let mut writer = writer.deref_mut();
.truncate(false) node.write(&mut writer)?;
.append(true) }
.open(path)?;
let mut writer = BufWriter::new(fd);
node.write(&mut writer)?;
Ok::<(), Error>(()) Ok::<(), Error>(())
})?; })?;
Ok(()) Ok(())