Merge pull request #2297 from jqnatividad/2288-dataset-level-stats

feat: add dataset-level stats

jqnatividad authored Nov 19, 2024
2 parents bb9bd8f + 672796e commit cc49c38
Showing 6 changed files with 241 additions and 50 deletions.
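In brief: `stats` output gains a trailing `qsv__value` column, and four dataset-level rows (`qsv__rowcount`, `qsv__columncount`, `qsv__filesize_bytes`, `qsv__fingerprint_hash`) are appended after the per-field rows. A sketch of the resulting shape, with hypothetical columns and illustrative values only:

```text
field,type,...,qsv__value
name,String,...,
age,Integer,...,
qsv__rowcount,,...,1000
qsv__columncount,,...,2
qsv__filesize_bytes,,...,27052
qsv__fingerprint_hash,,...,9001253890412411020
```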
98 changes: 93 additions & 5 deletions src/cmd/stats.rs
@@ -20,7 +20,7 @@ The following additional "non-streaming" statistics require loading the entire f
cardinality, mode/antimode, median, MAD, quartiles and their related measures (IQR,
lower/upper fences & skewness).
When computing non-streaming statistics, an Out-Of-Memory (OOM) heuristic check is done.
When computing "non-streaming" statistics, an Out-Of-Memory (OOM) heuristic check is done.
If the file is larger than the available memory minus a headroom buffer of 20% (which can be
adjusted using the QSV_FREEMEMORY_HEADROOM_PCT environment variable), processing will be
preemptively prevented.
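As an aside, a minimal sketch of that heuristic — ours, not the actual qsv implementation; it assumes the available-memory figure is probed elsewhere:

```rust
use std::fs;

// Returns true if processing should be preemptively prevented.
// `avail_mem_bytes` is assumed to come from a system memory probe.
fn oom_check(file_path: &str, avail_mem_bytes: u64) -> std::io::Result<bool> {
    // headroom defaults to 20%, adjustable via QSV_FREEMEMORY_HEADROOM_PCT
    let headroom_pct: u64 = std::env::var("QSV_FREEMEMORY_HEADROOM_PCT")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(20);
    let file_size = fs::metadata(file_path)?.len();
    let budget = avail_mem_bytes.saturating_sub(avail_mem_bytes * headroom_pct / 100);
    Ok(file_size > budget)
}
```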
@@ -242,7 +242,9 @@ with ~500 tests.

use std::{
default::Default,
fmt, fs, io,
fmt, fs,
hash::BuildHasher,
io,
io::Write,
iter::repeat,
path::{Path, PathBuf},
@@ -769,6 +771,10 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
}?;

let stats_sr_vec = args.stats_to_records(stats);
let mut work_br;

// vec we use to compute dataset-level fingerprint hash
let mut stats_br_vec: Vec<csv::ByteRecord> = Vec::with_capacity(stats_sr_vec.len());

let stats_headers_sr = args.stat_headers();
wtr.write_record(&stats_headers_sr)?;
@@ -780,10 +786,85 @@
header.to_vec()
};
let stat = stat.iter().map(str::as_bytes);
wtr.write_record(vec![&*header].into_iter().chain(stat))?;
work_br = vec![&*header]
.into_iter()
.chain(stat)
.collect::<csv::ByteRecord>();
wtr.write_record(&work_br)?;
stats_br_vec.push(work_br);
}

// update the stats args json metadata
// Add dataset-level stats as additional rows ====================
let num_stats_fields = stats_headers_sr.len();
let mut dataset_stats_br = csv::ByteRecord::with_capacity(128, num_stats_fields);

// Helper closure to write a dataset stat row
let mut write_dataset_stat = |name: &[u8], value: &[u8]| -> CliResult<()> {
dataset_stats_br.clear();
dataset_stats_br.push_field(name);
// Fill middle columns with empty strings
for _ in 2..num_stats_fields {
dataset_stats_br.push_field(b"");
}
// write qsv__value as last column
dataset_stats_br.push_field(value);
wtr.write_byte_record(&dataset_stats_br)
.map_err(std::convert::Into::into)
};

// Write qsv__rowcount
let ds_record_count = itoa::Buffer::new()
.format(*record_count)
.as_bytes()
.to_vec();
write_dataset_stat(b"qsv__rowcount", &ds_record_count)?;

// Write qsv__columncount
let ds_column_count = itoa::Buffer::new()
.format(headers.len())
.as_bytes()
.to_vec();
write_dataset_stat(b"qsv__columncount", &ds_column_count)?;

// Write qsv__filesize_bytes
let ds_filesize_bytes = itoa::Buffer::new()
.format(fs::metadata(&path)?.len())
.as_bytes()
.to_vec();
write_dataset_stat(b"qsv__filesize_bytes", &ds_filesize_bytes)?;

// Compute hash of stats for data fingerprinting
let stats_hash = {
#[allow(deprecated)]
// we use "deprecated" SipHasher explicitly instead of DefaultHasher,
// even though, it is the current DefaultHasher since Rust 1.7.0
// as we want the hash to be deterministic and stable across Rust versions
// DefaultHasher may change in future Rust versions
let mut hasher =
std::hash::BuildHasherDefault::<std::hash::SipHasher>::default().build_hasher();

// Hash only the first 20 columns of each stats record, as those
// columns are always the same, even if other stats --options are used
for record in &stats_br_vec {
for field in record.iter().take(20) {
std::hash::Hash::hash(field, &mut hasher);
}
}

// Include dataset-level stats in hash
for stat in [&ds_record_count, &ds_column_count, &ds_filesize_bytes] {
std::hash::Hash::hash(stat, &mut hasher);
}

std::hash::Hasher::finish(&hasher)
};

// Write the qsv__fingerprint_hash dataset stat
let hash_bytes = itoa::Buffer::new().format(stats_hash).as_bytes().to_vec();
write_dataset_stat(b"qsv__fingerprint_hash", &hash_bytes)?;

// update the stats args json metadata ===============
current_stats_args.compute_duration_ms = start_time.elapsed().as_millis() as u64;

if create_cache
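A condensed, standalone sketch of the fingerprinting logic above (the `fingerprint` helper name is ours; the `BuildHasherDefault` construction mirrors the committed code, and `SipHasher::new()` would be equivalent):

```rust
#[allow(deprecated)] // SipHasher is deprecated, but its output is stable across Rust versions
fn fingerprint(stats_rows: &[csv::ByteRecord]) -> u64 {
    use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher, SipHasher};

    let mut hasher = BuildHasherDefault::<SipHasher>::default().build_hasher();
    for record in stats_rows {
        // hash only the first 20 columns; they are present for every stats run
        for field in record.iter().take(20) {
            field.hash(&mut hasher);
        }
    }
    hasher.finish()
}
```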
@@ -891,7 +972,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
// save the stats data to "<FILESTEM>.stats.csv.data.jsonl"
if write_stats_jsonl {
stats_pathbuf.set_extension("data.jsonl");
util::csv_to_jsonl(&currstats_filename, &get_stats_data_types(), stats_pathbuf)?;
util::csv_to_jsonl(&currstats_filename, &get_stats_data_types(), &stats_pathbuf)?;
}
}
}
@@ -1144,6 +1225,10 @@ impl Args {
"antimode_occurrences",
]);
}

// we add the qsv__value field at the end for dataset-level stats
fields.push("qsv__value");

csv::StringRecord::from(fields)
}
}
@@ -1791,6 +1876,9 @@ impl Stats {
// append it here to preserve legacy ordering of columns
pieces.extend_from_slice(&mc_pieces);

// add an empty field for qsv__value
pieces.push(empty());

csv::StringRecord::from(pieces)
}
}
86 changes: 45 additions & 41 deletions src/util.rs
@@ -1921,6 +1921,8 @@ pub fn get_stats_records(
args: &SchemaArgs,
mode: StatsMode,
) -> CliResult<(ByteRecord, Vec<StatsData>)> {
const DATASET_STATS_PREFIX: &str = r#"{"field":"qsv__"#;

if mode == StatsMode::None
|| args.arg_input.is_none()
|| args.arg_input.as_ref() == Some(&"-".to_string())
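`DATASET_STATS_PREFIX` works because dataset-level rows serialize with `"field":"qsv__…"` and are always written after the per-field rows, so a reader can stop at the first match. A standalone sketch (the function name is ours):

```rust
use std::io::{BufRead, BufReader};

const DATASET_STATS_PREFIX: &str = r#"{"field":"qsv__"#;

// Collect only the per-field stats lines from a stats JSONL file,
// stopping at the first dataset-level row.
fn per_field_stats_lines(path: &str) -> std::io::Result<Vec<String>> {
    let rdr = BufReader::new(std::fs::File::open(path)?);
    let mut lines = Vec::new();
    for line in rdr.lines() {
        let line = line?;
        if line.starts_with(DATASET_STATS_PREFIX) {
            break; // dataset-level rows follow; no per-field rows after this
        }
        lines.push(line);
    }
    Ok(lines)
}
```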
@@ -1930,13 +1932,13 @@
return Ok((ByteRecord::new(), Vec::new()));
};

let canonical_input_path = Path::new(&args.arg_input.clone().unwrap()).canonicalize()?;
let canonical_input_path = Path::new(args.arg_input.as_ref().unwrap()).canonicalize()?;
let statsdata_path = canonical_input_path.with_extension("stats.csv.data.jsonl");

let stats_data_current = if statsdata_path.exists() {
let statsdata_metadata = std::fs::metadata(&statsdata_path)?;

let input_metadata = std::fs::metadata(args.arg_input.clone().unwrap())?;
let input_metadata = std::fs::metadata(args.arg_input.as_ref().unwrap())?;

let statsdata_mtime = FileTime::from_last_modification_time(&statsdata_metadata);
let input_mtime = FileTime::from_last_modification_time(&input_metadata);
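For context, the cache-freshness test that gates reuse (in the elided lines just below) reduces to an mtime comparison. A sketch under the assumption that the cache must be no older than the input; the exact comparison qsv uses is in the elided code:

```rust
use filetime::FileTime; // the same crate the surrounding code uses
use std::path::Path;

// Hypothetical helper: true if the stats cache file is at least as
// fresh as the input file it was derived from.
fn cache_is_current(input: &Path, cache: &Path) -> std::io::Result<bool> {
    let input_mtime = FileTime::from_last_modification_time(&std::fs::metadata(input)?);
    let cache_mtime = FileTime::from_last_modification_time(&std::fs::metadata(cache)?);
    Ok(cache_mtime >= input_mtime)
}
```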
@@ -1958,28 +1960,39 @@
return Ok((ByteRecord::new(), Vec::new()));
}

// get the headers from the input file
let mut rdr = csv::Reader::from_path(args.arg_input.as_ref().ok_or("No input provided")?)?;
let csv_fields = rdr.byte_headers()?.clone();
drop(rdr);

let mut stats_data_loaded = false;
let mut csv_stats: Vec<StatsData> = Vec::new();
let mut csv_stats: Vec<StatsData> = Vec::with_capacity(csv_fields.len());

// if stats_data file exists and is current, use it
if stats_data_current && !args.flag_force {
let statsdata_file = std::fs::File::open(&statsdata_path)?;
let statsdata_reader = std::io::BufReader::new(statsdata_file);
let statsdata_lines = statsdata_reader.lines();

let mut line: String;
for curr_line in statsdata_lines {
line = curr_line?;
let stats_record: StatsData = serde_json::from_str(&line)?;
csv_stats.push(stats_record);
let statsdatajson_rdr =
BufReader::with_capacity(DEFAULT_RDR_BUFFER_CAPACITY, File::open(statsdata_path)?);

let mut curr_line: String;
let mut s_slice: Vec<u8>;
for line in statsdatajson_rdr.lines() {
curr_line = line?;
if curr_line.starts_with(DATASET_STATS_PREFIX) {
break;
}
s_slice = curr_line.as_bytes().to_vec();
match simd_json::serde::from_slice(&mut **&mut s_slice) {
Ok(stats) => csv_stats.push(stats),
Err(_) => continue,
}
}
stats_data_loaded = true;
stats_data_loaded = !csv_stats.is_empty();
}

// otherwise, run stats command to generate stats.csv.data.jsonl file
if !stats_data_loaded {
let stats_args = crate::cmd::stats::Args {
arg_input: args.arg_input.clone(),
arg_input: args.arg_input.as_ref().map(String::from),
flag_select: crate::select::SelectColumns::parse("").unwrap(),
flag_everything: false,
flag_typesonly: false,
@@ -2010,13 +2023,10 @@
.unwrap();
let tempfile_path = tempfile.path().to_str().unwrap().to_string();

let statsdatajson_path = canonical_input_path.with_extension("stats.csv.data.jsonl");
let statsdatajson_path = &canonical_input_path.with_extension("stats.csv.data.jsonl");

let input = stats_args.arg_input.unwrap_or_else(|| "-".to_string());

let input = if let Some(arg_input) = stats_args.arg_input {
arg_input
} else {
"-".to_string()
};
// we use rustfmt::skip here as rustfmt was breaking the stats cmdline along strange
// boundaries, causing CI errors.
// This is because we're using tab characters (\t) to separate args to fix #2294,
@@ -2041,8 +2051,7 @@
// StatsMode::FrequencyForceStats
// we're doing frequency, so we need cardinality from a --forced stats run
format!(
"stats\t{input}\t--cardinality\t--stats-jsonl\t--force\
\t--output\t{tempfile_path}"
"stats\t{input}\t--cardinality\t--stats-jsonl\t--force\t--output\t{tempfile_path}"
)
},
#[cfg(feature = "polars")]
@@ -2103,31 +2112,26 @@
}

// create the statsdata JSONL from the output of the stats command
csv_to_jsonl(
&tempfile_path,
&get_stats_data_types(),
statsdatajson_path.clone(),
)?;

let statsdatajson_rdr = BufReader::with_capacity(
DEFAULT_RDR_BUFFER_CAPACITY * 2,
File::open(statsdatajson_path)?,
);
csv_to_jsonl(&tempfile_path, &get_stats_data_types(), &statsdatajson_path)?;

let statsdatajson_rdr =
BufReader::with_capacity(DEFAULT_RDR_BUFFER_CAPACITY, File::open(statsdatajson_path)?);

let mut statsrecord: StatsData;
let mut curr_line: String;
let mut s_slice: Vec<u8>;
for line in statsdatajson_rdr.lines() {
curr_line = line?;
statsrecord = serde_json::from_str(&curr_line)?;
csv_stats.push(statsrecord);
if curr_line.starts_with(DATASET_STATS_PREFIX) {
break;
}
s_slice = curr_line.as_bytes().to_vec();
match simd_json::serde::from_slice(&mut **&mut s_slice) {
Ok(stats) => csv_stats.push(stats),
Err(_) => continue,
}
}
};

// get the headers from the input file
let mut rdr = csv::Reader::from_path(args.arg_input.clone().unwrap()).unwrap();
let csv_fields = rdr.byte_headers()?.clone();
drop(rdr);

Ok((csv_fields, csv_stats))
}

@@ -2136,7 +2140,7 @@ pub fn get_stats_records(
pub fn csv_to_jsonl(
input_csv: &str,
csv_types: &[JsonTypes],
output_jsonl: PathBuf,
output_jsonl: &PathBuf,
) -> CliResult<()> {
let file = File::open(input_csv)?;
let mut rdr = csv::ReaderBuilder::new()
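Note on the switch from `serde_json::from_str` to `simd_json::serde::from_slice` in the loops above: simd-json deserializes in place and mutates its input buffer, which is why each line is first copied into a `Vec<u8>` (and why lines that fail to deserialize are skipped with `Err(_) => continue` rather than aborting). A minimal sketch of the pattern (`parse_jsonl_line` is our name, assuming the simd-json crate):

```rust
fn parse_jsonl_line(line: &str) -> Result<simd_json::OwnedValue, simd_json::Error> {
    // simd-json needs a mutable byte buffer, so own a copy of the line
    let mut buf = line.as_bytes().to_vec();
    simd_json::to_owned_value(&mut buf)
}
```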
2 changes: 2 additions & 0 deletions tests/test_frequency.rs
@@ -535,6 +535,8 @@ fn frequency_all_unique_force_stats_cache() {
.args(["--stats-mode", "force"])
.arg(testdata);

wrk.assert_success(&mut cmd);

let got: Vec<Vec<String>> = wrk.read_stdout(&mut cmd);
let expected = vec![
svec!["field", "value", "count", "percentage"],