From d1d638f3a930e1e6dfe38ff81972476daf32f2d5 Mon Sep 17 00:00:00 2001 From: aviemzur Date: Tue, 31 Dec 2019 16:47:17 +0200 Subject: [PATCH] changes after CR #2 --- airflow/contrib/operators/emr_add_steps_operator.py | 8 +++----- tests/contrib/operators/test_emr_add_steps_operator.py | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/airflow/contrib/operators/emr_add_steps_operator.py b/airflow/contrib/operators/emr_add_steps_operator.py index c7d53984978b94..0d7040dfe5b1ac 100644 --- a/airflow/contrib/operators/emr_add_steps_operator.py +++ b/airflow/contrib/operators/emr_add_steps_operator.py @@ -72,12 +72,10 @@ def execute(self, context): emr = emr_hook.get_conn() - job_flow_id = self.job_flow_id - + job_flow_id = self.job_flow_id or emr_hook.get_cluster_id_by_name(self.job_flow_name, + self.cluster_states) if not job_flow_id: - job_flow_id = emr_hook.get_cluster_id_by_name(self.job_flow_name, self.cluster_states) - if not job_flow_id: - raise AirflowException(f'No cluster found for name: {self.job_flow_name}') + raise AirflowException(f'No cluster found for name: {self.job_flow_name}') if self.do_xcom_push: context['ti'].xcom_push(key='job_flow_id', value=job_flow_id) diff --git a/tests/contrib/operators/test_emr_add_steps_operator.py b/tests/contrib/operators/test_emr_add_steps_operator.py index eecbcddabab5ff..1ca1e9e1e1bd1c 100644 --- a/tests/contrib/operators/test_emr_add_steps_operator.py +++ b/tests/contrib/operators/test_emr_add_steps_operator.py @@ -146,7 +146,7 @@ def test_init_with_nonexistent_cluster_name(self): with self.assertRaises(AirflowException) as error: operator.execute(self.mock_context) - self.assertEqual(str(error.exception), f'No cluster found for name: {cluster_name}') + self.assertEqual(str(error.exception), f'No cluster found for name: {cluster_name}') if __name__ == '__main__':