Skip to content

Commit

Permalink
Spark-on-K8S sensor - add driver logs (#10023)
Browse files Browse the repository at this point in the history
  • Loading branch information
bbenzikry authored Aug 26, 2020
1 parent 3b0aa16 commit 1e5aa44
Show file tree
Hide file tree
Showing 3 changed files with 582 additions and 426 deletions.
51 changes: 49 additions & 2 deletions airflow/providers/cncf/kubernetes/hooks/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
# specific language governing permissions and limitations
# under the License.
import tempfile
from typing import Optional, Union
from typing import Generator, Optional, Tuple, Union

import yaml
from kubernetes import client, config
from kubernetes import client, config, watch

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
Expand Down Expand Up @@ -131,3 +131,50 @@ def get_namespace(self):
extras = connection.extra_dejson
namespace = extras.get("extra__kubernetes__namespace", "default")
return namespace

def get_pod_log_stream(
    self, pod_name: str, container: Optional[str] = "", namespace: Optional[str] = None,
) -> Tuple[watch.Watch, Generator[str, None, None]]:
    """
    Open a log stream for a container in a kubernetes pod.

    :param pod_name: pod name
    :type pod_name: str
    :param container: container name
    :type container: str
    :param namespace: kubernetes namespace; when empty, the value of
        ``get_namespace()`` is used instead
    :type namespace: str
    :return: the watcher and the stream it produced from
        ``read_namespaced_pod_log``, in that order
    :rtype: Tuple[watch.Watch, Generator[str, None, None]]
    """
    core_api = client.CoreV1Api(self.get_conn())
    log_watcher = watch.Watch()
    # Fall back to the hook's namespace only when the caller gave none.
    target_namespace = namespace or self.get_namespace()
    log_stream = log_watcher.stream(
        core_api.read_namespaced_pod_log,
        name=pod_name,
        container=container,
        namespace=target_namespace,
    )
    # The watcher is handed back alongside the stream so the caller keeps a handle on it.
    return log_watcher, log_stream

def get_pod_logs(
    self, pod_name: str, container: Optional[str] = "", namespace: Optional[str] = None,
):
    """
    Fetch a container's log from the specified pod.

    :param pod_name: pod name
    :type pod_name: str
    :param container: container name
    :type container: str
    :param namespace: kubernetes namespace; when empty, the value of
        ``get_namespace()`` is used instead
    :type namespace: str
    :return: the result of ``read_namespaced_pod_log`` called with
        ``_preload_content=False`` (presumably the raw, undecoded response -
        confirm against the kubernetes client documentation)
    """
    core_api = client.CoreV1Api(self.get_conn())
    request_args = {
        "name": pod_name,
        "container": container,
        "_preload_content": False,
        # Fall back to the hook's namespace only when the caller gave none.
        "namespace": namespace or self.get_namespace(),
    }
    return core_api.read_namespaced_pod_log(**request_args)
41 changes: 34 additions & 7 deletions airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# under the License.
from typing import Dict, Optional

from kubernetes import client

from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
Expand All @@ -37,40 +39,65 @@ class SparkKubernetesSensor(BaseSensorOperator):
:type namespace: str
:param kubernetes_conn_id: the connection to Kubernetes cluster
:type kubernetes_conn_id: str
:param attach_log: determines whether logs for driver pod should be appended to the sensor log
:type attach_log: bool
"""

template_fields = ('application_name', 'namespace')
FAILURE_STATES = ('FAILED', 'UNKNOWN')
SUCCESS_STATES = ('COMPLETED',)
template_fields = ("application_name", "namespace")
FAILURE_STATES = ("FAILED", "UNKNOWN")
SUCCESS_STATES = ("COMPLETED",)

@apply_defaults
def __init__(
    self,
    *,
    application_name: str,
    attach_log: Optional[bool] = False,
    namespace: Optional[str] = None,
    kubernetes_conn_id: str = "kubernetes_default",
    **kwargs,
):
    """
    Store the sensor settings and create the Kubernetes hook.

    :param application_name: name of the spark application to watch
    :type application_name: str
    :param attach_log: whether the driver pod log is appended to the sensor log
    :type attach_log: bool
    :param namespace: kubernetes namespace of the application
    :type namespace: str
    :param kubernetes_conn_id: the connection to Kubernetes cluster
    :type kubernetes_conn_id: str
    """
    super().__init__(**kwargs)
    self.application_name = application_name
    self.attach_log = attach_log
    self.namespace = namespace
    self.kubernetes_conn_id = kubernetes_conn_id
    # One hook instance is kept on the sensor and reused by its methods.
    self.hook = KubernetesHook(conn_id=self.kubernetes_conn_id)

def _log_driver(self, application_state: str):
    """
    Copy the Spark driver pod's log into the sensor log.

    Does nothing unless ``attach_log`` is set. The driver pod is looked up as
    ``<application_name>-driver`` in the sensor's namespace. The log is
    written at error level when ``application_state`` is one of
    ``FAILURE_STATES`` and at info level otherwise. A Kubernetes API error
    while reading the log is reported as a warning instead of being raised.

    :param application_state: state reported for the Spark application
    :type application_state: str
    """
    if not self.attach_log:
        return
    driver_pod_name = f"{self.application_name}-driver"
    log_method = self.log.error if application_state in self.FAILURE_STATES else self.log.info
    try:
        # Read from the sensor's own namespace; the hook falls back to its
        # configured namespace when this is None.
        log_chunks = self.hook.get_pod_logs(driver_pod_name, namespace=self.namespace)
        # errors="replace" keeps a stray non-UTF-8 byte in the driver output
        # from failing the sensor: attaching the log is best-effort.
        log = "".join(chunk.decode(errors="replace") for chunk in log_chunks)
        log_method(log)
    except client.rest.ApiException as e:
        self.log.warning(
            "Could not read logs for pod %s. It may have been disposed.\n"
            "Make sure timeToLiveSeconds is set on your SparkApplication spec.\n"
            "underlying exception: %s",
            driver_pod_name,
            e,
        )

def poke(self, context: Dict):
self.log.info("Poking: %s", self.application_name)
hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
response = hook.get_custom_resource_definition(
response = self.hook.get_custom_resource_definition(
group="sparkoperator.k8s.io",
version="v1beta2",
plural="sparkapplications",
name=self.application_name,
namespace=self.namespace,
)
try:
application_state = response['status']['applicationState']['state']
application_state = response["status"]["applicationState"]["state"]
except KeyError:
return False
if self.attach_log and application_state in self.FAILURE_STATES + self.SUCCESS_STATES:
self._log_driver(application_state)
if application_state in self.FAILURE_STATES:
raise AirflowException("Spark application failed with state: %s" % application_state)
elif application_state in self.SUCCESS_STATES:
Expand Down
Loading

0 comments on commit 1e5aa44

Please sign in to comment.