add a very basic initial test
hsheth2 committed May 1, 2023 · 1 parent 2accea6 · commit 12acd43
Showing 7 changed files with 86 additions and 49 deletions.
5 changes: 1 addition & 4 deletions .github/workflows/ci.yml
@@ -19,9 +19,6 @@ jobs:
      - name: Setup
        run: |
          just setup
      - name: Lint
        run: |
          just lint
      - name: Test
        run: |
          just test
          just lint test
5 changes: 5 additions & 0 deletions impact_analysis.md
@@ -0,0 +1,5 @@
## Acryl Impact Analysis

Failed to run impact analysis: 'DATAHUB_GMS_HOST'

See the logs for full details.
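The terse `'DATAHUB_GMS_HOST'` in this committed report is what a missing environment variable looks like once the resulting `KeyError` is formatted with `str()`. A minimal sketch of that failure mode, assuming the script wraps the run in a try/except along these lines (the variable name is the real one from `src/impact_analysis.py`; the handler is illustrative):

```python
import os

try:
    server = os.environ["DATAHUB_GMS_HOST"]  # raises KeyError when the variable is unset
except KeyError as e:
    # str() of a KeyError is just the repr of the missing key, which is why the
    # report reads "Failed to run impact analysis: 'DATAHUB_GMS_HOST'".
    print(f"Failed to run impact analysis: {e}")
```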
2 changes: 1 addition & 1 deletion justfile
@@ -1,6 +1,6 @@

test: lint
    # TODO
    venv/bin/pytest tests

setup:
    # Create venv.
72 changes: 28 additions & 44 deletions src/impact_analysis.py
@@ -1,4 +1,5 @@
import json
import logging
import os
import pathlib
import subprocess
@@ -8,18 +9,24 @@
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import DatasetPropertiesClass
from datahub.telemetry import telemetry
from datahub.utilities.urns.urn import Urn, guess_entity_type

DATAHUB_SERVER = os.environ["DATAHUB_GMS_HOST"]
DATAHUB_TOKEN: Optional[str] = os.getenv("DATAHUB_GMS_TOKEN")
DATAHUB_FRONTEND_URL = os.environ["DATAHUB_FRONTEND_URL"]
from rendering import datahub_url_from_urn, format_entity

OUTPUT_PATH = pathlib.Path("impact_analysis.md")
DBT_ID_PROP = "dbt_unique_id"
MAX_IMPACTED_DOWNSTREAMS = 30
MAX_DOWNSTREAMS_TO_FETCH = 1000

graph = DataHubGraph(DatahubClientConfig(server=DATAHUB_SERVER, token=DATAHUB_TOKEN))

def get_graph() -> DataHubGraph:
    DATAHUB_SERVER = os.environ["DATAHUB_GMS_HOST"]
    DATAHUB_TOKEN: Optional[str] = os.getenv("DATAHUB_GMS_TOKEN")

    graph = DataHubGraph(
        DatahubClientConfig(server=DATAHUB_SERVER, token=DATAHUB_TOKEN)
    )

    return graph


class ImpactAnalysisError(Exception):
@@ -81,7 +88,7 @@ def determine_changed_dbt_models() -> List[DbtNodeInfo]:
        raise ImpactAnalysisError("Failed to parse dbt output") from e


def find_datahub_urns(dbt_node_ids: List[str]) -> List[str]:
def find_datahub_urns(graph: DataHubGraph, dbt_node_ids: List[str]) -> List[str]:
    if not dbt_node_ids:
        return []

@@ -108,7 +115,7 @@ def find_datahub_urns(dbt_node_ids: List[str]) -> List[str]:
    return urns


def get_datahub_info(urn: str) -> Optional[DatasetPropertiesClass]:
def get_datahub_info(graph: DataHubGraph, urn: str) -> Optional[DatasetPropertiesClass]:
    return graph.get_aspect(urn, DatasetPropertiesClass)


@@ -172,7 +179,7 @@ def get_datahub_info(urn: str) -> Optional[DatasetPropertiesClass]:
"""


def get_impact_analysis(urn: str):
def get_impact_analysis(graph: DataHubGraph, urn: str):
    result = graph.execute_graphql(
        IMPACT_ANALYSIS_QUERY,
        variables={
@@ -198,45 +205,20 @@ def get_impact_analysis(urn: str):
    return downstream_details


def datahub_url_from_urn(urn: str, suffix: Optional[str] = None) -> str:
    entity_type = guess_entity_type(urn)
    if entity_type == "dataJob":
        entity_type = "tasks"
    elif entity_type == "dataFlow":
        entity_type = "pipelines"

    url = f"{DATAHUB_FRONTEND_URL}/{entity_type}/{Urn.url_encode(urn)}"
    if suffix:
        url += f"/{suffix}"
    return url


def format_entity(downstream: Dict) -> str:
    platform = downstream["platform"]["name"]
    if downstream["platform"].get("properties", {}).get("displayName"):
        platform = downstream["platform"]["properties"]["displayName"]

    name = downstream["properties"]["name"]
    url = datahub_url_from_urn(downstream["urn"])

    type: str = downstream["type"].capitalize()
    if downstream.get("subTypes"):
        type = downstream["subTypes"]["typeNames"][0]

    return f"{platform} {type} [{name}]({url})"


@telemetry.with_telemetry()
def dbt_impact_analysis() -> str:
    graph = get_graph()
    DATAHUB_FRONTEND_URL = os.environ["DATAHUB_FRONTEND_URL"]

    # Step 1 - determine which dbt nodes are impacted by the changes in a given PR.
    changed_dbt_nodes = determine_changed_dbt_models()
    dbt_id_to_dbt_node = {node["unique_id"]: node for node in changed_dbt_nodes}
    # print(changed_dbt_nodes)

    # Step 2 - map dbt nodes to datahub urns.
    # In an ideal world, the datahub urns for dbt would just be the dbt node ids.
    urns = find_datahub_urns([node["unique_id"] for node in changed_dbt_nodes])
    datahub_node_props = {urn: get_datahub_info(urn) for urn in urns}
    urns = find_datahub_urns(graph, [node["unique_id"] for node in changed_dbt_nodes])
    datahub_node_props = {urn: get_datahub_info(graph, urn) for urn in urns}
    urn_to_dbt_id = {
        urn: node.customProperties[DBT_ID_PROP]
        for urn, node in datahub_node_props.items()
@@ -245,7 +227,7 @@ def dbt_impact_analysis() -> str:
    # print(urn_to_dbt_id)

    # Step 3 - generate downstream impact analysis for each datahub urn.
    downstreams_report = {urn: get_impact_analysis(urn) for urn in urns}
    downstreams_report = {urn: get_impact_analysis(graph, urn) for urn in urns}

    all_impacted_urns = {
        downstream["urn"]
@@ -263,15 +245,13 @@ def dbt_impact_analysis() -> str:
    for urn, downstreams in downstreams_report.items():
        dbt_node = dbt_id_to_dbt_node[urn_to_dbt_id[urn]]

        output += (
            f"\n### [{dbt_node['original_file_path']}]({datahub_url_from_urn(urn)})\n\n"
        )
        output += f"\n### [{dbt_node['original_file_path']}]({datahub_url_from_urn(DATAHUB_FRONTEND_URL, urn)})\n\n"
        if downstreams:
            output += f"May impact **{len(downstreams)}** downstreams:\n"
            for downstream in downstreams[:MAX_IMPACTED_DOWNSTREAMS]:
                output += f"- {format_entity(downstream)}\n"
                output += f"- {format_entity(DATAHUB_FRONTEND_URL, downstream)}\n"
            if len(downstreams) > MAX_IMPACTED_DOWNSTREAMS:
                output += f"- ...and [{len(downstreams) - MAX_IMPACTED_DOWNSTREAMS} more]({datahub_url_from_urn(urn, suffix='/Lineage')})\n"
                output += f"- ...and [{len(downstreams) - MAX_IMPACTED_DOWNSTREAMS} more]({datahub_url_from_urn(DATAHUB_FRONTEND_URL, urn, suffix='/Lineage')})\n"
        else:
            output += "No downstreams impacted.\n"

@@ -299,4 +279,8 @@ def main():


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    logging.getLogger("datahub").setLevel(logging.DEBUG)
    logging.getLogger(__name__).setLevel(logging.DEBUG)

    main()
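With the module-level client removed, each helper now receives the `DataHubGraph` explicitly. A minimal sketch of how the refactored functions compose, assuming `DATAHUB_GMS_HOST` (and optionally `DATAHUB_GMS_TOKEN`) is set; the dbt node id is invented for illustration:

```python
from impact_analysis import (
    find_datahub_urns,
    get_datahub_info,
    get_graph,
    get_impact_analysis,
)

graph = get_graph()  # DATAHUB_GMS_HOST / DATAHUB_GMS_TOKEN are read here, not at import time

# Hypothetical dbt node id, purely illustrative.
urns = find_datahub_urns(graph, ["model.jaffle_shop.orders"])
for urn in urns:
    props = get_datahub_info(graph, urn)  # DatasetPropertiesClass (carries dbt_unique_id) or None
    downstreams = get_impact_analysis(graph, urn)
    print(urn, props is not None, len(downstreams))
```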
33 changes: 33 additions & 0 deletions src/rendering.py
@@ -0,0 +1,33 @@
from typing import Optional

from datahub.utilities.urns.urn import Urn, guess_entity_type


def datahub_url_from_urn(
    frontend_base_url: str, urn: str, suffix: Optional[str] = None
) -> str:
    entity_type = guess_entity_type(urn)
    if entity_type == "dataJob":
        entity_type = "tasks"
    elif entity_type == "dataFlow":
        entity_type = "pipelines"

    url = f"{frontend_base_url}/{entity_type}/{Urn.url_encode(urn)}"
    if suffix:
        url += f"/{suffix}"
    return url


def format_entity(frontend_base_url: str, downstream: dict) -> str:
    platform = downstream["platform"]["name"]
    if downstream["platform"].get("properties", {}).get("displayName"):
        platform = downstream["platform"]["properties"]["displayName"]

    name = downstream["properties"]["name"]
    url = datahub_url_from_urn(frontend_base_url, downstream["urn"])

    type: str = downstream["type"].capitalize()
    if downstream.get("subTypes"):
        type = downstream["subTypes"]["typeNames"][0]

    return f"{platform} {type} [{name}]({url})"
7 changes: 7 additions & 0 deletions tests/conftest.py
@@ -0,0 +1,7 @@
import sys
import pathlib

here = pathlib.Path(__file__).parent
src = here.parent / "src"

sys.path.insert(0, str(src))
11 changes: 11 additions & 0 deletions tests/test_render.py
@@ -0,0 +1,11 @@
from rendering import datahub_url_from_urn


def test_url_generation():
    assert (
        datahub_url_from_urn(
            "https://customer.acryl.io",
            "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1000.orders,PROD)",
        )
        == "https://customer.acryl.io/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Csnowflake_sample_data.tpch_sf1000.orders%2CPROD%29"
    )
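A natural companion test (not part of this commit) would cover `format_entity` as well; a sketch, with the input dict invented to exercise the `displayName` and `subTypes` branches:

```python
from rendering import format_entity


def test_format_entity():
    downstream = {
        "urn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.orders,PROD)",
        "type": "DATASET",
        "platform": {"name": "snowflake", "properties": {"displayName": "Snowflake"}},
        "properties": {"name": "orders"},
        "subTypes": {"typeNames": ["Table"]},
    }

    rendered = format_entity("https://customer.acryl.io", downstream)
    # The platform display name and the subtype should win over the raw values.
    assert rendered.startswith("Snowflake Table [orders](")
```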
