Add workflow for reading CSV from s3, cleaning, saving to Parquet #738

Draft · wants to merge 22 commits into base: main

Conversation

@douglasdavis (Contributor) commented Mar 28, 2023

This workflow reads the GDELT project CSV dataset (tab-separated, so read with sep="\t") from S3. The plan is to do some cleaning and save the transformed data to Parquet.
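For reference, a minimal sketch of the intended shape of the workflow (the glob pattern, cleaning step, and output destination below are illustrative, not the exact test code):

import dask.dataframe as dd

# Lazily read the tab-separated GDELT event files from the public S3 bucket.
# The real test also passes explicit names=/dtype= for the GDELT v1 schema
# (see COLUMNSV1 later in this thread), since the files have no header row.
df = dd.read_csv(
    "s3://gdelt-open-data/events/*.csv",  # illustrative path pattern
    sep="\t",
    storage_options={"anon": True},
)

# ... cleaning steps (dropping duplicates, tidying dtypes) go here ...

# Write the transformed data back out as Parquet.
df.to_parquet("s3://my-output-bucket/gdelt-parquet/")  # hypothetical destination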

@jrbourbeau added the workflows label (Related to representative Dask user workflows) on Mar 28, 2023
@jrbourbeau (Member) left a comment

Thanks @douglasdavis! #724 was just merged, so you should be able to merge main to incorporate those changes. Also, since workflows will tend to be larger and require more resources, we've configured them to only run on PRs when the workflows label has been added (otherwise all tests in tests/workflows will be skipped). I've gone ahead and added the workflows label to this PR

@douglasdavis (Contributor, Author) commented

Awesome, thanks!

@jrbourbeau (Member) left a comment

While iterating on this workflow, I'd recommend adding the following (temporary) change to only run test_from_csv_to_parquet.py in CI. That should prevent the full test suite from running unnecessarily and make it easier to iterate quickly.

diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 973f7f8..b15196e 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -31,44 +31,44 @@ jobs:
       matrix:
         os: [ubuntu-latest]
         python-version: ["3.9"]
-        pytest_args: [tests]
+        pytest_args: [tests/workflows/test_from_csv_to_parquet.py]
         runtime-version: [upstream, latest, "0.2.1"]
-        include:
-          # Run stability tests on Python 3.8
-          - pytest_args: tests/stability
-            python-version: "3.8"
-            runtime-version: upstream
-            os: ubuntu-latest
-          - pytest_args: tests/stability
-            python-version: "3.8"
-            runtime-version: latest
-            os: ubuntu-latest
-          - pytest_args: tests/stability
-            python-version: "3.8"
-            runtime-version: "0.2.1"
-            os: ubuntu-latest
-          # Run stability tests on Python 3.10
-          - pytest_args: tests/stability
-            python-version: "3.10"
-            runtime-version: upstream
-            os: ubuntu-latest
-          - pytest_args: tests/stability
-            python-version: "3.10"
-            runtime-version: latest
-            os: ubuntu-latest
-          - pytest_args: tests/stability
-            python-version: "3.10"
-            runtime-version: "0.2.1"
-            os: ubuntu-latest
-          # Run stability tests on Python Windows and MacOS (latest py39 only)
-          - pytest_args: tests/stability
-            python-version: "3.9"
-            runtime-version: latest
-            os: windows-latest
-          - pytest_args: tests/stability
-            python-version: "3.9"
-            runtime-version: latest
-            os: macos-latest
+        # include:
+        #   # Run stability tests on Python 3.8
+        #   - pytest_args: tests/stability
+        #     python-version: "3.8"
+        #     runtime-version: upstream
+        #     os: ubuntu-latest
+        #   - pytest_args: tests/stability
+        #     python-version: "3.8"
+        #     runtime-version: latest
+        #     os: ubuntu-latest
+        #   - pytest_args: tests/stability
+        #     python-version: "3.8"
+        #     runtime-version: "0.2.1"
+        #     os: ubuntu-latest
+        #   # Run stability tests on Python 3.10
+        #   - pytest_args: tests/stability
+        #     python-version: "3.10"
+        #     runtime-version: upstream
+        #     os: ubuntu-latest
+        #   - pytest_args: tests/stability
+        #     python-version: "3.10"
+        #     runtime-version: latest
+        #     os: ubuntu-latest
+        #   - pytest_args: tests/stability
+        #     python-version: "3.10"
+        #     runtime-version: "0.2.1"
+        #     os: ubuntu-latest
+        #   # Run stability tests on Python Windows and MacOS (latest py39 only)
+        #   - pytest_args: tests/stability
+        #     python-version: "3.9"
+        #     runtime-version: latest
+        #     os: windows-latest
+        #   - pytest_args: tests/stability
+        #     python-version: "3.9"
+        #     runtime-version: latest
+        #     os: macos-latest
 
     steps:
       - name: Checkout

@jrbourbeau (Member) left a comment

Thanks @douglasdavis. I left a few comments / questions

Review thread on tests/workflows/test_from_csv_to_parquet.py (resolved)


def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url):
    s3 = s3_factory(anon=True)
@jrbourbeau (Member) commented

Just checking -- is anon=True needed to access the dataset?

@douglasdavis (Contributor, Author) commented

Not sure, I'll give it a test

@douglasdavis (Contributor, Author) commented

Looks like it is necessary
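(For reference, a quick way to double-check this kind of access question; a sketch against the same public bucket the workflow uses:)

import s3fs

# Unsigned (anonymous) requests work against the public GDELT bucket; with the
# default anon=False, s3fs looks for AWS credentials, which the CI runners may
# not have configured for this bucket.
fs = s3fs.S3FileSystem(anon=True)
print(fs.ls("s3://gdelt-open-data/events/")[:3])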

    )

    df = df.partitions[-10:]
    df = df.map_partitions(drop_dupe_per_partition)
@jrbourbeau (Member) commented

Why apply pandas' drop_duplicates instead of just using Dask's? Did you run into an issue with the Dask version?

@douglasdavis (Contributor, Author) commented Apr 4, 2023

Nothing wrong with the Dask version; it's just that it fully reduces the graph (it does an apply-concat-apply), and at this point I'd like to remove duplicates on a per-file basis.
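A minimal sketch of the difference (the subset column here is just illustrative; the actual dedup columns may differ):

import pandas as pd

def drop_dupe_per_partition(part: pd.DataFrame) -> pd.DataFrame:
    # pandas drop_duplicates applied independently within each partition, so the
    # deduplication stays file-local and no cross-partition reduction is needed.
    return part.drop_duplicates(subset=["SOURCEURL"], keep="first")

# df = df.map_partitions(drop_dupe_per_partition)  # per-file dedup (this PR)
# df = df.drop_duplicates(subset=["SOURCEURL"])    # Dask's global dedup: an
#                                                  # apply-concat-apply over the whole frame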

Two more review threads on tests/workflows/test_from_csv_to_parquet.py (outdated, resolved)
@jrbourbeau (Member) commented

Ah, also it looks like pre-commit is failing and there's a merge conflict. Let me know if I can be helpful with either of those (in particular the merge conflict)

@douglasdavis (Contributor, Author) commented

Hey @jrbourbeau, thanks for the input! I've got a few notes that I plan to distill into a short status summary, which I'll add here later today. It will answer a couple of your questions (and I have a couple as well!).

@douglasdavis (Contributor, Author) commented

Answered some things inline.
Another item for my summary here: when running this on a larger piece of the dataset, I've found that some files are not perfect CSVs (there are parsing errors), and the problem isn't solvable just with pd.read_csv's on_bad_lines argument. Coiled worker logs were very helpful here! I'm working through it by using delayed to do a lazy check for bad files, or I may just keep an explicit list of bad files associated with this dataset and make sure not to use those.
Last thing: I think the merge conflict stems from the temporary change in the GH workflow file, so reverting that temporary change should hopefully fix it.

@douglasdavis (Contributor, Author) commented

Alright, the current status is that the test passes, but getting there took a bit of a roundabout path. Like I mentioned above, some files had parsing errors (discovered after trying to churn through the whole dataset for the first time). Also, some of the larger files in the dataset (the early ones that cram a whole year's or month's worth of data into a single file) would cause the worker to run out of memory. I solved the former by essentially creating another mini workflow (code below) that uses a big list of delayeds to determine which files raise the parsing exception; Coiled was magic for this (once the bad files were determined, I hardcoded them into the test). I solved the latter by just omitting the first ~5% of the files. As mentioned, things pass, but I feel this needs to be a bit more polished and should do something more interesting before saving to Parquet (now that it at least works!), or perhaps I should cut my losses and find a better public CSV dataset. I'll dive back in when I'm back from leave if this is still open.

Code used to determine bad files
import s3fs
import distributed
import coiled
from dask.delayed import delayed

import pandas as pd


cluster_kwargs = dict(
    package_sync=True,
    wait_for_workers=False,
    scheduler_vm_types=["m6i.large"],
    backend_options=dict(region="us-east-1"),
    worker_vm_types=["m6i.large"],
)


# Anonymous access to the public GDELT bucket; skip the earliest (largest) files,
# which pack a whole year's or month's worth of data into a single file.
fs = s3fs.S3FileSystem(anon=True)

raw_list = fs.ls("s3://gdelt-open-data/events/")[120:]

@delayed
def try_to_read_csv(filename):
    # Try to read one file eagerly with pandas; return (True, filename) on
    # success, or (False, filename, exception) if parsing fails.
    COLUMNSV1 = {
        "GlobalEventID": "Int64",
        "Day": "Int64",
        "MonthYear": "Int64",
        "Year": "Int64",
        "FractionDate": "Float64",
        "Actor1Code": "string[pyarrow]",
        "Actor1Name": "string[pyarrow]",
        "Actor1CountryCode": "string[pyarrow]",
        "Actor1KnownGroupCode": "string[pyarrow]",
        "Actor1EthnicCode": "string[pyarrow]",
        "Actor1Religion1Code": "string[pyarrow]",
        "Actor1Religion2Code": "string[pyarrow]",
        "Actor1Type1Code": "string[pyarrow]",
        "Actor1Type2Code": "string[pyarrow]",
        "Actor1Type3Code": "string[pyarrow]",
        "Actor2Code": "string[pyarrow]",
        "Actor2Name": "string[pyarrow]",
        "Actor2CountryCode": "string[pyarrow]",
        "Actor2KnownGroupCode": "string[pyarrow]",
        "Actor2EthnicCode": "string[pyarrow]",
        "Actor2Religion1Code": "string[pyarrow]",
        "Actor2Religion2Code": "string[pyarrow]",
        "Actor2Type1Code": "string[pyarrow]",
        "Actor2Type2Code": "string[pyarrow]",
        "Actor2Type3Code": "string[pyarrow]",
        "IsRootEvent": "Int64",
        "EventCode": "string[pyarrow]",
        "EventBaseCode": "string[pyarrow]",
        "EventRootCode": "string[pyarrow]",
        "QuadClass": "Int64",
        "GoldsteinScale": "Float64",
        "NumMentions": "Int64",
        "NumSources": "Int64",
        "NumArticles": "Int64",
        "AvgTone": "Float64",
        "Actor1Geo_Type": "Int64",
        "Actor1Geo_Fullname": "string[pyarrow]",
        "Actor1Geo_CountryCode": "string[pyarrow]",
        "Actor1Geo_ADM1Code": "string[pyarrow]",
        "Actor1Geo_Lat": "Float64",
        "Actor1Geo_Long": "Float64",
        "Actor1Geo_FeatureID": "string[pyarrow]",
        "Actor2Geo_Type": "Int64",
        "Actor2Geo_Fullname": "string[pyarrow]",
        "Actor2Geo_CountryCode": "string[pyarrow]",
        "Actor2Geo_ADM1Code": "string[pyarrow]",
        "Actor2Geo_Lat": "Float64",
        "Actor2Geo_Long": "Float64",
        "Actor2Geo_FeatureID": "string[pyarrow]",
        "ActionGeo_Type": "Int64",
        "ActionGeo_Fullname": "string[pyarrow]",
        "ActionGeo_CountryCode": "string[pyarrow]",
        "ActionGeo_ADM1Code": "string[pyarrow]",
        "ActionGeo_Lat": "Float64",
        "ActionGeo_Long": "Float64",
        "ActionGeo_FeatureID": "string[pyarrow]",
        "DATEADDED": "Int64",
        "SOURCEURL": "string[pyarrow]",
    }
    try:
        df = pd.read_csv(
            f"s3://{filename}",
            sep="\t",
            names=COLUMNSV1.keys(),
            dtype=COLUMNSV1,
            storage_options={"anon": True},
        )
        good = (True, filename)
        del df
    except Exception as err:
        good = (False, filename, err)
    return good


tries = [try_to_read_csv(x) for x in raw_list]

# ipython -i <file.py>
# >>> cluster = coiled.Cluster(**cluster_kwargs)
# >>> client = cluster.get_client()
# >>> tries_computed = client.compute(tries)
# >>> results = [tc.result() for tc in tries_computed]
# >>> bad = [x for x in results if not x[0]]
# >>> bad_files = [x[1] for x in bad]
