From 0be002eebb182b607109a0390d7f6fb8795c668b Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Tue, 7 Aug 2018 19:43:41 +0100 Subject: [PATCH] [AIRFLOW-2140] Don't require kubernetes for the SparkSubmit hook (#3700) This extra dep is a quasi-breaking change when upgrading - previously there were no deps outside of Airflow itself for this hook. Importing the k8s libs breaks installs that aren't also using Kubernetes. This makes the dep optional for anyone who doesn't explicitly use the functionality --- airflow/contrib/hooks/spark_submit_hook.py | 7 +++++-- airflow/contrib/kubernetes/kube_client.py | 14 +++++++++++++- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py index 0185cab283345a..65bb6134e6ac45 100644 --- a/airflow/contrib/hooks/spark_submit_hook.py +++ b/airflow/contrib/hooks/spark_submit_hook.py @@ -26,7 +26,6 @@ from airflow.exceptions import AirflowException from airflow.utils.log.logging_mixin import LoggingMixin from airflow.contrib.kubernetes import kube_client -from kubernetes.client.rest import ApiException class SparkSubmitHook(BaseHook, LoggingMixin): @@ -136,6 +135,10 @@ def __init__(self, self._connection = self._resolve_connection() self._is_yarn = 'yarn' in self._connection['master'] self._is_kubernetes = 'k8s' in self._connection['master'] + if self._is_kubernetes and kube_client is None: + raise RuntimeError( + "{master} specified by kubernetes dependencies are not installed!".format( + self._connection['master'])) self._should_track_driver_status = self._resolve_should_track_driver_status() self._driver_id = None @@ -559,6 +562,6 @@ def on_kill(self): self.log.info("Spark on K8s killed with response: %s", api_response) - except ApiException as e: + except kube_client.ApiException as e: self.log.info("Exception when attempting to kill Spark on K8s:") self.log.exception(e) diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py index 8b71f412423294..4b8fa17155eedd 100644 --- a/airflow/contrib/kubernetes/kube_client.py +++ b/airflow/contrib/kubernetes/kube_client.py @@ -17,9 +17,21 @@ from airflow.configuration import conf from six import PY2 +try: + from kubernetes import config, client + from kubernetes.client.rest import ApiException + has_kubernetes = True +except ImportError as e: + # We need an exception class to be able to use it in ``except`` elsewhere + # in the code base + ApiException = BaseException + has_kubernetes = False + _import_err = e + def _load_kube_config(in_cluster, cluster_context, config_file): - from kubernetes import config, client + if not has_kubernetes: + raise _import_err if in_cluster: config.load_incluster_config() else: