From 6d264c3b1ac0666d3fa6a32053fe0279e7d686ed Mon Sep 17 00:00:00 2001 From: david Date: Fri, 7 Jun 2024 16:46:56 +1000 Subject: [PATCH] temp fixes for malformed csv --- README.md | 4 +++ .../pipeline_nsw_doe/assets/raw/assets.py | 12 +++---- .../assets/raw/schema_masterdataset.py | 11 ------- orchestration/pipeline_nsw_doe/factory.py | 33 ------------------- requirements.in | 1 + requirements.txt | 4 +++ .../facts/fct__resource_allocation.sql | 4 +-- 7 files changed, 17 insertions(+), 52 deletions(-) delete mode 100644 orchestration/pipeline_nsw_doe/factory.py diff --git a/README.md b/README.md index b1f9cb4..c9f3278 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,10 @@ This is an data-stack-in-a-box based data from [NSW Education Data Hub](https:// > [!IMPORTANT] > Click below 👇🏼 to setup your own free data stack packed with [NSW Department of Education](https://education.nsw.gov.au/) data. +> [!WARNING] +> Only 100 Schools will appear. Currently have a bug where master data set is malformed and cant get all shcools +> to remove this filter when fixed `df = df.head(100)` in master data asset + [![Open in GitHub Codespaces](https://github.com/codespaces/badge.svg)](https://codespaces.new/wisemuffin/nsw-doe-data-stack-in-a-box?quickstart=1) diff --git a/orchestration/pipeline_nsw_doe/assets/raw/assets.py b/orchestration/pipeline_nsw_doe/assets/raw/assets.py index 0718750..517b4b0 100644 --- a/orchestration/pipeline_nsw_doe/assets/raw/assets.py +++ b/orchestration/pipeline_nsw_doe/assets/raw/assets.py @@ -9,15 +9,14 @@ asset, ) -from pipeline_nsw_doe.factory import pandera_schema_to_dagster_type +from dagster_pandera import pandera_schema_to_dagster_type +# from pipeline_nsw_doe.factory import pandera_schema_to_dagster_type from .schema_masterdataset import schema as schema_masterdataset from .schema_ram import schema as schema_ram DatahubMasterDatasetDagsterType = pandera_schema_to_dagster_type( - schema=schema_masterdataset, - name="DatahubMasterDatasetDagsterType", - description="data frame DagsterType type for this dummy asset.", + schema=schema_masterdataset ) NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA: str = os.getenv( @@ -47,6 +46,7 @@ def raw__nsw_doe_datahub__master_dataset(): url = "https://data.cese.nsw.gov.au/data/dataset/027493b2-33ad-3f5b-8ed9-37cdca2b8650/resource/2ac19870-44f6-443d-a0c3-4c867f04c305/download/master_dataset.csv" df = pd.read_csv( url, + on_bad_lines="skip", # 🚧 TODO Temp workaround due to malformed csv ) df["_load_timestamp"] = pd.Timestamp("now") @@ -56,6 +56,8 @@ def raw__nsw_doe_datahub__master_dataset(): print(df.shape) print(df.dtypes) + df = df.head(100) # 🚧 TODO - temp fix to skip errors with malformed csv + # schema = pa.infer_schema(df) # schema_script = schema.to_script('schema_template.py') # print(schema_script) @@ -69,8 +71,6 @@ def raw__nsw_doe_datahub__master_dataset(): DatahubRamDagsterType = pandera_schema_to_dagster_type( schema=schema_ram, - name="DatahubRamDagsterType", - description="data frame DagsterType type for this dummy asset.", ) diff --git a/orchestration/pipeline_nsw_doe/assets/raw/schema_masterdataset.py b/orchestration/pipeline_nsw_doe/assets/raw/schema_masterdataset.py index 554dc40..294f770 100644 --- a/orchestration/pipeline_nsw_doe/assets/raw/schema_masterdataset.py +++ b/orchestration/pipeline_nsw_doe/assets/raw/schema_masterdataset.py @@ -486,17 +486,6 @@ description=None, title=None, ), - "Healthy canteen": Column( - dtype="object", - checks=None, - nullable=True, - unique=False, - coerce=False, - required=True, - regex=False, - description=None, - title=None, - ), "FOEI_Value": Column( dtype="object", checks=None, diff --git a/orchestration/pipeline_nsw_doe/factory.py b/orchestration/pipeline_nsw_doe/factory.py deleted file mode 100644 index b3632de..0000000 --- a/orchestration/pipeline_nsw_doe/factory.py +++ /dev/null @@ -1,33 +0,0 @@ -import pandas as pd -import pandera as pa -from dagster import DagsterType, TypeCheck - - -def pandera_schema_to_dagster_type(schema, name, description): - """https://docs.dagster.io/guides/dagster/dagster_type_factories""" - - def type_check_fn(_context, value): - if not isinstance(value, pd.DataFrame): - return TypeCheck( - success=False, - description=f"Must be pandas.DataFrame, not {type(value).__name__}.", - ) - try: - # `lazy` instructs pandera to capture every (not just the first) validation error - schema.validate(value, lazy=True) - except pa.errors.SchemaErrors as e: - return TypeCheck( - success=False, - description=str(e), - metadata={ - "num_violations": len(e.failure_cases), - }, - ) - - return TypeCheck(success=True) - - return DagsterType( - type_check_fn=type_check_fn, - name=name, - description=description, - ) diff --git a/requirements.in b/requirements.in index cbc049d..9404086 100644 --- a/requirements.in +++ b/requirements.in @@ -17,6 +17,7 @@ dagster-cloud # data schema validation pandera pandera[io] +dagster-pandera # required for pandas to read excel for acara data openpyxl diff --git a/requirements.txt b/requirements.txt index fe14daf..4d5e310 100644 --- a/requirements.txt +++ b/requirements.txt @@ -121,6 +121,7 @@ dagster==1.7.6 # dagster-graphql # dagster-msteams # dagster-openai + # dagster-pandera # dagster-webserver # dagstermill dagster-cloud==1.7.6 @@ -135,6 +136,7 @@ dagster-graphql==1.7.6 # via dagster-webserver dagster-msteams==0.23.6 dagster-openai==0.23.6 +dagster-pandera==0.23.6 dagster-pipes==1.7.6 # via dagster dagster-webserver==1.7.6 @@ -548,6 +550,7 @@ pandas==1.5.3 # via # cmdstanpy # dagster-duckdb-pandas + # dagster-pandera # metricflow # pandera # phik @@ -558,6 +561,7 @@ pandas==1.5.3 # visions # ydata-profiling pandera==0.19.3 + # via dagster-pandera pandocfilters==1.5.1 # via nbconvert papermill==2.6.0 diff --git a/transformation/transformation_nsw_doe/models/dimensional/facts/fct__resource_allocation.sql b/transformation/transformation_nsw_doe/models/dimensional/facts/fct__resource_allocation.sql index a03aa91..cbfda22 100644 --- a/transformation/transformation_nsw_doe/models/dimensional/facts/fct__resource_allocation.sql +++ b/transformation/transformation_nsw_doe/models/dimensional/facts/fct__resource_allocation.sql @@ -19,7 +19,7 @@ final as ( --Foreign Keys ----Conformed Dimensions {{ get_keyed_nulls('dim__school._meta__dim__school__sk') }} as _meta__dim__school__sk, - prep__resource_allocation.year || '-01-01' as _meta__dim__date__sk, -- dont love this. 🚧 TODO - if only one date in fact this works...also doesnt force + prep__resource_allocation.year || '-01-01' as _meta__dim__date__sk, -- dont love this. 🚧 TODO - if only one date in fact this works...also doesnt force ----Local Dimensions @@ -37,7 +37,7 @@ final as ( from prep__resource_allocation left join dim__school - on prep__resource_allocation.school_code = dim__school.school_code + on cast(prep__resource_allocation.school_code as varchar) = cast(dim__school.school_code as varchar) {# left join dim__date on prep__resource_allocation.year || '-01-01' = dim__date. #} )