Simple deltalake benchmark #911

Draft
wants to merge 5 commits into base: main
47 changes: 24 additions & 23 deletions .github/workflows/tests.yml
@@ -32,29 +32,30 @@ jobs:
       matrix:
         os: [ubuntu-latest]
         python-version: ["3.9"]
-        pytest_args: [tests]
-        include:
-          # Run stability tests on the lowest and highest versions of Python only
-          # These are temporarily redundant with the current global python-version
-          # - pytest_args: tests/stability
-          #   python-version: "3.9"
-          #   os: ubuntu-latest
-          # - pytest_args: tests/stability
-          #   python-version: "3.9"
-          #   os: ubuntu-latest
-          - pytest_args: tests/stability
-            python-version: "3.11"
-            os: ubuntu-latest
-          - pytest_args: tests/stability
-            python-version: "3.11"
-            os: ubuntu-latest
-          # Run stability tests on Python Windows and MacOS (latest py39 only)
-          - pytest_args: tests/stability
-            python-version: "3.9"
-            os: windows-latest
-          - pytest_args: tests/stability
-            python-version: "3.9"
-            os: macos-latest
+        # pytest_args: [tests]
+        pytest_args: [tests/benchmarks/test_deltalake.py]
+        # include:
+        #   # Run stability tests on the lowest and highest versions of Python only
+        #   # These are temporarily redundant with the current global python-version
+        #   # - pytest_args: tests/stability
+        #   #   python-version: "3.9"
+        #   #   os: ubuntu-latest
+        #   # - pytest_args: tests/stability
+        #   #   python-version: "3.9"
+        #   #   os: ubuntu-latest
+        #   - pytest_args: tests/stability
+        #     python-version: "3.11"
+        #     os: ubuntu-latest
+        #   - pytest_args: tests/stability
+        #     python-version: "3.11"
+        #     os: ubuntu-latest
+        #   # Run stability tests on Python Windows and MacOS (latest py39 only)
+        #   - pytest_args: tests/stability
+        #     python-version: "3.9"
+        #     os: windows-latest
+        #   - pytest_args: tests/stability
+        #     python-version: "3.9"
+        #     os: macos-latest

     steps:
       - name: Checkout
4 changes: 3 additions & 1 deletion ci/environment.yml
@@ -41,4 +41,6 @@ dependencies:
   - gilknocker ==0.4.1
   - openssl >1.1.0g
   - pyopenssl ==22.1.0 # Pinned by snowflake-connector-python
-  - cryptography ==38.0.4 # Pinned by snowflake-connector-python
+  - cryptography ==38.0.4 # Pinned by snowflake-connector-python
+  - pip:
+      - git+https://github.com/dask-contrib/dask-deltatable.git # TODO: link to release version
136 changes: 136 additions & 0 deletions tests/benchmarks/test_deltalake.py
@@ -0,0 +1,136 @@
import os

import dask.dataframe as dd
import dask_deltatable as ddt
import pytest

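# H2O groupby-benchmark datasets converted to Delta Lake format on S3; the keys
# are human-readable size labels used to name the pytest parametrization.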
DATASETS = {
    "0.5 GB": "s3://coiled-datasets/h2o-delta/N_1e7_K_1e2/",
    "5 GB": "s3://coiled-datasets/h2o-delta/N_1e8_K_1e2/",
}

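# Select a subset of datasets with a comma-separated list, e.g.:
#   H2O_DELTA_DATASETS="0.5 GB" pytest tests/benchmarks/test_deltalake.py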
enabled_datasets = os.getenv("H2O_DELTA_DATASETS")
if enabled_datasets is not None:
    enabled_datasets = {k.strip() for k in enabled_datasets.split(",")}
    if unknown_datasets := enabled_datasets - DATASETS.keys():
        raise ValueError(f"Unknown h2o-delta dataset(s): {unknown_datasets}")
else:
    enabled_datasets = {
        "0.5 GB",
        "5 GB",
    }


@pytest.fixture(params=list(DATASETS))
def uri(request):
    if request.param not in enabled_datasets:
        pytest.skip("Disabled by default config or H2O_DELTA_DATASETS env variable")
    return DATASETS[request.param]


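# Parametrized so each benchmark runs twice: once reading the Delta table via
# dask-deltatable and once reading the underlying Parquet files directly.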
@pytest.fixture(params=["read_deltalake", "read_parquet"])
def ddf(request, small_client, uri):
if request.param == "read_deltalake":
delta_storage_options = {
"AWS_REGION": "us-east-2",
"AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"],
"AWS_SECRET_ACCESS_KEY": os.environ["AWS_SECRET_ACCESS_KEY"],
}
yield ddt.read_deltalake(uri, delta_storage_options=delta_storage_options)
else:
yield dd.read_parquet(
f"{uri}*/*.parquet", engine="pyarrow", storage_options={"anon": "true"}
)


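# H2O groupby query 1: sum of v2 grouped by id1.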
def test_q1(ddf):
    ddf = ddf[["id1", "v2"]]
    ddf.groupby("id1", dropna=False, observed=True).agg({"v2": "sum"}).compute()


# def test_q2(ddf):
#     ddf = ddf[["id1", "id2", "v1"]]
#     (
#         ddf.groupby(["id1", "id2"], dropna=False, observed=True)
#         .agg({"v1": "sum"})
#         .compute()
#     )
#
#
# def test_q3(ddf):
#     ddf = ddf[["id3", "v1", "v3"]]
#     (
#         ddf.groupby("id3", dropna=False, observed=True)
#         .agg({"v1": "sum", "v3": "mean"})
#         .compute()
#     )
#
#
# def test_q4(ddf):
#     ddf = ddf[["id4", "v1", "v2", "v3"]]
#     (
#         ddf.groupby("id4", dropna=False, observed=True)
#         .agg({"v1": "mean", "v2": "mean", "v3": "mean"})
#         .compute()
#     )
#
#
# def test_q5(ddf):
#     ddf = ddf[["id6", "v1", "v2", "v3"]]
#     (
#         ddf.groupby("id6", dropna=False, observed=True)
#         .agg(
#             {"v1": "sum", "v2": "sum", "v3": "sum"},
#         )
#         .compute()
#     )
#
#
# def test_q6(ddf, shuffle_method):
#     # Median aggregation uses an explicitly-set shuffle
#     ddf = ddf[["id4", "id5", "v3"]]
#     (
#         ddf.groupby(["id4", "id5"], dropna=False, observed=True)
#         .agg({"v3": ["median", "std"]}, shuffle=shuffle_method)
#         .compute()  # requires shuffle arg to be set explicitly
#     )
#
#
# def test_q7(ddf):
#     ddf = ddf[["id3", "v1", "v2"]]
#     (
#         ddf.groupby("id3", dropna=False, observed=True)
#         .agg({"v1": "max", "v2": "min"})
#         .assign(range_v1_v2=lambda x: x["v1"] - x["v2"])[["range_v1_v2"]]
#         .compute()
#     )
#
#
# def test_q8(ddf, configure_shuffling):
#     # .groupby(...).apply(...) uses a shuffle to transfer data before applying the function
#     ddf = ddf[["id6", "v1", "v2", "v3"]]
#     (
#         ddf[~ddf["v3"].isna()][["id6", "v3"]]
#         .groupby("id6", dropna=False, observed=True)
#         .apply(
#             lambda x: x.nlargest(2, columns="v3"),
#             meta={"id6": "Int64", "v3": "float64"},
#         )[["v3"]]
#         .compute()
#     )
#
#
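# NOTE: test_q9 below uses pandas directly; re-enabling it also requires adding
# "import pandas as pd" to the imports at the top of this file.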
# def test_q9(ddf, configure_shuffling):
#     # .groupby(...).apply(...) uses a shuffle to transfer data before applying the function
#     ddf = ddf[["id2", "id4", "v1", "v2"]]
#     (
#         ddf[["id2", "id4", "v1", "v2"]]
#         .groupby(["id2", "id4"], dropna=False, observed=True)
#         .apply(
#             lambda x: pd.Series({"r2": x.corr(numeric_only=True)["v1"]["v2"] ** 2}),
#             meta={"r2": "float64"},
#         )
#         .compute()
#     )
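
For reference, here is a minimal standalone sketch of the read path these benchmarks exercise, outside of pytest. It assumes AWS credentials are set in the environment and the dask-deltatable Git revision pinned in ci/environment.yml above; the URI is the 0.5 GB dataset from DATASETS.

import os

import dask_deltatable as ddt

# Credentials are forwarded explicitly, mirroring the ddf fixture above.
storage_options = {
    "AWS_REGION": "us-east-2",
    "AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"],
    "AWS_SECRET_ACCESS_KEY": os.environ["AWS_SECRET_ACCESS_KEY"],
}

# Lazily build a Dask DataFrame from the Delta table's transaction log.
ddf = ddt.read_deltalake(
    "s3://coiled-datasets/h2o-delta/N_1e7_K_1e2/",
    delta_storage_options=storage_options,
)

# The q1 aggregation, computed eagerly.
result = (
    ddf[["id1", "v2"]]
    .groupby("id1", dropna=False, observed=True)
    .agg({"v2": "sum"})
    .compute()
)
print(result.head())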