diff --git a/onadata/apps/api/tasks.py b/onadata/apps/api/tasks.py index 75bc3a66a9..02f0226ff8 100644 --- a/onadata/apps/api/tasks.py +++ b/onadata/apps/api/tasks.py @@ -2,7 +2,6 @@ import sys from builtins import str -from celery import task from celery.result import AsyncResult from django.conf import settings from django.core.files.uploadedfile import TemporaryUploadedFile @@ -12,6 +11,7 @@ from django.utils.datastructures import MultiValueDict from past.builtins import basestring +from onadata.celery import app from onadata.apps.api import tools from onadata.apps.logger.models.xform import XForm @@ -23,7 +23,7 @@ def recreate_tmp_file(name, path, mime_type): return tmp_file -@task(bind=True) +@app.task(bind=True) def publish_xlsform_async(self, user_id, post_data, owner_id, file_data): try: files = MultiValueDict() @@ -56,7 +56,7 @@ def publish_xlsform_async(self, user_id, post_data, owner_id, file_data): return {u'error': error_message} -@task() +@app.task() def delete_xform_async(xform_id, user_id): """Soft delete an XForm asynchrounous task""" xform = XForm.objects.get(pk=xform_id) @@ -64,7 +64,7 @@ def delete_xform_async(xform_id, user_id): xform.soft_delete(user) -@task() +@app.task() def delete_user_async(): """Delete inactive user accounts""" users = User.objects.filter(active=False, @@ -105,7 +105,7 @@ def send_generic_email(email, message_txt, subject): email_message.send() -@task() +@app.task() def send_verification_email(email, message_txt, subject): """ Sends a verification email @@ -113,6 +113,6 @@ def send_verification_email(email, message_txt, subject): send_generic_email(email, message_txt, subject) -@task() +@app.task() def send_account_lockout_email(email, message_txt, subject): send_generic_email(email, message_txt, subject) diff --git a/onadata/apps/logger/import_tools.py b/onadata/apps/logger/import_tools.py index e8d2661448..c0933b7bfe 100644 --- a/onadata/apps/logger/import_tools.py +++ b/onadata/apps/logger/import_tools.py @@ -5,7 +5,7 @@ import zipfile from builtins import open -from celery import task +from onadata.celery import app from django.core.files.uploadedfile import InMemoryUploadedFile from onadata.apps.logger.xform_fs import XFormInstanceFS @@ -74,7 +74,7 @@ def import_instance(username, xform_path, photos, osm_files, status, return 0 -@task(ignore_result=True) +@app.task(ignore_result=True) def import_instance_async(username, xform_path, photos, osm_files, status): import_instance(username, xform_path, photos, osm_files, status, False) diff --git a/onadata/apps/logger/models/instance.py b/onadata/apps/logger/models/instance.py index f5ec17d352..571bd6f3ab 100644 --- a/onadata/apps/logger/models/instance.py +++ b/onadata/apps/logger/models/instance.py @@ -5,7 +5,7 @@ import math from datetime import datetime -from celery import task +from onadata.celery import app from django.conf import settings from django.contrib.auth.models import User from django.contrib.gis.db import models @@ -143,7 +143,7 @@ def submission_time(): return timezone.now() -@task +@app.task @transaction.atomic() def update_xform_submission_count(instance_id, created): if created: @@ -215,7 +215,7 @@ def update_xform_submission_count_delete(sender, instance, **kwargs): xform.save() -@task +@app.task def save_full_json(instance_id, created): """set json data, ensure the primary key is part of the json data""" if created: @@ -228,7 +228,7 @@ def save_full_json(instance_id, created): instance.save(update_fields=['json']) -@task +@app.task def update_project_date_modified(instance_id, created): # update the date modified field of the project which will change # the etag value of the projects endpoint diff --git a/onadata/apps/messaging/tasks.py b/onadata/apps/messaging/tasks.py index 5c9156a333..e86cee23a3 100644 --- a/onadata/apps/messaging/tasks.py +++ b/onadata/apps/messaging/tasks.py @@ -4,12 +4,12 @@ """ from __future__ import unicode_literals -from celery import task +from onadata.celery import app from onadata.apps.messaging.backends.base import call_backend -@task(ignore_result=True) +@app.task(ignore_result=True) def call_backend_async(backend, instance_id, backend_options=None): """ Task to send messages to notification backeds such as MQTT diff --git a/onadata/apps/restservice/tasks.py b/onadata/apps/restservice/tasks.py index fa66a41530..27b6759378 100644 --- a/onadata/apps/restservice/tasks.py +++ b/onadata/apps/restservice/tasks.py @@ -1,9 +1,9 @@ -from celery import task +from onadata.celery import app from onadata.apps.restservice.utils import call_service -@task() +@app.task() def call_service_async(instance_pk): # load the parsed instance from onadata.apps.logger.models.instance import Instance diff --git a/onadata/apps/viewer/tasks.py b/onadata/apps/viewer/tasks.py index ce69480162..9ffa8ea5b7 100644 --- a/onadata/apps/viewer/tasks.py +++ b/onadata/apps/viewer/tasks.py @@ -11,7 +11,7 @@ from django.shortcuts import get_object_or_404 from django.utils import timezone -from celery import task +from onadata.celery import app from kombu.exceptions import OperationalError from requests import ConnectionError @@ -121,7 +121,7 @@ def _create_export(xform, export_type, options): return None -@task(track_started=True) +@app.task(track_started=True) def create_xls_export(username, id_string, export_id, **options): """ XLS export task. @@ -159,7 +159,7 @@ def create_xls_export(username, id_string, export_id, **options): return gen_export.id -@task(track_started=True) +@app.task(track_started=True) def create_csv_export(username, id_string, export_id, **options): """ CSV export task. @@ -194,7 +194,7 @@ def create_csv_export(username, id_string, export_id, **options): return gen_export.id -@task(track_started=True) +@app.task(track_started=True) def create_kml_export(username, id_string, export_id, **options): """ KML export task. @@ -227,7 +227,7 @@ def create_kml_export(username, id_string, export_id, **options): return gen_export.id -@task(track_started=True) +@app.task(track_started=True) def create_osm_export(username, id_string, export_id, **options): """ OSM export task. @@ -261,7 +261,7 @@ def create_osm_export(username, id_string, export_id, **options): return gen_export.id -@task(track_started=True) +@app.task(track_started=True) def create_zip_export(username, id_string, export_id, **options): """ Attachments zip export task. @@ -293,7 +293,7 @@ def create_zip_export(username, id_string, export_id, **options): return gen_export.id -@task(track_started=True) +@app.task(track_started=True) def create_csv_zip_export(username, id_string, export_id, **options): """ CSV zip export task. @@ -320,7 +320,7 @@ def create_csv_zip_export(username, id_string, export_id, **options): return gen_export.id -@task(track_started=True) +@app.task(track_started=True) def create_sav_zip_export(username, id_string, export_id, **options): """ SPSS sav export task. @@ -346,7 +346,7 @@ def create_sav_zip_export(username, id_string, export_id, **options): return gen_export.id -@task(track_started=True) +@app.task(track_started=True) def create_external_export(username, id_string, export_id, **options): """ XLSReport export task. @@ -376,7 +376,7 @@ def create_external_export(username, id_string, export_id, **options): return gen_export.id -@task(track_started=True) +@app.task(track_started=True) def create_google_sheet_export(username, id_string, export_id, **options): """ Google Sheets export task. @@ -403,7 +403,7 @@ def create_google_sheet_export(username, id_string, export_id, **options): return gen_export.id -@task(track_started=True) +@app.task(track_started=True) def delete_export(export_id): """ Delete export task with id export_id. @@ -418,7 +418,7 @@ def delete_export(export_id): return False -@task(ignore_result=True) +@app.task(ignore_result=True) def mark_expired_pending_exports_as_failed(): # pylint: disable=invalid-name """ Exports that have not completed within a set time should be marked as @@ -431,7 +431,7 @@ def mark_expired_pending_exports_as_failed(): # pylint: disable=invalid-name exports.update(internal_status=Export.FAILED) -@task(ignore_result=True) +@app.task(ignore_result=True) def delete_expired_failed_exports(): """ Delete old failed exports diff --git a/onadata/libs/tests/utils/test_api_export_tools.py b/onadata/libs/tests/utils/test_api_export_tools.py index 34c406abf8..8b3c420769 100644 --- a/onadata/libs/tests/utils/test_api_export_tools.py +++ b/onadata/libs/tests/utils/test_api_export_tools.py @@ -5,7 +5,6 @@ from collections import OrderedDict, defaultdict import mock -from celery.backends.amqp import BacklogLimitExceeded from django.http import Http404 from django.test.utils import override_settings from kombu.exceptions import OperationalError @@ -102,33 +101,6 @@ def __init__(self): with self.assertRaises(Http404): get_async_response('job_uuid', request, self.xform) - # pylint: disable=invalid-name - @mock.patch('onadata.libs.utils.api_export_tools.AsyncResult') - @override_settings(CELERY_TASK_ALWAYS_EAGER=True) - def test_get_async_response_export_backlog_limit(self, AsyncResult): - """ - Test get_async_response export backlog limit exceeded. - """ - - class MockAsyncResult(object): # pylint: disable=R0903 - """Mock AsyncResult""" - - def __init__(self): - pass - - @property - def state(self): - """Raise BacklogLimitExceeded""" - raise BacklogLimitExceeded() - - AsyncResult.return_value = MockAsyncResult() - self._publish_transportation_form_and_submit_instance() - request = self.factory.post('/') - request.user = self.user - - result = get_async_response('job_uuid', request, self.xform) - self.assertEqual(result, {'job_status': 'PENDING'}) - def test_response_for_format(self): """ Test response format type. diff --git a/onadata/libs/tests/utils/test_csv_import.py b/onadata/libs/tests/utils/test_csv_import.py index 9adfb9fca2..937d2f828c 100644 --- a/onadata/libs/tests/utils/test_csv_import.py +++ b/onadata/libs/tests/utils/test_csv_import.py @@ -8,7 +8,6 @@ import mock import unicodecsv as ucsv -from celery.backends.amqp import BacklogLimitExceeded from django.conf import settings from mock import patch diff --git a/onadata/libs/utils/api_export_tools.py b/onadata/libs/utils/api_export_tools.py index 33afd2cd0d..a778852da8 100644 --- a/onadata/libs/utils/api_export_tools.py +++ b/onadata/libs/utils/api_export_tools.py @@ -8,7 +8,6 @@ from datetime import datetime import httplib2 -from celery.backends.amqp import BacklogLimitExceeded from celery.result import AsyncResult from django.conf import settings from django.http import Http404, HttpResponseRedirect @@ -516,9 +515,6 @@ def _get_response(): raise ServiceUnavailable return get_async_response(job_uuid, request, xform, count + 1) - except BacklogLimitExceeded: - # most likely still processing - resp = async_status(celery_state_to_status('PENDING')) return resp diff --git a/onadata/libs/utils/csv_import.py b/onadata/libs/utils/csv_import.py index dcdd7f036d..53a80c4f56 100644 --- a/onadata/libs/utils/csv_import.py +++ b/onadata/libs/utils/csv_import.py @@ -16,8 +16,7 @@ import unicodecsv as ucsv import xlrd -from celery import current_task, task -from celery.backends.amqp import BacklogLimitExceeded +from celery import current_task from celery.result import AsyncResult from dateutil.parser import parse from django.conf import settings @@ -28,6 +27,7 @@ from future.utils import iteritems from multidb.pinning import use_master +from onadata.celery import app from onadata.apps.logger.models import Instance, XForm from onadata.apps.messaging.constants import XFORM, SUBMISSION_DELETED from onadata.apps.messaging.serializers import send_message @@ -142,7 +142,7 @@ def dict_pathkeys_to_nested_dicts(dictionary): return data -@task() +@app.task() def submit_csv_async(username, xform_id, file_path, overwrite=False): """Imports CSV data to an existing xform asynchrounously.""" xform = XForm.objects.get(pk=xform_id) @@ -489,21 +489,17 @@ def get_async_csv_submission_status(job_uuid): return async_status(FAILED, u'Empty job uuid') job = AsyncResult(job_uuid) - try: - # result = (job.result or job.state) - if job.state not in ['SUCCESS', 'FAILURE']: - response = async_status(celery_state_to_status(job.state)) - if isinstance(job.info, dict): - response.update(job.info) - - return response + # result = (job.result or job.state) + if job.state not in ['SUCCESS', 'FAILURE']: + response = async_status(celery_state_to_status(job.state)) + if isinstance(job.info, dict): + response.update(job.info) - if job.state == 'FAILURE': - return async_status( - celery_state_to_status(job.state), text(job.result)) + return response - except BacklogLimitExceeded: - return async_status(celery_state_to_status('PENDING')) + if job.state == 'FAILURE': + return async_status( + celery_state_to_status(job.state), text(job.result)) return job.get() diff --git a/onadata/libs/utils/osm.py b/onadata/libs/utils/osm.py index 48e7805cac..e7547ce809 100644 --- a/onadata/libs/utils/osm.py +++ b/onadata/libs/utils/osm.py @@ -6,7 +6,7 @@ import logging -from celery import task +from onadata.celery import app from django.contrib.gis.geos import (GeometryCollection, LineString, Point, Polygon) from django.contrib.gis.geos.error import GEOSException @@ -140,7 +140,7 @@ def parse_osm(osm_xml, include_osm_id=False): return nodes -@task() +@app.task() def save_osm_data_async(instance_id): """ Async task for saving OSM data for the specified submission. diff --git a/onadata/libs/utils/project_utils.py b/onadata/libs/utils/project_utils.py index 9035565175..58860a66fe 100644 --- a/onadata/libs/utils/project_utils.py +++ b/onadata/libs/utils/project_utils.py @@ -4,7 +4,7 @@ """ import sys -from celery import task +from onadata.celery import app from django.conf import settings from django.db import IntegrityError @@ -59,7 +59,7 @@ def set_project_perms_to_xform(xform, project): # pylint: disable=invalid-name -@task(bind=True, max_retries=3) +@app.task(bind=True, max_retries=3) def set_project_perms_to_xform_async(self, xform_id, project_id): """ Apply project permissions for ``project_id`` to a form ``xform_id`` task. diff --git a/onadata/settings/common.py b/onadata/settings/common.py index dd30b4108e..d9ac40abc8 100644 --- a/onadata/settings/common.py +++ b/onadata/settings/common.py @@ -210,8 +210,7 @@ 'onadata.libs', 'reversion', 'actstream', - 'onadata.apps.messaging.apps.MessagingConfig', - 'django_celery_results', + 'onadata.apps.messaging.apps.MessagingConfig' ) OAUTH2_PROVIDER = {