diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index 6354df38811ae..1a478b4a3857d 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -227,7 +227,7 @@ def _build_spark_submit_command(self, application):
         """
         connection_cmd = self._get_spark_binary_path()
 
-        # The url ot the spark master
+        # The url of the spark master
         connection_cmd += ["--master", self._connection['master']]
 
         if self._conf:
@@ -236,6 +236,8 @@ def _build_spark_submit_command(self, application):
         if self._env_vars and (self._is_kubernetes or self._is_yarn):
             if self._is_yarn:
                 tmpl = "spark.yarn.appMasterEnv.{}={}"
+                # Allow dynamic setting of hadoop/yarn configuration environments
+                self._env = self._env_vars
             else:
                 tmpl = "spark.kubernetes.driverEnv.{}={}"
             for key in self._env_vars:
diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py
index b73024cec48b2..f7165bea9e757 100644
--- a/tests/contrib/hooks/test_spark_submit_hook.py
+++ b/tests/contrib/hooks/test_spark_submit_hook.py
@@ -489,6 +489,7 @@ def test_resolve_spark_submit_env_vars_yarn(self):
 
         # Then
         self.assertEqual(cmd[4], "spark.yarn.appMasterEnv.bar=foo")
+        self.assertEqual(hook._env, {"bar": "foo"})
 
     def test_resolve_spark_submit_env_vars_k8s(self):
         # Given
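
The point of capturing `self._env = self._env_vars` in the YARN branch is that, beyond setting `spark.yarn.appMasterEnv.*` conf entries, the same variables can be handed to the local spark-submit subprocess so that Hadoop/YARN configuration (for example `HADOOP_CONF_DIR`) takes effect on the client side. The code that consumes `_env` is not shown in this diff; the snippet below is a minimal, hypothetical sketch of that consumption, assuming a helper named `launch_spark_submit` that receives the built command and the captured environment mapping.

```python
import os
import subprocess


def launch_spark_submit(spark_submit_cmd, extra_env=None):
    """Hypothetical sketch (not part of this diff): run spark-submit with
    the user-supplied environment variables overlaid on the parent env."""
    env = None
    if extra_env:
        # Start from the current environment and overlay the captured
        # variables, so values such as HADOOP_CONF_DIR or YARN_CONF_DIR
        # reach the child process.
        env = os.environ.copy()
        env.update(extra_env)
    return subprocess.Popen(
        spark_submit_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        env=env,  # None means the child simply inherits the parent environment
    )
```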