Skip to content

Commit

Permalink
Utilize celery app to register tasks and remove usage of celery amqp …
Browse files Browse the repository at this point in the history
…backend
  • Loading branch information
DavisRayM committed Sep 25, 2020
1 parent b044bbe commit 6b6caa2
Show file tree
Hide file tree
Showing 13 changed files with 46 additions and 84 deletions.
12 changes: 6 additions & 6 deletions onadata/apps/api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import sys
from builtins import str

from celery import task
from celery.result import AsyncResult
from django.conf import settings
from django.core.files.uploadedfile import TemporaryUploadedFile
Expand All @@ -12,6 +11,7 @@
from django.utils.datastructures import MultiValueDict
from past.builtins import basestring

from onadata.celery import app
from onadata.apps.api import tools
from onadata.apps.logger.models.xform import XForm

Expand All @@ -23,7 +23,7 @@ def recreate_tmp_file(name, path, mime_type):
return tmp_file


@task(bind=True)
@app.task(bind=True)
def publish_xlsform_async(self, user_id, post_data, owner_id, file_data):
try:
files = MultiValueDict()
Expand Down Expand Up @@ -56,15 +56,15 @@ def publish_xlsform_async(self, user_id, post_data, owner_id, file_data):
return {u'error': error_message}


@task()
@app.task()
def delete_xform_async(xform_id, user_id):
"""Soft delete an XForm asynchrounous task"""
xform = XForm.objects.get(pk=xform_id)
user = User.objects.get(pk=user_id)
xform.soft_delete(user)


@task()
@app.task()
def delete_user_async():
"""Delete inactive user accounts"""
users = User.objects.filter(active=False,
Expand Down Expand Up @@ -105,14 +105,14 @@ def send_generic_email(email, message_txt, subject):
email_message.send()


@task()
@app.task()
def send_verification_email(email, message_txt, subject):
"""
Sends a verification email
"""
send_generic_email(email, message_txt, subject)


@task()
@app.task()
def send_account_lockout_email(email, message_txt, subject):
send_generic_email(email, message_txt, subject)
4 changes: 2 additions & 2 deletions onadata/apps/logger/import_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import zipfile
from builtins import open

from celery import task
from onadata.celery import app
from django.core.files.uploadedfile import InMemoryUploadedFile

from onadata.apps.logger.xform_fs import XFormInstanceFS
Expand Down Expand Up @@ -74,7 +74,7 @@ def import_instance(username, xform_path, photos, osm_files, status,
return 0


@task(ignore_result=True)
@app.task(ignore_result=True)
def import_instance_async(username, xform_path, photos, osm_files, status):
import_instance(username, xform_path, photos, osm_files, status, False)

Expand Down
8 changes: 4 additions & 4 deletions onadata/apps/logger/models/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import math
from datetime import datetime

from celery import task
from onadata.celery import app
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.gis.db import models
Expand Down Expand Up @@ -143,7 +143,7 @@ def submission_time():
return timezone.now()


@task
@app.task
@transaction.atomic()
def update_xform_submission_count(instance_id, created):
if created:
Expand Down Expand Up @@ -215,7 +215,7 @@ def update_xform_submission_count_delete(sender, instance, **kwargs):
xform.save()


@task
@app.task
def save_full_json(instance_id, created):
"""set json data, ensure the primary key is part of the json data"""
if created:
Expand All @@ -228,7 +228,7 @@ def save_full_json(instance_id, created):
instance.save(update_fields=['json'])


@task
@app.task
def update_project_date_modified(instance_id, created):
# update the date modified field of the project which will change
# the etag value of the projects endpoint
Expand Down
4 changes: 2 additions & 2 deletions onadata/apps/messaging/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
"""
from __future__ import unicode_literals

from celery import task
from onadata.celery import app

from onadata.apps.messaging.backends.base import call_backend


@task(ignore_result=True)
@app.task(ignore_result=True)
def call_backend_async(backend, instance_id, backend_options=None):
"""
Task to send messages to notification backeds such as MQTT
Expand Down
4 changes: 2 additions & 2 deletions onadata/apps/restservice/tasks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from celery import task
from onadata.celery import app

from onadata.apps.restservice.utils import call_service


@task()
@app.task()
def call_service_async(instance_pk):
# load the parsed instance
from onadata.apps.logger.models.instance import Instance
Expand Down
26 changes: 13 additions & 13 deletions onadata/apps/viewer/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from django.shortcuts import get_object_or_404
from django.utils import timezone

from celery import task
from onadata.celery import app
from kombu.exceptions import OperationalError
from requests import ConnectionError

Expand Down Expand Up @@ -121,7 +121,7 @@ def _create_export(xform, export_type, options):
return None


@task(track_started=True)
@app.task(track_started=True)
def create_xls_export(username, id_string, export_id, **options):
"""
XLS export task.
Expand Down Expand Up @@ -159,7 +159,7 @@ def create_xls_export(username, id_string, export_id, **options):
return gen_export.id


@task(track_started=True)
@app.task(track_started=True)
def create_csv_export(username, id_string, export_id, **options):
"""
CSV export task.
Expand Down Expand Up @@ -194,7 +194,7 @@ def create_csv_export(username, id_string, export_id, **options):
return gen_export.id


@task(track_started=True)
@app.task(track_started=True)
def create_kml_export(username, id_string, export_id, **options):
"""
KML export task.
Expand Down Expand Up @@ -227,7 +227,7 @@ def create_kml_export(username, id_string, export_id, **options):
return gen_export.id


@task(track_started=True)
@app.task(track_started=True)
def create_osm_export(username, id_string, export_id, **options):
"""
OSM export task.
Expand Down Expand Up @@ -261,7 +261,7 @@ def create_osm_export(username, id_string, export_id, **options):
return gen_export.id


@task(track_started=True)
@app.task(track_started=True)
def create_zip_export(username, id_string, export_id, **options):
"""
Attachments zip export task.
Expand Down Expand Up @@ -293,7 +293,7 @@ def create_zip_export(username, id_string, export_id, **options):
return gen_export.id


@task(track_started=True)
@app.task(track_started=True)
def create_csv_zip_export(username, id_string, export_id, **options):
"""
CSV zip export task.
Expand All @@ -320,7 +320,7 @@ def create_csv_zip_export(username, id_string, export_id, **options):
return gen_export.id


@task(track_started=True)
@app.task(track_started=True)
def create_sav_zip_export(username, id_string, export_id, **options):
"""
SPSS sav export task.
Expand All @@ -346,7 +346,7 @@ def create_sav_zip_export(username, id_string, export_id, **options):
return gen_export.id


@task(track_started=True)
@app.task(track_started=True)
def create_external_export(username, id_string, export_id, **options):
"""
XLSReport export task.
Expand Down Expand Up @@ -376,7 +376,7 @@ def create_external_export(username, id_string, export_id, **options):
return gen_export.id


@task(track_started=True)
@app.task(track_started=True)
def create_google_sheet_export(username, id_string, export_id, **options):
"""
Google Sheets export task.
Expand All @@ -403,7 +403,7 @@ def create_google_sheet_export(username, id_string, export_id, **options):
return gen_export.id


@task(track_started=True)
@app.task(track_started=True)
def delete_export(export_id):
"""
Delete export task with id export_id.
Expand All @@ -418,7 +418,7 @@ def delete_export(export_id):
return False


@task(ignore_result=True)
@app.task(ignore_result=True)
def mark_expired_pending_exports_as_failed(): # pylint: disable=invalid-name
"""
Exports that have not completed within a set time should be marked as
Expand All @@ -431,7 +431,7 @@ def mark_expired_pending_exports_as_failed(): # pylint: disable=invalid-name
exports.update(internal_status=Export.FAILED)


@task(ignore_result=True)
@app.task(ignore_result=True)
def delete_expired_failed_exports():
"""
Delete old failed exports
Expand Down
28 changes: 0 additions & 28 deletions onadata/libs/tests/utils/test_api_export_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from collections import OrderedDict, defaultdict

import mock
from celery.backends.amqp import BacklogLimitExceeded
from django.http import Http404
from django.test.utils import override_settings
from kombu.exceptions import OperationalError
Expand Down Expand Up @@ -102,33 +101,6 @@ def __init__(self):
with self.assertRaises(Http404):
get_async_response('job_uuid', request, self.xform)

# pylint: disable=invalid-name
@mock.patch('onadata.libs.utils.api_export_tools.AsyncResult')
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
def test_get_async_response_export_backlog_limit(self, AsyncResult):
"""
Test get_async_response export backlog limit exceeded.
"""

class MockAsyncResult(object): # pylint: disable=R0903
"""Mock AsyncResult"""

def __init__(self):
pass

@property
def state(self):
"""Raise BacklogLimitExceeded"""
raise BacklogLimitExceeded()

AsyncResult.return_value = MockAsyncResult()
self._publish_transportation_form_and_submit_instance()
request = self.factory.post('/')
request.user = self.user

result = get_async_response('job_uuid', request, self.xform)
self.assertEqual(result, {'job_status': 'PENDING'})

def test_response_for_format(self):
"""
Test response format type.
Expand Down
1 change: 0 additions & 1 deletion onadata/libs/tests/utils/test_csv_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import mock
import unicodecsv as ucsv
from celery.backends.amqp import BacklogLimitExceeded
from django.conf import settings
from mock import patch

Expand Down
4 changes: 0 additions & 4 deletions onadata/libs/utils/api_export_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from datetime import datetime

import httplib2
from celery.backends.amqp import BacklogLimitExceeded
from celery.result import AsyncResult
from django.conf import settings
from django.http import Http404, HttpResponseRedirect
Expand Down Expand Up @@ -516,9 +515,6 @@ def _get_response():
raise ServiceUnavailable

return get_async_response(job_uuid, request, xform, count + 1)
except BacklogLimitExceeded:
# most likely still processing
resp = async_status(celery_state_to_status('PENDING'))

return resp

Expand Down
28 changes: 12 additions & 16 deletions onadata/libs/utils/csv_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

import unicodecsv as ucsv
import xlrd
from celery import current_task, task
from celery.backends.amqp import BacklogLimitExceeded
from celery import current_task
from celery.result import AsyncResult
from dateutil.parser import parse
from django.conf import settings
Expand All @@ -28,6 +27,7 @@
from future.utils import iteritems
from multidb.pinning import use_master

from onadata.celery import app
from onadata.apps.logger.models import Instance, XForm
from onadata.apps.messaging.constants import XFORM, SUBMISSION_DELETED
from onadata.apps.messaging.serializers import send_message
Expand Down Expand Up @@ -142,7 +142,7 @@ def dict_pathkeys_to_nested_dicts(dictionary):
return data


@task()
@app.task()
def submit_csv_async(username, xform_id, file_path, overwrite=False):
"""Imports CSV data to an existing xform asynchrounously."""
xform = XForm.objects.get(pk=xform_id)
Expand Down Expand Up @@ -489,21 +489,17 @@ def get_async_csv_submission_status(job_uuid):
return async_status(FAILED, u'Empty job uuid')

job = AsyncResult(job_uuid)
try:
# result = (job.result or job.state)
if job.state not in ['SUCCESS', 'FAILURE']:
response = async_status(celery_state_to_status(job.state))
if isinstance(job.info, dict):
response.update(job.info)

return response
# result = (job.result or job.state)
if job.state not in ['SUCCESS', 'FAILURE']:
response = async_status(celery_state_to_status(job.state))
if isinstance(job.info, dict):
response.update(job.info)

if job.state == 'FAILURE':
return async_status(
celery_state_to_status(job.state), text(job.result))
return response

except BacklogLimitExceeded:
return async_status(celery_state_to_status('PENDING'))
if job.state == 'FAILURE':
return async_status(
celery_state_to_status(job.state), text(job.result))

return job.get()

Expand Down
Loading

0 comments on commit 6b6caa2

Please sign in to comment.