Skip to content

Commit

Permalink
tpch demo (#29)
Browse files Browse the repository at this point in the history
* tpch demo

* tpch demo

* tpch added metrics and dagster

* cleaning up tpch

* tpch working

* tpch adding in default scaling factor. Can set config at run time

* tpch fixes

* adding analysis and fixing dbt profiles

* testing out adding multiple code repos to workspace

* testing out act for github actions

* testing out act for github actions

* testing out act for github actions

* testing out act for github actions

* fixing branching for dagster end to end

* changing which workspaces are made available via ui

* fixing issue with dagster not finding target file from dbt for tpch

* fixing issue with dagster not finding target file from dbt for tpch

* fixing issue with dagster not finding target file from dbt for tpch

* fixing issue with dagster not finding target file from dbt for tpch

* fixing issue with dagster not finding target file from dbt for tpch

* fixing issue with dagster not finding target file from dbt for tpch

* fixing issue with dagster not finding target file from dbt for tpch

* fixing issue with dagster not finding target file from dbt for tpch

* fixing issue with dagster not finding target file from dbt for tpch

* fixing issue with dagster not finding target file from dbt for tpch

* fixing issue with dagster not finding target file from dbt for tpch

* fixing issue with dagster not finding target file from dbt for tpch

* fixing issue with dagster not finding target file from dbt for tpch

* fixing codespace missing env variables

* fixing codespace missing env variables

* fixing codespace missing env variables

* fixing broken apprenticeship, header row changed

* fixing issue 36 apprenticeship completions schema change

* removing requires secret from codespace

* fixing issue 36 apprenticeship completions schema change

* fixing issue 36 apprenticeship completions schema change

* fixing issue 36 apprenticeship completions schema change

* fixing ruff

* fixing ruff

* fixing ruff

* fixing ruff

* fixing ruff

* fixing dbt slim ci

* forcing duckdb to not get ahead of mother duck

* sqlfluff fix

* sqlfluff fix

* pytest fix

* fixing slim ci with snapshots

* fixing slim ci with snapshots

* fixing slim ci with snapshots

* fixing slim ci with snapshots

* testing slim ci

* testing slim ci

* fixing apprenticeship schema

* fixing dagster embedded issue with version changing

* fixing dagster embedded issue with version changing

* removing openai from nsw_doe
  • Loading branch information
wisemuffin authored Oct 18, 2024
1 parent 00e0f3f commit 4fbc7ef
Show file tree
Hide file tree
Showing 78 changed files with 1,637 additions and 281 deletions.
7 changes: 7 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@
"NSW_DOE_DATA_STACK_IN_A_BOX_DB_NAME": "nsw_doe_data_stack_in_a_box__dev",
"NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA": "analytics",

"TPCH__ENV": "dev",
"TPCH_DBT_PROJECT_DIR": "transformation/demo_transformation_scaling_tpch",
"TPCH_DB_PATH": "reports/sources/demo_tpch__dev",
"TPCH_DB_PATH_AND_DB": "reports/sources/demo_tpch__dev/demo_tpch__dev.duckdb",
"TPCH_DB_NAME": "demo_tpch__dev",
"TPCH_TARGET_SCHEMA": "analytics",

"DESTINATION__DUCKDB__CREDENTIALS": "duckdb:///////workspaces/nsw-doe-data-stack-in-a-box/reports/sources/nsw_doe_data_stack_in_a_box/nsw_doe_data_stack_in_a_box__dev.duckdb",

"DAGSTER_HOME": "orchestration/dagster-local-file-store",
Expand Down
69 changes: 69 additions & 0 deletions .github/workflows/ci_demo_act_workflow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
name: CI Demo ACT

# Demo workflow for testing GitHub Actions locally with `act`.
# Manually triggered only; the pull_request trigger is kept commented for reference.
on:
  workflow_dispatch:
  # pull_request:
  #   types: [opened, synchronize, reopened, closed]

concurrency:
  # Cancel in-progress deploys to same branch
  group: ${{ github.ref }}/branch_deployments
  cancel-in-progress: true

env:
  DAGSTER_CLOUD_URL: "http://wisemuffin.dagster.cloud"
  DAGSTER_CLOUD_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }}
  ENABLE_FAST_DEPLOYS: 'true'
  PYTHON_VERSION: '3.11'
  DAGSTER_CLOUD_FILE: 'dagster_cloud.yaml'

  DAGSTER_CLOUD_ORGANIZATION: "wisemuffin"
  DAGSTER_PROJECT_NAME: pipeline_nsw_doe

  NSW_DOE_DATA_STACK_IN_A_BOX_DAGSTER_PROJECT_DIR: ${{ vars.NSW_DOE_DATA_STACK_IN_A_BOX_DAGSTER_PROJECT_DIR }}
  NSW_DOE_DATA_STACK_IN_A_BOX_DBT_PROJECT_DIR: ${{ vars.NSW_DOE_DATA_STACK_IN_A_BOX_DBT_PROJECT_DIR }}

  NSW_DOE_DATA_STACK_IN_A_BOX__ENV: ${{ vars.NSW_DOE_DATA_STACK_IN_A_BOX__ENV }}

  NSW_DOE_DATA_STACK_IN_A_BOX_DB_PATH_AND_DB: ${{ secrets.NSW_DOE_DATA_STACK_IN_A_BOX_DB_PATH_AND_DB }}
  NSW_DOE_DATA_STACK_IN_A_BOX_DB_NAME: ${{ vars.NSW_DOE_DATA_STACK_IN_A_BOX_DB_NAME }}

  MOTHERDUCK_TOKEN: ${{ secrets.MOTHERDUCK_TOKEN }}
  DATAFOLD_APIKEY: ${{ secrets.DATAFOLD_APIKEY }}

  DAGSTER_HOME: ${{ vars.DAGSTER_HOME }}

  AWS_ROLE_TO_ASSUME: ${{ secrets.AWS_ROLE_TO_ASSUME }}
  S3_BUCKET_METADATA: ${{ secrets.S3_BUCKET_METADATA }}

  DESTINATION__DUCKDB__CREDENTIALS: ${{ secrets.DESTINATION__DUCKDB__CREDENTIALS }}

  SOURCES__GITHUB__ACCESS_TOKEN: ${{ secrets.SOURCES__GITHUB__ACCESS_TOKEN }}

  SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__PROJECT_ID: ${{ secrets.SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__PROJECT_ID }}
  SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__CLIENT_EMAIL: ${{ secrets.SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__CLIENT_EMAIL }}
  SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__PRIVATE_KEY: ${{ secrets.SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__PRIVATE_KEY }}
  SOURCES__GOOGLE_ANALYTICS__PROPERTY_ID: ${{ secrets.SOURCES__GOOGLE_ANALYTICS__PROPERTY_ID }}

jobs:
  act_job:
    name: ACT job
    # NOTE(review): ubuntu-20.04 hosted runners have been retired by GitHub;
    # migrate to a supported image (e.g. ubuntu-22.04) — confirm against act's
    # runner-image mapping before changing.
    runs-on: ubuntu-20.04
    environment: test
    outputs:
      build_info: ${{ steps.parse-workspace.outputs.build_info }}

    steps:
      - name: Setup Python
        uses: actions/[email protected]
        with:
          python-version: "3.11.x"
      # - name: Prerun Checks
      #   id: prerun
      #   uses: dagster-io/dagster-cloud-action/actions/utils/[email protected]

      - name: Set NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA
        run: |
          head_ref=$(echo ${GITHUB_HEAD_REF} | tr '[:upper:]' '[:lower:]' | sed -e 's/[^a-zA-Z0-9]/_/g')
          echo head_ref = $head_ref
          echo GITHUB_HEAD_REF = ${GITHUB_HEAD_REF}
          echo "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA=pr_full_${head_ref}" >> $GITHUB_ENV
          # NOTE(review): the echo below prints empty — values written to
          # GITHUB_ENV only become visible to *subsequent* steps, not within
          # the step that wrote them.
          echo schema = $NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA
3 changes: 2 additions & 1 deletion .github/workflows/ci_dev_quick_checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
with:
python-version: "3.11.x"
- name: check python with ruff
uses: chartboost/ruff-action@v1
uses: astral-sh/ruff-action@v1
- name: Setup Node
uses: actions/[email protected]
with:
Expand All @@ -68,6 +68,7 @@ jobs:
- name: Lint SQL
run: |
echo schema $NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA
cd ${{github.workspace}}/transformation/transformation_nsw_doe
sqlfluff lint --format github-annotation-native
# required to set path for pytest
- name: set pythonpath
Expand Down
34 changes: 31 additions & 3 deletions .github/workflows/ci_test_branch_deployments_end_to_end.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ env:
SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__PRIVATE_KEY: ${{ secrets.SOURCES__GOOGLE_ANALYTICS__CREDENTIALS__PRIVATE_KEY }}
SOURCES__GOOGLE_ANALYTICS__PROPERTY_ID: ${{ secrets.SOURCES__GOOGLE_ANALYTICS__PROPERTY_ID }}


TPCH__ENV: ${{ vars.TPCH__ENV}}
TPCH_DBT_PROJECT_DIR: ${{ vars.TPCH_DBT_PROJECT_DIR}}
TPCH_DB_PATH_AND_DB: ${{ secrets.TPCH_DB_PATH_AND_DB}}
TPCH_DB_NAME: ${{ vars.TPCH_DB_NAME}}

jobs:
dagster_cloud_default_deploy:
name: Dagster Serverless Deploy
Expand All @@ -61,8 +67,15 @@ jobs:

- name: Set NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA
run: |
echo "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA=pr_full_$(echo ${GITHUB_HEAD_REF} | tr '[:upper:]' '[:lower:]' | sed -e 's/[^a-zA-Z0-9]/_/g')" >> $GITHUB_ENV
echo schema = $NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA
# echo "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA=pr_full_$(echo ${GITHUB_HEAD_REF} | tr '[:upper:]' '[:lower:]' | sed -e 's/[^a-zA-Z0-9]/_/g')" >> $GITHUB_ENV
NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA=pr_full_$(echo ${GITHUB_HEAD_REF} | tr '[:upper:]' '[:lower:]' | sed -e 's/[^a-zA-Z0-9]/_/g')
echo "NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA=$(echo $NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA)" >> $GITHUB_ENV
# echo schema ${{ env.NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA }} # cant ref env variables in command in same run
echo schema $NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA
- name: check schema
run: echo schema ${{ env.NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA }}


- name: Launch Docker Deploy
if: steps.prerun.outputs.result == 'docker-deploy'
Expand Down Expand Up @@ -103,9 +116,23 @@ jobs:
source .venv/bin/activate
# currently requirements.txt is stored at total project level one level above dagster project
uv pip install -r requirements.txt
cd ./transformation/demo_transformation_scaling_tpch
dbt deps
cd ../..
cd ./transformation/transformation_nsw_doe
dbt deps
cd ../..
cd ./${{ env.NSW_DOE_DATA_STACK_IN_A_BOX_DAGSTER_PROJECT_DIR }}
uv pip install .
dagster-dbt project prepare-for-deployment --file ./${{ env.DAGSTER_PROJECT_NAME }}/project.py
dagster-dbt project prepare-for-deployment --file ./pipeline_nsw_doe_requires_secrets/project.py
dagster-dbt project prepare-for-deployment --file ./demo_pipeline_scaling_tpch/project.py
# The cli command below can be used to manage syncing the prod manifest to branches if state_path is set on the DbtProject
# dagster-cloud ci dagster-dbt project manage-state --file ./${{ env.DAGSTER_PROJECT_NAME }}/project.py
shell: bash
Expand All @@ -119,7 +146,8 @@ jobs:
python_version: "${{ env.PYTHON_VERSION }}"
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA: $NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA
NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA: ${{ env.NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA }}
# cant seem to send env to dagster config, for now when doing end to end test will need to set this env in the UI

dagster_cloud_docker_deploy:
name: Docker Deploy
Expand Down
8 changes: 7 additions & 1 deletion .github/workflows/ci_test_branch_deployments_only_dbt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,16 @@ jobs:
aws s3 cp s3://$S3_BUCKET_METADATA/prod/manifest/manifest.json ${{github.workspace}}/transformation/transformation_nsw_doe/target/last_manifest/manifest.json
- name: Deploy
# have excluded source: from state:modified as this seems to try to rebuild all models because source databases differ between envs.
# also for now have removed the + at the end of state:modified as that was also causing all models to be created even when nothing changed.
# TODO
run: |
echo schema $NSW_DOE_DATA_STACK_IN_A_BOX_TARGET_SCHEMA
cd ${{github.workspace}}/transformation/transformation_nsw_doe
dbt build --exclude "+*github*+ +*google*+" --models state:modified+ --defer --state ${{github.workspace}}/transformation/transformation_nsw_doe/target/last_manifest --profiles-dir ${{github.workspace}}/transformation/transformation_nsw_doe
echo models modified
dbt ls --exclude "source:* +*github*+ +*google*+" --models state:modified --defer --state ${{github.workspace}}/transformation/transformation_nsw_doe/target/last_manifest --profiles-dir ${{github.workspace}}/transformation/transformation_nsw_doe
echo building modified models
dbt build --exclude "source:* +*github*+ +*google*+ *snapshot*" --models state:modified --defer --state ${{github.workspace}}/transformation/transformation_nsw_doe/target/last_manifest --profiles-dir ${{github.workspace}}/transformation/transformation_nsw_doe
# removing datafold until we get a motherduck connection
Expand Down
13 changes: 11 additions & 2 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ tasks:
# workaround as cant get dynamic manifest.json to build
- cd ${GITHUB_WORKSPACE}/${NSW_DOE_DATA_STACK_IN_A_BOX_DBT_PROJECT_DIR} && dbt parse

- cd transformation/demo_transformation_jaffle_shop && dbt deps
- cd transformation/demo_transformation_jaffle_shop && dbt parse
- cd ${GITHUB_WORKSPACE}/transformation/demo_transformation_jaffle_shop && dbt deps
- cd ${GITHUB_WORKSPACE}/transformation/demo_transformation_jaffle_shop && dbt parse

- cd ${GITHUB_WORKSPACE}/transformation/pipeline_nsw_doe_requires_secrets && dbt deps
- cd ${GITHUB_WORKSPACE}/transformation/pipeline_nsw_doe_requires_secrets && dbt parse

- cd ${GITHUB_WORKSPACE}/transformation/demo_pipeline_scaling_tpch && dbt deps
- cd ${GITHUB_WORKSPACE}/transformation/demo_pipeline_scaling_tpch && dbt parse


node_deps:
Expand Down Expand Up @@ -87,6 +93,9 @@ tasks:
duck:
cmds:
- duckdb ${GITHUB_WORKSPACE}/$NSW_DOE_DATA_STACK_IN_A_BOX_DB_PATH_AND_DB
duck_tpch:
cmds:
- duckdb ${GITHUB_WORKSPACE}/$TPCH_DB_PATH_AND_DB
setup_sqltools:
cmds:
- cd ~/.local/share/vscode-sqltools && npm install [email protected] && exit 0
Expand Down
6 changes: 6 additions & 0 deletions dagster_cloud.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,9 @@ locations:
build:
directory: .
working_directory: ./orchestration
- code_source:
package_name: demo_pipeline_scaling_tpch
location_name: demo-pipeline-scaling-tpch
build:
directory: .
working_directory: ./orchestration
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,9 @@
}
],
"source": [
"import matplotlib.patches as mpatches\n",
"import matplotlib.lines as mlines\n",
"\n",
"fig, axs = plt.subplots(ncols=3, nrows=3, figsize=(12, 12))\n",
"\n",
"sns.scatterplot(\n",
Expand Down Expand Up @@ -984,8 +987,6 @@
"axs[2, 0].axis(\"off\")\n",
"axs[2, 1].axis(\"off\")\n",
"\n",
"import matplotlib.patches as mpatches\n",
"import matplotlib.lines as mlines\n",
"\n",
"palette = sns.color_palette()\n",
"setosa = mpatches.Patch(color=palette[0], label=\"Iris-setosa\", alpha=0.5)\n",
Expand Down
74 changes: 74 additions & 0 deletions orchestration/demo_pipeline_scaling_tpch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Dagster definitions for the demo TPCH scaling pipeline.

Wires the package's assets, schedules, and per-environment resources
(DuckDB warehouse IO manager, dbt CLI, notebook output manager) into a
single `Definitions` object, selected by the TPCH__ENV environment variable.
"""

import os
from pathlib import Path

from dagster import Definitions, FilesystemIOManager, load_assets_from_package_module
from dagster_dbt import DbtCliResource
from dagster_duckdb_pandas import DuckDBPandasIOManager
from dagstermill import ConfigurableLocalOutputNotebookIOManager
from dotenv import load_dotenv

from . import assets
from .project import tpch_project
from .schedules import schedules
from .util.branching import set_schema_name_env

# Load .env first so the environment reads below pick up local overrides.
load_dotenv()

# Exports the branch-specific schema name into the environment; must run
# before TPCH_TARGET_SCHEMA is read.
set_schema_name_env()

TPCH_TARGET_SCHEMA = os.getenv("TPCH_TARGET_SCHEMA")

# Required: path (including file name) to the DuckDB database. Read via
# os.environ so a missing variable fails loudly at import time instead of
# silently producing the string "None" as a database path.
TPCH_DB_PATH_AND_DB = os.environ["TPCH_DB_PATH_AND_DB"]

# For local dev, resolve the relative DB path against the repository root
# (three levels above this package).
DUCKDB_PROJECT_DIR = str(
    Path(__file__).parent.parent.parent.joinpath(TPCH_DB_PATH_AND_DB)
)

S3_BUCKET_METADATA = os.getenv("S3_BUCKET_METADATA")

# Deployment environment selector; defaults to local development.
TPCH__ENV = os.getenv("TPCH__ENV", "dev")


def _resources(database: str) -> dict:
    """Build the common resource set, pointing the warehouse IO manager at `database`."""
    return {
        "io_manager_dw": DuckDBPandasIOManager(database=database),
        "io_manager": FilesystemIOManager(),
        "dbt": DbtCliResource(project_dir=tpch_project),
        "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(),
    }


# prod/test use the configured path verbatim (e.g. a MotherDuck URL); dev
# resolves it relative to the local repo checkout.
resources_by_env = {
    "prod": _resources(TPCH_DB_PATH_AND_DB),
    "test": _resources(TPCH_DB_PATH_AND_DB),
    "dev": _resources(DUCKDB_PROJECT_DIR),
}

all_assets = load_assets_from_package_module(assets)

defs = Definitions(
    assets=all_assets,
    schedules=schedules,
    # An unknown TPCH__ENV value raises KeyError here, surfacing the
    # misconfiguration at import time.
    resources=resources_by_env[TPCH__ENV],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from . import *
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .assets import *
Loading

0 comments on commit 4fbc7ef

Please sign in to comment.