From edb4f62f518cc28adb54542bb39ad808c29ad2c4 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Tue, 31 Dec 2019 13:06:47 +0000 Subject: [PATCH] [AIRFLOW-6359] Make Spark status_poll_interval explicit (#6978) (cherry picked from commit 6cc1123e6f2ed09b0fa42fa85fcac97a06ba0924) --- airflow/contrib/hooks/spark_submit_hook.py | 5 +++++ airflow/contrib/operators/spark_submit_operator.py | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py index d295aab683c23a..8263e70bfd2672 100644 --- a/airflow/contrib/hooks/spark_submit_hook.py +++ b/airflow/contrib/hooks/spark_submit_hook.py @@ -86,6 +86,9 @@ class SparkSubmitHook(BaseHook, LoggingMixin): :type name: str :param num_executors: Number of executors to launch :type num_executors: int + :param status_poll_interval: Seconds to wait between polls of driver status in cluster + mode (Default: 1) + :type status_poll_interval: int :param application_args: Arguments for the application being submitted :type application_args: list :param env_vars: Environment variables for spark-submit. 
It supports yarn and k8s mode too. @@ -118,6 +121,7 @@ def __init__(self, proxy_user=None, name='default-name', num_executors=None, + status_poll_interval=1, application_args=None, env_vars=None, verbose=False, @@ -142,6 +146,7 @@ def __init__(self, self._proxy_user = proxy_user self._name = name self._num_executors = num_executors + self._status_poll_interval = status_poll_interval self._application_args = application_args self._env_vars = env_vars self._verbose = verbose diff --git a/airflow/contrib/operators/spark_submit_operator.py b/airflow/contrib/operators/spark_submit_operator.py index 41dc8df71ae1bd..21f6b4ec3df066 100644 --- a/airflow/contrib/operators/spark_submit_operator.py +++ b/airflow/contrib/operators/spark_submit_operator.py @@ -76,6 +76,9 @@ class SparkSubmitOperator(BaseOperator): :type name: str :param num_executors: Number of executors to launch :type num_executors: int + :param status_poll_interval: Seconds to wait between polls of driver status in cluster + mode (Default: 1) + :type status_poll_interval: int :param application_args: Arguments for the application being submitted (templated) :type application_args: list :param env_vars: Environment variables for spark-submit. It supports yarn and k8s mode too. (templated) @@ -114,6 +117,7 @@ def __init__(self, proxy_user=None, name='airflow-spark', num_executors=None, + status_poll_interval=1, application_args=None, env_vars=None, verbose=False, @@ -141,6 +145,7 @@ def __init__(self, self._proxy_user = proxy_user self._name = name self._num_executors = num_executors + self._status_poll_interval = status_poll_interval self._application_args = application_args self._env_vars = env_vars self._verbose = verbose @@ -173,6 +178,7 @@ def execute(self, context): proxy_user=self._proxy_user, name=self._name, num_executors=self._num_executors, + status_poll_interval=self._status_poll_interval, application_args=self._application_args, env_vars=self._env_vars, verbose=self._verbose,