-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Replace buffer management with Arrow buffers
This implements a second version of the Query interface using Arrow Arrays. The core idea here is that we can share Arrow arrays freely and then convert them to mutable buffers as long as there are no external references to them. This is all done safely and returns an error if any buffer is externally referenced.
- Loading branch information
Showing
25 changed files
with
4,139 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
use std::path::PathBuf; | ||
use std::sync::Arc; | ||
|
||
use arrow::array as aa; | ||
use itertools::izip; | ||
|
||
use tiledb::array::{ | ||
Array, ArrayType, AttributeData, CellOrder, DimensionData, DomainData, | ||
SchemaData, TileOrder, | ||
}; | ||
use tiledb::context::Context; | ||
use tiledb::error::Error as TileDBError; | ||
use tiledb::query_arrow::{QueryBuilder, QueryLayout, QueryStatus, QueryType}; | ||
use tiledb::Result as TileDBResult; | ||
use tiledb::{Datatype, Factory}; | ||
|
||
const ARRAY_URI: &str = "multi_range_slicing"; | ||
|
||
/// This example creates a 4x4 dense array with the contents: | ||
/// | ||
/// Col: 1 2 3 4 | ||
/// Row: 1 1 2 3 4 | ||
/// 2 5 6 7 8 | ||
/// 3 9 10 11 12 | ||
/// 4 13 14 15 16 | ||
/// | ||
/// The query run restricts rows to [1, 2, 4] and returns all columns which | ||
/// should produce these rows: | ||
/// | ||
/// Row Col Value | ||
/// 1 1 1 | ||
/// 1 2 2 | ||
/// 1 3 3 | ||
/// 1 4 4 | ||
/// 2 1 5 | ||
/// 2 2 6 | ||
/// 2 3 7 | ||
/// 2 4 8 | ||
/// 4 1 13 | ||
/// 4 2 14 | ||
/// 4 3 15 | ||
/// 4 4 16 | ||
fn main() -> TileDBResult<()> { | ||
if let Ok(manifest_dir) = std::env::var("CARGO_MANIFEST_DIR") { | ||
let _ = std::env::set_current_dir( | ||
PathBuf::from(manifest_dir).join("examples").join("output"), | ||
); | ||
} | ||
|
||
let ctx = Context::new()?; | ||
if Array::exists(&ctx, ARRAY_URI)? { | ||
Array::delete(&ctx, ARRAY_URI)?; | ||
} | ||
|
||
create_array(&ctx)?; | ||
write_array(&ctx)?; | ||
|
||
let array = Array::open(&ctx, ARRAY_URI, tiledb::array::Mode::Read)?; | ||
let mut query = QueryBuilder::new(array, QueryType::Read) | ||
.with_layout(QueryLayout::RowMajor) | ||
.start_fields() | ||
.field("rows") | ||
.field("cols") | ||
.field("a") | ||
.end_fields() | ||
.start_subarray() | ||
.add_range("rows", &[1, 2]) | ||
.add_range("rows", &[4, 4]) | ||
.add_range("cols", &[1, 4]) | ||
.end_subarray() | ||
.build() | ||
.map_err(|e| TileDBError::Other(format!("{e}")))?; | ||
|
||
let status = query | ||
.submit() | ||
.map_err(|e| TileDBError::Other(format!("{e}")))?; | ||
|
||
if !matches!(status, QueryStatus::Completed) { | ||
return Err(TileDBError::Other("Make this better.".to_string())); | ||
} | ||
|
||
let buffers = query | ||
.buffers() | ||
.map_err(|e| TileDBError::Other(format!("{e}")))?; | ||
|
||
let rows = buffers.get::<aa::Int32Array>("rows").unwrap(); | ||
let cols = buffers.get::<aa::Int32Array>("cols").unwrap(); | ||
let attr = buffers.get::<aa::Int32Array>("a").unwrap(); | ||
|
||
for (r, c, a) in izip!(rows.values(), cols.values(), attr.values()) { | ||
println!("{} {} {}", r, c, a); | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
fn create_array(ctx: &Context) -> TileDBResult<()> { | ||
let schema = SchemaData { | ||
array_type: ArrayType::Dense, | ||
domain: DomainData { | ||
dimension: vec![ | ||
DimensionData { | ||
name: "rows".to_owned(), | ||
datatype: Datatype::Int32, | ||
constraints: ([1i32, 4], 4i32).into(), | ||
filters: None, | ||
}, | ||
DimensionData { | ||
name: "cols".to_owned(), | ||
datatype: Datatype::Int32, | ||
constraints: ([1i32, 4], 4i32).into(), | ||
filters: None, | ||
}, | ||
], | ||
}, | ||
attributes: vec![AttributeData { | ||
name: "a".to_owned(), | ||
datatype: Datatype::Int32, | ||
..Default::default() | ||
}], | ||
tile_order: Some(TileOrder::RowMajor), | ||
cell_order: Some(CellOrder::RowMajor), | ||
|
||
..Default::default() | ||
}; | ||
|
||
let schema = schema.create(ctx)?; | ||
Array::create(ctx, ARRAY_URI, schema)?; | ||
Ok(()) | ||
} | ||
|
||
fn write_array(ctx: &Context) -> TileDBResult<()> { | ||
let data = Arc::new(aa::Int32Array::from(vec![ | ||
1i32, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, | ||
])); | ||
|
||
let array = | ||
tiledb::Array::open(ctx, ARRAY_URI, tiledb::array::Mode::Write)?; | ||
|
||
let mut query = QueryBuilder::new(array, QueryType::Write) | ||
.with_layout(QueryLayout::RowMajor) | ||
.start_fields() | ||
.field_with_buffer("a", data) | ||
.end_fields() | ||
.build() | ||
.map_err(|e| TileDBError::Other(format!("{e}")))?; | ||
|
||
let (_, _) = query | ||
.submit() | ||
.and_then(|_| query.finalize()) | ||
.map_err(|e| TileDBError::Other(format!("{e}")))?; | ||
|
||
Ok(()) | ||
} |
Oops, something went wrong.