Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIRFLOW-5581: Join KubernetesJobWatcher in terminate call and unblock queues from blocking forever #6237

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,9 +572,28 @@ def _labels_to_key(self, labels):
)
return None

def _flush_watcher_queue(self):
self.log.debug('Executor shutting down, watcher_queue approx. size=%d', self.watcher_queue.qsize())
while True:
try:
task = self.watcher_queue.get_nowait()
# Ignoring it since it can only have either FAILED or SUCCEEDED pods
self.log.warning('Executor shutting down, IGNORING watcher task=%s', task)
self.watcher_queue.task_done()
except Empty:
break

def terminate(self):
"""Termninates the watcher."""
self.log.debug("Terminating kube_watcher...")
self.kube_watcher.terminate()
self.kube_watcher.join()
ashb marked this conversation as resolved.
Show resolved Hide resolved
self.log.debug("kube_watcher=%s", self.kube_watcher)
self.log.debug("Flushing watcher_queue...")
self._flush_watcher_queue()
# Queue should be empty...
self.watcher_queue.join()
self.log.debug("Shutting down manager...")
self._manager.shutdown()


Expand Down Expand Up @@ -768,9 +787,45 @@ def _change_state(self, key, state, pod_id: str) -> None:
self.log.debug('Could not find key: %s', str(key))
self.event_buffer[key] = state

def _flush_task_queue(self):
self.log.debug('Executor shutting down, task_queue approximate size=%d', self.task_queue.qsize())
while True:
try:
task = self.task_queue.get_nowait()
# This is a new task to run thus ok to ignore.
self.log.warning('Executor shutting down, will NOT run task=%s', task)
self.task_queue.task_done()
except Empty:
break

def _flush_result_queue(self):
self.log.debug('Executor shutting down, result_queue approximate size=%d', self.result_queue.qsize())
while True: # pylint: disable=too-many-nested-blocks
try:
results = self.result_queue.get_nowait()
self.log.warning('Executor shutting down, flushing results=%s', results)
try:
key, state, pod_id, resource_version = results
self.log.info('Changing state of %s to %s : resource_version=%d', results, state,
resource_version)
try:
self._change_state(key, state, pod_id)
except Exception as e: # pylint: disable=broad-except
self.log.exception('Ignoring exception: %s when attempting to change state of %s '
'to %s.', e, results, state)
finally:
self.result_queue.task_done()
except Empty:
break

def end(self):
"""Called when the executor shuts down"""
self.log.info('Shutting down Kubernetes executor')
self.log.debug('Flushing task_queue...')
self._flush_task_queue()
dimberman marked this conversation as resolved.
Show resolved Hide resolved
self.log.debug('Flushing result_queue...')
self._flush_result_queue()
# Both queues should be empty...
self.task_queue.join()
self.result_queue.join()
if self.kube_scheduler:
Expand Down