From f6f547111e20b3d10acec6c70e7b24798e0df143 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 7 Dec 2023 11:00:03 -0800 Subject: [PATCH 1/6] Initial pass at local RAPIDS support --- ci/environment-rapids.yml | 11 ++++++++++ ci/environment.yml | 3 +-- tests/tpch/conftest.py | 43 +++++++++++++++++++++++++++------------ 3 files changed, 42 insertions(+), 15 deletions(-) create mode 100644 ci/environment-rapids.yml diff --git a/ci/environment-rapids.yml b/ci/environment-rapids.yml new file mode 100644 index 0000000000..b35eb0fd05 --- /dev/null +++ b/ci/environment-rapids.yml @@ -0,0 +1,11 @@ +# This is an addition to ci/environment.yml. +# Add cudf and downgrade some pinned dependencies. +channels: + - rapidsai-nightly + - conda-forge + - nvidia +dependencies: + - dask-cudf =24.02 + - dask-cuda =24.02 + - pandas ==1.5.3 # pinned by cudf + - pynvml ==11.4.1 # pinned by dask-cuda diff --git a/ci/environment.yml b/ci/environment.yml index 5a677721c7..980a13c7b0 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -8,14 +8,13 @@ dependencies: # - AB_environments/AB_sample.conda.yaml ######################################################## - - python >=3.9 + - python >=3.9,<3.11 - pip - coiled >=0.2.54 - numpy ==1.24.4 - pandas ==2.1.1 - dask ==2023.10.1 - distributed ==2023.10.1 - - dask-expr ==0.1.11 - dask-labextension ==7.0.0 - dask-ml ==2023.3.24 - fsspec ==2023.9.1 diff --git a/tests/tpch/conftest.py b/tests/tpch/conftest.py index 79869b7790..f500940035 100644 --- a/tests/tpch/conftest.py +++ b/tests/tpch/conftest.py @@ -17,6 +17,7 @@ def pytest_addoption(parser): parser.addoption("--local", action="store_true", default=False, help="") + parser.addoption("--rapids", action="store_true", default=False, help="") parser.addoption("--cloud", action="store_false", dest="local", help="") parser.addoption("--restart", action="store_true", default=True, help="") parser.addoption("--no-restart", action="store_false", dest="restart", help="") @@ -48,6 +49,11 @@ def local(request): return request.config.getoption("local") +@pytest.fixture(scope="session") +def rapids(request): + return request.config.getoption("rapids") + + @pytest.fixture(scope="session") def restart(request): return request.config.getoption("restart") @@ -64,7 +70,7 @@ def dataset_path(local, scale): local_paths = { 1: "./tpch-data/scale-1/", 10: "./tpch-data/scale-10/", - 100: "./tpch-data/scale-100/", + 100: "/raid/charlesb/tpc-h/sf100/repartitioned/", } if local: @@ -186,6 +192,7 @@ def cluster_spec(scale): @pytest.fixture(scope="module") def cluster( local, + rapids, scale, module, dask_env_variables, @@ -195,19 +202,29 @@ def cluster( make_chart, ): if local: - with LocalCluster() as cluster: - yield cluster - else: - kwargs = dict( - name=f"tpch-{module}-{scale}-{name}", - environ=dask_env_variables, - tags=github_cluster_tags, - region="us-east-2", - **cluster_spec, - ) - with dask.config.set({"distributed.scheduler.worker-saturation": "inf"}): - with coiled.Cluster(**kwargs) as cluster: + if not rapids: + with LocalCluster() as cluster: yield cluster + else: + from dask_cuda import LocalCUDACluster + + with dask.config.set({"dataframe.backend": "cudf", "dataframe.shuffle.method": "tasks"}): + with LocalCUDACluster() as cluster: + yield cluster + else: + if not rapids: + kwargs = dict( + name=f"tpch-{module}-{scale}-{name}", + environ=dask_env_variables, + tags=github_cluster_tags, + region="us-east-2", + **cluster_spec, + ) + with dask.config.set({"distributed.scheduler.worker-saturation": "inf"}): + with coiled.Cluster(**kwargs) as cluster: + yield cluster + else: + raise NotImplementedError("RAPIDS on Coiled not yet supported") @pytest.fixture From 2e029c081993b1d58cfe6e1e5f870f626e8a5672 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 7 Dec 2023 11:02:44 -0800 Subject: [PATCH 2/6] Linting --- tests/tpch/conftest.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/tpch/conftest.py b/tests/tpch/conftest.py index f500940035..ae83be9e6a 100644 --- a/tests/tpch/conftest.py +++ b/tests/tpch/conftest.py @@ -208,7 +208,9 @@ def cluster( else: from dask_cuda import LocalCUDACluster - with dask.config.set({"dataframe.backend": "cudf", "dataframe.shuffle.method": "tasks"}): + with dask.config.set( + {"dataframe.backend": "cudf", "dataframe.shuffle.method": "tasks"} + ): with LocalCUDACluster() as cluster: yield cluster else: From 299e2a6a93a3e18fd4d72c9d2c9cefa2b069f311 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 7 Dec 2023 11:07:37 -0800 Subject: [PATCH 3/6] Revert local dir change --- tests/tpch/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tpch/conftest.py b/tests/tpch/conftest.py index ae83be9e6a..b91bb62680 100644 --- a/tests/tpch/conftest.py +++ b/tests/tpch/conftest.py @@ -70,7 +70,7 @@ def dataset_path(local, scale): local_paths = { 1: "./tpch-data/scale-1/", 10: "./tpch-data/scale-10/", - 100: "/raid/charlesb/tpc-h/sf100/repartitioned/", + 100: "./tpch-data/scale-100/", } if local: From d81a3a4bc2b3f51d954a10ec671f505928df17dc Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 7 Dec 2023 11:49:56 -0800 Subject: [PATCH 4/6] Use basic CUDA cluster config for now --- tests/tpch/conftest.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/tpch/conftest.py b/tests/tpch/conftest.py index b91bb62680..98f18bc45e 100644 --- a/tests/tpch/conftest.py +++ b/tests/tpch/conftest.py @@ -226,7 +226,13 @@ def cluster( with coiled.Cluster(**kwargs) as cluster: yield cluster else: - raise NotImplementedError("RAPIDS on Coiled not yet supported") + from dask_cuda import LocalCUDACluster + + with dask.config.set( + {"dataframe.backend": "cudf", "dataframe.shuffle.method": "tasks"} + ): + with LocalCUDACluster() as cluster: + yield cluster @pytest.fixture From c53c8d0a6efcdc8b76416fd01b46a336fc4972b9 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 7 Dec 2023 11:55:34 -0800 Subject: [PATCH 5/6] Make blocksize configurable --- tests/tpch/test_dask.py | 48 +++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/tests/tpch/test_dask.py b/tests/tpch/test_dask.py index e2b629c77e..8b00d55f9e 100644 --- a/tests/tpch/test_dask.py +++ b/tests/tpch/test_dask.py @@ -6,10 +6,12 @@ dd = pytest.importorskip("dask_expr") +BLOCKSIZE = "default" + def test_query_1(client, dataset_path, fs): VAR1 = datetime(1998, 9, 2) - lineitem_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs) + lineitem_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs, blocksize=BLOCKSIZE) lineitem_filtered = lineitem_ds[lineitem_ds.l_shipdate <= VAR1] lineitem_filtered["sum_qty"] = lineitem_filtered.l_quantity @@ -50,11 +52,11 @@ def test_query_2(client, dataset_path, fs): var2 = "BRASS" var3 = "EUROPE" - region_ds = dd.read_parquet(dataset_path + "region", filesystem=fs) - nation_filtered = dd.read_parquet(dataset_path + "nation", filesystem=fs) - supplier_filtered = dd.read_parquet(dataset_path + "supplier", filesystem=fs) - part_filtered = dd.read_parquet(dataset_path + "part", filesystem=fs) - partsupp_filtered = dd.read_parquet(dataset_path + "partsupp", filesystem=fs) + region_ds = dd.read_parquet(dataset_path + "region", filesystem=fs, blocksize=BLOCKSIZE) + nation_filtered = dd.read_parquet(dataset_path + "nation", filesystem=fs, blocksize=BLOCKSIZE) + supplier_filtered = dd.read_parquet(dataset_path + "supplier", filesystem=fs, blocksize=BLOCKSIZE) + part_filtered = dd.read_parquet(dataset_path + "part", filesystem=fs, blocksize=BLOCKSIZE) + partsupp_filtered = dd.read_parquet(dataset_path + "partsupp", filesystem=fs, blocksize=BLOCKSIZE) region_filtered = region_ds[(region_ds["r_name"] == var3)] r_n_merged = nation_filtered.merge( @@ -118,9 +120,9 @@ def test_query_3(client, dataset_path, fs): var1 = datetime.strptime("1995-03-15", "%Y-%m-%d") var2 = "BUILDING" - lineitem_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs) - orders_ds = dd.read_parquet(dataset_path + "orders", filesystem=fs) - cutomer_ds = dd.read_parquet(dataset_path + "customer", filesystem=fs) + lineitem_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs, blocksize=BLOCKSIZE) + orders_ds = dd.read_parquet(dataset_path + "orders", filesystem=fs, blocksize=BLOCKSIZE) + cutomer_ds = dd.read_parquet(dataset_path + "customer", filesystem=fs, blocksize=BLOCKSIZE) lsel = lineitem_ds.l_shipdate > var1 osel = orders_ds.o_orderdate < var1 @@ -144,8 +146,8 @@ def test_query_4(client, dataset_path, fs): date1 = datetime.strptime("1993-10-01", "%Y-%m-%d") date2 = datetime.strptime("1993-07-01", "%Y-%m-%d") - line_item_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs) - orders_ds = dd.read_parquet(dataset_path + "orders", filesystem=fs) + line_item_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs, blocksize=BLOCKSIZE) + orders_ds = dd.read_parquet(dataset_path + "orders", filesystem=fs, blocksize=BLOCKSIZE) lsel = line_item_ds.l_commitdate < line_item_ds.l_receiptdate osel = (orders_ds.o_orderdate < date1) & (orders_ds.o_orderdate >= date2) @@ -168,12 +170,12 @@ def test_query_5(client, dataset_path, fs): date1 = datetime.strptime("1994-01-01", "%Y-%m-%d") date2 = datetime.strptime("1995-01-01", "%Y-%m-%d") - region_ds = dd.read_parquet(dataset_path + "region", filesystem=fs) - nation_ds = dd.read_parquet(dataset_path + "nation", filesystem=fs) - customer_ds = dd.read_parquet(dataset_path + "customer", filesystem=fs) - line_item_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs) - orders_ds = dd.read_parquet(dataset_path + "orders", filesystem=fs) - supplier_ds = dd.read_parquet(dataset_path + "supplier", filesystem=fs) + region_ds = dd.read_parquet(dataset_path + "region", filesystem=fs, blocksize=BLOCKSIZE) + nation_ds = dd.read_parquet(dataset_path + "nation", filesystem=fs, blocksize=BLOCKSIZE) + customer_ds = dd.read_parquet(dataset_path + "customer", filesystem=fs, blocksize=BLOCKSIZE) + line_item_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs, blocksize=BLOCKSIZE) + orders_ds = dd.read_parquet(dataset_path + "orders", filesystem=fs, blocksize=BLOCKSIZE) + supplier_ds = dd.read_parquet(dataset_path + "supplier", filesystem=fs, blocksize=BLOCKSIZE) rsel = region_ds.r_name == "ASIA" osel = (orders_ds.o_orderdate >= date1) & (orders_ds.o_orderdate < date2) @@ -198,7 +200,7 @@ def test_query_6(client, dataset_path, fs): date2 = datetime.strptime("1995-01-01", "%Y-%m-%d") var3 = 24 - line_item_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs) + line_item_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs, blocksize=BLOCKSIZE) sel = ( (line_item_ds.l_shipdate >= date1) @@ -217,11 +219,11 @@ def test_query_7(client, dataset_path, fs): var1 = datetime.strptime("1995-01-01", "%Y-%m-%d") var2 = datetime.strptime("1997-01-01", "%Y-%m-%d") - nation_ds = dd.read_parquet(dataset_path + "nation", filesystem=fs) - customer_ds = dd.read_parquet(dataset_path + "customer", filesystem=fs) - line_item_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs) - orders_ds = dd.read_parquet(dataset_path + "orders", filesystem=fs) - supplier_ds = dd.read_parquet(dataset_path + "supplier", filesystem=fs) + nation_ds = dd.read_parquet(dataset_path + "nation", filesystem=fs, blocksize=BLOCKSIZE) + customer_ds = dd.read_parquet(dataset_path + "customer", filesystem=fs, blocksize=BLOCKSIZE) + line_item_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs, blocksize=BLOCKSIZE) + orders_ds = dd.read_parquet(dataset_path + "orders", filesystem=fs, blocksize=BLOCKSIZE) + supplier_ds = dd.read_parquet(dataset_path + "supplier", filesystem=fs, blocksize=BLOCKSIZE) lineitem_filtered = line_item_ds[ (line_item_ds["l_shipdate"] >= var1) & (line_item_ds["l_shipdate"] < var2) From e87f3d140d760ecd3bc8fb2c436a3b692f7129a2 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 7 Dec 2023 11:56:24 -0800 Subject: [PATCH 6/6] Init RMM pool in CUDA cluster --- tests/tpch/conftest.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/tpch/conftest.py b/tests/tpch/conftest.py index 98f18bc45e..c4fd14a101 100644 --- a/tests/tpch/conftest.py +++ b/tests/tpch/conftest.py @@ -211,7 +211,7 @@ def cluster( with dask.config.set( {"dataframe.backend": "cudf", "dataframe.shuffle.method": "tasks"} ): - with LocalCUDACluster() as cluster: + with LocalCUDACluster(rmm_pool_size="24GB") as cluster: yield cluster else: if not rapids: @@ -226,12 +226,13 @@ def cluster( with coiled.Cluster(**kwargs) as cluster: yield cluster else: + # should be using Coiled for this from dask_cuda import LocalCUDACluster with dask.config.set( {"dataframe.backend": "cudf", "dataframe.shuffle.method": "tasks"} ): - with LocalCUDACluster() as cluster: + with LocalCUDACluster(rmm_pool_size="24GB") as cluster: yield cluster