Workflow: CSV to parquet #841

Merged · 35 commits · May 23, 2023
Changes from 32 commits
Commits
3a76080 Add matplotlib arxiv workflow (jrbourbeau, Mar 21, 2023)
5aedaca Remove stray print (jrbourbeau, Mar 21, 2023)
e185e5d Merge branch 'main' of https://github.com/coiled/coiled-runtime into … (jrbourbeau, Mar 21, 2023)
95585d4 Update fixture name (jrbourbeau, Mar 21, 2023)
10a529b Only use requester_pays for test_embarassingly_parallel (jrbourbeau, Mar 21, 2023)
4504020 Rerun CI (jrbourbeau, Mar 22, 2023)
96e66f5 Update instance type (jrbourbeau, Mar 23, 2023)
0779b48 Run workflows on demand and during nightly cron job (jrbourbeau, Mar 23, 2023)
7fb6792 Use specific range of years (jrbourbeau, Mar 27, 2023)
437eb0b Merge branch 'main' of https://github.com/coiled/coiled-runtime into … (jrbourbeau, Mar 27, 2023)
e4851df Light asserts (jrbourbeau, Mar 27, 2023)
2657885 add workflow (douglasdavis, Mar 28, 2023)
6eb04d6 show something with use of pytest -s (douglasdavis, Mar 28, 2023)
6f4c04e Merge remote-tracking branch 'origin/main' into add-workflow-from-csv… (douglasdavis, Mar 28, 2023)
222c695 rm unnecessary noqa comments (douglasdavis, Mar 28, 2023)
fc40687 var name (douglasdavis, Mar 28, 2023)
670e3cc adjust tests.yml based on James' suggestion (douglasdavis, Mar 28, 2023)
16b0277 write some parquet to s3 (douglasdavis, Mar 29, 2023)
b557bc5 this version actually passes (douglasdavis, Apr 6, 2023)
ccedaf8 check if read works (douglasdavis, Apr 6, 2023)
37120c3 works with some excluded files (douglasdavis, Apr 11, 2023)
b3cfbaa rm unnecessary line (douglasdavis, Apr 11, 2023)
ca7df93 Resolve merge conflict (milesgranger, May 10, 2023)
cdca5de Refactoring [skip ci] (milesgranger, May 10, 2023)
6a93130 Just use hardcoded bad files (milesgranger, May 10, 2023)
e2b070e Use dtype and converters to avoid hard-coded bad_files (milesgranger, May 10, 2023)
5b58925 Reset tests.yml to main (milesgranger, May 10, 2023)
a753aea Use default instance types (milesgranger, May 11, 2023)
6e9fc7f Update cluster_kwargs.yaml (milesgranger, May 11, 2023)
db73e85 Set pyarrow string conversion (milesgranger, May 11, 2023)
ed1d9e3 Reduce worker count (milesgranger, May 11, 2023)
e3145c6 Try only fixing floats (milesgranger, May 15, 2023)
19606da Remove config dataframe.convert-string (milesgranger, May 16, 2023)
e701cc1 Adjust for fjetter's feedback (milesgranger, May 17, 2023)
530d35c Use float64 instead of Float64 (milesgranger, May 22, 2023)
7 changes: 7 additions & 0 deletions cluster_kwargs.yaml
@@ -66,3 +66,10 @@ test_work_stealing_on_straggling_worker:
test_repeated_merge_spill:
  n_workers: 20
  worker_vm_types: [m6i.large]

# For tests/workflows/test_from_csv_to_parquet.py
from_csv_to_parquet_cluster:
  n_workers: 10
  worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance)
  backend_options:
    region: "us-east-1" # Same region as dataset
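As a rough illustration (not code from this repo), an entry like from_csv_to_parquet_cluster can be loaded from this YAML file and unpacked directly into coiled.Cluster; the module-scoped fixture added in tests/workflows/test_from_csv_to_parquet.py below does essentially this through the repo's cluster_kwargs fixture.

# Minimal sketch, assuming PyYAML is available and ignoring any defaults
# handling the repo's conftest may do; the cluster name here is hypothetical.
import coiled
import yaml

with open("cluster_kwargs.yaml") as f:
    cluster_kwargs = yaml.safe_load(f)

cluster = coiled.Cluster(
    "from-csv-to-parquet-demo",  # hypothetical name
    **cluster_kwargs["from_csv_to_parquet_cluster"],  # n_workers, worker_vm_types, backend_options
)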
6 changes: 6 additions & 0 deletions tests/conftest.py
@@ -691,6 +691,12 @@ def configure_shuffling(shuffle_method):
        yield


@pytest.fixture
def configure_use_pyarrow_strings():
    with dask.config.set({"dataframe.convert-string": True}):
        yield


# Include https://github.com/dask/distributed/pull/7534
P2P_RECHUNK_AVAILABLE = Version(distributed.__version__) >= Version("2023.2.1")

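A hedged usage sketch (not part of the PR): a test opts into pyarrow-backed strings simply by requesting the new fixture, and inside that context dask's dataframe.convert-string setting is active.

# Hypothetical test name; only illustrates that the fixture sets the config.
import dask

def test_uses_pyarrow_strings(configure_use_pyarrow_strings):
    assert dask.config.get("dataframe.convert-string") is True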
141 changes: 141 additions & 0 deletions tests/workflows/test_from_csv_to_parquet.py
@@ -0,0 +1,141 @@
import uuid
from collections import OrderedDict

import coiled
import dask.dataframe as dd
import pytest
from distributed import Client


@pytest.fixture(scope="module")
def from_csv_to_parquet_cluster(
    dask_env_variables,
    cluster_kwargs,
    github_cluster_tags,
):
    with coiled.Cluster(
        f"from-csv-to-parquet-{uuid.uuid4().hex[:8]}",
        environ=dask_env_variables,
        tags=github_cluster_tags,
        **cluster_kwargs["from_csv_to_parquet_cluster"],
    ) as cluster:
        yield cluster


@pytest.fixture
def from_csv_to_parquet_client(
    from_csv_to_parquet_cluster,
    cluster_kwargs,
    upload_cluster_dump,
    benchmark_all,
):
    n_workers = cluster_kwargs["from_csv_to_parquet_cluster"]["n_workers"]
    with Client(from_csv_to_parquet_cluster) as client:
        from_csv_to_parquet_cluster.scale(n_workers)
        client.wait_for_workers(n_workers)
        client.restart()
        with upload_cluster_dump(client), benchmark_all(client):
            yield client


SCHEMA = OrderedDict(
    [
        ("GlobalEventID", "Int64"),
        ("Day", "Int64"),
        ("MonthYear", "Int64"),
        ("Year", "Int64"),
        ("FractionDate", "Float64"),
        ("Actor1Code", "string[pyarrow]"),
        ("Actor1Name", "string[pyarrow]"),
        ("Actor1CountryCode", "string[pyarrow]"),
        ("Actor1KnownGroupCode", "string[pyarrow]"),
        ("Actor1EthnicCode", "string[pyarrow]"),
        ("Actor1Religion1Code", "string[pyarrow]"),
        ("Actor1Religion2Code", "string[pyarrow]"),
        ("Actor1Type1Code", "string[pyarrow]"),
        ("Actor1Type2Code", "string[pyarrow]"),
        ("Actor1Type3Code", "string[pyarrow]"),
        ("Actor2Code", "string[pyarrow]"),
        ("Actor2Name", "string[pyarrow]"),
        ("Actor2CountryCode", "string[pyarrow]"),
        ("Actor2KnownGroupCode", "string[pyarrow]"),
        ("Actor2EthnicCode", "string[pyarrow]"),
        ("Actor2Religion1Code", "string[pyarrow]"),
        ("Actor2Religion2Code", "string[pyarrow]"),
        ("Actor2Type1Code", "string[pyarrow]"),
        ("Actor2Type2Code", "string[pyarrow]"),
        ("Actor2Type3Code", "string[pyarrow]"),
        ("IsRootEvent", "Int64"),
        ("EventCode", "string[pyarrow]"),
        ("EventBaseCode", "string[pyarrow]"),
        ("EventRootCode", "string[pyarrow]"),
        ("QuadClass", "Int64"),
        ("GoldsteinScale", "Float64"),
        ("NumMentions", "Int64"),
        ("NumSources", "Int64"),
        ("NumArticles", "Int64"),
        ("AvgTone", "Float64"),
        ("Actor1Geo_Type", "Int64"),
        ("Actor1Geo_Fullname", "string[pyarrow]"),
        ("Actor1Geo_CountryCode", "string[pyarrow]"),
        ("Actor1Geo_ADM1Code", "string[pyarrow]"),
        ("Actor1Geo_Lat", "Float64"),
        ("Actor1Geo_Long", "Float64"),
        ("Actor1Geo_FeatureID", "string[pyarrow]"),
        ("Actor2Geo_Type", "Int64"),
        ("Actor2Geo_Fullname", "string[pyarrow]"),
        ("Actor2Geo_CountryCode", "string[pyarrow]"),
        ("Actor2Geo_ADM1Code", "string[pyarrow]"),
        ("Actor2Geo_Lat", "Float64"),
        ("Actor2Geo_Long", "Float64"),
        ("Actor2Geo_FeatureID", "string[pyarrow]"),
        ("ActionGeo_Type", "Int64"),
        ("ActionGeo_Fullname", "string[pyarrow]"),
        ("ActionGeo_CountryCode", "string[pyarrow]"),
        ("ActionGeo_ADM1Code", "string[pyarrow]"),
        ("ActionGeo_Lat", "Float64"),
        ("ActionGeo_Long", "Float64"),
        ("ActionGeo_FeatureID", "string[pyarrow]"),
        ("DATEADDED", "Int64"),
        ("SOURCEURL", "string[pyarrow]"),
    ]
)

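A brief aside on the dtype strings above (my note, not part of the PR): "Int64" and "Float64" with a capital letter are pandas' nullable extension dtypes, which can hold missing values as <NA>, whereas the lowercase NumPy "float64" represents missing values as NaN and NumPy integer dtypes cannot hold missing values at all.

# Hedged illustration of the difference; not from the PR.
import pandas as pd

nullable = pd.Series([1.5, None], dtype="Float64")  # extension dtype, missing value kept as <NA>
plain = pd.Series([1.5, None], dtype="float64")     # NumPy dtype, missing value becomes NaN
print(nullable.dtype, plain.dtype)  # Float64 float64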

def test_from_csv_to_parquet(
    from_csv_to_parquet_client, s3_factory, s3_url, configure_use_pyarrow_strings
):
    s3 = s3_factory(anon=True)
    files = s3.ls("s3://gdelt-open-data/events/")[:1000]
    files = [f"s3://{f}" for f in files]

    df = dd.read_csv(
        files,
        sep="\t",
Contributor:

This makes read_csv fall back to the Python engine. Two things:

  • If this is intended, we should be explicit.
  • The Python engine is really slow, so if this is not intended we should choose a file that has a one-character separator.

Contributor Author:

I suppose it's quite likely the average user (including me) would not know this. Therefore, IMO, it ought to stay this way: when this changes in the future, the bump in performance will be evident in the historical benchmarks, because this is what the average user would write. The second option seems like a slightly hacky workaround that the benchmarks aren't designed for. My understanding is that these workflows should represent somewhat realistic use cases, not serve as a means of maximizing performance itself. Could be mistaken though. cc @fjetter @jrbourbeau

phofl (Contributor), May 22, 2023:

Totally fine by me if this is intended; just wanted to check. read_csv raises a warning telling you it will use the Python engine if you execute this code, which is why I suggested switching it explicitly. There aren't any plans to support regex or multi-character separators in the C engine, and I doubt there ever will be, so we don't have to plan for that scenario.

Member:

In a follow-up ticket, we could parametrize this test with comma-separated and tab-separated files if we care about the performance of the different engines.

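        # Hedged aside, not part of the PR: to make the parser choice explicit,
        # as suggested in the review thread above, pandas.read_csv (and
        # dd.read_csv, which forwards keyword arguments to it) accepts an
        # engine argument, e.g. engine="python" or engine="c":
        # engine="python",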
        names=SCHEMA.keys(),
        # 'dtype' and 'converters' cannot overlap
        dtype={col: dtype for col, dtype in SCHEMA.items() if dtype != "Float64"},
        storage_options=s3.storage_options,
        on_bad_lines="skip",
        # Some bad files have '#' in float values
        converters={
            col: lambda v: float(v.replace("#", "") or "NaN")
            for col, dtype in SCHEMA.items()
            if dtype == "Float64"
        },
    )

    # Now we can safely convert the float columns
    df = df.astype({col: dtype for col, dtype in SCHEMA.items() if dtype == "Float64"})

    df = df.map_partitions(
        lambda xdf: xdf.drop_duplicates(subset=["SOURCEURL"], keep="first")
    )
    df["national_paper"] = df.SOURCEURL.str.contains(
        "washingtonpost|nytimes", regex=True
    )
    df = df[df["national_paper"]]
    df = df.persist()
    assert len(df)

    df.to_parquet(f"{s3_url}/from-csv-to-parquet/", write_index=False)
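To make the converters trick above concrete, here is a hedged, standalone sketch of what that lambda does to malformed float fields (the example values are mine, not taken from the dataset):

# Strip stray '#' characters and treat empty fields as NaN; the astype() call
# above then casts the cleaned columns to Float64.
to_float = lambda v: float(v.replace("#", "") or "NaN")

print(to_float("3.75"))   # 3.75
print(to_float("#3.75"))  # 3.75 (stray '#' removed)
print(to_float(""))       # nan (empty field)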