Pull real data from a chunkstore
jleibs committed Aug 2, 2024 · 1 parent e74baa7 · commit 3ba95cb
Showing 4 changed files with 80 additions and 48 deletions.
Cargo.toml (2 changes: 1 addition & 1 deletion)

@@ -158,7 +158,7 @@ console_error_panic_hook = "0.1.6"
 convert_case = "0.6"
 criterion = "0.5"
 crossbeam = "0.8"
-datafusion = "39.0"
+datafusion = "36.0"
 directories = "5"
 document-features = "0.2.8"
 ehttp = "0.5.0"
crates/store/re_datafusion/Cargo.toml (4 changes: 2 additions & 2 deletions)

@@ -28,17 +28,17 @@ re_chunk_store.workspace = true
 re_format.workspace = true
 re_log.workspace = true
 re_log_types.workspace = true
+re_types.workspace = true

 anyhow.workspace = true
 ahash.workspace = true
 datafusion.workspace = true
 document-features.workspace = true
 tokio.workspace = true
-arrow = "52.1.0"
+arrow2 = { workspace = true, features = ["arrow"] }
 async-trait = "0.1.81"

 [dev-dependencies]
-re_types.workspace = true

 [lib]
 bench = false
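The dependency shuffle tracks the code changes below: `re_types` moves from dev-dependencies to regular dependencies because the library itself now calls `Archetype::all_components`, while the direct `arrow` pin gives way to `arrow2` with its `arrow` interop feature, presumably the same reason `datafusion` is pinned back to 36.0 above (its re-exported arrow must match the arrow version that arrow2's bridge produces). A minimal sketch of the call that now lives in library code:

```rust
use re_log_types::example_components::MyPoints;
use re_types::Archetype as _; // brings `all_components` into scope

fn main() {
    // The provider hard-codes this archetype for now (see the TODO in
    // chunk_table.rs below); each component becomes one table column.
    for component in MyPoints::all_components().iter() {
        println!("{}", component.as_str());
    }
}
```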
crates/store/re_datafusion/examples/datafusion.rs (22 changes: 17 additions & 5 deletions)

@@ -6,6 +6,7 @@ use re_chunk::{
 };
 use re_chunk_store::ChunkStore;
 use re_datafusion::create_datafusion_context;
+use re_log_types::example_components::MyLabel;

 #[tokio::main]
 async fn main() -> anyhow::Result<()> {

@@ -14,21 +15,32 @@
     let timeline_frame = Timeline::new_sequence("frame");
     let timepoint = TimePoint::from_iter([(timeline_frame, 10)]);

-    let point1 = MyPoint::new(1.0, 1.0);
-    let point2 = MyPoint::new(2.0, 2.0);
+    let point1 = MyPoint::new(1.0, 2.0);
+    let point2 = MyPoint::new(3.0, 4.0);
+    let point3 = MyPoint::new(5.0, 6.0);
+    let label1 = MyLabel("point1".to_owned());
+    let label2 = MyLabel("point2".to_owned());
+    let label3 = MyLabel("point3".to_owned());

     let mut store = ChunkStore::new(
         re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
         Default::default(),
     );

     let chunk = Chunk::builder(entity_path.clone())
-        .with_component_batch(RowId::new(), timepoint.clone(), &[point1])
+        .with_component_batches(
+            RowId::new(),
+            timepoint.clone(),
+            [&[point1, point2] as _, &[label1, label2] as _],
+        )
         .build()?;
     store.insert_chunk(&Arc::new(chunk))?;

     let chunk = Chunk::builder(entity_path.clone())
-        .with_component_batch(RowId::new(), timepoint.clone(), &[point2])
+        .with_component_batches(
+            RowId::new(),
+            timepoint.clone(),
+            [&[point3] as _, &[label3] as _],
+        )
         .build()?;
     store.insert_chunk(&Arc::new(chunk))?;
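The tail of the example is collapsed here. Judging from the `create_datafusion_context` import, it presumably registers the store with DataFusion and runs a query. A hedged sketch of that continuation, in which the function's exact signature, the table name, and the output shape are assumptions rather than part of this diff:

```rust
    // ...continuing inside main() (hypothetical: collapsed out of the diff).
    // Hand the store to DataFusion and query it back out with SQL.
    let ctx = create_datafusion_context(&store)?;
    let df = ctx.sql("SELECT * FROM custom_table").await?;
    df.show().await?; // expect one List column per component, one row per chunk row

    Ok(())
}
```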
crates/store/re_datafusion/src/chunk_table.rs (100 changes: 60 additions & 40 deletions)

@@ -2,21 +2,25 @@ use std::any::Any;
 use std::fmt::{self, Debug, Formatter};
 use std::sync::Arc;

-use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow2::array::{Arrow2Arrow as _, ListArray};
+use datafusion::arrow::array::{ArrayRef, GenericListArray};
+use datafusion::arrow::datatypes::{Field, Schema, SchemaRef};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::Result;
 use datafusion::execution::context::{SessionState, TaskContext};
-use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_expr::PhysicalSortExpr;
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::{
-    project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
-    PlanProperties, SendableRecordBatchStream,
+    project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
 };
 use datafusion::prelude::*;

 use async_trait::async_trait;
 use re_chunk_store::ChunkStore;
+use re_types::Archetype as _;
+
+//use crate::conversions::convert_datatype_arrow2_to_arrow;

 /// A custom datasource, used to represent a datastore with a single index
 #[derive(Clone)]

@@ -58,11 +62,23 @@ impl TableProvider for CustomDataSource {
     }

     fn schema(&self) -> SchemaRef {
-        Arc::new(Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            Field::new("name", DataType::Utf8, false),
-            Field::new("value", DataType::Float64, false),
-        ]))
+        // TODO(jleibs): This should come from the chunk store directly
+        let components = re_log_types::example_components::MyPoints::all_components();
+
+        let fields: Vec<Field> = components
+            .iter()
+            .filter_map(|c| {
+                self.store.lookup_datatype(c).map(|dt| {
+                    Field::new(
+                        c.as_str(),
+                        ListArray::<i32>::default_datatype(dt.clone()).into(),
+                        true,
+                    )
+                })
+            })
+            .collect();
+
+        Arc::new(Schema::new(fields))
     }

     fn table_type(&self) -> TableType {
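Each component thus becomes a nullable `List` column whose item type is the component's own datatype: `ListArray::<i32>::default_datatype` wraps the arrow2 datatype in a list, and the trailing `.into()` presumably crosses over to the arrow `DataType` via arrow2's `arrow` interop. A small sketch of the resulting field shape, using `Float32` as a stand-in item type (not taken from the diff):

```rust
use datafusion::arrow::datatypes::{DataType, Field};

fn main() {
    // One column per component: a nullable List whose items use the
    // component's own datatype ("item" is arrow2's default list item name).
    let items = Field::new("item", DataType::Float32, true);
    let column = Field::new("example.SomeComponent", DataType::List(items.into()), true);
    println!("{column:?}");
}
```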
@@ -85,7 +101,6 @@ struct CustomExec {
     _db: CustomDataSource,
     projected_schema: SchemaRef,
-    cache: PlanProperties,
 }

 impl CustomExec {

@@ -95,23 +110,11 @@
         db: CustomDataSource,
     ) -> Result<Self> {
         let projected_schema = project_schema(schema, projections)?;
-        let cache = Self::compute_properties(projected_schema.clone());
         Ok(Self {
             _db: db,
             projected_schema,
-            cache,
         })
     }
-
-    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
-    fn compute_properties(schema: SchemaRef) -> PlanProperties {
-        let eq_properties = EquivalenceProperties::new(schema);
-        PlanProperties::new(
-            eq_properties,
-            Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
-        )
-    }
 }
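With the cached `PlanProperties` gone, the plan answers the engine's questions method by method, which is what the DataFusion 36 `ExecutionPlan` trait expects. A hedged sketch of the contract the impl below satisfies:

```rust
use datafusion::physical_plan::ExecutionPlan;

// What the DataFusion 36 planner reads off a leaf node; these three calls
// stand in for the single cached PlanProperties of datafusion 37+.
fn describe(plan: &dyn ExecutionPlan) {
    let _schema = plan.schema(); // column layout of the emitted stream
    let _partitioning = plan.output_partitioning(); // one unknown partition here
    let _ordering = plan.output_ordering(); // None: no guaranteed sort order
}
```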

impl DisplayAs for CustomExec {
Expand All @@ -121,19 +124,23 @@ impl DisplayAs for CustomExec {
}

impl ExecutionPlan for CustomExec {
fn name(&self) -> &'static str {
"CustomExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &PlanProperties {
&self.cache
fn schema(&self) -> SchemaRef {
self.projected_schema.clone()
}

fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}

@@ -149,19 +156,32 @@
         _partition: usize,
         _context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        use arrow::array::{Float64Array, Int32Array, StringArray};
-
-        let batch = RecordBatch::try_new(
-            self.projected_schema.clone(),
-            vec![
-                Arc::new(Int32Array::from(vec![1, 2, 3])),
-                Arc::new(StringArray::from(vec!["A", "B", "C"])),
-                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
-            ],
-        )?;
+        let batches: datafusion::arrow::error::Result<Vec<RecordBatch>> = self
+            ._db
+            .store
+            .iter_chunks()
+            .map(|chunk| {
+                let components = re_log_types::example_components::MyPoints::all_components();
+
+                RecordBatch::try_new(
+                    self.projected_schema.clone(),
+                    components
+                        .iter()
+                        .filter_map(|c| {
+                            chunk.components().get(c).map(|c| {
+                                let data = c.to_data();
+                                let converted = GenericListArray::<i32>::from(data);
+
+                                Arc::new(converted) as ArrayRef
+                            })
+                        })
+                        .collect(),
+                )
+            })
+            .collect();

         Ok(Box::pin(MemoryStream::try_new(
-            vec![batch],
+            batches?,
             self.schema(),
             None,
         )?))
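The new scan body leans on arrow2's `Arrow2Arrow` bridge (the `arrow` feature enabled in this commit's Cargo.toml change): `to_data` exports an arrow2 array as arrow `ArrayData`, from which the `GenericListArray` re-exported by datafusion can be built without re-encoding buffers. A self-contained sketch of that hop for one hand-built list column, assuming the pinned datafusion and arrow2 agree on the arrow version:

```rust
use std::sync::Arc;

use arrow2::array::{
    Arrow2Arrow as _, ListArray, MutableListArray, MutablePrimitiveArray, TryExtend as _,
};
use datafusion::arrow::array::{Array, ArrayRef, GenericListArray};

fn main() {
    // Build a small arrow2 list column: [[1, 2], [3]]
    let mut builder = MutableListArray::<i32, MutablePrimitiveArray<i32>>::new();
    builder
        .try_extend([Some(vec![Some(1), Some(2)]), Some(vec![Some(3)])])
        .expect("valid list values");
    let column: ListArray<i32> = builder.into();

    // The same hop scan() performs per component column. This only compiles
    // when datafusion's re-exported arrow matches the arrow version targeted
    // by arrow2's "arrow" feature, hence the datafusion = "36.0" pin.
    let converted: ArrayRef = Arc::new(GenericListArray::<i32>::from(column.to_data()));
    assert_eq!(converted.len(), 2);
}
```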
