Workflow: CSV to parquet #841
Merged
35 commits
3a76080 (jrbourbeau) Add matplotlib arxiv workflow
5aedaca (jrbourbeau) Remove stray print
e185e5d (jrbourbeau) Merge branch 'main' of https://github.com/coiled/coiled-runtime into …
95585d4 (jrbourbeau) Update fixture name
10a529b (jrbourbeau) Only use requester_pays for test_embarassingly_parallel
4504020 (jrbourbeau) Rerun CI
96e66f5 (jrbourbeau) Update instance type
0779b48 (jrbourbeau) Run workflows on demand and during nightly cron job
7fb6792 (jrbourbeau) Use specific range of years
437eb0b (jrbourbeau) Merge branch 'main' of https://github.com/coiled/coiled-runtime into …
e4851df (jrbourbeau) Light asserts
2657885 (douglasdavis) add workflow
6eb04d6 (douglasdavis) show something with use of pytest -s
6f4c04e (douglasdavis) Merge remote-tracking branch 'origin/main' into add-workflow-from-csv…
222c695 (douglasdavis) rm unnecessary noqa comments
fc40687 (douglasdavis) var name
670e3cc (douglasdavis) adjust tests.yml based on James' suggestion
16b0277 (douglasdavis) write some parquet to s3
b557bc5 (douglasdavis) this version actually passes
ccedaf8 (douglasdavis) check if read works
37120c3 (douglasdavis) works with some excluded files
b3cfbaa (douglasdavis) rm unnecessary line
ca7df93 (milesgranger) Resolve merge conflict
cdca5de (milesgranger) Refactoring [skip ci]
6a93130 (milesgranger) Just use hardcoded bad files
e2b070e (milesgranger) Use dtype and converters to avoid hard-coded bad_files
5b58925 (milesgranger) Reset tests.yml to main
a753aea (milesgranger) Use default instance types
6e9fc7f (milesgranger) Update cluster_kwargs.yaml
db73e85 (milesgranger) Set pyarrow string conversion
ed1d9e3 (milesgranger) Reduce worker count
e3145c6 (milesgranger) Try only fixing floats
19606da (milesgranger) Remove config dataframe.convert-string
e701cc1 (milesgranger) Adjust for fjetter's feedback
530d35c (milesgranger) Use float64 instead of Float64
New file (+102 lines):

from collections import OrderedDict

import dask.dataframe as dd
import pytest

SCHEMA = OrderedDict(
    [
        ("GlobalEventID", "Int64"),
        ("Day", "Int64"),
        ("MonthYear", "Int64"),
        ("Year", "Int64"),
        ("FractionDate", "float64"),
        ("Actor1Code", "string[pyarrow]"),
        ("Actor1Name", "string[pyarrow]"),
        ("Actor1CountryCode", "string[pyarrow]"),
        ("Actor1KnownGroupCode", "string[pyarrow]"),
        ("Actor1EthnicCode", "string[pyarrow]"),
        ("Actor1Religion1Code", "string[pyarrow]"),
        ("Actor1Religion2Code", "string[pyarrow]"),
        ("Actor1Type1Code", "string[pyarrow]"),
        ("Actor1Type2Code", "string[pyarrow]"),
        ("Actor1Type3Code", "string[pyarrow]"),
        ("Actor2Code", "string[pyarrow]"),
        ("Actor2Name", "string[pyarrow]"),
        ("Actor2CountryCode", "string[pyarrow]"),
        ("Actor2KnownGroupCode", "string[pyarrow]"),
        ("Actor2EthnicCode", "string[pyarrow]"),
        ("Actor2Religion1Code", "string[pyarrow]"),
        ("Actor2Religion2Code", "string[pyarrow]"),
        ("Actor2Type1Code", "string[pyarrow]"),
        ("Actor2Type2Code", "string[pyarrow]"),
        ("Actor2Type3Code", "string[pyarrow]"),
        ("IsRootEvent", "Int64"),
        ("EventCode", "string[pyarrow]"),
        ("EventBaseCode", "string[pyarrow]"),
        ("EventRootCode", "string[pyarrow]"),
        ("QuadClass", "Int64"),
        ("GoldsteinScale", "float64"),
        ("NumMentions", "Int64"),
        ("NumSources", "Int64"),
        ("NumArticles", "Int64"),
        ("AvgTone", "float64"),
        ("Actor1Geo_Type", "Int64"),
        ("Actor1Geo_Fullname", "string[pyarrow]"),
        ("Actor1Geo_CountryCode", "string[pyarrow]"),
        ("Actor1Geo_ADM1Code", "string[pyarrow]"),
        ("Actor1Geo_Lat", "float64"),
        ("Actor1Geo_Long", "float64"),
        ("Actor1Geo_FeatureID", "string[pyarrow]"),
        ("Actor2Geo_Type", "Int64"),
        ("Actor2Geo_Fullname", "string[pyarrow]"),
        ("Actor2Geo_CountryCode", "string[pyarrow]"),
        ("Actor2Geo_ADM1Code", "string[pyarrow]"),
        ("Actor2Geo_Lat", "float64"),
        ("Actor2Geo_Long", "float64"),
        ("Actor2Geo_FeatureID", "string[pyarrow]"),
        ("ActionGeo_Type", "Int64"),
        ("ActionGeo_Fullname", "string[pyarrow]"),
        ("ActionGeo_CountryCode", "string[pyarrow]"),
        ("ActionGeo_ADM1Code", "string[pyarrow]"),
        ("ActionGeo_Lat", "float64"),
        ("ActionGeo_Long", "float64"),
        ("ActionGeo_FeatureID", "string[pyarrow]"),
        ("DATEADDED", "Int64"),
        ("SOURCEURL", "string[pyarrow]"),
    ]
)


@pytest.mark.client("from_csv_to_parquet")
def test_from_csv_to_parquet(client, s3_factory, s3_url):
    s3 = s3_factory(anon=True)
    files = s3.ls("s3://gdelt-open-data/events/")[:1000]
    files = [f"s3://{f}" for f in files]

    df = dd.read_csv(
        files,
        sep="\t",
        names=SCHEMA.keys(),
        # 'dtype' and 'converters' cannot overlap
        dtype={col: dtype for col, dtype in SCHEMA.items() if dtype != "float64"},
        storage_options=s3.storage_options,
        on_bad_lines="skip",
        # Some bad files have '#' in float values
        converters={
            col: lambda v: float(v.replace("#", "") or "NaN")
            for col, dtype in SCHEMA.items()
            if dtype == "float64"
        },
    )

    # Now we can safely convert the float columns
    df = df.astype({col: dtype for col, dtype in SCHEMA.items() if dtype == "float64"})

    df = df.map_partitions(
        lambda xdf: xdf.drop_duplicates(subset=["SOURCEURL"], keep="first")
    )
    df["national_paper"] = df.SOURCEURL.str.contains(
        "washingtonpost|nytimes", regex=True
    )
    df = df[df["national_paper"]]

    df.to_parquet(f"{s3_url}/from-csv-to-parquet/", write_index=False)
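Since pandas forbids a column appearing in both dtype and converters, the pattern above parses the float columns through converters (which yield object-dtype columns) and casts them afterwards. A minimal pandas-only sketch of that pattern, using made-up column names and inline data rather than the GDELT files:

```python
import io

import pandas as pd

# Hypothetical sample: one malformed float ('#4.5') and one empty field
csv = "id\tscore\n1\t3.5\n2\t#4.5\n3\t\n"

df = pd.read_csv(
    io.StringIO(csv),
    sep="\t",
    dtype={"id": "Int64"},  # non-float columns can be typed directly
    # Float columns go through a converter that strips stray '#' characters;
    # a column may not appear in both 'dtype' and 'converters'
    converters={"score": lambda v: float(v.replace("#", "") or "NaN")},
)

# Converters produce object-dtype columns, so cast to float64 afterwards
df = df.astype({"score": "float64"})
```

The converter receives each field as a raw string, so an empty field becomes `float("NaN")` rather than a parse error, which is what lets the bad GDELT files through without a hard-coded exclusion list.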
This makes read_csv fall back to the Python engine. Two things:
I suppose it's quite likely that the average user (including me) would not know this. Therefore, IMO, it ought to stay this way: when this changes in the future, the bump in performance will be evident in the historical benchmarks, because this is what the average user would write.
The second option, I think, would be a slightly hacky workaround that the benchmarks aren't designed for. My understanding is that these workflows should represent somewhat realistic use cases, not serve as a means of maximizing performance itself. Could be mistaken though. cc @fjetter @jrbourbeau
Totally fine by me if this is intended. Just wanted to check. read_csv raises a warning telling you that it will use the Python engine if you execute this code; that's why I suggested switching it explicitly. There aren't any plans to support regex or multi-character seps in the C engine, and I doubt there ever will be, so we don't have to plan for that scenario.
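For reference, the warning being discussed is easy to reproduce with plain pandas: a regex (multi-character) separator is unsupported by the C engine, so read_csv falls back to the Python engine and emits a ParserWarning unless engine="python" is passed explicitly. A small sketch with made-up inline data:

```python
import io
import warnings

import pandas as pd

data = "a;b\n1;2\n"

# A regex separator is not supported by the C engine, so pandas falls
# back to the Python engine and warns about it
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    df = pd.read_csv(io.StringIO(data), sep="[;]")

fallback_warned = any("python" in str(w.message).lower() for w in caught)

# Passing the engine explicitly silences the warning
df2 = pd.read_csv(io.StringIO(data), sep="[;]", engine="python")
```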
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a follow-up ticket, we could parametrize this test with comma-separated and tab-separated files if we care about the performance of the different engines.
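If that follow-up happens, one way to sketch the parametrization is pytest.mark.parametrize over separator and engine (the inline samples here are hypothetical stand-ins for the comma- and tab-separated files):

```python
import io

import pandas as pd
import pytest

# Hypothetical inline samples standing in for comma- and tab-separated files
SAMPLES = {
    ",": "x,y\n1,2\n",
    "\t": "x\ty\n1\t2\n",
}


@pytest.mark.parametrize("engine", ["c", "python"])
@pytest.mark.parametrize("sep", [",", "\t"])
def test_read_csv_engines(sep, engine):
    # Each (separator, engine) combination becomes its own benchmarkable case
    df = pd.read_csv(io.StringIO(SAMPLES[sep]), sep=sep, engine=engine)
    assert list(df.columns) == ["x", "y"]
```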