
Commit 145b2cf

Merge pull request #1553 from jqnatividad/jsonl-refactor
`jsonl`: major perf refactor
2 parents 472f121 + b8db7cc commit 145b2cf

2 files changed (+102 −27 lines)

README.md (+1 −1)
@@ -54,7 +54,7 @@
 | [input](/src/cmd/input.rs#L2) | Read CSV data with special commenting, quoting, trimming, line-skipping & non-UTF8 encoding handling rules. Typically used to "normalize" a CSV for further processing with other qsv commands. |
 | [join](/src/cmd/join.rs#L2) | Inner, outer, right, cross, anti & semi joins. Automatically creates a simple, in-memory hash index to make it fast. |
 | [joinp](/src/cmd/joinp.rs#L2)<br>✨🚀🐻‍❄️ | Inner, outer, cross, anti, semi & asof joins using the [Pola.rs](https://www.pola.rs) engine. Unlike the `join` command, `joinp` can process files larger than RAM, is multi-threaded, has join key validation, pre-join filtering, supports [asof joins](https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/api/polars.DataFrame.join_asof.html) (which is [particularly useful for time series data](https://github.com/jqnatividad/qsv/blob/30cc920d0812a854fcbfedc5db81788a0600c92b/tests/test_joinp.rs#L509-L983)) & its output doesn't have duplicate columns. However, `joinp` doesn't have an --ignore-case option & it doesn't support right outer joins. |
-| [jsonl](/src/cmd/jsonl.rs#L2)<br>🔣 | Convert newline-delimited JSON ([JSONL](https://jsonlines.org/)/[NDJSON](http://ndjson.org/)) to CSV. See `tojsonl` command to convert CSV to JSONL.
+| [jsonl](/src/cmd/jsonl.rs#L2)<br>🚀🔣 | Convert newline-delimited JSON ([JSONL](https://jsonlines.org/)/[NDJSON](http://ndjson.org/)) to CSV. See `tojsonl` command to convert CSV to JSONL.
 | <a name="luau_deeplink"></a><br>[luau](/src/cmd/luau.rs#L2) 👑<br>✨📇🌐🔣 ![CKAN](docs/images/ckan.png) | Create multiple new computed columns, filter rows, compute aggregations and build complex data pipelines by executing a [Luau](https://luau-lang.org) [0.606](https://github.com/Roblox/luau/releases/tag/0.606) expression/script for every row of a CSV file ([sequential mode](https://github.com/jqnatividad/qsv/blob/bb72c4ef369d192d85d8b7cc6e972c1b7df77635/tests/test_luau.rs#L254-L298)), or using [random access](https://www.webopedia.com/definitions/random-access/) with an index ([random access mode](https://github.com/jqnatividad/qsv/blob/bb72c4ef369d192d85d8b7cc6e972c1b7df77635/tests/test_luau.rs#L367-L415)).<br>Can process a single Luau expression or [full-fledged data-wrangling scripts using lookup tables](https://github.com/dathere/qsv-lookup-tables#example) with discrete BEGIN, MAIN and END sections.<br> It is not just another qsv command, it is qsv's [Domain-specific Language](https://en.wikipedia.org/wiki/Domain-specific_language) (DSL) with [numerous qsv-specific helper functions](https://github.com/jqnatividad/qsv/blob/113eee17b97882dc368b2e65fec52b86df09f78b/src/cmd/luau.rs#L1356-L2290) to build production data pipelines. |
 | [partition](/src/cmd/partition.rs#L2) | Partition a CSV based on a column value. |
 | [pseudo](/src/cmd/pseudo.rs#L2)<br>🔣 | [Pseudonymise](https://en.wikipedia.org/wiki/Pseudonymization) the value of the given column by replacing them with an incremental identifier. |
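For context, `jsonl` flattens each JSON document into one CSV row, turning nested keys into dotted column names. A hypothetical input line

    {"name": "a", "loc": {"lat": 1.5, "lon": 2.5}}

would yield a CSV along the lines of

    loc.lat,loc.lon,name
    1.5,2.5,a

(the exact column order depends on how the first line's keys are walked during header inference; see the jsonl.rs diff below).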

src/cmd/jsonl.rs (+101 −26)
@@ -6,7 +6,7 @@ straightforwardly convert JSON lines to CSV, the process might lose some complex
 fields from the input.
 
 Also, it will fail if the JSON documents are not consistent with one another,
-as the first JSON line will be use to infer the headers of the CSV output.
+as the first JSON line will be used to infer the headers of the CSV output.
 
 For examples, see https://github.com/jqnatividad/qsv/blob/master/tests/test_jsonl.rs.
 
@@ -16,6 +16,11 @@ Usage:
 
 jsonl options:
     --ignore-errors        Skip malformed input lines.
+    -j, --jobs <arg>       The number of jobs to run in parallel.
+                           When not set, the number of jobs is set to the
+                           number of CPUs detected.
+    -b, --batch <size>     The number of rows per batch to load into memory,
+                           before running in parallel. [default: 50000]
 
 Common options:
     -h, --help             Display this message
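As a usage illustration of the two new flags (the file names here are hypothetical; `--output` is one of qsv's common options), a parallel conversion might be invoked as:

    qsv jsonl data.jsonl --jobs 8 --batch 50000 --output data.csv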
@@ -29,11 +34,15 @@ use std::{
     io::{self, BufRead, BufReader},
 };
 
+use rayon::{
+    iter::{IndexedParallelIterator, ParallelIterator},
+    prelude::IntoParallelRefIterator,
+};
 use serde::Deserialize;
 use serde_json::Value;
 
 use crate::{
-    config::{Config, Delimiter},
+    config::{Config, Delimiter, DEFAULT_RDR_BUFFER_CAPACITY},
     util, CliResult,
 };
 
@@ -43,10 +52,11 @@ struct Args {
     flag_output: Option<String>,
     flag_delimiter: Option<Delimiter>,
     flag_ignore_errors: bool,
+    flag_jobs: Option<usize>,
+    flag_batch: u32,
 }
 
-#[allow(clippy::needless_pass_by_value)]
-fn recurse_to_infer_headers(value: &Value, headers: &mut Vec<Vec<String>>, path: Vec<String>) {
+fn recurse_to_infer_headers(value: &Value, headers: &mut Vec<Vec<String>>, path: &[String]) {
     match value {
         Value::Object(map) => {
             for (key, value) in map {
@@ -56,16 +66,16 @@ fn recurse_to_infer_headers(value: &Value, headers: &mut Vec<Vec<String>>, path:
                     | Value::Number(_)
                     | Value::String(_)
                     | Value::Array(_) => {
-                        let mut full_path = path.clone();
+                        let mut full_path = path.to_owned();
                         full_path.push(key.to_string());
 
                         headers.push(full_path);
                     },
                     Value::Object(_) => {
-                        let mut new_path = path.clone();
+                        let mut new_path = path.to_owned();
                         new_path.push(key.to_string());
 
-                        recurse_to_infer_headers(value, headers, new_path);
+                        recurse_to_infer_headers(value, headers, &new_path);
                     },
                     #[allow(unreachable_patterns)]
                     _ => {},
@@ -81,7 +91,7 @@ fn recurse_to_infer_headers(value: &Value, headers: &mut Vec<Vec<String>>, path:
 fn infer_headers(value: &Value) -> Vec<Vec<String>> {
     let mut headers: Vec<Vec<String>> = Vec::new();
 
-    recurse_to_infer_headers(value, &mut headers, Vec::new());
+    recurse_to_infer_headers(value, &mut headers, &Vec::new());
 
     headers
 }
@@ -103,6 +113,7 @@ fn get_value_at_path(value: &Value, path: &[String]) -> Option<Value> {
     Some(current.clone())
 }
 
+#[inline]
 fn json_line_to_csv_record(value: &Value, headers: &[Vec<String>]) -> csv::StringRecord {
     let mut record = csv::StringRecord::new();
 
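The two functions touched above form the flatten/extract pair at the heart of the conversion: `recurse_to_infer_headers` collects dotted key paths from the first document, and `get_value_at_path` walks those paths in every subsequent document. A minimal standalone sketch of that pattern (simplified names, serde_json only — not the command's exact code):

use serde_json::{json, Value};

// Collect dotted paths of all non-object leaves, recursing into nested objects
// (mirrors recurse_to_infer_headers, minus the qsv-specific details).
fn flatten_headers(value: &Value, path: &[String], headers: &mut Vec<Vec<String>>) {
    if let Value::Object(map) = value {
        for (key, val) in map {
            let mut new_path = path.to_owned();
            new_path.push(key.clone());
            match val {
                Value::Object(_) => flatten_headers(val, &new_path, headers),
                _ => headers.push(new_path),
            }
        }
    }
}

// Walk an inferred path back down into a value (what get_value_at_path does).
fn value_at_path<'a>(value: &'a Value, path: &[String]) -> Option<&'a Value> {
    let mut current = value;
    for key in path {
        current = current.get(key)?;
    }
    Some(current)
}

fn main() {
    let first_line = json!({"name": "a", "loc": {"lat": 1.5, "lon": 2.5}});
    let mut headers = Vec::new();
    flatten_headers(&first_line, &[], &mut headers);
    // prints one line per inferred header, e.g. `loc.lat = Some(Number(1.5))`
    for h in &headers {
        println!("{} = {:?}", h.join("."), value_at_path(&first_line, h));
    }
}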
@@ -141,31 +152,67 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
         .delimiter(args.flag_delimiter)
         .writer()?;
 
-    let rdr: Box<dyn BufRead> = match args.arg_input {
+    let mut rdr: Box<dyn BufRead> = match args.arg_input {
         None => Box::new(BufReader::new(io::stdin())),
-        Some(p) => Box::new(BufReader::new(fs::File::open(p)?)),
+        Some(p) => Box::new(BufReader::with_capacity(
+            DEFAULT_RDR_BUFFER_CAPACITY,
+            fs::File::open(p)?,
+        )),
     };
 
     let mut headers: Vec<Vec<String>> = Vec::new();
     let mut headers_emitted: bool = false;
 
-    for (rowidx, line) in rdr.lines().enumerate() {
-        let value: Value = match serde_json::from_str(&line?) {
-            Ok(v) => v,
-            Err(e) => {
-                if args.flag_ignore_errors {
-                    continue;
-                }
-                let human_idx = rowidx + 1; // not zero based, for readability
-                return fail_clierror!(
-                    r#"Could not parse line {human_idx} as JSON!: {e}
+    // amortize memory allocation by reusing record
+    let mut batch_line = String::new();
+
+    // reuse batch buffers
+    let batchsize: usize = args.flag_batch as usize;
+    let mut batch = Vec::with_capacity(batchsize);
+    let mut batch_results = Vec::with_capacity(batchsize);
+
+    // set RAYON_NUM_THREADS
+    util::njobs(args.flag_jobs);
+
+    let mut result_idx = 0_u64;
+
+    'batch_loop: loop {
+        for _ in 0..batchsize {
+            batch_line.clear();
+            match rdr.read_line(&mut batch_line) {
+                Ok(0) => {
+                    // EOF
+                    break;
+                },
+                Ok(_) => {
+                    batch.push(batch_line.clone());
+                },
+                Err(e) => {
+                    if args.flag_ignore_errors {
+                        continue;
+                    }
+                    return fail_clierror!(
+                        r#"Could not read input line!: {e}
 Use `--ignore-errors` option to skip malformed input lines.
 Use `tojsonl` command to convert _to_ jsonl instead of _from_ jsonl."#,
-                );
-            },
-        };
+                    );
+                },
+            }
+        }
+
+        if batch.is_empty() {
+            break 'batch_loop; // EOF
+        }
 
         if !headers_emitted {
+            let value: Value = match serde_json::from_str(&batch[0]) {
+                Ok(v) => v,
+                Err(e) => {
+                    return fail_clierror!(
+                        "Could not parse first input line as JSON to infer headers: {e}",
+                    );
+                },
+            };
             headers = infer_headers(&value);
 
             let headers_formatted = headers.iter().map(|v| v.join(".")).collect::<Vec<String>>();
@@ -175,9 +222,37 @@ Use `tojsonl` command to convert _to_ jsonl instead of _from_ jsonl."#,
             headers_emitted = true;
         }
 
-        let record = json_line_to_csv_record(&value, &headers);
-        wtr.write_record(&record)?;
-    }
+        // do actual work via rayon
+        batch
+            .par_iter()
+            .map(|json_line| match serde_json::from_str(json_line) {
+                Ok(v) => Some(json_line_to_csv_record(&v, &headers)),
+                Err(e) => {
+                    if !args.flag_ignore_errors {
+                        log::error!("serde_json::from_str error: {:#?}", e);
+                    }
+                    None
+                },
+            })
+            .collect_into_vec(&mut batch_results);
+
+        // rayon collect() guarantees original order, so we can just append results of each batch
+        for result_record in &batch_results {
+            result_idx += 1;
+            if let Some(record) = result_record {
+                wtr.write_record(record)?;
+            } else if !args.flag_ignore_errors {
+                // there was an error parsing a json line
+                return fail_clierror!(
+                    r#"Could not parse input line {result_idx} as JSON
+Use `--ignore-errors` option to skip malformed input lines.
+Use `tojsonl` command to convert _to_ jsonl instead of _from_ jsonl."#,
+                );
+            }
+        }
+
+        batch.clear();
+    } // end batch loop
 
     Ok(wtr.flush()?)
 }
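Taken together, the refactor replaces the old line-at-a-time loop with: read a batch sequentially, parse the batch in parallel, write the results sequentially in the original order. A self-contained sketch of the same pattern under simplified assumptions (a toy parse_line stands in for serde_json::from_str + json_line_to_csv_record; rayon is the only dependency):

use std::io::{BufRead, Write};

use rayon::prelude::*;

// Toy per-line parser; None marks a malformed line, like the serde_json error arm above.
fn parse_line(line: &str) -> Option<String> {
    let trimmed = line.trim_end();
    if trimmed.is_empty() {
        None // treat blank lines as "malformed" for demonstration
    } else {
        Some(trimmed.to_uppercase())
    }
}

fn process<R: BufRead, W: Write>(mut rdr: R, mut wtr: W, batchsize: usize) -> std::io::Result<()> {
    let mut line = String::new();
    let mut batch: Vec<String> = Vec::with_capacity(batchsize);
    let mut results: Vec<Option<String>> = Vec::with_capacity(batchsize);

    loop {
        // 1) sequentially read up to batchsize lines into the reusable batch buffer
        for _ in 0..batchsize {
            line.clear();
            if rdr.read_line(&mut line)? == 0 {
                break; // EOF
            }
            batch.push(line.clone());
        }
        if batch.is_empty() {
            break; // no more input
        }

        // 2) parse the whole batch in parallel; collect_into_vec preserves input order
        batch
            .par_iter()
            .map(|l| parse_line(l))
            .collect_into_vec(&mut results);

        // 3) write sequentially, still in the original line order (errors skipped here)
        for rec in results.iter().flatten() {
            writeln!(wtr, "{rec}")?;
        }

        batch.clear(); // reuse the buffer for the next batch
    }
    wtr.flush()
}

fn main() -> std::io::Result<()> {
    let input = "alpha\nbeta\n\ngamma\n";
    process(input.as_bytes(), std::io::stdout(), 2)
}

util::njobs is qsv-internal; outside qsv the equivalent is pinning rayon's global pool via the RAYON_NUM_THREADS environment variable (which the diff's comment alludes to) or rayon::ThreadPoolBuilder::build_global(). Note that collect_into_vec is only available on an IndexedParallelIterator — that is what guarantees the per-batch output order and lets batch_results be reused across batches.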
