Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions _control/solutions.csv
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ dask,groupby
dask,join
juliadf,groupby
juliadf,join
juliads,groupby
juliads,join
clickhouse,groupby
clickhouse,join
cudf,groupby
Expand Down
58 changes: 58 additions & 0 deletions _helpers/helpersds.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using Printf; # sprintf macro to print in non-scientific format
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIU this file could be shared between juliadf and juliads, as we do for the Python and R tools.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMD uses DLMReader for writing the log file, whereas DataFrames.jl uses CSV. One possible way to combine them is to have different write_log functions in the same file.

using Pkg;

# from https://github.com/JuliaLang/Pkg.jl/issues/793
# Return the Manifest.toml entry (a Dict, e.g. with a "version" key) recorded
# for package `name` in the active project.
# Both manifest layouts are accepted: packages nested under a top-level "deps"
# table, and the older layout where package names are top-level keys.
# Raises an ErrorException when no project is active, and a KeyError when
# `name` is not listed in the manifest.
function getpkgmeta(name::AbstractString)
    project = Base.active_project()
    project === nothing && error("no active project; cannot locate Manifest.toml")
    fname = joinpath(dirname(project), "Manifest.toml")
    manifest = Pkg.TOML.parse(read(fname, String))
    # Fall back to the manifest root when there is no "deps" table (older layout).
    deps = get(manifest, "deps", manifest)
    return first(deps[name])
end;

# Append one timing record to the benchmark log file.
#
# The target file is taken from the CSV_TIME_FILE environment variable
# (default "time.csv") and the batch id from BATCH (default ""). The record
# holds the host name, a timestamp, every argument of this function and an
# empty `comment` column; `cache` and `on_disk` are stored as upper-cased
# strings and the three timing/memory values are rounded to 3 decimals.
#
# Arguments mirror the log columns: `run` is the run number of the query,
# `time_sec`/`mem_gb` the measured time and memory, `chk`/`chk_time_sec` the
# checksum string and the time spent computing it.
function write_log(run, task, data, in_rows, question, out_rows, out_cols, solution, version, git, fun, time_sec, mem_gb, cache, chk, chk_time_sec, on_disk)
    file = get(ENV, "CSV_TIME_FILE", "time.csv")
    if occursin("/", file) && !startswith(file, "/") # otherwise we assume full path
        file = "$(pwd())/$file"
    end
    batch = get(ENV, "BATCH", "")
    # A zero-byte file would otherwise be appended to without a header row.
    if isfile(file) && filesize(file) == 0
        rm(file)
    end
    nodename = gethostname()
    comment = "" # placeholder for updates to timing data
    time_sec = round(time_sec, digits=3)
    mem_gb = round(mem_gb, digits=3)
    chk_time_sec = round(chk_time_sec, digits=3)
    timestamp = @sprintf("%0.6f", time())
    log = Dataset(nodename=nodename, batch=batch, timestamp=timestamp, task=task, data=data, in_rows=in_rows, question=question, out_rows=out_rows, out_cols=out_cols, solution=solution, version=version, git=git, fun=fun, run=run, time_sec=time_sec, mem_gb=mem_gb, cache=uppercase(string(cache)), chk=chk, chk_time_sec=chk_time_sec, comment=comment, on_disk=uppercase(string(on_disk)))
    # Check existence once so `append` and `header` can never disagree.
    exists = isfile(file)
    filewriter(file, log, append=exists, header=!exists)
end;

# Build the checksum string written to the timing log: each element of `x`
# formatted with three decimals, joined by ";".
# `x` may be a single number or any iterable of numbers; an empty collection
# gives "".
function make_chk(x)
    # join over a generator avoids rebuilding the whole string for every element
    return join((@sprintf("%0.3f", v) for v in x), ";")
end;

# Memory figure for the current Julia process: runs `ps -o rss <pid>`, keeps
# the last output line, parses it as a number and divides it by 1024^2.
# NOTE(review): `ps` presumably reports rss in KiB, which would make the
# result GiB (callers log it as mem_gb) — confirm on the target platform.
function memory_usage()
    ps_cmd = `ps -o rss $(getpid())`
    last_line = read(pipeline(ps_cmd, `tail -1`), String)
    rss = parse(Float64, replace(last_line, "\n" => ""))
    return rss / (1024^2)
end;

# Derive three table names from `data_name`.
# The second "_"-separated field of `data_name` is parsed as the row count n;
# every "NA" in `data_name` is then replaced by n/1e6, n/1e3 and n in turn,
# each written in compact scientific notation (e.g. "1e4").
# Returns a 3-element vector of strings.
function join_to_tbls(data_name)
    x_n = Int(parse(Float64, split(data_name, "_")[2]))
    # "%.0e" gives e.g. "1e+04"; strip the "+" and one leading exponent zero.
    compact(v) = replace(@sprintf("%.0e", v), r"[+]0?" => "")
    return [replace(data_name, "NA" => compact(x_n / d)) for d in (1e6, 1e3, 1)]
end;
4 changes: 2 additions & 2 deletions _launcher/solution.R
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ file.ext = function(x) {
"data.table"=, "dplyr"=, "h2o"=, "arrow"=, "duckdb"="R",
"pandas"=, "cudf"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, "polars"="py",
"clickhouse"="sql",
"juliadf"="jl"
"juliadf"=, "juliads"="jl"
)
if (is.null(ans)) stop(sprintf("solution %s does not have file extension defined in file.ext helper function", x))
ans
Expand Down Expand Up @@ -154,7 +154,7 @@ setenv("SRC_DATANAME", d)

ns = solution.path(s)
ext = file.ext(s)
localcmd = if (s %in% c("clickhouse","h2o","juliadf")) { # custom launcher bash script, for clickhouse h2o juliadf
localcmd = if (s %in% c("clickhouse","h2o","juliadf", "juliads")) { # custom launcher bash script, for clickhouse h2o juliadf
sprintf("exec.sh %s", t)
} else sprintf("%s-%s.%s", t, ns, ext)
cmd = sprintf("./%s/%s", ns, localcmd)
Expand Down
12 changes: 12 additions & 0 deletions juliads/exec.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
# Launcher for the juliads benchmark scripts.
# Takes exactly one argument, the task name (e.g. groupby), and runs
# ./juliads/<task>-juliads.jl with 20 Julia threads.
set -e

if [ "$#" -ne 1 ]; then
  echo 'usage: ./juliads/exec.sh groupby';
  exit 1
fi;

# load environment settings from path.env in the current directory
source ./path.env

# execute benchmark script; the path is quoted so the task name is never
# word-split or glob-expanded
julia -t 20 "./juliads/$1-juliads.jl"
204 changes: 204 additions & 0 deletions juliads/groupby-juliads.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
#!/usr/bin/env julia

# Groupby benchmark script for the juliads solution.
# The script name is printed and flushed before any package is loaded.
print("# groupby-juliads.jl\n"); flush(stdout);

using InMemoryDatasets;
using Printf;
using DLMReader;
using PooledArrays

# Force Julia to precompile methods for common patterns
# NOTE(review): presumably called up front so that compilation cost stays out
# of the timed queries below — confirm against the InMemoryDatasets docs.
IMD.warmup()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to have a comment explaining what this warm-up actually does.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment for this.


# Load the shared helpers, resolved relative to the current working directory
# (the script is therefore expected to be started from the repository root).
include("$(pwd())/_helpers/helpersds.jl");

# Fields that stay constant for every timing record passed to write_log.
pkgmeta = getpkgmeta("InMemoryDatasets");
ver = pkgmeta["version"];
git = "";
task = "groupby";
solution = "juliads";
fun = "combine";
cache = true;
on_disk = false;

# The dataset name comes from the SRC_DATANAME environment variable (a missing
# variable raises here); the CSV is read from the _data/ directory.
data_name = ENV["SRC_DATANAME"];
src_grp = string("_data/", data_name, ".csv");
println(string("loading dataset ", data_name)); flush(stdout);

# Read the CSV with an explicit type for each of the nine columns, then
# convert the first three columns with PooledArray.
x = filereader(src_grp, types=[Characters{5}, Characters{5}, Characters{12}, Int32, Int32, Int32, Int32, Int32, Float64]);
modify!(x, 1:3 => PooledArray)

# Row count of the input, logged with every record.
in_rows = size(x, 1);
println(in_rows); flush(stdout);

# Start of the wall-clock window reported when the script finishes.
task_init = time();
print("grouping...\n"); flush(stdout);

# q1: the query is executed twice; each run is timed, checksummed and logged.
question = "sum v1 by id1"; # q1
# Run 1: collect garbage, then time the aggregation together with printing its size.
GC.gc();
t = @elapsed (ANS = combine(gatherby(x, :id1, stable = false), :v1 => IMD.sum => :v1); println(size(ANS)); flush(stdout));
# Memory is sampled right after the timed expression.
m = memory_usage();
# Checksum of the result column and the time needed to compute it.
chkt = @elapsed chk = sum(ANS.v1);
# First argument of write_log is the run number.
write_log(1, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Drop the reference to the result (presumably so the GC call below can reclaim it).
ANS = 0;
# Run 2: identical query, logged with run number 2.
GC.gc();
t = @elapsed (ANS = combine(gatherby(x, :id1, stable = false), :v1 => IMD.sum => :v1); println(size(ANS)); flush(stdout));
m = memory_usage();
chkt = @elapsed chk = sum(ANS.v1);
write_log(2, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Show the first and last three rows of the second run's result.
println(first(ANS, 3));
println(last(ANS, 3));
ANS = 0;

# q2: same measurement sequence as the other questions, grouping on two columns.
question = "sum v1 by id1:id2"; # q2
# Run 1: collect garbage, then time the aggregation together with printing its size.
GC.gc();
t = @elapsed (ANS = combine(gatherby(x, [:id1, :id2], stable = false), :v1 => IMD.sum => :v1); println(size(ANS)); flush(stdout));
# Memory is sampled right after the timed expression.
m = memory_usage();
# Checksum of the result column and the time needed to compute it.
chkt = @elapsed chk = sum(ANS.v1);
write_log(1, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Drop the reference to the result (presumably so the GC call below can reclaim it).
ANS = 0;
# Run 2: identical query, logged with run number 2.
GC.gc();
t = @elapsed (ANS = combine(gatherby(x, [:id1, :id2], stable = false), :v1 => IMD.sum => :v1); println(size(ANS)); flush(stdout));
m = memory_usage();
chkt = @elapsed chk = sum(ANS.v1);
write_log(2, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Show the first and last three rows of the second run's result.
println(first(ANS, 3));
println(last(ANS, 3));
ANS = 0;

# q3: two aggregates (sum of v1, mean of v3) per id3 group, run twice.
question = "sum v1 mean v3 by id3"; # q3
# Run 1: collect garbage, then time the aggregation together with printing its size.
GC.gc();
t = @elapsed (ANS = combine(gatherby(x, :id3, stable = false), :v1 => IMD.sum => :v1, :v3 => IMD.mean => :v3); println(size(ANS)); flush(stdout));
# Memory is sampled right after the timed expression.
m = memory_usage();
# Checksum: one total per result column, in the order v1, v3.
chkt = @elapsed chk = [sum(ANS.v1), sum(ANS.v3)];
write_log(1, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Drop the reference to the result (presumably so the GC call below can reclaim it).
ANS = 0;
# Run 2: identical query, logged with run number 2.
GC.gc();
t = @elapsed (ANS = combine(gatherby(x, :id3, stable = false), :v1 => IMD.sum => :v1, :v3 => IMD.mean => :v3); println(size(ANS)); flush(stdout));
m = memory_usage();
chkt = @elapsed chk = [sum(ANS.v1), sum(ANS.v3)];
write_log(2, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Show the first and last three rows of the second run's result.
println(first(ANS, 3));
println(last(ANS, 3));
ANS = 0;

# q4: mean of v1, v2 and v3 per id4 group, run twice.
question = "mean v1:v3 by id4"; # q4
# Run 1: collect garbage, then time the aggregation together with printing its size.
GC.gc();
t = @elapsed (ANS = combine(gatherby(x, :id4, stable = false), :v1 => IMD.mean => :v1, :v2 => IMD.mean => :v2, :v3 => IMD.mean => :v3); println(size(ANS)); flush(stdout));
# Memory is sampled right after the timed expression.
m = memory_usage();
# Checksum: one total per result column, in the order v1, v2, v3.
chkt = @elapsed chk = [sum(ANS.v1), sum(ANS.v2), sum(ANS.v3)];
write_log(1, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Drop the reference to the result (presumably so the GC call below can reclaim it).
ANS = 0;
# Run 2: identical query, logged with run number 2.
GC.gc();
t = @elapsed (ANS = combine(gatherby(x, :id4, stable = false), :v1 => IMD.mean => :v1, :v2 => IMD.mean => :v2, :v3 => IMD.mean => :v3); println(size(ANS)); flush(stdout));
m = memory_usage();
chkt = @elapsed chk = [sum(ANS.v1), sum(ANS.v2), sum(ANS.v3)];
write_log(2, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Show the first and last three rows of the second run's result.
println(first(ANS, 3));
println(last(ANS, 3));
ANS = 0;

# q5: sum of v1, v2 and v3 per id6 group, run twice (this question uses groupby).
question = "sum v1:v3 by id6"; # q5
# Run 1: collect garbage, then time the aggregation together with printing its size.
GC.gc();
t = @elapsed (ANS = combine(groupby(x, :id6, stable=false), :v1 => IMD.sum => :v1, :v2 => IMD.sum => :v2, :v3 => IMD.sum => :v3); println(size(ANS)); flush(stdout));
# Memory is sampled right after the timed expression.
m = memory_usage();
# Checksum: one total per result column, in the order v1, v2, v3.
chkt = @elapsed chk = [sum(ANS.v1), sum(ANS.v2), sum(ANS.v3)];
write_log(1, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Drop the reference to the result (presumably so the GC call below can reclaim it).
ANS = 0;
# Run 2: identical query, logged with run number 2.
GC.gc();
t = @elapsed (ANS = combine(groupby(x, :id6, stable=false), :v1 => IMD.sum => :v1, :v2 => IMD.sum => :v2, :v3 => IMD.sum => :v3); println(size(ANS)); flush(stdout));
m = memory_usage();
chkt = @elapsed chk = [sum(ANS.v1), sum(ANS.v2), sum(ANS.v3)];
write_log(2, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Show the first and last three rows of the second run's result.
println(first(ANS, 3));
println(last(ANS, 3));
ANS = 0;

# q6: median and standard deviation of v3 per (id4, id5) group, run twice.
question = "median v3 sd v3 by id4 id5"; # q6
# Run 1: collect garbage, then time the aggregation together with printing its size.
# NOTE(review): median! is the mutating variant — presumably it operates on
# per-group data owned by the grouped copy rather than on `x`; confirm.
GC.gc();
t = @elapsed (ANS = combine(groupby(x, [:id4, :id5], stable=false), :v3 => median! => :median_v3, :v3 => std => :sd_v3); println(size(ANS)); flush(stdout));
# Memory is sampled right after the timed expression.
m = memory_usage();
# Checksum: one total per result column, in the order median_v3, sd_v3.
chkt = @elapsed chk = [sum(ANS.median_v3), sum(ANS.sd_v3)];
write_log(1, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Drop the reference to the result (presumably so the GC call below can reclaim it).
ANS = 0;
# Run 2: identical query, logged with run number 2.
GC.gc();
t = @elapsed (ANS = combine(groupby(x, [:id4, :id5], stable=false), :v3 => median! => :median_v3, :v3 => std => :sd_v3); println(size(ANS)); flush(stdout));
m = memory_usage();
chkt = @elapsed chk = [sum(ANS.median_v3), sum(ANS.sd_v3)];;
write_log(2, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Show the first and last three rows of the second run's result.
println(first(ANS, 3));
println(last(ANS, 3));
ANS = 0;

# q7: per id3 group, maximum of v1 and minimum of v2, then their row-wise
# difference stored as range_v1_v2; run twice.
question = "max v1 - min v2 by id3"; # q7
# Run 1: collect garbage, then time the aggregation together with printing its size.
# NOTE(review): `2:3 => byrow(-)` presumably addresses the two aggregate
# columns just produced (result columns 2 and 3) — confirm.
GC.gc();
t = @elapsed (ANS = combine(gatherby(x, :id3, stable = false), :v1 => IMD.maximum, :v2 => IMD.minimum, 2:3 => byrow(-) => :range_v1_v2); println(size(ANS)); flush(stdout));
# Memory is sampled right after the timed expression.
m = memory_usage();
# Checksum of the difference column and the time needed to compute it.
chkt = @elapsed chk = sum(ANS.range_v1_v2);
write_log(1, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Drop the reference to the result (presumably so the GC call below can reclaim it).
ANS = 0;
# Run 2: identical query, logged with run number 2.
GC.gc();
t = @elapsed (ANS = combine(gatherby(x, :id3, stable = false), :v1 => IMD.maximum, :v2 => IMD.minimum, 2:3 => byrow(-) => :range_v1_v2); println(size(ANS)); flush(stdout));
m = memory_usage();
chkt = @elapsed chk = sum(ANS.range_v1_v2);
write_log(2, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Show the first and last three rows of the second run's result.
println(first(ANS, 3));
println(last(ANS, 3));
ANS = 0;

# q8: up to two largest v3 values per id6 group, run twice.
question = "largest two v3 by id6"; # q8
# Run 1: collect garbage, then time the aggregation together with printing its size.
# The anonymous function's parameter `x` shadows the global dataset `x`; it
# partially sorts its argument in descending order (by = -) and keeps the
# first min(2, IMD.n(x)) values.
# NOTE(review): IMD.n presumably counts the usable (non-missing) values — confirm.
GC.gc();
t = @elapsed (ANS = combine(groupby(x, :id6, stable=false), :v3 => (x -> partialsort!(x, 1:min(2, IMD.n(x)), by = -)) => :largest2_v3); println(size(ANS)); flush(stdout));
# Memory is sampled right after the timed expression.
m = memory_usage();
# Checksum of the result column and the time needed to compute it.
chkt = @elapsed chk = sum(ANS.largest2_v3);
write_log(1, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Drop the reference to the result (presumably so the GC call below can reclaim it).
ANS = 0;
# Run 2: identical query, logged with run number 2.
GC.gc();
t = @elapsed (ANS = combine(groupby(x, :id6, stable=false), :v3 => (x -> partialsort!(x, 1:min(2, IMD.n(x)), by = -)) => :largest2_v3); println(size(ANS)); flush(stdout));
m = memory_usage();
chkt = @elapsed chk = sum(ANS.largest2_v3);
write_log(2, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Show the first and last three rows of the second run's result.
println(first(ANS, 3));
println(last(ANS, 3));
ANS = 0;

question = "regression v1 v2 by id2 id4"; # q9
# Correlation of `x` and `y` restricted to the positions where neither value
# is missing; gives NaN when fewer than two such pairs exist.
function cor2(x, y) ## 73647e5a81d4b643c51bd784b3c8af04144cfaf6
    both_present = .!ismissing.(x) .& .!ismissing.(y)
    if count(both_present) < 2
        return NaN
    end
    return cor(view(x, both_present), view(y, both_present))
end
# q9 runs: squared cor2 of v1 and v2 per (id2, id4) group, stored as r2; run twice.
# Run 1: collect garbage, then time the aggregation together with printing its size.
GC.gc();
t = @elapsed (ANS = combine(groupby(x, [:id2, :id4], stable=false), (:v1, :v2) => ((v1,v2) -> cor2(v1, v2)^2) => :r2); println(size(ANS)); flush(stdout));
# Memory is sampled right after the timed expression.
m = memory_usage();
# Checksum of the r2 column and the time needed to compute it.
chkt = @elapsed chk = sum(ANS.r2);
write_log(1, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Drop the reference to the result (presumably so the GC call below can reclaim it).
ANS = 0;
# Run 2: identical query, logged with run number 2.
GC.gc();
t = @elapsed (ANS = combine(groupby(x, [:id2, :id4], stable=false), (:v1, :v2) => ((v1,v2) -> cor2(v1, v2)^2) => :r2); println(size(ANS)); flush(stdout));
m = memory_usage();
chkt = @elapsed chk = sum(ANS.r2);
write_log(2, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Show the first and last three rows of the second run's result.
println(first(ANS, 3));
println(last(ANS, 3));
ANS = 0;

# q10: sum of v3 and group size (length of v3) per combination of all six id
# columns; run twice.
question = "sum v3 count by id1:id6"; # q10
# Run 1: collect garbage, then time the aggregation together with printing its size.
GC.gc();
t = @elapsed (ANS = combine(gatherby(x, [:id1, :id2, :id3, :id4, :id5, :id6], stable=false), :v3 => IMD.sum => :v3, :v3 => length => :count); println(size(ANS)); flush(stdout));
# Memory is sampled right after the timed expression.
m = memory_usage();
# Checksum: one total per result column, in the order v3, count.
chkt = @elapsed chk = [sum(ANS.v3), sum(ANS.count)];
write_log(1, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Drop the reference to the result (presumably so the GC call below can reclaim it).
ANS = 0;
# Run 2: identical query, logged with run number 2.
GC.gc();
t = @elapsed (ANS = combine(gatherby(x, [:id1, :id2, :id3, :id4, :id5, :id6], stable=false), :v3 => IMD.sum => :v3, :v3 => length => :count); println(size(ANS)); flush(stdout));
m = memory_usage();
chkt = @elapsed chk = [sum(ANS.v3), sum(ANS.count)];
write_log(2, task, data_name, in_rows, question, size(ANS, 1), size(ANS, 2), solution, ver, git, fun, t, m, cache, make_chk(chk), chkt, on_disk);
# Show the first and last three rows of the second run's result.
println(first(ANS, 3));
println(last(ANS, 3));
ANS = 0;

# Report the wall-clock time elapsed since task_init, in whole seconds.
print(@sprintf "grouping finished, took %.0fs\n" (time()-task_init)); flush(stdout);

# Terminate the Julia process explicitly.
exit();
Loading