Commit e336513

Merge branch 'pola-rs:main' into issue-351
2 parents a35e69b + c55de4b commit e336513

File tree

6 files changed: +252 −6 lines

README.md

Lines changed: 1 addition & 1 deletion

@@ -186,7 +186,7 @@ Want to know about all the features Polars supports? Read the [docs](https://doc
 
 #### Node
 
-* Installation guide: `$ yarn install nodejs-polars`
+* Installation guide: `$ yarn add nodejs-polars` or `$ bun install nodejs-polars`
 * [Node documentation](https://pola-rs.github.io/nodejs-polars/)
 
 ## Contribution
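
Worth noting: `yarn install` ignores positional package arguments, which is why the command was corrected to `yarn add`. After installing, a minimal smoke test might look like the sketch below (assuming the package's default export, as used throughout the nodejs-polars docs):

    import pl from "nodejs-polars";

    // Build a tiny DataFrame and check its dimensions.
    const df = pl.DataFrame({ foo: [1, 2, 3], bar: ["a", "b", "c"] });
    console.log(df.shape); // { height: 3, width: 2 }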

__tests__/lazyframe.test.ts

Lines changed: 46 additions & 0 deletions

@@ -1446,6 +1446,52 @@ describe("lazyframe", () => {
     expect(newDF.sort("foo")).toFrameEqual(actualDf);
     fs.rmSync("./test.parquet");
   });
+  test("sinkNdJson:path", async () => {
+    const ldf = pl
+      .DataFrame([
+        pl.Series("foo", [1, 2, 3], pl.Int64),
+        pl.Series("bar", ["a", "b", "c"]),
+      ])
+      .lazy();
+    await ldf.sinkNdJson("./test.ndjson").collect();
+    let df = pl.scanJson("./test.ndjson").collectSync();
+    expect(df.shape).toEqual({ height: 3, width: 2 });
+
+    await ldf
+      .sinkNdJson("./test.ndjson", {
+        retries: 1,
+        syncOnClose: "all",
+        maintainOrder: false,
+      })
+      .collect();
+    df = pl.scanJson("./test.ndjson").collectSync();
+    expect(df.shape).toEqual({ height: 3, width: 2 });
+
+    fs.rmSync("./test.ndjson");
+  });
+  test("sinkIpc:path", async () => {
+    const ldf = pl
+      .DataFrame([
+        pl.Series("foo", [1, 2, 3], pl.Int64),
+        pl.Series("bar", ["a", "b", "c"]),
+      ])
+      .lazy();
+    await ldf.sinkIpc("./test.ipc").collect();
+    let df = pl.scanIPC("./test.ipc").collectSync();
+    expect(df.shape).toEqual({ height: 3, width: 2 });
+
+    await ldf
+      .sinkIpc("./test.ipc", {
+        retries: 1,
+        syncOnClose: "all",
+        maintainOrder: false,
+        compression: "lz4",
+      })
+      .collect();
+    df = pl.scanIPC("./test.ipc").collectSync();
+    expect(df.shape).toEqual({ height: 3, width: 2 });
+    fs.rmSync("./test.ipc");
+  });
   test("unpivot renamed", () => {
     const ldf = pl
       .DataFrame({

polars/lazy/dataframe.ts

Lines changed: 94 additions & 4 deletions

@@ -14,6 +14,8 @@ import type {
   LazyJoinOptions,
   LazyOptions,
   LazySameNameColumnJoinOptions,
+  SinkIpcOptions,
+  SinkJsonOptions,
   SinkParquetOptions,
 } from "../types";
 import {

@@ -615,10 +617,6 @@ export interface LazyDataFrame<S extends Schema = any>
    *
    * Evaluate the query in streaming mode and write to a Parquet file.
 
-   .. warning::
-     Streaming mode is considered **unstable**. It may be changed
-     at any point without it being considered a breaking change.
-
    This allows streaming results that are larger than RAM to be written to disk.
 
    Parameters

@@ -666,6 +664,82 @@ export interface LazyDataFrame<S extends Schema = any>
   >>> lf.sinkParquet("out.parquet").collect() # doctest: +SKIP
   */
   sinkParquet(path: string, options?: SinkParquetOptions): LazyDataFrame;
+
+  /**
+   *
+   * Evaluate the query in streaming mode and write to an NDJSON file.
+   * This allows streaming results that are larger than RAM to be written to disk.
+   *
+   * Parameters
+   @param path - File path to which the file should be written.
+   @param options.maintainOrder - Maintain the order in which data is processed. Default -> true
+       Setting this to `false` will be slightly faster.
+   @param options.mkdir - Recursively create all the directories in the path. Default -> true
+   @param options.retries - Number of retries if accessing a cloud instance fails. Default -> 2
+   @param options.syncOnClose - { None, 'data', 'all' } Default -> 'all'
+       Sync to disk before closing the file.
+
+       * `None` does not sync.
+       * `data` syncs the file contents.
+       * `all` syncs the file contents and metadata.
+   @param options.cloudOptions - Options that indicate how to connect to a cloud provider.
+       If the cloud provider is not supported by Polars, the storage options are passed to `fsspec.open()`.
+
+       The cloud providers currently supported are AWS, GCP, and Azure.
+       See supported keys here:
+
+       * `aws <https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html>`_
+       * `gcp <https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html>`_
+       * `azure <https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html>`_
+
+       If `cloudOptions` is not provided, Polars will try to infer the information from environment variables.
+   @return LazyDataFrame
+   Examples
+   --------
+   >>> const lf = pl.scanCsv("/path/to/my_larger_than_ram_file.csv") # doctest: +SKIP
+   >>> lf.sinkNdJson("out.ndjson").collect()
+   */
+  sinkNdJson(path: string, options?: SinkJsonOptions): LazyDataFrame;
+  /**
+   *
+   * Evaluate the query in streaming mode and write to an IPC file.
+   * This allows streaming results that are larger than RAM to be written to disk.
+   *
+   * Parameters
+   @param path - File path to which the file should be written.
+   @param options.compression - {'uncompressed', 'lz4', 'zstd'} Default -> 'uncompressed'
+       Choose "zstd" for good compression performance.
+       Choose "lz4" for fast compression/decompression.
+   @param options.compatLevel - {'newest', 'oldest'} Default -> 'newest'
+       Use a specific compatibility level when exporting Polars' internal data structures.
+   @param options.maintainOrder - Maintain the order in which data is processed. Default -> true
+       Setting this to `false` will be slightly faster.
+   @param options.mkdir - Recursively create all the directories in the path. Default -> true
+   @param options.retries - Number of retries if accessing a cloud instance fails. Default -> 2
+   @param options.syncOnClose - { None, 'data', 'all' } Default -> 'all'
+       Sync to disk before closing the file.
+
+       * `None` does not sync.
+       * `data` syncs the file contents.
+       * `all` syncs the file contents and metadata.
+   @param options.cloudOptions - Options that indicate how to connect to a cloud provider.
+       If the cloud provider is not supported by Polars, the storage options are passed to `fsspec.open()`.
+
+       The cloud providers currently supported are AWS, GCP, and Azure.
+       See supported keys here:
+
+       * `aws <https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html>`_
+       * `gcp <https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html>`_
+       * `azure <https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html>`_
+
+       If `cloudOptions` is not provided, Polars will try to infer the information from environment variables.
+   @return LazyDataFrame
+   Examples
+   --------
+   >>> const lf = pl.scanCsv("/path/to/my_larger_than_ram_file.csv") # doctest: +SKIP
+   >>> lf.sinkIpc("out.arrow").collect()
+   */
+  sinkIpc(path: string, options?: SinkIpcOptions): LazyDataFrame;
 }
 
 const prepareGroupbyInputs = (by) => {

@@ -1173,6 +1247,22 @@ export const _LazyDataFrame = (_ldf: any): LazyDataFrame => {
     };
     return _ldf.sinkParquet(path, options);
   },
+  sinkNdJson(path: string, options: SinkJsonOptions = {}) {
+    options.retries = options.retries ?? 2;
+    options.syncOnClose = options.syncOnClose ?? "all";
+    options.maintainOrder = options.maintainOrder ?? true;
+    options.mkdir = options.mkdir ?? true;
+    return _ldf.sinkJson(path, options);
+  },
+  sinkIpc(path: string, options: SinkIpcOptions = {}) {
+    options.compatLevel = options.compatLevel ?? "newest";
+    options.compression = options.compression ?? "uncompressed";
+    options.retries = options.retries ?? 2;
+    options.syncOnClose = options.syncOnClose ?? "all";
+    options.maintainOrder = options.maintainOrder ?? true;
+    options.mkdir = options.mkdir ?? true;
+    return _ldf.sinkIpc(path, options);
+  },
 };
};
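
Taken together, the docstrings and the wrapper above define the public streaming-sink API: the TypeScript layer fills in defaults (`retries: 2`, `syncOnClose: "all"`, `maintainOrder: true`, `mkdir: true`, plus `compatLevel: "newest"` and `compression: "uncompressed"` for IPC) before delegating to the native `_ldf.sinkJson` / `_ldf.sinkIpc`. A minimal round-trip sketch under those defaults (file paths are illustrative; the sink and scan calls mirror the test suite above):

    import pl from "nodejs-polars";

    // Any lazy query works; this one filters a small in-memory frame.
    const lf = pl
      .DataFrame({ foo: [1, 2, 3], bar: ["a", "b", "c"] })
      .lazy()
      .filter(pl.col("foo").gt(1));

    // Each sink returns a LazyDataFrame; awaiting collect() runs the
    // streaming engine and performs the write.
    await lf.sinkNdJson("./out.ndjson").collect();
    await lf.sinkIpc("./out.arrow", { compression: "zstd" }).collect();

    // Read both files back to verify the round trip.
    const fromNdJson = pl.scanJson("./out.ndjson").collectSync();
    const fromIpc = pl.scanIPC("./out.arrow").collectSync();
    console.log(fromNdJson.shape, fromIpc.shape); // each { height: 2, width: 2 }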

polars/types.ts

Lines changed: 25 additions & 1 deletion

@@ -69,7 +69,7 @@ export interface SinkOptions {
 }
 
 /**
- * Options for @see {@link LazyDataFrame.sinkParquet}
+ * Options for @see {@link LazyDataFrame.sinkParquet }
  * @category Options
  */
 export interface SinkParquetOptions {

@@ -89,6 +89,30 @@ export interface SinkParquetOptions {
   retries?: number;
   sinkOptions?: SinkOptions;
 }
+/**
+ * Options for @see {@link LazyDataFrame.sinkNdJson}
+ * @category Options
+ */
+export interface SinkJsonOptions {
+  cloudOptions?: Map<string, string>;
+  retries?: number;
+  syncOnClose?: string; // Call sync when closing the file.
+  maintainOrder?: boolean; // The output file needs to maintain order of the data that comes in.
+  mkdir?: boolean; // Recursively create all the directories in the path.
+}
+/**
+ * Options for @see {@link LazyDataFrame.sinkIpc}
+ * @category Options
+ */
+export interface SinkIpcOptions {
+  compression?: string;
+  compatLevel?: string;
+  cloudOptions?: Map<string, string>;
+  retries?: number;
+  syncOnClose?: string; // Call sync when closing the file.
+  maintainOrder?: boolean; // The output file needs to maintain order of the data that comes in.
+  mkdir?: boolean; // Recursively create all the directories in the path.
+}
 /**
  * Options for {@link DataFrame.writeJSON}
  * @category Options
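
Since both interfaces are plain option bags, callers can build and reuse them as ordinary objects. A small illustration (values are arbitrary, chosen only to exercise the fields documented above):

    // Shapes match SinkJsonOptions / SinkIpcOptions; exact import paths for
    // the types may vary by package version, so the objects are inlined here.
    const jsonOpts = {
      retries: 3,          // extra retries for flaky cloud access
      syncOnClose: "data", // sync file contents, skip metadata
      maintainOrder: true, // preserve input row order in the output
    };

    const ipcOpts = {
      compression: "zstd",   // 'uncompressed' | 'lz4' | 'zstd'
      compatLevel: "oldest", // widest reader compatibility
      mkdir: true,           // create missing directories in the path
    };

    // lf.sinkNdJson("./data/out.ndjson", jsonOpts).collect();
    // lf.sinkIpc("./data/out.arrow", ipcOpts).collect();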

src/conversion.rs

Lines changed: 30 additions & 0 deletions

@@ -414,6 +414,16 @@ impl FromNapiValue for Wrap<Option<IpcCompression>> {
     }
 }
 
+impl ToNapiValue for Wrap<Option<IpcCompression>> {
+    unsafe fn to_napi_value(env: sys::napi_env, val: Self) -> Result<sys::napi_value> {
+        let s = match val.0.unwrap() {
+            IpcCompression::LZ4 => "lz4",
+            IpcCompression::ZSTD => "zstd",
+        };
+        String::to_napi_value(env, s.to_owned())
+    }
+}
+
 impl FromNapiValue for Wrap<UniqueKeepStrategy> {
     unsafe fn from_napi_value(env: sys::napi_env, napi_val: sys::napi_value) -> JsResult<Self> {
         let method = String::from_napi_value(env, napi_val)?;

@@ -648,6 +658,26 @@ pub struct SinkParquetOptions {
     pub sink_options: JsSinkOptions,
 }
 
+#[napi(object)]
+pub struct SinkJsonOptions {
+    pub maintain_order: Option<bool>,
+    pub cloud_options: Option<HashMap<String, String>>,
+    pub retries: Option<u32>,
+    pub sync_on_close: Wrap<SyncOnCloseType>,
+    pub mkdir: Option<bool>,
+}
+
+#[napi(object)]
+pub struct SinkIpcOptions {
+    pub compat_level: Option<String>,
+    pub compression: Wrap<Option<IpcCompression>>,
+    pub maintain_order: Option<bool>,
+    pub cloud_options: Option<HashMap<String, String>>,
+    pub retries: Option<u32>,
+    pub sync_on_close: Wrap<SyncOnCloseType>,
+    pub mkdir: Option<bool>,
+}
+
 #[napi(object)]
 pub struct ScanParquetOptions {
     pub n_rows: Option<i64>,

src/lazy/dataframe.rs

Lines changed: 56 additions & 0 deletions

@@ -609,6 +609,62 @@ impl JsLazyFrame {
             .map_err(JsPolarsErr::from)?;
         Ok(rldf.into())
     }
+
+    #[napi(catch_unwind)]
+    pub fn sink_json(&self, path: String, options: SinkJsonOptions) -> napi::Result<JsLazyFrame> {
+        let cloud_options = parse_cloud_options(&path, options.cloud_options, options.retries);
+        let sink_options: SinkOptions = SinkOptions {
+            maintain_order: options.maintain_order.unwrap_or(true),
+            sync_on_close: options.sync_on_close.0,
+            mkdir: options.mkdir.unwrap_or(true),
+        };
+        let sink_target = SinkTarget::Path(PlPath::new(&path));
+        let rldf = self
+            .ldf
+            .clone()
+            .sink_json(
+                sink_target,
+                JsonWriterOptions {},
+                cloud_options,
+                sink_options,
+            )
+            .map_err(JsPolarsErr::from)?;
+        Ok(rldf.into())
+    }
+
+    #[napi(catch_unwind)]
+    pub fn sink_ipc(&self, path: String, options: SinkIpcOptions) -> napi::Result<JsLazyFrame> {
+        let cloud_options = parse_cloud_options(&path, options.cloud_options, options.retries);
+        let sink_options: SinkOptions = SinkOptions {
+            maintain_order: options.maintain_order.unwrap_or(true),
+            sync_on_close: options.sync_on_close.0,
+            mkdir: options.mkdir.unwrap_or(true),
+        };
+
+        let compat_level: CompatLevel = match options.compat_level.unwrap().as_str() {
+            "newest" => CompatLevel::newest(),
+            "oldest" => CompatLevel::oldest(),
+            _ => {
+                return Err(napi::Error::from_reason(
+                    "use one of {'newest', 'oldest'}".to_owned(),
+                ))
+            }
+        };
+
+        let ipc_options = IpcWriterOptions {
+            compression: options.compression.0,
+            compat_level: compat_level,
+            ..Default::default()
+        };
+
+        let sink_target = SinkTarget::Path(PlPath::new(&path));
+        let rldf = self
+            .ldf
+            .clone()
+            .sink_ipc(sink_target, ipc_options, cloud_options, sink_options)
+            .map_err(JsPolarsErr::from)?;
+        Ok(rldf.into())
+    }
 }
 
 #[napi(object)]
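
One behavioral detail in `sink_ipc` above: `compatLevel` is validated on the Rust side, and any value other than 'newest' or 'oldest' is rejected with a `napi::Error` rather than silently ignored. Because the TypeScript wrapper calls the native method synchronously when the sink is attached, the error should surface at the `sinkIpc(...)` call itself, before `collect()`. A hedged sketch (the exact JS error type depends on how napi-rs surfaces `napi::Error`):

    try {
      // Throws during the sinkIpc call, not at collect time.
      lf.sinkIpc("./out.arrow", { compatLevel: "middling" });
    } catch (err) {
      console.error(err); // message: "use one of {'newest', 'oldest'}"
    }

Note that the `options.compat_level.unwrap()` in the Rust code never panics in practice: the TypeScript wrapper always fills in `compatLevel: "newest"` before crossing the boundary.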
