-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Closed
Description
Describe the bug
After upgrading parquet to 56, we encountered Parquet error: item_reader def levels are None when reading nested field with row filter.
To Reproduce
[dependencies]
arrow = "56"
parquet = { version = "56", features = ["async"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3"
use arrow::array::{BooleanArray, Float32Builder, Int32Array, ListBuilder};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use futures::stream::StreamExt;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use std::sync::Arc;
use tokio::fs::File;
#[tokio::main]
async fn main() {
    // Minimal repro: write a file with a nested List<Float32> column, then
    // read it back with a row filter. On parquet 56 the read step fails with
    // "Parquet error: item_reader def levels are None." (works on 55).
    // Use `expect` instead of bare `unwrap` so a failure says which phase died.
    write_parquet_file()
        .await
        .expect("failed to write output.parquet");
    read_parquet_file()
        .await
        .expect("failed to read output.parquet");
}
/// Reads `output.parquet` through the async stream reader, applying a
/// row filter whose predicate keeps every row, and prints each batch.
///
/// # Errors
/// Propagates any I/O or Parquet decoding error (this is where the
/// "item_reader def levels are None" error surfaces on parquet 56).
async fn read_parquet_file() -> Result<(), Box<dyn std::error::Error>> {
    let input = File::open("output.parquet").await?;
    let reader_builder = ParquetRecordBatchStreamBuilder::new(input).await?;

    // Predicate that accepts every row; the bug triggers merely because a
    // RowFilter is present, not because any rows are dropped.
    let keep_all = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
        Ok(BooleanArray::from(vec![true; batch.num_rows()]))
    });
    let filter = RowFilter::new(vec![Box::new(keep_all)]);

    let mut stream = reader_builder
        .with_row_filter(filter)
        .with_projection(ProjectionMask::all())
        .build()?;

    while let Some(next) = stream.next().await {
        let batch = next?;
        println!("Read batch {batch:?}");
    }
    Ok(())
}
/// Writes `output.parquet` containing four rows of a schema with a
/// non-nullable `id: Int32` column and a nullable `vector: List<Float32>`
/// column; row *i* holds the list `[v, v, v]` for v in 10/20/30/40.
///
/// # Errors
/// Propagates any batch-construction, file-creation, or write error.
async fn write_parquet_file() -> Result<(), Box<dyn std::error::Error>> {
    let item_field = Arc::new(Field::new("item", DataType::Float32, true));
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("vector", DataType::List(item_field.clone()), true),
    ]));

    // Build the list column: each row is three copies of one value.
    let mut vectors = ListBuilder::new(Float32Builder::new()).with_field(item_field.clone());
    for v in [10.0_f32, 20.0, 30.0, 40.0] {
        vectors.values().append_slice(&[v, v, v]);
        vectors.append(true);
    }
    let vector_column = vectors.finish();

    let batches = vec![RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
            Arc::new(vector_column),
        ],
    )?];

    let output = File::create("output.parquet").await?;
    let mut writer = AsyncArrowWriter::try_new(output, schema, None)?;
    for batch in &batches {
        writer.write(batch).await?;
    }
    writer.close().await?;
    println!("Parquet file written successfully!");
    Ok(())
}
thread 'main' panicked at src\main.rs:13:31:
called `Result::unwrap()` on an `Err` value: External(ParquetError("Parquet error: item_reader def levels are None."))
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

The code above works correctly with parquet 55.
Expected behavior
Additional context