Skip to content

Commit

Permalink
Spark-on-k8s sensor logs - properly pass defined namespace to pod log…
Browse files Browse the repository at this point in the history
… call (#11199)

This is a follow up to #10023 - it seems that passing the defined namespace to the log call was missed in one of the rebases done during the PR.
Without this fix, logging will fail when the k8s connection uses a different namespace than the one SparkApplication(s) are actually submitted to.
  • Loading branch information
bbenzikry authored Nov 23, 2020
1 parent c133df8 commit c02a3f5
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
15 changes: 11 additions & 4 deletions airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,21 @@ def __init__(
self.kubernetes_conn_id = kubernetes_conn_id
self.hook = KubernetesHook(conn_id=self.kubernetes_conn_id)

def _log_driver(self, application_state: str) -> None:
def _log_driver(self, application_state: str, response: dict) -> None:
if not self.attach_log:
return
driver_pod_name = f"{self.application_name}-driver"
status_info = response["status"]
if "driverInfo" not in status_info:
return
driver_info = status_info["driverInfo"]
if "podName" not in driver_info:
return
driver_pod_name = driver_info["podName"]
namespace = response["metadata"]["namespace"]
log_method = self.log.error if application_state in self.FAILURE_STATES else self.log.info
try:
log = ""
for line in self.hook.get_pod_logs(driver_pod_name):
for line in self.hook.get_pod_logs(driver_pod_name, namespace=namespace):
log += line.decode()
log_method(log)
except client.rest.ApiException as e:
Expand All @@ -97,7 +104,7 @@ def poke(self, context: Dict) -> bool:
except KeyError:
return False
if self.attach_log and application_state in self.FAILURE_STATES + self.SUCCESS_STATES:
self._log_driver(application_state)
self._log_driver(application_state, response)
if application_state in self.FAILURE_STATES:
raise AirflowException("Spark application failed with state: %s" % application_state)
elif application_state in self.SUCCESS_STATES:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ def test_driver_logging_failure(
task_id="test_task_id",
)
self.assertRaises(AirflowException, sensor.poke, None)
mock_log_call.assert_called_once_with("spark_pi-driver")
mock_log_call.assert_called_once_with("spark-pi-driver", namespace="default")
error_log_call.assert_called_once_with(TEST_POD_LOG_RESULT)

@patch(
Expand All @@ -719,7 +719,7 @@ def test_driver_logging_completed(
task_id="test_task_id",
)
sensor.poke(None)
mock_log_call.assert_called_once_with("spark_pi-driver")
mock_log_call.assert_called_once_with("spark-pi-2020-02-24-1-driver", namespace="default")
log_info_call = info_log_call.mock_calls[1]
log_value = log_info_call[1][0]
self.assertEqual(log_value, TEST_POD_LOG_RESULT)
Expand Down

0 comments on commit c02a3f5

Please sign in to comment.