diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py index 0912fb40a5138f..d50630c44222e4 100644 --- a/airflow/contrib/hooks/spark_submit_hook.py +++ b/airflow/contrib/hooks/spark_submit_hook.py @@ -565,11 +565,12 @@ def on_kill(self): # Currently only instantiate Kubernetes client for killing a spark pod. try: + import kubernetes client = kube_client.get_kube_client() api_response = client.delete_namespaced_pod( self._kubernetes_driver_pod, self._connection['namespace'], - body=client.V1DeleteOptions(), + body=kubernetes.client.V1DeleteOptions(), pretty=True) self.log.info("Spark on K8s killed with response: %s", api_response) diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py index 45b10d42bce5b0..6eba99e0091d2a 100644 --- a/tests/contrib/hooks/test_spark_submit_hook.py +++ b/tests/contrib/hooks/test_spark_submit_hook.py @@ -614,6 +614,51 @@ def test_standalone_cluster_process_on_kill(self): self.assertEqual(kill_cmd[3], '--kill') self.assertEqual(kill_cmd[4], 'driver-20171128111415-0001') + @patch('airflow.contrib.kubernetes.kube_client.get_kube_client') + @patch('airflow.contrib.hooks.spark_submit_hook.subprocess.Popen') + def test_k8s_process_on_kill(self, mock_popen, mock_client_method): + # Given + mock_popen.return_value.stdout = six.StringIO('stdout') + mock_popen.return_value.stderr = six.StringIO('stderr') + mock_popen.return_value.poll.return_value = None + mock_popen.return_value.wait.return_value = 0 + client = mock_client_method.return_value + hook = SparkSubmitHook(conn_id='spark_k8s_cluster') + log_lines = [ + 'INFO LoggingPodStatusWatcherImpl:54 - State changed, new state:' + + 'pod name: spark-pi-edf2ace37be7353a958b38733a12f8e6-driver' + + 'namespace: default' + + 'labels: spark-app-selector -> spark-465b868ada474bda82ccb84ab2747fcd,' + + 'spark-role -> driver' + + 'pod uid: ba9c61f6-205f-11e8-b65f-d48564c88e42' + + 'creation time: 2018-03-05T10:26:55Z' + + 'service account name: spark' + + 'volumes: spark-init-properties, download-jars-volume,' + + 'download-files-volume, spark-token-2vmlm' + + 'node name: N/A' + + 'start time: N/A' + + 'container images: N/A' + + 'phase: Pending' + + 'status: []' + + '2018-03-05 11:26:56 INFO LoggingPodStatusWatcherImpl:54 - State changed,' + + ' new state:' + + 'pod name: spark-pi-edf2ace37be7353a958b38733a12f8e6-driver' + + 'namespace: default' + + 'Exit code: 0' + ] + hook._process_spark_submit_log(log_lines) + hook.submit() + + # When + hook.on_kill() + + # Then + import kubernetes + kwargs = {'pretty': True, 'body': kubernetes.client.V1DeleteOptions()} + client.delete_namespaced_pod.assert_called_once_with( + 'spark-pi-edf2ace37be7353a958b38733a12f8e6-driver', + 'mynamespace', **kwargs) + if __name__ == '__main__': unittest.main()