Skip to content

Commit 879a53b

Browse files
authored
Vortex Session (#5111)
# VortexSession We have had a `VortexSession` object for a while, but never really made use of it, in a large part because none of the Vortex components could take a dependency without creating a cycle. Since Vortex is incredibly extensible, the idea of the session is to hold the plugin registries for arrays, layouts, expressions and so on. While our existing solution does this, it created very fragile APIs where it was possible to construct file readers/writers _without_ using the main registry of plugins. Similarly, metrics registries were getting lost and some metrics ended up in isolated `VortexMetrics::default()` instances - never to see the light of day. This PR introduces a top-level VortexSession create that essentially acts as a type map. Components of Vortex can register session state, and then read it back again later. This allows components to accept and hold a VortexSession internally, while our root `vortex` crate performs a default configuration. # Runtime Handles One of the most fragile bits of mis-configured API was the Vortex runtime handle. This is our abstraction over runtimes (in practice, only over Tokio or a custom smol-based CurrentThreadRuntime) and provides an API for components to spawn CPU, I/O, blocking and other types of work. This PR demonstrates the session API by holding a runtime handle and plumbing this through APIs that formerly accepted a with_handle argument. # Language Bindings This PR doesn't not completely migrate language bindings onto a session-based API. Many bindings still use a global static session where a session instance would be preferred. --------- Signed-off-by: Nicholas Gates <[email protected]>
1 parent 1361988 commit 879a53b

File tree

113 files changed

+1607
-1295
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

113 files changed

+1607
-1295
lines changed

Cargo.lock

Lines changed: 22 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ members = [
2828
"vortex-python",
2929
"vortex-scan",
3030
"vortex-scalar",
31+
"vortex-session",
3132
"vortex-tui",
3233
"vortex-utils",
3334
"vortex-vector",
@@ -245,6 +246,7 @@ vortex-runend = { version = "0.1.0", path = "./encodings/runend", default-featur
245246
vortex-scalar = { version = "0.1.0", path = "./vortex-scalar", default-features = false }
246247
vortex-scan = { version = "0.1.0", path = "./vortex-scan", default-features = false }
247248
vortex-sequence = { version = "0.1.0", path = "encodings/sequence", default-features = false }
249+
vortex-session = { version = "0.1.0", path = "./vortex-session", default-features = false }
248250
vortex-sparse = { version = "0.1.0", path = "./encodings/sparse", default-features = false }
249251
vortex-tui = { version = "0.1.0", path = "./vortex-tui", default-features = false }
250252
vortex-utils = { version = "0.1.0", path = "./vortex-utils", default-features = false }

bench-vortex/src/clickbench/clickbench_data.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ use tokio::fs::{OpenOptions, create_dir_all};
2626
use tracing::{Instrument, info, warn};
2727
use url::Url;
2828
use vortex::error::VortexExpect;
29-
use vortex::file::VortexWriteOptions;
29+
use vortex::file::WriteOptionsSessionExt;
3030
use vortex_datafusion::VortexFormat;
3131

3232
use crate::conversions::parquet_to_vortex;
3333
#[cfg(feature = "lance")]
3434
use crate::utils;
3535
use crate::utils::file_utils::{idempotent, idempotent_async};
36-
use crate::{CompactionStrategy, Format};
36+
use crate::{CompactionStrategy, Format, SESSION};
3737

3838
pub static HITS_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
3939
use DataType::*;
@@ -204,7 +204,7 @@ pub async fn convert_parquet_to_vortex(
204204
.open(&vtx_file)
205205
.await?;
206206

207-
let write_options = compaction.apply_options(VortexWriteOptions::default());
207+
let write_options = compaction.apply_options(SESSION.write_options());
208208

209209
write_options.write(&mut f, array_stream).await?;
210210

@@ -250,7 +250,7 @@ pub async fn register_vortex_files(
250250
glob_pattern: Option<Pattern>,
251251
) -> anyhow::Result<()> {
252252
let vortex_path = input_path.join(&format!("{}/", Format::OnDiskVortex.name()))?;
253-
let format = Arc::new(VortexFormat::default());
253+
let format = Arc::new(VortexFormat::new(SESSION.clone()));
254254

255255
info!(
256256
"Registering table from {vortex_path} with glob {:?}",
@@ -283,7 +283,7 @@ pub async fn register_vortex_compact_files(
283283
glob_pattern: Option<Pattern>,
284284
) -> anyhow::Result<()> {
285285
let vortex_compact_path = input_path.join(&format!("{}/", Format::VortexCompact.name()))?;
286-
let format = Arc::new(VortexFormat::default());
286+
let format = Arc::new(VortexFormat::new(SESSION.clone()));
287287

288288
info!(
289289
"Registering vortex-compact table from {vortex_compact_path} with glob {:?}",

bench-vortex/src/compress/vortex.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,23 @@ use std::sync::Arc;
77
use bytes::Bytes;
88
use futures::{StreamExt, pin_mut};
99
use vortex::Array;
10-
use vortex::file::{VortexOpenOptions, VortexWriteOptions};
10+
use vortex::file::{OpenOptionsSessionExt, WriteOptionsSessionExt};
11+
12+
use crate::SESSION;
1113

1214
#[inline(never)]
1315
pub async fn vortex_compress_write(array: &dyn Array, buf: &mut Vec<u8>) -> anyhow::Result<u64> {
1416
let mut cursor = Cursor::new(buf);
15-
VortexWriteOptions::default()
17+
SESSION
18+
.write_options()
1619
.write(&mut cursor, array.to_array_stream())
1720
.await?;
1821
Ok(cursor.position())
1922
}
2023

2124
#[inline(never)]
2225
pub async fn vortex_decompress_read(buf: Bytes) -> anyhow::Result<usize> {
23-
let scan = VortexOpenOptions::new().open_buffer(buf)?.scan()?;
26+
let scan = SESSION.open_options().open_buffer(buf)?.scan()?;
2427
let schema = Arc::new(scan.dtype()?.to_arrow_schema()?);
2528

2629
let stream = scan.into_record_batch_stream(schema)?;

bench-vortex/src/datasets/file.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use vortex_datafusion::VortexFormat;
1717
#[cfg(feature = "lance")]
1818
use {crate::Format, lance::datafusion::LanceTableProvider, lance::dataset::Dataset};
1919

20+
use crate::SESSION;
2021
use crate::datasets::BenchmarkDataset;
2122

2223
pub async fn register_parquet_files(
@@ -85,7 +86,7 @@ pub async fn register_vortex_files(
8586
&file_url,
8687
glob.as_ref().map(|g| g.as_str()).unwrap_or("")
8788
);
88-
let format = Arc::new(VortexFormat::default());
89+
let format = Arc::new(VortexFormat::new(SESSION.clone()));
8990
let table_url = ListingTableUrl::try_new(file_url.clone(), glob)?;
9091
let config = ListingTableConfig::new(table_url).with_listing_options(
9192
ListingOptions::new(format).with_session_config_options(session.state().config()),
@@ -133,7 +134,7 @@ pub async fn register_vortex_compact_files(
133134
&file_url,
134135
glob.as_ref().map(|g| g.as_str()).unwrap_or("")
135136
);
136-
let format = Arc::new(VortexFormat::default());
137+
let format = Arc::new(VortexFormat::new(SESSION.clone()));
137138
let table_url = ListingTableUrl::try_new(file_url.clone(), glob)?;
138139
let config = ListingTableConfig::new(table_url).with_listing_options(
139140
ListingOptions::new(format).with_session_config_options(session.state().config()),

bench-vortex/src/datasets/taxi_data.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use async_trait::async_trait;
88
use tokio::fs::File as TokioFile;
99
use tokio::io::AsyncWriteExt;
1010
use vortex::ArrayRef;
11-
use vortex::file::{VortexOpenOptions, VortexWriteOptions};
11+
use vortex::file::{OpenOptionsSessionExt, WriteOptionsSessionExt};
1212
use vortex::stream::ArrayStreamExt;
1313
#[cfg(feature = "lance")]
1414
use {
@@ -21,7 +21,7 @@ use {
2121
use crate::conversions::parquet_to_vortex;
2222
use crate::datasets::Dataset;
2323
use crate::datasets::data_downloads::download_data;
24-
use crate::{CompactionStrategy, IdempotentPath, idempotent_async};
24+
use crate::{CompactionStrategy, IdempotentPath, SESSION, idempotent_async};
2525

2626
pub struct TaxiData;
2727

@@ -45,7 +45,8 @@ pub async fn taxi_data_parquet() -> Result<PathBuf> {
4545

4646
pub async fn fetch_taxi_data() -> Result<ArrayRef> {
4747
let vortex_data = taxi_data_vortex().await?;
48-
Ok(VortexOpenOptions::new()
48+
Ok(SESSION
49+
.open_options()
4950
.open(vortex_data)
5051
.await?
5152
.scan()?
@@ -58,7 +59,8 @@ pub async fn taxi_data_vortex() -> Result<PathBuf> {
5859
idempotent_async("taxi/taxi.vortex", |output_fname| async move {
5960
let buf = output_fname.to_path_buf();
6061
let mut output_file = TokioFile::create(output_fname).await?;
61-
VortexWriteOptions::default()
62+
SESSION
63+
.write_options()
6264
.write(
6365
&mut output_file,
6466
parquet_to_vortex(taxi_data_parquet().await?)?,
@@ -76,8 +78,7 @@ pub async fn taxi_data_vortex_compact() -> Result<PathBuf> {
7678
let mut output_file = TokioFile::create(output_fname).await?;
7779

7880
// This is the only difference to `taxi_data_vortex`.
79-
let write_options =
80-
CompactionStrategy::Compact.apply_options(VortexWriteOptions::default());
81+
let write_options = CompactionStrategy::Compact.apply_options(SESSION.write_options());
8182

8283
write_options
8384
.write(

bench-vortex/src/datasets/tpch_l_comment.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ use glob::glob;
88
use vortex::arrays::ChunkedArray;
99
use vortex::dtype::Nullability::NonNullable;
1010
use vortex::expr::{col, pack};
11-
use vortex::file::VortexOpenOptions;
11+
use vortex::file::OpenOptionsSessionExt;
1212
use vortex::{Array, ArrayRef, IntoArray, ToCanonical};
1313

1414
use crate::datasets::Dataset;
1515
use crate::tpch::tpchgen::{TpchGenOptions, generate_tpch_tables};
16-
use crate::{Format, IdempotentPath};
16+
use crate::{Format, IdempotentPath, SESSION};
1717

1818
pub struct TPCHLCommentChunked;
1919

@@ -44,7 +44,7 @@ impl Dataset for TPCHLCommentChunked {
4444
.to_string_lossy()
4545
.as_ref(),
4646
)? {
47-
let file = VortexOpenOptions::new().open(path?).await?;
47+
let file = SESSION.open_options().open(path?).await?;
4848
let file_chunks: Vec<_> = file
4949
.scan()?
5050
.with_projection(pack(vec![("l_comment", col("l_comment"))], NonNullable))

bench-vortex/src/downloadable_dataset.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
use async_trait::async_trait;
55
use tokio::fs::File;
66
use vortex::ArrayRef;
7-
use vortex::file::{VortexOpenOptions, VortexWriteOptions};
7+
use vortex::file::{OpenOptionsSessionExt, WriteOptionsSessionExt};
88
use vortex::stream::ArrayStreamExt;
99

1010
use crate::conversions::parquet_to_vortex;
1111
use crate::datasets::Dataset;
1212
use crate::datasets::data_downloads::download_data;
13-
use crate::{IdempotentPath, idempotent_async};
13+
use crate::{IdempotentPath, SESSION, idempotent_async};
1414

1515
/// Datasets which can be downloaded over HTTP in Parquet format.
1616
///
@@ -57,7 +57,8 @@ impl Dataset for DownloadableDataset {
5757
let vortex = dir.join(format!("{}.vortex", self.name()));
5858
download_data(parquet.clone(), self.parquet_url()).await?;
5959
idempotent_async(&vortex, async |path| -> anyhow::Result<()> {
60-
VortexWriteOptions::default()
60+
SESSION
61+
.write_options()
6162
.write(
6263
&mut File::create(path)
6364
.await
@@ -70,7 +71,8 @@ impl Dataset for DownloadableDataset {
7071
})
7172
.await?;
7273

73-
Ok(VortexOpenOptions::new()
74+
Ok(SESSION
75+
.open_options()
7476
.open(vortex.as_path())
7577
.await?
7678
.scan()?

bench-vortex/src/fineweb/mod.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ use log::info;
1414
use parquet::arrow::async_writer::AsyncFileWriter;
1515
use url::Url;
1616
use vortex::compressor::CompactCompressor;
17-
use vortex::file::{VortexWriteOptions, WriteStrategyBuilder};
17+
use vortex::file::{WriteOptionsSessionExt, WriteStrategyBuilder};
1818
use vortex_datafusion::VortexFormat;
1919

2020
use crate::benchmark_trait::Benchmark;
2121
use crate::conversions::parquet_to_vortex;
2222
use crate::engines::EngineCtx;
23-
use crate::{BenchmarkDataset, Format, Target, idempotent_async};
23+
use crate::{BenchmarkDataset, Format, SESSION, Target, idempotent_async};
2424

2525
/// URL to the sample file
2626
const SAMPLE_URL: &str = "https://huggingface.co/datasets/HuggingFaceFW/fineweb/resolve/v1.4.0/sample/10BT/001_00000.parquet";
@@ -146,7 +146,8 @@ impl Benchmark for Fineweb {
146146
info!("Converting FineWeb to Vortex with default compressor");
147147
let array_stream = parquet_to_vortex(parquet)?;
148148
let w = tokio::fs::File::create(vortex_path).await?;
149-
VortexWriteOptions::default()
149+
SESSION
150+
.write_options()
150151
.write(w, array_stream)
151152
.await
152153
.map_err(|e| anyhow::anyhow!("Failed to write to VortexWriter: {e}"))
@@ -158,7 +159,8 @@ impl Benchmark for Fineweb {
158159
info!("Converting FineWeb to Vortex with Compact compressor");
159160
let array_stream = parquet_to_vortex(parquet)?;
160161
let w = tokio::fs::File::create(vortex_path).await?;
161-
VortexWriteOptions::default()
162+
SESSION
163+
.write_options()
162164
.with_strategy(
163165
WriteStrategyBuilder::new()
164166
.with_compressor(CompactCompressor::default())
@@ -221,7 +223,9 @@ pub async fn register_table(
221223
.with_listing_options(
222224
ListingOptions::new(match format {
223225
Format::Parquet => Arc::from(ParquetFormat::new()),
224-
Format::OnDiskVortex | Format::VortexCompact => Arc::from(VortexFormat::default()),
226+
Format::OnDiskVortex | Format::VortexCompact => {
227+
Arc::from(VortexFormat::new(SESSION.clone()))
228+
}
225229
_ => anyhow::bail!("unsupported format for `fineweb` bench: {}", format),
226230
})
227231
.with_session_config_options(session.state().config()),

0 commit comments

Comments
 (0)