@@ -1,42 +1,7 @@
-import uuid
 from collections import OrderedDict
 
-import coiled
 import dask.dataframe as dd
 import pytest
-from distributed import Client
-
-
-@pytest.fixture(scope="module")
-def from_csv_to_parquet_cluster(
-    dask_env_variables,
-    cluster_kwargs,
-    github_cluster_tags,
-):
-    with coiled.Cluster(
-        f"from-csv-to-parquet-{uuid.uuid4().hex[:8]}",
-        environ=dask_env_variables,
-        tags=github_cluster_tags,
-        **cluster_kwargs["from_csv_to_parquet_cluster"],
-    ) as cluster:
-        yield cluster
-
-
-@pytest.fixture
-def from_csv_to_parquet_client(
-    from_csv_to_parquet_cluster,
-    cluster_kwargs,
-    upload_cluster_dump,
-    benchmark_all,
-):
-    n_workers = cluster_kwargs["from_csv_to_parquet_cluster"]["n_workers"]
-    with Client(from_csv_to_parquet_cluster) as client:
-        from_csv_to_parquet_cluster.scale(n_workers)
-        client.wait_for_workers(n_workers)
-        client.restart()
-        with upload_cluster_dump(client), benchmark_all(client):
-            yield client
-
 
 SCHEMA = OrderedDict(
     [
@@ -102,7 +67,8 @@ def from_csv_to_parquet_client(
 )
 
 
-def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url):
+@pytest.mark.client("from_csv_to_parquet")
+def test_from_csv_to_parquet(client, s3_factory, s3_url):
     s3 = s3_factory(anon=True)
     files = s3.ls("s3://gdelt-open-data/events/")[:1000]
     files = [f"s3://{f}" for f in files]
@@ -133,7 +99,4 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url):
         "washingtonpost|nytimes", regex=True
     )
     df = df[df["national_paper"]]
-    df = df.persist()
-    assert len(df)
-
     df.to_parquet(f"{s3_url}/from-csv-to-parquet/", write_index=False)