Skip to content

Commit

Permalink
Add support for zst archives and improve logging
Browse files Browse the repository at this point in the history
  • Loading branch information
aldahick committed Sep 23, 2023
1 parent dbe733a commit 38639b4
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "jsonfilter"
version = "0.1.0"
version = "0.2.0"
edition = "2021"

[dependencies]
Expand Down
27 changes: 22 additions & 5 deletions src/io.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
use std::{
fs::File,
io::{BufRead, BufReader, BufWriter, Error},
io::{copy, BufRead, BufReader, BufWriter, Result},
path::Path,
};

pub fn get_size<P>(path: P) -> Result<u64, Error>
use indicatif::ProgressBar;
use zstd::Decoder;

/// Returns the length in bytes of the file at `path`.
///
/// The file is opened and its metadata queried.
///
/// # Errors
/// Returns the underlying I/O error if the file cannot be opened or its
/// metadata cannot be read.
pub fn get_size<P>(path: P) -> Result<u64>
where
    P: AsRef<Path>,
{
    File::open(path)
        .and_then(|handle| handle.metadata())
        .map(|meta| meta.len())
}

pub fn write_lines<P>(filename: P) -> Result<BufWriter<File>, Error>
pub fn write_lines<P>(filename: P) -> Result<BufWriter<File>>
where
P: AsRef<Path>,
{
Expand All @@ -35,7 +38,7 @@ impl<B: BufRead> BufLines<B> {
}

impl<B: BufRead> Iterator for BufLines<B> {
type Item = Result<Vec<u8>, Error>;
type Item = Result<Vec<u8>>;

fn next(&mut self) -> Option<Self::Item> {
self.chunk.clear();
Expand All @@ -51,10 +54,24 @@ impl<B: BufRead> Iterator for BufLines<B> {
}
}

pub fn read_lines_buf<P>(path: P) -> Result<BufLines<BufReader<File>>, Error>
/// Opens the file at `path` and returns a [`BufLines`] iterator over it,
/// reading through a [`BufReader`].
///
/// # Errors
/// Returns the underlying I/O error if the file cannot be opened.
pub fn read_lines_buf<P>(path: P) -> Result<BufLines<BufReader<File>>>
where
    P: AsRef<Path>,
{
    File::open(path).map(|handle| BufLines::new(BufReader::new(handle)))
}

/// Decompresses the zstd archive at `from` into a newly created file at `to`.
///
/// Reads performed on the decoder are routed through `progress_bar`, so the
/// bar advances as data is decompressed.
///
/// Returns the size in bytes of the extracted file as reported by its
/// metadata once all output has been flushed.
///
/// # Errors
/// Returns the underlying I/O error if either file cannot be opened/created,
/// the decoder cannot be set up, or copying/flushing the output fails.
pub fn extract_archive<P>(from: P, to: P, progress_bar: &ProgressBar) -> Result<u64>
where
    P: AsRef<Path>,
{
    let archive_file = File::open(from)?;
    let extract_file = File::create(to)?;

    let mut decoder = Decoder::new(archive_file)?;
    // Raise the decoder's window limit (log2 of the window size) so archives
    // compressed with a large window are accepted.
    decoder.window_log_max(31)?;

    // NOTE(review): the bar is advanced by bytes read from the decoder, i.e.
    // decompressed bytes. If the caller sized the bar from the compressed
    // archive it may overrun — confirm the intended unit.
    let mut reader = progress_bar.wrap_read(decoder);
    let mut writer = BufWriter::new(&extract_file);
    copy(&mut reader, &mut writer)?;

    // Flush explicitly: otherwise the buffered tail is only written when
    // `writer` is dropped, which is after the metadata query below (so the
    // size would be under-reported) and any flush error would be discarded.
    std::io::Write::flush(&mut writer)?;

    Ok(extract_file.metadata()?.len())
}
37 changes: 31 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ use std::{error::Error, io::Write};
mod io;
mod progress;

/// Number of bytes in one gigabyte (1024^3); used to scale byte counts in log output.
const GIGABYTE: u64 = u64::pow(1024, 3);
/// Number of bytes in one megabyte (1024^2); used to scale byte counts in log output.
const MEGABYTE: u64 = u64::pow(1024, 2);

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Optionally provide a zst archive to unpack.
/// If provided, input is the path within the archive.
/// If archive is provided, it is extracted to the input path.
#[arg(short, long, default_value = "")]
archive: String,

Expand Down Expand Up @@ -40,16 +43,30 @@ fn is_filtered(row: &BorrowedValue, key: &str, filter: &str) -> Option<bool> {
Some(value.as_str()? == filter)
}

fn main() -> Result<(), Box<dyn Error>> {
let args = Args::parse();
let total_size = io::get_size(&args.input)?;
/// Extracts `args.archive` to `args.input`, printing a summary before and
/// after and showing a progress bar while the extraction runs.
///
/// Sizes in the messages are whole gigabytes (integer division), so anything
/// under 1 GB is printed as 0.
///
/// # Errors
/// Propagates any error from sizing the archive, creating the progress bar,
/// or performing the extraction.
fn extract_archive(args: &Args) -> Result<(), Box<dyn Error>> {
    let archive_bytes = io::get_size(&args.archive)?;
    let archive_gb = archive_bytes / GIGABYTE;
    println!("Extracting {} GB to {}", archive_gb, &args.input);

    let bar = create_progress_bar(archive_bytes)?;
    let extracted_gb = io::extract_archive(&args.archive, &args.input, &bar)? / GIGABYTE;

    println!(
        "Finished extracting {} GB into {} GB in {} seconds",
        archive_gb,
        extracted_gb,
        bar.elapsed().as_secs()
    );
    Ok(())
}

fn write_filtered_rows(args: &Args) -> Result<(), Box<dyn Error>> {
let total_size = io::get_size(&args.input)?;
let mut total_read: u64 = 0;
let mut total_wrote: u64 = 0;
let mut writer = io::write_lines(&args.output)?;
let lines = io::read_lines_buf(&args.input)?;
let key = args.key.as_str();
let filter = args.filter.as_str();
println!("Filtering {} GB to {}", total_size / GIGABYTE, &args.output);
let progress = create_progress_bar(total_size)?;
for line_result in lines {
let line = line_result?;
let line_len = line.len() as u64 + 1;
Expand All @@ -64,9 +81,17 @@ fn main() -> Result<(), Box<dyn Error>> {
}
println!(
"Finished filtering {} GB into {} MB in {} seconds",
total_size,
total_wrote,
total_size / GIGABYTE,
total_wrote / MEGABYTE,
progress.elapsed().as_secs()
);
Ok(())
}

/// Entry point: parses the command-line arguments, extracts the archive
/// first when one was given (non-empty `archive`), then filters the input
/// rows into the output.
fn main() -> Result<(), Box<dyn Error>> {
    let args = Args::parse();
    // An empty `archive` string means no archive was supplied.
    match args.archive.as_str() {
        "" => {}
        _ => extract_archive(&args)?,
    }
    write_filtered_rows(&args)
}

0 comments on commit 38639b4

Please sign in to comment.