From 3a7608007a18b4ef191d70033fbd2d6bcdabff30 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 20 Mar 2023 21:28:57 -0500 Subject: [PATCH 01/31] Add matplotlib arxiv workflow --- cluster_kwargs.yaml | 6 ++ tests/conftest.py | 1 + .../workflows/test_embarrassingly_parallel.py | 89 +++++++++++++++++++ 3 files changed, 96 insertions(+) create mode 100644 tests/workflows/test_embarrassingly_parallel.py diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index 1b97fa00a3..f09cb691b9 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -31,6 +31,12 @@ parquet_cluster: n_workers: 15 worker_vm_types: [m5.xlarge] # 4 CPU, 16 GiB +# For tests/workflows/test_embarrassingly_parallel.py +embarrassingly_parallel_cluster: + n_workers: 100 + backend_options: + region: "us-east-1" # Same region as dataset + # For test_spill.py spill_cluster: n_workers: 5 diff --git a/tests/conftest.py b/tests/conftest.py index cef605a440..6cd685d230 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -530,6 +530,7 @@ def s3(): return s3fs.S3FileSystem( key=os.environ.get("AWS_ACCESS_KEY_ID"), secret=os.environ.get("AWS_SECRET_ACCESS_KEY"), + requester_pays=True, ) diff --git a/tests/workflows/test_embarrassingly_parallel.py b/tests/workflows/test_embarrassingly_parallel.py new file mode 100644 index 0000000000..11ed8dfd95 --- /dev/null +++ b/tests/workflows/test_embarrassingly_parallel.py @@ -0,0 +1,89 @@ +import io +import tarfile +import uuid + +import coiled +import pandas as pd +import pytest +from dask.distributed import Client, wait + + +@pytest.fixture(scope="module") +def embarrassingly_parallel_cluster( + dask_env_variables, + cluster_kwargs, + gitlab_cluster_tags, +): + with coiled.Cluster( + f"embarrassingly-parallel-{uuid.uuid4().hex[:8]}", + environ=dask_env_variables, + tags=gitlab_cluster_tags, + **cluster_kwargs["embarrassingly_parallel_cluster"], + ) as cluster: + yield cluster + + +@pytest.fixture +def embarrassingly_parallel_client( + embarrassingly_parallel_cluster, + cluster_kwargs, + upload_cluster_dump, + benchmark_all, +): + n_workers = cluster_kwargs["embarrassingly_parallel_cluster"]["n_workers"] + with Client(embarrassingly_parallel_cluster) as client: + print(f"{client.dashboard_link = }") + embarrassingly_parallel_cluster.scale(n_workers) + client.wait_for_workers(n_workers) + client.restart() + with upload_cluster_dump(client), benchmark_all(client): + yield client + + +def test_embarassingly_parallel(embarrassingly_parallel_client, s3): + # How popular is matplotlib? + directories = s3.ls("s3://arxiv/pdf") + + def extract(filename: str, fs): + """Extract and process one directory of arXiv data + + Returns + ------- + filename: str + contains_matplotlib: boolean + """ + out = [] + with fs.open(filename) as f: + bytes_ = f.read() + with io.BytesIO() as bio: + bio.write(bytes_) + bio.seek(0) + with tarfile.TarFile(fileobj=bio) as tf: + for member in tf.getmembers(): + if member.isfile() and member.name.endswith(".pdf"): + data = tf.extractfile(member).read() + out.append((member.name, b"matplotlib" in data.lower())) + return out + + futures = embarrassingly_parallel_client.map(extract, directories, fs=s3) + wait(futures) + # We had one error in one file. Let's just ignore and move on. 
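The lines that follow keep only futures whose status is "finished" before gathering. For reference, the same effect is available in a single call through distributed's Client.gather with errors="skip"; a minimal sketch, reusing the variable names from the test above:

    # Drops any futures that errored instead of raising, equivalent to
    # filtering on future.status == "finished" and gathering the rest.
    data = embarrassingly_parallel_client.gather(futures, errors="skip")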
+ good = [future for future in futures if future.status == "finished"] + data = embarrassingly_parallel_client.gather(good) + + # Convert to Pandas + dfs = [pd.DataFrame(d, columns=["filename", "has_matplotlib"]) for d in data] + df = pd.concat(dfs) + + def filename_to_date(filename): + year = int(filename.split("/")[0][:2]) + month = int(filename.split("/")[0][2:4]) + if year > 80: + year = 1900 + year + else: + year = 2000 + year + + return pd.Timestamp(year=year, month=month, day=1) + + df["date"] = df.filename.map(filename_to_date) + df.groupby("date").has_matplotlib.mean() From 5aedaca7c47bcb23f7618a8b966484fb82091281 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 20 Mar 2023 22:21:42 -0500 Subject: [PATCH 02/31] Remove stray print --- tests/workflows/test_embarrassingly_parallel.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/workflows/test_embarrassingly_parallel.py b/tests/workflows/test_embarrassingly_parallel.py index 11ed8dfd95..de2f263c01 100644 --- a/tests/workflows/test_embarrassingly_parallel.py +++ b/tests/workflows/test_embarrassingly_parallel.py @@ -32,7 +32,6 @@ def embarrassingly_parallel_client( ): n_workers = cluster_kwargs["embarrassingly_parallel_cluster"]["n_workers"] with Client(embarrassingly_parallel_cluster) as client: - print(f"{client.dashboard_link = }") embarrassingly_parallel_cluster.scale(n_workers) client.wait_for_workers(n_workers) client.restart() From 95585d42656525abf65e0a9f048806cef3268621 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 20 Mar 2023 22:22:59 -0500 Subject: [PATCH 03/31] Update fixture name --- tests/workflows/test_embarrassingly_parallel.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/workflows/test_embarrassingly_parallel.py b/tests/workflows/test_embarrassingly_parallel.py index de2f263c01..be19de4595 100644 --- a/tests/workflows/test_embarrassingly_parallel.py +++ b/tests/workflows/test_embarrassingly_parallel.py @@ -12,12 +12,12 @@ def embarrassingly_parallel_cluster( dask_env_variables, cluster_kwargs, - gitlab_cluster_tags, + github_cluster_tags, ): with coiled.Cluster( f"embarrassingly-parallel-{uuid.uuid4().hex[:8]}", environ=dask_env_variables, - tags=gitlab_cluster_tags, + tags=github_cluster_tags, **cluster_kwargs["embarrassingly_parallel_cluster"], ) as cluster: yield cluster From 10a529b634e9d1f1989d68373a295931795084bb Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Tue, 21 Mar 2023 12:43:11 -0500 Subject: [PATCH 04/31] Only use requester_pays for test_embarassingly_parallel --- tests/conftest.py | 17 +++++++++++------ tests/workflows/test_embarrassingly_parallel.py | 3 ++- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 7259548068..c9686b46a7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -526,12 +526,17 @@ def s3_storage_options(): @pytest.fixture(scope="session") -def s3(): - return s3fs.S3FileSystem( - key=os.environ.get("AWS_ACCESS_KEY_ID"), - secret=os.environ.get("AWS_SECRET_ACCESS_KEY"), - requester_pays=True, - ) +def s3(s3_storage_options): + return s3fs.S3FileSystem(**s3_storage_options) + + +@pytest.fixture +def s3_factory(s3_storage_options): + def _(**exta_options): + kwargs = {**s3_storage_options, **exta_options} + return s3fs.S3FileSystem(**kwargs) + + return _ @pytest.fixture(scope="session") diff --git a/tests/workflows/test_embarrassingly_parallel.py b/tests/workflows/test_embarrassingly_parallel.py index be19de4595..82a6a09cdf 100644 --- 
a/tests/workflows/test_embarrassingly_parallel.py +++ b/tests/workflows/test_embarrassingly_parallel.py @@ -39,8 +39,9 @@ def embarrassingly_parallel_client( yield client -def test_embarassingly_parallel(embarrassingly_parallel_client, s3): +def test_embarassingly_parallel(embarrassingly_parallel_client, s3_factory): # How popular is matplotlib? + s3 = s3_factory(requester_pays=True) directories = s3.ls("s3://arxiv/pdf") def extract(filename: str, fs): From 4504020bd8523dcd74df4f3b53e091fe868a2e7d Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 22 Mar 2023 16:00:09 -0500 Subject: [PATCH 05/31] Rerun CI From 96e66f5282a78b5b1463057abeec590ad1def9f0 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 23 Mar 2023 13:23:29 -0500 Subject: [PATCH 06/31] Update instance type --- cluster_kwargs.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index f09cb691b9..f4a5d1a3dc 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -34,6 +34,9 @@ parquet_cluster: # For tests/workflows/test_embarrassingly_parallel.py embarrassingly_parallel_cluster: n_workers: 100 + # TODO: Remove the `m6i.xlarge` worker specification below + # once it's the default worker instance type + worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB backend_options: region: "us-east-1" # Same region as dataset From 0779b48c9b30043e743fa0862300b0ab5bc813d4 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 23 Mar 2023 14:25:23 -0500 Subject: [PATCH 07/31] Run workflows on demand and during nightly cron job --- .github/workflows/tests.yml | 13 ++++++++++++- tests/conftest.py | 11 +++++++---- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 7f4704a18e..973f7f8ab3 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -90,6 +90,17 @@ jobs: python ci/create_runtime_meta.py source ci/scripts/install_coiled_runtime.sh coiled_software_environment.yaml + - name: Determine if workflows should be run + # Run workflows on PRs with `workflows` label and nightly cron job + if: | + github.event_name == 'schedule' + || (github.event_name == 'pull_request' && contains(github.event.pull_request.labels.*.name, 'workflows')) + run: | + # Put EXTRA_OPTIONS into $GITHUB_ENV so it can be used in subsequent workflow steps + export EXTRA_OPTIONS="--run-workflows" + echo $EXTRA_OPTIONS + echo EXTRA_OPTIONS=$EXTRA_OPTIONS >> $GITHUB_ENV + - name: Run Coiled Runtime Tests id: test env: @@ -100,7 +111,7 @@ jobs: DB_NAME: ${{ matrix.os }}-${{ matrix.runtime-version }}-py${{ matrix.python-version }}.db BENCHMARK: true CLUSTER_DUMP: always - run: bash ci/scripts/run_tests.sh -n 4 --dist loadscope ${{ matrix.pytest_args }} + run: bash ci/scripts/run_tests.sh -n 4 --dist loadscope ${{ env.EXTRA_OPTIONS }} ${{ matrix.pytest_args }} - name: Dump coiled.Cluster kwargs run: cat cluster_kwargs.merged.yaml diff --git a/tests/conftest.py b/tests/conftest.py index c9686b46a7..f2ccad68ec 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -57,16 +57,19 @@ def pytest_addoption(parser): parser.addoption( "--benchmark", action="store_true", help="Collect benchmarking data for tests" ) + parser.addoption("--run-workflows", action="store_true", help="Run workflow tests") def pytest_collection_modifyitems(config, items): - if config.getoption("--run-latest"): - # --run-latest given in cli: do not skip latest coiled-runtime tests - return skip_latest = pytest.mark.skip(reason="need --run-latest option to 
run") + skip_workflows = pytest.mark.skip(reason="need --run-workflows option to run") for item in items: - if "latest_runtime" in item.keywords: + if not config.getoption("--run-latest") and "latest_runtime" in item.keywords: item.add_marker(skip_latest) + if not config.getoption("--run-workflows") and ( + (TEST_DIR / "workflows") in item.path.parents + ): + item.add_marker(skip_workflows) def get_coiled_runtime_version(): From 7fb67925a2b68849422b1edfcdc79aad5833aaa8 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 27 Mar 2023 12:29:07 -0500 Subject: [PATCH 08/31] Use specific range of years --- tests/workflows/test_embarrassingly_parallel.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/workflows/test_embarrassingly_parallel.py b/tests/workflows/test_embarrassingly_parallel.py index 82a6a09cdf..565ab9cc19 100644 --- a/tests/workflows/test_embarrassingly_parallel.py +++ b/tests/workflows/test_embarrassingly_parallel.py @@ -44,6 +44,15 @@ def test_embarassingly_parallel(embarrassingly_parallel_client, s3_factory): s3 = s3_factory(requester_pays=True) directories = s3.ls("s3://arxiv/pdf") + # We only analyze files from 1991-2022 here in order to have a consistent data volume. + # This is benchmarking purposes only, as this dataset is updated monthly. + years = list(range(91, 100)) + list(range(23)) + directories = [ + d + for d in directories + if d.endswith(".tar") and int(d.split("_")[2][:2]) in years + ] + def extract(filename: str, fs): """Extract and process one directory of arXiv data From e4851df7402efe455cbd3c234602937dd20e2918 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 27 Mar 2023 14:54:47 -0500 Subject: [PATCH 09/31] Light asserts --- tests/workflows/test_embarrassingly_parallel.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/workflows/test_embarrassingly_parallel.py b/tests/workflows/test_embarrassingly_parallel.py index 565ab9cc19..444f412b1e 100644 --- a/tests/workflows/test_embarrassingly_parallel.py +++ b/tests/workflows/test_embarrassingly_parallel.py @@ -95,4 +95,9 @@ def filename_to_date(filename): return pd.Timestamp(year=year, month=month, day=1) df["date"] = df.filename.map(filename_to_date) - df.groupby("date").has_matplotlib.mean() + result = df.groupby("date").has_matplotlib.mean() + # Some light validation to ensure results are consistent. + # This is only for benchmarking. 
+ assert result.idxmin() == pd.Timestamp("1991-07-01") # Earliest timestamp + assert result.idxmax() == pd.Timestamp("2022-10-01") # Row with maximum value + assert result.ne(0).idxmax() == pd.Timestamp("2005-06-01") # First non-zero row From 26578859a200556816d7fa8b5f5d49061fece367 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 28 Mar 2023 11:19:59 -0500 Subject: [PATCH 10/31] add workflow --- cluster_kwargs.yaml | 9 ++ tests/workflows/test_from_csv_to_parquet.py | 127 ++++++++++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 tests/workflows/test_from_csv_to_parquet.py diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index f4a5d1a3dc..5668aa64e4 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -58,3 +58,12 @@ test_work_stealing_on_straggling_worker: test_repeated_merge_spill: n_workers: 20 worker_vm_types: [m6i.large] + +# For tests/workflows/test_from_csv_to_parquet.py +from_csv_to_parquet_cluster: + n_workers: 5 + # TODO: Remove the `m6i.xlarge` worker specification below + # once it's the default worker instance type + worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB + backend_options: + region: "us-east-1" # Same region as dataset diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py new file mode 100644 index 0000000000..1b787ffb93 --- /dev/null +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -0,0 +1,127 @@ +import os +import uuid + +import coiled +import dask.dataframe as dd +import pytest +from distributed import Client, LocalCluster, wait # noqa + +LOCAL_RUN = os.environ.get("LOCAL_WORKFLOW_RUN") + + +@pytest.fixture(scope="module") +def from_csv_to_parquet_cluster( + dask_env_variables, + cluster_kwargs, + github_cluster_tags, +): + if LOCAL_RUN is not None: + with LocalCluster() as cluster: + yield cluster + else: + with coiled.Cluster( + f"from-csv-to-parquet-{uuid.uuid4().hex[:8]}", + environ=dask_env_variables, + tags=github_cluster_tags, + **cluster_kwargs["from_csv_to_parquet_cluster"], + ) as cluster: + yield cluster + + +@pytest.fixture +def from_csv_to_parquet_client( + from_csv_to_parquet_cluster, + cluster_kwargs, + upload_cluster_dump, + benchmark_all, +): + if LOCAL_RUN is not None: + with Client(from_csv_to_parquet_cluster) as client: + yield client + else: + n_workers = cluster_kwargs["from_csv_to_parquet_cluster"]["n_workers"] + with Client(from_csv_to_parquet_cluster) as client: + from_csv_to_parquet_cluster.scale(n_workers) + client.wait_for_workers(n_workers) + client.restart() + with upload_cluster_dump(client), benchmark_all(client): + yield client + + +COLUMNSV1 = { + "GlobalEventID": "Int64", + "Day": "Int64", + "MonthYear": "Int64", + "Year": "Int64", + "FractionDate": "Float64", + "Actor1Code": "string[pyarrow]", + "Actor1Name": "string[pyarrow]", + "Actor1CountryCode": "string[pyarrow]", + "Actor1KnownGroupCode": "string[pyarrow]", + "Actor1EthnicCode": "string[pyarrow]", + "Actor1Religion1Code": "string[pyarrow]", + "Actor1Religion2Code": "string[pyarrow]", + "Actor1Type1Code": "string[pyarrow]", + "Actor1Type2Code": "string[pyarrow]", + "Actor1Type3Code": "string[pyarrow]", + "Actor2Code": "string[pyarrow]", + "Actor2Name": "string[pyarrow]", + "Actor2CountryCode": "string[pyarrow]", + "Actor2KnownGroupCode": "string[pyarrow]", + "Actor2EthnicCode": "string[pyarrow]", + "Actor2Religion1Code": "string[pyarrow]", + "Actor2Religion2Code": "string[pyarrow]", + "Actor2Type1Code": "string[pyarrow]", + "Actor2Type2Code": "string[pyarrow]", + "Actor2Type3Code": 
"string[pyarrow]", + "IsRootEvent": "Int64", + "EventCode": "string[pyarrow]", + "EventBaseCode": "string[pyarrow]", + "EventRootCode": "string[pyarrow]", + "QuadClass": "Int64", + "GoldsteinScale": "Float64", + "NumMentions": "Int64", + "NumSources": "Int64", + "NumArticles": "Int64", + "AvgTone": "Float64", + "Actor1Geo_Type": "Int64", + "Actor1Geo_Fullname": "string[pyarrow]", + "Actor1Geo_CountryCode": "string[pyarrow]", + "Actor1Geo_ADM1Code": "string[pyarrow]", + "Actor1Geo_Lat": "Float64", + "Actor1Geo_Long": "Float64", + "Actor1Geo_FeatureID": "string[pyarrow]", + "Actor2Geo_Type": "Int64", + "Actor2Geo_Fullname": "string[pyarrow]", + "Actor2Geo_CountryCode": "string[pyarrow]", + "Actor2Geo_ADM1Code": "string[pyarrow]", + "Actor2Geo_Lat": "Float64", + "Actor2Geo_Long": "Float64", + "Actor2Geo_FeatureID": "string[pyarrow]", + "ActionGeo_Type": "Int64", + "ActionGeo_Fullname": "string[pyarrow]", + "ActionGeo_CountryCode": "string[pyarrow]", + "ActionGeo_ADM1Code": "string[pyarrow]", + "ActionGeo_Lat": "Float64", + "ActionGeo_Long": "Float64", + "ActionGeo_FeatureID": "string[pyarrow]", + "DATEADDED": "Int64", + "SOURCEURL": "string[pyarrow]", +} + + +def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory): + s3 = s3_factory(anon=True) + df = dd.read_csv( + "s3://gdelt-open-data/events/*.csv", + names=COLUMNSV1.keys(), + sep="\t", + dtype=COLUMNSV1, + storage_options=s3.storage_options, + ) + + df = df.partitions[-10:] + + result = from_csv_to_parquet_client.compute(df.GoldsteinScale.mean()) # noqa + print(result) + assert df.GlobalEventID.dtype == "Int64" From 6eb04d6608a67555ec54e26fa58201092ddc5498 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 28 Mar 2023 11:51:56 -0500 Subject: [PATCH 11/31] show something with use of pytest -s --- cluster_kwargs.yaml | 2 +- tests/workflows/test_from_csv_to_parquet.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index 5668aa64e4..b1aee6d1cc 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -64,6 +64,6 @@ from_csv_to_parquet_cluster: n_workers: 5 # TODO: Remove the `m6i.xlarge` worker specification below # once it's the default worker instance type - worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB + worker_vm_types: [t3.medium] # 2CPU, 4GiB backend_options: region: "us-east-1" # Same region as dataset diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 1b787ffb93..629aa805d0 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -122,6 +122,8 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory): df = df.partitions[-10:] - result = from_csv_to_parquet_client.compute(df.GoldsteinScale.mean()) # noqa - print(result) + future = from_csv_to_parquet_client.compute(df.GoldsteinScale.mean()) # noqa + wait(future) + print(future.result()) + assert df.GlobalEventID.dtype == "Int64" From 222c6957c7c884f871269c30f9dca59d98516887 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 28 Mar 2023 11:55:20 -0500 Subject: [PATCH 12/31] rm unnecessary noqa comments --- tests/workflows/test_from_csv_to_parquet.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 629aa805d0..8c9ab0d1f2 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -4,7 +4,7 @@ import coiled import 
dask.dataframe as dd import pytest -from distributed import Client, LocalCluster, wait # noqa +from distributed import Client, LocalCluster, wait LOCAL_RUN = os.environ.get("LOCAL_WORKFLOW_RUN") @@ -122,7 +122,7 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory): df = df.partitions[-10:] - future = from_csv_to_parquet_client.compute(df.GoldsteinScale.mean()) # noqa + future = from_csv_to_parquet_client.compute(df.GoldsteinScale.mean()) wait(future) print(future.result()) From fc4068758bfa429f3c1bacb94e5f5b739cafe392 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 28 Mar 2023 11:59:11 -0500 Subject: [PATCH 13/31] var name --- tests/workflows/test_from_csv_to_parquet.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 8c9ab0d1f2..76d8e309b0 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -6,7 +6,7 @@ import pytest from distributed import Client, LocalCluster, wait -LOCAL_RUN = os.environ.get("LOCAL_WORKFLOW_RUN") +LOCAL_WORKFLOW_RUN = os.environ.get("LOCAL_WORKFLOW_RUN") @pytest.fixture(scope="module") @@ -15,7 +15,7 @@ def from_csv_to_parquet_cluster( cluster_kwargs, github_cluster_tags, ): - if LOCAL_RUN is not None: + if LOCAL_WORKFLOW_RUN is not None: with LocalCluster() as cluster: yield cluster else: @@ -35,7 +35,7 @@ def from_csv_to_parquet_client( upload_cluster_dump, benchmark_all, ): - if LOCAL_RUN is not None: + if LOCAL_WORKFLOW_RUN is not None: with Client(from_csv_to_parquet_cluster) as client: yield client else: From 670e3cc54f0c4de0ff3cc694f8462f865087d3d2 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 28 Mar 2023 13:00:24 -0500 Subject: [PATCH 14/31] adjust tests.yml based on James' suggestion --- .github/workflows/tests.yml | 75 +++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 37 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 973f7f8ab3..6d7adfdef1 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -31,44 +31,45 @@ jobs: matrix: os: [ubuntu-latest] python-version: ["3.9"] - pytest_args: [tests] + # pytest_args: [tests] + pytest_args: [tests/workflows/test_from_csv_to_parquet.py] runtime-version: [upstream, latest, "0.2.1"] - include: - # Run stability tests on Python 3.8 - - pytest_args: tests/stability - python-version: "3.8" - runtime-version: upstream - os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.8" - runtime-version: latest - os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.8" - runtime-version: "0.2.1" - os: ubuntu-latest - # Run stability tests on Python 3.10 - - pytest_args: tests/stability - python-version: "3.10" - runtime-version: upstream - os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.10" - runtime-version: latest - os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.10" - runtime-version: "0.2.1" - os: ubuntu-latest - # Run stability tests on Python Windows and MacOS (latest py39 only) - - pytest_args: tests/stability - python-version: "3.9" - runtime-version: latest - os: windows-latest - - pytest_args: tests/stability - python-version: "3.9" - runtime-version: latest - os: macos-latest + # include: + # # Run stability tests on Python 3.8 + # - pytest_args: tests/stability + # python-version: "3.8" + # runtime-version: upstream + # os: ubuntu-latest + # - 
pytest_args: tests/stability + # python-version: "3.8" + # runtime-version: latest + # os: ubuntu-latest + # - pytest_args: tests/stability + # python-version: "3.8" + # runtime-version: "0.2.1" + # os: ubuntu-latest + # # Run stability tests on Python 3.10 + # - pytest_args: tests/stability + # python-version: "3.10" + # runtime-version: upstream + # os: ubuntu-latest + # - pytest_args: tests/stability + # python-version: "3.10" + # runtime-version: latest + # os: ubuntu-latest + # - pytest_args: tests/stability + # python-version: "3.10" + # runtime-version: "0.2.1" + # os: ubuntu-latest + # # Run stability tests on Python Windows and MacOS (latest py39 only) + # - pytest_args: tests/stability + # python-version: "3.9" + # runtime-version: latest + # os: windows-latest + # - pytest_args: tests/stability + # python-version: "3.9" + # runtime-version: latest + # os: macos-latest steps: - name: Checkout From 16b0277573741da90b47584ba13b0a1f772234b4 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Wed, 29 Mar 2023 18:00:43 -0500 Subject: [PATCH 15/31] write some parquet to s3 --- tests/workflows/test_from_csv_to_parquet.py | 25 ++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 76d8e309b0..9e408dd1ac 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -110,7 +110,11 @@ def from_csv_to_parquet_client( } -def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory): +def drop_dupe_per_partition(df): + return df.drop_duplicates(subset=["SOURCEURL"], keep="first") + + +def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): s3 = s3_factory(anon=True) df = dd.read_csv( "s3://gdelt-open-data/events/*.csv", @@ -121,9 +125,20 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory): ) df = df.partitions[-10:] + df = df.map_partitions(drop_dupe_per_partition) + national_paper = df.SOURCEURL.str.contains("washingtonpost|nytimes", regex=True) + df["national_paper"] = national_paper + df = df[df["national_paper"]] - future = from_csv_to_parquet_client.compute(df.GoldsteinScale.mean()) - wait(future) - print(future.result()) + if LOCAL_WORKFLOW_RUN: + output = "test-output" + else: + output = s3_url + "/from-csv-to-parquet/" - assert df.GlobalEventID.dtype == "Int64" + + to_pq = df.to_parquet(output, compute=False) + + future = from_csv_to_parquet_client.compute(to_pq) + wait(future) + newdf = dd.read_parquet(output) + assert "DATEADDED" in list(newdf.columns) From b557bc541ed588a6374532137dcb64dc6e0c76ec Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 6 Apr 2023 12:14:58 -0500 Subject: [PATCH 16/31] this version actually passes --- tests/workflows/test_from_csv_to_parquet.py | 59 +++++++-------------- 1 file changed, 20 insertions(+), 39 deletions(-) diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 9e408dd1ac..5590eb64ce 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -15,17 +15,13 @@ def from_csv_to_parquet_cluster( cluster_kwargs, github_cluster_tags, ): - if LOCAL_WORKFLOW_RUN is not None: - with LocalCluster() as cluster: - yield cluster - else: - with coiled.Cluster( - f"from-csv-to-parquet-{uuid.uuid4().hex[:8]}", - environ=dask_env_variables, - tags=github_cluster_tags, - **cluster_kwargs["from_csv_to_parquet_cluster"], - ) as cluster: - yield 
cluster + with coiled.Cluster( + f"from-csv-to-parquet-{uuid.uuid4().hex[:8]}", + environ=dask_env_variables, + tags=github_cluster_tags, + **cluster_kwargs["from_csv_to_parquet_cluster"], + ) as cluster: + yield cluster @pytest.fixture @@ -35,17 +31,13 @@ def from_csv_to_parquet_client( upload_cluster_dump, benchmark_all, ): - if LOCAL_WORKFLOW_RUN is not None: - with Client(from_csv_to_parquet_cluster) as client: + n_workers = cluster_kwargs["from_csv_to_parquet_cluster"]["n_workers"] + with Client(from_csv_to_parquet_cluster) as client: + from_csv_to_parquet_cluster.scale(n_workers) + client.wait_for_workers(n_workers) + client.restart() + with upload_cluster_dump(client), benchmark_all(client): yield client - else: - n_workers = cluster_kwargs["from_csv_to_parquet_cluster"]["n_workers"] - with Client(from_csv_to_parquet_cluster) as client: - from_csv_to_parquet_cluster.scale(n_workers) - client.wait_for_workers(n_workers) - client.restart() - with upload_cluster_dump(client), benchmark_all(client): - yield client COLUMNSV1 = { @@ -110,10 +102,6 @@ def from_csv_to_parquet_client( } -def drop_dupe_per_partition(df): - return df.drop_duplicates(subset=["SOURCEURL"], keep="first") - - def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): s3 = s3_factory(anon=True) df = dd.read_csv( @@ -122,23 +110,16 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): sep="\t", dtype=COLUMNSV1, storage_options=s3.storage_options, + on_bad_lines="skip", ) df = df.partitions[-10:] - df = df.map_partitions(drop_dupe_per_partition) - national_paper = df.SOURCEURL.str.contains("washingtonpost|nytimes", regex=True) - df["national_paper"] = national_paper + df = df.map_partitions( + lambda xdf: xdf.drop_duplicates(subset=["SOURCEURL"], keep="first") + ) + df["national_paper"] = df.SOURCEURL.str.contains("washingtonpost|nytimes", regex=True) df = df[df["national_paper"]] - if LOCAL_WORKFLOW_RUN: - output = "test-output" - else: - output = s3_url + "/from-csv-to-parquet/" - - - to_pq = df.to_parquet(output, compute=False) + output = s3_url + "/from-csv-to-parquet/" - future = from_csv_to_parquet_client.compute(to_pq) - wait(future) - newdf = dd.read_parquet(output) - assert "DATEADDED" in list(newdf.columns) + df.to_parquet(output) From ccedaf8b5aa91e1ff09b6a29559bf1c482aee145 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 6 Apr 2023 12:22:37 -0500 Subject: [PATCH 17/31] check if read works --- tests/workflows/test_from_csv_to_parquet.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 5590eb64ce..fb4e235bad 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -123,3 +123,5 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): output = s3_url + "/from-csv-to-parquet/" df.to_parquet(output) + df = dd.read_parquet(output) + df.compute() From 37120c344042a317c539358d138b08464bb42a69 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Mon, 10 Apr 2023 20:13:54 -0500 Subject: [PATCH 18/31] works with some excluded files --- cluster_kwargs.yaml | 2 +- tests/workflows/test_from_csv_to_parquet.py | 27 +++++++++++++++++---- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index b1aee6d1cc..98c7b2fe21 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -61,7 +61,7 @@ test_repeated_merge_spill: # For 
tests/workflows/test_from_csv_to_parquet.py from_csv_to_parquet_cluster: - n_workers: 5 + n_workers: 30 # TODO: Remove the `m6i.xlarge` worker specification below # once it's the default worker instance type worker_vm_types: [t3.medium] # 2CPU, 4GiB diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index fb4e235bad..fea804be57 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -104,8 +104,26 @@ def from_csv_to_parquet_client( def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): s3 = s3_factory(anon=True) + + bad_files = [ + "gdelt-open-data/events/20161004.export.csv", + "gdelt-open-data/events/20170106.export.csv", + "gdelt-open-data/events/20170422.export.csv", + "gdelt-open-data/events/20170802.export.csv", + "gdelt-open-data/events/20170920.export.csv", + "gdelt-open-data/events/20171021.export.csv", + "gdelt-open-data/events/20180415.export.csv", + "gdelt-open-data/events/20180416.export.csv", + "gdelt-open-data/events/20180613.export.csv", + "gdelt-open-data/events/20180806.export.csv", + "gdelt-open-data/events/20190217.export.csv", + "gdelt-open-data/events/20190613.export.csv", + ] + files = s3.ls("s3://gdelt-open-data/events/")[120:] + files = [f"s3://{f}" for f in files if f not in bad_files] + df = dd.read_csv( - "s3://gdelt-open-data/events/*.csv", + files, names=COLUMNSV1.keys(), sep="\t", dtype=COLUMNSV1, @@ -113,15 +131,14 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): on_bad_lines="skip", ) - df = df.partitions[-10:] df = df.map_partitions( lambda xdf: xdf.drop_duplicates(subset=["SOURCEURL"], keep="first") ) - df["national_paper"] = df.SOURCEURL.str.contains("washingtonpost|nytimes", regex=True) + df["national_paper"] = df.SOURCEURL.str.contains( + "washingtonpost|nytimes", regex=True + ) df = df[df["national_paper"]] output = s3_url + "/from-csv-to-parquet/" df.to_parquet(output) - df = dd.read_parquet(output) - df.compute() From b3cfbaa136b9a215bf46a9b32f086a8c1e69a198 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Mon, 10 Apr 2023 20:30:04 -0500 Subject: [PATCH 19/31] rm unnecessary line --- tests/workflows/test_from_csv_to_parquet.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index fea804be57..b2c8e861a1 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -6,8 +6,6 @@ import pytest from distributed import Client, LocalCluster, wait -LOCAL_WORKFLOW_RUN = os.environ.get("LOCAL_WORKFLOW_RUN") - @pytest.fixture(scope="module") def from_csv_to_parquet_cluster( From cdca5decdf8863f26e504128ddb18385faf7a2f8 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Wed, 10 May 2023 14:42:29 +0200 Subject: [PATCH 20/31] Refactoring [skip ci] --- cluster_kwargs.yaml | 4 +- tests/workflows/test_from_csv_to_parquet.py | 176 ++++++++++---------- 2 files changed, 88 insertions(+), 92 deletions(-) diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index 34c7accc64..13e6f8f204 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -69,9 +69,7 @@ test_repeated_merge_spill: # For tests/workflows/test_from_csv_to_parquet.py from_csv_to_parquet_cluster: - n_workers: 30 - # TODO: Remove the `m6i.xlarge` worker specification below - # once it's the default worker instance type + n_workers: 20 worker_vm_types: [t3.medium] # 2CPU, 4GiB backend_options: region: 
"us-east-1" # Same region as dataset diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index b2c8e861a1..12e6199042 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -1,5 +1,6 @@ import os import uuid +from collections import OrderedDict import coiled import dask.dataframe as dd @@ -38,97 +39,95 @@ def from_csv_to_parquet_client( yield client -COLUMNSV1 = { - "GlobalEventID": "Int64", - "Day": "Int64", - "MonthYear": "Int64", - "Year": "Int64", - "FractionDate": "Float64", - "Actor1Code": "string[pyarrow]", - "Actor1Name": "string[pyarrow]", - "Actor1CountryCode": "string[pyarrow]", - "Actor1KnownGroupCode": "string[pyarrow]", - "Actor1EthnicCode": "string[pyarrow]", - "Actor1Religion1Code": "string[pyarrow]", - "Actor1Religion2Code": "string[pyarrow]", - "Actor1Type1Code": "string[pyarrow]", - "Actor1Type2Code": "string[pyarrow]", - "Actor1Type3Code": "string[pyarrow]", - "Actor2Code": "string[pyarrow]", - "Actor2Name": "string[pyarrow]", - "Actor2CountryCode": "string[pyarrow]", - "Actor2KnownGroupCode": "string[pyarrow]", - "Actor2EthnicCode": "string[pyarrow]", - "Actor2Religion1Code": "string[pyarrow]", - "Actor2Religion2Code": "string[pyarrow]", - "Actor2Type1Code": "string[pyarrow]", - "Actor2Type2Code": "string[pyarrow]", - "Actor2Type3Code": "string[pyarrow]", - "IsRootEvent": "Int64", - "EventCode": "string[pyarrow]", - "EventBaseCode": "string[pyarrow]", - "EventRootCode": "string[pyarrow]", - "QuadClass": "Int64", - "GoldsteinScale": "Float64", - "NumMentions": "Int64", - "NumSources": "Int64", - "NumArticles": "Int64", - "AvgTone": "Float64", - "Actor1Geo_Type": "Int64", - "Actor1Geo_Fullname": "string[pyarrow]", - "Actor1Geo_CountryCode": "string[pyarrow]", - "Actor1Geo_ADM1Code": "string[pyarrow]", - "Actor1Geo_Lat": "Float64", - "Actor1Geo_Long": "Float64", - "Actor1Geo_FeatureID": "string[pyarrow]", - "Actor2Geo_Type": "Int64", - "Actor2Geo_Fullname": "string[pyarrow]", - "Actor2Geo_CountryCode": "string[pyarrow]", - "Actor2Geo_ADM1Code": "string[pyarrow]", - "Actor2Geo_Lat": "Float64", - "Actor2Geo_Long": "Float64", - "Actor2Geo_FeatureID": "string[pyarrow]", - "ActionGeo_Type": "Int64", - "ActionGeo_Fullname": "string[pyarrow]", - "ActionGeo_CountryCode": "string[pyarrow]", - "ActionGeo_ADM1Code": "string[pyarrow]", - "ActionGeo_Lat": "Float64", - "ActionGeo_Long": "Float64", - "ActionGeo_FeatureID": "string[pyarrow]", - "DATEADDED": "Int64", - "SOURCEURL": "string[pyarrow]", -} +COLUMNSV1 = OrderedDict([ + ("GlobalEventID", "Int64"), + ("Day", "Int64"), + ("MonthYear", "Int64"), + ("Year", "Int64"), + ("FractionDate", "Float64"), + ("Actor1Code", "string[pyarrow]"), + ("Actor1Name", "string[pyarrow]"), + ("Actor1CountryCode", "string[pyarrow]"), + ("Actor1KnownGroupCode", "string[pyarrow]"), + ("Actor1EthnicCode", "string[pyarrow]"), + ("Actor1Religion1Code", "string[pyarrow]"), + ("Actor1Religion2Code", "string[pyarrow]"), + ("Actor1Type1Code", "string[pyarrow]"), + ("Actor1Type2Code", "string[pyarrow]"), + ("Actor1Type3Code", "string[pyarrow]"), + ("Actor2Code", "string[pyarrow]"), + ("Actor2Name", "string[pyarrow]"), + ("Actor2CountryCode", "string[pyarrow]"), + ("Actor2KnownGroupCode", "string[pyarrow]"), + ("Actor2EthnicCode", "string[pyarrow]"), + ("Actor2Religion1Code", "string[pyarrow]"), + ("Actor2Religion2Code", "string[pyarrow]"), + ("Actor2Type1Code", "string[pyarrow]"), + ("Actor2Type2Code", "string[pyarrow]"), + ("Actor2Type3Code", 
"string[pyarrow]"), + ("IsRootEvent", "Int64"), + ("EventCode", "string[pyarrow]"), + ("EventBaseCode", "string[pyarrow]"), + ("EventRootCode", "string[pyarrow]"), + ("QuadClass", "Int64"), + ("GoldsteinScale", "Float64"), + ("NumMentions", "Int64"), + ("NumSources", "Int64"), + ("NumArticles", "Int64"), + ("AvgTone", "Float64"), + ("Actor1Geo_Type", "Int64"), + ("Actor1Geo_Fullname", "string[pyarrow]"), + ("Actor1Geo_CountryCode", "string[pyarrow]"), + ("Actor1Geo_ADM1Code", "string[pyarrow]"), + ("Actor1Geo_Lat", "Float64"), + ("Actor1Geo_Long", "Float64"), + ("Actor1Geo_FeatureID", "string[pyarrow]"), + ("Actor2Geo_Type", "Int64"), + ("Actor2Geo_Fullname", "string[pyarrow]"), + ("Actor2Geo_CountryCode", "string[pyarrow]"), + ("Actor2Geo_ADM1Code", "string[pyarrow]"), + ("Actor2Geo_Lat", "Float64"), + ("Actor2Geo_Long", "Float64"), + ("Actor2Geo_FeatureID", "string[pyarrow]"), + ("ActionGeo_Type", "Int64"), + ("ActionGeo_Fullname", "string[pyarrow]"), + ("ActionGeo_CountryCode", "string[pyarrow]"), + ("ActionGeo_ADM1Code", "string[pyarrow]"), + ("ActionGeo_Lat", "Float64"), + ("ActionGeo_Long", "Float64"), + ("ActionGeo_FeatureID", "string[pyarrow]"), + ("DATEADDED", "Int64"), + ("SOURCEURL", "string[pyarrow]"), +]) def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): - s3 = s3_factory(anon=True) + client = from_csv_to_parquet_client - bad_files = [ - "gdelt-open-data/events/20161004.export.csv", - "gdelt-open-data/events/20170106.export.csv", - "gdelt-open-data/events/20170422.export.csv", - "gdelt-open-data/events/20170802.export.csv", - "gdelt-open-data/events/20170920.export.csv", - "gdelt-open-data/events/20171021.export.csv", - "gdelt-open-data/events/20180415.export.csv", - "gdelt-open-data/events/20180416.export.csv", - "gdelt-open-data/events/20180613.export.csv", - "gdelt-open-data/events/20180806.export.csv", - "gdelt-open-data/events/20190217.export.csv", - "gdelt-open-data/events/20190613.export.csv", - ] - files = s3.ls("s3://gdelt-open-data/events/")[120:] - files = [f"s3://{f}" for f in files if f not in bad_files] + s3 = s3_factory(anon=True) + files = s3.ls("s3://gdelt-open-data/events/")[-120:] + files = [f"s3://{f}" for f in files] - df = dd.read_csv( - files, - names=COLUMNSV1.keys(), - sep="\t", - dtype=COLUMNSV1, - storage_options=s3.storage_options, - on_bad_lines="skip", - ) + def download(path): + try: + df = dd.read_csv( + path, + sep='\t', + names=COLUMNSV1.keys(), + dtype=COLUMNSV1, + storage_options=s3.storage_options, + on_bad_lines="skip" + ) + # TODO: Want to raise ValueError right away, not + # inside the graph later which persist (or nothing) would cause. 
+ df = df.compute() + return dd.from_pandas(df, npartitions=8).persist() + except Exception as exc: # Bad file + print(f"Failed to read {path} w/ {exc}") + downloads = client.map(download, files) + dfs = (d.result() for d in downloads) + df = dd.concat([df for df in dfs if df is not None]) df = df.map_partitions( lambda xdf: xdf.drop_duplicates(subset=["SOURCEURL"], keep="first") ) @@ -136,7 +135,6 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): "washingtonpost|nytimes", regex=True ) df = df[df["national_paper"]] - - output = s3_url + "/from-csv-to-parquet/" - - df.to_parquet(output) + df = df.persist() + assert len(df) + df.to_parquet(f"{s3_url}/from-csv-to-parquet/") From 6a9313010ab2174e5df8fe0f5d50ac7b9d377cd0 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Wed, 10 May 2023 15:27:24 +0200 Subject: [PATCH 21/31] Just use hardcoded bad files --- cluster_kwargs.yaml | 2 +- tests/workflows/test_from_csv_to_parquet.py | 45 +++++++++++---------- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index 13e6f8f204..701e8330a3 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -69,7 +69,7 @@ test_repeated_merge_spill: # For tests/workflows/test_from_csv_to_parquet.py from_csv_to_parquet_cluster: - n_workers: 20 + n_workers: 30 worker_vm_types: [t3.medium] # 2CPU, 4GiB backend_options: region: "us-east-1" # Same region as dataset diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 12e6199042..378a5c5e9a 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -105,29 +105,32 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): client = from_csv_to_parquet_client s3 = s3_factory(anon=True) - files = s3.ls("s3://gdelt-open-data/events/")[-120:] - files = [f"s3://{f}" for f in files] + bad_files = [ + "gdelt-open-data/events/20161004.export.csv", + "gdelt-open-data/events/20170106.export.csv", + "gdelt-open-data/events/20170422.export.csv", + "gdelt-open-data/events/20170802.export.csv", + "gdelt-open-data/events/20170920.export.csv", + "gdelt-open-data/events/20171021.export.csv", + "gdelt-open-data/events/20180415.export.csv", + "gdelt-open-data/events/20180416.export.csv", + "gdelt-open-data/events/20180613.export.csv", + "gdelt-open-data/events/20180806.export.csv", + "gdelt-open-data/events/20190217.export.csv", + "gdelt-open-data/events/20190613.export.csv", + ] + files = s3.ls("s3://gdelt-open-data/events/")[:1000] + files = [f"s3://{f}" for f in files if f not in bad_files] - def download(path): - try: - df = dd.read_csv( - path, - sep='\t', - names=COLUMNSV1.keys(), - dtype=COLUMNSV1, - storage_options=s3.storage_options, - on_bad_lines="skip" - ) - # TODO: Want to raise ValueError right away, not - # inside the graph later which persist (or nothing) would cause. 
- df = df.compute() - return dd.from_pandas(df, npartitions=8).persist() - except Exception as exc: # Bad file - print(f"Failed to read {path} w/ {exc}") + df = dd.read_csv( + files, + names=COLUMNSV1.keys(), + sep="\t", + dtype=COLUMNSV1, + storage_options=s3.storage_options, + on_bad_lines="skip", + ) - downloads = client.map(download, files) - dfs = (d.result() for d in downloads) - df = dd.concat([df for df in dfs if df is not None]) df = df.map_partitions( lambda xdf: xdf.drop_duplicates(subset=["SOURCEURL"], keep="first") ) From e2b070e5d2f6f1cbd355592229ba02e43490c011 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Wed, 10 May 2023 16:02:23 +0200 Subject: [PATCH 22/31] Use dtype and converters to avoid hard-coded bad_files --- tests/workflows/test_from_csv_to_parquet.py | 164 ++++++++++---------- 1 file changed, 82 insertions(+), 82 deletions(-) diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 378a5c5e9a..8df8b1a22b 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -1,11 +1,10 @@ -import os import uuid from collections import OrderedDict import coiled import dask.dataframe as dd import pytest -from distributed import Client, LocalCluster, wait +from distributed import Client @pytest.fixture(scope="module") @@ -39,96 +38,96 @@ def from_csv_to_parquet_client( yield client -COLUMNSV1 = OrderedDict([ - ("GlobalEventID", "Int64"), - ("Day", "Int64"), - ("MonthYear", "Int64"), - ("Year", "Int64"), - ("FractionDate", "Float64"), - ("Actor1Code", "string[pyarrow]"), - ("Actor1Name", "string[pyarrow]"), - ("Actor1CountryCode", "string[pyarrow]"), - ("Actor1KnownGroupCode", "string[pyarrow]"), - ("Actor1EthnicCode", "string[pyarrow]"), - ("Actor1Religion1Code", "string[pyarrow]"), - ("Actor1Religion2Code", "string[pyarrow]"), - ("Actor1Type1Code", "string[pyarrow]"), - ("Actor1Type2Code", "string[pyarrow]"), - ("Actor1Type3Code", "string[pyarrow]"), - ("Actor2Code", "string[pyarrow]"), - ("Actor2Name", "string[pyarrow]"), - ("Actor2CountryCode", "string[pyarrow]"), - ("Actor2KnownGroupCode", "string[pyarrow]"), - ("Actor2EthnicCode", "string[pyarrow]"), - ("Actor2Religion1Code", "string[pyarrow]"), - ("Actor2Religion2Code", "string[pyarrow]"), - ("Actor2Type1Code", "string[pyarrow]"), - ("Actor2Type2Code", "string[pyarrow]"), - ("Actor2Type3Code", "string[pyarrow]"), - ("IsRootEvent", "Int64"), - ("EventCode", "string[pyarrow]"), - ("EventBaseCode", "string[pyarrow]"), - ("EventRootCode", "string[pyarrow]"), - ("QuadClass", "Int64"), - ("GoldsteinScale", "Float64"), - ("NumMentions", "Int64"), - ("NumSources", "Int64"), - ("NumArticles", "Int64"), - ("AvgTone", "Float64"), - ("Actor1Geo_Type", "Int64"), - ("Actor1Geo_Fullname", "string[pyarrow]"), - ("Actor1Geo_CountryCode", "string[pyarrow]"), - ("Actor1Geo_ADM1Code", "string[pyarrow]"), - ("Actor1Geo_Lat", "Float64"), - ("Actor1Geo_Long", "Float64"), - ("Actor1Geo_FeatureID", "string[pyarrow]"), - ("Actor2Geo_Type", "Int64"), - ("Actor2Geo_Fullname", "string[pyarrow]"), - ("Actor2Geo_CountryCode", "string[pyarrow]"), - ("Actor2Geo_ADM1Code", "string[pyarrow]"), - ("Actor2Geo_Lat", "Float64"), - ("Actor2Geo_Long", "Float64"), - ("Actor2Geo_FeatureID", "string[pyarrow]"), - ("ActionGeo_Type", "Int64"), - ("ActionGeo_Fullname", "string[pyarrow]"), - ("ActionGeo_CountryCode", "string[pyarrow]"), - ("ActionGeo_ADM1Code", "string[pyarrow]"), - ("ActionGeo_Lat", "Float64"), - ("ActionGeo_Long", "Float64"), - 
("ActionGeo_FeatureID", "string[pyarrow]"), - ("DATEADDED", "Int64"), - ("SOURCEURL", "string[pyarrow]"), -]) +SCHEMA = OrderedDict( + [ + ("GlobalEventID", "Int64"), + ("Day", "Int64"), + ("MonthYear", "Int64"), + ("Year", "Int64"), + ("FractionDate", "Float64"), + ("Actor1Code", "string[pyarrow]"), + ("Actor1Name", "string[pyarrow]"), + ("Actor1CountryCode", "string[pyarrow]"), + ("Actor1KnownGroupCode", "string[pyarrow]"), + ("Actor1EthnicCode", "string[pyarrow]"), + ("Actor1Religion1Code", "string[pyarrow]"), + ("Actor1Religion2Code", "string[pyarrow]"), + ("Actor1Type1Code", "string[pyarrow]"), + ("Actor1Type2Code", "string[pyarrow]"), + ("Actor1Type3Code", "string[pyarrow]"), + ("Actor2Code", "string[pyarrow]"), + ("Actor2Name", "string[pyarrow]"), + ("Actor2CountryCode", "string[pyarrow]"), + ("Actor2KnownGroupCode", "string[pyarrow]"), + ("Actor2EthnicCode", "string[pyarrow]"), + ("Actor2Religion1Code", "string[pyarrow]"), + ("Actor2Religion2Code", "string[pyarrow]"), + ("Actor2Type1Code", "string[pyarrow]"), + ("Actor2Type2Code", "string[pyarrow]"), + ("Actor2Type3Code", "string[pyarrow]"), + ("IsRootEvent", "Int64"), + ("EventCode", "string[pyarrow]"), + ("EventBaseCode", "string[pyarrow]"), + ("EventRootCode", "string[pyarrow]"), + ("QuadClass", "Int64"), + ("GoldsteinScale", "Float64"), + ("NumMentions", "Int64"), + ("NumSources", "Int64"), + ("NumArticles", "Int64"), + ("AvgTone", "Float64"), + ("Actor1Geo_Type", "Int64"), + ("Actor1Geo_Fullname", "string[pyarrow]"), + ("Actor1Geo_CountryCode", "string[pyarrow]"), + ("Actor1Geo_ADM1Code", "string[pyarrow]"), + ("Actor1Geo_Lat", "Float64"), + ("Actor1Geo_Long", "Float64"), + ("Actor1Geo_FeatureID", "string[pyarrow]"), + ("Actor2Geo_Type", "Int64"), + ("Actor2Geo_Fullname", "string[pyarrow]"), + ("Actor2Geo_CountryCode", "string[pyarrow]"), + ("Actor2Geo_ADM1Code", "string[pyarrow]"), + ("Actor2Geo_Lat", "Float64"), + ("Actor2Geo_Long", "Float64"), + ("Actor2Geo_FeatureID", "string[pyarrow]"), + ("ActionGeo_Type", "Int64"), + ("ActionGeo_Fullname", "string[pyarrow]"), + ("ActionGeo_CountryCode", "string[pyarrow]"), + ("ActionGeo_ADM1Code", "string[pyarrow]"), + ("ActionGeo_Lat", "Float64"), + ("ActionGeo_Long", "Float64"), + ("ActionGeo_FeatureID", "string[pyarrow]"), + ("DATEADDED", "Int64"), + ("SOURCEURL", "string[pyarrow]"), + ] +) def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): - client = from_csv_to_parquet_client - s3 = s3_factory(anon=True) - bad_files = [ - "gdelt-open-data/events/20161004.export.csv", - "gdelt-open-data/events/20170106.export.csv", - "gdelt-open-data/events/20170422.export.csv", - "gdelt-open-data/events/20170802.export.csv", - "gdelt-open-data/events/20170920.export.csv", - "gdelt-open-data/events/20171021.export.csv", - "gdelt-open-data/events/20180415.export.csv", - "gdelt-open-data/events/20180416.export.csv", - "gdelt-open-data/events/20180613.export.csv", - "gdelt-open-data/events/20180806.export.csv", - "gdelt-open-data/events/20190217.export.csv", - "gdelt-open-data/events/20190613.export.csv", - ] files = s3.ls("s3://gdelt-open-data/events/")[:1000] - files = [f"s3://{f}" for f in files if f not in bad_files] + files = [f"s3://{f}" for f in files] df = dd.read_csv( files, - names=COLUMNSV1.keys(), sep="\t", - dtype=COLUMNSV1, + names=SCHEMA.keys(), + # 'dtype' and 'converters' cannot overlap + dtype={ + col: dtype for col, dtype in SCHEMA.items() if dtype == "string[pyarrow]" + }, storage_options=s3.storage_options, on_bad_lines="skip", + # Some bad files have '#' 
in numeric values + converters={ + col: lambda v: float(v.replace("#", "") or "NaN") + for col, dtype in SCHEMA.items() + if dtype != "string[pyarrow]" + }, + ) + + # Now we can safely convert the numeric columns + df = df.astype( + {col: dtype for col, dtype in SCHEMA.items() if dtype != "string[pyarrow]"} ) df = df.map_partitions( @@ -140,4 +139,5 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): df = df[df["national_paper"]] df = df.persist() assert len(df) - df.to_parquet(f"{s3_url}/from-csv-to-parquet/") + + df.to_parquet(f"{s3_url}/from-csv-to-parquet/", write_index=False) From 5b58925af9d23c07163dcd45a0ad86843b3cb42f Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Wed, 10 May 2023 16:08:24 +0200 Subject: [PATCH 23/31] Reset tests.yml to main --- .github/workflows/tests.yml | 38 ------------------------------------- 1 file changed, 38 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 093174de3c..79d2bf0a3b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -33,44 +33,6 @@ jobs: os: [ubuntu-latest] python-version: ["3.9"] pytest_args: [tests] - runtime-version: [upstream, latest, "0.2.1"] - include: - # Run stability tests on Python 3.8 - - pytest_args: tests/stability - python-version: "3.8" - runtime-version: upstream - os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.8" - runtime-version: latest - os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.8" - runtime-version: "0.2.1" - os: ubuntu-latest - # Run stability tests on Python 3.10 - - pytest_args: tests/stability - python-version: "3.10" - runtime-version: upstream - os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.10" - runtime-version: latest - os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.10" - runtime-version: "0.2.1" - os: ubuntu-latest - # Run stability tests on Python Windows and MacOS (latest py39 only) - - pytest_args: tests/stability - python-version: "3.9" - runtime-version: latest - os: windows-latest - - pytest_args: tests/stability - python-version: "3.9" - runtime-version: latest - os: macos-latest - pytest_args: [tests] runtime-version: [upstream, latest] include: # Run stability tests on the lowest and highest versions of Python only From a753aea5d796fafa4a20e5a2ef9e0e439beb5f55 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Thu, 11 May 2023 20:15:47 +0200 Subject: [PATCH 24/31] Use default instance types --- cluster_kwargs.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index 701e8330a3..f7d26882e0 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -70,6 +70,5 @@ test_repeated_merge_spill: # For tests/workflows/test_from_csv_to_parquet.py from_csv_to_parquet_cluster: n_workers: 30 - worker_vm_types: [t3.medium] # 2CPU, 4GiB backend_options: region: "us-east-1" # Same region as dataset From 6e9fc7f95ac71d61bcf58bb9b7e598e32420de37 Mon Sep 17 00:00:00 2001 From: Miles Date: Thu, 11 May 2023 20:21:50 +0200 Subject: [PATCH 25/31] Update cluster_kwargs.yaml Co-authored-by: Naty Clementi --- cluster_kwargs.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index f7d26882e0..bde2f91f7f 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -70,5 +70,6 @@ test_repeated_merge_spill: # For tests/workflows/test_from_csv_to_parquet.py from_csv_to_parquet_cluster: n_workers: 30 + worker_vm_types: [m6i.xlarge] # 4 CPU, 
16 GiB (preferred default instance) backend_options: region: "us-east-1" # Same region as dataset From db73e85e9d9adef68ae179ec03899868e0872f0f Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Thu, 11 May 2023 20:26:47 +0200 Subject: [PATCH 26/31] Set pyarrow string conversion --- tests/conftest.py | 6 ++++++ tests/workflows/test_from_csv_to_parquet.py | 4 +++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index b5f2c47c4c..1366961a87 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -691,6 +691,12 @@ def configure_shuffling(shuffle_method): yield +@pytest.fixture +def configure_use_pyarrow_strings(): + with dask.config.set({"dataframe.convert-string": True}): + yield + + # Include https://github.com/dask/distributed/pull/7534 P2P_RECHUNK_AVAILABLE = Version(distributed.__version__) >= Version("2023.2.1") diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 8df8b1a22b..36fd9a05d3 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -102,7 +102,9 @@ def from_csv_to_parquet_client( ) -def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): +def test_from_csv_to_parquet( + from_csv_to_parquet_client, s3_factory, s3_url, configure_use_pyarrow_strings +): s3 = s3_factory(anon=True) files = s3.ls("s3://gdelt-open-data/events/")[:1000] files = [f"s3://{f}" for f in files] From ed1d9e363f92bd028d15ee64257db53485345cc6 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Thu, 11 May 2023 20:37:48 +0200 Subject: [PATCH 27/31] Reduce worker count --- cluster_kwargs.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index bde2f91f7f..a6442c48b8 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -69,7 +69,7 @@ test_repeated_merge_spill: # For tests/workflows/test_from_csv_to_parquet.py from_csv_to_parquet_cluster: - n_workers: 30 + n_workers: 10 worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance) backend_options: region: "us-east-1" # Same region as dataset From e3145c621f401d0119207a16ee77ff1515a8ee43 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Mon, 15 May 2023 10:13:13 +0200 Subject: [PATCH 28/31] Try only fixing floats --- tests/workflows/test_from_csv_to_parquet.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 36fd9a05d3..86a3288306 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -114,23 +114,19 @@ def test_from_csv_to_parquet( sep="\t", names=SCHEMA.keys(), # 'dtype' and 'converters' cannot overlap - dtype={ - col: dtype for col, dtype in SCHEMA.items() if dtype == "string[pyarrow]" - }, + dtype={col: dtype for col, dtype in SCHEMA.items() if dtype != "Float64"}, storage_options=s3.storage_options, on_bad_lines="skip", - # Some bad files have '#' in numeric values + # Some bad files have '#' in float values converters={ col: lambda v: float(v.replace("#", "") or "NaN") for col, dtype in SCHEMA.items() - if dtype != "string[pyarrow]" + if dtype == "Float64" }, ) - # Now we can safely convert the numeric columns - df = df.astype( - {col: dtype for col, dtype in SCHEMA.items() if dtype != "string[pyarrow]"} - ) + # Now we can safely convert the float columns + df = df.astype({col: dtype for col, dtype in 
SCHEMA.items() if dtype == "Float64"}) df = df.map_partitions( lambda xdf: xdf.drop_duplicates(subset=["SOURCEURL"], keep="first") From 19606da825f0c2d1c5ccc3d8fcd7be6795f676e3 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Tue, 16 May 2023 20:48:33 +0200 Subject: [PATCH 29/31] Remove config dataframe.convert-string Not applied in current version --- tests/conftest.py | 6 ------ tests/workflows/test_from_csv_to_parquet.py | 4 +--- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 1366961a87..b5f2c47c4c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -691,12 +691,6 @@ def configure_shuffling(shuffle_method): yield -@pytest.fixture -def configure_use_pyarrow_strings(): - with dask.config.set({"dataframe.convert-string": True}): - yield - - # Include https://github.com/dask/distributed/pull/7534 P2P_RECHUNK_AVAILABLE = Version(distributed.__version__) >= Version("2023.2.1") diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 86a3288306..b471a99f44 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -102,9 +102,7 @@ def from_csv_to_parquet_client( ) -def test_from_csv_to_parquet( - from_csv_to_parquet_client, s3_factory, s3_url, configure_use_pyarrow_strings -): +def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): s3 = s3_factory(anon=True) files = s3.ls("s3://gdelt-open-data/events/")[:1000] files = [f"s3://{f}" for f in files] From e701cc130c5e0096bf20a7e9aa50eaa8f2bb5594 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Wed, 17 May 2023 19:29:19 +0200 Subject: [PATCH 30/31] Adjust for fjetter's feedback --- cluster_kwargs.yaml | 2 +- tests/workflows/test_from_csv_to_parquet.py | 41 +-------------------- 2 files changed, 3 insertions(+), 40 deletions(-) diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index a6442c48b8..50b3e537cd 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -68,7 +68,7 @@ test_repeated_merge_spill: worker_vm_types: [m6i.large] # For tests/workflows/test_from_csv_to_parquet.py -from_csv_to_parquet_cluster: +from_csv_to_parquet: n_workers: 10 worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance) backend_options: diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index b471a99f44..0118343324 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -1,42 +1,7 @@ -import uuid from collections import OrderedDict -import coiled import dask.dataframe as dd import pytest -from distributed import Client - - -@pytest.fixture(scope="module") -def from_csv_to_parquet_cluster( - dask_env_variables, - cluster_kwargs, - github_cluster_tags, -): - with coiled.Cluster( - f"from-csv-to-parquet-{uuid.uuid4().hex[:8]}", - environ=dask_env_variables, - tags=github_cluster_tags, - **cluster_kwargs["from_csv_to_parquet_cluster"], - ) as cluster: - yield cluster - - -@pytest.fixture -def from_csv_to_parquet_client( - from_csv_to_parquet_cluster, - cluster_kwargs, - upload_cluster_dump, - benchmark_all, -): - n_workers = cluster_kwargs["from_csv_to_parquet_cluster"]["n_workers"] - with Client(from_csv_to_parquet_cluster) as client: - from_csv_to_parquet_cluster.scale(n_workers) - client.wait_for_workers(n_workers) - client.restart() - with upload_cluster_dump(client), benchmark_all(client): - yield client - SCHEMA = OrderedDict( [ @@ -102,7 +67,8 
@@ def from_csv_to_parquet_client( ) -def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): +@pytest.mark.client("from_csv_to_parquet") +def test_from_csv_to_parquet(client, s3_factory, s3_url): s3 = s3_factory(anon=True) files = s3.ls("s3://gdelt-open-data/events/")[:1000] files = [f"s3://{f}" for f in files] @@ -133,7 +99,4 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): "washingtonpost|nytimes", regex=True ) df = df[df["national_paper"]] - df = df.persist() - assert len(df) - df.to_parquet(f"{s3_url}/from-csv-to-parquet/", write_index=False) From 530d35c7c8c2fec080cb91abbbe2a316fe3726d7 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Mon, 22 May 2023 15:14:29 +0200 Subject: [PATCH 31/31] Use float64 instead of Float64 --- tests/workflows/test_from_csv_to_parquet.py | 24 ++++++++++----------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 0118343324..a54d0c09b7 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -9,7 +9,7 @@ ("Day", "Int64"), ("MonthYear", "Int64"), ("Year", "Int64"), - ("FractionDate", "Float64"), + ("FractionDate", "float64"), ("Actor1Code", "string[pyarrow]"), ("Actor1Name", "string[pyarrow]"), ("Actor1CountryCode", "string[pyarrow]"), @@ -35,31 +35,31 @@ ("EventBaseCode", "string[pyarrow]"), ("EventRootCode", "string[pyarrow]"), ("QuadClass", "Int64"), - ("GoldsteinScale", "Float64"), + ("GoldsteinScale", "float64"), ("NumMentions", "Int64"), ("NumSources", "Int64"), ("NumArticles", "Int64"), - ("AvgTone", "Float64"), + ("AvgTone", "float64"), ("Actor1Geo_Type", "Int64"), ("Actor1Geo_Fullname", "string[pyarrow]"), ("Actor1Geo_CountryCode", "string[pyarrow]"), ("Actor1Geo_ADM1Code", "string[pyarrow]"), - ("Actor1Geo_Lat", "Float64"), - ("Actor1Geo_Long", "Float64"), + ("Actor1Geo_Lat", "float64"), + ("Actor1Geo_Long", "float64"), ("Actor1Geo_FeatureID", "string[pyarrow]"), ("Actor2Geo_Type", "Int64"), ("Actor2Geo_Fullname", "string[pyarrow]"), ("Actor2Geo_CountryCode", "string[pyarrow]"), ("Actor2Geo_ADM1Code", "string[pyarrow]"), - ("Actor2Geo_Lat", "Float64"), - ("Actor2Geo_Long", "Float64"), + ("Actor2Geo_Lat", "float64"), + ("Actor2Geo_Long", "float64"), ("Actor2Geo_FeatureID", "string[pyarrow]"), ("ActionGeo_Type", "Int64"), ("ActionGeo_Fullname", "string[pyarrow]"), ("ActionGeo_CountryCode", "string[pyarrow]"), ("ActionGeo_ADM1Code", "string[pyarrow]"), - ("ActionGeo_Lat", "Float64"), - ("ActionGeo_Long", "Float64"), + ("ActionGeo_Lat", "float64"), + ("ActionGeo_Long", "float64"), ("ActionGeo_FeatureID", "string[pyarrow]"), ("DATEADDED", "Int64"), ("SOURCEURL", "string[pyarrow]"), @@ -78,19 +78,19 @@ def test_from_csv_to_parquet(client, s3_factory, s3_url): sep="\t", names=SCHEMA.keys(), # 'dtype' and 'converters' cannot overlap - dtype={col: dtype for col, dtype in SCHEMA.items() if dtype != "Float64"}, + dtype={col: dtype for col, dtype in SCHEMA.items() if dtype != "float64"}, storage_options=s3.storage_options, on_bad_lines="skip", # Some bad files have '#' in float values converters={ col: lambda v: float(v.replace("#", "") or "NaN") for col, dtype in SCHEMA.items() - if dtype == "Float64" + if dtype == "float64" }, ) # Now we can safely convert the float columns - df = df.astype({col: dtype for col, dtype in SCHEMA.items() if dtype == "Float64"}) + df = df.astype({col: dtype for col, dtype in SCHEMA.items() if dtype == "float64"}) 
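A quick aside on the dtype/converters split that the last few commits converge on: pandas' read_csv will not honor both a dtype and a converter for the same column, so the workflow assigns explicit dtypes to everything except the float columns, cleans those through converters (stripping stray '#' characters), and casts them with astype afterwards; the switch from the nullable "Float64" extension dtype to plain NumPy "float64" presumably just matches what the converter already returns. A minimal self-contained sketch of the same pattern, using a made-up two-column schema and inline sample data rather than the real GDELT files:

import io

import pandas as pd

# Invented schema for illustration: one pyarrow-backed string column and one
# float column that, like some GDELT fields, may contain stray '#' characters.
SCHEMA = {"name": "string[pyarrow]", "score": "float64"}

raw = io.StringIO("alice\t1.5\nbob\t#2.0\ncarol\t\n")

df = pd.read_csv(
    raw,
    sep="\t",
    names=list(SCHEMA),
    # 'dtype' and 'converters' cannot overlap, so only non-float columns here
    dtype={col: dtype for col, dtype in SCHEMA.items() if dtype != "float64"},
    # Clean the float columns: drop '#' and fall back to NaN for empty fields
    converters={
        col: lambda v: float(v.replace("#", "") or "NaN")
        for col, dtype in SCHEMA.items()
        if dtype == "float64"
    },
)

# Columns handled by converters are typically left as object dtype,
# so cast them to their target float dtype explicitly afterwards.
df = df.astype({col: dtype for col, dtype in SCHEMA.items() if dtype == "float64"})
print(df.dtypes)  # 'score' ends up as float64; 'name' keeps its string dtype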
df = df.map_partitions( lambda xdf: xdf.drop_duplicates(subset=["SOURCEURL"], keep="first")
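One behavioural note on the map_partitions/drop_duplicates step above: deduplicating inside map_partitions only removes repeated SOURCEURLs within each partition, so duplicates that land in different partitions survive; that looks like a deliberate trade-off, since a global drop_duplicates would require a shuffle. A tiny synthetic illustration (the data is invented; only the column name comes from the workflow):

import pandas as pd

import dask.dataframe as dd

pdf = pd.DataFrame({"SOURCEURL": ["a", "a", "b", "b"], "value": range(4)})
# With two partitions, both "a" rows land in the first partition and both
# "b" rows in the second, so per-partition dedup happens to be exact here.
ddf = dd.from_pandas(pdf, npartitions=2)

deduped = ddf.map_partitions(
    lambda xdf: xdf.drop_duplicates(subset=["SOURCEURL"], keep="first")
)
print(len(deduped))  # 2 -- one row per URL in this toy example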