From 5f2f0a347a4d46f3e0c46117eaceb757c859f55a Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Wed, 10 Jul 2024 14:00:25 -0400 Subject: [PATCH 1/5] New re_datafusion crate --- Cargo.lock | 1201 ++++++++++++++++- Cargo.toml | 8 + crates/store/re_datafusion/Cargo.toml | 45 + .../re_datafusion/examples/datafusion.rs | 13 + crates/store/re_datafusion/src/lib.rs | 39 + 5 files changed, 1281 insertions(+), 25 deletions(-) create mode 100644 crates/store/re_datafusion/Cargo.toml create mode 100644 crates/store/re_datafusion/examples/datafusion.rs create mode 100644 crates/store/re_datafusion/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 5a2602acfd31..e063708a6342 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -115,6 +115,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "const-random", "getrandom", "once_cell", "serde", @@ -131,6 +132,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "allocator-api2" version = "0.2.16" @@ -164,6 +180,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7eb209b1518d6bb87b283c20095f5228ecda460da70b44f0802523dea6da04" +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -295,12 +317,132 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf7d0a018de4f6aa429b9d33d69edf69072b1c5b1cb8d3e4a5f7ef898fc3eb76" +[[package]] +name = "arrayref" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d151e35f61089500b617991b791fc8bfd237ae50cd5950803758a179b41e67a" + [[package]] name = "arrayvec" version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" +[[package]] +name = "arrow" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa285343fba4d829d49985bdc541e3789cf6000ed0e84be7c039438df4a4e78c" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-csv", + "arrow-data", + "arrow-ipc", + "arrow-json", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "753abd0a5290c1bcade7c6623a556f7d1659c5f4148b140b5b63ce7bd1a45705" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half 2.3.1", + "num", +] + +[[package]] +name = "arrow-array" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d390feeb7f21b78ec997a4081a025baef1e2e0d6069e181939b61864c9779609" +dependencies = [ + "ahash", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "chrono-tz", + "half 2.3.1", + "hashbrown 0.14.2", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69615b061701bcdffbc62756bc7e85c827d5290b472b580c972ebbbf690f5aa4" +dependencies = [ + "bytes", + "half 2.3.1", + "num", +] + +[[package]] +name = "arrow-cast" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e448e5dd2f4113bf5b74a1f26531708f5edcacc77335b7066f9398f4bcf4cdef" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "base64 0.21.7", + "chrono", + "comfy-table", + "half 2.3.1", + "lexical-core", + "num", +] + +[[package]] +name = "arrow-csv" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46af72211f0712612f5b18325530b9ad1bfbdc87290d5fbfd32a7da128983781" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "csv", + "csv-core", + "lazy_static", + "lexical-core", + "regex", +] + +[[package]] +name = "arrow-data" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67d644b91a162f3ad3135ce1184d0a31c28b816a581e08f29e8e9277a574c64e" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half 2.3.1", + "num", +] + [[package]] name = "arrow-format" version = "0.8.1" @@ -311,6 +453,107 @@ dependencies = [ "serde", ] +[[package]] +name = "arrow-ipc" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03dea5e79b48de6c2e04f03f62b0afea7105be7b77d134f6c5414868feefb80d" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "flatbuffers", + "lz4_flex", +] + +[[package]] +name = "arrow-json" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8950719280397a47d37ac01492e3506a8a724b3fb81001900b866637a829ee0f" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "half 2.3.1", + "indexmap 2.1.0", + "lexical-core", + "num", + "serde", + "serde_json", +] + +[[package]] +name = "arrow-ord" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ed9630979034077982d8e74a942b7ac228f33dd93a93b615b4d02ad60c260be" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "half 2.3.1", + "num", +] + +[[package]] +name = "arrow-row" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "007035e17ae09c4e8993e4cb8b5b96edf0afb927cd38e2dff27189b274d83dcf" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half 2.3.1", + "hashbrown 0.14.2", +] + +[[package]] +name = "arrow-schema" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ff3e9c01f7cd169379d269f926892d0e622a704960350d09d331be3ec9e0029" + +[[package]] +name = "arrow-select" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce20973c1912de6514348e064829e50947e35977bb9d7fb637dc99ea9ffd78c" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num", +] + +[[package]] +name = "arrow-string" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00f3b37f2aeece31a2636d1b037dabb69ef590e03bdc7eb68519b51ec86932a7" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "num", + "regex", + "regex-syntax 0.8.4", +] + [[package]] name = "as-raw-xcb-connection" version = "1.0.1" @@ -372,6 +615,24 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-compression" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fec134f64e2bc57411226dfc4e52dec859ddfc7e711fc5e07b612584f000e4aa" +dependencies = [ + "bzip2", + "flate2", + "futures-core", + "futures-io", + "memchr", + "pin-project-lite", + "tokio", + "xz2", + "zstd", + "zstd-safe", +] + [[package]] name = "async-executor" version = "1.5.1" @@ -540,9 +801,9 @@ checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" [[package]] name = "async-trait" -version = "0.1.68" +version = "0.1.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" +checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", @@ -681,6 +942,29 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "blake2" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" +dependencies = [ + "digest", +] + +[[package]] +name = "blake3" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "199c42ab6972d92c9f8995f086273d25c42fc0f7b2a1fcefba465c1352d25ba5" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq", + "digest", +] + [[package]] name = "block" version = "0.1.6" @@ -758,6 +1042,27 @@ dependencies = [ "log", ] +[[package]] +name = "brotli" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bstr" version = "0.2.17" @@ -813,6 +1118,27 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +[[package]] +name = "bzip2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdb116a6ef3f6c3698828873ad02c3014b3c85cadb88496095628e3ef1e347f8" +dependencies = [ + "bzip2-sys", + "libc", +] + +[[package]] +name = "bzip2-sys" +version = "0.1.11+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "calloop" version = "0.12.3" @@ -961,11 +1287,36 @@ checksum = "77e53693616d3075149f4ead59bdeecd204ac6b8192d8969757601b74bddf00f" [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" dependencies = [ + "android-tzdata", + "iana-time-zone", "num-traits", + "windows-targets 0.52.0", +] + +[[package]] +name = "chrono-tz" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e" +dependencies = [ + "chrono", + "chrono-tz-build", + "phf", +] + +[[package]] +name = "chrono-tz-build" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "433e39f13c9a060046954e0592a8d0a4bcb1040125cbf91cb8ee58964cfb350f" +dependencies = [ + "parse-zoneinfo", + "phf", + "phf_codegen", ] [[package]] @@ -1177,12 +1528,38 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom", + "once_cell", + "tiny-keccak", +] + [[package]] name = "const_soft_float" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87ca1caa64ef4ed453e68bb3db612e51cf1b2f5b871337f0fcab1c8f87cc3dff" +[[package]] +name = "constant_time_eq" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" + [[package]] name = "constgebra" version = "0.1.4" @@ -1400,6 +1777,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "cursor-icon" version = "1.1.0" @@ -1466,12 +1864,260 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.2", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" +[[package]] +name = "datafusion" +version = "36.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b360b692bf6c6d6e6b6dbaf41a3be0020daeceac0f406aed54c75331e50dbb" +dependencies = [ + "ahash", + "arrow", + "arrow-array", + "arrow-ipc", + "arrow-schema", + "async-compression", + "async-trait", + "bytes", + "bzip2", + "chrono", + "dashmap", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions", + "datafusion-functions-array", + "datafusion-optimizer", + "datafusion-physical-expr", + "datafusion-physical-plan", + "datafusion-sql", + "flate2", + "futures", + "glob", + "half 2.3.1", + "hashbrown 0.14.2", + "indexmap 2.1.0", + "itertools 0.12.1", + "log", + "num_cpus", + "object_store", + "parking_lot", + "parquet", + "pin-project-lite", + "rand", + "sqlparser", + "tempfile", + "tokio", + "tokio-util", + "url", + "uuid", + "xz2", + "zstd", +] + +[[package]] +name = "datafusion-common" +version = "36.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37f343ccc298f440e25aa38ff82678291a7acc24061c7370ba6c0ff5cc811412" +dependencies = [ + "ahash", + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-schema", + "chrono", + "half 2.3.1", + "libc", + "num_cpus", + "object_store", + "parquet", + "sqlparser", +] + +[[package]] +name = "datafusion-execution" +version = "36.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9c93043081487e335399a21ebf8295626367a647ac5cb87d41d18afad7d0f7" +dependencies = [ + "arrow", + "chrono", + "dashmap", + "datafusion-common", + "datafusion-expr", + "futures", + "hashbrown 0.14.2", + "log", + "object_store", + "parking_lot", + "rand", + "tempfile", + "url", +] + +[[package]] +name = "datafusion-expr" +version = "36.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e204d89909e678846b6a95f156aafc1ee5b36cb6c9e37ec2e1449b078a38c818" +dependencies = [ + "ahash", + "arrow", + "arrow-array", + "datafusion-common", + "paste", + "sqlparser", + "strum", + "strum_macros", +] + +[[package]] +name = "datafusion-functions" +version = "36.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98f1c73f7801b2b8ba2297b3ad78ffcf6c1fc6b8171f502987eb9ad5cb244ee7" +dependencies = [ + "arrow", + "base64 0.21.7", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "hex", + "log", +] + +[[package]] +name = "datafusion-functions-array" +version = "36.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42d16a0ddf2c991526f6ffe2f47a72c6da0b7354d6c32411dd20631fe2e38937" +dependencies = [ + "arrow", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "log", + "paste", +] + +[[package]] +name = "datafusion-optimizer" +version = "36.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ae27e07bf1f04d327be5c2a293470879801ab5535204dc3b16b062fda195496" +dependencies = [ + "arrow", + "async-trait", + "chrono", + "datafusion-common", + "datafusion-expr", + "datafusion-physical-expr", + "hashbrown 0.14.2", + "itertools 0.12.1", + "log", + "regex-syntax 0.8.4", +] + +[[package]] +name = "datafusion-physical-expr" +version = "36.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dde620cd9ef76a3bca9c754fb68854bd2349c49f55baf97e08001f9e967f6d6b" +dependencies = [ + "ahash", + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-ord", + "arrow-schema", + "arrow-string", + "base64 0.21.7", + "blake2", + "blake3", + "chrono", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "half 2.3.1", + "hashbrown 0.14.2", + "hex", + "indexmap 2.1.0", + "itertools 0.12.1", + "log", + "md-5", + "paste", + "petgraph", + "rand", + "regex", + "sha2", + "unicode-segmentation", + "uuid", +] + +[[package]] +name = "datafusion-physical-plan" +version = "36.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a4c75fba9ea99d64b2246cbd2fcae2e6fc973e6616b1015237a616036506dd4" +dependencies = [ + "ahash", + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-schema", + "async-trait", + "chrono", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-expr", + "futures", + "half 2.3.1", + "hashbrown 0.14.2", + "indexmap 2.1.0", + "itertools 0.12.1", + "log", + "once_cell", + "parking_lot", + "pin-project-lite", + "rand", + "tokio", + "uuid", +] + +[[package]] +name = "datafusion-sql" +version = "36.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21474a95c3a62d113599d21b439fa15091b538bac06bd20be0bb2e7d22903c09" +dependencies = [ + "arrow", + "arrow-schema", + "datafusion-common", + "datafusion-expr", + "log", + "sqlparser", +] + [[package]] name = "derivative" version = "2.2.0" @@ -1497,6 +2143,7 @@ checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "crypto-common", + "subtle", ] [[package]] @@ -1544,6 +2191,12 @@ dependencies = [ "rerun", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "document-features" version = "0.2.8" @@ -2135,6 +2788,21 @@ dependencies = [ "libc", ] +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.28" @@ -2142,6 +2810,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -2150,6 +2819,17 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.28" @@ -2213,6 +2893,7 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -2428,6 +3109,7 @@ dependencies = [ "bytemuck", "cfg-if", "crunchy", + "num-traits", ] [[package]] @@ -2557,6 +3239,29 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "iana-time-zone" +version = "0.1.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icrate" version = "0.0.4" @@ -2717,6 +3422,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "io-lifetimes" version = "1.0.10" @@ -2749,6 +3460,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -2857,7 +3577,71 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" name = "leb128" version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" +checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" + +[[package]] +name = "lexical-core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" +dependencies = [ + "lexical-parse-integer", + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" +dependencies = [ + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-util" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5255b9ff16ff898710eb9eb63cb39248ea8a5bb036bea8085b1a767ff6c4e3fc" +dependencies = [ + "static_assertions", +] + +[[package]] +name = "lexical-write-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" +dependencies = [ + "lexical-util", + "lexical-write-integer", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" +dependencies = [ + "lexical-util", + "static_assertions", +] [[package]] name = "libc" @@ -2927,9 +3711,9 @@ checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5" [[package]] name = "lock_api" -version = "0.4.9" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" dependencies = [ "autocfg", "scopeguard", @@ -2938,12 +3722,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.17" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" -dependencies = [ - "cfg-if", -] +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "log-once" @@ -2983,6 +3764,17 @@ dependencies = [ "twox-hash", ] +[[package]] +name = "lzma-sys" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fda04ab3764e6cde78b9974eec4f779acaba7c4e84b36eca3cf77c581b85d27" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "malloc_buf" version = "0.0.6" @@ -3002,6 +3794,16 @@ dependencies = [ "rawpointer", ] +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.5.0" @@ -3287,10 +4089,34 @@ dependencies = [ ] [[package]] -name = "num-complex" +name = "num" version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02e0d21255c828d6f128a1e41534206671e8c3ea0c62f32291e808dc82cff17d" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" dependencies = [ "num-traits", ] @@ -3308,19 +4134,40 @@ dependencies = [ [[package]] name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-iter" version = "0.1.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" dependencies = [ "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint", + "num-integer", "num-traits", ] [[package]] name = "num-traits" -version = "0.2.15" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", "libm", @@ -3551,6 +4398,27 @@ dependencies = [ "memchr", ] +[[package]] +name = "object_store" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8718f8b65fdf67a45108d1548347d4af7d71fb81ce727bbf9e3b2535e079db3" +dependencies = [ + "async-trait", + "bytes", + "chrono", + "futures", + "humantime", + "itertools 0.12.1", + "parking_lot", + "percent-encoding", + "snafu", + "tokio", + "tracing", + "url", + "walkdir", +] + [[package]] name = "objectron" version = "0.18.0-alpha.1+dev" @@ -3592,6 +4460,15 @@ dependencies = [ "redox_syscall 0.3.5", ] +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-float" version = "4.2.0" @@ -3638,25 +4515,69 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.7" +version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "backtrace", "cfg-if", "libc", "petgraph", - "redox_syscall 0.2.16", + "redox_syscall 0.5.3", "smallvec", "thread-id", - "windows-sys 0.45.0", + "windows-targets 0.52.0", +] + +[[package]] +name = "parquet" +version = "50.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "547b92ebf0c1177e3892f44c8f79757ee62e678d564a9834189725f2c5b7a750" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "base64 0.21.7", + "brotli", + "bytes", + "chrono", + "flate2", + "futures", + "half 2.3.1", + "hashbrown 0.14.2", + "lz4_flex", + "num", + "num-bigint", + "object_store", + "paste", + "seq-macro", + "snap", + "thrift", + "tokio", + "twox-hash", + "zstd", +] + +[[package]] +name = "parse-zoneinfo" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f2a05b18d44e2957b88f96ba460715e295bc1d7510468a2f3d3b44535d26c24" +dependencies = [ + "regex", ] [[package]] name = "paste" -version = "1.0.12" +version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" [[package]] name = "pathdiff" @@ -3707,6 +4628,44 @@ dependencies = [ "indexmap 1.9.3", ] +[[package]] +name = "phf" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_codegen" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8d39688d359e6b34654d328e262234662d16cc0f60ec8dcbe5e718709342a5a" +dependencies = [ + "phf_generator", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e4cc64c2ad9ebe670cb8fd69dd50ae301650392e81c05f9bfcb2d5bdbc24b0" +dependencies = [ + "phf_shared", + "rand", +] + +[[package]] +name = "phf_shared" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" +dependencies = [ + "siphasher", +] + [[package]] name = "pico-args" version = "0.5.0" @@ -4233,7 +5192,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1285f33f03e2faf9f77b06c19f32f8c54792a4cbb19df762b9ea70b79e0773d" dependencies = [ "ahash", + "arrow-array", + "arrow-buffer", + "arrow-data", "arrow-format", + "arrow-schema", "bytemuck", "chrono", "comfy-table", @@ -4464,6 +5427,25 @@ dependencies = [ "unindent", ] +[[package]] +name = "re_datafusion" +version = "0.18.0-alpha.1+dev" +dependencies = [ + "ahash", + "anyhow", + "async-trait", + "datafusion", + "document-features", + "re_arrow2", + "re_chunk", + "re_chunk_store", + "re_format", + "re_log", + "re_log_types", + "re_types", + "tokio", +] + [[package]] name = "re_dev_tools" version = "0.18.0-alpha.1+dev" @@ -4740,7 +5722,7 @@ dependencies = [ "itertools 0.13.0", "never", "notify", - "ordered-float", + "ordered-float 4.2.0", "parking_lot", "pathdiff", "profiling", @@ -5482,6 +6464,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" +dependencies = [ + "bitflags 2.5.0", +] + [[package]] name = "redox_users" version = "0.4.3" @@ -5501,7 +6492,7 @@ checksum = "d1a59b5d8e97dee33696bf13c5ba8ab85341c002922fba050069326b9c498974" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.7.2", ] [[package]] @@ -5516,6 +6507,12 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" +[[package]] +name = "regex-syntax" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" + [[package]] name = "renderdoc-sys" version = "1.1.0" @@ -6231,6 +7228,12 @@ dependencies = [ "similar", ] +[[package]] +name = "siphasher" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" + [[package]] name = "skeptic" version = "0.13.7" @@ -6319,6 +7322,34 @@ dependencies = [ "serde", ] +[[package]] +name = "snafu" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" +dependencies = [ + "doc-comment", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "snap" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" + [[package]] name = "snippets" version = "0.18.0-alpha.1+dev" @@ -6363,6 +7394,27 @@ dependencies = [ "bitflags 2.5.0", ] +[[package]] +name = "sqlparser" +version = "0.43.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f95c4bae5aba7cd30bd506f7140026ade63cff5afd778af8854026f9606bf5d4" +dependencies = [ + "log", + "sqlparser_derive", +] + +[[package]] +name = "sqlparser_derive" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -6416,6 +7468,12 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7986063f7c0ab374407e586d7048a3d5aac94f103f751088bf398e07cd5400" +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + [[package]] name = "syn" version = "1.0.109" @@ -6573,6 +7631,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "ordered-float 2.10.1", +] + [[package]] name = "tiff" version = "0.9.1" @@ -6614,6 +7683,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tiny_http" version = "0.12.0" @@ -6669,6 +7747,42 @@ dependencies = [ "ahash", ] +[[package]] +name = "tokio" +version = "1.39.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" +dependencies = [ + "backtrace", + "bytes", + "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "tokio-util" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml" version = "0.8.10" @@ -7884,6 +8998,15 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dbabb1cbd15a1d6d12d9ed6b35cc6777d4af87ab3ba155ea37215f20beab80c" +[[package]] +name = "xz2" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388c44dc09d76f1536602ead6d325eb532f5c122f17782bd57fb47baeeb767e2" +dependencies = [ + "lzma-sys", +] + [[package]] name = "zbus" version = "3.14.1" @@ -7982,6 +9105,34 @@ dependencies = [ "flate2", ] +[[package]] +name = "zstd" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa556e971e7b568dc775c136fc9de8c779b1c2fc3a63defaafadffdbd3181afa" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.12+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a4e40c320c3cb459d9a9ff6de98cff88f4751ee9275d140e2be94a2b74e4c13" +dependencies = [ + "cc", + "pkg-config", +] + [[package]] name = "zune-core" version = "0.4.11" diff --git a/Cargo.toml b/Cargo.toml index c5333aa85ebd..c55e7333140f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ re_chunk = { path = "crates/store/re_chunk", version = "=0.18.0-alpha.1", defaul re_chunk_store = { path = "crates/store/re_chunk_store", version = "=0.18.0-alpha.1", default-features = false } re_data_loader = { path = "crates/store/re_data_loader", version = "=0.18.0-alpha.1", default-features = false } re_data_source = { path = "crates/store/re_data_source", version = "=0.18.0-alpha.1", default-features = false } +re_datafusion = { path = "crates/store/re_datafusion", version = "=0.18.0-alpha.1", default-features = false } re_entity_db = { path = "crates/store/re_entity_db", version = "=0.18.0-alpha.1", default-features = false } re_format_arrow = { path = "crates/store/re_format_arrow", version = "=0.18.0-alpha.1", default-features = false } re_log_encoding = { path = "crates/store/re_log_encoding", version = "=0.18.0-alpha.1", default-features = false } @@ -157,6 +158,7 @@ console_error_panic_hook = "0.1.6" convert_case = "0.6" criterion = "0.5" crossbeam = "0.8" +datafusion = "39.0" directories = "5" document-features = "0.2.8" ehttp = "0.5.0" @@ -253,6 +255,12 @@ tracing = { version = "0.1", default-features = false } tungstenite = { version = "0.20", default-features = false } type-map = "0.5" typenum = "1.15" +tokio = { version = "1.38", features = [ + "macros", + "rt", + "rt-multi-thread", + "sync", +] } unindent = "0.2" ureq = "2.6" url = "2.3" diff --git a/crates/store/re_datafusion/Cargo.toml b/crates/store/re_datafusion/Cargo.toml new file mode 100644 index 000000000000..4ee283777357 --- /dev/null +++ b/crates/store/re_datafusion/Cargo.toml @@ -0,0 +1,45 @@ +[package] +name = "re_datafusion" +authors.workspace = true +description = "Datafusion bindings on top of Rerun" +edition.workspace = true +homepage.workspace = true +include.workspace = true +license.workspace = true +publish = true +readme = "README.md" +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[lints] +workspace = true + +[package.metadata.docs.rs] +all-features = true + + +[features] +default = [] + +[dependencies] +re_chunk = { workspace = true, features = ["serde"] } +re_chunk_store.workspace = true +re_format.workspace = true +re_log.workspace = true + +ahash.workspace = true +datafusion.workspace = true +document-features.workspace = true +tokio.workspace = true +arrow = "52.1.0" + +[dev-dependencies] +re_types.workspace = true + +[lib] +bench = false + +[[example]] +name = "datafusion" +path = "examples/datafusion.rs" diff --git a/crates/store/re_datafusion/examples/datafusion.rs b/crates/store/re_datafusion/examples/datafusion.rs new file mode 100644 index 000000000000..859b9442c0c6 --- /dev/null +++ b/crates/store/re_datafusion/examples/datafusion.rs @@ -0,0 +1,13 @@ +use datafusion::error::Result; + +use re_datafusion::create_datafusion_context; + +#[tokio::main] +async fn main() -> Result<()> { + let ctx = create_datafusion_context()?; + + let df = ctx.sql("SELECT * FROM my_table").await?; + + df.show().await?; + Ok(()) +} diff --git a/crates/store/re_datafusion/src/lib.rs b/crates/store/re_datafusion/src/lib.rs new file mode 100644 index 000000000000..3191c40c1c4d --- /dev/null +++ b/crates/store/re_datafusion/src/lib.rs @@ -0,0 +1,39 @@ +//! This is how we bind Rerun into datafusion +//! +//! ## Feature flags +#![doc = document_features::document_features!()] +//! + +use datafusion::error::Result; +use datafusion::prelude::*; + +use arrow::array::{Float64Array, Int32Array, StringArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use std::sync::Arc; + +pub fn create_datafusion_context() -> Result { + let ctx = SessionContext::new(); + + // Define the schema for the in-memory table + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + Field::new("value", DataType::Float64, false), + ])); + + // Create sample data + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["A", "B", "C"])), + Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])), + ], + )?; + + // Register the in-memory table + ctx.register_batch("my_table", batch)?; + + Ok(ctx) +} From 74f72f602aba5094c45e4c5a9a608b54da1ae16a Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Wed, 10 Jul 2024 14:36:40 -0400 Subject: [PATCH 2/5] Introduce a custom data source --- crates/store/re_datafusion/Cargo.toml | 1 + .../re_datafusion/examples/datafusion.rs | 2 +- crates/store/re_datafusion/src/chunk_table.rs | 159 ++++++++++++++++++ crates/store/re_datafusion/src/lib.rs | 26 +-- 4 files changed, 166 insertions(+), 22 deletions(-) create mode 100644 crates/store/re_datafusion/src/chunk_table.rs diff --git a/crates/store/re_datafusion/Cargo.toml b/crates/store/re_datafusion/Cargo.toml index 4ee283777357..faa9f85aa3c9 100644 --- a/crates/store/re_datafusion/Cargo.toml +++ b/crates/store/re_datafusion/Cargo.toml @@ -33,6 +33,7 @@ datafusion.workspace = true document-features.workspace = true tokio.workspace = true arrow = "52.1.0" +async-trait = "0.1.81" [dev-dependencies] re_types.workspace = true diff --git a/crates/store/re_datafusion/examples/datafusion.rs b/crates/store/re_datafusion/examples/datafusion.rs index 859b9442c0c6..506e5de8aca0 100644 --- a/crates/store/re_datafusion/examples/datafusion.rs +++ b/crates/store/re_datafusion/examples/datafusion.rs @@ -6,7 +6,7 @@ use re_datafusion::create_datafusion_context; async fn main() -> Result<()> { let ctx = create_datafusion_context()?; - let df = ctx.sql("SELECT * FROM my_table").await?; + let df = ctx.sql("SELECT * FROM custom_table").await?; df.show().await?; Ok(()) diff --git a/crates/store/re_datafusion/src/chunk_table.rs b/crates/store/re_datafusion/src/chunk_table.rs new file mode 100644 index 000000000000..ece4c26617fc --- /dev/null +++ b/crates/store/re_datafusion/src/chunk_table.rs @@ -0,0 +1,159 @@ +use std::any::Any; +use std::fmt::{self, Debug, Formatter}; +use std::sync::Arc; + +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::error::Result; +use datafusion::execution::context::{SessionState, TaskContext}; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::memory::MemoryStream; +use datafusion::physical_plan::{ + project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, + PlanProperties, SendableRecordBatchStream, +}; +use datafusion::prelude::*; + +use async_trait::async_trait; + +/// A custom datasource, used to represent a datastore with a single index +#[derive(Clone, Default)] +pub struct CustomDataSource {} + +impl Debug for CustomDataSource { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str("custom_db") + } +} + +impl CustomDataSource { + pub(crate) async fn create_physical_plan( + &self, + projections: Option<&Vec>, + schema: SchemaRef, + ) -> Result> { + Ok(Arc::new(CustomExec::try_new( + projections, + &schema, + self.clone(), + )?)) + } +} + +#[async_trait] +impl TableProvider for CustomDataSource { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + Field::new("value", DataType::Float64, false), + ])) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &SessionState, + projection: Option<&Vec>, + // filters and limit can be used here to inject some push-down operations if needed + _filters: &[Expr], + _limit: Option, + ) -> Result> { + return self.create_physical_plan(projection, self.schema()).await; + } +} + +#[derive(Debug, Clone)] +struct CustomExec { + _db: CustomDataSource, + projected_schema: SchemaRef, + cache: PlanProperties, +} + +impl CustomExec { + fn try_new( + projections: Option<&Vec>, + schema: &SchemaRef, + db: CustomDataSource, + ) -> Result { + let projected_schema = project_schema(schema, projections)?; + let cache = Self::compute_properties(projected_schema.clone()); + Ok(Self { + _db: db, + projected_schema, + cache, + }) + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties(schema: SchemaRef) -> PlanProperties { + let eq_properties = EquivalenceProperties::new(schema); + PlanProperties::new( + eq_properties, + Partitioning::UnknownPartitioning(1), + ExecutionMode::Bounded, + ) + } +} + +impl DisplayAs for CustomExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "CustomExec") + } +} + +impl ExecutionPlan for CustomExec { + fn name(&self) -> &'static str { + "CustomExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + use arrow::array::{Float64Array, Int32Array, StringArray}; + + let batch = RecordBatch::try_new( + self.projected_schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["A", "B", "C"])), + Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])), + ], + )?; + + Ok(Box::pin(MemoryStream::try_new( + vec![batch], + self.schema(), + None, + )?)) + } +} diff --git a/crates/store/re_datafusion/src/lib.rs b/crates/store/re_datafusion/src/lib.rs index 3191c40c1c4d..17fc74cec845 100644 --- a/crates/store/re_datafusion/src/lib.rs +++ b/crates/store/re_datafusion/src/lib.rs @@ -4,36 +4,20 @@ #![doc = document_features::document_features!()] //! +mod chunk_table; + +use chunk_table::CustomDataSource; use datafusion::error::Result; use datafusion::prelude::*; -use arrow::array::{Float64Array, Int32Array, StringArray}; -use arrow::datatypes::{DataType, Field, Schema}; -use arrow::record_batch::RecordBatch; use std::sync::Arc; pub fn create_datafusion_context() -> Result { let ctx = SessionContext::new(); - // Define the schema for the in-memory table - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("name", DataType::Utf8, false), - Field::new("value", DataType::Float64, false), - ])); - - // Create sample data - let batch = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Int32Array::from(vec![1, 2, 3])), - Arc::new(StringArray::from(vec!["A", "B", "C"])), - Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])), - ], - )?; + let db = CustomDataSource::default(); - // Register the in-memory table - ctx.register_batch("my_table", batch)?; + ctx.register_table("custom_table", Arc::new(db))?; Ok(ctx) } From e74baa7f9873ba86920fd565c6de0b6745a7cacb Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Wed, 10 Jul 2024 15:20:50 -0400 Subject: [PATCH 3/5] Add a chunkstore to the CustomDataSource --- crates/store/re_datafusion/Cargo.toml | 2 ++ .../re_datafusion/examples/datafusion.rs | 34 +++++++++++++++++-- crates/store/re_datafusion/src/chunk_table.rs | 14 ++++++-- crates/store/re_datafusion/src/lib.rs | 5 +-- 4 files changed, 48 insertions(+), 7 deletions(-) diff --git a/crates/store/re_datafusion/Cargo.toml b/crates/store/re_datafusion/Cargo.toml index faa9f85aa3c9..2ba01273f5d8 100644 --- a/crates/store/re_datafusion/Cargo.toml +++ b/crates/store/re_datafusion/Cargo.toml @@ -27,7 +27,9 @@ re_chunk = { workspace = true, features = ["serde"] } re_chunk_store.workspace = true re_format.workspace = true re_log.workspace = true +re_log_types.workspace = true +anyhow.workspace = true ahash.workspace = true datafusion.workspace = true document-features.workspace = true diff --git a/crates/store/re_datafusion/examples/datafusion.rs b/crates/store/re_datafusion/examples/datafusion.rs index 506e5de8aca0..a027db14eca1 100644 --- a/crates/store/re_datafusion/examples/datafusion.rs +++ b/crates/store/re_datafusion/examples/datafusion.rs @@ -1,10 +1,38 @@ -use datafusion::error::Result; +use std::sync::Arc; +use re_chunk::{ + external::re_log_types::example_components::MyPoint, Chunk, EntityPath, RowId, TimePoint, + Timeline, +}; +use re_chunk_store::ChunkStore; use re_datafusion::create_datafusion_context; #[tokio::main] -async fn main() -> Result<()> { - let ctx = create_datafusion_context()?; +async fn main() -> anyhow::Result<()> { + let entity_path: EntityPath = "some_entity".into(); + + let timeline_frame = Timeline::new_sequence("frame"); + let timepoint = TimePoint::from_iter([(timeline_frame, 10)]); + + let point1 = MyPoint::new(1.0, 1.0); + let point2 = MyPoint::new(2.0, 2.0); + + let mut store = ChunkStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + Default::default(), + ); + + let chunk = Chunk::builder(entity_path.clone()) + .with_component_batch(RowId::new(), timepoint.clone(), &[point1]) + .build()?; + store.insert_chunk(&Arc::new(chunk))?; + + let chunk = Chunk::builder(entity_path.clone()) + .with_component_batch(RowId::new(), timepoint.clone(), &[point2]) + .build()?; + store.insert_chunk(&Arc::new(chunk))?; + + let ctx = create_datafusion_context(store)?; let df = ctx.sql("SELECT * FROM custom_table").await?; diff --git a/crates/store/re_datafusion/src/chunk_table.rs b/crates/store/re_datafusion/src/chunk_table.rs index ece4c26617fc..c3cce58cec38 100644 --- a/crates/store/re_datafusion/src/chunk_table.rs +++ b/crates/store/re_datafusion/src/chunk_table.rs @@ -16,10 +16,20 @@ use datafusion::physical_plan::{ use datafusion::prelude::*; use async_trait::async_trait; +use re_chunk_store::ChunkStore; /// A custom datasource, used to represent a datastore with a single index -#[derive(Clone, Default)] -pub struct CustomDataSource {} +#[derive(Clone)] +pub struct CustomDataSource { + // TODO(jleibs): Sort out lifetime here so we don't need to take ownership. + store: ChunkStore, +} + +impl CustomDataSource { + pub fn new(store: ChunkStore) -> Self { + Self { store } + } +} impl Debug for CustomDataSource { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { diff --git a/crates/store/re_datafusion/src/lib.rs b/crates/store/re_datafusion/src/lib.rs index 17fc74cec845..78b991d1a55f 100644 --- a/crates/store/re_datafusion/src/lib.rs +++ b/crates/store/re_datafusion/src/lib.rs @@ -9,13 +9,14 @@ mod chunk_table; use chunk_table::CustomDataSource; use datafusion::error::Result; use datafusion::prelude::*; +use re_chunk_store::ChunkStore; use std::sync::Arc; -pub fn create_datafusion_context() -> Result { +pub fn create_datafusion_context(store: ChunkStore) -> Result { let ctx = SessionContext::new(); - let db = CustomDataSource::default(); + let db = CustomDataSource::new(store); ctx.register_table("custom_table", Arc::new(db))?; From 3ba95cbe67034bcc8c2c1b18adb6c7f23766a289 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Thu, 11 Jul 2024 07:14:03 -0400 Subject: [PATCH 4/5] Pull real data from a chunkstore --- Cargo.toml | 2 +- crates/store/re_datafusion/Cargo.toml | 4 +- .../re_datafusion/examples/datafusion.rs | 22 +++- crates/store/re_datafusion/src/chunk_table.rs | 100 +++++++++++------- 4 files changed, 80 insertions(+), 48 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c55e7333140f..e1449eba0bea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -158,7 +158,7 @@ console_error_panic_hook = "0.1.6" convert_case = "0.6" criterion = "0.5" crossbeam = "0.8" -datafusion = "39.0" +datafusion = "36.0" directories = "5" document-features = "0.2.8" ehttp = "0.5.0" diff --git a/crates/store/re_datafusion/Cargo.toml b/crates/store/re_datafusion/Cargo.toml index 2ba01273f5d8..3c9a128f1608 100644 --- a/crates/store/re_datafusion/Cargo.toml +++ b/crates/store/re_datafusion/Cargo.toml @@ -28,17 +28,17 @@ re_chunk_store.workspace = true re_format.workspace = true re_log.workspace = true re_log_types.workspace = true +re_types.workspace = true anyhow.workspace = true ahash.workspace = true datafusion.workspace = true document-features.workspace = true tokio.workspace = true -arrow = "52.1.0" +arrow2 = { workspace = true, features = ["arrow"] } async-trait = "0.1.81" [dev-dependencies] -re_types.workspace = true [lib] bench = false diff --git a/crates/store/re_datafusion/examples/datafusion.rs b/crates/store/re_datafusion/examples/datafusion.rs index a027db14eca1..9f973244f5e9 100644 --- a/crates/store/re_datafusion/examples/datafusion.rs +++ b/crates/store/re_datafusion/examples/datafusion.rs @@ -6,6 +6,7 @@ use re_chunk::{ }; use re_chunk_store::ChunkStore; use re_datafusion::create_datafusion_context; +use re_log_types::example_components::MyLabel; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -14,8 +15,12 @@ async fn main() -> anyhow::Result<()> { let timeline_frame = Timeline::new_sequence("frame"); let timepoint = TimePoint::from_iter([(timeline_frame, 10)]); - let point1 = MyPoint::new(1.0, 1.0); - let point2 = MyPoint::new(2.0, 2.0); + let point1 = MyPoint::new(1.0, 2.0); + let point2 = MyPoint::new(3.0, 4.0); + let point3 = MyPoint::new(5.0, 6.0); + let label1 = MyLabel("point1".to_owned()); + let label2 = MyLabel("point2".to_owned()); + let label3 = MyLabel("point3".to_owned()); let mut store = ChunkStore::new( re_log_types::StoreId::random(re_log_types::StoreKind::Recording), @@ -23,12 +28,19 @@ async fn main() -> anyhow::Result<()> { ); let chunk = Chunk::builder(entity_path.clone()) - .with_component_batch(RowId::new(), timepoint.clone(), &[point1]) + .with_component_batches( + RowId::new(), + timepoint.clone(), + [&[point1, point2] as _, &[label1, label2] as _], + ) .build()?; store.insert_chunk(&Arc::new(chunk))?; - let chunk = Chunk::builder(entity_path.clone()) - .with_component_batch(RowId::new(), timepoint.clone(), &[point2]) + .with_component_batches( + RowId::new(), + timepoint.clone(), + [&[point3] as _, &[label3] as _], + ) .build()?; store.insert_chunk(&Arc::new(chunk))?; diff --git a/crates/store/re_datafusion/src/chunk_table.rs b/crates/store/re_datafusion/src/chunk_table.rs index c3cce58cec38..6965dd0573fc 100644 --- a/crates/store/re_datafusion/src/chunk_table.rs +++ b/crates/store/re_datafusion/src/chunk_table.rs @@ -2,21 +2,25 @@ use std::any::Any; use std::fmt::{self, Debug, Formatter}; use std::sync::Arc; -use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow2::array::{Arrow2Arrow as _, ListArray}; +use datafusion::arrow::array::{ArrayRef, GenericListArray}; +use datafusion::arrow::datatypes::{Field, Schema, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result; use datafusion::execution::context::{SessionState, TaskContext}; -use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::{ - project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, - PlanProperties, SendableRecordBatchStream, + project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, }; use datafusion::prelude::*; use async_trait::async_trait; use re_chunk_store::ChunkStore; +use re_types::Archetype as _; + +//use crate::conversions::convert_datatype_arrow2_to_arrow; /// A custom datasource, used to represent a datastore with a single index #[derive(Clone)] @@ -58,11 +62,23 @@ impl TableProvider for CustomDataSource { } fn schema(&self) -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("name", DataType::Utf8, false), - Field::new("value", DataType::Float64, false), - ])) + // TODO(jleibs): This should come from the chunk store directly + let components = re_log_types::example_components::MyPoints::all_components(); + + let fields: Vec = components + .iter() + .filter_map(|c| { + self.store.lookup_datatype(c).map(|dt| { + Field::new( + c.as_str(), + ListArray::::default_datatype(dt.clone()).into(), + true, + ) + }) + }) + .collect(); + + Arc::new(Schema::new(fields)) } fn table_type(&self) -> TableType { @@ -85,7 +101,6 @@ impl TableProvider for CustomDataSource { struct CustomExec { _db: CustomDataSource, projected_schema: SchemaRef, - cache: PlanProperties, } impl CustomExec { @@ -95,23 +110,11 @@ impl CustomExec { db: CustomDataSource, ) -> Result { let projected_schema = project_schema(schema, projections)?; - let cache = Self::compute_properties(projected_schema.clone()); Ok(Self { _db: db, projected_schema, - cache, }) } - - /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. - fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - PlanProperties::new( - eq_properties, - Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, - ) - } } impl DisplayAs for CustomExec { @@ -121,19 +124,23 @@ impl DisplayAs for CustomExec { } impl ExecutionPlan for CustomExec { - fn name(&self) -> &'static str { - "CustomExec" - } - fn as_any(&self) -> &dyn Any { self } - fn properties(&self) -> &PlanProperties { - &self.cache + fn schema(&self) -> SchemaRef { + self.projected_schema.clone() + } + + fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { + datafusion::physical_plan::Partitioning::UnknownPartitioning(1) } - fn children(&self) -> Vec<&Arc> { + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { vec![] } @@ -149,19 +156,32 @@ impl ExecutionPlan for CustomExec { _partition: usize, _context: Arc, ) -> Result { - use arrow::array::{Float64Array, Int32Array, StringArray}; - - let batch = RecordBatch::try_new( - self.projected_schema.clone(), - vec![ - Arc::new(Int32Array::from(vec![1, 2, 3])), - Arc::new(StringArray::from(vec!["A", "B", "C"])), - Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])), - ], - )?; + let batches: datafusion::arrow::error::Result> = self + ._db + .store + .iter_chunks() + .map(|chunk| { + let components = re_log_types::example_components::MyPoints::all_components(); + + RecordBatch::try_new( + self.projected_schema.clone(), + components + .iter() + .filter_map(|c| { + chunk.components().get(c).map(|c| { + let data = c.to_data(); + let converted = GenericListArray::::from(data); + + Arc::new(converted) as ArrayRef + }) + }) + .collect(), + ) + }) + .collect(); Ok(Box::pin(MemoryStream::try_new( - vec![batch], + batches?, self.schema(), None, )?)) From af20d75f73d8f9ca68af5da6459fd698fc6bfab1 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Mon, 15 Jul 2024 11:06:35 -0400 Subject: [PATCH 5/5] Extract_field udf --- .../re_datafusion/examples/datafusion.rs | 15 +++ crates/store/re_datafusion/src/chunk_table.rs | 9 +- .../re_datafusion/src/field_extraction.rs | 117 ++++++++++++++++++ crates/store/re_datafusion/src/lib.rs | 7 ++ 4 files changed, 143 insertions(+), 5 deletions(-) create mode 100644 crates/store/re_datafusion/src/field_extraction.rs diff --git a/crates/store/re_datafusion/examples/datafusion.rs b/crates/store/re_datafusion/examples/datafusion.rs index 9f973244f5e9..117c12a60e87 100644 --- a/crates/store/re_datafusion/examples/datafusion.rs +++ b/crates/store/re_datafusion/examples/datafusion.rs @@ -47,7 +47,22 @@ async fn main() -> anyhow::Result<()> { let ctx = create_datafusion_context(store)?; let df = ctx.sql("SELECT * FROM custom_table").await?; + df.show().await?; + + let df = ctx + .sql("SELECT \"example.MyPoint\" FROM custom_table") + .await?; + df.show().await?; + let df = ctx + .sql("SELECT \"example.MyLabel\" FROM custom_table") + .await?; df.show().await?; + + let df = ctx + .sql("SELECT array_extract(\"example.MyPoint\", 'x') as X, array_extract(\"example.MyPoint\", 'y') as Y FROM custom_table") + .await?; + df.show().await?; + Ok(()) } diff --git a/crates/store/re_datafusion/src/chunk_table.rs b/crates/store/re_datafusion/src/chunk_table.rs index 6965dd0573fc..eb2fc5694df9 100644 --- a/crates/store/re_datafusion/src/chunk_table.rs +++ b/crates/store/re_datafusion/src/chunk_table.rs @@ -161,14 +161,13 @@ impl ExecutionPlan for CustomExec { .store .iter_chunks() .map(|chunk| { - let components = re_log_types::example_components::MyPoints::all_components(); - RecordBatch::try_new( self.projected_schema.clone(), - components + self.projected_schema + .fields() .iter() - .filter_map(|c| { - chunk.components().get(c).map(|c| { + .filter_map(|f| { + chunk.components().get(&f.name().clone().into()).map(|c| { let data = c.to_data(); let converted = GenericListArray::::from(data); diff --git a/crates/store/re_datafusion/src/field_extraction.rs b/crates/store/re_datafusion/src/field_extraction.rs new file mode 100644 index 000000000000..4e06cf0da7c5 --- /dev/null +++ b/crates/store/re_datafusion/src/field_extraction.rs @@ -0,0 +1,117 @@ +use datafusion::arrow::array::{Array, ListArray, StructArray}; +use datafusion::arrow::datatypes::{DataType, Field}; +use datafusion::common::{plan_err, DataFusionError, ExprSchema, Result}; +use datafusion::logical_expr::ScalarUDFImpl; +use datafusion::logical_expr::{ColumnarValue, Expr, Signature, Volatility}; +use datafusion::scalar::ScalarValue; +use std::any::Any; +use std::sync::Arc; + +#[derive(Debug)] +pub struct ExtractField { + signature: Signature, +} + +impl ExtractField { + pub fn new() -> Self { + Self { + signature: Signature::any(2, Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for ExtractField { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "array_extract" + } + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _args: &[DataType]) -> Result { + Err(DataFusionError::Internal( + "Should have dispatched to return_type_from_exprs".to_owned(), + )) + } + + fn return_type_from_exprs(&self, args: &[Expr], schema: &dyn ExprSchema) -> Result { + let Some(Expr::Column(col)) = args.first() else { + return plan_err!("rr_extract first arg must be a Column containing a List of Structs"); + }; + let dt = schema.data_type(col)?; + + let DataType::List(inner) = dt else { + return plan_err!("rr_extract first arg must be a Column containing a List of Structs"); + }; + + let DataType::Struct(fields) = inner.data_type() else { + return plan_err!("rr_extract first arg must be a Column containing a List of Structs"); + }; + + let Some(Expr::Literal(ScalarValue::Utf8(Some(field)))) = args.get(1) else { + return plan_err!( + "rr_extract second arg must be a string matching a field in the struct" + ); + }; + + let Some(final_dt) = fields.find(field) else { + return plan_err!( + "rr_extract second arg must be a string matching a field in the struct" + ); + }; + + Ok(DataType::List(Arc::new(Field::new( + "item", + final_dt.1.data_type().clone(), + true, + )))) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + let ColumnarValue::Scalar(ScalarValue::Utf8(Some(field_name))) = &args[1] else { + return Err(DataFusionError::Internal( + "Expected second argument to be a string".to_owned(), + )); + }; + + let args = ColumnarValue::values_to_arrays(args)?; + let arg = &args[0]; + + // Downcast to list array + let Some(list_array) = arg.as_any().downcast_ref::() else { + return Err(DataFusionError::Internal( + "Expected first argument to be a ListArray".to_owned(), + )); + }; + + // Get the child values array + let child_values = list_array.values(); + + // Downcast to a struct array + let Some(struct_array) = child_values.as_any().downcast_ref::() else { + return Err(DataFusionError::Internal( + "Expected ListArray to contain StructArray".to_owned(), + )); + }; + + // Get the values of the field with the correct name + let Some(field_values) = struct_array.column_by_name(field_name) else { + return Err(DataFusionError::Internal(format!( + "Expected StructArray to contain field named '{field_name}'", + ))); + }; + + // Create a new list array with the same offsets but the child values + let new_array = ListArray::new( + Arc::new(Field::new("item", field_values.data_type().clone(), true)), + list_array.offsets().clone(), + field_values.clone(), + list_array.nulls().cloned(), + ); + + Ok(ColumnarValue::Array(Arc::new(new_array))) + } +} diff --git a/crates/store/re_datafusion/src/lib.rs b/crates/store/re_datafusion/src/lib.rs index 78b991d1a55f..b0d023465cff 100644 --- a/crates/store/re_datafusion/src/lib.rs +++ b/crates/store/re_datafusion/src/lib.rs @@ -5,17 +5,24 @@ //! mod chunk_table; +mod field_extraction; use chunk_table::CustomDataSource; use datafusion::error::Result; +use datafusion::logical_expr::ScalarUDF; use datafusion::prelude::*; +use field_extraction::ExtractField; use re_chunk_store::ChunkStore; use std::sync::Arc; pub fn create_datafusion_context(store: ChunkStore) -> Result { + let extract_field = ScalarUDF::from(ExtractField::new()); + let ctx = SessionContext::new(); + ctx.register_udf(extract_field.clone()); + let db = CustomDataSource::new(store); ctx.register_table("custom_table", Arc::new(db))?;