diff --git a/Cargo.lock b/Cargo.lock index 8d1ec9271..50ec83afa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -533,6 +533,28 @@ dependencies = [ "abi_stable", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -1174,8 +1196,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link", ] @@ -2775,6 +2799,35 @@ dependencies = [ "serde", ] +[[package]] +name = "geoarrow-array" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc1cc4106ac0a0a512c398961ce95d8150475c84a84e17c4511c3643fa120a17" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-schema", + "geo-traits", + "geoarrow-schema", + "num-traits", + "wkb", + "wkt 0.14.0", +] + +[[package]] +name = "geoarrow-schema" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e97be4e9f523f92bd6a0e0458323f4b783d073d011664decd8dbf05651704f34" +dependencies = [ + "arrow-schema", + "geo-traits", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "geographiclib-rs" version = "0.2.5" @@ -3449,6 +3502,42 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "las" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f5fc2ade5beaea800d3e615ce9886abb8d325be0c2bf2c94553fa88677d1b30" +dependencies = [ + "byteorder", + "chrono", + "laz", + "log", + "num-traits", + "thiserror 2.0.17", + "uuid", +] + +[[package]] +name = "las-crs" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b25b1e192564ebf5b563e821c4924ce97bd703f445e851117db2e656499c21c" +dependencies = [ + "las", + "log", + "thiserror 2.0.17", +] + +[[package]] +name = "laz" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d5b7251e6d7aee8f4263f35d61e5e40d22bb189ecfc4f9fbd550806fed98cce" +dependencies = [ + "byteorder", + "num-traits", +] + [[package]] name = "lexical-core" version = "1.0.6" @@ -4948,6 +5037,7 @@ dependencies = [ "sedona-geometry", "sedona-geoparquet", "sedona-geos", + "sedona-pointcloud", "sedona-proj", "sedona-raster-functions", "sedona-s2geography", @@ -5266,6 +5356,34 @@ dependencies = [ "which", ] +[[package]] +name = "sedona-pointcloud" +version = "0.3.0" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-schema", + "async-stream", + "async-trait", + "datafusion-catalog", + "datafusion-common", + "datafusion-datasource", + "datafusion-execution", + "datafusion-physical-expr", + "datafusion-physical-plan", + "datafusion-pruning", + "futures", + "geoarrow-array", + "geoarrow-schema", + "las", + "las-crs", + "laz", + "object_store", + "sedona-expr", + "sedona-geometry", + "tokio", +] + [[package]] name = "sedona-proj" version = "0.3.0" @@ -5479,6 +5597,15 @@ 
dependencies = [ "tokio", ] +[[package]] +name = "sedonadb-rust-pointcloud-example" +version = "0.0.1" +dependencies = [ + "sedona", + "sedona-pointcloud", + "tokio", +] + [[package]] name = "sedonadbr" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 85c86da04..d9cce5671 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ members = [ "rust/sedona-geo", "rust/sedona-geometry", "rust/sedona-geoparquet", + "rust/sedona-pointcloud", "rust/sedona-raster", "rust/sedona-raster-functions", "rust/sedona-schema", @@ -41,6 +42,7 @@ members = [ "rust/sedona-testing", "rust/sedona", "sedona-cli", + "examples/sedonadb-rust-pointcloud", ] resolver = "2" @@ -81,6 +83,7 @@ datafusion = { version = "51.0.0", default-features = false } datafusion-catalog = { version = "51.0.0" } datafusion-common = { version = "51.0.0", default-features = false } datafusion-common-runtime = { version = "51.0.0", default-features = false } +datafusion-datasource = { version = "51.0.0", default-features = false } datafusion-datasource-parquet = { version = "51.0.0" } datafusion-execution = { version = "51.0.0", default-features = false } datafusion-expr = { version = "51.0.0" } @@ -88,6 +91,7 @@ datafusion-ffi = { version = "51.0.0" } datafusion-functions-nested = { version = "51.0.0" } datafusion-physical-expr = { version = "51.0.0" } datafusion-physical-plan = { version = "51.0.0" } +datafusion-pruning = { version = "51.0.0" } dirs = "6.0.0" env_logger = "0.11" fastrand = "2.0" @@ -140,6 +144,7 @@ sedona-geo-generic-alg = { version = "0.3.0", path = "rust/sedona-geo-generic-al sedona-geo-traits-ext = { version = "0.3.0", path = "rust/sedona-geo-traits-ext" } sedona-geometry = { version = "0.3.0", path = "rust/sedona-geometry" } sedona-geoparquet = { version = "0.3.0", path = "rust/sedona-geoparquet" } +sedona-pointcloud = { version = "0.3.0", path = "rust/sedona-pointcloud" } sedona-raster = { version = "0.3.0", path = "rust/sedona-raster" } sedona-raster-functions = { version = "0.3.0", path = "rust/sedona-raster-functions" } sedona-schema = { version = "0.3.0", path = "rust/sedona-schema" } diff --git a/c/sedona-s2geography/s2geography b/c/sedona-s2geography/s2geography index c4d4e5f74..e63647cb4 160000 --- a/c/sedona-s2geography/s2geography +++ b/c/sedona-s2geography/s2geography @@ -1 +1 @@ -Subproject commit c4d4e5f7416dc203d3cb0485d56f5d72e9ccb6dd +Subproject commit e63647cb4536648b06cc5aa6b97ce1777a5f6163 diff --git a/examples/sedonadb-rust-pointcloud/Cargo.toml b/examples/sedonadb-rust-pointcloud/Cargo.toml new file mode 100644 index 000000000..4c764c3fa --- /dev/null +++ b/examples/sedonadb-rust-pointcloud/Cargo.toml @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+[package]
+name = "sedonadb-rust-pointcloud-example"
+version = "0.0.1"
+authors = ["Apache Sedona "]
+license = "Apache-2.0"
+homepage = "https://github.com/apache/sedona-db"
+repository = "https://github.com/apache/sedona-db"
+description = "Apache SedonaDB Rust API Example"
+readme = "README.md"
+edition = "2021"
+rust-version = "1.86"
+publish = false
+
+[dependencies]
+sedona = { workspace = true, features = ["pointcloud"] }
+sedona-pointcloud = { workspace = true }
+tokio = { version = "1.44", features = ["rt-multi-thread"] }
diff --git a/examples/sedonadb-rust-pointcloud/src/main.rs b/examples/sedonadb-rust-pointcloud/src/main.rs
new file mode 100644
index 000000000..9749f3347
--- /dev/null
+++ b/examples/sedonadb-rust-pointcloud/src/main.rs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// Because a number of methods only return Err() for not implemented,
+// the compiler doesn't know how to guess which impl RecordBatchReader
+// will be returned. When we implement the methods, we can remove this.
+
+use std::error::Error;
+
+use sedona::context::{SedonaContext, SedonaDataFrame};
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    let ctx = SedonaContext::new_local_interactive().await?;
+    let url = "https://basisdata.nl/hwh-ahn/ahn4/01_LAZ/C_69AZ1.LAZ";
+    let df = ctx.sql(&format!("SELECT geometry FROM \"{url}\"")).await?;
+    let output = df.show_sedona(&ctx, Some(5), Default::default()).await?;
+    println!("{output}");
+    Ok(())
+}
diff --git a/rust/sedona-pointcloud/Cargo.toml b/rust/sedona-pointcloud/Cargo.toml
new file mode 100644
index 000000000..7281ad1c5
--- /dev/null
+++ b/rust/sedona-pointcloud/Cargo.toml
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+ +[package] +name = "sedona-pointcloud" +version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description.workspace = true +readme.workspace = true +edition.workspace = true +rust-version.workspace = true +keywords.workspace = true +categories.workspace = true + +[dependencies] +async-stream = "0.3.6" +async-trait = { workspace = true } +arrow-array = { workspace = true } +arrow-buffer = { workspace = true } +arrow-schema = { workspace = true } +datafusion-catalog = { workspace = true } +datafusion-common = { workspace = true } +datafusion-datasource = { workspace = true } +datafusion-execution = { workspace = true } +datafusion-physical-expr = { workspace = true } +datafusion-physical-plan = { workspace = true } +datafusion-pruning = { workspace = true } +futures = { workspace = true } +geoarrow-array = { version = "0.7.0" } +geoarrow-schema = { version = "0.7.0" } +las = { version = "0.9.8", features = ["laz"] } +las-crs = { version = "1.0.0" } +laz = "0.11.1" +object_store = { workspace = true } +sedona-expr = { workspace = true } +sedona-geometry = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } diff --git a/rust/sedona-pointcloud/src/laz/builder.rs b/rust/sedona-pointcloud/src/laz/builder.rs new file mode 100644 index 000000000..ea06eb76a --- /dev/null +++ b/rust/sedona-pointcloud/src/laz/builder.rs @@ -0,0 +1,571 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{fmt::Debug, sync::Arc}; + +use arrow_array::{ + builder::{ + ArrayBuilder, BinaryBuilder, BooleanBuilder, FixedSizeBinaryBuilder, Float32Builder, + Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, UInt16Builder, + UInt32Builder, UInt64Builder, UInt8Builder, + }, + Array, ArrayRef, BooleanArray, FixedSizeBinaryArray, Float32Array, Float64Array, StructArray, + UInt16Array, UInt8Array, +}; +use arrow_buffer::ScalarBuffer; +use arrow_schema::{ArrowError, DataType}; +use geoarrow_array::{ + array::{CoordBuffer, PointArray, SeparatedCoordBuffer}, + GeoArrowArray, +}; +use geoarrow_schema::Dimension; +use las::{Header, Point}; + +use crate::laz::{ + metadata::ExtraAttribute, + options::{LasExtraBytes, LasPointEncoding}, + schema::try_schema_from_header, +}; + +#[derive(Debug)] +pub struct RowBuilder { + x: Float64Builder, + y: Float64Builder, + z: Float64Builder, + intensity: UInt16Builder, + return_number: UInt8Builder, + number_of_returns: UInt8Builder, + is_synthetic: BooleanBuilder, + is_key_point: BooleanBuilder, + is_withheld: BooleanBuilder, + is_overlap: BooleanBuilder, + scanner_channel: UInt8Builder, + scan_direction: UInt8Builder, + is_edge_of_flight_line: BooleanBuilder, + classification: UInt8Builder, + user_data: UInt8Builder, + scan_angle: Float32Builder, + point_source_id: UInt16Builder, + gps_time: Float64Builder, + red: UInt16Builder, + green: UInt16Builder, + blue: UInt16Builder, + nir: UInt16Builder, + extra: FixedSizeBinaryBuilder, + header: Arc
<Header>,
+    point_encoding: LasPointEncoding,
+    extra_bytes: LasExtraBytes,
+    extra_attributes: Arc<Vec<ExtraAttribute>>,
+}
+
+impl RowBuilder {
+    pub fn new(capacity: usize, header: Arc<Header>
) -> Self { + Self { + x: Float64Array::builder(capacity), + y: Float64Array::builder(capacity), + z: Float64Array::builder(capacity), + intensity: UInt16Array::builder(capacity), + return_number: UInt8Array::builder(capacity), + number_of_returns: UInt8Array::builder(capacity), + is_synthetic: BooleanArray::builder(capacity), + is_key_point: BooleanArray::builder(capacity), + is_withheld: BooleanArray::builder(capacity), + is_overlap: BooleanArray::builder(capacity), + scanner_channel: UInt8Array::builder(capacity), + scan_direction: UInt8Array::builder(capacity), + is_edge_of_flight_line: BooleanArray::builder(capacity), + classification: UInt8Array::builder(capacity), + user_data: UInt8Array::builder(capacity), + scan_angle: Float32Array::builder(capacity), + point_source_id: UInt16Array::builder(capacity), + gps_time: Float64Array::builder(capacity), + red: UInt16Array::builder(capacity), + green: UInt16Array::builder(capacity), + blue: UInt16Array::builder(capacity), + nir: UInt16Array::builder(capacity), + extra: FixedSizeBinaryBuilder::with_capacity( + capacity, + header.point_format().extra_bytes as i32, + ), + + header, + point_encoding: Default::default(), + extra_bytes: Default::default(), + extra_attributes: Arc::new(Vec::new()), + } + } + + pub fn with_point_encoding(mut self, point_encoding: LasPointEncoding) -> Self { + self.point_encoding = point_encoding; + self + } + + pub fn with_extra_attributes( + mut self, + attributes: Arc>, + extra_bytes: LasExtraBytes, + ) -> Self { + self.extra_attributes = attributes; + self.extra_bytes = extra_bytes; + self + } + + pub fn append(&mut self, p: Point) { + self.x.append_value(p.x); + self.y.append_value(p.y); + self.z.append_value(p.z); + self.intensity.append_option(Some(p.intensity)); + self.return_number.append_value(p.return_number); + self.number_of_returns.append_value(p.number_of_returns); + self.is_synthetic.append_value(p.is_synthetic); + self.is_key_point.append_value(p.is_key_point); + self.is_withheld.append_value(p.is_withheld); + self.is_overlap.append_value(p.is_overlap); + self.scanner_channel.append_value(p.scanner_channel); + self.scan_direction.append_value(p.scan_direction as u8); + self.is_edge_of_flight_line + .append_value(p.is_edge_of_flight_line); + self.classification.append_value(u8::from(p.classification)); + self.user_data.append_value(p.user_data); + self.scan_angle.append_value(p.scan_angle); + self.point_source_id.append_value(p.point_source_id); + if self.header.point_format().has_gps_time { + self.gps_time.append_value(p.gps_time.unwrap()); + } + if self.header.point_format().has_color { + let color = p.color.unwrap(); + self.red.append_value(color.red); + self.green.append_value(color.green); + self.blue.append_value(color.blue); + } + if self.header.point_format().has_nir { + self.nir.append_value(p.nir.unwrap()); + } + if self.header.point_format().extra_bytes > 0 { + self.extra.append_value(p.extra_bytes).unwrap(); + } + } + + /// Note: returns StructArray to allow nesting within another array if desired + pub fn finish(&mut self) -> Result { + let schema = try_schema_from_header(&self.header, self.point_encoding, self.extra_bytes)?; + + let mut columns = match self.point_encoding { + LasPointEncoding::Plain => vec![ + Arc::new(self.x.finish()) as ArrayRef, + Arc::new(self.y.finish()) as ArrayRef, + Arc::new(self.z.finish()) as ArrayRef, + ], + LasPointEncoding::Wkb => { + const POINT_SIZE: usize = 29; + + let n: usize = self.x.len(); + + let mut builder = BinaryBuilder::with_capacity(n, n * 
POINT_SIZE); + + let x = self.x.finish(); + let y = self.y.finish(); + let z = self.z.finish(); + + let mut wkb_bytes = [0_u8; POINT_SIZE]; + wkb_bytes[0] = 0x01; // Little-endian + wkb_bytes[1..5].copy_from_slice(&[0xE9, 0x03, 0x00, 0x00]); // Point Z type (1001) + + for i in 0..n { + let x = unsafe { x.value_unchecked(i) }; + let y = unsafe { y.value_unchecked(i) }; + let z = unsafe { z.value_unchecked(i) }; + + wkb_bytes[5..13].copy_from_slice(x.to_le_bytes().as_slice()); + wkb_bytes[13..21].copy_from_slice(y.to_le_bytes().as_slice()); + wkb_bytes[21..29].copy_from_slice(z.to_le_bytes().as_slice()); + + builder.append_value(wkb_bytes); + } + + vec![Arc::new(builder.finish()) as ArrayRef] + } + LasPointEncoding::Native => { + let buffers = [ + self.x.finish().into_parts().1, + self.y.finish().into_parts().1, + self.z.finish().into_parts().1, + ScalarBuffer::from(vec![]), + ]; + let coords = CoordBuffer::Separated(SeparatedCoordBuffer::from_array( + buffers, + Dimension::XYZ, + )?); + let points = PointArray::new(coords, None, Default::default()); + vec![points.to_array_ref()] + } + }; + + columns.extend([ + Arc::new(self.intensity.finish()) as ArrayRef, + Arc::new(self.return_number.finish()) as ArrayRef, + Arc::new(self.number_of_returns.finish()) as ArrayRef, + Arc::new(self.is_synthetic.finish()) as ArrayRef, + Arc::new(self.is_key_point.finish()) as ArrayRef, + Arc::new(self.is_withheld.finish()) as ArrayRef, + Arc::new(self.is_overlap.finish()) as ArrayRef, + Arc::new(self.scanner_channel.finish()) as ArrayRef, + Arc::new(self.scan_direction.finish()) as ArrayRef, + Arc::new(self.is_edge_of_flight_line.finish()) as ArrayRef, + Arc::new(self.classification.finish()) as ArrayRef, + Arc::new(self.user_data.finish()) as ArrayRef, + Arc::new(self.scan_angle.finish()) as ArrayRef, + Arc::new(self.point_source_id.finish()) as ArrayRef, + ]); + if self.header.point_format().has_gps_time { + columns.push(Arc::new(self.gps_time.finish()) as ArrayRef); + } + if self.header.point_format().has_color { + columns.extend([ + Arc::new(self.red.finish()) as ArrayRef, + Arc::new(self.green.finish()) as ArrayRef, + Arc::new(self.blue.finish()) as ArrayRef, + ]); + } + if self.header.point_format().has_nir { + columns.push(Arc::new(self.nir.finish()) as ArrayRef); + } + + // extra bytes + let num_extra_bytes = self.header.point_format().extra_bytes as usize; + if num_extra_bytes > 0 { + match self.extra_bytes { + LasExtraBytes::Typed => { + let extra = self.extra.finish(); + + let mut pos = 0; + + for attribute in self.extra_attributes.iter() { + pos += build_attribute(attribute, pos, &extra, &mut columns)?; + } + } + LasExtraBytes::Blob => columns.push(Arc::new(self.extra.finish())), + LasExtraBytes::Ignore => (), + } + } + + Ok(StructArray::new(schema.fields.to_owned(), columns, None)) + } +} + +fn build_attribute( + attribute: &ExtraAttribute, + pos: usize, + extra: &FixedSizeBinaryArray, + columns: &mut Vec, +) -> Result { + let scale = attribute.scale.unwrap_or(1.0); + let offset = attribute.offset.unwrap_or(0.0); + + let width = if let DataType::FixedSizeBinary(width) = attribute.data_type { + width as usize + } else { + attribute.data_type.primitive_width().unwrap() + }; + + let iter = extra.iter().map(|b| &b.unwrap()[pos..pos + width]); + + match &attribute.data_type { + DataType::FixedSizeBinary(_) => { + let data = FixedSizeBinaryArray::try_from_iter(iter)?; + columns.push(Arc::new(data) as ArrayRef) + } + DataType::Int8 => { + let no_data = attribute.no_data.map(i64::from_le_bytes); + + let 
iter = iter.map(|d| { + let v = d[0] as i8; + if let Some(no_data) = no_data { + if no_data == v as i64 { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = Int8Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::Int16 => { + let no_data = attribute.no_data.map(i64::from_le_bytes); + + let iter = iter.map(|d| { + let v = i16::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v as i64 { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = Int16Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::Int32 => { + let no_data = attribute.no_data.map(i64::from_le_bytes); + + let iter = iter.map(|d| { + let v = i32::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v as i64 { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = Int32Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::Int64 => { + let no_data = attribute.no_data.map(i64::from_le_bytes); + + let iter = iter.map(|d| { + let v = i64::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = Int64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::UInt8 => { + let no_data = attribute.no_data.map(u64::from_le_bytes); + + let iter = iter.map(|d| { + let v = d[0]; + if let Some(no_data) = no_data { + if no_data == v as u64 { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = UInt8Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::UInt16 => { + let no_data = attribute.no_data.map(u64::from_le_bytes); + + let iter = iter.map(|d| { + 
let v = u16::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v as u64 { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = UInt16Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::UInt32 => { + let no_data = attribute.no_data.map(u64::from_le_bytes); + + let iter = iter.map(|d| { + let v = u32::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v as u64 { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = UInt32Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::UInt64 => { + let no_data = attribute.no_data.map(u64::from_le_bytes); + + let iter = iter.map(|d| { + let v = u64::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = UInt64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::Float32 => { + let no_data = attribute.no_data.map(f64::from_le_bytes); + + let iter = iter.map(|d| { + let v = f32::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v as f64 { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = Float32Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::Float64 => { + let no_data = attribute.no_data.map(f64::from_le_bytes); + + let iter = iter.map(|d| { + let v = f64::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + + dt => { + return Err(ArrowError::ExternalError( + 
format!("Unsupported data type for extra bytes: `{dt}`").into(), + )) + } + } + + Ok(width) +} diff --git a/rust/sedona-pointcloud/src/laz/format.rs b/rust/sedona-pointcloud/src/laz/format.rs new file mode 100644 index 000000000..9db9441c6 --- /dev/null +++ b/rust/sedona-pointcloud/src/laz/format.rs @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, collections::HashMap, fmt, sync::Arc}; + +use arrow_schema::{Schema, SchemaRef}; +use datafusion_catalog::{memory::DataSourceExec, Session}; +use datafusion_common::{ + config::ExtensionOptions, error::DataFusionError, internal_err, + parsers::CompressionTypeVariant, GetExt, Statistics, +}; +use datafusion_datasource::{ + file::FileSource, + file_compression_type::FileCompressionType, + file_format::{FileFormat, FileFormatFactory}, + file_scan_config::FileScanConfig, +}; +use datafusion_physical_plan::ExecutionPlan; +use futures::{StreamExt, TryStreamExt}; +use object_store::{ObjectMeta, ObjectStore}; + +use crate::laz::{metadata::LazMetadataReader, options::LazTableOptions, source::LazSource}; + +const DEFAULT_LAZ_EXTENSION: &str = ".laz"; + +/// Factory struct used to create [LazFormat] +#[derive(Default)] +pub struct LazFormatFactory { + // inner options for LAZ + pub options: Option, +} + +impl LazFormatFactory { + /// Creates an instance of [LazFormatFactory] + pub fn new() -> Self { + Self { options: None } + } + + /// Creates an instance of [LazFormatFactory] with customized default options + pub fn new_with(options: LazTableOptions) -> Self { + Self { + options: Some(options), + } + } +} + +impl FileFormatFactory for LazFormatFactory { + fn create( + &self, + state: &dyn Session, + format_options: &HashMap, + ) -> Result, DataFusionError> { + let mut laz_options = state + .config_options() + .extensions + .get::() + .or_else(|| state.table_options().extensions.get::()) + .cloned() + .or(self.options.clone()) + .unwrap_or_default(); + + for (k, v) in format_options { + laz_options.set(k, v)?; + } + + Ok(Arc::new(LazFormat::default().with_options(laz_options))) + } + + fn default(&self) -> Arc { + Arc::new(LazFormat::default()) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl GetExt for LazFormatFactory { + fn get_ext(&self) -> String { + // Removes the dot, i.e. 
".laz" -> "laz" + DEFAULT_LAZ_EXTENSION[1..].to_string() + } +} + +impl fmt::Debug for LazFormatFactory { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("LazFormatFactory") + .field("LazFormatFactory", &self.options) + .finish() + } +} + +/// The LAZ `FileFormat` implementation +#[derive(Debug, Default)] +pub struct LazFormat { + pub options: LazTableOptions, +} + +impl LazFormat { + pub fn with_options(mut self, options: LazTableOptions) -> Self { + self.options = options; + self + } +} + +#[async_trait::async_trait] +impl FileFormat for LazFormat { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_ext(&self) -> String { + LazFormatFactory::new().get_ext() + } + + fn get_ext_with_compression( + &self, + file_compression_type: &FileCompressionType, + ) -> Result { + let ext = self.get_ext(); + match file_compression_type.get_variant() { + CompressionTypeVariant::UNCOMPRESSED => Ok(ext), + _ => internal_err!("Laz FileFormat does not support compression."), + } + } + + fn compression_type(&self) -> Option { + Some(FileCompressionType::UNCOMPRESSED) + } + + async fn infer_schema( + &self, + state: &dyn Session, + store: &Arc, + objects: &[ObjectMeta], + ) -> Result { + let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); + + let mut schemas: Vec<_> = futures::stream::iter(objects) + .map(|object_meta| async { + let loc_path = object_meta.location.clone(); + + let schema = LazMetadataReader::new(store, object_meta) + .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) + .with_options(self.options.clone()) + .fetch_schema() + .await?; + + Ok::<_, DataFusionError>((loc_path, schema)) + }) + .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 + // fetch schemas concurrently, if requested + .buffered(state.config_options().execution.meta_fetch_concurrency) + .try_collect() + .await?; + + schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2)); + + let schemas = schemas + .into_iter() + .map(|(_, schema)| schema) + .collect::>(); + + let schema = Schema::try_merge(schemas)?; + + Ok(Arc::new(schema)) + } + + async fn infer_stats( + &self, + state: &dyn Session, + store: &Arc, + table_schema: SchemaRef, + object: &ObjectMeta, + ) -> Result { + let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); + LazMetadataReader::new(store, object) + .with_options(self.options.clone()) + .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) + .fetch_statistics(&table_schema) + .await + } + + async fn create_physical_plan( + &self, + _state: &dyn Session, + conf: FileScanConfig, + ) -> Result, DataFusionError> { + Ok(DataSourceExec::from_data_source(conf)) + } + + fn file_source(&self) -> Arc { + Arc::new(LazSource::default().with_options(self.options.clone())) + } +} diff --git a/rust/sedona-pointcloud/src/laz/metadata.rs b/rust/sedona-pointcloud/src/laz/metadata.rs new file mode 100644 index 000000000..f92db9f62 --- /dev/null +++ b/rust/sedona-pointcloud/src/laz/metadata.rs @@ -0,0 +1,451 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    any::Any,
+    collections::HashMap,
+    error::Error,
+    io::{Cursor, Read},
+    ops::Range,
+    sync::Arc,
+};
+
+use arrow_schema::{DataType, Schema, SchemaRef};
+use datafusion_common::{
+    error::DataFusionError, scalar::ScalarValue, stats::Precision, ColumnStatistics, Statistics,
+};
+use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
+use las::{
+    raw::{Header as RawHeader, Vlr as RawVlr},
+    Builder, Header, Vlr,
+};
+use laz::laszip::ChunkTable;
+use object_store::{ObjectMeta, ObjectStore};
+
+use crate::laz::{options::LazTableOptions, schema::try_schema_from_header};
+
+/// Laz chunk metadata
+#[derive(Debug, Clone)]
+pub struct ChunkMeta {
+    pub num_points: u64,
+    pub point_offset: u64,
+    pub byte_range: Range<u64>,
+}
+
+/// Laz metadata
+#[derive(Debug, Clone)]
+pub struct LazMetadata {
+    pub header: Arc<Header>
, + pub chunk_table: Vec, + pub extra_attributes: Arc>, +} + +impl FileMetadata for LazMetadata { + fn as_any(&self) -> &dyn Any { + self + } + + fn memory_size(&self) -> usize { + self.chunk_table.capacity() * std::mem::size_of::() + + self.extra_attributes.capacity() * std::mem::size_of::() + } + + fn extra_info(&self) -> HashMap { + HashMap::new() + } +} + +/// Reader for laz file metadata in object storage. +pub struct LazMetadataReader<'a> { + store: &'a dyn ObjectStore, + object_meta: &'a ObjectMeta, + file_metadata_cache: Option>, + options: LazTableOptions, +} + +impl<'a> LazMetadataReader<'a> { + pub fn new(store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta) -> Self { + Self { + store, + object_meta, + file_metadata_cache: None, + options: Default::default(), + } + } + + /// set file metadata cache + pub fn with_file_metadata_cache( + mut self, + file_metadata_cache: Option>, + ) -> Self { + self.file_metadata_cache = file_metadata_cache; + self + } + + /// set table options + pub fn with_options(mut self, options: LazTableOptions) -> Self { + self.options = options; + self + } + + /// Fetch header + pub async fn fetch_header(&self) -> Result { + fetch_header(self.store, self.object_meta) + .await + .map_err(DataFusionError::External) + } + + /// Fetch laz metadata from the remote object store + pub async fn fetch_metadata(&self) -> Result, DataFusionError> { + let Self { + store, + object_meta, + file_metadata_cache, + options: _, + } = self; + + if let Some(las_file_metadata) = file_metadata_cache + .as_ref() + .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta)) + .and_then(|file_metadata| { + file_metadata + .as_any() + .downcast_ref::() + .map(|laz_file_metadata| Arc::new(laz_file_metadata.to_owned())) + }) + { + return Ok(las_file_metadata); + } + + let header = self.fetch_header().await?; + let extra_attributes = extra_bytes_attributes(&header)?; + let chunk_table = chunk_table(*store, object_meta, &header).await?; + + let metadata = Arc::new(LazMetadata { + header: Arc::new(header), + chunk_table, + extra_attributes: Arc::new(extra_attributes), + }); + + if let Some(file_metadata_cache) = file_metadata_cache { + file_metadata_cache.put(object_meta, metadata.clone()); + } + + Ok(metadata) + } + + /// Read and parse the schema of the laz file + pub async fn fetch_schema(&mut self) -> Result { + let metadata = self.fetch_metadata().await?; + + let schema = try_schema_from_header( + &metadata.header, + self.options.point_encoding, + self.options.extra_bytes, + )?; + + Ok(schema) + } + + /// Fetch the metadata from the laz file via [`Self::fetch_metadata`] and extracts + /// the statistics in the metadata + pub async fn fetch_statistics( + &self, + table_schema: &SchemaRef, + ) -> Result { + let metadata = self.fetch_metadata().await?; + + let mut statistics = Statistics::new_unknown(table_schema) + .with_num_rows(Precision::Exact(metadata.header.number_of_points() as usize)) + .with_total_byte_size(Precision::Exact( + metadata + .chunk_table + .iter() + .map(|meta| meta.byte_range.end - meta.byte_range.start) + .sum::() as usize, + )); + + let bounds = metadata.header.bounds(); + for field in table_schema.fields() { + let cs = match field.name().as_str() { + "x" => ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.x)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.x)))) + .with_null_count(Precision::Exact(0)), + "y" => ColumnStatistics::new_unknown() + 
.with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.y)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.y)))) + .with_null_count(Precision::Exact(0)), + "z" => ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.z)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.z)))) + .with_null_count(Precision::Exact(0)), + _ => ColumnStatistics::new_unknown(), + }; + + statistics = statistics.add_column_statistics(cs); + } + + Ok(statistics) + } +} + +pub(crate) async fn fetch_header( + store: &(impl ObjectStore + ?Sized), + object_meta: &ObjectMeta, +) -> Result> { + let location = &object_meta.location; + + // Header + let bytes = store.get_range(location, 0..375).await?; + let reader = Cursor::new(bytes); + let raw_header = RawHeader::read_from(reader)?; + + let header_size = raw_header.header_size as u64; + let offset_to_point_data = raw_header.offset_to_point_data as u64; + let num_vlr = raw_header.number_of_variable_length_records; + let evlr = raw_header.evlr; + + let mut builder = Builder::new(raw_header)?; + + // VLRs + let bytes = store + .get_range(location, header_size..offset_to_point_data) + .await?; + let mut reader = Cursor::new(bytes); + + for _ in 0..num_vlr { + let vlr = RawVlr::read_from(&mut reader, false).map(Vlr::new)?; + builder.vlrs.push(vlr); + } + + reader.read_to_end(&mut builder.vlr_padding)?; + + // EVLRs + if let Some(evlr) = evlr { + let mut start = evlr.start_of_first_evlr; + + for _ in 0..evlr.number_of_evlrs { + let mut end = start + 60; + + let bytes = store.get_range(location, start..end).await?; + + end += u64::from_le_bytes(bytes[20..28].try_into()?); + + let bytes = store.get_range(location, start..end).await?; + let mut reader = Cursor::new(bytes); + let evlr = RawVlr::read_from(&mut reader, true).map(Vlr::new)?; + + builder.evlrs.push(evlr); + + start = end; + } + } + + Ok(builder.into_header()?) 
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct ExtraAttribute {
+    pub data_type: DataType,
+    pub no_data: Option<[u8; 8]>,
+    pub scale: Option<f64>,
+    pub offset: Option<f64>,
+}
+
+pub(crate) fn extra_bytes_attributes(
+    header: &Header,
+) -> Result<Vec<ExtraAttribute>, Box<dyn Error + Send + Sync>> {
+    let mut attributes = Vec::new();
+
+    for vlr in header.all_vlrs() {
+        if !(vlr.user_id == "LASF_Spec" && vlr.record_id == 4) {
+            continue;
+        }
+
+        for bytes in vlr.data.chunks(192) {
+            // data type
+            let data_type = match bytes[2] {
+                0 => DataType::FixedSizeBinary(bytes[3] as i32),
+                1 => DataType::UInt8,
+                2 => DataType::Int8,
+                3 => DataType::UInt16,
+                4 => DataType::Int16,
+                5 => DataType::UInt32,
+                6 => DataType::Int32,
+                7 => DataType::UInt64,
+                8 => DataType::Int64,
+                9 => DataType::Float32,
+                10 => DataType::Float64,
+                11..=30 => return Err("deprecated extra bytes data type".into()),
+                31..=255 => return Err("reserved extra bytes data type".into()),
+            };
+
+            // no data
+            let no_data = if bytes[2] != 0 && bytes[3] & 1 == 1 {
+                Some(bytes[40..48].try_into().unwrap())
+            } else {
+                None
+            };
+
+            // scale
+            let scale = if bytes[2] != 0 && bytes[3] >> 3 & 1 == 1 {
+                Some(f64::from_le_bytes(bytes[112..120].try_into().unwrap()))
+            } else {
+                None
+            };
+
+            // offset
+            let offset = if bytes[2] != 0 && bytes[3] >> 4 & 1 == 1 {
+                Some(f64::from_le_bytes(bytes[136..144].try_into().unwrap()))
+            } else {
+                None
+            };
+
+            let attribute = ExtraAttribute {
+                data_type,
+                no_data,
+                scale,
+                offset,
+            };
+
+            attributes.push(attribute);
+        }
+    }
+
+    Ok(attributes)
+}
+
+pub(crate) async fn chunk_table(
+    store: &(impl ObjectStore + ?Sized),
+    object_meta: &ObjectMeta,
+    header: &Header,
+) -> Result<Vec<ChunkMeta>, Box<dyn Error + Send + Sync>> {
+    let num_points = header.number_of_points();
+    let mut point_offset = 0;
+
+    let vlr_len = header.vlrs().iter().map(|v| v.len(false)).sum::<usize>();
+    let header_size = header.version().header_size() as usize + header.padding().len();
+    let mut byte_offset = (header_size + vlr_len + header.vlr_padding().len()) as u64;
+
+    let laz_vlr = header.laz_vlr()?;
+
+    let ranges = [
+        byte_offset..byte_offset + 8,
+        object_meta.size - 8..object_meta.size,
+    ];
+    let bytes = store.get_ranges(&object_meta.location, &ranges).await?;
+    let mut table_offset = None;
+
+    let table_offset1 = i64::from_le_bytes(bytes[0].to_vec().try_into().unwrap()) as u64;
+    let table_offset2 = i64::from_le_bytes(bytes[1].to_vec().try_into().unwrap()) as u64;
+
+    if table_offset1 > byte_offset {
+        table_offset = Some(table_offset1);
+    } else if table_offset2 > byte_offset {
+        table_offset = Some(table_offset2);
+    }
+
+    let Some(table_offset) = table_offset else {
+        return Err("LAZ files without chunk table not supported (yet)".into());
+    };
+
+    if table_offset > object_meta.size {
+        return Err("LAZ file chunk table position is missing/bad".into());
+    }
+
+    let bytes = store
+        .get_range(&object_meta.location, table_offset..table_offset + 8)
+        .await?;
+
+    let num_chunks = u32::from_le_bytes(bytes[4..].to_vec().try_into().unwrap()) as u64;
+    let range = table_offset..table_offset + 8 + 8 * num_chunks;
+    let bytes = store.get_range(&object_meta.location, range).await?;
+
+    let mut reader = Cursor::new(bytes);
+    let variable_size = laz_vlr.uses_variable_size_chunks();
+    let chunk_table = ChunkTable::read(&mut reader, variable_size)?;
+    assert_eq!(chunk_table.len(), num_chunks as usize);
+
+    let mut chunks = Vec::with_capacity(num_chunks as usize);
+    let chunk_size = laz_vlr.chunk_size() as u64;
+    byte_offset += 8;
+
+    for chunk_table_entry in &chunk_table {
+        let point_count = if
variable_size { + chunk_table_entry.point_count + } else { + chunk_size.min(num_points - point_offset) + }; + + let chunk = ChunkMeta { + num_points: point_count, + point_offset, + byte_range: byte_offset..byte_offset + chunk_table_entry.byte_count, + }; + chunks.push(chunk); + point_offset += point_count; + byte_offset += chunk_table_entry.byte_count; + } + + Ok(chunks) +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use las::{point::Format, Builder, Reader, Writer}; + use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload}; + + use crate::laz::metadata::LazMetadataReader; + + #[allow(static_mut_refs)] + #[tokio::test] + async fn header_basic_e2e() { + // create laz file + static mut LAZ: Vec = Vec::new(); + + let mut builder = Builder::from((1, 4)); + builder.point_format = Format::new(1).unwrap(); + builder.point_format.is_compressed = true; + let header = builder.into_header().unwrap(); + let write = unsafe { Cursor::new(&mut LAZ) }; + let mut writer = Writer::new(write, header).unwrap(); + + writer.close().unwrap(); + + // put to object store + let store = InMemory::new(); + let location = Path::parse("test.laz").unwrap(); + let payload = unsafe { PutPayload::from_static(&LAZ) }; + store.put(&location, payload).await.unwrap(); + + // read with `LazMetadataReader` + let object_meta = store.head(&location).await.unwrap(); + let metadata_reader = LazMetadataReader::new(&store, &object_meta); + + // read with las `Reader` + let read = unsafe { Cursor::new(&mut LAZ) }; + let reader = Reader::new(read).unwrap(); + + assert_eq!( + reader.header(), + &metadata_reader.fetch_header().await.unwrap() + ); + } +} diff --git a/rust/sedona-pointcloud/src/laz/mod.rs b/rust/sedona-pointcloud/src/laz/mod.rs new file mode 100644 index 000000000..b6fb48cde --- /dev/null +++ b/rust/sedona-pointcloud/src/laz/mod.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod builder; +pub mod format; +pub mod metadata; +pub mod opener; +pub mod options; +pub mod reader; +pub mod schema; +pub mod source; diff --git a/rust/sedona-pointcloud/src/laz/opener.rs b/rust/sedona-pointcloud/src/laz/opener.rs new file mode 100644 index 000000000..9bf7a60cf --- /dev/null +++ b/rust/sedona-pointcloud/src/laz/opener.rs @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion_common::{error::DataFusionError, pruning::PrunableStatistics}; +use datafusion_datasource::{ + file_stream::{FileOpenFuture, FileOpener}, + PartitionedFile, +}; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_pruning::PruningPredicate; +use futures::StreamExt; + +use sedona_expr::spatial_filter::SpatialFilter; +use sedona_geometry::bounding_box::BoundingBox; + +use crate::laz::{ + options::LazTableOptions, + reader::{LazFileReader, LazFileReaderFactory}, + schema::try_schema_from_header, +}; + +pub struct LazOpener { + /// Column indexes in `table_schema` needed by the query + pub projection: Arc<[usize]>, + /// Optional limit on the number of rows to read + pub limit: Option, + pub predicate: Option>, + /// Factory for instantiating laz reader + pub laz_file_reader_factory: Arc, + /// Table options + pub options: LazTableOptions, +} + +impl FileOpener for LazOpener { + fn open(&self, file: PartitionedFile) -> Result { + let projection = self.projection.clone(); + let limit = self.limit; + + let predicate = self.predicate.clone(); + + let laz_reader: Box = self + .laz_file_reader_factory + .create_reader(file.clone(), self.options.clone())?; + + Ok(Box::pin(async move { + let metadata = laz_reader.get_metadata().await?; + let schema = Arc::new(try_schema_from_header( + &metadata.header, + laz_reader.options.point_encoding, + laz_reader.options.extra_bytes, + )?); + + let pruning_predicate = predicate.and_then(|physical_expr| { + PruningPredicate::try_new(physical_expr, schema.clone()).ok() + }); + + // file pruning + if let Some(pruning_predicate) = &pruning_predicate { + // based on spatial filter + let spatial_filter = SpatialFilter::try_from_expr(pruning_predicate.orig_expr())?; + let bounds = metadata.header.bounds(); + let bbox = BoundingBox::xyzm( + (bounds.min.x, bounds.max.x), + (bounds.min.y, bounds.max.y), + Some((bounds.min.z, bounds.max.z).into()), + None, + ); + if !spatial_filter.filter_bbox("geometry").intersects(&bbox) { + return Ok(futures::stream::empty().boxed()); + } + // based on file statistics + if let Some(statistics) = file.statistics { + let prunable_statistics = PrunableStatistics::new(vec![statistics], schema); + if let Ok(filter) = pruning_predicate.prune(&prunable_statistics) { + if !filter[0] { + return Ok(futures::stream::empty().boxed()); + } + } + } + } + + // map chunk table + let chunk_table: Vec<_> = metadata + .chunk_table + .iter() + .filter(|chunk_meta| { + file.range.as_ref().is_none_or(|range| { + let offset = chunk_meta.byte_range.start; + offset >= range.start as u64 && offset < range.end as u64 + }) + }) + .cloned() + .collect(); + + let mut row_count = 0; + + let stream = async_stream::try_stream! 
{
+                for chunk_meta in chunk_table.into_iter() {
+                    // limit
+                    if let Some(limit) = limit {
+                        if row_count >= limit {
+                            break;
+                        }
+                    }
+
+                    // fetch batch
+                    let record_batch = laz_reader.get_batch(&chunk_meta).await?;
+                    row_count += record_batch.num_rows();
+
+                    // project
+                    let record_batch = record_batch
+                        .project(&projection)
+                        .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
+
+                    yield record_batch
+                }
+
+            };
+
+            Ok(Box::pin(stream) as _)
+        }))
+    }
+}
diff --git a/rust/sedona-pointcloud/src/laz/options.rs b/rust/sedona-pointcloud/src/laz/options.rs
new file mode 100644
index 000000000..39bfd3b5f
--- /dev/null
+++ b/rust/sedona-pointcloud/src/laz/options.rs
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{fmt::Display, str::FromStr};
+
+use datafusion_common::{
+    config::{ConfigExtension, ConfigField, Visit},
+    error::DataFusionError,
+    extensions_options,
+};
+
+/// Geometry representation
+#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)]
+pub enum LasPointEncoding {
+    /// Use plain coordinates as three `Float64` fields `x`, `y`, `z`.
+    #[default]
+    Plain,
+    /// Resolves the coordinates to a single `geometry` field with WKB encoding.
+    Wkb,
+    /// Resolves the coordinates to a single `geometry` field with separated GeoArrow encoding.
+    Native,
+}
+
+impl Display for LasPointEncoding {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            LasPointEncoding::Plain => f.write_str("plain"),
+            LasPointEncoding::Wkb => f.write_str("wkb"),
+            LasPointEncoding::Native => f.write_str("native"),
+        }
+    }
+}
+
+impl FromStr for LasPointEncoding {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "plain" => Ok(Self::Plain),
+            "wkb" => Ok(Self::Wkb),
+            "native" => Ok(Self::Native),
+            s => Err(format!("Unable to parse from `{s}`")),
+        }
+    }
+}
+
+impl ConfigField for LasPointEncoding {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
+        v.some(
+            &format!("{key}.point_encoding"),
+            self,
+            "Specify point encoding",
+        );
+    }
+
+    fn set(&mut self, _key: &str, value: &str) -> Result<(), DataFusionError> {
+        *self = value.parse().map_err(DataFusionError::Configuration)?;
+        Ok(())
+    }
+}
+
+/// LAS extra bytes handling
+#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)]
+pub enum LasExtraBytes {
+    /// Resolve to typed and named attributes
+    Typed,
+    /// Keep as binary blob
+    Blob,
+    /// Drop/ignore extra bytes
+    #[default]
+    Ignore,
+}
+
+impl Display for LasExtraBytes {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            LasExtraBytes::Typed => f.write_str("typed"),
+            LasExtraBytes::Blob => f.write_str("blob"),
+            LasExtraBytes::Ignore => f.write_str("ignore"),
+        }
+    }
+}
+
+impl FromStr for LasExtraBytes {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "typed" => Ok(Self::Typed),
+            "blob" => Ok(Self::Blob),
+            "ignore" => Ok(Self::Ignore),
+            s => Err(format!("Unable to parse from `{s}`")),
+        }
+    }
+}
+
+impl ConfigField for LasExtraBytes {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
+        v.some(
+            &format!("{key}.extra_bytes"),
+            self,
+            "Specify extra bytes handling",
+        );
+    }
+
+    fn set(&mut self, _key: &str, value: &str) -> Result<(), DataFusionError> {
+        *self = value.parse().map_err(DataFusionError::Configuration)?;
+        Ok(())
+    }
+}
+
+// Define a new configuration struct using the `extensions_options` macro
+extensions_options! {
+    /// The LAZ config options
+    pub struct LazTableOptions {
+        pub point_encoding: LasPointEncoding, default = LasPointEncoding::default()
+        pub extra_bytes: LasExtraBytes, default = LasExtraBytes::default()
+    }
+
+}
+
+impl ConfigExtension for LazTableOptions {
+    const PREFIX: &'static str = "laz";
+}
+
+impl LazTableOptions {
+    pub fn with_point_encoding(mut self, point_encoding: LasPointEncoding) -> Self {
+        self.point_encoding = point_encoding;
+        self
+    }
+
+    pub fn with_extra_bytes(mut self, extra_bytes: LasExtraBytes) -> Self {
+        self.extra_bytes = extra_bytes;
+        self
+    }
+}
diff --git a/rust/sedona-pointcloud/src/laz/reader.rs b/rust/sedona-pointcloud/src/laz/reader.rs
new file mode 100644
index 000000000..e622f8d4c
--- /dev/null
+++ b/rust/sedona-pointcloud/src/laz/reader.rs
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + io::{Cursor, Read, Seek}, + sync::Arc, +}; + +use arrow_array::RecordBatch; +use datafusion_common::error::DataFusionError; +use datafusion_datasource::PartitionedFile; +use datafusion_execution::cache::cache_manager::FileMetadataCache; +use futures::{future::BoxFuture, FutureExt}; +use las::{raw::Point as RawPoint, Point}; +use laz::{ + record::{ + LayeredPointRecordDecompressor, RecordDecompressor, SequentialPointRecordDecompressor, + }, + DecompressionSelection, LasZipError, LazItem, +}; +use object_store::ObjectStore; + +use crate::laz::{ + builder::RowBuilder, + metadata::{ChunkMeta, LazMetadata, LazMetadataReader}, + options::LazTableOptions, +}; + +/// Laz file reader factory +#[derive(Debug)] +pub struct LazFileReaderFactory { + store: Arc, + metadata_cache: Option>, +} + +impl LazFileReaderFactory { + /// Create a new `LazFileReaderFactory`. + pub fn new( + store: Arc, + metadata_cache: Option>, + ) -> Self { + Self { + store, + metadata_cache, + } + } + + pub fn create_reader( + &self, + partitioned_file: PartitionedFile, + options: LazTableOptions, + ) -> Result, DataFusionError> { + Ok(Box::new(LazFileReader { + partitioned_file, + store: self.store.clone(), + metadata_cache: self.metadata_cache.clone(), + options, + })) + } +} + +/// Reader for a laz file in object storage. +pub struct LazFileReader { + partitioned_file: PartitionedFile, + store: Arc, + metadata_cache: Option>, + pub options: LazTableOptions, +} + +impl LazFileReader { + pub fn get_metadata<'a>(&'a self) -> BoxFuture<'a, Result, DataFusionError>> { + let object_meta = self.partitioned_file.object_meta.clone(); + let metadata_cache = self.metadata_cache.clone(); + + async move { + LazMetadataReader::new(&self.store, &object_meta) + .with_file_metadata_cache(metadata_cache) + .with_options(self.options.clone()) + .fetch_metadata() + .await + } + .boxed() + } + + pub async fn get_batch(&self, chunk_meta: &ChunkMeta) -> Result { + let metadata = self.get_metadata().await?; + let header = metadata.header.clone(); + + // fetch bytes + let location = &self.partitioned_file.object_meta.location; + let bytes = self + .store + .get_range(location, chunk_meta.byte_range.clone()) + .await?; + + // laz decompressor + let laz_vlr = header + .laz_vlr() + .map_err(|e| DataFusionError::External(Box::new(e)))?; + let reader = Cursor::new(bytes); + let mut decompressor = record_decompressor(laz_vlr.items(), reader) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + // record batch builder + let num_points = chunk_meta.num_points as usize; + let mut builder = RowBuilder::new(num_points, header.clone()) + .with_point_encoding(self.options.point_encoding) + .with_extra_attributes(metadata.extra_attributes.clone(), self.options.extra_bytes); + + // transform + let format = header.point_format(); + let transforms = header.transforms(); + + let out = vec![0; format.len() as usize]; + let mut buffer = Cursor::new(out); + + for _ in 0..chunk_meta.num_points { + buffer.set_position(0); + decompressor.decompress_next(buffer.get_mut())?; + + let point = RawPoint::read_from(&mut buffer, format) + 
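+ // `read_from` decodes one raw fixed-width point record from the decompressed bytes;
+ // `Point::new` below applies the header's scale/offset transforms to recover f64 coordinates.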
.map(|raw_point| Point::new(raw_point, transforms)) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + builder.append(point); + } + + let struct_array = builder.finish()?; + + Ok(RecordBatch::from(struct_array)) + } +} + +pub(super) fn record_decompressor<'a, R: Read + Seek + Send + Sync + 'a>( + items: &Vec, + input: R, +) -> Result + Send + Sync + 'a>, LasZipError> { + let first_item = items + .first() + .expect("There should be at least one LazItem to be able to create a RecordDecompressor"); + + let mut decompressor = match first_item.version() { + 1 | 2 => { + let decompressor = SequentialPointRecordDecompressor::new(input); + Box::new(decompressor) as Box + Send + Sync> + } + 3 | 4 => { + let decompressor = LayeredPointRecordDecompressor::new(input); + Box::new(decompressor) as Box + Send + Sync> + } + _ => { + return Err(LasZipError::UnsupportedLazItemVersion( + first_item.item_type(), + first_item.version(), + )); + } + }; + + decompressor.set_fields_from(items)?; + decompressor.set_selection(DecompressionSelection::all()); + + Ok(decompressor) +} + +#[cfg(test)] +mod tests { + use std::{io::Cursor, sync::Arc}; + + use datafusion_datasource::PartitionedFile; + use las::{point::Format, Builder, Point, Writer}; + use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload}; + + use crate::laz::reader::LazFileReaderFactory; + + #[allow(static_mut_refs)] + #[tokio::test] + async fn reader_basic_e2e() { + // create laz file + static mut LAZ: Vec = Vec::new(); + + let mut builder = Builder::from((1, 4)); + builder.point_format = Format::new(1).unwrap(); + builder.point_format.is_compressed = true; + let header = builder.into_header().unwrap(); + let write = unsafe { Cursor::new(&mut LAZ) }; + let mut writer = Writer::new(write, header).unwrap(); + + let point = Point { + gps_time: Some(Default::default()), + ..Default::default() + }; + writer.write_point(point).unwrap(); + + writer.close().unwrap(); + + // put to object store + let store = InMemory::new(); + let location = Path::parse("test.laz").unwrap(); + let payload = unsafe { PutPayload::from_static(&LAZ) }; + store.put(&location, payload).await.unwrap(); + + // read batch with `LazFileReader` + let laz_file_reader = LazFileReaderFactory::new(Arc::new(store), None) + .create_reader( + PartitionedFile::new(location, unsafe { LAZ.len() as u64 }), + Default::default(), + ) + .unwrap(); + let metadata = laz_file_reader.get_metadata().await.unwrap(); + + let batch = laz_file_reader + .get_batch(&metadata.chunk_table[0]) + .await + .unwrap(); + + assert_eq!(batch.num_rows(), 1); + } +} diff --git a/rust/sedona-pointcloud/src/laz/schema.rs b/rust/sedona-pointcloud/src/laz/schema.rs new file mode 100644 index 000000000..5cff9d818 --- /dev/null +++ b/rust/sedona-pointcloud/src/laz/schema.rs @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow_schema::{ArrowError, DataType, Field, Schema}; +use geoarrow_schema::{CoordType, Crs, Dimension, Metadata, PointType, WkbType}; +use las::Header; +use las_crs::{get_epsg_from_geotiff_crs, get_epsg_from_wkt_crs_bytes}; + +use crate::laz::options::{LasExtraBytes, LasPointEncoding}; + +// Arrow schema for LAS points +pub fn try_schema_from_header( + header: &Header, + point_encoding: LasPointEncoding, + extra_bytes: LasExtraBytes, +) -> Result { + let epsg_crs = if header.has_wkt_crs() { + header + .get_wkt_crs_bytes() + .and_then(|bytes| get_epsg_from_wkt_crs_bytes(bytes).ok()) + } else { + header + .get_geotiff_crs() + .map(|gtc| gtc.and_then(|gtc| get_epsg_from_geotiff_crs(>c).ok())) + .unwrap_or_default() + }; + + let crs = epsg_crs + .map(|epsg_crs| Crs::from_authority_code(format!("EPSG:{}", epsg_crs.get_horizontal()))) + .unwrap_or_default(); + + let mut fields = match point_encoding { + LasPointEncoding::Plain => vec![ + Field::new("x", DataType::Float64, false), + Field::new("y", DataType::Float64, false), + Field::new("z", DataType::Float64, false), + ], + LasPointEncoding::Wkb => { + let point_type = WkbType::new(Arc::new(Metadata::new(crs, None))); + vec![Field::new("geometry", DataType::Binary, false).with_extension_type(point_type)] + } + LasPointEncoding::Native => { + let point_type = PointType::new(Dimension::XYZ, Arc::new(Metadata::new(crs, None))) + .with_coord_type(CoordType::Separated); + vec![point_type.to_field("geometry", false)] + } + }; + fields.extend_from_slice(&[ + Field::new("intensity", DataType::UInt16, true), + Field::new("return_number", DataType::UInt8, false), + Field::new("number_of_returns", DataType::UInt8, false), + Field::new("is_synthetic", DataType::Boolean, false), + Field::new("is_key_point", DataType::Boolean, false), + Field::new("is_withheld", DataType::Boolean, false), + Field::new("is_overlap", DataType::Boolean, false), + Field::new("scanner_channel", DataType::UInt8, false), + Field::new("scan_direction", DataType::UInt8, false), + Field::new("is_edge_of_flight_line", DataType::Boolean, false), + Field::new("classification", DataType::UInt8, false), + Field::new("user_data", DataType::UInt8, false), + Field::new("scan_angle", DataType::Float32, false), + Field::new("point_source_id", DataType::UInt16, false), + ]); + if header.point_format().has_gps_time { + fields.push(Field::new("gps_time", DataType::Float64, false)); + } + if header.point_format().has_color { + fields.extend([ + Field::new("red", DataType::UInt16, false), + Field::new("green", DataType::UInt16, false), + Field::new("blue", DataType::UInt16, false), + ]) + } + if header.point_format().has_nir { + fields.push(Field::new("nir", DataType::UInt16, false)); + } + + // extra bytes + if header.point_format().extra_bytes > 0 { + match extra_bytes { + LasExtraBytes::Typed => fields.extend(extra_bytes_fields(header)?), + LasExtraBytes::Blob => fields.push(Field::new( + "extra_bytes", + DataType::FixedSizeBinary(header.point_format().extra_bytes as i32), + false, + )), + LasExtraBytes::Ignore => (), + } + } + + Ok(Schema::new(fields)) +} + +fn extra_bytes_fields(header: &Header) -> Result, ArrowError> { + let mut fields = Vec::new(); + + for vlr in header.all_vlrs() { + if !(vlr.user_id == "LASF_Spec" && vlr.record_id == 4) { + continue; + } + + for bytes in vlr.data.chunks(192) { + // name + let name = std::str::from_utf8(&bytes[4..36])?; + 
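+ // Layout of one 192-byte Extra Bytes VLR record (LAS 1.4 spec) relied on below:
+ //   bytes[2]     data_type (0 = opaque blob whose length is given by `options`; 1..=10 = integer/float types)
+ //   bytes[3]     options bit field (bit 0 = no_data valid, bit 3 = scale present, bit 4 = offset present)
+ //   bytes[4..36] NUL-padded field name (parsed above)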
let name = name.trim_end_matches(char::from(0)); + + // data type + let data_type = if bytes[2] != 0 && (bytes[3] >> 3 & 1 == 1 || bytes[3] >> 4 & 1 == 1) { + // if scaled and/or offset resolve to f64 + DataType::Float64 + } else { + match bytes[2] { + 0 => DataType::FixedSizeBinary(bytes[3] as i32), + 1 => DataType::UInt8, + 2 => DataType::Int8, + 3 => DataType::UInt16, + 4 => DataType::Int16, + 5 => DataType::UInt32, + 6 => DataType::Int32, + 7 => DataType::UInt64, + 8 => DataType::Int64, + 9 => DataType::Float32, + 10 => DataType::Float64, + 11..=30 => { + return Err(ArrowError::ExternalError( + "deprecated extra bytes data type".into(), + )); + } + 31..=255 => { + return Err(ArrowError::ExternalError( + "reserved extra bytes data type".into(), + )); + } + } + }; + + // nullability + let nullable = if bytes[2] != 0 && bytes[3] & 1 == 1 { + true // data bit is valid and set + } else { + false + }; + + fields.push(Field::new(name, data_type, nullable)); + } + } + + Ok(fields) +} diff --git a/rust/sedona-pointcloud/src/laz/source.rs b/rust/sedona-pointcloud/src/laz/source.rs new file mode 100644 index 000000000..c3324f2a5 --- /dev/null +++ b/rust/sedona-pointcloud/src/laz/source.rs @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, sync::Arc}; + +use datafusion_common::{config::ConfigOptions, error::DataFusionError, internal_err, Statistics}; +use datafusion_datasource::{ + file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, TableSchema, +}; +use datafusion_physical_expr::{conjunction, PhysicalExpr}; +use datafusion_physical_plan::{ + filter_pushdown::{FilterPushdownPropagation, PushedDown}, + metrics::ExecutionPlanMetricsSet, +}; +use object_store::ObjectStore; + +use crate::laz::{opener::LazOpener, options::LazTableOptions, reader::LazFileReaderFactory}; + +#[derive(Clone, Default, Debug)] +pub struct LazSource { + /// Optional metrics + metrics: ExecutionPlanMetricsSet, + /// The schema of the file. 
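+ /// (See `try_schema_from_header` in `schema.rs` for how the per-file schema is derived from the LAS header.)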
+ pub(crate) table_schema: Option, + /// Optional predicate for row filtering during parquet scan + pub(crate) predicate: Option>, + /// Laz file reader factory + pub(crate) laz_file_reader_factory: Option>, + /// Batch size configuration + pub(crate) batch_size: Option, + pub(crate) projected_statistics: Option, + pub(crate) options: LazTableOptions, +} + +impl LazSource { + pub fn with_options(mut self, options: LazTableOptions) -> Self { + self.options = options; + self + } +} + +impl FileSource for LazSource { + fn create_file_opener( + &self, + object_store: Arc, + base_config: &FileScanConfig, + _partition: usize, + ) -> Arc { + let projection = base_config + .file_column_projection_indices() + .unwrap_or_else(|| (0..base_config.projected_file_schema().fields().len()).collect()); + + let laz_file_reader_factory = self + .laz_file_reader_factory + .clone() + .unwrap_or_else(|| Arc::new(LazFileReaderFactory::new(object_store, None))); + + Arc::new(LazOpener { + projection: Arc::from(projection), + limit: base_config.limit, + predicate: self.predicate.clone(), + laz_file_reader_factory, + options: self.options.clone(), + }) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn with_batch_size(&self, batch_size: usize) -> Arc { + let mut conf = self.clone(); + conf.batch_size = Some(batch_size); + Arc::new(conf) + } + + fn with_schema(&self, schema: TableSchema) -> Arc { + let mut conf = self.clone(); + conf.table_schema = Some(schema); + Arc::new(conf) + } + + fn with_projection(&self, _config: &FileScanConfig) -> Arc { + Arc::new(Self { ..self.clone() }) + } + + fn with_statistics(&self, statistics: Statistics) -> Arc { + let mut conf = self.clone(); + conf.projected_statistics = Some(statistics); + Arc::new(conf) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + &self.metrics + } + + fn statistics(&self) -> Result { + let Some(statistics) = &self.projected_statistics else { + return internal_err!("projected_statistics must be set"); + }; + + if self.filter().is_some() { + Ok(statistics.clone().to_inexact()) + } else { + Ok(statistics.clone()) + } + } + + fn file_type(&self) -> &str { + "laz" + } + + fn try_pushdown_filters( + &self, + filters: Vec>, + _config: &ConfigOptions, + ) -> Result>, DataFusionError> { + let mut source = self.clone(); + + let predicate = match source.predicate { + Some(predicate) => conjunction(std::iter::once(predicate).chain(filters.clone())), + None => conjunction(filters.clone()), + }; + + source.predicate = Some(predicate); + let source = Arc::new(source); + + // Tell our parents that they still have to handle the filters (they will only be used for stats pruning). + Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![ + PushedDown::No; + filters.len() + ]) + .with_updated_node(source)) + } +} diff --git a/rust/sedona-pointcloud/src/lib.rs b/rust/sedona-pointcloud/src/lib.rs new file mode 100644 index 000000000..819cf2178 --- /dev/null +++ b/rust/sedona-pointcloud/src/lib.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod laz; diff --git a/rust/sedona/Cargo.toml b/rust/sedona/Cargo.toml index aab77f2e7..b20104ae9 100644 --- a/rust/sedona/Cargo.toml +++ b/rust/sedona/Cargo.toml @@ -38,6 +38,7 @@ geo = ["dep:sedona-geo"] geos = ["dep:sedona-geos"] tg = ["dep:sedona-tg"] http = ["object_store/http"] +pointcloud = ["dep:sedona-pointcloud"] proj = ["sedona-proj/proj-sys"] spatial-join = ["dep:sedona-spatial-join"] s2geography = ["dep:sedona-s2geography"] @@ -72,6 +73,7 @@ sedona-geo = { workspace = true, optional = true } sedona-geometry = { workspace = true } sedona-geoparquet = { workspace = true } sedona-geos = { workspace = true, optional = true } +sedona-pointcloud = { workspace = true, optional = true } sedona-proj = { workspace = true } sedona-raster-functions = { workspace = true } sedona-schema = { workspace = true } diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs index 998881df1..b0599143f 100644 --- a/rust/sedona/src/context.rs +++ b/rust/sedona/src/context.rs @@ -52,6 +52,11 @@ use sedona_geoparquet::{ format::GeoParquetFormatFactory, provider::{geoparquet_listing_table, GeoParquetReadOptions}, }; +#[cfg(feature = "pointcloud")] +use sedona_pointcloud::laz::{ + format::LazFormatFactory, + options::{LasExtraBytes, LasPointEncoding, LazTableOptions}, +}; /// Sedona SessionContext wrapper /// @@ -84,6 +89,13 @@ impl SedonaContext { // variables. let session_config = SessionConfig::from_env()?.with_information_schema(true); let session_config = add_sedona_option_extension(session_config); + #[cfg(feature = "pointcloud")] + let session_config = session_config.with_option_extension( + LazTableOptions::default() + .with_point_encoding(LasPointEncoding::Wkb) + .with_extra_bytes(LasExtraBytes::Ignore), + ); + let rt_builder = RuntimeEnvBuilder::new(); let runtime_env = rt_builder.build_arc()?; @@ -100,6 +112,17 @@ impl SedonaContext { let mut state = state_builder.build(); state.register_file_format(Arc::new(GeoParquetFormatFactory::new()), true)?; + #[cfg(feature = "pointcloud")] + { + use sedona_pointcloud::laz::options::LasExtraBytes; + + state.register_file_format(Arc::new(LazFormatFactory::new()), false)?; + state.register_table_options_extension( + LazTableOptions::default() + .with_point_encoding(LasPointEncoding::Wkb) + .with_extra_bytes(LasExtraBytes::Ignore), + ); + } // Enable dynamic file query (i.e., select * from 'filename') let ctx = SessionContext::new_with_state(state).enable_url_table();
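+ // With the LAZ format factory registered above (behind the `pointcloud` feature) and URL tables
+ // enabled, a dynamic file query against a `.laz` path, e.g. `SELECT * FROM 'lidar.laz'`
+ // (illustrative file name), is expected to resolve through `LazFormatFactory` and expose geometry
+ // according to the registered `laz.*` options (WKB point encoding in this configuration).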