diff --git a/.gitignore b/.gitignore
index 3f072cfb..c77f2bc9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,3 +30,5 @@ run.out
 clickhouse/etc_sudoers.bak
 workdir/
 timeout-exit-codes.out
+*/target
+*.lock
diff --git a/_benchplot/benchplot-dict.R b/_benchplot/benchplot-dict.R
index 292aaec9..74632e6a 100644
--- a/_benchplot/benchplot-dict.R
+++ b/_benchplot/benchplot-dict.R
@@ -40,7 +40,8 @@ solution.dict = {list(
   "juliadf" = list(name=c(short="DF.jl", long="DataFrames.jl"), color=c(strong="deepskyblue", light="darkturquoise")),
   "clickhouse" = list(name=c(short="clickhouse", long="ClickHouse"), color=c(strong="hotpink4", light="hotpink1")),
   "cudf" = list(name=c(short="cuDF", long="cuDF"), color=c(strong="peachpuff3", light="peachpuff1")),
-  "polars" = list(name=c(short="polars", long="Polars"), color=c(strong="deepskyblue4", light="deepskyblue3"))
+  "polars" = list(name=c(short="polars", long="Polars"), color=c(strong="deepskyblue4", light="deepskyblue3")),
+  "datafusion" = list(name=c(short="datafusion", long="Datafusion"), color=c(strong="deepskyblue4", light="deepskyblue3"))
 )}
 #barplot(rep(c(0L,1L,1L), length(solution.dict)),
 #        col=rev(c(rbind(sapply(solution.dict, `[[`, "color"), "black"))),
@@ -181,6 +182,18 @@ groupby.syntax.dict = {list(
     "largest two v3 by id6" = "DF.drop_nulls('v3').sort('v3', reverse=True).groupby('id6').agg(col('v3').head(2).alias('largest2_v3)).explode('largest2_v3').collect()",
     "regression v1 v2 by id2 id4" = "DF.groupby(['id2','id4']).agg(pl.pearson_corr('v1','v2').alias('r2')).with_column(col('r2')**2).collect()",
     "sum v3 count by id1:id6" = "DF.groupby(['id1','id2','id3','id4','id5','id6']).agg([pl.sum('v3').alias('v3'), pl.count('v1').alias('count')]).collect()"
+  )},
+  "datafusion" = {c(
+    "sum v1 by id1" = "SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1",
+    "sum v1 by id1:id2" = "SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2",
+    "sum v1 mean v3 by id3" = "SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3",
+    "mean v1:v3 by id4" = "SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4",
+    "sum v1:v3 by id6" = "SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6",
+    "median v3 sd v3 by id4 id5" = "",
+    "max v1 - min v2 by id3" = "SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3",
+    "largest two v3 by id6" = "SELECT id6, v3 from (SELECT id6, v3, row_number() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS row FROM x) t WHERE row <= 2",
+    "regression v1 v2 by id2 id4" = "",
+    "sum v3 count by id1:id6" = "SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v3, COUNT(*) AS cnt FROM x GROUP BY id1, id2, id3, id4, id5, id6"
   )}
 )}
 groupby.query.exceptions = {list(
@@ -195,7 +208,9 @@ groupby.query.exceptions = {list(
     "not yet implemented: cudf#2592" = "largest two v3 by id6",
     "not yet implemented: cudf#1267" = "regression v1 v2 by id2 id4"),
   "clickhouse" = list(),
-  "polars" = list()
+  "polars" = list(),
+  "datafusion" = list("not yet implemented: datafusion#1486" = "median v3 sd v3 by id4 id5",
+                      "not yet implemented: datafusion#1486" = "regression v1 v2 by id2 id4")
 )}
 groupby.data.exceptions = {list(                                      # exceptions as of run 1575727624
   "data.table" = {list(
@@ -240,7 +255,8 @@ groupby.data.exceptions = {list(
   )},
   "polars" = {list(
     "segfault: polars#260" = c("G1_1e9_1e2_0_0","G1_1e9_1e1_0_0","G1_1e9_2e0_0_0","G1_1e9_1e2_0_1","G1_1e9_1e2_5_0") # polars#260
-  )}
+  )},
+  "datafusion" = {list()}
 )}
 groupby.exceptions = task.exceptions(groupby.query.exceptions, groupby.data.exceptions)
 
@@ -322,6 +338,13 @@ join.syntax.dict = {list(
     "medium outer on int" = "DF.merge(medium, how='left', on='id2')",
     "medium inner on factor" = "DF.merge(medium, on='id5')",
     "big inner on int" = "DF.merge(big, on='id3')"
+  )},
+  "datafusion" = {c(
+    "small inner on int" = "SELECT x.id1, x.id2, x.id3, x.id4 as xid4, small.id4 as smallid4, x.id5, x.id6, x.v1, small.v2 FROM x INNER JOIN small ON x.id1 = small.id1",
+    "medium inner on int" = "SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x INNER JOIN medium ON x.id2 = medium.id2",
+    "medium outer on int" = "SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id2 = medium.id2",
+    "medium inner on factor" = "SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id5 = medium.id5",
+    "big inner on int" = "SELECT x.id1 as xid1, large.id1 as largeid1, x.id2 as xid2, large.id2 as largeid2, x.id3, x.id4 as xid4, large.id4 as largeid4, x.id5 as xid5, large.id5 as largeid5, x.id6 as xid6, large.id6 as largeid6, x.v1, large.v2 FROM x LEFT JOIN large ON x.id3 = large.id3"
   )}
 )}
 join.query.exceptions = {list(
@@ -334,7 +357,8 @@ join.query.exceptions = {list(
   "juliadf" = list(),
   "cudf" = list(),
   "clickhouse" = list(),
-  "polars" = list()
+  "polars" = list(),
+  "datafusion" = list()
 )}
 join.data.exceptions = {list(                                      # exceptions as of run 1575727624
   "data.table" = {list(
@@ -372,6 +396,7 @@ join.data.exceptions = {list(
   )},
   "polars" = {list(
     "segfault: polars#260" = c("J1_1e9_NA_0_0","J1_1e9_NA_5_0","J1_1e9_NA_0_1") # polars#260
-  )}
+  )},
+  "datafusion" = {list()}
 )}
 join.exceptions = task.exceptions(join.query.exceptions, join.data.exceptions)
diff --git a/_launcher/launcher.R b/_launcher/launcher.R
index 9b111585..4ecd208a 100644
--- a/_launcher/launcher.R
+++ b/_launcher/launcher.R
@@ -15,7 +15,7 @@ file.ext = function(x) {
   ans = switch(
     x,
     "data.table"=, "dplyr"=, "h2o"="R",
-    "pandas"=, "cudf"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, "polars"="py",
+    "pandas"=, "cudf"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, "polars"="py", "datafusion"="py",
     "clickhouse"="sql",
     "juliadf"="jl"
   )
diff --git a/_launcher/solution.R b/_launcher/solution.R
index 0c26402d..1ed01f20 100755
--- a/_launcher/solution.R
+++ b/_launcher/solution.R
@@ -111,7 +111,7 @@ file.ext = function(x) {
   ans = switch(
     x,
     "data.table"=, "dplyr"=, "h2o"="R",
-    "pandas"=, "cudf"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, "polars"="py",
+    "pandas"=, "cudf"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, "polars"="py", "datafusion"="py",
     "clickhouse"="sql",
     "juliadf"="jl"
   )
diff --git a/_report/history.Rmd b/_report/history.Rmd
index 6d288a6f..2afce764 100644
--- a/_report/history.Rmd
+++ b/_report/history.Rmd
@@ -486,6 +486,48 @@ plot(d, "polars", 1e8, "join")
 plot(d, "polars", 1e9, "join")
 ```
 
+### datafusion {.tabset .tabset-fade .tabset-pills}
+
+#### groupby {.tabset .tabset-fade .tabset-pills}
+
+##### 0.5 GB
+
+```{r datafusion.groupby.1e7}
+plot(d, "datafusion", 1e7, "groupby")
+```
+
+##### 5 GB
+
+```{r datafusion.groupby.1e8}
+plot(d, "datafusion", 1e8, "groupby")
+```
+
+##### 50 GB {.active}
+
+```{r datafusion.groupby.1e9}
+plot(d, "datafusion", 1e9, "groupby")
+```
+
+#### join {.tabset .tabset-fade .tabset-pills}
+
+##### 0.5 GB
+
+```{r datafusion.join.1e7}
+plot(d, "datafusion", 1e7, "join")
+```
+
+##### 5 GB {.active}
+
+```{r datafusion.join.1e8}
+plot(d, "datafusion", 1e8, "join")
+```
+
+##### 50 GB
+
+```{r datafusion.join.1e9}
+plot(d, "datafusion", 1e9, "join")
+```
+
 ## Details
 
 ### Environment
diff --git a/_report/report.R b/_report/report.R
index acbb161e..1bcc5d8d 100644
--- a/_report/report.R
+++ b/_report/report.R
@@ -5,7 +5,7 @@ get_report_status_file = function(path=getwd()) {
   file.path(path, "report-done")
 }
 get_report_solutions = function() {
-  c("data.table", "dplyr", "pandas", "pydatatable", "spark", "dask", "juliadf", "clickhouse", "cudf", "polars")
+  c("data.table", "dplyr", "pandas", "pydatatable", "spark", "dask", "juliadf", "clickhouse", "cudf", "polars", "datafusion")
 }
 get_data_levels = function() {
   ## groupby
diff --git a/datafusion/groupby-datafusion.py b/datafusion/groupby-datafusion.py
new file mode 100644
index 00000000..c1b21ad5
--- /dev/null
+++ b/datafusion/groupby-datafusion.py
@@ -0,0 +1,179 @@
+#!/usr/bin/env python
+
+print("# groupby-datafusion.py", flush=True)
+
+import os
+import gc
+import timeit
+import datafusion as df
+from datafusion import functions as f
+from datafusion import col
+from pyarrow import csv as pacsv
+
+exec(open("./_helpers/helpers.py").read())
+
+def ans_shape(batches):
+    rows, cols = 0, 0
+    for batch in batches:
+        rows += batch.num_rows
+        if cols == 0:
+            cols = batch.num_columns
+        else:
+            assert(cols == batch.num_columns)
+
+    return rows, cols
+
+# ver = df.__version__
+ver = "6.0.0"
+git = ""
+task = "groupby"
+solution = "datafusion"
+fun = ".groupby"
+cache = "TRUE"
+on_disk = "FALSE"
+
+data_name = os.environ["SRC_DATANAME"]
+src_grp = os.path.join("data", data_name + ".csv")
+print("loading dataset %s" % data_name, flush=True)
+
+data = pacsv.read_csv(src_grp, convert_options=pacsv.ConvertOptions(auto_dict_encode=True))
+
+ctx = df.ExecutionContext()
+ctx.register_record_batches("x", [data.to_batches()])
+
+in_rows = data.num_rows
+print(in_rows, flush=True)
+
+task_init = timeit.default_timer()
+
+question = "sum v1 by id1" # q1
+gc.collect()
+t_start = timeit.default_timer()
+ans = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1").collect()
+shape = ans_shape(ans)
+print(shape, flush=True)
+t = timeit.default_timer() - t_start
+m = memory_usage()
+t_start = timeit.default_timer()
+df = ctx.create_dataframe([ans])
+chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
+chkt = timeit.default_timer() - t_start
+write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+del ans
+gc.collect()
+
+question = "sum v1 by id1:id2" # q2
+gc.collect()
+t_start = timeit.default_timer()
+ans = ctx.sql("SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2").collect()
+shape = ans_shape(ans)
+print(shape, flush=True)
+t = timeit.default_timer() - t_start
+m = memory_usage()
+t_start = timeit.default_timer()
+df = ctx.create_dataframe([ans])
+chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
+chkt = timeit.default_timer() - t_start
+write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+del ans
+gc.collect()
+
+question = "sum v1 mean v3 by id3" # q3
+gc.collect()
+t_start = timeit.default_timer()
+ans = ctx.sql("SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3").collect()
+shape = ans_shape(ans)
+print(shape, flush=True)
+t = timeit.default_timer() - t_start
+m = memory_usage()
+t_start = timeit.default_timer()
+df = ctx.create_dataframe([ans])
+chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0]
+chkt = timeit.default_timer() - t_start
+write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+del ans
+gc.collect()
+
+question = "mean v1:v3 by id4" # q4
+gc.collect()
+t_start = timeit.default_timer()
+ans = ctx.sql("SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4").collect()
+shape = ans_shape(ans)
+print(shape, flush=True)
+t = timeit.default_timer() - t_start
+m = memory_usage()
+t_start = timeit.default_timer()
+df = ctx.create_dataframe([ans])
+chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0]
+chkt = timeit.default_timer() - t_start
+write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+del ans
+gc.collect()
+
+question = "sum v1:v3 by id6" # q5
+gc.collect()
+t_start = timeit.default_timer()
+ans = ctx.sql("SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6").collect()
+shape = ans_shape(ans)
+print(shape, flush=True)
+t = timeit.default_timer() - t_start
+m = memory_usage()
+t_start = timeit.default_timer()
+df = ctx.create_dataframe([ans])
+chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0]
+chkt = timeit.default_timer() - t_start
+write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+del ans
+gc.collect()
+
+question = "max v1 - min v2 by id3" # q7
+gc.collect()
+t_start = timeit.default_timer()
+ans = ctx.sql("SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3").collect()
+shape = ans_shape(ans)
+print(shape, flush=True)
+t = timeit.default_timer() - t_start
+m = memory_usage()
+t_start = timeit.default_timer()
+df = ctx.create_dataframe([ans])
+chk = df.aggregate([], [f.sum(col("range_v1_v2"))]).collect()[0].column(0)[0]
+chkt = timeit.default_timer() - t_start
+write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+del ans
+gc.collect()
+
+question = "largest two v3 by id6" # q8
+gc.collect()
+t_start = timeit.default_timer()
+ans = ctx.sql("SELECT id6, v3 from (SELECT id6, v3, row_number() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS row FROM x) t WHERE row <= 2").collect()
+shape = ans_shape(ans)
+print(shape, flush=True)
+t = timeit.default_timer() - t_start
+m = memory_usage()
+t_start = timeit.default_timer()
+df = ctx.create_dataframe([ans])
+chk = df.aggregate([], [f.sum(col("v3"))]).collect()[0].column(0)[0]
+chkt = timeit.default_timer() - t_start
+write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+del ans
+gc.collect()
+
+question = "sum v3 count by id1:id6" # q10
+gc.collect()
+t_start = timeit.default_timer()
+ans = ctx.sql("SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v3, COUNT(*) AS cnt FROM x GROUP BY id1, id2, id3, id4, id5, id6").collect()
+shape = ans_shape(ans)
+print(shape, flush=True)
+t = timeit.default_timer() - t_start
+m = memory_usage()
+t_start = timeit.default_timer()
+df = ctx.create_dataframe([ans])
+chk = df.aggregate([], [f.sum(col("v3")), f.sum(col("cnt"))]).collect()[0].to_pandas().to_numpy()[0]
+chkt = timeit.default_timer() - t_start
+write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+del ans
+gc.collect()
+
+print("grouping finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True)
+
+exit(0)
diff --git a/datafusion/join-datafusion.py b/datafusion/join-datafusion.py
new file mode 100755
index 00000000..e61e6878
--- /dev/null
+++ b/datafusion/join-datafusion.py
@@ -0,0 +1,144 @@
+#!/usr/bin/env python
+
+print("# join-datafusion.py", flush=True)
+
+import os
+import gc
+import timeit
+import datafusion as df
+from datafusion import functions as f
+from datafusion import col
+from pyarrow import csv as pacsv
+
+exec(open("./_helpers/helpers.py").read())
+
+def ans_shape(batches):
+    rows, cols = 0, 0
+    for batch in batches:
+        rows += batch.num_rows
+        if cols == 0:
+            cols = batch.num_columns
+        else:
+            assert(cols == batch.num_columns)
+
+    return rows, cols
+
+ver = "6.0.0"
+task = "join"
+git = ""
+solution = "datafusion"
+fun = ".join"
+cache = "TRUE"
+on_disk = "FALSE"
+
+data_name = os.environ["SRC_DATANAME"]
+src_jn_x = os.path.join("data", data_name + ".csv")
+y_data_name = join_to_tbls(data_name)
+src_jn_y = [os.path.join("data", y_data_name[0] + ".csv"), os.path.join("data", y_data_name[1] + ".csv"), os.path.join("data", y_data_name[2] + ".csv")]
+if len(src_jn_y) != 3:
+    raise Exception("Something went wrong in preparing files used for join")
+
+print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[1] + ", " + y_data_name[2], flush=True)
+
+ctx = df.ExecutionContext()
+
+x_data = pacsv.read_csv(src_jn_x, convert_options=pacsv.ConvertOptions(auto_dict_encode=True))
+ctx.register_record_batches("x", [x_data.to_batches()])
+small_data = pacsv.read_csv(src_jn_y[0], convert_options=pacsv.ConvertOptions(auto_dict_encode=True))
+ctx.register_record_batches("small", [small_data.to_batches()])
+medium_data = pacsv.read_csv(src_jn_y[1], convert_options=pacsv.ConvertOptions(auto_dict_encode=True))
+ctx.register_record_batches("medium", [medium_data.to_batches()])
+large_data = pacsv.read_csv(src_jn_y[2], convert_options=pacsv.ConvertOptions(auto_dict_encode=True))
+ctx.register_record_batches("large", [large_data.to_batches()])
+
+print(x_data.num_rows, flush=True)
+print(small_data.num_rows, flush=True)
+print(medium_data.num_rows, flush=True)
+print(large_data.num_rows, flush=True)
+
+task_init = timeit.default_timer()
+print("joining...", flush=True)
+
+question = "small inner on int" # q1
+gc.collect()
+t_start = timeit.default_timer()
+ans = ctx.sql("SELECT x.id1, x.id2, x.id3, x.id4 as xid4, small.id4 as smallid4, x.id5, x.id6, x.v1, small.v2 FROM x INNER JOIN small ON x.id1 = small.id1").collect()
+shape = ans_shape(ans)
+print(shape, flush=True)
+t = timeit.default_timer() - t_start
+t_start = timeit.default_timer()
+df = ctx.create_dataframe([ans])
+chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
+chkt = timeit.default_timer() - t_start
+m = memory_usage()
+write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+del ans
+gc.collect()
+
+question = "medium inner on int" # q2
+gc.collect()
+t_start = timeit.default_timer()
+ans = ctx.sql("SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x INNER JOIN medium ON x.id2 = medium.id2").collect()
+shape = ans_shape(ans)
+print(shape, flush=True)
+t = timeit.default_timer() - t_start
+t_start = timeit.default_timer()
+df = ctx.create_dataframe([ans])
+chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0]
+chkt = timeit.default_timer() - t_start
+m = memory_usage()
+write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+del ans
+gc.collect()
+
+question = "medium outer on int" # q3
+gc.collect()
+t_start = timeit.default_timer()
+ans = ctx.sql("SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id2 = medium.id2").collect()
+shape = ans_shape(ans)
+print(shape, flush=True)
+t = timeit.default_timer() - t_start
+t_start = timeit.default_timer()
+df = ctx.create_dataframe([ans])
+chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0]
+chkt = timeit.default_timer() - t_start
+m = memory_usage()
+write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+del ans
+gc.collect()
+
+question = "medium inner on factor" # q4
+gc.collect()
+t_start = timeit.default_timer()
+ans = ctx.sql("SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id5 = medium.id5").collect()
+shape = ans_shape(ans)
+print(shape)
+t = timeit.default_timer() - t_start
+t_start = timeit.default_timer()
+df = ctx.create_dataframe([ans])
+chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0]
+chkt = timeit.default_timer() - t_start
+m = memory_usage()
+write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+del ans
+gc.collect()
+
+question = "big inner on int" # q5
+gc.collect()
+t_start = timeit.default_timer()
+ans = ctx.sql("SELECT x.id1 as xid1, large.id1 as largeid1, x.id2 as xid2, large.id2 as largeid2, x.id3, x.id4 as xid4, large.id4 as largeid4, x.id5 as xid5, large.id5 as largeid5, x.id6 as xid6, large.id6 as largeid6, x.v1, large.v2 FROM x LEFT JOIN large ON x.id3 = large.id3").collect()
+shape = ans_shape(ans)
+print(shape)
+t = timeit.default_timer() - t_start
+t_start = timeit.default_timer()
+df = ctx.create_dataframe([ans])
+chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0]
+chkt = timeit.default_timer() - t_start
+m = memory_usage()
+write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+del ans
+gc.collect()
+
+print("joining finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True)
+
+exit(0)
diff --git a/datafusion/setup-datafusion.sh b/datafusion/setup-datafusion.sh
new file mode 100755
index 00000000..ba74868a
--- /dev/null
+++ b/datafusion/setup-datafusion.sh
@@ -0,0 +1,35 @@
+#!/bin/bash
+set -e
+
+# install dependencies
+sudo apt-get update -qq
+sudo apt-get install -y python3.6-dev virtualenv
+
+virtualenv datafusion/py-datafusion --python=/usr/bin/python3.6
+source datafusion/py-datafusion/bin/activate
+
+python -m pip install --upgrade psutil datafusion
+
+# build
+deactivate
+./datafusion/upg-datafusion.sh
+
+# check
+# source datafusion/py-datafusion/bin/activate
+# python
+# import datafusion as df
+# df.__version__
+# quit()
+# deactivate
+echo "0.4.0"
+
+# fix: print(ans.head(3), flush=True): UnicodeEncodeError: 'ascii' codec can't encode characters in position 14-31: ordinal not in range(128)
+vim datafusion/py-datafusion/bin/activate
+#deactivate () {
+#  unset PYTHONIOENCODING
+#  ...
+#}
+#...
+#PYTHONIOENCODING="utf-8"
+#export PYTHONIOENCODING
+#...
diff --git a/datafusion/upg-datafusion.sh b/datafusion/upg-datafusion.sh
new file mode 100755
index 00000000..c1d6c2eb
--- /dev/null
+++ b/datafusion/upg-datafusion.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+set -e
+
+echo 'upgrading datafusion...'
+
+source ./datafusion/py-datafusion/bin/activate
+
+python -m pip install --upgrade datafusion > /dev/null
\ No newline at end of file
diff --git a/datafusion/ver-datafusion.sh b/datafusion/ver-datafusion.sh
new file mode 100755
index 00000000..df32ed50
--- /dev/null
+++ b/datafusion/ver-datafusion.sh
@@ -0,0 +1 @@
+echo "0.4.0" > datafusion/VERSION
\ No newline at end of file
diff --git a/run.conf b/run.conf
index de58a433..37d08736 100644
--- a/run.conf
+++ b/run.conf
@@ -1,7 +1,7 @@
 # task, used in init-setup-iteration.R
 export RUN_TASKS="groupby join"
 # solution, used in init-setup-iteration.R
-export RUN_SOLUTIONS="data.table pydatatable dplyr pandas spark dask juliadf cudf clickhouse polars"
+export RUN_SOLUTIONS="data.table pydatatable dplyr pandas spark dask juliadf cudf clickhouse polars datafusion"
 # flag to upgrade tools, used in run.sh on init
 export DO_UPGRADE=true
 
diff --git a/run.sh b/run.sh
index c1c45e07..e7c30c02 100755
--- a/run.sh
+++ b/run.sh
@@ -62,6 +62,8 @@ if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "h2o" ]]; then ./h2o/upg-h2o.
if [[ "$RUN_SOLUTIONS" =~ "h2o" ]]; then ./h2o/ver-h2o.sh; fi; if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "polars" ]]; then ./polars/upg-polars.sh; fi; if [[ "$RUN_SOLUTIONS" =~ "polars" ]]; then ./polars/ver-polars.sh; fi; +if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "datafusion" ]]; then ./datafusion/upg-datafusion.sh; fi; +if [[ "$RUN_SOLUTIONS" =~ "datafusion" ]]; then ./datafusion/ver-datafusion.sh; fi; # run if [[ -f ./stop ]]; then echo "# Benchmark run $BATCH has been interrupted after $(($(date +%s)-$BATCH))s due to 'stop' file" && rm -f ./stop && rm -f ./run.lock && exit; fi;