Commit
Adjust for fjetter's feedback
milesgranger committed May 17, 2023
1 parent 19606da commit e701cc1
Showing 2 changed files with 3 additions and 40 deletions.
2 changes: 1 addition & 1 deletion cluster_kwargs.yaml
@@ -68,7 +68,7 @@ test_repeated_merge_spill:
   worker_vm_types: [m6i.large]
 
 # For tests/workflows/test_from_csv_to_parquet.py
-from_csv_to_parquet_cluster:
+from_csv_to_parquet:
   n_workers: 10
   worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance)
   backend_options:
41 changes: 2 additions & 39 deletions tests/workflows/test_from_csv_to_parquet.py
@@ -1,42 +1,7 @@
-import uuid
 from collections import OrderedDict
 
-import coiled
 import dask.dataframe as dd
 import pytest
-from distributed import Client
-
-
-@pytest.fixture(scope="module")
-def from_csv_to_parquet_cluster(
-    dask_env_variables,
-    cluster_kwargs,
-    github_cluster_tags,
-):
-    with coiled.Cluster(
-        f"from-csv-to-parquet-{uuid.uuid4().hex[:8]}",
-        environ=dask_env_variables,
-        tags=github_cluster_tags,
-        **cluster_kwargs["from_csv_to_parquet_cluster"],
-    ) as cluster:
-        yield cluster
-
-
-@pytest.fixture
-def from_csv_to_parquet_client(
-    from_csv_to_parquet_cluster,
-    cluster_kwargs,
-    upload_cluster_dump,
-    benchmark_all,
-):
-    n_workers = cluster_kwargs["from_csv_to_parquet_cluster"]["n_workers"]
-    with Client(from_csv_to_parquet_cluster) as client:
-        from_csv_to_parquet_cluster.scale(n_workers)
-        client.wait_for_workers(n_workers)
-        client.restart()
-        with upload_cluster_dump(client), benchmark_all(client):
-            yield client
-
 
 SCHEMA = OrderedDict(
     [
@@ -102,7 +67,8 @@ def from_csv_to_parquet_client(
 )
 
 
-def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url):
+@pytest.mark.client("from_csv_to_parquet")
+def test_from_csv_to_parquet(client, s3_factory, s3_url):
     s3 = s3_factory(anon=True)
     files = s3.ls("s3://gdelt-open-data/events/")[:1000]
     files = [f"s3://{f}" for f in files]
@@ -133,7 +99,4 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url):
         "washingtonpost|nytimes", regex=True
     )
     df = df[df["national_paper"]]
-    df = df.persist()
-    assert len(df)
-
     df.to_parquet(f"{s3_url}/from-csv-to-parquet/", write_index=False)

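The deleted per-test fixtures are superseded by a shared client fixture selected via the @pytest.mark.client marker; the marker argument also keys into cluster_kwargs.yaml, which is why the config entry above was renamed from from_csv_to_parquet_cluster to from_csv_to_parquet. That shared fixture lives in the suite's conftest and is not shown in this commit; a minimal sketch of how such a marker-driven fixture could be assembled from the deleted code, assuming the same helper fixtures (dask_env_variables, cluster_kwargs, github_cluster_tags), might look like:

import uuid

import coiled
import pytest
from distributed import Client


@pytest.fixture
def client(request, dask_env_variables, cluster_kwargs, github_cluster_tags):
    # Hypothetical conftest.py sketch, not part of this commit: read the
    # name passed to @pytest.mark.client("...") on the requesting test.
    name = request.node.get_closest_marker("client").args[0]
    kwargs = cluster_kwargs[name]  # same name keys cluster_kwargs.yaml
    with coiled.Cluster(
        f"{name.replace('_', '-')}-{uuid.uuid4().hex[:8]}",
        environ=dask_env_variables,
        tags=github_cluster_tags,
        **kwargs,
    ) as cluster:
        with Client(cluster) as client:
            client.wait_for_workers(kwargs["n_workers"])
            yield client

The repo's actual implementation presumably also wires in the upload_cluster_dump and benchmark_all helpers that the deleted fixture used.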