Skip to content

Commit

Permalink
Fix stale data sent to rest services after editing submission (#2667)
Browse files Browse the repository at this point in the history
* fix stale data sent to rest services when editing submission

The data sent to rest services can be stale if the instance is not yet saved to the database when a submission is edited

* pin to master

* add comment

* fix typo in comment

* Wait for transaction to complete before calling webhooks (#2675)

* wait for transaction to complete before calling webhooks

* refactor code

* refactor code

* fix exception

fix exception AttributeError: 'ParsedInstance' object has no attribute 'xform'

* fix failing tests

capture on commit callbacks

* move misplaced comment

* use master database when calling external services

Call webhooks is a second-effect of creating/updating a submission. The operation should be pinned to master database, so that the replicas can be used by the clients reading from the API

* fix failing tests

* fix failing tests
  • Loading branch information
kelvin-muchiri committed Aug 23, 2024
1 parent 4bb2c1d commit fea517d
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 68 deletions.
5 changes: 4 additions & 1 deletion onadata/apps/api/tests/viewsets/test_data_viewset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3782,7 +3782,10 @@ def test_data_retrieve_instance_osm_format(self):
self._publish_xls_form_to_project(xlsform_path=xlsform_path)
submission_path = os.path.join(osm_fixtures_dir, "instance_a.xml")
files = [open(path, "rb") for path in paths]
self._make_submission(submission_path, media_file=files)

with self.captureOnCommitCallbacks(execute=True):
# Ensure on commit callbacks are executed
self._make_submission(submission_path, media_file=files)
self.assertTrue(hasattr(self, "instance"))
self.assertEqual(self.instance.attachments.all().count(), len(files))

Expand Down
12 changes: 9 additions & 3 deletions onadata/apps/api/tests/viewsets/test_osm_viewset.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,15 @@ def _publish_osm_with_submission(self):
count = Attachment.objects.filter(extension="osm").count()
count_osm = OsmData.objects.count()
_submission_time = parse_datetime("2013-02-18 15:54:01Z")
self._make_submission(
submission_path, media_file=files, forced_submission_time=_submission_time
)

with self.captureOnCommitCallbacks(execute=True):
# Ensure on commit callbacks are executed
self._make_submission(
submission_path,
media_file=files,
forced_submission_time=_submission_time,
)

self.assertTrue(Attachment.objects.filter(extension="osm").count() > count)
self.assertEqual(OsmData.objects.count(), count_osm + 2)

Expand Down
32 changes: 20 additions & 12 deletions onadata/apps/api/tests/viewsets/test_xform_submission_viewset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1266,12 +1266,16 @@ def test_new_submission_sent_to_rapidpro(self, mock_send):

with open(submission_path, "rb") as sf:
data = {"xml_submission_file": sf, "media_file": f}
request = self.factory.post("/submission", data)
response = self.view(request)
self.assertEqual(response.status_code, 401)
auth = DigestAuth("bob", "bobbob")
request.META.update(auth(request.META, response))
response = self.view(request, username=self.user.username)

with self.captureOnCommitCallbacks(execute=True):
# Ensure on commit callbacks are executed
request = self.factory.post("/submission", data)
response = self.view(request)
self.assertEqual(response.status_code, 401)
auth = DigestAuth("bob", "bobbob")
request.META.update(auth(request.META, response))
response = self.view(request, username=self.user.username)

self.assertContains(response, "Successful submission", status_code=201)
instance = Instance.objects.all().order_by("-pk")[0]
mock_send.assert_called_once_with(rest_service.service_url, instance)
Expand Down Expand Up @@ -1310,12 +1314,16 @@ def test_edit_submission_sent_to_rapidpro(self, mock_send):

with open(submission_path, "rb") as sf:
data = {"xml_submission_file": sf, "media_file": f}
request = self.factory.post("/submission", data)
response = self.view(request)
self.assertEqual(response.status_code, 401)
auth = DigestAuth("bob", "bobbob")
request.META.update(auth(request.META, response))
response = self.view(request, username=self.user.username)

with self.captureOnCommitCallbacks(execute=True):
# Ensure on commit callbacks are executed
request = self.factory.post("/submission", data)
response = self.view(request)
self.assertEqual(response.status_code, 401)
auth = DigestAuth("bob", "bobbob")
request.META.update(auth(request.META, response))
response = self.view(request, username=self.user.username)

self.assertContains(response, "Successful submission", status_code=201)
new_uuid = "6b2cc313-fc09-437e-8139-fcd32f695d41"
instance = Instance.objects.get(uuid=new_uuid)
Expand Down
4 changes: 1 addition & 3 deletions onadata/apps/restservice/services/generic_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ class ServiceDefinition(RestServiceInterface): # pylint: disable=too-few-public
def send(self, url, data=None):
"""Post submisison JSON data to an external service that accepts a JSON post."""
if data:
# We use Instance.get_full_dict() instead of Instance.json because
# when asynchronous processing is enabled, the json may not be upto date
post_data = json.dumps(data.get_full_dict())
post_data = json.dumps(data.json)
headers = {"Content-Type": "application/json"}
try:
requests.post(
Expand Down
4 changes: 1 addition & 3 deletions onadata/apps/restservice/services/textit.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ def send(self, url, data=None):
:param data:
:return:
"""
# We use Instance.get_full_dict() instead of Instance.json because
# when asynchronous processing is enabled, the json may not be upto date
extra_data = self.clean_keys_of_slashes(data.get_full_dict())
extra_data = self.clean_keys_of_slashes(data.json)
data_value = MetaData.textit(data.xform)

if data_value:
Expand Down
23 changes: 3 additions & 20 deletions onadata/apps/restservice/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,9 @@
RestService signals module
"""
import django.dispatch
from django.conf import settings
from multidb.pinning import use_master

from onadata.apps.restservice.tasks import call_service_async
from onadata.apps.restservice.utils import call_service
from onadata.apps.logger.models.instance import Instance

ASYNC_POST_SUBMISSION_PROCESSING_ENABLED = getattr(
settings, "ASYNC_POST_SUBMISSION_PROCESSING_ENABLED", False
)

# pylint: disable=invalid-name
trigger_webhook = django.dispatch.Signal()
Expand All @@ -22,19 +15,9 @@ def call_webhooks(sender, **kwargs): # pylint: disable=unused-argument
"""
Call webhooks signal.
"""
instance_id = kwargs["instance"].pk
if ASYNC_POST_SUBMISSION_PROCESSING_ENABLED:
call_service_async.apply_async(args=[instance_id], countdown=1)
else:
with use_master:
try:
instance = Instance.objects.get(pk=instance_id)
except Instance.DoesNotExist:
# if the instance has already been removed we do not send it to the
# service
pass
else:
call_service(instance)
instance = kwargs["instance"]

call_service_async.apply_async(args=[instance.pk], countdown=1)


trigger_webhook.connect(call_webhooks, dispatch_uid="call_webhooks")
18 changes: 10 additions & 8 deletions onadata/apps/restservice/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
"""
restservice async functions.
"""
from multidb.pinning import use_master

from onadata.apps.logger.models.instance import Instance
from onadata.apps.restservice.utils import call_service
from onadata.celeryapp import app
Expand All @@ -10,13 +12,13 @@
@app.task()
def call_service_async(instance_pk):
"""Async function that calls call_service()."""
# load the parsed instance
# Use master database
with use_master:
try:
instance = Instance.objects.get(pk=instance_pk)
except Instance.DoesNotExist:
# if the instance has already been removed we do not send it to the
# service
return

try:
instance = Instance.objects.get(pk=instance_pk)
except Instance.DoesNotExist:
# if the instance has already been removed we do not send it to the
# service
pass
else:
call_service(instance)
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,10 @@ def test_textit_flow(self, mock_http):
)

self.assertFalse(mock_http.called)
self._make_submissions()

with self.captureOnCommitCallbacks(execute=True):
# Ensure on commit callbacks are executed
self._make_submissions()

self.assertTrue(mock_http.called)
self.assertEqual(mock_http.call_count, 4)
Expand Down
30 changes: 14 additions & 16 deletions onadata/apps/viewer/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
import django.dispatch
from django.conf import settings
from django.db import transaction
from django.db.models.signals import post_save

from onadata.apps.logger.models import Instance
Expand Down Expand Up @@ -37,29 +38,26 @@ def _post_process_submissions(instance):

def post_save_submission(sender, **kwargs): # pylint: disable=unused-argument
"""
Calls webhooks and OSM data processing for ParsedInstance model.
Calls webhooks and OSM data processing for ParsedInstance/Instance model.
"""
parsed_instance = kwargs.get("instance")
created = kwargs.get("created")
instance = kwargs.get("instance")

if created and isinstance(instance, ParsedInstance):
# Get submission from ParsedInstance
instance = instance.instance

if created:
_post_process_submissions(parsed_instance.instance)
if isinstance(instance, Instance):
# Trigger webhooks only if the Instance has been commited by using
# transaction.on_commit. In case, the transaction is rolled back,
# the webhooks will not be called. Also, ensures getting the Instance
# again from the database later will not return stale data
transaction.on_commit(lambda: _post_process_submissions(instance))


post_save.connect(
post_save_submission, sender=ParsedInstance, dispatch_uid="post_save_submission"
)


def process_saved_submission(sender, **kwargs): # pylint: disable=unused-argument
"""
Calls webhooks and OSM data processing for Instance model.
"""
instance = kwargs.get("instance")
if instance:
_post_process_submissions(instance)


process_submission.connect(
process_saved_submission, sender=Instance, dispatch_uid="process_saved_submission"
post_save_submission, sender=Instance, dispatch_uid="process_saved_submission"
)
5 changes: 4 additions & 1 deletion onadata/libs/tests/utils/test_export_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ def test_generate_osm_export(self):
self._publish_xls_file_and_set_xform(xlsform_path)
submission_path = os.path.join(osm_fixtures_dir, "instance_a.xml")
count = Attachment.objects.filter(extension="osm").count()
self._make_submission_w_attachment(submission_path, paths)

with self.captureOnCommitCallbacks(execute=True):
# Ensure on commit callbacks are executed
self._make_submission_w_attachment(submission_path, paths)
self.assertTrue(Attachment.objects.filter(extension="osm").count() > count)

options = {"extension": Attachment.OSM}
Expand Down

0 comments on commit fea517d

Please sign in to comment.