Skip to content

Commit

Permalink
Merge pull request #1853 from onaio/enketo_different_root_nodes_fix
Browse files Browse the repository at this point in the history
Set root node for created submissions to the XForms configured root node
  • Loading branch information
DavisRayM committed Aug 6, 2020
2 parents 05d29f5 + 46dc1a3 commit d79754d
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 4 deletions.
11 changes: 11 additions & 0 deletions onadata/apps/api/tests/viewsets/test_xform_submission_viewset.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,17 @@ def test_rapidpro_post_submission(self):
self.assertContains(response, 'Successful submission', status_code=201)
self.assertTrue(response.has_header('Date'))
self.assertEqual(response['Location'], 'http://testserver/submission')
# InstanceID is returned as uuid:<uuid>.
# Retrieving the uuid without the prefix in order to retrieve
# the Instance object
uuid = response.data.get('instanceID').split(':')[1]
instance = Instance.objects.get(uuid=uuid)
expected_xml = (
f"<?xml version='1.0' ?><{self.xform.survey.name} id="
"'transportation_2011_07_25'><fruit_name>orange"
f"</fruit_name></{self.xform.survey.name}>")
self.assertEqual(instance.xml, expected_xml)
self.assertEqual(self.xform.survey.name, 'data')

def test_legacy_rapidpro_post_submission(self):
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""
Management command used to replace the root node of an Instance when
the root node is the XForm ID
Example usage:
python manage.py replace_form_id_root_node -c -i 1,2,3
"""
import re
from hashlib import sha256

from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone
from django.utils.translation import gettext as _

from onadata.apps.logger.models import Instance
from onadata.apps.logger.models.instance import InstanceHistory


def replace_form_id_with_correct_root_node(
inst_id: int, root: str = None, commit: bool = False) -> str:
inst: Instance = Instance.objects.get(id=inst_id, deleted_at__isnull=True)
initial_xml = inst.xml
form_id = re.escape(inst.xform.id_string)
if not root:
root = inst.xform.survey.name

opening_tag_regex = f"<{form_id}"
closing_tag_regex = f"</{form_id}>"
edited_xml = re.sub(opening_tag_regex, f'<{root}', initial_xml)
edited_xml = re.sub(closing_tag_regex, f'</{root}>', edited_xml)

if commit:
last_edited = timezone.now()
history = InstanceHistory.objects.create(
xml=initial_xml,
checksum=inst.checksum,
xform_instance=inst,
)
inst.last_edited = last_edited
inst.checksum = sha256(edited_xml.encode('utf-8')).hexdigest()
inst.xml = edited_xml
inst.save()
return f"Modified Instance ID {inst.id} - History object {history.id}"
else:
return edited_xml


class Command(BaseCommand):
help = _("Replaces form ID String with 'data' for an instances root node")

def add_arguments(self, parser):
parser.add_argument(
'--instance-ids',
'-i',
dest='instance_ids',
help='Comma-separated list of instance ids.'
)
parser.add_argument(
'--commit-changes',
'-c',
action='store_true',
dest='commit',
default=False,
help='Save XML changes'
)
parser.add_argument(
'--root-node',
'-r',
dest='root',
default=None,
help='Default root node name to replace the form ID with'
)

def handle(self, *args, **options):
instance_ids = options.get('instance_ids').split(',')
commit = options.get('commit')
root = options.get('root')

if not instance_ids:
raise CommandError('No instance id provided.')

for inst_id in instance_ids:
try:
msg = replace_form_id_with_correct_root_node(
inst_id, root=root, commit=commit)
except Instance.DoesNotExist:
msg = f"Instance with ID {inst_id} does not exist"

self.stdout.write(msg)
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
Module containing the tests for the replace_form_id_root_node
management command
"""
from io import BytesIO

from django.conf import settings

from onadata.apps.main.tests.test_base import TestBase
from onadata.apps.logger.import_tools import django_file
from onadata.apps.logger.management.commands.replace_form_id_root_node \
import replace_form_id_with_correct_root_node
from onadata.libs.utils.logger_tools import create_instance


class TestReplaceFormIDRootNodeCommand(TestBase):
"""TestReplaceFormIDRootNodeCommand Class"""

def test_replaces_form_id_root_node(self):
"""
Test that the command correctly replaces the form ID
"""
md = """
| survey | | | |
| | type | name | label |
| | file | file | File |
| | image | image | Image |
"""
self._create_user_and_login()
xform = self._publish_markdown(md, self.user)
id_string = xform.id_string

xml_string = f"""
<{id_string} id="{id_string}">
<meta>
<instanceID>uuid:UJ5jz4EszdgH8uhy8nss1AsKaqBPO5VN7</instanceID>
</meta>
<file>Health_2011_03_13.xml_2011-03-15_20-30-28.xml</file>
<image>1300221157303.jpg</image>
</{id_string}>
"""
media_root = (f'{settings.PROJECT_ROOT}/apps/logger/tests/Health'
'_2011_03_13.xml_2011-03-15_20-30-28/')
image_media = django_file(
path=f'{media_root}1300221157303.jpg', field_name='image',
content_type='image/jpeg')
file_media = django_file(
path=f'{media_root}Health_2011_03_13.xml_2011-03-15_20-30-28.xml',
field_name='file', content_type='text/xml')
instance = create_instance(
self.user.username,
BytesIO(xml_string.strip().encode('utf-8')),
media_files=[file_media, image_media])

# Attempt replacement of root node name
replace_form_id_with_correct_root_node(
inst_id=instance.id, root='data', commit=True)
instance.refresh_from_db()

expected_xml = f"""<data id="{id_string}">
<meta>
<instanceID>uuid:UJ5jz4EszdgH8uhy8nss1AsKaqBPO5VN7</instanceID>
</meta>
<file>Health_2011_03_13.xml_2011-03-15_20-30-28.xml</file>
<image>1300221157303.jpg</image>
</data>"""
self.assertEqual(instance.xml, expected_xml)
3 changes: 2 additions & 1 deletion onadata/libs/serializers/data_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ def create_submission(request, username, data_dict, xform_id):
"""
Returns validated data object instances
"""
xml_string = dict2xform(data_dict, xform_id)
xml_string = dict2xform(
data_dict, xform_id, username=username)
xml_file = BytesIO(xml_string.encode('utf-8'))

error, instance = safe_create_instance(username, xml_file, [], None,
Expand Down
34 changes: 31 additions & 3 deletions onadata/libs/utils/logger_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,38 @@ def _get_instance(xml, new_uuid, submitted_by, status, xform, checksum):
return instance


def dict2xform(jsform, form_id, root=None):
def dict2xform(jsform, form_id, root=None, username=None):
"""
Converts a dictionary containing submission data into an XML
Submission for the appropriate form.
:param jsform (dict): A python dictionary object containing the submission
data
:param form_id (str or XForm): An XForm object or a string value
representing the forms id_string
:param root (str): An optional string that should be used as the
root nodes name. Defaults to None
:param: username (str): An optional string representing a users
username. Used alongside the `form_id` to
locate the XForm object the user is
trying to submit data too. Defaults to None
:returns: Returns a string containing the Submission XML
:rtype: str
"""
if not root:
root = form_id
return u"<?xml version='1.0' ?><{0} id='{1}'>{2}</{0}>".format(
if username:
if isinstance(form_id, XForm):
root = form_id.survey.name
else:
form = XForm.objects.filter(
id_string__iexact=form_id,
user__username__iexact=username,
deleted_at__isnull=True).first()
root = form.survey.name if form else 'data'
else:
root = 'data'

return "<?xml version='1.0' ?><{0} id='{1}'>{2}</{0}>".format(
root, form_id, dict2xml(jsform))


Expand Down

0 comments on commit d79754d

Please sign in to comment.