diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 3134455a21fd55..537d5acc563351 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -572,9 +572,29 @@ def _labels_to_key(self, labels):
         )
         return None
 
+    def _flush_watcher_queue(self):
+        """Drain ``watcher_queue`` at shutdown, marking every discarded item as done."""
+        self.log.debug('Executor shutting down, watcher_queue approx. size=%d', self.watcher_queue.qsize())
+        while True:
+            try:
+                task = self.watcher_queue.get_nowait()
+            except Empty:
+                break
+            # Ignoring it since it can only have either FAILED or SUCCEEDED pods
+            self.log.warning('Executor shutting down, IGNORING watcher task=%s', task)
+            self.watcher_queue.task_done()
+
     def terminate(self):
-        """Termninates the watcher."""
+        """Terminates the watcher."""
+        self.log.debug("Terminating kube_watcher...")
+        self.kube_watcher.terminate()
+        self.kube_watcher.join()
+        self.log.debug("kube_watcher=%s", self.kube_watcher)
+        self.log.debug("Flushing watcher_queue...")
+        self._flush_watcher_queue()
+        # Queue should be empty...
         self.watcher_queue.join()
+        self.log.debug("Shutting down manager...")
         self._manager.shutdown()
 
 
@@ -768,9 +788,48 @@ def _change_state(self, key, state, pod_id: str) -> None:
                 self.log.debug('Could not find key: %s', str(key))
         self.event_buffer[key] = state
 
+    def _flush_task_queue(self):
+        """Discard tasks still waiting in ``task_queue`` and mark each one done."""
+        self.log.debug('Executor shutting down, task_queue approximate size=%d', self.task_queue.qsize())
+        while True:
+            try:
+                task = self.task_queue.get_nowait()
+            except Empty:
+                break
+            # This is a new task to run thus ok to ignore.
+            self.log.warning('Executor shutting down, will NOT run task=%s', task)
+            self.task_queue.task_done()
+
+    def _flush_result_queue(self):
+        """Apply every state change still waiting in ``result_queue`` and mark each one done."""
+        self.log.debug('Executor shutting down, result_queue approximate size=%d', self.result_queue.qsize())
+        while True:
+            try:
+                results = self.result_queue.get_nowait()
+            except Empty:
+                break
+            self.log.warning('Executor shutting down, flushing results=%s', results)
+            try:
+                key, state, pod_id, resource_version = results
+                # %s, not %d: Kubernetes resource versions are opaque strings, which %d cannot format.
+                self.log.info('Changing state of %s to %s : resource_version=%s', results, state,
+                              resource_version)
+                try:
+                    self._change_state(key, state, pod_id)
+                except Exception as e:  # pylint: disable=broad-except
+                    self.log.exception('Ignoring exception: %s when attempting to change state of %s '
+                                       'to %s.', e, results, state)
+            finally:
+                self.result_queue.task_done()
+
     def end(self):
         """Called when the executor shuts down"""
         self.log.info('Shutting down Kubernetes executor')
+        self.log.debug('Flushing task_queue...')
+        self._flush_task_queue()
+        self.log.debug('Flushing result_queue...')
+        self._flush_result_queue()
+        # Both queues should be empty...
         self.task_queue.join()
         self.result_queue.join()
         if self.kube_scheduler: