Skip to content

Commit

Permalink
Adding options to LazyOptions (#240)
Browse files Browse the repository at this point in the history
Adding the following options to LazyOptions:

```
  slicePushdown?: boolean;
  commSubplanElim?: boolean;
  commSubexprElim?: boolean;
  streaming?: boolean;
```

Adding options to `ldf.collect`
  • Loading branch information
Bidek56 authored Jul 11, 2024
1 parent d020e02 commit 3ef1a59
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 15 deletions.
10 changes: 9 additions & 1 deletion __tests__/lazyframe.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ describe("lazyframe", () => {
const actual = await expected.lazy().collect();
expect(actual).toFrameEqual(expected);
});
// Collecting with `streaming: true` must yield a frame equal to the source frame.
test("collect:streaming", async () => {
  const source = pl.DataFrame({
    foo: [1, 2],
    bar: ["a", "b"],
  });
  const lazySource = source.lazy();
  const collected = await lazySource.collect({ streaming: true });
  expect(collected).toFrameEqual(source);
});
test("describeOptimizedPlan", () => {
const df = pl
.DataFrame({
Expand Down Expand Up @@ -1268,7 +1276,7 @@ describe("lazyframe", () => {
.lazy();
ldf.sinkCSV("./test.csv");
const newDF: pl.DataFrame = pl.readCSV("./test.csv");
const actualDf: pl.DataFrame = await ldf.collect();
const actualDf: pl.DataFrame = await ldf.collect({ streaming: true });
expect(newDF.sort("foo")).toFrameEqual(actualDf);
fs.rmSync("./test.csv");
});
Expand Down
4 changes: 2 additions & 2 deletions polars/dataframe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2351,7 +2351,7 @@ export const _DataFrame = (_df: any): DataFrame => {
return _DataFrame(_df)
.lazy()
.sort(arg, descending, nulls_last, maintain_order)
.collectSync({ noOptimization: true, stringCache: false });
.collectSync({ noOptimization: true });
}
return wrap("sort", arg, descending, nulls_last, maintain_order);
},
Expand Down Expand Up @@ -2571,7 +2571,7 @@ export const _DataFrame = (_df: any): DataFrame => {
}
return this.lazy()
.withColumns(columns)
.collectSync({ noOptimization: true, stringCache: false });
.collectSync({ noOptimization: true });
},
withColumnRenamed(opt, replacement?) {
if (typeof opt === "string") {
Expand Down
66 changes: 57 additions & 9 deletions polars/lazy/dataframe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,16 @@ export interface LazyDataFrame extends Serialize, GroupByOps<LazyGroupBy> {
* @param predicatePushdown - Do predicate pushdown optimization.
* @param projectionPushdown - Do projection pushdown optimization.
* @param simplifyExpression - Run simplify expressions optimization.
* @param stringCache - Use a global string cache in this query.
* This is needed if you want to join on categorical columns.
* Caution!
* * If you already have set a global string cache, set this to `false` as this will reset the
* * global cache when the query is finished.
* @param noOptimization - Turn off optimizations.
* @param commSubplanElim - Will try to cache branching subplans that occur on self-joins or unions.
* @param commSubexprElim - Common subexpressions will be cached and reused.
* @param streaming - Process the query in batches to handle larger-than-memory data.
* If set to `false` (default), the entire query is processed in a single batch.
* Warning: streaming mode is considered **unstable**. It may be changed
* at any point without it being considered a breaking change.
* @return DataFrame
*
*/
Expand Down Expand Up @@ -113,7 +117,16 @@ export interface LazyDataFrame extends Serialize, GroupByOps<LazyGroupBy> {
* @param opts.predicatePushdown - Do predicate pushdown optimization.
* @param opts.projectionPushdown - Do projection pushdown optimization.
* @param opts.simplifyExpression - Run simplify expressions optimization.
* @param opts.stringCache - Use a global string cache in this query.
* @param opts.commSubplanElim - Will try to cache branching subplans that occur on self-joins or unions.
* @param opts.commSubexprElim - Common subexpressions will be cached and reused.
* @param opts.streaming - Process the query in batches to handle larger-than-memory data.
* If set to `false` (default), the entire query is processed in a single batch.
* Warning: streaming mode is considered **unstable**. It may be changed
* at any point without it being considered a breaking change.
*
*/
fetch(numRows?: number): Promise<DataFrame>;
fetch(numRows: number, opts: LazyOptions): Promise<DataFrame>;
Expand Down Expand Up @@ -629,7 +642,30 @@ export const _LazyDataFrame = (_ldf: any): LazyDataFrame => {
// Calls `collectSync` on the wrapped `_ldf` and wraps the result with `_DataFrame`.
// NOTE(review): this implementation takes no options, yet polars/dataframe.ts
// calls `.collectSync({ noOptimization: true })` — confirm whether options
// should be honoured here.
collectSync() {
  const collected = _ldf.collectSync();
  return _DataFrame(collected);
},
collect() {
/**
 * Collect the lazy query and resolve with the result wrapped by `_DataFrame`.
 *
 * When `opts` is given, the optimization flags are applied to the wrapped
 * lazy frame via `optimizationToggle` before collecting.
 *
 * @param opts - Optional optimization flags.
 *   - `noOptimization`: forces `predicatePushdown`, `projectionPushdown`,
 *     `slicePushdown`, `commSubplanElim` and `commSubexprElim` to `false`.
 *   - `streaming`: forces `commSubplanElim` to `false`.
 * @returns A promise of the collected frame.
 */
collect(opts?) {
  if (opts) {
    // Work on a shallow copy so the caller's options object is never mutated;
    // the overrides below used to leak back into a shared `opts`.
    const resolved = { ...opts };

    if (resolved.noOptimization) {
      resolved.predicatePushdown = false;
      resolved.projectionPushdown = false;
      resolved.slicePushdown = false;
      resolved.commSubplanElim = false;
      resolved.commSubexprElim = false;
    }

    // Applied after the block above so it also overrides an explicit `true`.
    if (resolved.streaming) resolved.commSubplanElim = false;

    // NOTE(review): this reassigns the captured `_ldf`, so the toggles stay in
    // effect for later calls on this LazyDataFrame — confirm that is intended.
    _ldf = _ldf.optimizationToggle(
      resolved.typeCoercion,
      resolved.predicatePushdown,
      resolved.projectionPushdown,
      resolved.simplifyExpression,
      resolved.slicePushdown,
      resolved.commSubplanElim,
      resolved.commSubexprElim,
      resolved.streaming,
    );
  }

  return _ldf.collect().then(_DataFrame);
},
drop(...cols) {
Expand Down Expand Up @@ -679,15 +715,21 @@ export const _LazyDataFrame = (_ldf: any): LazyDataFrame => {
if (opts?.noOptimization) {
opts.predicatePushdown = false;
opts.projectionPushdown = false;
opts.slicePushdown = false;
opts.commSubplanElim = false;
opts.commSubexprElim = false;
}
if (opts?.streaming) opts.commSubplanElim = false;
if (opts) {
_ldf = _ldf.optimizationToggle(
opts.typeCoercion,
opts.predicatePushdown,
opts.projectionPushdown,
opts.simplifyExpr,
opts.stringCache,
opts.slicePushdown,
opts.commSubplanElim,
opts.commSubexprElim,
opts.streaming,
);
}

Expand All @@ -697,15 +739,21 @@ export const _LazyDataFrame = (_ldf: any): LazyDataFrame => {
if (opts?.noOptimization) {
opts.predicatePushdown = false;
opts.projectionPushdown = false;
opts.slicePushdown = false;
opts.commSubplanElim = false;
opts.commSubexprElim = false;
}
if (opts?.streaming) opts.commSubplanElim = false;
if (opts) {
_ldf = _ldf.optimizationToggle(
opts.typeCoercion,
opts.predicatePushdown,
opts.projectionPushdown,
opts.simplifyExpr,
opts.stringCache,
opts.slicePushdown,
opts.commSubplanElim,
opts.commSubexprElim,
opts.streaming,
);
}

Expand Down
5 changes: 4 additions & 1 deletion polars/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,11 @@ export type LazyOptions = {
predicatePushdown?: boolean;
projectionPushdown?: boolean;
simplifyExpression?: boolean;
stringCache?: boolean;
slicePushdown?: boolean;
noOptimization?: boolean;
commSubplanElim?: boolean;
commSubexprElim?: boolean;
streaming?: boolean;
};

/**
Expand Down
13 changes: 11 additions & 2 deletions src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,22 +113,31 @@ impl JsLazyFrame {
predicate_pushdown: Option<bool>,
projection_pushdown: Option<bool>,
simplify_expr: Option<bool>,
_string_cache: Option<bool>,
slice_pushdown: Option<bool>,
comm_subplan_elim: Option<bool>,
comm_subexpr_elim: Option<bool>,
streaming: Option<bool>,
) -> JsLazyFrame {
let type_coercion = type_coercion.unwrap_or(true);
let predicate_pushdown = predicate_pushdown.unwrap_or(true);
let projection_pushdown = projection_pushdown.unwrap_or(true);
let simplify_expr = simplify_expr.unwrap_or(true);
let slice_pushdown = slice_pushdown.unwrap_or(true);
let comm_subplan_elim = comm_subplan_elim.unwrap_or(true);
let comm_subexpr_elim = comm_subexpr_elim.unwrap_or(true);
let streaming = streaming.unwrap_or(false);

let ldf = self.ldf.clone();
let ldf = ldf
.with_type_coercion(type_coercion)
.with_predicate_pushdown(predicate_pushdown)
.with_simplify_expr(simplify_expr)
.with_slice_pushdown(slice_pushdown)
.with_projection_pushdown(projection_pushdown);
.with_streaming(streaming)
.with_projection_pushdown(projection_pushdown)
.with_comm_subplan_elim(comm_subplan_elim)
.with_comm_subexpr_elim(comm_subexpr_elim);

ldf.into()
}
#[napi(catch_unwind)]
Expand Down

0 comments on commit 3ef1a59

Please sign in to comment.