Skip to content

Commit

Permalink
Default to Celery Task model when backend model does not exist (#14612)
Browse files Browse the repository at this point in the history
closes #14586

We add this feature in #12336
but looks like for some users this attribute does not exist.

I am not sure if they are using a different Celery DB Backend or not
but even Celery > 5 contains this attribute
(https://github.com/celery/celery/blob/v5.0.5/celery/backends/database/__init__.py#L66)

and even Celery 4 but this commits use the Celery Task model when an attribute
error occurs
  • Loading branch information
kaxil authored Mar 5, 2021
1 parent 511f042 commit 33910d6
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

from celery import Celery, Task, states as celery_states
from celery.backends.base import BaseKeyValueStoreBackend
from celery.backends.database import DatabaseBackend, session_cleanup
from celery.backends.database import DatabaseBackend, Task as TaskDb, session_cleanup
from celery.result import AsyncResult
from celery.signals import import_modules as celery_import_modules
from setproctitle import setproctitle # pylint: disable=no-name-in-module
Expand Down Expand Up @@ -567,7 +567,7 @@ def _get_many_from_kv_backend(self, async_tasks) -> Mapping[str, EventBufferValu
def _get_many_from_db_backend(self, async_tasks) -> Mapping[str, EventBufferValueType]:
task_ids = _tasks_list_to_task_ids(async_tasks)
session = app.backend.ResultSession()
task_cls = app.backend.task_cls
task_cls = getattr(app.backend, "task_cls", TaskDb)
with session_cleanup(session):
tasks = session.query(task_cls).filter(task_cls.task_id.in_(task_ids)).all()

Expand Down

0 comments on commit 33910d6

Please sign in to comment.