From 4fbc7ef22a2a8c7df62d1f243a03c60fab382f4a Mon Sep 17 00:00:00 2001
From: David Griffiths
Date: Fri, 18 Oct 2024 17:27:26 +1100
Subject: [PATCH] tpch demo (#29)

* tpch demo
* tpch demo
* tpch added metrics and dagster
* cleaning up tpch
* tpch working
* tpch adding in default scaling factor. Can set config at run time
* tpch fixes
* adding analysis and fixing dbt profiles
* testing out adding multiple code repos to workspace
* testing out act for github actions
* testing out act for github actions
* testing out act for github actions
* testing out act for github actions
* fixing branching for dagster end to end
* changing which workspaces are made available via ui
* fixing issue with dagster not finding target file from dbt for tpch
* fixing issue with dagster not finding target file from dbt for tpch
* fixing issue with dagster not finding target file from dbt for tpch
* fixing issue with dagster not finding target file from dbt for tpch
* fixing issue with dagster not finding target file from dbt for tpch
* fixing issue with dagster not finding target file from dbt for tpch
* fixing issue with dagster not finding target file from dbt for tpch
* fixing issue with dagster not finding target file from dbt for tpch
* fixing issue with dagster not finding target file from dbt for tpch
* fixing issue with dagster not finding target file from dbt for tpch
* fixing issue with dagster not finding target file from dbt for tpch
* fixing issue with dagster not finding target file from dbt for tpch
* fixing issue with dagster not finding target file from dbt for tpch
* fixing codespace missing env variables
* fixing codespace missing env variables
* fixing codespace missing env variables
* fixing broken apprenticeship, header row changed
* fixing issue 36 apprenticeship completions schema change
* removing requires secret from codespace
* fixing issue 36 apprenticeship completions schema change
* fixing issue 36 apprenticeship completions schema change
* fixing issue 36 apprenticeship completions schema change
* fixing ruff
* fixing ruff
* fixing ruff
* fixing ruff
* fixing ruff
* fixing dbt slim ci
* forcing duckdb to not get ahead of MotherDuck
* sqlfluff fix
* sqlfluff fix
* pytest fix
* fixing slim ci with snapshots
* fixing slim ci with snapshots
* fixing slim ci with snapshots
* fixing slim ci with snapshots
* testing slim ci
* testing slim ci
* fixing apprenticeship schema
* fixing dagster embedded issue with version changing
* fixing dagster embedded issue with version changing
* removing openai from nsw_doe
---
 .devcontainer/devcontainer.json | 7 +
 .github/workflows/ci_demo_act_workflow.yml | 69 ++++
 .github/workflows/ci_dev_quick_checks.yml | 3 +-
 .../ci_test_branch_deployments_end_to_end.yml | 34 +-
 .../ci_test_branch_deployments_only_dbt.yml | 8 +-
 Taskfile.yml | 13 +-
 dagster_cloud.yaml | 6 +
 .../notebooks/iris-kmeans.ipynb | 5 +-
 .../demo_pipeline_scaling_tpch/__init__.py | 74 ++++
 .../assets/__init__.py | 1 +
 .../assets/source/__init__.py | 1 +
 .../assets/source/assets.py | 91 ++++
 .../assets/transformation/__init__.py | 1 +
 .../assets/transformation/assets.py | 37 ++
 .../demo_pipeline_scaling_tpch/project.py | 25 ++
 .../demo_pipeline_scaling_tpch/schedules.py | 15 +
 .../util/__init__.py | 0
 .../util/branching.py | 33 ++
 orchestration/pipeline_nsw_doe/__init__.py | 35 +-
 .../pipeline_nsw_doe/assets/raw/assets.py | 31 +-
 ...ry-data-analysis-nsw-doe-entrolments.ipynb | 1 -
 .../pipeline_nsw_doe/assets/raw_dlt/assets.py | 5 +-
 .../notebooks/data-science-example.ipynb | 2 -
 orchestration/pipeline_nsw_doe/project.py | 4 +-
 .../pipeline_nsw_doe/util/__init__.py | 0
 .../pipeline_nsw_doe/util/branching.py | 33 ++
 .../__init__.py | 36 +-
 .../assets/raw_dlt/assets.py | 11 +-
 .../assets/semantic_layer/assets.py | 4 +
 .../branching.py | 33 ++
 .../notebooks/data-science-example.ipynb | 2 -
 .../project.py | 4 +-
 .../test_pipeline_nsw_doe/test_pipelines.py | 14 +
 orchestration/workspace.yaml | 9 +-
 pyproject.toml | 10 +
 .../sources/demo_jaffle_shop__dev/.gitkeep | 0
 reports/sources/demo_tpch__dev/.gitkeep | 0
 .../nsw_doe_data_stack_in_a_box/.gitkeep | 0
 requirements.in | 8 +-
 requirements.txt | 391 +++++++++---------
 .../.gitignore | 4 +
 .../.user.yml | 1 +
 .../README.md | 44 ++
 .../_demo_queries/data catalog.sql | 18 +
 .../_demo_queries/scaling_tests.sql | 7 +
 .../analyses/.gitkeep | 0
 .../analyses/exploritory_analysis.sql | 24 ++
 .../dbt_project.yml | 34 ++
 .../macros/.gitkeep | 0
 .../models/dimensional/dim__customer.sql | 42 ++
 .../models/dimensional/dim__date.sql | 41 ++
 .../models/dimensional/dim__part.sql | 26 ++
 .../models/dimensional/dim__supplier.sql | 42 ++
 .../models/dimensional/fct__order.sql | 77 ++++
 .../models/dimensional/fct__order_item.sql | 62 +++
 .../models/prep/prep__order_item.sql | 63 +++
 .../models/semantic_layer/customer.yml | 30 ++
 .../semantic_layer/metricflow_time_spine.sql | 8 +
 .../models/semantic_layer/order.yml | 143 +++++++
 .../models/staging/schema.yml | 14 +
 .../models/staging/stg__customer.sql | 17 +
 .../models/staging/stg__nation.sql | 13 +
 .../models/staging/stg__order.sql | 18 +
 .../models/staging/stg__order_item.sql | 25 ++
 .../models/staging/stg__part.sql | 18 +
 .../models/staging/stg__part_supplier.sql | 14 +
 .../models/staging/stg__region.sql | 12 +
 .../models/staging/stg__supplier.sql | 16 +
 .../package-lock.yml | 8 +
 .../packages.yml | 8 +
 .../profiles.yml | 19 +
 .../seeds/.gitkeep | 0
 .../snapshots/.gitkeep | 0
 .../tests/.gitkeep | 0
 .../transformation_nsw_doe/analysis/.gitkeep | 0
 .../transformation_nsw_doe/dbt_project.yml | 4 +-
 .../dimensional/facts/fct__attendance.sql | 2 +-
 .../models/preperation/prep__incident.sql | 8 +-
 78 files changed, 1637 insertions(+), 281 deletions(-)
 create mode 100644 .github/workflows/ci_demo_act_workflow.yml
 create mode 100644 orchestration/demo_pipeline_scaling_tpch/__init__.py
 create mode 100644 orchestration/demo_pipeline_scaling_tpch/assets/__init__.py
 create mode 100644 orchestration/demo_pipeline_scaling_tpch/assets/source/__init__.py
 create mode 100644 orchestration/demo_pipeline_scaling_tpch/assets/source/assets.py
 create mode 100644 orchestration/demo_pipeline_scaling_tpch/assets/transformation/__init__.py
 create mode 100644 orchestration/demo_pipeline_scaling_tpch/assets/transformation/assets.py
 create mode 100644 orchestration/demo_pipeline_scaling_tpch/project.py
 create mode 100644 orchestration/demo_pipeline_scaling_tpch/schedules.py
 rename transformation/transformation_nsw_doe/_local_queries/.gitkeep => orchestration/demo_pipeline_scaling_tpch/util/__init__.py (100%)
 create mode 100644 orchestration/demo_pipeline_scaling_tpch/util/branching.py
 create mode 100644 orchestration/pipeline_nsw_doe/util/__init__.py
 create mode 100644 orchestration/pipeline_nsw_doe/util/branching.py
 create mode 100644 orchestration/pipeline_nsw_doe_requires_secrets/branching.py
 create mode 100644 orchestration/test_pipeline_nsw_doe/test_pipelines.py
 create mode 100644 reports/sources/demo_jaffle_shop__dev/.gitkeep
 create mode 100644 reports/sources/demo_tpch__dev/.gitkeep
 create mode 100644 reports/sources/nsw_doe_data_stack_in_a_box/.gitkeep
 create mode 100644 transformation/demo_transformation_scaling_tpch/.gitignore
 create mode 100644 transformation/demo_transformation_scaling_tpch/.user.yml
 create mode 100644 transformation/demo_transformation_scaling_tpch/README.md
 create mode 100644 transformation/demo_transformation_scaling_tpch/_demo_queries/data catalog.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/_demo_queries/scaling_tests.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/analyses/.gitkeep
 create mode 100644 transformation/demo_transformation_scaling_tpch/analyses/exploritory_analysis.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/dbt_project.yml
 create mode 100644 transformation/demo_transformation_scaling_tpch/macros/.gitkeep
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/dimensional/dim__customer.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/dimensional/dim__date.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/dimensional/dim__part.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/dimensional/dim__supplier.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/dimensional/fct__order.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/dimensional/fct__order_item.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/prep/prep__order_item.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/semantic_layer/customer.yml
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/semantic_layer/metricflow_time_spine.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/semantic_layer/order.yml
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/staging/schema.yml
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/staging/stg__customer.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/staging/stg__nation.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/staging/stg__order.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/staging/stg__order_item.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/staging/stg__part.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/staging/stg__part_supplier.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/staging/stg__region.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/models/staging/stg__supplier.sql
 create mode 100644 transformation/demo_transformation_scaling_tpch/package-lock.yml
 create mode 100644 transformation/demo_transformation_scaling_tpch/packages.yml
 create mode 100644 transformation/demo_transformation_scaling_tpch/profiles.yml
 create mode 100644 transformation/demo_transformation_scaling_tpch/seeds/.gitkeep
 create mode 100644 transformation/demo_transformation_scaling_tpch/snapshots/.gitkeep
 create mode 100644 transformation/demo_transformation_scaling_tpch/tests/.gitkeep
 create mode 100644 transformation/transformation_nsw_doe/analysis/.gitkeep

diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
index 05b4b6b..2579b36 100644
--- a/.devcontainer/devcontainer.json
+++ b/.devcontainer/devcontainer.json
@@ -80,6 +80,13 @@
"NSW_DOE_DATA_STACK_IN_A_BOX_DB_NAME": "nsw_doe_data_stack_in_a_box__dev", "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA": "analytics", + "TPCH__ENV": "dev", + "TPCH_DBT_PROJECT_DIR": "transformation/demo_transformation_scaling_tpch", + "TPCH_DB_PATH": "reports/sources/demo_tpch__dev", + "TPCH_DB_PATH_AND_DB": "reports/sources/demo_tpch__dev/demo_tpch__dev.duckdb", + "TPCH_DB_NAME": "demo_tpch__dev", + "TPCH_TARGET_SCHEMA": "analytics", + "DESTINATION__DUCKDB__CREDENTIALS": "duckdb:///////workspaces/nsw-doe-data-stack-in-a-box/reports/sources/nsw_doe_data_stack_in_a_box/nsw_doe_data_stack_in_a_box__dev.duckdb", "DAGSTER_HOME": "orchestration/dagster-local-file-store", diff --git a/.github/workflows/ci_demo_act_workflow.yml b/.github/workflows/ci_demo_act_workflow.yml new file mode 100644 index 0000000..8321246 --- /dev/null +++ b/.github/workflows/ci_demo_act_workflow.yml @@ -0,0 +1,69 @@ +name: CI Demo ACT +on: + workflow_dispatch: + # pull_request: + # types: [opened, synchronize, reopened, closed] + +concurrency: + # Cancel in-progress deploys to same branch + group: ${{ github.ref }}/branch_deployments + cancel-in-progress: true +env: + DAGSTER_CLOUD_URL: "http://wisemuffin.dagster.cloud" + DAGSTER_CLOUD_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }} + ENABLE_FAST_DEPLOYS: 'true' + PYTHON_VERSION: '3.11' + DAGSTER_CLOUD_FILE: 'dagster_cloud.yaml' + + DAGSTER_CLOUD_ORGANIZATION: "wisemuffin" + DAGSTER_PROJECT_NAME: pipeline_nsw_doe + + NSW_DOE_DATA_STACK_IN_A_BOX_DAGSTER_PROJECT_DIR: ${{ vars.NSW_DOE_DATA_STACK_IN_A_BOX_DAGSTER_PROJECT_DIR }} + NSW_DOE_DATA_STACK_IN_A_BOX_DBT_PROJECT_DIR: ${{ vars.NSW_DOE_DATA_STACK_IN_A_BOX_DBT_PROJECT_DIR }} + + NSW_DOE_DATA_STACK_IN_A_BOX__ENV: ${{ vars.NSW_DOE_DATA_STACK_IN_A_BOX__ENV }} + + NSW_DOE_DATA_STACK_IN_A_BOX_DB_PATH_AND_DB: ${{ secrets.NSW_DOE_DATA_STACK_IN_A_BOX_DB_PATH_AND_DB }} + NSW_DOE_DATA_STACK_IN_A_BOX_DB_NAME: ${{ vars.NSW_DOE_DATA_STACK_IN_A_BOX_DB_NAME }} + + MOTHERDUCK_TOKEN: ${{ secrets.MOTHERDUCK_TOKEN }} + DATAFOLD_APIKEY: ${{ secrets.DATAFOLD_APIKEY }} + + DAGSTER_HOME: ${{ vars.DAGSTER_HOME }} + + AWS_ROLE_TO_ASSUME: ${{ secrets.AWS_ROLE_TO_ASSUME }} + S3_BUCKET_METADATA: ${{ secrets.S3_BUCKET_METADATA }} + + DESTINATION__DUCKDB__CREDENTIALS: ${{ secrets.DESTINATION__DUCKDB__CREDENTIALS }} + + SOURCES__GITHUB__ACCESS_TOKEN: ${{ secrets.SOURCES__GITHUB__ACCESS_TOKEN }} + + SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__PROJECT_ID: ${{ secrets.SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__PROJECT_ID }} + SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__CLIENT_EMAIL: ${{ secrets.SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__CLIENT_EMAIL }} + SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__PRIVATE_KEY: ${{ secrets.SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__PRIVATE_KEY }} + SOURCES__GOOGLE_ANALYTICS__PROPERTY_ID: ${{ secrets.SOURCES__GOOGLE_ANALYTICS__PROPERTY_ID }} + +jobs: + act_job: + name: ACT job + runs-on: ubuntu-20.04 + environment: test + outputs: + build_info: ${{ steps.parse-workspace.outputs.build_info }} + + steps: + - name: Setup Python + uses: actions/setup-python@v5.0.0 + with: + python-version: "3.11.x" + # - name: Prerun Checks + # id: prerun + # uses: dagster-io/dagster-cloud-action/actions/utils/prerun@v0.1 + + - name: Set NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA + run: | + head_ref=$(echo ${GITHUB_HEAD_REF} | tr '[:upper:]' '[:lower:]' | sed -e 's/[^a-zA-Z0-9]/_/g') + echo head_ref = $head_ref + echo GITHUB_HEAD_REF = ${GITHUB_HEAD_REF} + echo "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA=pr_full_${head_ref}" >> $GITHUB_ENV + echo schema = 
$NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA diff --git a/.github/workflows/ci_dev_quick_checks.yml b/.github/workflows/ci_dev_quick_checks.yml index cc8656d..8fa59e1 100644 --- a/.github/workflows/ci_dev_quick_checks.yml +++ b/.github/workflows/ci_dev_quick_checks.yml @@ -49,7 +49,7 @@ jobs: with: python-version: "3.11.x" - name: check python with ruff - uses: chartboost/ruff-action@v1 + uses: astral-sh/ruff-action@v1 - name: Setup Node uses: actions/setup-node@v4.0.2 with: @@ -68,6 +68,7 @@ jobs: - name: Lint SQL run: | echo schema $NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA + cd ${{github.workspace}}/transformation/transformation_nsw_doe sqlfluff lint --format github-annotation-native # required to set path for pytest - name: set pythonpath diff --git a/.github/workflows/ci_test_branch_deployments_end_to_end.yml b/.github/workflows/ci_test_branch_deployments_end_to_end.yml index e59787f..1da2ee2 100644 --- a/.github/workflows/ci_test_branch_deployments_end_to_end.yml +++ b/.github/workflows/ci_test_branch_deployments_end_to_end.yml @@ -42,6 +42,12 @@ env: SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__PRIVATE_KEY: ${{ secrets.SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__PRIVATE_KEY }} SOURCES__GOOGLE_ANALYTICS__PROPERTY_ID: ${{ secrets.SOURCES__GOOGLE_ANALYTICS__PROPERTY_ID }} + + TPCH__ENV: ${{ vars.TPCH__ENV}} + TPCH_DBT_PROJECT_DIR: ${{ vars.TPCH_DBT_PROJECT_DIR}} + TPCH_DB_PATH_AND_DB: ${{ secrets.TPCH_DB_PATH_AND_DB}} + TPCH_DB_NAME: ${{ vars.TPCH_DB_NAME}} + jobs: dagster_cloud_default_deploy: name: Dagster Serverless Deploy @@ -61,8 +67,15 @@ jobs: - name: Set NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA run: | - echo "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA=pr_full_$(echo ${GITHUB_HEAD_REF} | tr '[:upper:]' '[:lower:]' | sed -e 's/[^a-zA-Z0-9]/_/g')" >> $GITHUB_ENV - echo schema = $NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA + # echo "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA=pr_full_$(echo ${GITHUB_HEAD_REF} | tr '[:upper:]' '[:lower:]' | sed -e 's/[^a-zA-Z0-9]/_/g')" >> $GITHUB_ENV + NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA=pr_full_$(echo ${GITHUB_HEAD_REF} | tr '[:upper:]' '[:lower:]' | sed -e 's/[^a-zA-Z0-9]/_/g') + echo "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA=$(echo $NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA)" >> $GITHUB_ENV + # echo schema ${{ env.NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA }} # cant ref env variables in command in same run + echo schema $NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA + + - name: check schema + run: echo schema ${{ env.NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA }} + - name: Launch Docker Deploy if: steps.prerun.outputs.result == 'docker-deploy' @@ -103,9 +116,23 @@ jobs: source .venv/bin/activate # currently requirements.txt is stored at total project level one level above dagster project uv pip install -r requirements.txt + + cd ./transformation/demo_transformation_scaling_tpch + dbt deps + cd ../.. + + cd ./transformation/transformation_nsw_doe + dbt deps + cd ../.. + cd ./${{ env.NSW_DOE_DATA_STACK_IN_A_BOX_DAGSTER_PROJECT_DIR }} uv pip install . 
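# (Annotation, not part of the patch: the `dagster-dbt project prepare-for-deployment`
# calls below appear to pre-parse each dbt project so its target/manifest.json ships
# with the packaged code location, meaning the DAGSTER_DBT_PARSE_PROJECT_ON_LOAD
# fallback in each project.py never has to fire at import time on the serverless deploy.)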
+ dagster-dbt project prepare-for-deployment --file ./${{ env.DAGSTER_PROJECT_NAME }}/project.py + dagster-dbt project prepare-for-deployment --file ./pipeline_nsw_doe_requires_secrets/project.py + + dagster-dbt project prepare-for-deployment --file ./demo_pipeline_scaling_tpch/project.py + # The cli command below can be used to manage syncing the prod manifest to branches if state_path is set on the DbtProject # dagster-cloud ci dagster-dbt project manage-state --file ./${{ env.DAGSTER_PROJECT_NAME }}/project.py shell: bash @@ -119,7 +146,8 @@ jobs: python_version: "${{ env.PYTHON_VERSION }}" env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA: $NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA + NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA: ${{ env.NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA }} + # can't seem to send env vars through to dagster config; for now, when doing an end-to-end test, this env will need to be set in the UI dagster_cloud_docker_deploy: name: Docker Deploy
diff --git a/.github/workflows/ci_test_branch_deployments_only_dbt.yml b/.github/workflows/ci_test_branch_deployments_only_dbt.yml index d67d149..6342b12 100644 --- a/.github/workflows/ci_test_branch_deployments_only_dbt.yml +++ b/.github/workflows/ci_test_branch_deployments_only_dbt.yml @@ -75,10 +75,16 @@ jobs: aws s3 cp s3://$S3_BUCKET_METADATA/prod/manifest/manifest.json ${{github.workspace}}/transformation/transformation_nsw_doe/target/last_manifest/manifest.json - name: Deploy + # have excluded source: from state:modified as it seems to try to rebuild all models because source databases differ between envs. + # also for now have removed the + at the end of state:modified as that was also causing all models to be rebuilt even when nothing changed. + # TODO run: | echo schema $NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA cd ${{github.workspace}}/transformation/transformation_nsw_doe - dbt build --exclude "+*github*+ +*google*+" --models state:modified+ --defer --state ${{github.workspace}}/transformation/transformation_nsw_doe/target/last_manifest --profiles-dir ${{github.workspace}}/transformation/transformation_nsw_doe + echo models modified + dbt ls --exclude "source:* +*github*+ +*google*+" --models state:modified --defer --state ${{github.workspace}}/transformation/transformation_nsw_doe/target/last_manifest --profiles-dir ${{github.workspace}}/transformation/transformation_nsw_doe + echo building modified models + dbt build --exclude "source:* +*github*+ +*google*+ *snapshot*" --models state:modified --defer --state ${{github.workspace}}/transformation/transformation_nsw_doe/target/last_manifest --profiles-dir ${{github.workspace}}/transformation/transformation_nsw_doe # removing datafold until we get a motherduck connection
diff --git a/Taskfile.yml b/Taskfile.yml index c818605..802e315 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -22,8 +22,14 @@ tasks: # workaround as can't get dynamic manifest.json to build - cd ${GITHUB_WORKSPACE}/${NSW_DOE_DATA_STACK_IN_A_BOX_DBT_PROJECT_DIR} && dbt parse - - cd transformation/demo_transformation_jaffle_shop && dbt deps - - cd transformation/demo_transformation_jaffle_shop && dbt parse + - cd ${GITHUB_WORKSPACE}/transformation/demo_transformation_jaffle_shop && dbt deps + - cd ${GITHUB_WORKSPACE}/transformation/demo_transformation_jaffle_shop && dbt parse + + - cd ${GITHUB_WORKSPACE}/transformation/pipeline_nsw_doe_requires_secrets && dbt deps + - cd ${GITHUB_WORKSPACE}/transformation/pipeline_nsw_doe_requires_secrets && dbt parse + + - cd
${GITHUB_WORKSPACE}/transformation/demo_pipeline_scaling_tpch && dbt deps + - cd ${GITHUB_WORKSPACE}/transformation/demo_pipeline_scaling_tpch && dbt parse node_deps: @@ -87,6 +93,9 @@ tasks: duck: cmds: - duckdb ${GITHUB_WORKSPACE}/$NSW_DOE_DATA_STACK_IN_A_BOX_DB_PATH_AND_DB + duck_tpch: + cmds: + - duckdb ${GITHUB_WORKSPACE}/$TPCH_DB_PATH_AND_DB setup_sqltools: cmds: - cd ~/.local/share/vscode-sqltools && npm install duckdb-async@0.10.0 && exit 0 diff --git a/dagster_cloud.yaml b/dagster_cloud.yaml index f4ebd5f..a8e6f45 100644 --- a/dagster_cloud.yaml +++ b/dagster_cloud.yaml @@ -11,3 +11,9 @@ locations: build: directory: . working_directory: ./orchestration + - code_source: + package_name: demo_pipeline_scaling_tpch + location_name: demo-pipeline-scaling-tpch + build: + directory: . + working_directory: ./orchestration diff --git a/orchestration/demo_pipeline_jaffle_shop/notebooks/iris-kmeans.ipynb b/orchestration/demo_pipeline_jaffle_shop/notebooks/iris-kmeans.ipynb index 3cc3924..94dbcdf 100644 --- a/orchestration/demo_pipeline_jaffle_shop/notebooks/iris-kmeans.ipynb +++ b/orchestration/demo_pipeline_jaffle_shop/notebooks/iris-kmeans.ipynb @@ -909,6 +909,9 @@ } ], "source": [ + "import matplotlib.patches as mpatches\n", + "import matplotlib.lines as mlines\n", + "\n", "fig, axs = plt.subplots(ncols=3, nrows=3, figsize=(12, 12))\n", "\n", "sns.scatterplot(\n", @@ -984,8 +987,6 @@ "axs[2, 0].axis(\"off\")\n", "axs[2, 1].axis(\"off\")\n", "\n", - "import matplotlib.patches as mpatches\n", - "import matplotlib.lines as mlines\n", "\n", "palette = sns.color_palette()\n", "setosa = mpatches.Patch(color=palette[0], label=\"Iris-setosa\", alpha=0.5)\n", diff --git a/orchestration/demo_pipeline_scaling_tpch/__init__.py b/orchestration/demo_pipeline_scaling_tpch/__init__.py new file mode 100644 index 0000000..6a44f3c --- /dev/null +++ b/orchestration/demo_pipeline_scaling_tpch/__init__.py @@ -0,0 +1,74 @@ +import os +from pathlib import Path + +from dagster import Definitions, FilesystemIOManager, load_assets_from_package_module +from dagster_dbt import DbtCliResource +from dagster_duckdb_pandas import DuckDBPandasIOManager +from dagstermill import ConfigurableLocalOutputNotebookIOManager +from dotenv import load_dotenv + +# from dagster_airbyte import airbyte_resource +# from .assets import jaffle_shop_dbt_assets,raw_customers_py,raw_orders_py,raw_payments_py,iris_dataset,iris_dataset_test_to_remove #,csv_to_onelake_asset +# from .assets import iris,raw,transformation,machine_learning +from . 
import assets +from .project import tpch_project +from .schedules import schedules + +from .util.branching import set_schema_name_env + +load_dotenv() + + +set_schema_name_env() + +TPCH_TARGET_SCHEMA = os.getenv("TPCH_TARGET_SCHEMA") + + +TPCH_DB_PATH_AND_DB = os.getenv("TPCH_DB_PATH_AND_DB") +DUCKDB_PROJECT_DIR = str( + Path(__file__).parent.parent.parent.joinpath(os.environ["TPCH_DB_PATH_AND_DB"]) +) + +S3_BUCKET_METADATA = os.getenv("S3_BUCKET_METADATA") + +TPCH__ENV = os.getenv("TPCH__ENV", "dev") + +resources_by_env = { + "prod": { + "io_manager_dw": DuckDBPandasIOManager( + database=f"{TPCH_DB_PATH_AND_DB}", + ), + # "io_manager": S3PickleIOManager( + # s3_resource=S3Resource(), + # s3_bucket=f"{S3_BUCKET_METADATA}", + # s3_prefix=f"{TPCH_TARGET_SCHEMA}", + # ), + "io_manager": FilesystemIOManager(), + "dbt": DbtCliResource(project_dir=tpch_project), + "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(), + }, + "test": { + "io_manager_dw": DuckDBPandasIOManager( + database=f"{TPCH_DB_PATH_AND_DB}", + ), + "io_manager": FilesystemIOManager(), + "dbt": DbtCliResource(project_dir=tpch_project), + "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(), + }, + "dev": { + "io_manager_dw": DuckDBPandasIOManager(database=DUCKDB_PROJECT_DIR), + "io_manager": FilesystemIOManager(), + "dbt": DbtCliResource(project_dir=tpch_project), + "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(), + }, +} + +# all_assets = [*load_assets_from_package_module(raw, group_name="TPCH"), *load_assets_from_package_module(transformation, group_name="TPCH"), *load_assets_from_package_module(iris, group_name="other"), *load_assets_from_package_module(machine_learning, group_name="TPCH")] +all_assets = load_assets_from_package_module(assets) + +defs = Definitions( + # assets=[raw_customers_py,raw_orders_py,raw_payments_py,iris_dataset,iris_dataset_test_to_remove,TPCH_dbt_assets], + assets=all_assets, + schedules=schedules, + resources=resources_by_env[os.getenv("TPCH__ENV", "dev")], +) diff --git a/orchestration/demo_pipeline_scaling_tpch/assets/__init__.py b/orchestration/demo_pipeline_scaling_tpch/assets/__init__.py new file mode 100644 index 0000000..b6e690f --- /dev/null +++ b/orchestration/demo_pipeline_scaling_tpch/assets/__init__.py @@ -0,0 +1 @@ +from . 
import * diff --git a/orchestration/demo_pipeline_scaling_tpch/assets/source/__init__.py b/orchestration/demo_pipeline_scaling_tpch/assets/source/__init__.py new file mode 100644 index 0000000..67b75dd --- /dev/null +++ b/orchestration/demo_pipeline_scaling_tpch/assets/source/__init__.py @@ -0,0 +1 @@ +from .assets import * diff --git a/orchestration/demo_pipeline_scaling_tpch/assets/source/assets.py b/orchestration/demo_pipeline_scaling_tpch/assets/source/assets.py new file mode 100644 index 0000000..aa9179d --- /dev/null +++ b/orchestration/demo_pipeline_scaling_tpch/assets/source/assets.py @@ -0,0 +1,91 @@ +from dagster import multi_asset, MaterializeResult, AssetSpec, AssetKey, Config +from pydantic import Field +import duckdb +from dagster_duckdb_pandas import DuckDBPandasIOManager + + +class GenerateTPCHDataConfig(Config): + scaling_factor: float = Field(default=0.1) + + +@multi_asset( + specs=[ + AssetSpec(AssetKey(["tpch", table]), skippable=True) + for table in [ + "customer", + "lineitem", + "nation", + "orders", + "part", + "partsupp", + "region", + "supplier", + ] + ], + required_resource_keys={"io_manager_dw"}, +) +def generate_tpch_data(context, config: GenerateTPCHDataConfig): + # Retrieve the database name from the context + io_manager_dw: DuckDBPandasIOManager = context.resources.io_manager_dw + database = io_manager_dw._database + conn = duckdb.connect(database=database) + schema_name = "tpch" + + # Check if schema exists and drop it + schema_exists = conn.execute( + f"SELECT schema_name FROM information_schema.schemata WHERE schema_name = '{schema_name}'" + ).fetchone() + if schema_exists: + context.log.info(f"Schema '{schema_name}' exists. Dropping schema and tables.") + conn.execute(f"DROP SCHEMA {schema_name} CASCADE") + + # remove landing tables + conn.execute("drop table IF EXISTS customer") + conn.execute("drop table IF EXISTS lineitem") + conn.execute("drop table IF EXISTS nation") + conn.execute("drop table IF EXISTS orders") + conn.execute("drop table IF EXISTS part") + conn.execute("drop table IF EXISTS partsupp") + conn.execute("drop table IF EXISTS region") + conn.execute("drop table IF EXISTS supplier") + + # Create schema + conn.execute(f"CREATE SCHEMA {schema_name}") + + # Generate TPC-H data + context.log.info( + f"Generating TPC-H data with scale factor {config.scaling_factor}." 
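# (Annotation, not part of the patch: dbgen() below is provided by DuckDB's bundled
# tpch extension; sf=1 yields the standard ~1 GB TPC-H dataset (~6M lineitem rows),
# so the 0.1 default keeps dev runs small. A larger factor can be set per run via
# Dagster run config, e.g.
# {"ops": {"generate_tpch_data": {"config": {"scaling_factor": 1.0}}}}.)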
+ ) + conn.execute(f"CALL dbgen(sf = {config.scaling_factor})") + + # update the schema + context.log.info("Updating schema for TPC-H data") + tables = [ + "customer", + "lineitem", + "nation", + "orders", + "part", + "partsupp", + "region", + "supplier", + ] + for table in tables: + # would have prefered to `alter table {table} set schema {schema_name}`` but not supported yet see: https://github.com/duckdb/duckdb/discussions/10641 + conn.execute(f"CREATE TABLE {schema_name}.{table} as select * from {table}") + + # Verify data generation + context.log.info("Verify data generation") + tables = conn.execute( + f"SELECT table_name FROM information_schema.tables WHERE table_schema = '{schema_name}'" + ).fetchall() + if tables: + context.log.info( + f"Generated TPC-H data in schema '{schema_name}' with tables: {', '.join([table[0] for table in tables])}" + ) + else: + context.log.error("Failed to generate TPC-H data.") + + for table in tables: + table_name = table[0] + yield MaterializeResult(asset_key=AssetKey([schema_name, table_name])) diff --git a/orchestration/demo_pipeline_scaling_tpch/assets/transformation/__init__.py b/orchestration/demo_pipeline_scaling_tpch/assets/transformation/__init__.py new file mode 100644 index 0000000..67b75dd --- /dev/null +++ b/orchestration/demo_pipeline_scaling_tpch/assets/transformation/__init__.py @@ -0,0 +1 @@ +from .assets import * diff --git a/orchestration/demo_pipeline_scaling_tpch/assets/transformation/assets.py b/orchestration/demo_pipeline_scaling_tpch/assets/transformation/assets.py new file mode 100644 index 0000000..3223b5d --- /dev/null +++ b/orchestration/demo_pipeline_scaling_tpch/assets/transformation/assets.py @@ -0,0 +1,37 @@ +from typing import Any, Mapping + +from dagster import AssetExecutionContext, AssetKey +from dagster_dbt import ( + DagsterDbtTranslator, + DagsterDbtTranslatorSettings, + DbtCliResource, + dbt_assets, +) + +from ...project import tpch_project + + +class CustomDagsterDbtTranslator(DagsterDbtTranslator): + def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey: + asset_key = super().get_asset_key(dbt_resource_props) + + if dbt_resource_props["resource_type"] == "source": + # asset_key = asset_key.with_prefix("raw") + pass + else: + asset_key = asset_key.with_prefix("analytics") + + return asset_key + + +@dbt_assets( + manifest=tpch_project.manifest_path, + dagster_dbt_translator=CustomDagsterDbtTranslator( + settings=DagsterDbtTranslatorSettings(enable_asset_checks=True) + ), + io_manager_key="io_manager_dw", + exclude="saved_query:* *google* *github* *web_analytics* *repo*", + select="fqn:*", +) +def jaffle_shop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): + yield from dbt.cli(["build"], context=context).stream() diff --git a/orchestration/demo_pipeline_scaling_tpch/project.py b/orchestration/demo_pipeline_scaling_tpch/project.py new file mode 100644 index 0000000..ca75e00 --- /dev/null +++ b/orchestration/demo_pipeline_scaling_tpch/project.py @@ -0,0 +1,25 @@ +import os +from pathlib import Path + +from dagster_dbt import DbtProject, DbtCliResource + + +project_dir = ( + Path(__file__) + .joinpath("..", "..", "..", os.environ["TPCH_DBT_PROJECT_DIR"]) + .resolve() +) + + +dbt = DbtCliResource(project_dir=str(project_dir)) + +if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD") or not os.path.exists( + os.path.join(project_dir, "target") +): + dbt_parse_invocation = dbt.cli(["parse"]).wait() + + +tpch_project = DbtProject( + project_dir=project_dir, + 
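# (Annotation, not part of the patch: packaged_project_dir below is where
# `dagster-dbt project prepare-for-deployment` — run in the CI workflow earlier in
# this patch — copies the dbt project together with its parsed target/ output, so
# the deployed code location bundles a ready manifest instead of parsing dbt on load.)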
packaged_project_dir=Path(__file__).joinpath("..", "dbt-project-temp").resolve(), +) diff --git a/orchestration/demo_pipeline_scaling_tpch/schedules.py b/orchestration/demo_pipeline_scaling_tpch/schedules.py new file mode 100644 index 0000000..be2d3b5 --- /dev/null +++ b/orchestration/demo_pipeline_scaling_tpch/schedules.py @@ -0,0 +1,15 @@ +""" +To add a daily schedule that materializes your dbt assets, uncomment the following lines. +""" +# from dagster_dbt import build_schedule_from_dbt_selection + +# from .assets import jaffle_shop_dbt_assets + +schedules = [ + # build_schedule_from_dbt_selection( + # [jaffle_shop_dbt_assets], + # job_name="materialize_dbt_models", + # cron_schedule="0 0 * * *", + # dbt_select="fqn:*", + # ), +] diff --git a/transformation/transformation_nsw_doe/_local_queries/.gitkeep b/orchestration/demo_pipeline_scaling_tpch/util/__init__.py similarity index 100% rename from transformation/transformation_nsw_doe/_local_queries/.gitkeep rename to orchestration/demo_pipeline_scaling_tpch/util/__init__.py diff --git a/orchestration/demo_pipeline_scaling_tpch/util/branching.py b/orchestration/demo_pipeline_scaling_tpch/util/branching.py new file mode 100644 index 0000000..4059c75 --- /dev/null +++ b/orchestration/demo_pipeline_scaling_tpch/util/branching.py @@ -0,0 +1,33 @@ +import os +import re + + +def set_schema_name_env(): + """creates the schema name for the variable TPCH_TARGET_SCHEMA""" + if os.getenv("TPCH_TARGET_SCHEMA"): + """already set as part of environment setup""" + pass + + # work around to set schema when in a branch deployment, DAGSTER_CLOUD_GIT_BRANCH is only present in branch deployments + elif ( + "DAGSTER_CLOUD_GIT_BRANCH" in os.environ + and os.getenv("NSW_DOE_DATA_STACK_IN_A_BOX__ENV") != "prod" + ): + # Get the current timestamp + # timestamp = int(time.time()) + # pr_string = f"pr_{timestamp}_" + pr_string = "pr_full_" + + # Get the Git branch (assuming it's an environment variable) + git_branch = os.environ.get("DAGSTER_CLOUD_GIT_BRANCH", "") + git_branch_lower = git_branch.lower() + + # Replace non-alphanumeric characters with underscores + git_branch_clean = re.sub(r"[^a-zA-Z0-9]", "_", git_branch_lower) + + # Final result + result = pr_string + git_branch_clean + print(f"setting TPCH_TARGET_SCHEMA = {result}") + os.environ["TPCH_TARGET_SCHEMA"] = result + else: + os.environ["TPCH_TARGET_SCHEMA"] = "schema_not_set" diff --git a/orchestration/pipeline_nsw_doe/__init__.py b/orchestration/pipeline_nsw_doe/__init__.py index f9f7ffc..6a665d9 100644 --- a/orchestration/pipeline_nsw_doe/__init__.py +++ b/orchestration/pipeline_nsw_doe/__init__.py @@ -1,6 +1,4 @@ -import time import os -import re from pathlib import Path from dagster import ( @@ -11,7 +9,6 @@ define_asset_job, in_process_executor, load_assets_from_package_module, - EnvVar, multi_or_in_process_executor, ) from dagster_dbt import DbtCliResource @@ -19,7 +16,6 @@ from dagster_msteams import make_teams_on_run_failure_sensor from dagstermill import ConfigurableLocalOutputNotebookIOManager from dotenv import load_dotenv -from dagster_openai import OpenAIResource from dagster_embedded_elt.dlt import DagsterDltResource @@ -32,32 +28,10 @@ # from .assets import iris,raw,transformation,machine_learning from . 
import assets from .project import nsw_doe_data_stack_in_a_box_project - +from pipeline_nsw_doe.util.branching import set_schema_name_env load_dotenv() -# work around to set schema when in a branch deployment, DAGSTER_CLOUD_GIT_BRANCH is only present in branch deployments -if ( - "DAGSTER_CLOUD_GIT_BRANCH" in os.environ - and os.getenv("NSW_DOE_DATA_STACK_IN_A_BOX__ENV") != "prod" -): - # Get the current timestamp - timestamp = int(time.time()) - # pr_string = f"pr_{timestamp}_" - pr_string = "pr_full_" - - # Get the Git branch (assuming it's an environment variable) - git_branch = os.environ.get("DAGSTER_CLOUD_GIT_BRANCH", "") - git_branch_lower = git_branch.lower() - - # Replace non-alphanumeric characters with underscores - git_branch_clean = re.sub(r"[^a-zA-Z0-9]", "_", git_branch_lower) - - # Final result - result = pr_string + git_branch_clean - print(f"setting NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA = {result}") - os.environ["NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA"] = result - NSW_DOE_DATA_STACK_IN_A_BOX_DB_PATH_AND_DB = os.getenv( "NSW_DOE_DATA_STACK_IN_A_BOX_DB_PATH_AND_DB" ) @@ -66,10 +40,14 @@ os.environ["NSW_DOE_DATA_STACK_IN_A_BOX_DB_PATH_AND_DB"] ) ) + +set_schema_name_env() + NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA = os.getenv( "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA" ) + S3_BUCKET_METADATA = os.getenv("S3_BUCKET_METADATA") NSW_DOE_DATA_STACK_IN_A_BOX__ENV = os.getenv("NSW_DOE_DATA_STACK_IN_A_BOX__ENV", "dev") @@ -88,7 +66,6 @@ "dbt": DbtCliResource(project_dir=nsw_doe_data_stack_in_a_box_project), "dlt": DagsterDltResource(), "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(), - "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")), "pandas_parquet_io_manager": PandasParquetIOManager( bucket_name=f"{S3_BUCKET_METADATA}", prefix=f"{NSW_DOE_DATA_STACK_IN_A_BOX__ENV}", @@ -102,7 +79,6 @@ "dbt": DbtCliResource(project_dir=nsw_doe_data_stack_in_a_box_project), "dlt": DagsterDltResource(), "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(), - "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")), "pandas_parquet_io_manager": PandasParquetIOManager( bucket_name=f"{S3_BUCKET_METADATA}", prefix=f"{NSW_DOE_DATA_STACK_IN_A_BOX__ENV}", @@ -114,7 +90,6 @@ "dbt": DbtCliResource(project_dir=nsw_doe_data_stack_in_a_box_project), "dlt": DagsterDltResource(), "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(), - "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")), "pandas_parquet_io_manager": PandasParquetIOManager( bucket_name=f"{S3_BUCKET_METADATA}", prefix=f"{NSW_DOE_DATA_STACK_IN_A_BOX__ENV}", diff --git a/orchestration/pipeline_nsw_doe/assets/raw/assets.py b/orchestration/pipeline_nsw_doe/assets/raw/assets.py index 25292f5..fde8881 100644 --- a/orchestration/pipeline_nsw_doe/assets/raw/assets.py +++ b/orchestration/pipeline_nsw_doe/assets/raw/assets.py @@ -15,15 +15,20 @@ from .schema_masterdataset import schema as schema_masterdataset from .schema_ram import schema as schema_ram -datanswMasterDatasetDagsterType = pandera_schema_to_dagster_type( - schema=schema_masterdataset -) +from ...util.branching import set_schema_name_env -NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA: str = os.getenv( +set_schema_name_env() + +NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA = os.getenv( "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA", "schema_not_set" ) +datanswMasterDatasetDagsterType = pandera_schema_to_dagster_type( + schema=schema_masterdataset +) + + @asset( compute_kind="python", 
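# (Annotation, not part of the patch: key_prefix below picks up the schema name that
# set_schema_name_env() derived above, so on a branch deployment these raw assets are
# keyed — and written — under a pr_full_<branch> schema rather than the prod schema.)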
key_prefix=[NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA], @@ -184,11 +189,15 @@ def raw__nsw_doe_datansw__attendance(): ) def raw__nsw_doe_datansw__apprenticeship_traineeship_training_contract_approvals(): url = "https://data.nsw.gov.au/data/dataset/f7cba3fc-6e9b-4b8b-b1fd-e7dda9b49001/resource/54d2df2f-44ae-4d67-980f-ce855d68f2d5/download/apprenticeship_traineeship_training_contract_approvals-1.xlsx" - df = pd.read_excel(url, sheet_name="Training Type", header=3) + df = pd.read_excel(url, sheet_name="Training Type", header=2) df["_load_timestamp"] = pd.Timestamp("now") df["_source"] = url + df.insert( + 0, "Unnamed: 0", None + ) # fix to get schema the same as prior to fix for https://github.com/wisemuffin/nsw-doe-data-stack-in-a-box/issues/36 + df.head() print(df.shape) print(df.dtypes) @@ -204,11 +213,15 @@ def raw__nsw_doe_datansw__apprenticeship_traineeship_training_contract_approvals ) def raw__nsw_doe_datansw__apprenticeship_traineeship_training_contract_completions(): url = "https://data.nsw.gov.au/data/dataset/f7cba3fc-6e9b-4b8b-b1fd-e7dda9b49001/resource/e969d98e-d89a-474b-b89b-9452f1e45644/download/apprenticeship_traineeship_training_contract_completions.xlsx" - df = pd.read_excel(url, sheet_name="Training Type", header=3) + df = pd.read_excel(url, sheet_name="Training Type", header=2) df["_load_timestamp"] = pd.Timestamp("now") df["_source"] = url + df.insert( + 0, "Unnamed: 0", None + ) # fix to get schema the same as prior to fix for https://github.com/wisemuffin/nsw-doe-data-stack-in-a-box/issues/36 + df.head() print(df.shape) print(df.dtypes) @@ -224,11 +237,15 @@ def raw__nsw_doe_datansw__apprenticeship_traineeship_training_contract_completio ) def raw__nsw_doe_datansw__apprenticeship_traineeship_training_contract_in_training(): url = "https://data.nsw.gov.au/data/dataset/f7cba3fc-6e9b-4b8b-b1fd-e7dda9b49001/resource/fe7169bf-32ba-433b-8354-eb9ef5477eaa/download/apprenticeship_traineeship_training_contract_in-trainings.xlsx" - df = pd.read_excel(url, sheet_name="Training Type", header=3) + df = pd.read_excel(url, sheet_name="Training Type", header=2) df["_load_timestamp"] = pd.Timestamp("now") df["_source"] = url + df.insert( + 0, "Unnamed: 0", None + ) # fix to get schema the same as prior to fix for https://github.com/wisemuffin/nsw-doe-data-stack-in-a-box/issues/36 + df.head() print(df.shape) print(df.dtypes) diff --git a/orchestration/pipeline_nsw_doe/assets/raw/exploritary_data_analysis/exploritary-data-analysis-nsw-doe-entrolments.ipynb b/orchestration/pipeline_nsw_doe/assets/raw/exploritary_data_analysis/exploritary-data-analysis-nsw-doe-entrolments.ipynb index d993f44..54c92de 100644 --- a/orchestration/pipeline_nsw_doe/assets/raw/exploritary_data_analysis/exploritary-data-analysis-nsw-doe-entrolments.ipynb +++ b/orchestration/pipeline_nsw_doe/assets/raw/exploritary_data_analysis/exploritary-data-analysis-nsw-doe-entrolments.ipynb @@ -90,7 +90,6 @@ "metadata": {}, "outputs": [], "source": [ - "import dlt\n", "from dlt.sources.helpers import requests\n", "\n", "# Specify the URL of the API endpoint\n", diff --git a/orchestration/pipeline_nsw_doe/assets/raw_dlt/assets.py b/orchestration/pipeline_nsw_doe/assets/raw_dlt/assets.py index 9bf7615..b4ecc76 100644 --- a/orchestration/pipeline_nsw_doe/assets/raw_dlt/assets.py +++ b/orchestration/pipeline_nsw_doe/assets/raw_dlt/assets.py @@ -17,8 +17,11 @@ from dlt.extract.resource import DltResource from ...dlt_sources.nsw_doe import nsw_doe_data +from ...util.branching import set_schema_name_env 
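# (Illustrative sketch, not part of the patch: what set_schema_name_env() does on a
# branch deployment, per util/branching.py above; the branch name is a made-up example.)
import os
import re

os.environ["DAGSTER_CLOUD_GIT_BRANCH"] = "feature/tpch-demo"  # hypothetical branch
branch = os.environ["DAGSTER_CLOUD_GIT_BRANCH"].lower()
schema = "pr_full_" + re.sub(r"[^a-zA-Z0-9]", "_", branch)  # -> "pr_full_feature_tpch_demo"
os.environ["NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA"] = schema  # read by the assets below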
-NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA: str = os.getenv( +set_schema_name_env() + +NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA = os.getenv( "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA", "schema_not_set" ) diff --git a/orchestration/pipeline_nsw_doe/notebooks/data-science-example.ipynb b/orchestration/pipeline_nsw_doe/notebooks/data-science-example.ipynb index 69a6363..da654d0 100644 --- a/orchestration/pipeline_nsw_doe/notebooks/data-science-example.ipynb +++ b/orchestration/pipeline_nsw_doe/notebooks/data-science-example.ipynb @@ -9,9 +9,7 @@ }, "outputs": [], "source": [ - "import matplotlib.pyplot as plt\n", "import pandas as pd\n", - "import seaborn as sns\n", "import dagstermill" ] }, diff --git a/orchestration/pipeline_nsw_doe/project.py b/orchestration/pipeline_nsw_doe/project.py index 2cf8aa0..98d8ef9 100644 --- a/orchestration/pipeline_nsw_doe/project.py +++ b/orchestration/pipeline_nsw_doe/project.py @@ -15,7 +15,9 @@ dbt = DbtCliResource(project_dir=str(project_dir)) -if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD") or not os.path.exists(project_dir): +if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD") or not os.path.exists( + os.path.join(project_dir, "target") +): dbt_parse_invocation = dbt.cli(["parse"]).wait() diff --git a/orchestration/pipeline_nsw_doe/util/__init__.py b/orchestration/pipeline_nsw_doe/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/orchestration/pipeline_nsw_doe/util/branching.py b/orchestration/pipeline_nsw_doe/util/branching.py new file mode 100644 index 0000000..de39f11 --- /dev/null +++ b/orchestration/pipeline_nsw_doe/util/branching.py @@ -0,0 +1,33 @@ +import os +import re + + +def set_schema_name_env(): + """creates the schema name for the variable NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA""" + if os.getenv("NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA"): + """already set as part of environment setup""" + pass + + # work around to set schema when in a branch deployment, DAGSTER_CLOUD_GIT_BRANCH is only present in branch deployments + elif ( + "DAGSTER_CLOUD_GIT_BRANCH" in os.environ + and os.getenv("NSW_DOE_DATA_STACK_IN_A_BOX__ENV") != "prod" + ): + # Get the current timestamp + # timestamp = int(time.time()) + # pr_string = f"pr_{timestamp}_" + pr_string = "pr_full_" + + # Get the Git branch (assuming it's an environment variable) + git_branch = os.environ.get("DAGSTER_CLOUD_GIT_BRANCH", "") + git_branch_lower = git_branch.lower() + + # Replace non-alphanumeric characters with underscores + git_branch_clean = re.sub(r"[^a-zA-Z0-9]", "_", git_branch_lower) + + # Final result + result = pr_string + git_branch_clean + print(f"setting NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA = {result}") + os.environ["NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA"] = result + else: + os.environ["NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA"] = "schema_not_set" diff --git a/orchestration/pipeline_nsw_doe_requires_secrets/__init__.py b/orchestration/pipeline_nsw_doe_requires_secrets/__init__.py index 005a639..8ba2408 100644 --- a/orchestration/pipeline_nsw_doe_requires_secrets/__init__.py +++ b/orchestration/pipeline_nsw_doe_requires_secrets/__init__.py @@ -1,6 +1,4 @@ -import time import os -import re from pathlib import Path from dagster import ( @@ -33,42 +31,26 @@ from . 
import sensors from .project import nsw_doe_data_stack_in_a_box_project +from .branching import set_schema_name_env load_dotenv() -# work around to set schema when in a branch deployment, DAGSTER_CLOUD_GIT_BRANCH is only present in branch deployments -if ( - "DAGSTER_CLOUD_GIT_BRANCH" in os.environ - and os.getenv("NSW_DOE_DATA_STACK_IN_A_BOX__ENV") != "prod" -): - # Get the current timestamp - timestamp = int(time.time()) - # pr_string = f"pr_{timestamp}_" - pr_string = "pr_full_" - - # Get the Git branch (assuming it's an environment variable) - git_branch = os.environ.get("DAGSTER_CLOUD_GIT_BRANCH", "") - git_branch_lower = git_branch.lower() - - # Replace non-alphanumeric characters with underscores - git_branch_clean = re.sub(r"[^a-zA-Z0-9]", "_", git_branch_lower) - - # Final result - result = pr_string + git_branch_clean - print(f"setting NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA = {result}") - os.environ["NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA"] = result - NSW_DOE_DATA_STACK_IN_A_BOX_DB_PATH_AND_DB = os.getenv( "NSW_DOE_DATA_STACK_IN_A_BOX_DB_PATH_AND_DB" ) + +set_schema_name_env() + +NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA = os.getenv( + "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA" +) + DUCKDB_PROJECT_DIR = str( Path(__file__).parent.parent.parent.joinpath( os.environ["NSW_DOE_DATA_STACK_IN_A_BOX_DB_PATH_AND_DB"] ) ) -NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA = os.getenv( - "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA" -) + S3_BUCKET_METADATA = os.getenv("S3_BUCKET_METADATA") diff --git a/orchestration/pipeline_nsw_doe_requires_secrets/assets/raw_dlt/assets.py b/orchestration/pipeline_nsw_doe_requires_secrets/assets/raw_dlt/assets.py index 2c3bea2..4a13aff 100644 --- a/orchestration/pipeline_nsw_doe_requires_secrets/assets/raw_dlt/assets.py +++ b/orchestration/pipeline_nsw_doe_requires_secrets/assets/raw_dlt/assets.py @@ -18,14 +18,19 @@ from ...dlt_sources.github import github_reactions, github_repo_events from ...dlt_sources.google_analytics import google_analytics -# dlt_configuration_path = file_relative_path(__file__, "../../dlt_sources/dlt_configuration.yaml") -# dlt_configuration = yaml.safe_load(open(dlt_configuration_path)) +from pipeline_nsw_doe_requires_secrets import set_schema_name_env -NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA: str = os.getenv( +set_schema_name_env() + +NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA = os.getenv( "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA", "schema_not_set" ) +# dlt_configuration_path = file_relative_path(__file__, "../../dlt_sources/dlt_configuration.yaml") +# dlt_configuration = yaml.safe_load(open(dlt_configuration_path)) + + class GithubDagsterDltTranslator(DagsterDltTranslator): @public def get_auto_materialize_policy( diff --git a/orchestration/pipeline_nsw_doe_requires_secrets/assets/semantic_layer/assets.py b/orchestration/pipeline_nsw_doe_requires_secrets/assets/semantic_layer/assets.py index 776673f..ad89e25 100644 --- a/orchestration/pipeline_nsw_doe_requires_secrets/assets/semantic_layer/assets.py +++ b/orchestration/pipeline_nsw_doe_requires_secrets/assets/semantic_layer/assets.py @@ -8,6 +8,10 @@ from ...project import nsw_doe_data_stack_in_a_box_project +from pipeline_nsw_doe import set_schema_name_env + +set_schema_name_env() + NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA = os.getenv( "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA", "schema_not_set" ) diff --git a/orchestration/pipeline_nsw_doe_requires_secrets/branching.py b/orchestration/pipeline_nsw_doe_requires_secrets/branching.py new file mode 100644 index 0000000..de39f11 
--- /dev/null +++ b/orchestration/pipeline_nsw_doe_requires_secrets/branching.py @@ -0,0 +1,33 @@ +import os +import re + + +def set_schema_name_env(): + """creates the schema name for the variable NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA""" + if os.getenv("NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA"): + """already set as part of environment setup""" + pass + + # work around to set schema when in a branch deployment, DAGSTER_CLOUD_GIT_BRANCH is only present in branch deployments + elif ( + "DAGSTER_CLOUD_GIT_BRANCH" in os.environ + and os.getenv("NSW_DOE_DATA_STACK_IN_A_BOX__ENV") != "prod" + ): + # Get the current timestamp + # timestamp = int(time.time()) + # pr_string = f"pr_{timestamp}_" + pr_string = "pr_full_" + + # Get the Git branch (assuming it's an environment variable) + git_branch = os.environ.get("DAGSTER_CLOUD_GIT_BRANCH", "") + git_branch_lower = git_branch.lower() + + # Replace non-alphanumeric characters with underscores + git_branch_clean = re.sub(r"[^a-zA-Z0-9]", "_", git_branch_lower) + + # Final result + result = pr_string + git_branch_clean + print(f"setting NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA = {result}") + os.environ["NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA"] = result + else: + os.environ["NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA"] = "schema_not_set" diff --git a/orchestration/pipeline_nsw_doe_requires_secrets/notebooks/data-science-example.ipynb b/orchestration/pipeline_nsw_doe_requires_secrets/notebooks/data-science-example.ipynb index 69a6363..da654d0 100644 --- a/orchestration/pipeline_nsw_doe_requires_secrets/notebooks/data-science-example.ipynb +++ b/orchestration/pipeline_nsw_doe_requires_secrets/notebooks/data-science-example.ipynb @@ -9,9 +9,7 @@ }, "outputs": [], "source": [ - "import matplotlib.pyplot as plt\n", "import pandas as pd\n", - "import seaborn as sns\n", "import dagstermill" ] }, diff --git a/orchestration/pipeline_nsw_doe_requires_secrets/project.py b/orchestration/pipeline_nsw_doe_requires_secrets/project.py index 2cf8aa0..98d8ef9 100644 --- a/orchestration/pipeline_nsw_doe_requires_secrets/project.py +++ b/orchestration/pipeline_nsw_doe_requires_secrets/project.py @@ -15,7 +15,9 @@ dbt = DbtCliResource(project_dir=str(project_dir)) -if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD") or not os.path.exists(project_dir): +if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD") or not os.path.exists( + os.path.join(project_dir, "target") +): dbt_parse_invocation = dbt.cli(["parse"]).wait() diff --git a/orchestration/test_pipeline_nsw_doe/test_pipelines.py b/orchestration/test_pipeline_nsw_doe/test_pipelines.py new file mode 100644 index 0000000..d8e48a7 --- /dev/null +++ b/orchestration/test_pipeline_nsw_doe/test_pipelines.py @@ -0,0 +1,14 @@ +def test_pipeline(): + assert 1 == 1 + + +# def test_dop_code_location(): +# """Ensure that the code location can be loaded without error.""" + + +# # dbt_resource.cli(["deps"]).wait() +# dbt.cli(["parse"]).wait() + +# manifest_path = nsw_doe_data_stack_in_a_box_project.manifest_path + +# assert manifest_path.exists() diff --git a/orchestration/workspace.yaml b/orchestration/workspace.yaml index 983fe27..7d139b8 100644 --- a/orchestration/workspace.yaml +++ b/orchestration/workspace.yaml @@ -13,5 +13,12 @@ load_from: # port: 4003 # location_name: "pipeline_nsw_doe" - python_module: pipeline_nsw_doe - # - python_module: demo_pipeline_jaffle_shop + + # # enables these two when doing local development. 
But when merging into main comment them out + # # so that they dont confuse users of the code space as you cant run jobs together accross code spaces with 1 materialise command. + # # the requires secret one also requires env i wont share + # - python_module: demo_pipeline_scaling_tpch # - python_module: pipeline_nsw_doe_requires_secrets + + + # - python_module: demo_pipeline_jaffle_shop diff --git a/pyproject.toml b/pyproject.toml index c9935fc..cff0e14 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ exclude = [ "node_modules", "site-packages", "venv", + ".ipynb", # excluding .ipynb as having some issues where they dont get checked locally but do in ci ] # Same as Black. @@ -76,3 +77,12 @@ docstring-code-format = false # This only has an effect when the `docstring-code-format` setting is # enabled. docstring-code-line-length = "dynamic" + + +[tool.pytest.ini_options] +minversion = "6.0" +addopts = "-ra -q" +testpaths = [ + "orchestration", + +] diff --git a/reports/sources/demo_jaffle_shop__dev/.gitkeep b/reports/sources/demo_jaffle_shop__dev/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/reports/sources/demo_tpch__dev/.gitkeep b/reports/sources/demo_tpch__dev/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/reports/sources/nsw_doe_data_stack_in_a_box/.gitkeep b/reports/sources/nsw_doe_data_stack_in_a_box/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/requirements.in b/requirements.in index d93babc..fa916c6 100644 --- a/requirements.in +++ b/requirements.in @@ -1,11 +1,12 @@ dbt-duckdb>=1.8.0 -dbt-metricflow[duckdb]==0.7.1 +dbt-metricflow[duckdb] ipykernel -duckdb>=1.0.0 #latest version for mother duck +duckdb==1.1.1 #latest version for mother duck # dagster deps +dagster dagster-dbt dagster-duckdb dagster-duckdb-pandas @@ -44,8 +45,7 @@ ydata_profiling ipywidgets # dlt -dagster-embedded-elt -# dlt[motherduck] # cant use motherduck yet with dlt if duckdb > 0.10 +dagster-embedded-elt #==0.23.11 # having some issues with later versions and dlt google-analytics-data google-api-python-client google-auth-oauthlib diff --git a/requirements.txt b/requirements.txt index 8b2d509..194b960 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,29 +1,31 @@ # This file was autogenerated by uv via the following command: # uv pip compile requirements.in -o requirements.txt -agate==1.7.1 +agate==1.9.1 # via # dbt-adapters # dbt-common # dbt-core -aiobotocore==2.13.0 +aiobotocore==2.15.2 # via s3fs -aiohttp==3.9.5 +aiohappyeyeballs==2.4.3 + # via aiohttp +aiohttp==3.10.10 # via # aiobotocore # langchain # langchain-community # s3fs -aioitertools==0.11.0 +aioitertools==0.12.0 # via aiobotocore aiosignal==1.3.1 # via aiohttp -alembic==1.13.1 +alembic==1.13.3 # via dagster aniso8601==9.0.1 # via graphene ansicolors==1.1.8 # via papermill -anyio==4.3.0 +anyio==4.6.2.post1 # via # gql # httpx @@ -36,13 +38,13 @@ asttokens==2.4.1 # via stack-data astunparse==1.6.3 # via dlt -attrs==23.2.0 +attrs==24.2.0 # via # aiohttp # jsonschema # referencing # visions -babel==2.15.0 +babel==2.16.0 # via agate backoff==2.2.1 # via @@ -52,25 +54,29 @@ beautifulsoup4==4.12.3 # via nbconvert bitmath==1.3.3.1 # via noteable-origami -black==24.4.2 +black==24.10.0 # via pandera bleach==6.1.0 # via nbconvert -boto3==1.34.106 +boto3==1.35.36 + # via dagster-aws +boto3-stubs-lite==1.35.43 # via dagster-aws -botocore==1.34.106 +botocore==1.35.36 # via # aiobotocore # boto3 # s3transfer -cachetools==5.3.3 +botocore-stubs==1.35.43 + # via boto3-stubs-lite 
+cachetools==5.5.0 # via google-auth -certifi==2024.2.2 +certifi==2024.8.30 # via # httpcore # httpx # requests -cffi==1.16.0 +cffi==1.17.1 # via cryptography cfgv==3.4.0 # via pre-commit @@ -79,7 +85,7 @@ chardet==5.2.0 # diff-cover # frictionless # sqlfluff -charset-normalizer==3.3.2 +charset-normalizer==3.4.0 # via requests click==8.1.7 # via @@ -97,7 +103,7 @@ click==8.1.7 # uvicorn cloudpickle==2.2.1 # via papermill-origami -cmdstanpy==1.2.2 +cmdstanpy==1.2.4 # via prophet colorama==0.4.6 # via @@ -111,11 +117,11 @@ comm==0.2.2 # via # ipykernel # ipywidgets -contourpy==1.2.1 +contourpy==1.3.0 # via matplotlib -croniter==2.0.5 +croniter==3.0.3 # via dagster -cryptography==42.0.7 +cryptography==43.0.1 # via # jwt # noteable-origami @@ -126,7 +132,7 @@ dacite==1.8.1 # via ydata-profiling daff==1.3.46 # via dbt-core -dagster==1.7.11 +dagster==1.8.12 # via # dagster-aws # dagster-cloud @@ -141,46 +147,46 @@ dagster==1.7.11 # dagster-pandera # dagster-webserver # dagstermill -dagster-aws==0.23.11 -dagster-cloud==1.7.11 -dagster-cloud-cli==1.7.11 +dagster-aws==0.24.12 +dagster-cloud==1.8.12 +dagster-cloud-cli==1.8.12 # via dagster-cloud -dagster-dbt==0.23.11 -dagster-duckdb==0.23.11 +dagster-dbt==0.24.12 +dagster-duckdb==0.24.12 # via dagster-duckdb-pandas -dagster-duckdb-pandas==0.23.11 -dagster-embedded-elt==0.23.11 -dagster-graphql==1.7.11 +dagster-duckdb-pandas==0.24.12 +dagster-embedded-elt==0.24.12 +dagster-graphql==1.8.12 # via dagster-webserver -dagster-msteams==0.23.11 -dagster-openai==0.23.11 -dagster-pandera==0.23.11 -dagster-pipes==1.7.11 +dagster-msteams==0.24.12 +dagster-openai==0.24.12 +dagster-pandera==0.24.12 +dagster-pipes==1.8.12 # via dagster -dagster-webserver==1.7.11 -dagstermill==0.23.11 -dataclasses-json==0.6.6 +dagster-webserver==1.8.12 +dagstermill==0.24.12 +dataclasses-json==0.6.7 # via # langchain # langchain-community -dbt-adapters==1.3.1 +dbt-adapters==1.7.0 # via # dbt-core # dbt-duckdb dbt-artifacts-parser==0.6.0 # via dbterd -dbt-common==1.4.0 +dbt-common==1.11.0 # via # dbt-adapters # dbt-core # dbt-duckdb -dbt-core==1.8.3 +dbt-core==1.8.7 # via # dagster-dbt # dbt-duckdb # dbt-metricflow # sqlfluff-templater-dbt -dbt-duckdb==1.8.1 +dbt-duckdb==1.9.0 dbt-extractor==0.5.1 # via dbt-core dbt-metricflow==0.7.1 @@ -189,8 +195,8 @@ dbt-semantic-interfaces==0.5.1 # dbt-core # dbt-metricflow # metricflow -dbterd==1.13.5 -debugpy==1.8.1 +dbterd==1.18.0 +debugpy==1.8.7 # via ipykernel decorator==5.1.1 # via ipython @@ -198,19 +204,19 @@ deepdiff==7.0.1 # via dbt-common defusedxml==0.7.1 # via nbconvert -diff-cover==9.0.0 +diff-cover==9.2.0 # via sqlfluff diff-match-patch==20200713 # via noteable-origami -distlib==0.3.8 +distlib==0.3.9 # via virtualenv distro==1.9.0 # via openai -dlt==0.4.3 +dlt==1.2.0 # via dagster-embedded-elt docstring-parser==0.16 # via dagster -duckdb==1.0.0 +duckdb==1.1.1 # via # dagster-duckdb # dbt-duckdb @@ -220,16 +226,16 @@ entrypoints==0.4 # papermill et-xmlfile==1.1.0 # via openpyxl -executing==2.0.1 +executing==2.1.0 # via stack-data -faiss-cpu==1.8.0 -fastjsonschema==2.19.1 +faiss-cpu==1.9.0 +fastjsonschema==2.20.0 # via nbformat -filelock==3.14.0 +filelock==3.16.1 # via # dagster # virtualenv -fonttools==4.51.0 +fonttools==4.54.1 # via matplotlib frictionless==4.40.8 # via pandera @@ -237,7 +243,7 @@ frozenlist==1.4.1 # via # aiohttp # aiosignal -fsspec==2024.6.0 +fsspec==2024.9.0 # via # dlt # s3fs @@ -250,13 +256,13 @@ gitpython==3.1.43 # via dlt giturlparse==0.12.0 # via dlt -google-analytics-data==0.18.7 -google-api-core==2.19.0 
+google-analytics-data==0.18.12 +google-api-core==2.21.0 # via # google-analytics-data # google-api-python-client -google-api-python-client==2.127.0 -google-auth==2.29.0 +google-api-python-client==2.149.0 +google-auth==2.35.0 # via # google-analytics-data # google-api-core @@ -265,8 +271,8 @@ google-auth==2.29.0 # google-auth-oauthlib google-auth-httplib2==0.2.0 # via google-api-python-client -google-auth-oauthlib==1.2.0 -googleapis-common-protos==1.63.0 +google-auth-oauthlib==1.2.1 +googleapis-common-protos==1.65.0 # via # google-api-core # grpcio-status @@ -274,7 +280,7 @@ gql==3.5.0 # via dagster-graphql graphene==3.3 # via dagster-graphql -graphql-core==3.2.3 +graphql-core==3.2.5 # via # gql # graphene @@ -283,17 +289,17 @@ graphql-relay==3.2.0 # via graphene graphviz==0.20.3 # via metricflow -greenlet==3.0.3 +greenlet==3.1.1 # via sqlalchemy -grpcio==1.64.0 +grpcio==1.67.0 # via # dagster # google-api-core # grpcio-health-checking # grpcio-status -grpcio-health-checking==1.62.2 +grpcio-health-checking==1.62.3 # via dagster -grpcio-status==1.62.2 +grpcio-status==1.62.3 # via google-api-core h11==0.14.0 # via @@ -301,31 +307,32 @@ h11==0.14.0 # uvicorn halo==0.0.31 # via dbt-metricflow -hexbytes==1.2.0 +hexbytes==1.2.1 # via dlt -holidays==0.49 +holidays==0.58 # via prophet htmlmin==0.1.12 # via ydata-profiling -httpcore==1.0.5 +httpcore==1.0.6 # via httpx httplib2==0.22.0 # via # google-api-python-client # google-auth-httplib2 -httptools==0.6.1 +httptools==0.6.4 # via uvicorn -httpx==0.27.0 +httpx==0.27.2 # via + # langsmith # noteable-origami # openai humanfriendly==10.0 # via coloredlogs -humanize==4.9.0 +humanize==4.11.0 # via dlt -identify==2.5.36 +identify==2.6.1 # via pre-commit -idna==3.7 +idna==3.10 # via # anyio # httpx @@ -337,20 +344,20 @@ imagehash==4.3.1 # ydata-profiling importlib-metadata==6.11.0 # via dbt-semantic-interfaces -importlib-resources==6.4.0 +importlib-resources==6.4.5 # via prophet iniconfig==2.0.0 # via pytest -ipykernel==6.29.4 +ipykernel==6.29.5 # via dagstermill -ipython==8.24.0 +ipython==8.28.0 # via # ipykernel # ipywidgets # scrapbook ipython-genutils==0.2.0 # via dagstermill -ipywidgets==8.1.2 +ipywidgets==8.1.5 isodate==0.6.1 # via # agate @@ -375,6 +382,8 @@ jinja2==3.1.4 # ydata-profiling jinja2-simple-tags==0.6.1 # via sqlfluff-templater-dbt +jiter==0.6.1 + # via openai jmespath==1.0.1 # via # boto3 @@ -387,18 +396,18 @@ jsonpatch==1.33 # via # langchain # langchain-core -jsonpath-ng==1.6.1 +jsonpath-ng==1.7.0 # via dlt -jsonpointer==2.4 +jsonpointer==3.0.0 # via jsonpatch -jsonschema==4.22.0 +jsonschema==4.23.0 # via # dbt-common # dbt-semantic-interfaces # frictionless # nbformat # scrapbook -jsonschema-specifications==2023.12.1 +jsonschema-specifications==2024.10.1 # via jsonschema jupyter-client==7.4.9 # via @@ -414,11 +423,11 @@ jupyter-core==5.7.2 # nbformat jupyterlab-pygments==0.3.0 # via nbconvert -jupyterlab-widgets==3.0.10 +jupyterlab-widgets==3.0.13 # via ipywidgets jwt==1.3.1 # via noteable-origami -kiwisolver==1.4.5 +kiwisolver==1.4.7 # via matplotlib langchain==0.1.11 langchain-community==0.0.38 @@ -430,7 +439,7 @@ langchain-core==0.1.52 # langchain-text-splitters langchain-text-splitters==0.0.2 # via langchain -langsmith==0.1.63 +langsmith==0.1.136 # via # langchain # langchain-community @@ -443,23 +452,22 @@ log-symbols==0.0.14 # via halo logbook==1.5.3 # via dbt-core -makefun==1.15.2 +makefun==1.15.6 # via dlt mako==1.3.5 # via alembic markdown-it-py==3.0.0 # via rich -marko==2.0.3 +marko==2.1.2 # via frictionless 
-markupsafe==2.1.5 +markupsafe==3.0.1 # via # jinja2 # mako # nbconvert - # sqlfluff-templater-dbt -marshmallow==3.21.2 +marshmallow==3.23.0 # via dataclasses-json -mashumaro==3.13 +mashumaro==3.13.1 # via # dbt-adapters # dbt-common @@ -487,9 +495,9 @@ more-itertools==10.1.0 # via # dbt-semantic-interfaces # metricflow -msgpack==1.0.8 +msgpack==1.1.0 # via mashumaro -multidict==6.0.5 +multidict==6.1.0 # via # aiohttp # yarl @@ -498,6 +506,14 @@ multimethod==1.10 # pandera # visions # ydata-profiling +mypy-boto3-ecs==1.35.43 + # via boto3-stubs-lite +mypy-boto3-emr==1.35.39 + # via boto3-stubs-lite +mypy-boto3-emr-serverless==1.35.25 + # via boto3-stubs-lite +mypy-boto3-glue==1.35.25 + # via boto3-stubs-lite mypy-extensions==1.0.0 # via # black @@ -518,12 +534,12 @@ nest-asyncio==1.6.0 # via # ipykernel # jupyter-client -networkx==3.3 +networkx==3.4.1 # via # dagster-dbt # dbt-core # visions -nodeenv==1.8.0 +nodeenv==1.9.1 # via pre-commit noteable-origami==0.0.35 # via papermill-origami @@ -533,7 +549,6 @@ numpy==1.25.2 # via # cmdstanpy # contourpy - # dagster-pandera # faiss-cpu # imagehash # langchain @@ -557,12 +572,12 @@ numpy==1.25.2 # ydata-profiling oauthlib==3.2.2 # via requests-oauthlib -openai==1.30.3 +openai==1.52.0 # via dagster-openai -openpyxl==3.1.2 +openpyxl==3.1.5 ordered-set==4.1.0 # via deepdiff -orjson==3.10.3 +orjson==3.10.7 # via # dagster-dbt # dlt @@ -578,6 +593,7 @@ packaging==23.2 # dagstermill # dbt-core # dlt + # faiss-cpu # ipykernel # langchain-core # marshmallow @@ -586,8 +602,9 @@ packaging==23.2 # pandera # plotly # pytest + # requirements-parser # statsmodels -pandas==1.5.3 +pandas==2.0.3 # via # cmdstanpy # dagster-duckdb-pandas @@ -600,7 +617,7 @@ pandas==1.5.3 # statsmodels # visions # ydata-profiling -pandera==0.19.3 +pandera==0.20.4 # via dagster-pandera pandocfilters==1.5.1 # via nbconvert @@ -614,57 +631,57 @@ parsedatetime==2.6 # via agate parso==0.8.4 # via jedi -pathspec==0.11.2 +pathspec==0.12.1 # via # black # dbt-common # dbt-core # sqlfluff -pathvalidate==3.2.0 +pathvalidate==3.2.1 # via dlt patsy==0.5.6 # via statsmodels pendulum==3.0.0 - # via - # dagster - # dlt + # via dlt petl==1.7.15 # via frictionless -pex==2.3.1 +pex==2.20.3 # via dagster-cloud pexpect==4.9.0 # via ipython phik==0.12.4 # via ydata-profiling -pillow==10.3.0 +pillow==11.0.0 # via # imagehash # matplotlib # visions # wordcloud -platformdirs==4.2.2 +platformdirs==4.3.6 # via # black # jupyter-core # virtualenv -plotly==5.22.0 +plotly==5.24.1 pluggy==1.5.0 # via # diff-cover # pytest ply==3.11 # via jsonpath-ng -pre-commit==3.7.1 -prompt-toolkit==3.0.43 +pre-commit==4.0.1 +prompt-toolkit==3.0.48 # via # ipython # questionary -prophet==1.1.5 -proto-plus==1.23.0 +propcache==0.2.0 + # via yarl +prophet==1.1.6 +proto-plus==1.24.0 # via # google-analytics-data # google-api-core -protobuf==4.25.3 +protobuf==4.25.5 # via # dagster # dbt-adapters @@ -676,23 +693,23 @@ protobuf==4.25.3 # grpcio-health-checking # grpcio-status # proto-plus -psutil==5.9.8 +psutil==6.1.0 # via ipykernel ptyprocess==0.7.0 # via pexpect -pure-eval==0.2.2 +pure-eval==0.2.3 # via stack-data -pyarrow==16.1.0 +pyarrow==17.0.0 # via scrapbook -pyasn1==0.6.0 +pyasn1==0.6.1 # via # pyasn1-modules # rsa -pyasn1-modules==0.4.0 +pyasn1-modules==0.4.1 # via google-auth pycparser==2.22 # via cffi -pydantic==1.10.15 +pydantic==1.10.18 # via # dagster # dbt-artifacts-parser @@ -704,7 +721,6 @@ pydantic==1.10.15 # noteable-origami # openai # pandera - # sqlfluff-templater-dbt # ydata-profiling pygments==2.18.0 # via 
@@ -712,19 +728,18 @@ pygments==2.18.0 # ipython # nbconvert # rich -pyjwt==2.8.0 +pyjwt==2.9.0 # via github3-py -pyparsing==3.1.2 +pyparsing==3.2.0 # via # httplib2 # matplotlib -pytest==8.2.1 +pytest==8.3.3 # via sqlfluff python-dateutil==2.9.0.post0 # via # botocore # croniter - # dagster # dbt-common # dbt-semantic-interfaces # frictionless @@ -746,7 +761,7 @@ python-slugify==8.0.4 # frictionless pytimeparse==1.1.8 # via agate -pytz==2024.1 +pytz==2024.2 # via # croniter # dagster @@ -754,9 +769,9 @@ pytz==2024.1 # dbt-core # dlt # pandas -pywavelets==1.6.0 +pywavelets==1.7.0 # via imagehash -pyyaml==6.0.1 +pyyaml==6.0.2 # via # dagster # dagster-cloud-cli @@ -773,7 +788,7 @@ pyyaml==6.0.1 # sqlfluff # uvicorn # ydata-profiling -pyzmq==26.0.3 +pyzmq==26.2.0 # via # ipykernel # jupyter-client @@ -781,17 +796,17 @@ questionary==1.10.0 # via # dagster-cloud # dagster-cloud-cli -rapidfuzz==3.9.1 +rapidfuzz==3.10.0 # via metricflow referencing==0.35.1 # via # jsonschema # jsonschema-specifications -regex==2024.5.15 +regex==2024.9.11 # via # sqlfluff # tiktoken -requests==2.32.2 +requests==2.32.3 # via # dagster # dagster-aws @@ -821,32 +836,29 @@ requests==2.32.2 requests-oauthlib==2.0.0 # via google-auth-oauthlib requests-toolbelt==1.0.0 - # via gql -requirements-parser==0.9.0 + # via + # gql + # langsmith +requirements-parser==0.11.0 # via dlt rfc3986==2.0.0 # via frictionless -rich==13.7.1 +rich==13.9.2 # via # dagster # dagster-dbt - # sqlfluff-templater-dbt # typer -rpds-py==0.18.1 +rpds-py==0.20.0 # via # jsonschema # referencing rsa==4.9 # via google-auth -ruamel-yaml==0.17.40 - # via sqlfluff-templater-dbt -ruamel-yaml-clib==0.2.8 - # via ruamel-yaml -ruff==0.4.4 -s3fs==2024.6.0 -s3transfer==0.10.1 +ruff==0.7.0 +s3fs==2024.9.0 +s3transfer==0.10.3 # via boto3 -scikit-learn==1.5.0 +scikit-learn==1.5.2 scipy==1.11.4 # via # imagehash @@ -862,16 +874,15 @@ semver==3.0.2 # via dlt sending==0.3.0 # via noteable-origami -setuptools==70.0.0 +setuptools==75.2.0 # via # dagster # dlt - # nodeenv shellingham==1.5.4 # via typer -simpleeval==0.9.13 +simpleeval==1.0.0 # via frictionless -simplejson==3.19.2 +simplejson==3.19.3 # via dlt six==1.16.0 # via @@ -883,9 +894,9 @@ six==1.16.0 # minimal-snowplow-tracker # patsy # python-dateutil -sling==1.2.10 +sling==1.2.20 # via dagster-embedded-elt -sling-linux-amd64==1.2.10 +sling-linux-amd64==1.2.20 # via sling smmap==5.0.1 # via gitdb @@ -894,35 +905,34 @@ sniffio==1.3.1 # anyio # httpx # openai -soupsieve==2.5 +soupsieve==2.6 # via beautifulsoup4 spinners==0.0.24 # via halo -sqlalchemy==2.0.30 +sqlalchemy==2.0.36 # via # alembic # dagster - # dlt # langchain # langchain-community -sqlfluff==3.0.6 +sqlfluff==3.2.4 # via sqlfluff-templater-dbt -sqlfluff-templater-dbt==3.0.6 -sqlglot==24.0.0 +sqlfluff-templater-dbt==3.2.4 +sqlglot==25.25.1 # via dagster-dbt -sqlglotrs==0.2.5 +sqlglotrs==0.2.12 # via sqlglot -sqlparse==0.5.0 +sqlparse==0.5.1 # via dbt-core stack-data==0.6.3 # via ipython -stanio==0.5.0 +stanio==0.5.1 # via cmdstanpy -starlette==0.37.2 +starlette==0.41.0 # via # dagster-graphql # dagster-webserver -statsmodels==0.14.2 +statsmodels==0.14.4 # via ydata-profiling stringcase==1.2.0 # via frictionless @@ -939,7 +949,7 @@ tangled-up-in-unicode==0.2.0 # via visions tblib==3.0.0 # via sqlfluff -tenacity==8.3.0 +tenacity==8.5.0 # via # dlt # langchain @@ -947,28 +957,28 @@ tenacity==8.3.0 # langchain-core # papermill # plotly -termcolor==2.4.0 +termcolor==2.5.0 # via halo text-unidecode==1.3 # via python-slugify threadpoolctl==3.5.0 # via 
scikit-learn -tiktoken==0.7.0 -time-machine==2.14.1 +tiktoken==0.8.0 +time-machine==2.16.0 # via pendulum tinycss2==1.3.0 # via nbconvert -tomli==2.0.1 +tomli==2.0.2 # via dagster -tomlkit==0.12.5 +tomlkit==0.13.2 # via dlt toposort==1.10 # via dagster -tornado==6.4 +tornado==6.4.1 # via # ipykernel # jupyter-client -tqdm==4.66.4 +tqdm==4.66.5 # via # cmdstanpy # dagster @@ -989,21 +999,26 @@ traitlets==5.14.3 # nbclient # nbconvert # nbformat -typeguard==4.2.1 +typeguard==4.3.0 # via # pandera # ydata-profiling -typer==0.12.3 +typer==0.12.5 # via # dagster-cloud # dagster-cloud-cli # dagster-dbt # frictionless -types-setuptools==69.5.0.20240519 +types-awscrt==0.22.0 + # via botocore-stubs +types-s3transfer==0.10.3 + # via boto3-stubs-lite +types-setuptools==75.1.0.20241014 # via requirements-parser -typing-extensions==4.11.0 +typing-extensions==4.12.2 # via # alembic + # boto3-stubs-lite # dagster # dbt-adapters # dbt-common @@ -1013,10 +1028,13 @@ typing-extensions==4.11.0 # ipython # mashumaro # metricflow + # mypy-boto3-ecs + # mypy-boto3-emr + # mypy-boto3-emr-serverless + # mypy-boto3-glue # openai # pydantic # sqlalchemy - # sqlfluff # typeguard # typer # typing-inspect @@ -1024,11 +1042,12 @@ typing-inspect==0.9.0 # via # dataclasses-json # pandera -tzdata==2024.1 +tzdata==2024.2 # via # dlt + # pandas # pendulum -universal-pathlib==0.2.2 +universal-pathlib==0.2.5 # via dagster update-checker==0.18.0 # via dbt-metricflow @@ -1036,23 +1055,23 @@ uritemplate==4.1.1 # via # github3-py # google-api-python-client -urllib3==1.26.18 +urllib3==2.2.3 # via # botocore # requests -uvicorn==0.29.0 +uvicorn==0.32.0 # via dagster-webserver -uvloop==0.19.0 +uvloop==0.21.0 # via uvicorn -validators==0.28.1 +validators==0.34.0 # via frictionless -virtualenv==20.26.2 +virtualenv==20.26.6 # via pre-commit visions==0.7.5 # via ydata-profiling -watchdog==4.0.0 +watchdog==5.0.3 # via dagster -watchfiles==0.21.0 +watchfiles==0.24.0 # via uvicorn wcwidth==0.2.13 # via prompt-toolkit @@ -1060,13 +1079,13 @@ webencodings==0.5.1 # via # bleach # tinycss2 -websockets==12.0 +websockets==13.1 # via # noteable-origami # uvicorn -wheel==0.43.0 +wheel==0.44.0 # via astunparse -widgetsnbextension==4.0.10 +widgetsnbextension==4.0.13 # via ipywidgets wordcloud==1.9.3 # via ydata-profiling @@ -1074,10 +1093,10 @@ wrapt==1.16.0 # via # aiobotocore # pandera -yarl==1.9.4 +yarl==1.15.4 # via # aiohttp # gql ydata-profiling==4.6.0 -zipp==3.18.2 +zipp==3.20.2 # via importlib-metadata diff --git a/transformation/demo_transformation_scaling_tpch/.gitignore b/transformation/demo_transformation_scaling_tpch/.gitignore new file mode 100644 index 0000000..49f147c --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/.gitignore @@ -0,0 +1,4 @@ + +target/ +dbt_packages/ +logs/ diff --git a/transformation/demo_transformation_scaling_tpch/.user.yml b/transformation/demo_transformation_scaling_tpch/.user.yml new file mode 100644 index 0000000..45f7128 --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/.user.yml @@ -0,0 +1 @@ +id: c8f89e33-f6bc-4506-bb77-a4dbf0055d0d diff --git a/transformation/demo_transformation_scaling_tpch/README.md b/transformation/demo_transformation_scaling_tpch/README.md new file mode 100644 index 0000000..6cc7520 --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/README.md @@ -0,0 +1,44 @@ +# duckdb tpch docs + +https://duckdb.org/docs/extensions/tpch + +# Setup +```bash +duckdb dev.duckdb +ATTACH 'my_catalog.db' AS my_catalog; + +CALL dbgen(sf = 1); +``` + + 
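+A quick sanity check after generation (a minimal sketch; `dbgen` creates the eight
+standard TPC-H tables listed in the DROP statements below, and at `sf = 1` the
+`lineitem` table holds roughly 6 million rows):
+
+```sql
+SHOW TABLES;
+
+-- largest table; expect about 6,001,215 rows at sf = 1
+SELECT count(*) AS lineitem_rows FROM lineitem;
+```
+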
+```bash
+task duck
+CREATE SCHEMA tpch;
+USE main;
+CALL dbgen(sf = 1);
+```
+
+```sql
+DROP TABLE IF EXISTS customer;
+DROP TABLE IF EXISTS lineitem;
+DROP TABLE IF EXISTS nation;
+DROP TABLE IF EXISTS orders;
+DROP TABLE IF EXISTS part;
+DROP TABLE IF EXISTS partsupp;
+DROP TABLE IF EXISTS region;
+DROP TABLE IF EXISTS supplier;
+```
+
+Generate a lot of data:
+```sql
+-- children = 100 splits generation into 100 chunks; run step = 0..99 to produce them all
+CALL dbgen(sf = 50, children = 100, step = 0);
+```
+
+
+# metricflow examples
+
+```bash
+dbt parse
+mf query --metrics gross_item_sales_amount --group-by metric_time__year
+mf query --saved-query order_metrics
+```
diff --git a/transformation/demo_transformation_scaling_tpch/_demo_queries/data catalog.sql b/transformation/demo_transformation_scaling_tpch/_demo_queries/data catalog.sql
new file mode 100644
index 0000000..170f59d
--- /dev/null
+++ b/transformation/demo_transformation_scaling_tpch/_demo_queries/data catalog.sql
@@ -0,0 +1,18 @@
+SELECT
+    TABLE_NAME,
+    *
+FROM
+    INFORMATION_SCHEMA.TABLES
+;
+
+SELECT
+    TABLE_NAME,
+    COLUMN_NAME,
+    DATA_TYPE,
+    *
+FROM
+    INFORMATION_SCHEMA.COLUMNS
+WHERE
+    TABLE_NAME = 'dim__school'
+;
diff --git a/transformation/demo_transformation_scaling_tpch/_demo_queries/scaling_tests.sql b/transformation/demo_transformation_scaling_tpch/_demo_queries/scaling_tests.sql
new file mode 100644
index 0000000..db88a70
--- /dev/null
+++ b/transformation/demo_transformation_scaling_tpch/_demo_queries/scaling_tests.sql
@@ -0,0 +1,7 @@
+select count(*) from tpch.stg__order_item
+;
+
+FROM tpch_queries()
+;
+FROM tpch_answers()
+;
diff --git a/transformation/demo_transformation_scaling_tpch/analyses/.gitkeep b/transformation/demo_transformation_scaling_tpch/analyses/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/transformation/demo_transformation_scaling_tpch/analyses/exploritory_analysis.sql b/transformation/demo_transformation_scaling_tpch/analyses/exploritory_analysis.sql
new file mode 100644
index 0000000..420fae8
--- /dev/null
+++ b/transformation/demo_transformation_scaling_tpch/analyses/exploritory_analysis.sql
@@ -0,0 +1,24 @@
+-- Build a query to generate a query
+SELECT
+    'SELECT ''' || table_name || ''' AS table_name, COUNT(*) AS row_count FROM tpch.'
|| table_name || ' UNION ALL ' +FROM + information_schema.tables +WHERE 1=1 +{# AND table_schema = 'main' #} +AND + table_type = 'BASE TABLE' + +-- After running this, copy the result, remove the last 'UNION ALL', and run the final query + + +SELECT 'dim__date' AS table_name, COUNT(*) AS row_count FROM tpch.dim__date UNION ALL +SELECT 'fct__order' AS table_name, COUNT(*) AS row_count FROM tpch.fct__order UNION ALL +SELECT 'fct__order_item' AS table_name, COUNT(*) AS row_count FROM tpch.fct__order_item UNION ALL +SELECT 'lineitem' AS table_name, COUNT(*) AS row_count FROM tpch.lineitem UNION ALL +SELECT 'nation' AS table_name, COUNT(*) AS row_count FROM tpch.nation UNION ALL +SELECT 'orders' AS table_name, COUNT(*) AS row_count FROM tpch.orders UNION ALL +SELECT 'part' AS table_name, COUNT(*) AS row_count FROM tpch.part UNION ALL +SELECT 'partsupp' AS table_name, COUNT(*) AS row_count FROM tpch.partsupp UNION ALL +SELECT 'prep__order_item' AS table_name, COUNT(*) AS row_count FROM tpch.prep__order_item UNION ALL +SELECT 'region' AS table_name, COUNT(*) AS row_count FROM tpch.region UNION ALL +SELECT 'supplier' AS table_name, COUNT(*) AS row_count FROM tpch.supplier diff --git a/transformation/demo_transformation_scaling_tpch/dbt_project.yml b/transformation/demo_transformation_scaling_tpch/dbt_project.yml new file mode 100644 index 0000000..a2b6f9c --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/dbt_project.yml @@ -0,0 +1,34 @@ + +# Name your project! Project names should contain only lowercase characters +# and underscores. A good package name should reflect your organization's +# name or the intended use of these models +name: 'demo_scaling_tpch' +version: '1.0.0' + +# This setting configures which "profile" dbt uses for this project. +profile: 'demo_scaling_tpch' + +# These configurations specify where dbt should look for different types of files. +# The `model-paths` config, for example, states that models in this project can be +# found in the "models/" directory. You probably won't need to change these! +model-paths: ["models"] +analysis-paths: ["analyses"] +test-paths: ["tests"] +seed-paths: ["seeds"] +macro-paths: ["macros"] +snapshot-paths: ["snapshots"] + +clean-targets: # directories to be removed by `dbt clean` + - "target" + - "dbt_packages" + + +# Configuring models +# Full documentation: https://docs.getdbt.com/docs/configuring-models + +# In this example config, we tell dbt to build all models in the example/ +# directory as views. These settings can be overridden in the individual model +# files using the `{{ config(...) }}` macro. 
+models:
+  demo_scaling_tpch:
+    # Config indicated by + and applies to all files under models/
diff --git a/transformation/demo_transformation_scaling_tpch/macros/.gitkeep b/transformation/demo_transformation_scaling_tpch/macros/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/transformation/demo_transformation_scaling_tpch/models/dimensional/dim__customer.sql b/transformation/demo_transformation_scaling_tpch/models/dimensional/dim__customer.sql
new file mode 100644
index 0000000..77c1e6c
--- /dev/null
+++ b/transformation/demo_transformation_scaling_tpch/models/dimensional/dim__customer.sql
@@ -0,0 +1,42 @@
+with customers as (
+
+    select * from {{ ref('stg__customer') }}
+
+),
+
+nations as (
+
+    select * from {{ ref('stg__nation') }}
+),
+
+regions as (
+
+    select * from {{ ref('stg__region') }}
+
+),
+
+final as (
+    select
+        customers.customer_key,
+        customers.customer_name,
+        customers.customer_address,
+        {# nations.nation_key as customer_nation_key, #}
+        nations.nation_name as customer_nation_name,
+        {# regions.region_key as customer_region_key, #}
+        regions.region_name as customer_region_name,
+        customers.customer_phone_number,
+        customers.customer_account_balance,
+        customers.customer_market_segment_name
+    from customers
+    join nations
+        on customers.nation_key = nations.nation_key
+    join regions
+        on nations.region_key = regions.region_key
+)
+
+select
+    final.*
+from
+    final
+order by
+    final.customer_key
diff --git a/transformation/demo_transformation_scaling_tpch/models/dimensional/dim__date.sql b/transformation/demo_transformation_scaling_tpch/models/dimensional/dim__date.sql
new file mode 100644
index 0000000..b33ba53
--- /dev/null
+++ b/transformation/demo_transformation_scaling_tpch/models/dimensional/dim__date.sql
@@ -0,0 +1,41 @@
+{{
+    config(
+        materialized = 'table'
+    )
+}}
+
+with
+
+stg__order as (
+    select * from {{ ref('stg__order') }}
+),
+date_prep as (
+
+    {{ dbt_date.get_date_dimension('1992-01-01', '1998-12-31') }}
+
+
+),
+
+final as (
+    select
+        -- Surrogate Key
+        -- It's a bit of overkill to make every date in a fact table require a surrogate key.
+        -- Let's just join on the natural key (yyyy-mm-dd)?
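+        -- for reference (illustrative; the exact SQL depends on the dbt_utils version and adapter),
+        -- generate_surrogate_key(['date_day']) compiles to roughly:
+        -- md5(cast(coalesce(cast(date_day as varchar), '_dbt_utils_surrogate_key_null_') as varchar))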
+ {{ dbt_utils.generate_surrogate_key(['date_day']) }} as _meta__dim__date__sk, + {# date_day as _meta__dim__date__sk, #} + + --Date Information + *, + 1 as test, + from date_prep +) +select * from final +{# {{ dbt_audit( + cte_ref="final", + created_by="@daveg", + updated_by="@daveg", + created_date="2024-04-06", + updated_date="2024-04-06" +) }} #} +{# from {{ ref('dim__date') }} where _meta__dim__date__sk = md5('-1') #} diff --git a/transformation/demo_transformation_scaling_tpch/models/dimensional/dim__part.sql b/transformation/demo_transformation_scaling_tpch/models/dimensional/dim__part.sql new file mode 100644 index 0000000..ec6ffe4 --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/models/dimensional/dim__part.sql @@ -0,0 +1,26 @@ +with parts as ( + + select * from {{ ref('stg__part') }} + +), + +final as ( + + select + parts.part_key, + parts.part_name, + parts.part_manufacturer_name, + parts.part_brand_name, + parts.part_type_name, + parts.part_size, + parts.part_container_desc, + parts.retail_price + from parts +) + +select + final.* +from + final +order by + final.part_key diff --git a/transformation/demo_transformation_scaling_tpch/models/dimensional/dim__supplier.sql b/transformation/demo_transformation_scaling_tpch/models/dimensional/dim__supplier.sql new file mode 100644 index 0000000..fdd0429 --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/models/dimensional/dim__supplier.sql @@ -0,0 +1,42 @@ +with suppliers as ( + + select * from {{ ref('stg__supplier') }} + +), + +nations as ( + + select * from {{ ref('stg__nation') }} +), + +regions as ( + + select * from {{ ref('stg__region') }} + +), + +final as ( + + select + suppliers.supplier_key, + suppliers.supplier_name, + suppliers.supplier_address, + nations.nation_key as supplier_nation_key, + nations.nation_name as supplier_nation_name, + regions.region_key as supplier_region_key, + regions.region_name as supplier_region_name, + suppliers.supplier_phone_number, + suppliers.supplier_account_balance + from suppliers + join nations + on suppliers.nation_key = nations.nation_key + join regions + on nations.region_key = regions.region_key +) + +select + final.* +from + final +order by + final.supplier_key diff --git a/transformation/demo_transformation_scaling_tpch/models/dimensional/fct__order.sql b/transformation/demo_transformation_scaling_tpch/models/dimensional/fct__order.sql new file mode 100644 index 0000000..ac1d4d0 --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/models/dimensional/fct__order.sql @@ -0,0 +1,77 @@ +{{ + config( + materialized = 'incremental', + unique_key = 'order_key', + on_schema_change = 'sync_all_columns' + + ) +}} +with orders as ( + + select * from {{ ref('stg__order') }} + + {% if is_incremental() %} + + -- this filter will only be applied on an incremental run + where order_date > (select max(order_date) from {{ this }}) + + {% endif %} + +), + +orders_items as ( + + select * from {{ ref('prep__order_item') }} + + {% if is_incremental() %} + + -- this filter will only be applied on an incremental run + where order_date > (select max(order_date) from {{ this }}) + + {% endif %} + +), + +order_item_summary as ( + + select + orders_items.order_key, + sum(orders_items.gross_item_sales_amount) as gross_item_sales_amount, + sum(orders_items.item_discount_amount) as item_discount_amount, + sum(orders_items.item_tax_amount) as item_tax_amount, + sum(orders_items.net_item_sales_amount) as net_item_sales_amount + from orders_items + group by 
orders_items.order_key +), +final as ( + + select + + orders.order_key, + orders.order_date, + orders.customer_key, + orders.order_status_code, + orders.order_priority_code, + orders.order_clerk_name, + orders.shipping_priority, + 1 as order_count, + 'order status: '|| orders.order_status_code ||' - sales amount: '||order_item_summary.net_item_sales_amount as item_summary, + order_item_summary.gross_item_sales_amount - order_item_summary.item_discount_amount - order_item_summary.item_tax_amount as net_check_amount, + order_item_summary.gross_item_sales_amount, + order_item_summary.item_discount_amount, + order_item_summary.item_tax_amount, + order_item_summary.net_item_sales_amount + from orders + left join order_item_summary + on orders.order_key = order_item_summary.order_key +) +select * +from +final +{# {{ dbt_audit( + cte_ref="final", + created_by="@davidgriffiths", + updated_by="@wisemuffin", + created_date="2024-07-06", + updated_date="2024-07-06" +) }} #} diff --git a/transformation/demo_transformation_scaling_tpch/models/dimensional/fct__order_item.sql b/transformation/demo_transformation_scaling_tpch/models/dimensional/fct__order_item.sql new file mode 100644 index 0000000..6ae4745 --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/models/dimensional/fct__order_item.sql @@ -0,0 +1,62 @@ +{{ + config( + materialized = 'table' + ) +}} +with orders_items as ( + + select * from {{ ref('prep__order_item') }} + +), + +parts_suppliers as ( + + select * from {{ ref('stg__part_supplier') }} + +), + +final as ( + select + + orders_items.order_item_key, + orders_items.order_key, + orders_items.order_date, + orders_items.customer_key, + orders_items.order_status_code, + + orders_items.part_key, + orders_items.supplier_key, + orders_items.return_status_code, + orders_items.order_line_number, + orders_items.order_line_status_code, + orders_items.ship_date, + orders_items.commit_date, + orders_items.receipt_date, + orders_items.ship_mode_name, + parts_suppliers.supplier_cost_amount, + {# parts_suppliers.retail_price, #} + orders_items.base_price, + orders_items.discount_percentage, + orders_items.discounted_price, + orders_items.tax_rate, + + 1 as order_item_count, + orders_items.quantity, + + orders_items.gross_item_sales_amount, + orders_items.discounted_item_sales_amount, + orders_items.item_discount_amount, + orders_items.item_tax_amount, + orders_items.net_item_sales_amount + + from orders_items + join parts_suppliers + on orders_items.part_key = parts_suppliers.part_key + and orders_items.supplier_key = parts_suppliers.supplier_key +) +select + final.* +from + final +order by + final.order_date diff --git a/transformation/demo_transformation_scaling_tpch/models/prep/prep__order_item.sql b/transformation/demo_transformation_scaling_tpch/models/prep/prep__order_item.sql new file mode 100644 index 0000000..1c8021f --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/models/prep/prep__order_item.sql @@ -0,0 +1,63 @@ +{{ + config( + materialized = 'table' + ) +}} +with orders as ( + + select * from {{ ref('stg__order') }} + +), + +order_item as ( + + select * from {{ ref('stg__order_item') }} + +) +select + + orders.order_key, + orders.order_date, + orders.customer_key, + orders.order_status_code, + + order_item.part_key, + order_item.supplier_key, + order_item.return_status_code, + order_item.order_line_number, + order_item.order_line_status_code, + order_item.ship_date, + order_item.commit_date, + order_item.receipt_date, + order_item.ship_mode_name, + 
order_item.quantity,
+
+        -- extended_price is actually the line item total,
+        -- so we back out the per-item price below
+        order_item.extended_price as gross_item_sales_amount,
+        order_item.discount_percentage,
+        order_item.tax_rate,
+
+        (order_item.extended_price
+            / nullif(order_item.quantity, 0)) as base_price,
+        (base_price * (1 - order_item.discount_percentage))
+            as discounted_price,
+
+        (order_item.extended_price
+            * (1 - order_item.discount_percentage))
+            as discounted_item_sales_amount,
+        -- Discounts are modeled as positive amounts and subtracted from gross below,
+        -- e.g. extended_price 1000, discount 0.05, tax 0.04: discount 50, tax (1000 - 50) * 0.04 = 38, net 912
+        (order_item.extended_price
+            * order_item.discount_percentage)
+            as item_discount_amount,
+        ((gross_item_sales_amount - item_discount_amount)
+            * order_item.tax_rate) as item_tax_amount,
+        (gross_item_sales_amount
+            - item_discount_amount
+            - item_tax_amount
+        ) as net_item_sales_amount,
+
+        {{ dbt_utils.generate_surrogate_key(['orders.order_key', 'order_item.order_line_number']) }} as order_item_key
+from orders
+left join order_item
+    on orders.order_key = order_item.order_key
diff --git a/transformation/demo_transformation_scaling_tpch/models/semantic_layer/customer.yml b/transformation/demo_transformation_scaling_tpch/models/semantic_layer/customer.yml
new file mode 100644
index 0000000..5d2b364
--- /dev/null
+++ b/transformation/demo_transformation_scaling_tpch/models/semantic_layer/customer.yml
@@ -0,0 +1,30 @@
+semantic_models:
+  - name: customer
+    description: "Semantic model for customers with nation and region details"
+    model: ref('dim__customer')
+    entities:
+      - name: customer
+        type: primary
+        expr: customer_key
+    dimensions:
+      - name: customer_name
+        type: categorical
+        description: "Name of the customer"
+      - name: customer_address
+        type: categorical
+        description: "Address of the customer"
+      - name: customer_nation_name
+        type: categorical
+        description: "Name of the customer's nation"
+      - name: customer_region_name
+        type: categorical
+        description: "Name of the customer's region"
+      - name: customer_phone_number
+        type: categorical
+        description: "Phone number of the customer"
+      - name: customer_market_segment_name
+        type: categorical
+        description: "Market segment of the customer"
+      - name: customer_account_balance
+        type: categorical
+        description: "Account balance of the customer"
diff --git a/transformation/demo_transformation_scaling_tpch/models/semantic_layer/metricflow_time_spine.sql b/transformation/demo_transformation_scaling_tpch/models/semantic_layer/metricflow_time_spine.sql
new file mode 100644
index 0000000..1f4036f
--- /dev/null
+++ b/transformation/demo_transformation_scaling_tpch/models/semantic_layer/metricflow_time_spine.sql
@@ -0,0 +1,8 @@
+-- metricflow_time_spine.sql
+-- by default, MetricFlow expects the time spine table to be named metricflow_time_spine and doesn't support using a different name.
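+-- dim__date is built with dbt_date.get_date_dimension, so it already exposes a date_day
+-- column at daily grain, which is what MetricFlow uses as the spine's primary time column
+-- (assuming the default time spine configuration for this dbt / MetricFlow version).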
+with final as ( + select *, from {{ ref('dim__date') }} + +) + +select *, from final diff --git a/transformation/demo_transformation_scaling_tpch/models/semantic_layer/order.yml b/transformation/demo_transformation_scaling_tpch/models/semantic_layer/order.yml new file mode 100644 index 0000000..5719c2d --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/models/semantic_layer/order.yml @@ -0,0 +1,143 @@ +semantic_models: + - name: order + description: "Metrics for orders and order items" + model: ref('fct__order') + entities: + - name: order + type: primary + expr: order_key + - name: customer + type: foreign + expr: customer_key + dimensions: + - name: order_date + type: time + description: "Date of the order" + type_params: + time_granularity: day + - name: order_status_code + type: categorical + description: "Status code of the order" + - name: order_priority_code + type: categorical + description: "Priority code of the order" + - name: order_clerk_name + type: categorical + description: "Name of the order clerk" + - name: shipping_priority + type: categorical + description: "Shipping priority of the order" + - name: item_summary + type: categorical + description: "Summary of the order status and sales amount" + measures: + - name: order_count + description: "Count of orders" + agg: count + expr: 1 + - name: gross_item_sales_amount + description: "Gross sales amount of items" + agg: sum + - name: item_discount_amount + description: "Discount amount on items" + agg: sum + - name: item_tax_amount + description: "Tax amount on items" + agg: sum + - name: net_item_sales_amount + description: "Net sales amount of items" + agg: sum + - name: net_check_amount + description: "Net check amount after discounts and taxes" + agg: sum + defaults: + agg_time_dimension: order_date + +metrics: + - name: order_count + label: Count of orders + type: simple + type_params: + measure: + name: order_count + - name: gross_item_sales_amount + label: Gross sales amount of items + type: simple + type_params: + measure: + name: gross_item_sales_amount + - name: item_discount_amount + label: "Discount amount on items" + type: simple + type_params: + measure: + name: item_discount_amount + - name: item_tax_amount + label: "Tax amount on items" + type: simple + type_params: + measure: + name: item_tax_amount + - name: net_item_sales_amount + label: "Net sales amount of items" + type: simple + type_params: + measure: + name: net_item_sales_amount + - name: net_check_amount + label: "Net check amount after discounts and taxes" + type: simple + type_params: + measure: + name: net_check_amount + + - name: average_order_value + label: average order value + type: ratio + type_params: + numerator: + name: gross_item_sales_amount + denominator: + name: order_count + + - name: average_discount_rate + label: average discount rate + type: ratio + type_params: + numerator: + name: item_discount_amount + denominator: + name: gross_item_sales_amount + + - name: cumulative_gross_item_sales_amount + label: Cumulative gross_item_sales_amount (All-Time) + type: cumulative + type_params: + measure: + name: gross_item_sales_amount + + # need to upgrade dbt + # - name: eight_weekly_gross_item_sales_amount + # label: 8 Weekly gross_item_sales_amount + # type: cumulative + # type_params: + # measure: + # name: gross_item_sales_amount + # cumulative_type_params: + # window: 8 weeks + # period_agg: first + + +saved_queries: + - name: order_metrics_saved_query + label: order metrics saved query + description: Relevant order 
metrics + query_params: + metrics: + # - eight_weekly_gross_item_sales_amount + - cumulative_gross_item_sales_amount + - gross_item_sales_amount + - average_order_value + group_by: + - TimeDimension('order__order_date', 'year') + - Dimension('customer__customer_name') diff --git a/transformation/demo_transformation_scaling_tpch/models/staging/schema.yml b/transformation/demo_transformation_scaling_tpch/models/staging/schema.yml new file mode 100644 index 0000000..c4f062a --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/models/staging/schema.yml @@ -0,0 +1,14 @@ +version: 2 + +sources: + - name: tpch + schema: tpch + tables: + - name: customer + - name: lineitem + - name: nation + - name: orders + - name: part + - name: partsupp + - name: region + - name: supplier diff --git a/transformation/demo_transformation_scaling_tpch/models/staging/stg__customer.sql b/transformation/demo_transformation_scaling_tpch/models/staging/stg__customer.sql new file mode 100644 index 0000000..ef7296f --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/models/staging/stg__customer.sql @@ -0,0 +1,17 @@ +with source as ( + select * from {{ source('tpch', 'customer') }} +), +renamed as ( + select + {{ adapter.quote("c_custkey") }} as customer_key, + {{ adapter.quote("c_name") }} as customer_name, + {{ adapter.quote("c_address") }} as customer_address, + {{ adapter.quote("c_nationkey") }} as nation_key, + {{ adapter.quote("c_phone") }} as customer_phone_number, + {{ adapter.quote("c_acctbal") }} as customer_account_balance, + {{ adapter.quote("c_mktsegment") }} as customer_market_segment_name, + {{ adapter.quote("c_comment") }} as customer_comment + + from source +) +select * from renamed diff --git a/transformation/demo_transformation_scaling_tpch/models/staging/stg__nation.sql b/transformation/demo_transformation_scaling_tpch/models/staging/stg__nation.sql new file mode 100644 index 0000000..5da8282 --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/models/staging/stg__nation.sql @@ -0,0 +1,13 @@ +with source as ( + select * from {{ source('tpch', 'nation') }} +), +renamed as ( + select + {{ adapter.quote("n_nationkey") }} as nation_key, + {{ adapter.quote("n_name") }} as nation_name, + {{ adapter.quote("n_regionkey") }} as region_key, + {{ adapter.quote("n_comment") }} as nation_comment + + from source +) +select * from renamed diff --git a/transformation/demo_transformation_scaling_tpch/models/staging/stg__order.sql b/transformation/demo_transformation_scaling_tpch/models/staging/stg__order.sql new file mode 100644 index 0000000..3a4a7e6 --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/models/staging/stg__order.sql @@ -0,0 +1,18 @@ +with source as ( + select * from {{ source('tpch', 'orders') }} +), +renamed as ( + select + {{ adapter.quote("o_orderkey") }} as order_key, + {{ adapter.quote("o_custkey") }} as customer_key, + {{ adapter.quote("o_orderstatus") }} as order_status_code, + {{ adapter.quote("o_totalprice") }} as total_price, + {{ adapter.quote("o_orderdate") }} as order_date, + {{ adapter.quote("o_orderpriority") }} as order_priority_code, + {{ adapter.quote("o_clerk") }} as order_clerk_name, + {{ adapter.quote("o_shippriority") }} as shipping_priority, + {{ adapter.quote("o_comment") }} as order_comment + + from source +) +select * from renamed diff --git a/transformation/demo_transformation_scaling_tpch/models/staging/stg__order_item.sql b/transformation/demo_transformation_scaling_tpch/models/staging/stg__order_item.sql new file 
mode 100644
index 0000000..a2fb4ab
--- /dev/null
+++ b/transformation/demo_transformation_scaling_tpch/models/staging/stg__order_item.sql
@@ -0,0 +1,25 @@
+with source as (
+    select * from {{ source('tpch', 'lineitem') }}
+),
+renamed as (
+    select
+        {{ adapter.quote("l_orderkey") }} as order_key,
+        {{ adapter.quote("l_partkey") }} as part_key,
+        {{ adapter.quote("l_suppkey") }} as supplier_key,
+        {{ adapter.quote("l_linenumber") }} as order_line_number,
+        {{ adapter.quote("l_quantity") }} as quantity,
+        {{ adapter.quote("l_extendedprice") }} as extended_price,
+        {{ adapter.quote("l_discount") }} as discount_percentage,
+        {{ adapter.quote("l_tax") }} as tax_rate,
+        {{ adapter.quote("l_returnflag") }} as return_status_code,
+        {{ adapter.quote("l_linestatus") }} as order_line_status_code,
+        {{ adapter.quote("l_shipdate") }} as ship_date,
+        {{ adapter.quote("l_commitdate") }} as commit_date,
+        {{ adapter.quote("l_receiptdate") }} as receipt_date,
+        {{ adapter.quote("l_shipinstruct") }} as ship_instruction,
+        {{ adapter.quote("l_shipmode") }} as ship_mode_name,
+        {{ adapter.quote("l_comment") }} as comment
+
+    from source
+)
+select * from renamed
diff --git a/transformation/demo_transformation_scaling_tpch/models/staging/stg__part.sql b/transformation/demo_transformation_scaling_tpch/models/staging/stg__part.sql
new file mode 100644
index 0000000..9390d86
--- /dev/null
+++ b/transformation/demo_transformation_scaling_tpch/models/staging/stg__part.sql
@@ -0,0 +1,18 @@
+with source as (
+    select * from {{ source('tpch', 'part') }}
+),
+renamed as (
+    select
+        {{ adapter.quote("p_partkey") }} as part_key,
+        {{ adapter.quote("p_name") }} as part_name,
+        {{ adapter.quote("p_mfgr") }} as part_manufacturer_name,
+        {{ adapter.quote("p_brand") }} as part_brand_name,
+        {{ adapter.quote("p_type") }} as part_type_name,
+        {{ adapter.quote("p_size") }} as part_size,
+        {{ adapter.quote("p_container") }} as part_container_desc,
+        {{ adapter.quote("p_retailprice") }} as retail_price,
+        {{ adapter.quote("p_comment") }} as part_comment
+
+    from source
+)
+select * from renamed
diff --git a/transformation/demo_transformation_scaling_tpch/models/staging/stg__part_supplier.sql b/transformation/demo_transformation_scaling_tpch/models/staging/stg__part_supplier.sql
new file mode 100644
index 0000000..087666d
--- /dev/null
+++ b/transformation/demo_transformation_scaling_tpch/models/staging/stg__part_supplier.sql
@@ -0,0 +1,14 @@
+with source as (
+    select * from {{ source('tpch', 'partsupp') }}
+),
+renamed as (
+    select
+        {{ adapter.quote("ps_partkey") }} as part_key,
+        {{ adapter.quote("ps_suppkey") }} as supplier_key,
+        {{ adapter.quote("ps_availqty") }} as available_quantity,
+        {{ adapter.quote("ps_supplycost") }} as supplier_cost_amount,
+        {{ adapter.quote("ps_comment") }} as comment
+
+    from source
+)
+select * from renamed
diff --git a/transformation/demo_transformation_scaling_tpch/models/staging/stg__region.sql b/transformation/demo_transformation_scaling_tpch/models/staging/stg__region.sql
new file mode 100644
index 0000000..535726c
--- /dev/null
+++ b/transformation/demo_transformation_scaling_tpch/models/staging/stg__region.sql
@@ -0,0 +1,12 @@
+with source as (
+    select * from {{ source('tpch', 'region') }}
+),
+renamed as (
+    select
+        {{ adapter.quote("r_regionkey") }} as region_key,
+        {{ adapter.quote("r_name") }} as region_name,
+        {{ adapter.quote("r_comment") }} as region_comment
+
+    from source
+)
+select * from renamed
diff --git
a/transformation/demo_transformation_scaling_tpch/models/staging/stg__supplier.sql b/transformation/demo_transformation_scaling_tpch/models/staging/stg__supplier.sql new file mode 100644 index 0000000..d1c3b5b --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/models/staging/stg__supplier.sql @@ -0,0 +1,16 @@ +with source as ( + select * from {{ source('tpch', 'supplier') }} +), +renamed as ( + select + {{ adapter.quote("s_suppkey") }} as supplier_key, + {{ adapter.quote("s_name") }} as supplier_name, + {{ adapter.quote("s_address") }} as supplier_address, + {{ adapter.quote("s_nationkey") }} as nation_key, + {{ adapter.quote("s_phone") }} as supplier_phone_number, + {{ adapter.quote("s_acctbal") }} as supplier_account_balance, + {{ adapter.quote("s_comment") }} as supplier_comment + + from source +) +select * from renamed diff --git a/transformation/demo_transformation_scaling_tpch/package-lock.yml b/transformation/demo_transformation_scaling_tpch/package-lock.yml new file mode 100644 index 0000000..5a8c078 --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/package-lock.yml @@ -0,0 +1,8 @@ +packages: +- package: dbt-labs/dbt_utils + version: 1.0.0 +- package: calogica/dbt_date + version: 0.10.1 +- package: dbt-labs/codegen + version: 0.12.1 +sha1_hash: 83d3710efeeea7070e310e1dd38b88034c15286d diff --git a/transformation/demo_transformation_scaling_tpch/packages.yml b/transformation/demo_transformation_scaling_tpch/packages.yml new file mode 100644 index 0000000..4872702 --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/packages.yml @@ -0,0 +1,8 @@ +packages: + - package: dbt-labs/dbt_utils + version: 1.0.0 + - package: calogica/dbt_date + version: [">=0.8.0"] + # for the latest version tag + - package: dbt-labs/codegen + version: 0.12.1 diff --git a/transformation/demo_transformation_scaling_tpch/profiles.yml b/transformation/demo_transformation_scaling_tpch/profiles.yml new file mode 100644 index 0000000..3164513 --- /dev/null +++ b/transformation/demo_transformation_scaling_tpch/profiles.yml @@ -0,0 +1,19 @@ +demo_scaling_tpch: + outputs: + dev: + type: duckdb + path: "../../{{ env_var('TPCH_DB_PATH_AND_DB') }}" + database: "{{ env_var('TPCH_DB_NAME') }}" + schema: "tpch" + threads: 4 + prod: + type: duckdb + path: "{{ env_var('TPCH_DB_PATH_AND_DB') }}" + schema: "tpch" + threads: 4 + test: + type: duckdb + path: "{{ env_var('TPCH_DB_PATH_AND_DB') }}" + schema: "tpch" + threads: 4 + target: "{{ env_var('TPCH__ENV') }}" diff --git a/transformation/demo_transformation_scaling_tpch/seeds/.gitkeep b/transformation/demo_transformation_scaling_tpch/seeds/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/transformation/demo_transformation_scaling_tpch/snapshots/.gitkeep b/transformation/demo_transformation_scaling_tpch/snapshots/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/transformation/demo_transformation_scaling_tpch/tests/.gitkeep b/transformation/demo_transformation_scaling_tpch/tests/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/transformation/transformation_nsw_doe/analysis/.gitkeep b/transformation/transformation_nsw_doe/analysis/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/transformation/transformation_nsw_doe/dbt_project.yml b/transformation/transformation_nsw_doe/dbt_project.yml index e3ead23..f8d52e0 100644 --- a/transformation/transformation_nsw_doe/dbt_project.yml +++ b/transformation/transformation_nsw_doe/dbt_project.yml @@ -9,7 +9,7 @@ profile: 
'nsw_doe_data_stack_in_a_box_profile' vars: truncate_timespan_to: "{{ current_timestamp() }}" "dbt_date:time_zone": "America/Los_Angeles" - + model-paths: ["models"] seed-paths: ["seeds"] test-paths: ["tests"] @@ -41,4 +41,4 @@ models: - "api" # tagging api so i know which models require API keys google_analytics: +tags: - - "api" # tagging api so i know which models require API keys \ No newline at end of file + - "api" # tagging api so i know which models require API keys diff --git a/transformation/transformation_nsw_doe/models/dimensional/facts/fct__attendance.sql b/transformation/transformation_nsw_doe/models/dimensional/facts/fct__attendance.sql index 22146af..3c4aca3 100644 --- a/transformation/transformation_nsw_doe/models/dimensional/facts/fct__attendance.sql +++ b/transformation/transformation_nsw_doe/models/dimensional/facts/fct__attendance.sql @@ -27,7 +27,7 @@ final as ( -- Measures - stg__nsw_doe_datansw__attendance.attendance /100 as attendance, + stg__nsw_doe_datansw__attendance.attendance / 100 as attendance, from stg__nsw_doe_datansw__attendance diff --git a/transformation/transformation_nsw_doe/models/preperation/prep__incident.sql b/transformation/transformation_nsw_doe/models/preperation/prep__incident.sql index 8aac971..cc2963c 100644 --- a/transformation/transformation_nsw_doe/models/preperation/prep__incident.sql +++ b/transformation/transformation_nsw_doe/models/preperation/prep__incident.sql @@ -88,11 +88,12 @@ with unioned_data as ( secondary_category, primary_sub_category, from {{ ref('stg__nsw_doe_datansw__incidents_2022_part_2') }} -) -, final as ( +), + +final as ( select case_number, - cast(STRFTIME(STRPTIME(date_time_opened, '%d/%m/%Y %H:%M'), '%Y-%m-%d %H:%M:%S') as timestamp) as date_time_opened, + cast(strftime(strptime(date_time_opened, '%d/%m/%Y %H:%M'), '%Y-%m-%d %H:%M:%S') as timestamp) as date_time_opened, term, incident_group, operational_directorate, @@ -105,6 +106,7 @@ with unioned_data as ( primary_sub_category, from unioned_data ) + {{ dbt_audit( cte_ref="final", created_by="@daveg",