From eb62deb80bead4a1b48c69ec14cd56eef22731cb Mon Sep 17 00:00:00 2001 From: Vishal Shah Date: Thu, 28 Jul 2022 17:07:33 -0400 Subject: [PATCH 1/5] use getattr to default None if no subdag --- .../src/datahub_provider/client/airflow_generator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py index b7864ddb71ea6..a4e4ba768c46d 100644 --- a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py +++ b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py @@ -32,7 +32,7 @@ def _get_dependencies( upstream_task = dag.task_dict[upstream_task_id] # if upstream task is not a subdag, then skip it - if upstream_task.subdag is None: + if getattr(upstream_task, 'subdag', None): continue # else, link the leaf tasks of the upstream subdag as upstream tasks @@ -113,7 +113,7 @@ def _get_dependencies( [ DataJobUrn.create_from_ids(job_id=task_id, data_flow_urn=str(flow_urn)) for task_id in task.upstream_task_ids - if dag.task_dict[task_id].subdag is None + if getattr(dag.task_dict[task_id], 'subdag', None) ] + upstream_subdag_task_urns + upstream_subdag_triggers From 76dd12c0080f91cf3074ace56871e686a95eb134 Mon Sep 17 00:00:00 2001 From: Vishal Shah Date: Thu, 28 Jul 2022 17:33:56 -0400 Subject: [PATCH 2/5] add None check --- .../src/datahub_provider/client/airflow_generator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py index a4e4ba768c46d..991f95f546cc5 100644 --- a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py +++ b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py @@ -32,7 +32,7 @@ def _get_dependencies( upstream_task = dag.task_dict[upstream_task_id] # if upstream task is not a subdag, then skip it - if getattr(upstream_task, 'subdag', None): + if getattr(upstream_task, 'subdag', None) is None: continue # else, link the leaf tasks of the upstream subdag as upstream tasks From 77bc90d65cae48528794ec42a19205b41a5f2969 Mon Sep 17 00:00:00 2001 From: Vishal Shah Date: Thu, 28 Jul 2022 17:36:16 -0400 Subject: [PATCH 3/5] add other None check --- .../src/datahub_provider/client/airflow_generator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py index 991f95f546cc5..7c0e11ad06165 100644 --- a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py +++ b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py @@ -113,7 +113,7 @@ def _get_dependencies( [ DataJobUrn.create_from_ids(job_id=task_id, data_flow_urn=str(flow_urn)) for task_id in task.upstream_task_ids - if getattr(dag.task_dict[task_id], 'subdag', None) + if getattr(dag.task_dict[task_id], 'subdag', None) is None ] + upstream_subdag_task_urns + upstream_subdag_triggers From fc621f35bbb8dcee2c94ee6f64dc2a1b561597be Mon Sep 17 00:00:00 2001 From: Vishal Shah Date: Fri, 29 Jul 2022 11:09:04 -0400 Subject: [PATCH 4/5] Apply suggestions from code review- double quotes Co-authored-by: Harshal Sheth --- .../src/datahub_provider/client/airflow_generator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py index 7c0e11ad06165..4c104bb7123b0 100644 --- a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py +++ b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py @@ -32,7 +32,7 @@ def _get_dependencies( upstream_task = dag.task_dict[upstream_task_id] # if upstream task is not a subdag, then skip it - if getattr(upstream_task, 'subdag', None) is None: + if getattr(upstream_task, "subdag", None) is None: continue # else, link the leaf tasks of the upstream subdag as upstream tasks @@ -113,7 +113,7 @@ def _get_dependencies( [ DataJobUrn.create_from_ids(job_id=task_id, data_flow_urn=str(flow_urn)) for task_id in task.upstream_task_ids - if getattr(dag.task_dict[task_id], 'subdag', None) is None + if getattr(dag.task_dict[task_id], "subdag", None) is None ] + upstream_subdag_task_urns + upstream_subdag_triggers From b8528348d7677f4d249206a2e592910983ff109a Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 29 Jul 2022 10:49:31 -0700 Subject: [PATCH 5/5] minor tweak to fix lint --- .../src/datahub_provider/client/airflow_generator.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py index 4c104bb7123b0..69943df50d3af 100644 --- a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py +++ b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py @@ -32,12 +32,11 @@ def _get_dependencies( upstream_task = dag.task_dict[upstream_task_id] # if upstream task is not a subdag, then skip it - if getattr(upstream_task, "subdag", None) is None: + upstream_subdag = getattr(upstream_task, "subdag", None) + if upstream_subdag is None: continue # else, link the leaf tasks of the upstream subdag as upstream tasks - upstream_subdag = upstream_task.subdag - for upstream_subdag_task_id in upstream_subdag.task_dict: upstream_subdag_task = upstream_subdag.task_dict[ upstream_subdag_task_id