Skip to content

Commit d45a27d

Browse files
committed
Some last minute things
Signed-off-by: Adam Gutglick <[email protected]>
1 parent d48a606 commit d45a27d

File tree

2 files changed

+20
-29
lines changed
  • bench-vortex/src/engines/df
  • vortex-datafusion/src/persistent

2 files changed

+20
-29
lines changed

bench-vortex/src/engines/df/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use datafusion::execution::SessionStateBuilder;
1111
use datafusion::execution::cache::cache_manager::CacheManagerConfig;
1212
use datafusion::execution::cache::cache_unit::{DefaultFileStatisticsCache, DefaultListFilesCache};
1313
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
14-
use datafusion::physical_plan::collect;
1514
use datafusion::prelude::{SessionConfig, SessionContext};
1615
use datafusion_common::GetExt;
1716
use datafusion_physical_plan::ExecutionPlan;
@@ -135,11 +134,10 @@ pub async fn execute_query(
135134
ctx: &SessionContext,
136135
query: &str,
137136
) -> anyhow::Result<(Vec<RecordBatch>, Arc<dyn ExecutionPlan>)> {
138-
let plan = ctx.sql(query).await?;
139-
let (state, plan) = plan.into_parts();
140-
let physical_plan = state.create_physical_plan(&plan).await?;
137+
let df = ctx.sql(query).await?;
138+
let physical_plan = df.clone().create_physical_plan().await?;
139+
let result = df.collect().await?;
141140

142-
let result = collect(physical_plan.clone(), state.task_ctx()).await?;
143141
Ok((result, physical_plan))
144142
}
145143

vortex-datafusion/src/persistent/mod.rs

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -205,36 +205,28 @@ mod tests {
205205

206206
session
207207
.sql(&format!(
208-
"CREATE EXTERNAL TABLE my_tbl_vx \
208+
"CREATE EXTERNAL TABLE my_tbl \
209209
(c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
210-
STORED AS vortex \
210+
STORED AS VORTEX \
211211
WITH ORDER (c1 ASC)
212-
LOCATION '{}/vx/'",
212+
LOCATION '{}/'",
213213
dir.path().to_str().unwrap()
214214
))
215215
.await?;
216216

217217
session
218-
.sql("INSERT INTO my_tbl_vx VALUES ('air', 5), ('balloon', 42)")
218+
.sql("INSERT INTO my_tbl VALUES ('air', 10), ('alabama', 20), ('balloon', 30)")
219219
.await?
220220
.collect()
221221
.await?;
222222

223223
session
224-
.sql("INSERT INTO my_tbl_vx VALUES ('zebra', 5)")
224+
.sql("INSERT INTO my_tbl VALUES ('kangaroo', 11), ('zebra', 21)")
225225
.await?
226226
.collect()
227227
.await?;
228228

229-
session
230-
.sql("INSERT INTO my_tbl_vx VALUES ('texas', 2000), ('alabama', 2000)")
231-
.await?
232-
.collect()
233-
.await?;
234-
235-
let df = session
236-
.sql("SELECT * FROM my_tbl_vx ORDER BY c1 ASC limit 3")
237-
.await?;
229+
let df = session.sql("SELECT * FROM my_tbl ORDER BY c1 ASC").await?;
238230

239231
let physical_plan = df.clone().create_physical_plan().await?;
240232

@@ -243,27 +235,28 @@ mod tests {
243235
┌───────────────────────────┐
244236
│ SortPreservingMergeExec │
245237
│ -------------------- │
246-
│ c1 ASC NULLS LASTlimit: │
247-
│ 3 │
238+
│ c1 ASC NULLS LAST │
248239
└─────────────┬─────────────┘
249240
┌─────────────┴─────────────┐
250241
│ DataSourceExec │
251242
│ -------------------- │
252-
│ files: 3
243+
│ files: 2
253244
│ format: vortex │
254245
└───────────────────────────┘
255246
");
256247

257248
let r = df.collect().await?;
258249

259250
insta::assert_snapshot!(pretty_format_batches(&r)?.to_string(), @r"
260-
+---------+------+
261-
| c1 | c2 |
262-
+---------+------+
263-
| air | 5 |
264-
| alabama | 2000 |
265-
| balloon | 42 |
266-
+---------+------+
251+
+----------+----+
252+
| c1 | c2 |
253+
+----------+----+
254+
| air | 10 |
255+
| alabama | 20 |
256+
| balloon | 30 |
257+
| kangaroo | 11 |
258+
| zebra | 21 |
259+
+----------+----+
267260
");
268261

269262
Ok(())

0 commit comments

Comments
 (0)