Skip to content

Commit

Permalink
feat: joinp add --maintain-order option
Browse files Browse the repository at this point in the history
  • Loading branch information
jqnatividad committed Dec 7, 2024
1 parent 9a43db2 commit cd31fdb
Showing 1 changed file with 47 additions and 14 deletions.
61 changes: 47 additions & 14 deletions src/cmd/joinp.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
static USAGE: &str = r#"
Joins two sets of CSV data on the specified columns using the Pola.rs engine.
Joins two sets of CSV data on the specified columns using the Polars engine.
The default join operation is an 'inner' join. This corresponds to the
intersection of rows on the keys specified.
Unlike the join command, joinp can process files larger than RAM, is multithreaded,
has join key validation, pre-join filtering, supports asof joins & its output columns
can be coalesced (no duplicate columns).
However, joinp doesn't have an --ignore-case option.
has join key validation, a maintain row order option, pre-join filtering, supports
asof joins and its output columns can be coalesced (no duplicate columns).
Returns the shape of the join result (number of rows, number of columns) to stderr.
Expand Down Expand Up @@ -76,7 +74,13 @@ joinp options:
onetoone - join keys are unique in both left & right data sets.
[default: none]
JOIN OPTIONS:
JOIN OPTIONS:
--maintain-order <arg> Which row order to preserve, if any. Valid values are:
none, left, right, left_right, right_left
Do not rely on any observed ordering without explicitly
setting this parameter. Not specifying any order can improve
performance. Supported for inner, left, right and full joins.
[default: none]
--nulls When set, joins will work on empty fields.
Otherwise, empty fields are completely ignored.
--streaming When set, the join will be done in a streaming fashion.
Expand Down Expand Up @@ -228,6 +232,7 @@ struct Args {
flag_filter_left: Option<String>,
flag_filter_right: Option<String>,
flag_validate: Option<String>,
flag_maintain_order: Option<String>,
flag_nulls: bool,
flag_streaming: bool,
flag_try_parsedates: bool,
Expand Down Expand Up @@ -283,6 +288,19 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
s => return fail_incorrectusage_clierror!("Invalid join validation: {s}"),
};

let flag_maintain_order = args
.flag_maintain_order
.unwrap_or_else(|| "none".to_string())
.to_lowercase();
let maintain_order = match flag_maintain_order.as_str() {
"none" => MaintainOrderJoin::None,
"left" => MaintainOrderJoin::Left,
"right" => MaintainOrderJoin::Right,
"left_right" => MaintainOrderJoin::LeftRight,
"right_left" => MaintainOrderJoin::RightLeft,
s => return fail_incorrectusage_clierror!("Invalid maintain order option: {s}"),
};

let join_shape: (usize, usize) = match (
args.flag_left,
args.flag_left_anti,
Expand All @@ -292,27 +310,35 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
args.flag_cross,
args.flag_asof,
) {
// default inner join
(false, false, false, false, false, false, false) => {
join.run(JoinType::Inner, validation, false)
join.run(JoinType::Inner, validation, maintain_order, false)
},
// left join
(true, false, false, false, false, false, false) => {
join.run(JoinType::Left, validation, false)
join.run(JoinType::Left, validation, maintain_order, false)
},
// left anti join
(false, true, false, false, false, false, false) => {
join.run(JoinType::Anti, validation, false)
join.run(JoinType::Anti, validation, maintain_order, false)
},
// left semi join
(false, false, true, false, false, false, false) => {
join.run(JoinType::Semi, validation, false)
join.run(JoinType::Semi, validation, maintain_order, false)
},
// right join
(false, false, false, true, false, false, false) => {
join.run(JoinType::Right, validation, false)
join.run(JoinType::Right, validation, maintain_order, false)
},
// full join
(false, false, false, false, true, false, false) => {
join.run(JoinType::Full, validation, false)
join.run(JoinType::Full, validation, maintain_order, false)
},
// cross join
(false, false, false, false, false, true, false) => {
join.run(JoinType::Cross, validation, false)
join.run(JoinType::Cross, validation, MaintainOrderJoin::None, false)
},
// as of join
(false, false, false, false, false, false, true) => {
// safety: flag_strategy is always is_some() as it has a default value
args.flag_strategy = Some(args.flag_strategy.unwrap().to_lowercase());
Expand Down Expand Up @@ -358,7 +384,12 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
.collect(),
);
}
join.run(JoinType::AsOf(asof_options), validation, true)
join.run(
JoinType::AsOf(asof_options),
validation,
MaintainOrderJoin::None,
true,
)
},
_ => fail_incorrectusage_clierror!("Please pick exactly one join operation."),
}?;
Expand Down Expand Up @@ -394,6 +425,7 @@ impl JoinStruct {
mut self,
jointype: JoinType,
validation: JoinValidation,
maintain_order: MaintainOrderJoin,
asof_join: bool,
) -> CliResult<(usize, usize)> {
let mut left_selcols: Vec<_> = self
Expand Down Expand Up @@ -537,6 +569,7 @@ impl JoinStruct {
.left_on(left_selcols)
.right_on(right_selcols)
.how(jointype)
.maintain_order(maintain_order)
.coalesce(coalesce_flag)
.allow_parallel(true)
.validate(validation)
Expand Down

0 comments on commit cd31fdb

Please sign in to comment.