From da95f9e638ec60ff747cd6bd69ec9afe9078ab44 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Fri, 5 Mar 2021 19:34:17 +0000 Subject: [PATCH] Default to Celery Task model when backend model does not exist (#14612) closes https://github.com/apache/airflow/issues/14586 We add this feature in https://github.com/apache/airflow/pull/12336 but looks like for some users this attribute does not exist. I am not sure if they are using a different Celery DB Backend or not but even Celery > 5 contains this attribute (https://github.com/celery/celery/blob/v5.0.5/celery/backends/database/__init__.py#L66) and even Celery 4 but this commits use the Celery Task model when an attribute error occurs GitOrigin-RevId: 33910d6c699b5528db4be40d31199626dafed912 --- airflow/executors/celery_executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 7927dbd7af4..96a78bee3d0 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -35,7 +35,7 @@ from celery import Celery, Task, states as celery_states from celery.backends.base import BaseKeyValueStoreBackend -from celery.backends.database import DatabaseBackend, session_cleanup +from celery.backends.database import DatabaseBackend, Task as TaskDb, session_cleanup from celery.result import AsyncResult from celery.signals import import_modules as celery_import_modules from setproctitle import setproctitle # pylint: disable=no-name-in-module @@ -567,7 +567,7 @@ def _get_many_from_kv_backend(self, async_tasks) -> Mapping[str, EventBufferValu def _get_many_from_db_backend(self, async_tasks) -> Mapping[str, EventBufferValueType]: task_ids = _tasks_list_to_task_ids(async_tasks) session = app.backend.ResultSession() - task_cls = app.backend.task_cls + task_cls = getattr(app.backend, "task_cls", TaskDb) with session_cleanup(session): tasks = session.query(task_cls).filter(task_cls.task_id.in_(task_ids)).all()