feat(ingest/dagster): Add automatic snowflake_pandas_io_manager asset capture #11189

Merged · 94 commits · Aug 28, 2024
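The headline feature: a drop-in replacement for Dagster's `snowflake_pandas_io_manager` that automatically captures the tables it materializes as DataHub assets. The class name and import path in the sketch below are assumptions for illustration (the merged module layout is not visible on this page); the configuration surface mirrors `SnowflakePandasIOManager` from `dagster-snowflake-pandas`.

```python
import pandas as pd
from dagster import Definitions, asset

# Hypothetical import path; check the datahub-dagster-plugin package for the
# actual location of the drop-in io manager added by this PR.
from datahub_dagster_plugin.modules.snowflake_pandas import (
    DataHubSnowflakePandasIOManager,
)


@asset(io_manager_key="snowflake_io_manager")
def my_table() -> pd.DataFrame:
    # Written to Snowflake by the io manager and, with the drop-in,
    # captured as a DataHub dataset without any extra lineage code.
    return pd.DataFrame({"id": [1, 2, 3]})


defs = Definitions(
    assets=[my_table],
    resources={
        # Same settings as SnowflakePandasIOManager; values are placeholders.
        "snowflake_io_manager": DataHubSnowflakePandasIOManager(
            account="<account>",
            user="<user>",
            password="<password>",
            database="TEST_DB",
            warehouse="<warehouse>",
            schema="PUBLIC",
        ),
    },
)
```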
Commits (94)
1d5b308
Dagster source metadata ingestion code added
shubhamjagtap639 Jul 5, 2023
92e3cea
Job failed and canceled execution metadata emit code added
shubhamjagtap639 Jul 6, 2023
e006275
Code changes as per review comment and dagster test cases added
shubhamjagtap639 Jul 7, 2023
d24e27f
Dagster source integration doc added
shubhamjagtap639 Jul 7, 2023
28fcad2
Dagster sensor renamed to datahub sensor
shubhamjagtap639 Jul 7, 2023
78220aa
Code and doc changes as per review comment
shubhamjagtap639 Jul 7, 2023
9b8b50a
File reformatted
shubhamjagtap639 Jul 10, 2023
f7c6b08
Lint error fixed
shubhamjagtap639 Jul 11, 2023
fbd73d8
Dagster package added
shubhamjagtap639 Jul 11, 2023
de5ad77
Temp changes
shubhamjagtap639 Jul 14, 2023
12bc920
Dagster version set to >1.3.3
shubhamjagtap639 Jul 14, 2023
a9c1a83
Datahub dagster plugin code added
shubhamjagtap639 Sep 5, 2023
29bb752
Extra init file removed
shubhamjagtap639 Sep 5, 2023
c419a61
cl failed error resolved
shubhamjagtap639 Sep 6, 2023
b996394
Revert doc changes
shubhamjagtap639 Sep 6, 2023
50e5724
Added missing command in lint task
shubhamjagtap639 Sep 6, 2023
e5e51fd
Temp changes
shubhamjagtap639 Sep 6, 2023
d134b74
Typo error resolved
shubhamjagtap639 Sep 6, 2023
c7a3d18
dataset entity added in datahub/api/entities. Dagster example files a…
shubhamjagtap639 Sep 7, 2023
00cb35e
lint error fixed
shubhamjagtap639 Sep 7, 2023
a7efd7c
Code changes as per review comment
shubhamjagtap639 Sep 11, 2023
9fad5c4
Initial commit of the reworked Dagster plugin
treff7es Mar 18, 2024
74dc9cf
Adding entities
treff7es Mar 18, 2024
5f373dc
Using java17 build
treff7es Mar 18, 2024
281e666
Black formatting
treff7es Mar 18, 2024
5ca6867
Silenting some mypy error temporary
treff7es Mar 18, 2024
7eb7a8e
Black formatting
treff7es Mar 18, 2024
059ec35
Fixing test
treff7es Mar 18, 2024
b7bb9e3
Add missing import
treff7es Mar 18, 2024
6efc54f
Fixing url generation
treff7es Mar 19, 2024
b48b061
Fixing linter issues
treff7es Mar 19, 2024
5d68929
Fixes
treff7es Mar 19, 2024
c1f7557
Fixing build
treff7es Mar 19, 2024
512824d
Not pinning datahub client version
treff7es Mar 19, 2024
f788ec5
Adding way to capture assets
treff7es Mar 20, 2024
508fcbc
Add way to test multiple dagster version
treff7es Mar 20, 2024
8ec8f4e
Adding way to bring your own lineage extractor
treff7es Mar 21, 2024
63df8c1
Pr review fixes
treff7es Mar 22, 2024
6bfe22d
Fixing imports
treff7es Mar 22, 2024
479c2cf
Fixing golden files and pydantic v2 issues with s3 tests
treff7es Mar 22, 2024
80d9a89
Fixing test
treff7es Mar 22, 2024
1450b83
Fixing tests
treff7es Mar 22, 2024
975b19d
Disabling Dagster tests
treff7es Mar 22, 2024
295b91a
Reverting accidentally downgraded package
treff7es Mar 22, 2024
39f23cc
- Moving examples out from the package
treff7es Mar 25, 2024
186069e
Merge branch 'master' into dagster_source-rebase
treff7es Mar 25, 2024
85cff86
Fixing doc links
treff7es Mar 25, 2024
d0f5365
Fixing doc build concurrency issue
treff7es Mar 25, 2024
39e2a7f
Fixing typo
treff7es Mar 25, 2024
53b0560
Opinionated way to capture Asset inlets/outlets automatically
treff7es May 27, 2024
bc4cbca
Merge branch 'master' into dagster_asset_dep_capture
treff7es May 27, 2024
e7b31c2
Fixing lint issues
treff7es May 27, 2024
86bafd5
revertin change
treff7es May 27, 2024
e2df73e
delete file
treff7es May 27, 2024
0b81dbe
linting
treff7es May 27, 2024
6156651
Mypy fixes
treff7es May 27, 2024
a79af54
isort
treff7es May 27, 2024
b40a886
mypying
treff7es May 27, 2024
4ac71fc
Isort
treff7es May 27, 2024
ef0851e
Additional check to make mypy happy
treff7es May 27, 2024
40c5537
fix
treff7es May 27, 2024
f169dfe
Mypy fixes
treff7es May 27, 2024
0dc07b8
Adding sqlglot as dep
treff7es May 27, 2024
e17830d
fix
treff7es May 27, 2024
c0e0cd1
Updating doc and various refactors
treff7es May 28, 2024
4655a94
Fixing example formatting
treff7es May 28, 2024
127c516
isorting
treff7es May 28, 2024
0b0dc95
remove unneded deps
treff7es May 28, 2024
da4af37
Fixing mypy issues
treff7es May 28, 2024
553cf2f
Fix
treff7es May 28, 2024
64ac9f0
Update dagster.md
jjoyce0510 Aug 8, 2024
3802dba
Add snowflake_pandas_io_manager drop in replacement to automatically …
treff7es Aug 15, 2024
1084300
Merge branch 'master' into dagster_asset_dep_capture
treff7es Aug 15, 2024
6ee5e6f
Adding missing init
treff7es Aug 15, 2024
c45a82b
Merge remote-tracking branch 'upstream/dagster_asset_dep_capture' int…
treff7es Aug 15, 2024
5eb067e
lint fix
treff7es Aug 15, 2024
aeb7b79
remove accidentally added import
treff7es Aug 15, 2024
a3b94c2
mypy fix
treff7es Aug 15, 2024
719d3c8
Isorting
treff7es Aug 15, 2024
63a4862
Fixing datajob input/output type
treff7es Aug 16, 2024
e077c87
Fix set type
treff7es Aug 16, 2024
7d85629
Not parsing select queries
treff7es Aug 16, 2024
72e175c
Update doc
treff7es Aug 16, 2024
b866f8b
Merge branch 'master' into dagster_asset_dep_capture
treff7es Aug 22, 2024
21aa495
Generating status aspect for dataflow/datajobs
treff7es Aug 22, 2024
c80d9e5
Merge branch 'dagster_asset_dep_capture' of github.com:treff7es/datah…
treff7es Aug 22, 2024
c0f3e3d
Adding more debug log
treff7es Aug 22, 2024
ce9eae6
Fix airflow unit tests
treff7es Aug 23, 2024
deef278
Fix golden files
treff7es Aug 23, 2024
319b132
Add option to enable more verbose logging
treff7es Aug 23, 2024
aab0f67
fix black formatting
treff7es Aug 23, 2024
a1bd12c
Add doc about the supported version
treff7es Aug 23, 2024
f1065ce
Update dagster.md
jjoyce0510 Aug 26, 2024
c1aeed1
Update dagster.md
jjoyce0510 Aug 26, 2024
203 changes: 162 additions & 41 deletions docs/lineage/dagster.md

Large diffs are not rendered by default.
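The reworked docs in dagster.md describe wiring the plugin's sensor into a Dagster code location. Since the rendered diff is not shown here, the following is a minimal sketch following the plugin's documented entry points; treat the exact config field names as assumptions, and the URLs as placeholders.

```python
from dagster import Definitions
from datahub.ingestion.graph.client import DatahubClientConfig
from datahub_dagster_plugin.sensors.datahub_sensors import (
    DatahubDagsterSourceConfig,
    make_datahub_sensor,
)

# Field names follow the plugin docs around the time of this PR; treat them
# as assumptions.
config = DatahubDagsterSourceConfig(
    datahub_client_config=DatahubClientConfig(server="http://localhost:8080"),
    dagster_url="http://localhost:3000",
    capture_asset_materialization=True,
    capture_input_output=True,
)

datahub_sensor = make_datahub_sensor(config=config)

# The sensor watches run events and pushes dataflow/datajob/asset metadata
# to DataHub as runs complete.
defs = Definitions(sensors=[datahub_sensor])
```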

@@ -22,6 +22,17 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)",
@@ -85,6 +96,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
@@ -23,6 +23,17 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
@@ -86,6 +97,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
@@ -540,6 +562,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
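The blocks added to these golden files are `status` aspects, now emitted for every dataflow and datajob (see commit 21aa495, "Generating status aspect for dataflow/datajobs"). For reference, a minimal sketch of emitting the same aspect with the DataHub Python SDK; the server URL is a placeholder:

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import StatusClass

# Placeholder GMS endpoint.
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# An UPSERT of the status aspect marks the entity as not soft-deleted,
# matching the `"removed": false` entries in the golden files above.
mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataFlow:(airflow,basic_iolets,prod)",
    aspect=StatusClass(removed=False),
)
emitter.emit(mcp)
```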
@@ -316,7 +316,7 @@ def test_lineage_backend(mock_emit, inlets, outlets, capture_executions):
assert all(map(lambda let: isinstance(let, Dataset), op2.outlets))

# Check that the right things were emitted.
assert mock_emitter.emit.call_count == 17 if capture_executions else 9
assert mock_emitter.emit.call_count == 19 if capture_executions else 11

# TODO: Replace this with a golden file-based comparison.
assert mock_emitter.method_calls[0].args[0].aspectName == "dataFlowInfo"
@@ -325,127 +325,139 @@
== "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)"
)

assert mock_emitter.method_calls[1].args[0].aspectName == "ownership"
assert mock_emitter.method_calls[1].args[0].aspectName == "status"
assert (
mock_emitter.method_calls[1].args[0].entityUrn
== "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)"
)

assert mock_emitter.method_calls[2].args[0].aspectName == "globalTags"
assert mock_emitter.method_calls[2].args[0].aspectName == "ownership"
assert (
mock_emitter.method_calls[2].args[0].entityUrn
== "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)"
)

assert mock_emitter.method_calls[3].args[0].aspectName == "dataJobInfo"
assert mock_emitter.method_calls[3].args[0].aspectName == "globalTags"
assert (
mock_emitter.method_calls[3].args[0].entityUrn
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
== "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)"
)

assert mock_emitter.method_calls[4].args[0].aspectName == "dataJobInputOutput"
assert mock_emitter.method_calls[4].args[0].aspectName == "dataJobInfo"
assert (
mock_emitter.method_calls[4].args[0].entityUrn
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
)

assert mock_emitter.method_calls[5].args[0].aspectName == "status"
assert (
mock_emitter.method_calls[4].args[0].aspect.inputDatajobs[0]
mock_emitter.method_calls[5].args[0].entityUrn
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
)

assert mock_emitter.method_calls[6].args[0].aspectName == "dataJobInputOutput"
assert (
mock_emitter.method_calls[6].args[0].entityUrn
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
)
assert (
mock_emitter.method_calls[6].args[0].aspect.inputDatajobs[0]
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task1_upstream)"
)
assert (
mock_emitter.method_calls[4].args[0].aspect.inputDatajobs[1]
mock_emitter.method_calls[6].args[0].aspect.inputDatajobs[1]
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)"
)
assert (
mock_emitter.method_calls[4].args[0].aspect.inputDatasets[0]
mock_emitter.method_calls[6].args[0].aspect.inputDatasets[0]
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"
)
assert (
mock_emitter.method_calls[4].args[0].aspect.outputDatasets[0]
mock_emitter.method_calls[6].args[0].aspect.outputDatasets[0]
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)"
)

assert mock_emitter.method_calls[5].args[0].aspectName == "datasetKey"
assert mock_emitter.method_calls[7].args[0].aspectName == "datasetKey"
assert (
mock_emitter.method_calls[5].args[0].entityUrn
mock_emitter.method_calls[7].args[0].entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"
)

assert mock_emitter.method_calls[6].args[0].aspectName == "datasetKey"
assert mock_emitter.method_calls[8].args[0].aspectName == "datasetKey"
assert (
mock_emitter.method_calls[6].args[0].entityUrn
mock_emitter.method_calls[8].args[0].entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)"
)

assert mock_emitter.method_calls[7].args[0].aspectName == "ownership"
assert mock_emitter.method_calls[9].args[0].aspectName == "ownership"
assert (
mock_emitter.method_calls[7].args[0].entityUrn
mock_emitter.method_calls[9].args[0].entityUrn
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
)

assert mock_emitter.method_calls[8].args[0].aspectName == "globalTags"
assert mock_emitter.method_calls[10].args[0].aspectName == "globalTags"
assert (
mock_emitter.method_calls[8].args[0].entityUrn
mock_emitter.method_calls[10].args[0].entityUrn
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
)

if capture_executions:
assert (
mock_emitter.method_calls[9].args[0].aspectName
mock_emitter.method_calls[11].args[0].aspectName
== "dataProcessInstanceProperties"
)
assert (
mock_emitter.method_calls[9].args[0].entityUrn
mock_emitter.method_calls[11].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)

assert (
mock_emitter.method_calls[10].args[0].aspectName
mock_emitter.method_calls[12].args[0].aspectName
== "dataProcessInstanceRelationships"
)
assert (
mock_emitter.method_calls[10].args[0].entityUrn
mock_emitter.method_calls[12].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
assert (
mock_emitter.method_calls[11].args[0].aspectName
mock_emitter.method_calls[13].args[0].aspectName
== "dataProcessInstanceInput"
)
assert (
mock_emitter.method_calls[11].args[0].entityUrn
mock_emitter.method_calls[13].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
assert (
mock_emitter.method_calls[12].args[0].aspectName
mock_emitter.method_calls[14].args[0].aspectName
== "dataProcessInstanceOutput"
)
assert (
mock_emitter.method_calls[12].args[0].entityUrn
mock_emitter.method_calls[14].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
assert mock_emitter.method_calls[13].args[0].aspectName == "datasetKey"
assert mock_emitter.method_calls[15].args[0].aspectName == "datasetKey"
assert (
mock_emitter.method_calls[13].args[0].entityUrn
mock_emitter.method_calls[15].args[0].entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"
)
assert mock_emitter.method_calls[14].args[0].aspectName == "datasetKey"
assert mock_emitter.method_calls[16].args[0].aspectName == "datasetKey"
assert (
mock_emitter.method_calls[14].args[0].entityUrn
mock_emitter.method_calls[16].args[0].entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)"
)
assert (
mock_emitter.method_calls[15].args[0].aspectName
mock_emitter.method_calls[17].args[0].aspectName
== "dataProcessInstanceRunEvent"
)
assert (
mock_emitter.method_calls[15].args[0].entityUrn
mock_emitter.method_calls[17].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
assert (
mock_emitter.method_calls[16].args[0].aspectName
mock_emitter.method_calls[18].args[0].aspectName
== "dataProcessInstanceRunEvent"
)
assert (
mock_emitter.method_calls[16].args[0].entityUrn
mock_emitter.method_calls[18].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
112 changes: 112 additions & 0 deletions metadata-ingestion-modules/dagster-plugin/examples/iris.py
@@ -0,0 +1,112 @@
import pandas as pd
from dagster import MaterializeResult, MetadataValue, asset
from dagster_aws.redshift import RedshiftClientResource
from dagster_snowflake import SnowflakeResource


@asset(
metadata={"schema": "public"},
key_prefix=["prod", "snowflake", "test_db", "public"],
group_name="iris",
io_manager_key="snowflake_io_manager",
)
def iris_dataset() -> pd.DataFrame:
return pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
)


@asset(
metadata={"schema": "public"},
key_prefix=["prod", "snowflake", "test_db", "public"],
group_name="iris",
io_manager_key="snowflake_io_manager",
)
def iris_cleaned(iris_dataset: pd.DataFrame) -> pd.DataFrame:
return iris_dataset.dropna().drop_duplicates()


@asset(
metadata={"schema": "public"},
key_prefix=["prod", "snowflake", "test_db", "public"],
group_name="iris",
deps=[iris_cleaned],
)
def iris_setosa(snowflake: SnowflakeResource) -> MaterializeResult:
query = """
create or replace table TEST_DB.public.iris_setosa as (
SELECT *
FROM TEST_DB.public.iris_cleaned
WHERE species = 'Iris-setosa'
);
"""

with snowflake.get_connection() as connection:
with connection.cursor() as cursor:
cursor.execute(query)

return MaterializeResult(
metadata={
"Query": MetadataValue.text(query),
}
)


@asset(
key_prefix=[
"prod",
"snowflake",
"db_name",
"schema_name",
], # fully qualified asset key so the platform can be identified and the asset is unique
group_name="iris",
deps=[iris_dataset],
)
def my_asset_table_a(snowflake: SnowflakeResource) -> MaterializeResult:
query = """
create or replace table db_name.schema_name.my_asset_table_a as (
SELECT *
FROM db_name.schema_name.my_asset_table_b
);
"""

with snowflake.get_connection() as connection:
with connection.cursor() as cursor:
cursor.execute(query)

return MaterializeResult( # Add the query to the metadata so lineage can be extracted from it with the SQL parser
metadata={
"Query": MetadataValue.text(query),
}
)


@asset(
key_prefix=[
"prod",
"redshift",
"dev",
"public",
"blood_storage_count",
], # fully qualified asset key so the platform can be identified and the asset is unique
group_name="blood_storage",
)
def blood_storage_cleaned(redshift: RedshiftClientResource) -> MaterializeResult:
query = """
select count(*) from public.blood_storage;
"""
client = redshift.get_client()
client.execute_query(query)

return MaterializeResult( # Add the query to the metadata so lineage can be extracted from it with the SQL parser
metadata={
"Query": MetadataValue.text(query),
}
)
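These example assets expect a `snowflake_io_manager`, a `snowflake` resource, and a `redshift` resource to be bound in the code location. A minimal sketch of that wiring with the standard Dagster resources follows; connection values are placeholders, and the DataHub drop-in io manager from this PR could be substituted for `SnowflakePandasIOManager`.

```python
from dagster import Definitions, EnvVar
from dagster_aws.redshift import RedshiftClientResource
from dagster_snowflake import SnowflakeResource
from dagster_snowflake_pandas import SnowflakePandasIOManager

from iris import (  # assumes examples/iris.py is importable as `iris`
    blood_storage_cleaned,
    iris_cleaned,
    iris_dataset,
    iris_setosa,
    my_asset_table_a,
)

defs = Definitions(
    assets=[
        iris_dataset,
        iris_cleaned,
        iris_setosa,
        my_asset_table_a,
        blood_storage_cleaned,
    ],
    resources={
        # The pandas-returning assets above are written to Snowflake by this
        # io manager.
        "snowflake_io_manager": SnowflakePandasIOManager(
            account="<account>",
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            database="TEST_DB",
            warehouse="<warehouse>",
            schema="PUBLIC",
        ),
        # Used by the SQL-executing assets (iris_setosa, my_asset_table_a).
        "snowflake": SnowflakeResource(
            account="<account>",
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            database="TEST_DB",
            warehouse="<warehouse>",
        ),
        # Used by blood_storage_cleaned.
        "redshift": RedshiftClientResource(
            host="<cluster>.redshift.amazonaws.com",
            port=5439,
            user=EnvVar("REDSHIFT_USER"),
            password=EnvVar("REDSHIFT_PASSWORD"),
            database="dev",
        ),
    },
)
```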