Skip to content

Commit

Permalink
Merge pull request #1656 from jqnatividad/1655-polars-count
Browse files Browse the repository at this point in the history
`count`: use Polars multithreaded, mem-mapped CSV reader when `polars` feature is enabled
  • Loading branch information
jqnatividad authored Mar 10, 2024
2 parents 3e0d84d + 59b3bf2 commit 425996a
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ polars = { version = "0.38", features = [
"performant",
"cse",
"avro",
"csv",
], optional = true }
pyo3 = { version = "0.20", features = ["auto-initialize"], optional = true }
qsv-dateparser = "0.11"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
| <a name="applydp_deeplink"></a>[applydp](/src/cmd/applydp.rs#L2)<br>🚀🔣 ![CKAN](docs/images/ckan.png)| applydp is a slimmed-down version of `apply` with only [Datapusher+](https://github.com/dathere/datapusher-plus) relevant subcommands/operations (`qsvdp` binary variant only). |
| [behead](/src/cmd/behead.rs#L2) | Drop headers from a CSV. |
| [cat](/src/cmd/cat.rs#L2)<br>🗄️ | Concatenate CSV files by row or by column. |
| [count](/src/cmd/count.rs#L2)<br>📇🏎️ | Count the rows in a CSV file. (13.98 seconds for a 15gb, 27m row NYC 311 dataset without an index. Instantaneous with an index.) |
| [count](/src/cmd/count.rs#L2)<br>📇🏎️🐻‍❄️ | Count the rows in a CSV file. (13.98 seconds for a 15gb, 27m row NYC 311 dataset without an index. Instantaneous with an index.) If the `polars` feature is enabled, uses Polars' multithreaded, mem-mapped CSV reader for near-instant counts even without an index. |
| [datefmt](/src/cmd/datefmt.rs#L2)<br>🚀 | Formats recognized date fields ([19 formats recognized](https://github.com/jqnatividad/belt/tree/main/dateparser#accepted-date-formats)) to a specified date format using [strftime date format specifiers](https://docs.rs/chrono/latest/chrono/format/strftime/). |
| [dedup](/src/cmd/dedup.rs#L2)<br>🤯🚀 | Remove duplicate rows (See also `extdedup`, `extsort`, `sort` & `sortcheck` commands). |
| [describegpt](/src/cmd/describegpt.rs#L2)<br>🌐🤖 | Infer extended metadata about a CSV using a GPT model from [OpenAI's API](https://platform.openai.com/docs/introduction). |
Expand Down
80 changes: 79 additions & 1 deletion src/cmd/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ count options:
--width Also return the length of the longest record.
The count and width are separated by a semicolon.
WHEN THE POLARS FEATURE IS ENABLED:
--no-polars Use the regular single-threaded, streaming CSV reader instead of
the much faster Polars multi-threaded, mem-mapped CSV reader.
Use this when you encounter issues when counting with the
Polars CSV reader. The regular reader is slower but can read any
valid CSV file of any size.
--low-memory Use the Polars CSV Reader's low-memory mode. This
mode is slower but uses less memory.
Common options:
-h, --help Display this message
-f, --flexible Do not validate if the CSV has different number of
Expand All @@ -34,6 +44,8 @@ struct Args {
arg_input: Option<String>,
flag_human_readable: bool,
flag_width: bool,
flag_no_polars: bool,
flag_low_memory: bool,
flag_flexible: bool,
flag_no_headers: bool,
}
Expand Down Expand Up @@ -66,7 +78,22 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
info!("index used");
(idx.count(), 0)
},
None => count_input(&conf, args.flag_width)?,
None => {
#[cfg(feature = "polars")]
{
// if --no-polars or --width is set, or it's a snappy-compressed file, use the
// regular CSV reader
if args.flag_no_polars || args.flag_width || conf.is_snappy() {
count_input(&conf, args.flag_width)?
} else {
polars_count_input(&conf, args.flag_low_memory)?
}
}
#[cfg(not(feature = "polars"))]
{
count_input(&conf, args.flag_width)?
}
},
}
};

Expand Down Expand Up @@ -126,3 +153,54 @@ fn count_input(
// which we also want to count when returning width
Ok((count, max_width + record_numdelimiters))
}

/// Counts the rows of the configured CSV input using the Polars CSV reader.
///
/// When the input is stdin, it is first copied to a temporary `.csv` file,
/// because the Polars reader is opened from a file path.
///
/// # Arguments
/// * `conf` - input configuration; its `comment`, `no_headers`, `flexible`
///   and `path` fields are read.
/// * `low_memory` - forwarded to the reader's `low_memory` option.
///
/// # Returns
/// `(row_count, 0)`. The second element (record width) is always 0 here.
///
/// # Errors
/// Propagates failures from creating the temporary file, copying stdin into
/// it, and opening or parsing the CSV with Polars.
#[cfg(feature = "polars")]
fn polars_count_input(
    conf: &Config,
    low_memory: bool,
) -> Result<(u64, usize), crate::clitypes::CliError> {
    use polars::prelude::*;

    log::info!("using polars");

    // The reader takes the comment prefix as a string slice, so the configured
    // comment character is converted to an owned string that outlives the read.
    let comment_char = conf.comment.map(|c| (c as char).to_string());
    let comment_prefix = comment_char.as_deref();

    // `_stdin_temp_path` is a guard: dropping a `TempPath` deletes the file, so it
    // must stay bound until the reader below has finished. Building the path string
    // from an unbound `into_temp_path()` temporary would delete the file before it
    // is ever opened.
    let (csv_path, _stdin_temp_path) = if conf.is_stdin() {
        let mut temp_file = tempfile::Builder::new().suffix(".csv").tempfile()?;
        {
            let stdin = std::io::stdin();
            let mut stdin_handle = stdin.lock();
            std::io::copy(&mut stdin_handle, &mut temp_file)?;
        }
        let temp_path = temp_file.into_temp_path();
        (temp_path.to_path_buf(), Some(temp_path))
    } else {
        // NOTE(review): assumes `conf.path` is always set when the input is not
        // stdin - confirm against `Config`. The path is passed through as a
        // `PathBuf`, so non-UTF-8 paths no longer panic on a `to_str()` unwrap.
        (
            std::path::PathBuf::from(conf.path.as_ref().unwrap()),
            None,
        )
    };

    let df = CsvReader::from_path(csv_path)?
        .with_comment_prefix(comment_prefix)
        .has_header(!conf.no_headers)
        .truncate_ragged_lines(conf.flexible)
        .low_memory(low_memory)
        .finish()?;

    Ok((df.height() as u64, 0))
}
2 changes: 2 additions & 0 deletions tests/test_comments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ fn comments_unicode_supported() {
assert_eq!(got, expected);
}

// Polars doesn't support unicode comment char
#[cfg(not(feature = "polars"))]
#[test]
fn comments_count() {
let wrk = Workdir::new("comments");
Expand Down
5 changes: 5 additions & 0 deletions tests/test_count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ fn prop_count_len(
}
}

#[cfg(not(feature = "polars"))]
#[test]
fn prop_count() {
fn p(rows: CsvData) -> bool {
Expand All @@ -141,6 +142,7 @@ fn prop_count() {
qcheck(p as fn(CsvData) -> bool);
}

#[cfg(not(feature = "polars"))]
#[test]
fn prop_count_human_readable() {
fn p(rows: CsvData) -> bool {
Expand All @@ -149,6 +151,7 @@ fn prop_count_human_readable() {
qcheck(p as fn(CsvData) -> bool);
}

#[cfg(not(feature = "polars"))]
#[test]
fn prop_count_headers() {
fn p(rows: CsvData) -> bool {
Expand All @@ -157,6 +160,7 @@ fn prop_count_headers() {
qcheck(p as fn(CsvData) -> bool);
}

#[cfg(not(feature = "polars"))]
#[test]
fn prop_count_headers_human_readable() {
fn p(rows: CsvData) -> bool {
Expand All @@ -181,6 +185,7 @@ fn prop_count_indexed_headers() {
qcheck(p as fn(CsvData) -> bool);
}

#[cfg(not(feature = "polars"))]
#[test]
fn prop_count_noheaders_env() {
fn p(rows: CsvData) -> bool {
Expand Down

0 comments on commit 425996a

Please sign in to comment.