
spark-on-k8s sensor - add driver logs #10023

Merged: 6 commits, Aug 26, 2020
51 changes: 49 additions & 2 deletions airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -15,10 +15,10 @@
# specific language governing permissions and limitations
# under the License.
import tempfile
from typing import Optional, Union
from typing import Generator, Optional, Tuple, Union

import yaml
from kubernetes import client, config
from kubernetes import client, config, watch

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
@@ -131,3 +131,50 @@ def get_namespace(self):
extras = connection.extra_dejson
namespace = extras.get("extra__kubernetes__namespace", "default")
return namespace

def get_pod_log_stream(
self, pod_name: str, container: Optional[str] = "", namespace: Optional[str] = None,
) -> Tuple[watch.Watch, Generator[str, None, None]]:
"""
Retrieves a log stream for a container in a kubernetes pod.

:param pod_name: pod name
:type pod_name: str
:param container: container name
:type container: str
:param namespace: kubernetes namespace
:type namespace: str
"""

api = client.CoreV1Api(self.get_conn())
watcher = watch.Watch()
return (
watcher,
watcher.stream(
Member commented:
#9570
Does this problem occur here? What permissions are needed for this method?

Contributor Author replied:
This assumes a role similar to the one below (a namespace-specific Role or a ClusterRole; in either case the operator is constrained to the job's namespace):

- apiGroups: [""]
  resources: ["pods", "pods/log"]
  verbs: ["get", "watch", "list"]

It's worth mentioning that I opted against using the watcher but left it here for future use (I'm using read_namespaced_pod_log, which still requires the permissions above).

Should we add a manifest that covers this for a system test, similar to the RBAC YAMLs from the operator here?

In any case I don't think it should block the PR, since the behavior depends on explicitly enabling the flag.

api.read_namespaced_pod_log,
name=pod_name,
container=container,
namespace=namespace if namespace else self.get_namespace(),
),
)

def get_pod_logs(
self, pod_name: str, container: Optional[str] = "", namespace: Optional[str] = None,
):
"""
Retrieves a container's log from the specified pod.

:param pod_name: pod name
:type pod_name: str
:param container: container name
:type container: str
:param namespace: kubernetes namespace
:type namespace: str
"""
api = client.CoreV1Api(self.get_conn())
return api.read_namespaced_pod_log(
name=pod_name,
container=container,
_preload_content=False,
namespace=namespace if namespace else self.get_namespace(),
)
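For context, a minimal sketch of how the two new hook methods might be called, assuming a kubernetes_default connection and a pod named spark-pi-driver (both names are illustrative, not taken from this PR):

from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook

hook = KubernetesHook(conn_id="kubernetes_default")  # assumed connection id

# Non-streaming read: because _preload_content=False, the Kubernetes client
# returns the raw urllib3 response, so iterating it yields byte lines.
for line in hook.get_pod_logs("spark-pi-driver"):  # assumed pod name
    print(line.decode())

# Streaming variant: the returned Watch object can stop the stream early.
watcher, log_stream = hook.get_pod_log_stream("spark-pi-driver")
for line in log_stream:
    print(line)
    watcher.stop()  # stop after the first line, purely for illustration
    break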
41 changes: 34 additions & 7 deletions airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
@@ -17,6 +17,8 @@
# under the License.
from typing import Dict, Optional

from kubernetes import client

from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
@@ -37,40 +39,65 @@ class SparkKubernetesSensor(BaseSensorOperator):
:type namespace: str
:param kubernetes_conn_id: the connection to Kubernetes cluster
:type kubernetes_conn_id: str
:param attach_log: determines whether logs for the driver pod should be appended to the sensor log
:type attach_log: bool
"""

template_fields = ('application_name', 'namespace')
FAILURE_STATES = ('FAILED', 'UNKNOWN')
SUCCESS_STATES = ('COMPLETED',)
template_fields = ("application_name", "namespace")
FAILURE_STATES = ("FAILED", "UNKNOWN")
SUCCESS_STATES = ("COMPLETED",)

@apply_defaults
def __init__(
self,
*,
application_name: str,
attach_log: Optional[bool] = False,
namespace: Optional[str] = None,
kubernetes_conn_id: str = 'kubernetes_default',
kubernetes_conn_id: str = "kubernetes_default",
**kwargs,
):
super().__init__(**kwargs)
self.application_name = application_name
self.attach_log = attach_log
self.namespace = namespace
self.kubernetes_conn_id = kubernetes_conn_id
self.hook = KubernetesHook(conn_id=self.kubernetes_conn_id)

def _log_driver(self, application_state: str):
if not self.attach_log:
return
driver_pod_name = f"{self.application_name}-driver"
log_method = self.log.error if application_state in self.FAILURE_STATES else self.log.info
try:
log = ""
for line in self.hook.get_pod_logs(driver_pod_name):
log += line.decode()
log_method(log)
except client.rest.ApiException as e:
self.log.warning(
"Could not read logs for pod %s. It may have been disposed.\n"
"Make sure timeToLiveSeconds is set on your SparkApplication spec.\n"
"underlying exception: %s",
driver_pod_name,
e,
)

def poke(self, context: Dict):
self.log.info("Poking: %s", self.application_name)
hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
response = hook.get_custom_resource_definition(
response = self.hook.get_custom_resource_definition(
group="sparkoperator.k8s.io",
version="v1beta2",
plural="sparkapplications",
name=self.application_name,
namespace=self.namespace,
)
try:
application_state = response['status']['applicationState']['state']
application_state = response["status"]["applicationState"]["state"]
except KeyError:
return False
if self.attach_log and application_state in self.FAILURE_STATES + self.SUCCESS_STATES:
self._log_driver(application_state)
if application_state in self.FAILURE_STATES:
raise AirflowException("Spark application failed with state: %s" % application_state)
elif application_state in self.SUCCESS_STATES:
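To show the new flag in use, a minimal sketch of a DAG wiring up the sensor (the DAG id, task id, application name, and scheduling values are illustrative assumptions, not part of this PR):

from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

with DAG(
    dag_id="spark_pi_monitor",  # assumed DAG id
    start_date=datetime(2020, 8, 1),
    schedule_interval=None,
) as dag:
    # Poll the SparkApplication CRD until it reaches a terminal state; with
    # attach_log=True the driver pod's log is appended to the sensor log
    # (INFO on success, ERROR on failure).
    monitor_spark_pi = SparkKubernetesSensor(
        task_id="monitor_spark_pi",
        application_name="spark-pi",  # must match the SparkApplication name
        namespace="default",
        attach_log=True,
        kubernetes_conn_id="kubernetes_default",
        poke_interval=30,
    )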