From f4787ad37d413144f5d62cee6618b5df5f0d3bfd Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sun, 13 Feb 2022 08:55:13 +0100 Subject: [PATCH 1/6] fix polars groupby script to polars==0.13.0 --- polars/groupby-polars.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/polars/groupby-polars.py b/polars/groupby-polars.py index 3804a5bd..c3cd508b 100755 --- a/polars/groupby-polars.py +++ b/polars/groupby-polars.py @@ -6,7 +6,7 @@ import gc import timeit import polars as pl -from polars.lazy import col +from polars import col exec(open("./_helpers/helpers.py").read()) @@ -42,7 +42,7 @@ question = "sum v1 by id1" # q1 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id1").agg(pl.sum("v1")).collect() +ans = x.groupby("id1").agg(pl.sum("v1").alias("v1_sum")).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -53,7 +53,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id1").agg(pl.sum("v1")).collect() +ans = x.groupby("id1").agg(pl.sum("v1").alias("v1_sum")).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -68,7 +68,7 @@ question = "sum v1 by id1:id2" # q2 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id1","id2"]).agg(pl.sum("v1")).collect() +ans = x.groupby(["id1","id2"]).agg(pl.sum("v1").alias("v1_sum")).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -79,7 +79,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id1","id2"]).agg(pl.sum("v1")).collect() +ans = x.groupby(["id1","id2"]).agg(pl.sum("v1").alias("v1_sum")).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -94,7 +94,7 @@ question = "sum v1 mean v3 by id3" # q3 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id3").agg([pl.sum("v1"), pl.mean("v3")]).collect() +ans = x.groupby("id3").agg([pl.sum("v1").alias("v1_sum"), pl.mean("v3").alias("v3_mean")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -105,7 +105,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id3").agg([pl.sum("v1"), pl.mean("v3")]).collect() +ans = x.groupby("id3").agg([pl.sum("v1").alias("v1_sum"), pl.mean("v3").alias("v3_mean")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -120,7 +120,7 @@ question = "mean v1:v3 by id4" # q4 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id4").agg([pl.mean("v1"), pl.mean("v2"), pl.mean("v3")]).collect() +ans = x.groupby("id4").agg([pl.mean("v1").alias("v1_mean"), pl.mean("v2").alias("v2_mean"), pl.mean("v3").alias("v3_mean")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -131,7 +131,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id4").agg([pl.mean("v1"), pl.mean("v2"), pl.mean("v3")]).collect() +ans = x.groupby("id4").agg([pl.mean("v1").alias("v1_mean"), pl.mean("v2").alias("v2_mean"), pl.mean("v3").alias("v3_mean")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -146,7 +146,7 @@ question = "sum v1:v3 by id6" # q5 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id6").agg([pl.sum("v1"), pl.sum("v2"), pl.sum("v3")]).collect() +ans = x.groupby("id6").agg([pl.sum("v1").alias("v1_sum"), pl.sum("v2").alias("v2_sum"), pl.sum("v3").alias("v3_sum")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -157,7 +157,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id6").agg([pl.sum("v1"), pl.sum("v2"), pl.sum("v3")]).collect() +ans = x.groupby("id6").agg([pl.sum("v1").alias("v1_sum"), pl.sum("v2").alias("v2_sum"), pl.sum("v3").alias("v3_sum")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() From 17a270244585327d224b4364000e32ad77c9b670 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 29 Aug 2022 15:12:23 +0200 Subject: [PATCH 2/6] fix for polars > 0.14.0 and improve q8 --- polars/groupby-polars.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/polars/groupby-polars.py b/polars/groupby-polars.py index c3cd508b..cd558afd 100755 --- a/polars/groupby-polars.py +++ b/polars/groupby-polars.py @@ -23,13 +23,8 @@ print("loading dataset %s" % data_name, flush=True) with pl.StringCache(): - x = pl.read_csv(src_grp, dtype={"id4":pl.Int32, "id5":pl.Int32, "id6":pl.Int32, "v1":pl.Int32, "v2":pl.Int32, "v3":pl.Float64}, low_memory=True) - x["id1"] = x["id1"].cast(pl.Categorical) - x["id1"].shrink_to_fit(in_place=True) - x["id2"] = x["id2"].cast(pl.Categorical) - x["id2"].shrink_to_fit(in_place=True) - x["id3"] = x["id3"].cast(pl.Categorical) - x["id3"].shrink_to_fit(in_place=True) + x = (pl.read_csv(src_grp, dtype={"id4":pl.Int32, "id5":pl.Int32, "id6":pl.Int32, "v1":pl.Int32, "v2":pl.Int32, "v3":pl.Float64}, low_memory=True) + .with_columns(pl.col(["id1", "id2", "id3"]).cast(pl.Categorical))) in_rows = x.shape[0] x = x.lazy() @@ -224,7 +219,7 @@ question = "largest two v3 by id6" # q8 gc.collect() t_start = timeit.default_timer() -ans = x.drop_nulls("v3").sort("v3", reverse=True).groupby("id6").agg(col("v3").head(2).alias("largest2_v3")).explode("largest2_v3").collect() +ans = x.drop_nulls("v3").groupby("id6").agg(col("v3").top_k(2).alias("largest2_v3")).explode("largest2_v3").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -235,7 +230,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.drop_nulls("v3").sort("v3", reverse=True).groupby("id6").agg(col("v3").head(2).alias("largest2_v3")).explode("largest2_v3").collect() +ans = x.drop_nulls("v3").groupby("id6").agg(col("v3").top_k(2).alias("largest2_v3")).explode("largest2_v3").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -299,6 +294,6 @@ print(ans.tail(3), flush=True) del ans -print("grouping finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True) +print("grouping finished, took %0.3fs" % (timeit.default_timer() - task_init), flush=True) exit(0) From 53c305c674a8bb853369cd6528868199ed21812f Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 31 Mar 2023 14:16:48 +0200 Subject: [PATCH 3/6] use streaming to reduce memory pressure --- polars/groupby-polars.py | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/polars/groupby-polars.py b/polars/groupby-polars.py index cd558afd..fb121cdd 100755 --- a/polars/groupby-polars.py +++ b/polars/groupby-polars.py @@ -21,9 +21,10 @@ data_name = os.environ["SRC_DATANAME"] src_grp = os.path.join("data", data_name + ".csv") print("loading dataset %s" % data_name, flush=True) +STREAMING = True with pl.StringCache(): - x = (pl.read_csv(src_grp, dtype={"id4":pl.Int32, "id5":pl.Int32, "id6":pl.Int32, "v1":pl.Int32, "v2":pl.Int32, "v3":pl.Float64}, low_memory=True) + x = (pl.read_csv(src_grp, dtypes={"id4":pl.Int32, "id5":pl.Int32, "id6":pl.Int32, "v1":pl.Int32, "v2":pl.Int32, "v3":pl.Float64}, low_memory=True) .with_columns(pl.col(["id1", "id2", "id3"]).cast(pl.Categorical))) in_rows = x.shape[0] @@ -37,7 +38,7 @@ question = "sum v1 by id1" # q1 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id1").agg(pl.sum("v1").alias("v1_sum")).collect() +ans = x.groupby("id1").agg(pl.sum("v1").alias("v1_sum")).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -48,7 +49,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id1").agg(pl.sum("v1").alias("v1_sum")).collect() +ans = x.groupby("id1").agg(pl.sum("v1").alias("v1_sum")).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -63,7 +64,7 @@ question = "sum v1 by id1:id2" # q2 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id1","id2"]).agg(pl.sum("v1").alias("v1_sum")).collect() +ans = x.groupby(["id1","id2"]).agg(pl.sum("v1").alias("v1_sum")).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -74,7 +75,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id1","id2"]).agg(pl.sum("v1").alias("v1_sum")).collect() +ans = x.groupby(["id1","id2"]).agg(pl.sum("v1").alias("v1_sum")).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -89,7 +90,7 @@ question = "sum v1 mean v3 by id3" # q3 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id3").agg([pl.sum("v1").alias("v1_sum"), pl.mean("v3").alias("v3_mean")]).collect() +ans = x.groupby("id3").agg([pl.sum("v1").alias("v1_sum"), pl.mean("v3").alias("v3_mean")]).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -100,7 +101,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id3").agg([pl.sum("v1").alias("v1_sum"), pl.mean("v3").alias("v3_mean")]).collect() +ans = x.groupby("id3").agg([pl.sum("v1").alias("v1_sum"), pl.mean("v3").alias("v3_mean")]).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -115,7 +116,7 @@ question = "mean v1:v3 by id4" # q4 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id4").agg([pl.mean("v1").alias("v1_mean"), pl.mean("v2").alias("v2_mean"), pl.mean("v3").alias("v3_mean")]).collect() +ans = x.groupby("id4").agg([pl.mean("v1").alias("v1_mean"), pl.mean("v2").alias("v2_mean"), pl.mean("v3").alias("v3_mean")]).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -126,7 +127,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id4").agg([pl.mean("v1").alias("v1_mean"), pl.mean("v2").alias("v2_mean"), pl.mean("v3").alias("v3_mean")]).collect() +ans = x.groupby("id4").agg([pl.mean("v1").alias("v1_mean"), pl.mean("v2").alias("v2_mean"), pl.mean("v3").alias("v3_mean")]).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -141,7 +142,7 @@ question = "sum v1:v3 by id6" # q5 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id6").agg([pl.sum("v1").alias("v1_sum"), pl.sum("v2").alias("v2_sum"), pl.sum("v3").alias("v3_sum")]).collect() +ans = x.groupby("id6").agg([pl.sum("v1").alias("v1_sum"), pl.sum("v2").alias("v2_sum"), pl.sum("v3").alias("v3_sum")]).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -152,7 +153,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id6").agg([pl.sum("v1").alias("v1_sum"), pl.sum("v2").alias("v2_sum"), pl.sum("v3").alias("v3_sum")]).collect() +ans = x.groupby("id6").agg([pl.sum("v1").alias("v1_sum"), pl.sum("v2").alias("v2_sum"), pl.sum("v3").alias("v3_sum")]).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -193,7 +194,7 @@ question = "max v1 - min v2 by id3" # q7 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id3").agg([(pl.max("v1") - pl.min("v2")).alias("range_v1_v2")]).collect() +ans = x.groupby("id3").agg([(pl.max("v1") - pl.min("v2")).alias("range_v1_v2")]).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -204,7 +205,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id3").agg([(pl.max("v1") - pl.min("v2")).alias("range_v1_v2")]).collect() +ans = x.groupby("id3").agg([(pl.max("v1") - pl.min("v2")).alias("range_v1_v2")]).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -245,7 +246,7 @@ question = "regression v1 v2 by id2 id4" # q9 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id2","id4"]).agg((pl.pearson_corr("v1","v2")**2).alias("r2")).collect() +ans = x.groupby(["id2","id4"]).agg((pl.pearson_corr("v1","v2")**2).alias("r2")).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -256,7 +257,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id2","id4"]).agg((pl.pearson_corr("v1","v2")**2).alias("r2")).collect() +ans = x.groupby(["id2","id4"]).agg((pl.pearson_corr("v1","v2")**2).alias("r2")).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -271,7 +272,7 @@ question = "sum v3 count by id1:id6" # q10 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id1","id2","id3","id4","id5","id6"]).agg([pl.sum("v3").alias("v3"), pl.count("v1").alias("count")]).collect() +ans = x.groupby(["id1","id2","id3","id4","id5","id6"]).agg([pl.sum("v3").alias("v3"), pl.count("v1").alias("count")]).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -282,7 +283,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id1","id2","id3","id4","id5","id6"]).agg([pl.sum("v3").alias("v3"), pl.count("v1").alias("count")]).collect() +ans = x.groupby(["id1","id2","id3","id4","id5","id6"]).agg([pl.sum("v3").alias("v3"), pl.count("v1").alias("count")]).collect(streaming=STREAMING) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() From 1af31a61d1dbc1fb091a05aa4ccb54abc6f0cd4a Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 31 Mar 2023 14:17:28 +0200 Subject: [PATCH 4/6] fix join read_csv --- polars/__pycache__/groupby-polars.cpython-38.pyc | Bin 0 -> 6784 bytes polars/join-polars.py | 8 ++++---- 2 files changed, 4 insertions(+), 4 deletions(-) create mode 100644 polars/__pycache__/groupby-polars.cpython-38.pyc diff --git a/polars/__pycache__/groupby-polars.cpython-38.pyc b/polars/__pycache__/groupby-polars.cpython-38.pyc new file mode 100644 index 0000000000000000000000000000000000000000..b476d5d700561a46f9791f27ea488d02e980650b GIT binary patch literal 6784 zcmdT|Npssq7RE+`5-D2?X>DHM$g)k@BF#litZ@=6j#HJYOb)R#ht2_k*iR%REKCEW zE#6W(`yb>$9_JrT;W{V(gH%p=4S*EPk`6X&G!j|p<>BpLzXs4h&&|yW`1}3OfBkAd z6@>rtN%FUZljnGO|5+3S5U4;!D!r3Lfyz{|L?VX0M5M5ni5&I{NnxL|(zZ%eQGg80 z(DaEom>miyBF4_)5UiZS%-DHv?>g4BG;{<$Y;+4Z1W$`@*%D8 z0jH7L@Rp@?JTAX}()3oXU@jrigIia;L0;}d~wk2P%3RrIkq6j~xFCHTtl1Ta@{ z2R1`Z=yTYwVSgQRFbDH=4IaTZEId65QSg1E}5S<&-Rn&Xi%ky;W$s6 zQcF^P*`l}4%R9}tq^pwq|EsUl<^Rq(`7b1MkL$lqwV@ay@{tw5>u|6WR?sTeGq0q( zrEijpTIQMctMz6 zwdkh9_`x|CKb)x3B8@OEhV?of#-n;I#u#HiS~VAh@l}g%I*hl@!FYo{8b1p^=eQ2O zppQdLp2oB7(;f9=-cAzr9O*04gr*C^_^L%W9mb_|FdjXjcH_PJAH0r3)OTnBR9eP& zK$Si@5my96zw&8#)5-ro*s$CWg;=EgpVJHGJQ#y_4|UA=(d_}SU=x>Sog`jtt9jOX^T5|Oy<$3$OFlLD)r~QuC4Vd+WxVITr7*hkaVx2Bl*3E zlj1_4pt&Qa;v`T_M`!LEFUb7X1O@HA7uXi!D zejz5=U^rT@76nvebI4BAUQOrp{z7!OvO8j^vkWq@T*OrgxsbgYvCl_3$J`CZPed_4 zHJ+dTnhn&9ta)^7vyO?}VKba@CbY|WA{n3dZ`j6Lp6_kVHqp;&>}QyByoEj5HXV{> zhSLHak?3J;xawM%Z_R(>j_ovX&8ZA{f`MdXj;h|P_177+7>2@7Ih8~mHKoB^ke`~Ztv;)&Iy*ALtn(w+t+UAqefmeJHq&m!W{6KQOKx!d| z94pgjX&$uHu}C&96wBxzGmcoGz*}erirWDvP#74LWOY4W9J)?q9I`wzvg^FT17(2~ zc(cr#RcrzkoSw;W#m(>n&>h2uK^ z<2UMo_%=|QhUb&)3&V$&%gm->1u3s>V8PC#5u$L91K9%9hfCRlrjIAlaA0&TKU|TF z%ovz(E=2gR&Rgf|))9{Gw`81Gm z{a}Nsn5E3^Ag6`3sCX{(i3)E!mP>);I)FPER#ekvjAY-KejA5Y*LK(~^o!$U`u`T# z4o<@6uibLnwV{lTHrpoD8|5bIDbxM!eQ=s>+h9lK2<>vbp`GrMLFSq5(yj$x@ipLW zVFBl)j2QiM(!7)wmD9g8D=v!j(nB%N|2WG?dF0MXN+_`=UK4X-8ppif6{Rz&i>O12 zC2pT7F-c8I+#5@5C2UMmlM;`|64ivnBy~E8`q)O&t4V6o#`Cd6GFOw-q{Qx-5|h-V Y#HszHjY(=!;@Q|nGLA`VB(Wm>4Z_y8U;qFB literal 0 HcmV?d00001 diff --git a/polars/join-polars.py b/polars/join-polars.py index 75ed5f73..73a02e8c 100755 --- a/polars/join-polars.py +++ b/polars/join-polars.py @@ -27,16 +27,16 @@ print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[2] + ", " + y_data_name[2], flush=True) with pl.StringCache(): - x = pl.read_csv(src_jn_x, dtype={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v1":pl.Float64}) + x = pl.read_csv(src_jn_x, dtypes={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v1":pl.Float64}) x["id4"] = x["id4"].cast(pl.Categorical) x["id5"] = x["id5"].cast(pl.Categorical) x["id6"] = x["id6"].cast(pl.Categorical) - small = pl.read_csv(src_jn_y[0], dtype={"id1":pl.Int32, "v2":pl.Float64}) + small = pl.read_csv(src_jn_y[0], dtypes={"id1":pl.Int32, "v2":pl.Float64}) small["id4"] = small["id4"].cast(pl.Categorical) - medium = pl.read_csv(src_jn_y[1], dtype={"id1":pl.Int32, "id2":pl.Int32, "v2":pl.Float64}) + medium = pl.read_csv(src_jn_y[1], dtypes={"id1":pl.Int32, "id2":pl.Int32, "v2":pl.Float64}) medium["id4"] = medium["id4"].cast(pl.Categorical) medium["id5"] = medium["id5"].cast(pl.Categorical) - big = pl.read_csv(src_jn_y[2], dtype={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v2":pl.Float64}) + big = pl.read_csv(src_jn_y[2], dtypes={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v2":pl.Float64}) big["id4"] = big["id4"].cast(pl.Categorical) big["id5"] = big["id5"].cast(pl.Categorical) big["id6"] = big["id6"].cast(pl.Categorical) From 9fe58bc3e4bb9726227bf99f52a7e58db0867c49 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 31 Mar 2023 15:07:41 +0200 Subject: [PATCH 5/6] fix join and cache files --- polars/groupby-polars.py | 5 +++ polars/join-polars.py | 97 +++++++++++++++++++++++++++------------- 2 files changed, 70 insertions(+), 32 deletions(-) diff --git a/polars/groupby-polars.py b/polars/groupby-polars.py index fb121cdd..e46bf6f4 100755 --- a/polars/groupby-polars.py +++ b/polars/groupby-polars.py @@ -28,9 +28,14 @@ .with_columns(pl.col(["id1", "id2", "id3"]).cast(pl.Categorical))) in_rows = x.shape[0] +x.write_ipc("/tmp/tmp.ipc") +del x +x = pl.read_ipc("/tmp/tmp.ipc", memory_map=True) x = x.lazy() +# materialize print(len(x.collect()), flush=True) +in_rows = x.collect().shape[0] task_init = timeit.default_timer() print("grouping...", flush=True) diff --git a/polars/join-polars.py b/polars/join-polars.py index 73a02e8c..cdcd059e 100755 --- a/polars/join-polars.py +++ b/polars/join-polars.py @@ -27,50 +27,83 @@ print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[2] + ", " + y_data_name[2], flush=True) with pl.StringCache(): - x = pl.read_csv(src_jn_x, dtypes={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v1":pl.Float64}) - x["id4"] = x["id4"].cast(pl.Categorical) - x["id5"] = x["id5"].cast(pl.Categorical) - x["id6"] = x["id6"].cast(pl.Categorical) + x = (pl.read_csv(src_jn_x, dtypes={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v1":pl.Float64}) + .with_columns( + pl.col(["id4", "id5", "id6"]).cast(pl.Categorical) + ) + ) small = pl.read_csv(src_jn_y[0], dtypes={"id1":pl.Int32, "v2":pl.Float64}) - small["id4"] = small["id4"].cast(pl.Categorical) - medium = pl.read_csv(src_jn_y[1], dtypes={"id1":pl.Int32, "id2":pl.Int32, "v2":pl.Float64}) - medium["id4"] = medium["id4"].cast(pl.Categorical) - medium["id5"] = medium["id5"].cast(pl.Categorical) - big = pl.read_csv(src_jn_y[2], dtypes={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v2":pl.Float64}) - big["id4"] = big["id4"].cast(pl.Categorical) - big["id5"] = big["id5"].cast(pl.Categorical) - big["id6"] = big["id6"].cast(pl.Categorical) + small = small.with_columns( + pl.col("id4").cast(pl.Categorical) + ) + medium = (pl.read_csv(src_jn_y[1], dtypes={"id1":pl.Int32, "id2":pl.Int32, "v2":pl.Float64}) + .with_columns( + pl.col(["id4", "id5"]).cast(pl.Categorical), + )) + big = (pl.read_csv(src_jn_y[2], dtypes={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v2":pl.Float64}) + .with_columns( + pl.col(["id4", "id5", "id6"]).cast(pl.Categorical) + )) print(len(x), flush=True) print(len(small), flush=True) print(len(medium), flush=True) print(len(big), flush=True) +with pl.StringCache(): + x.write_ipc("/tmp/x.ipc") + del x + x = pl.read_ipc("/tmp/x.ipc", memory_map=True) + x = x.lazy() + + small.write_ipc("/tmp/small.ipc") + del small + small = pl.read_ipc("/tmp/small.ipc", memory_map=True) + small = small.lazy() + + medium.write_ipc("/tmp/medium.ipc") + del medium + medium = pl.read_ipc("/tmp/medium.ipc", memory_map=True) + medium = medium.lazy() + + big.write_ipc("/tmp/big.ipc") + del big + big = pl.read_ipc("/tmp/big.ipc", memory_map=True) + big = big.lazy() + +# materialize +print(len(x.collect()), flush=True) +print(len(small.collect()), flush=True) +print(len(medium.collect()), flush=True) +print(len(big.collect()), flush=True) + +in_rows = x.collect().shape[0] + task_init = timeit.default_timer() print("joining...", flush=True) question = "small inner on int" # q1 gc.collect() t_start = timeit.default_timer() -ans = x.join(small, on="id1") +ans = x.join(small, on="id1").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.join(small, on="id1") +ans = x.join(small, on="id1").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans @@ -78,25 +111,25 @@ question = "medium inner on int" # q2 gc.collect() t_start = timeit.default_timer() -ans = x.join(medium, on="id2") +ans = x.join(medium, on="id2").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.join(medium, on="id2") +ans = x.join(medium, on="id2").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans @@ -104,25 +137,25 @@ question = "medium outer on int" # q3 gc.collect() t_start = timeit.default_timer() -ans = x.join(medium, how="left", on="id2") +ans = x.join(medium, how="left", on="id2").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.join(medium, how="left", on="id2") +ans = x.join(medium, how="left", on="id2").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans @@ -130,25 +163,25 @@ question = "medium inner on factor" # q4 gc.collect() t_start = timeit.default_timer() -ans = x.join(medium, on="id5") +ans = x.join(medium, on="id5").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.join(medium, on="id5") +ans = x.join(medium, on="id5").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans @@ -156,25 +189,25 @@ question = "big inner on int" # q5 gc.collect() t_start = timeit.default_timer() -ans = x.join(big, on="id3") +ans = x.join(big, on="id3").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.join(big, on="id3") +ans = x.join(big, on="id3").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans From 8eeb036aef8ad459f3e481ad49e27c2ba9ea2a3b Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 31 Mar 2023 18:06:22 +0200 Subject: [PATCH 6/6] let runtime decide streaming --- polars/groupby-polars.py | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/polars/groupby-polars.py b/polars/groupby-polars.py index e46bf6f4..e54024ff 100755 --- a/polars/groupby-polars.py +++ b/polars/groupby-polars.py @@ -21,7 +21,6 @@ data_name = os.environ["SRC_DATANAME"] src_grp = os.path.join("data", data_name + ".csv") print("loading dataset %s" % data_name, flush=True) -STREAMING = True with pl.StringCache(): x = (pl.read_csv(src_grp, dtypes={"id4":pl.Int32, "id5":pl.Int32, "id6":pl.Int32, "v1":pl.Int32, "v2":pl.Int32, "v3":pl.Float64}, low_memory=True) @@ -43,7 +42,7 @@ question = "sum v1 by id1" # q1 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id1").agg(pl.sum("v1").alias("v1_sum")).collect(streaming=STREAMING) +ans = x.groupby("id1").agg(pl.sum("v1").alias("v1_sum")).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -54,7 +53,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id1").agg(pl.sum("v1").alias("v1_sum")).collect(streaming=STREAMING) +ans = x.groupby("id1").agg(pl.sum("v1").alias("v1_sum")).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -69,7 +68,7 @@ question = "sum v1 by id1:id2" # q2 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id1","id2"]).agg(pl.sum("v1").alias("v1_sum")).collect(streaming=STREAMING) +ans = x.groupby(["id1","id2"]).agg(pl.sum("v1").alias("v1_sum")).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -80,7 +79,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id1","id2"]).agg(pl.sum("v1").alias("v1_sum")).collect(streaming=STREAMING) +ans = x.groupby(["id1","id2"]).agg(pl.sum("v1").alias("v1_sum")).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -95,7 +94,7 @@ question = "sum v1 mean v3 by id3" # q3 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id3").agg([pl.sum("v1").alias("v1_sum"), pl.mean("v3").alias("v3_mean")]).collect(streaming=STREAMING) +ans = x.groupby("id3").agg([pl.sum("v1").alias("v1_sum"), pl.mean("v3").alias("v3_mean")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -106,7 +105,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id3").agg([pl.sum("v1").alias("v1_sum"), pl.mean("v3").alias("v3_mean")]).collect(streaming=STREAMING) +ans = x.groupby("id3").agg([pl.sum("v1").alias("v1_sum"), pl.mean("v3").alias("v3_mean")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -121,7 +120,7 @@ question = "mean v1:v3 by id4" # q4 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id4").agg([pl.mean("v1").alias("v1_mean"), pl.mean("v2").alias("v2_mean"), pl.mean("v3").alias("v3_mean")]).collect(streaming=STREAMING) +ans = x.groupby("id4").agg([pl.mean("v1").alias("v1_mean"), pl.mean("v2").alias("v2_mean"), pl.mean("v3").alias("v3_mean")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -132,7 +131,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id4").agg([pl.mean("v1").alias("v1_mean"), pl.mean("v2").alias("v2_mean"), pl.mean("v3").alias("v3_mean")]).collect(streaming=STREAMING) +ans = x.groupby("id4").agg([pl.mean("v1").alias("v1_mean"), pl.mean("v2").alias("v2_mean"), pl.mean("v3").alias("v3_mean")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -147,7 +146,7 @@ question = "sum v1:v3 by id6" # q5 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id6").agg([pl.sum("v1").alias("v1_sum"), pl.sum("v2").alias("v2_sum"), pl.sum("v3").alias("v3_sum")]).collect(streaming=STREAMING) +ans = x.groupby("id6").agg([pl.sum("v1").alias("v1_sum"), pl.sum("v2").alias("v2_sum"), pl.sum("v3").alias("v3_sum")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -158,7 +157,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id6").agg([pl.sum("v1").alias("v1_sum"), pl.sum("v2").alias("v2_sum"), pl.sum("v3").alias("v3_sum")]).collect(streaming=STREAMING) +ans = x.groupby("id6").agg([pl.sum("v1").alias("v1_sum"), pl.sum("v2").alias("v2_sum"), pl.sum("v3").alias("v3_sum")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -199,7 +198,7 @@ question = "max v1 - min v2 by id3" # q7 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id3").agg([(pl.max("v1") - pl.min("v2")).alias("range_v1_v2")]).collect(streaming=STREAMING) +ans = x.groupby("id3").agg([(pl.max("v1") - pl.min("v2")).alias("range_v1_v2")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -210,7 +209,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id3").agg([(pl.max("v1") - pl.min("v2")).alias("range_v1_v2")]).collect(streaming=STREAMING) +ans = x.groupby("id3").agg([(pl.max("v1") - pl.min("v2")).alias("range_v1_v2")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -251,7 +250,7 @@ question = "regression v1 v2 by id2 id4" # q9 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id2","id4"]).agg((pl.pearson_corr("v1","v2")**2).alias("r2")).collect(streaming=STREAMING) +ans = x.groupby(["id2","id4"]).agg((pl.pearson_corr("v1","v2")**2).alias("r2")).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -262,7 +261,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id2","id4"]).agg((pl.pearson_corr("v1","v2")**2).alias("r2")).collect(streaming=STREAMING) +ans = x.groupby(["id2","id4"]).agg((pl.pearson_corr("v1","v2")**2).alias("r2")).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -277,7 +276,7 @@ question = "sum v3 count by id1:id6" # q10 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id1","id2","id3","id4","id5","id6"]).agg([pl.sum("v3").alias("v3"), pl.count("v1").alias("count")]).collect(streaming=STREAMING) +ans = x.groupby(["id1","id2","id3","id4","id5","id6"]).agg([pl.sum("v3").alias("v3"), pl.count("v1").alias("count")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -288,7 +287,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id1","id2","id3","id4","id5","id6"]).agg([pl.sum("v3").alias("v3"), pl.count("v1").alias("count")]).collect(streaming=STREAMING) +ans = x.groupby(["id1","id2","id3","id4","id5","id6"]).agg([pl.sum("v3").alias("v3"), pl.count("v1").alias("count")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage()