Merge pull request #2518 from dathere/2493-smart-diff
feat: "smart" diff
jqnatividad authored Feb 12, 2025
2 parents 606306f + 03b01df commit 5041318
Showing 12 changed files with 328 additions and 108 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
@@ -168,7 +168,7 @@ mlua = { version = "0.10", features = [
], optional = true }
num_cpus = "1"
odht = "0.3"
-phf = { version = "0.11", features = ["macros"], optional = true }
+phf = { version = "0.11", features = ["macros"]}
polars = { version = "0.46", features = [
"asof_join",
"avro",
@@ -396,7 +396,6 @@ geocode = [
"cached",
"geosuggest-core",
"geosuggest-utils",
"phf",
"sled",
]
luau = ["mlua", "sanitize-filename"]
2 changes: 1 addition & 1 deletion README.md
@@ -36,7 +36,7 @@
| [datefmt](/src/cmd/datefmt.rs#L2)<br>📇🚀👆 | Formats recognized date fields ([19 formats recognized](https://docs.rs/qsv-dateparser/latest/qsv_dateparser/#accepted-date-formats)) to a specified date format using [strftime date format specifiers](https://docs.rs/chrono/latest/chrono/format/strftime/). |
| [dedup](/src/cmd/dedup.rs#L2)<br>🤯🚀👆 | Remove duplicate rows (See also `extdedup`, `extsort`, `sort` & `sortcheck` commands). |
| [describegpt](/src/cmd/describegpt.rs#L2)<br>🌐🤖🪄 | Infer extended metadata about a CSV using a GPT model from [OpenAI's API](https://platform.openai.com/docs/introduction) or an LLM from another API compatible with the OpenAI API specification such as [Ollama](https://ollama.com) or [Jan](https://jan.ai). |
-| [diff](/src/cmd/diff.rs#L2)<br>🚀 | Find the difference between two CSVs with ludicrous speed!<br/>e.g. _compare two CSVs with 1M rows x 9 columns in under 600ms!_ |
+| [diff](/src/cmd/diff.rs#L2)<br>🚀🪄 | Find the difference between two CSVs with ludicrous speed!<br/>e.g. _compare two CSVs with 1M rows x 9 columns in under 600ms!_ |
| [edit](/src/cmd/edit.rs#L2) | Replace the value of a cell specified by its row and column. |
| [enum](/src/cmd/enumerate.rs#L2)<br>👆 | Add a new column enumerating rows by adding a column of incremental or uuid identifiers. Can also be used to copy a column or fill a new column with a constant value. |
| [excel](/src/cmd/excel.rs#L2)<br>🚀 | Exports a specified Excel/ODS sheet to a CSV file. |
154 changes: 151 additions & 3 deletions src/cmd/diff.rs
@@ -3,7 +3,10 @@ Find the difference between two CSVs with ludicrous speed.
NOTE: diff does not support stdin. A file path is required for both arguments.
Further, PRIMARY KEY VALUES MUST BE UNIQUE WITHIN EACH CSV.
Otherwise, diff will produce an incorrect result.
+When diffing CSVs with just a single --key column and a stats cache is
+available, diff will automatically validate primary key uniqueness.
+If more than one --key column is specified, however, this auto-validation
+is not done.
To check if a CSV has unique primary key values, use `qsv extdedup`
with the same key columns using the `--select` option:
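
For example, an illustrative sketch — "id" and "data.csv" are placeholder names, and the `--select`/`--no-output` form mirrors the hint in the error messages below:

qsv extdedup --select id data.csv --no-output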
@@ -76,7 +79,7 @@ diff options:
--delimiter-output <arg> The field delimiter for writing the CSV diff result.
Must be a single character. (default: ,)
-k, --key <arg...> The column indices that uniquely identify a record
-as a comma separated list of indices, e.g. 0,1,2
+as a comma separated list of 0-based indices, e.g. 0,1,2
or column names, e.g. name,age.
Note that when selecting columns by name, only the
left CSV's headers are used to match the column names
@@ -101,6 +104,12 @@ diff options:
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the number
of CPUs detected.
+--force Force diff and ignore stats caches for the left & right CSVs.
+Otherwise, if available, the stats cache will be used to:
+* short-circuit the diff if the two files' fingerprint hashes are
+identical.
+* check for primary key uniqueness when only one --key
+column is specified.
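
For example, a minimal sketch of both modes (file names and the key index are placeholders):

qsv diff --key 0 left.csv right.csv          # may use the stats caches to short-circuit
qsv diff --key 0 --force left.csv right.csv  # always performs the full diff
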
Common options:
-h, --help Display this message
@@ -123,7 +132,10 @@ use super::rename::rename_headers_all_generic;
use crate::{
clitypes::CliError,
config::{Config, Delimiter},
-util, CliResult,
+select::SelectColumns,
+util,
+util::{get_stats_records, SchemaArgs, StatsMode},
+CliResult,
};

#[derive(Deserialize)]
@@ -132,6 +144,7 @@ struct Args {
arg_input_right: Option<String>,
flag_output: Option<String>,
flag_jobs: Option<usize>,
+flag_force: bool,
flag_no_headers_left: bool,
flag_no_headers_right: bool,
flag_no_headers_output: bool,
@@ -147,6 +160,139 @@ struct Args {
pub fn run(argv: &[&str]) -> CliResult<()> {
let mut args: Args = util::get_args(USAGE, argv)?;

+// ===== VALIDATION CHECKS =====
+
+// If --force is not set and
+// neither --no-headers-left nor --no-headers-right are set and
+// the key is just a single column (or unspecified),
+// perform validation checks on the input files using the stats cache.
+if !args.flag_force
+&& (!args.flag_no_headers_left && !args.flag_no_headers_right)
+&& args
+.flag_key
+.as_ref()
+.is_none_or(|k| k.split(',').count() == 1)
+{
+// ---- STATS CACHE VALIDATION CHECKS ----
+
+// Get stats for left file
+let left_schema_args = SchemaArgs {
+arg_input: args.arg_input_left.clone(),
+flag_no_headers: false,
+flag_delimiter: args.flag_delimiter,
+flag_jobs: None,
+flag_memcheck: false,
+flag_force: args.flag_force,
+flag_prefer_dmy: false,
+flag_dates_whitelist: String::new(),
+flag_enum_threshold: 0,
+flag_ignore_case: false,
+flag_strict_dates: false,
+flag_pattern_columns: SelectColumns::parse("")?,
+flag_stdout: false,
+};
+
+// Get stats for right file using same args
+let right_schema_args = SchemaArgs {
+arg_input: args.arg_input_right.clone(),
+..left_schema_args.clone()
+};
+
+// Get stats records for both files
+if let (
+Ok((left_csv_fields, left_stats, left_dataset_stats)),
+Ok((_, right_stats, right_dataset_stats)),
+) = (
+get_stats_records(&left_schema_args, StatsMode::FrequencyForceStats),
+get_stats_records(&right_schema_args, StatsMode::FrequencyForceStats),
+) {
+// If both files' fingerprint hashes match, the files are identical, so short-circuit the diff
+if left_dataset_stats.get("qsv__fingerprint_hash")
+== right_dataset_stats.get("qsv__fingerprint_hash")
+{
+return Ok(());
+}
+
+// Check if row counts match
+let left_dataset_rowcount = left_dataset_stats
+.get("qsv__rowcount")
+.unwrap()
+.parse::<f64>()
+.unwrap_or_default() as u64;
+let right_dataset_rowcount = right_dataset_stats
+.get("qsv__rowcount")
+.unwrap()
+.parse::<f64>()
+.unwrap_or_default() as u64;
+
+if left_dataset_rowcount != right_dataset_rowcount {
+return fail_incorrectusage_clierror!(
+"The number of rows in the left ({left_dataset_rowcount}) and right \
+({right_dataset_rowcount}) CSVs do not match."
+);
+}
+
+// If a key column is specified, check that it has all unique values in both files
+let mut colname_used_for_key = false;
+if let Some(key_col) = &args.flag_key {
+let idx = if key_col.chars().all(char::is_numeric) {
+key_col
+.parse::<usize>()
+.map_err(|err| CliError::Other(err.to_string()))?
+} else {
+// Handle column name case...
+colname_used_for_key = true;
+left_csv_fields
+.iter()
+.position(|field| field == key_col.as_bytes())
+.unwrap_or_default()
+};
+
+// Check cardinality equals row count for key column in left file
+if let Some(left_col) = left_stats.get(idx) {
+if left_col.cardinality != left_dataset_rowcount {
+return fail_incorrectusage_clierror!(
+"Primary key values in left CSV are not unique in column {colname} \
+(cardinality: {left_cardinality} != rowcount: {left_rowcount}). Use \
+`qsv extdedup --select {colname} {left_input} --no-output` to check \
+duplicates.",
+colname = if colname_used_for_key {
+key_col.to_string()
+} else {
+idx.to_string()
+},
+left_cardinality = left_col.cardinality,
+left_rowcount = left_dataset_rowcount,
+left_input = args.arg_input_left.as_ref().unwrap()
+);
+}
+}
+
+// Check cardinality equals row count for key column in right file
+if let Some(right_col) = right_stats.get(idx) {
+if right_col.cardinality != right_dataset_rowcount {
+return fail_incorrectusage_clierror!(
+"Primary key values in right CSV are not unique in column {colname} \
+(cardinality: {right_cardinality} != rowcount: {right_rowcount}). \
+Use `qsv extdedup --select {colname} {right_input} --no-output` to \
+check duplicates.",
+colname = if colname_used_for_key {
+key_col.to_string()
+} else {
+idx.to_string()
+},
+right_cardinality = right_col.cardinality,
+right_rowcount = right_dataset_rowcount,
+right_input = args.arg_input_right.as_ref().unwrap()
+);
+}
+}
+}
+}
+}
+
+// ---- SETUP and OTHER VALIDATION CHECKS ----
+
if let Some(delim) = args.flag_delimiter {
[
args.flag_delimiter_left,
@@ -241,6 +387,8 @@ pub fn run(argv: &[&str]) -> CliResult<()> {

util::njobs(args.flag_jobs);

+// ===== DIFF PROCESSING =====

let Ok(csv_diff) = CsvByteDiffBuilder::new()
.primary_key_columns(primary_key_cols.clone())
.build()
13 changes: 10 additions & 3 deletions src/cmd/frequency.rs
@@ -537,7 +537,7 @@ impl Args {
"_schema" => StatsMode::Schema, // only meant for internal use by schema command
_ => return fail_incorrectusage_clierror!("Invalid stats mode"),
};
-let (csv_fields, csv_stats) = get_stats_records(&schema_args, stats_mode)?;
+let (csv_fields, csv_stats, dataset_stats) = get_stats_records(&schema_args, stats_mode)?;

if stats_mode == StatsMode::None || stats_mode == StatsMode::Schema || csv_fields.is_empty()
{
@@ -552,8 +552,11 @@
// in the following hot iterator loop
assert!(
csv_fields.len() == csv_stats.len(),
"Mismatch between the number of fields and stats records"
"Mismatch between the number of fields: {} and stats records: {}",
csv_fields.len(),
csv_stats.len()
);

let col_cardinality_vec: Vec<(String, u64)> = csv_stats
.iter()
.enumerate()
@@ -571,7 +574,11 @@
.collect();

// now, get the unique headers, where cardinality == rowcount
-let row_count = util::count_rows(&self.rconfig())?;
+let row_count = if let Some(row_count) = dataset_stats.get("qsv__rowcount") {
+row_count.parse::<u64>().unwrap()
+} else {
+util::count_rows(&self.rconfig())?
+};
FREQ_ROW_COUNT.set(row_count as u64).unwrap();

// Most datasets have relatively few columns with all unique values (e.g. ID columns)
2 changes: 1 addition & 1 deletion src/cmd/joinp.rs
@@ -887,7 +887,7 @@ impl Args {
flag_memcheck: false,
};

-let (csv_fields, csv_stats) =
+let (csv_fields, csv_stats, _) =
get_stats_records(&schema_args, util::StatsMode::PolarsSchema)?;

let mut schema = Schema::with_capacity(csv_stats.len());
21 changes: 13 additions & 8 deletions src/cmd/pivotp.rs
@@ -62,7 +62,7 @@ Common options:
-q, --quiet Do not return smart aggregation chosen nor pivot result shape to stderr.
"#;

-use std::{fs::File, io, io::Write, path::Path, sync::OnceLock};
+use std::{collections::HashMap, fs::File, io, io::Write, path::Path, sync::OnceLock};

use csv::ByteRecord;
use indicatif::HumanCount;
@@ -78,7 +78,8 @@ use crate::{
CliResult,
};

-static STATS_RECORDS: OnceLock<(ByteRecord, Vec<StatsData>)> = OnceLock::new();
+static STATS_RECORDS: OnceLock<(ByteRecord, Vec<StatsData>, HashMap<String, String>)> =
+OnceLock::new();

#[derive(Deserialize)]
struct Args {
@@ -128,9 +129,10 @@ fn calculate_pivot_metadata(
flag_memcheck: false,
};

-let (csv_fields, csv_stats) = STATS_RECORDS.get_or_init(|| {
+#[allow(unused_variables)]
+let (csv_fields, csv_stats, dataset_stats) = STATS_RECORDS.get_or_init(|| {
get_stats_records(&schema_args, StatsMode::FrequencyForceStats)
-.unwrap_or_else(|_| (ByteRecord::new(), Vec::new()))
+.unwrap_or_else(|_| (ByteRecord::new(), Vec::new(), HashMap::new()))
});

if csv_stats.is_empty() {
@@ -231,14 +233,17 @@ fn suggest_agg_function(
flag_memcheck: false,
};

-let (csv_fields, csv_stats) = STATS_RECORDS.get_or_init(|| {
+let (csv_fields, csv_stats, dataset_stats) = STATS_RECORDS.get_or_init(|| {
get_stats_records(&schema_args, StatsMode::FrequencyForceStats)
-.unwrap_or_else(|_| (ByteRecord::new(), Vec::new()))
+.unwrap_or_else(|_| (ByteRecord::new(), Vec::new(), HashMap::new()))
});

let rconfig = Config::new(Some(&args.arg_input));
-let row_count = util::count_rows(&rconfig)? as u64;
-// eprintln!("row_count: {}\nstats: {:#?}", row_count, csv_stats);
+let row_count = if let Some(row_count) = dataset_stats.get("qsv__rowcount") {
+row_count.parse::<u64>().unwrap()
+} else {
+util::count_rows(&rconfig)?
+};

// Analyze pivot column characteristics
let mut high_cardinality_pivot = false;
2 changes: 1 addition & 1 deletion src/cmd/schema.rs
@@ -213,7 +213,7 @@ pub fn infer_schema_from_stats(
quiet: bool,
) -> CliResult<Map<String, Value>> {
// invoke cmd::stats
-let (csv_fields, csv_stats) = util::get_stats_records(args, StatsMode::Schema)?;
+let (csv_fields, csv_stats, _) = util::get_stats_records(args, StatsMode::Schema)?;

// amortize memory allocation
let mut low_cardinality_column_indices: Vec<u64> =
2 changes: 1 addition & 1 deletion src/cmd/sqlp.rs
@@ -805,7 +805,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
arg_input: Some(table.to_string_lossy().into_owned()),
flag_memcheck: false,
};
-let (csv_fields, csv_stats) =
+let (csv_fields, csv_stats, _) =
get_stats_records(&schema_args, util::StatsMode::PolarsSchema)?;

let mut schema = Schema::with_capacity(csv_stats.len());