diff --git a/onadata/apps/logger/management/commands/recover_deleted_attachments.py b/onadata/apps/logger/management/commands/recover_deleted_attachments.py new file mode 100644 index 0000000000..fa1bc6629f --- /dev/null +++ b/onadata/apps/logger/management/commands/recover_deleted_attachments.py @@ -0,0 +1,55 @@ +""" +Module containing the recover_deleted_attachments management command. + +Used to recover attachments that were accidentally deleted within the system +but are still required/present within the submission XML + +Sample usage: python manage.py recover_deleted_attachments --form 1 +""" +from django.core.management.base import BaseCommand + +from onadata.apps.logger.models import Instance + + +def recover_deleted_attachments(form_id: str, stdout=None): + """ + Recovers attachments that were accidentally soft-deleted + + :param: (str) form_id: Unique identifier for an XForm object + :param: (sys.stdout) stdout: Python standard output. Default: None + """ + instances = Instance.objects.filter( + xform__id=form_id, deleted_at__isnull=True) + for instance in instances: + expected_attachments = instance.get_expected_media() + if not instance.attachments.filter( + deleted_at__isnull=True).count() == len(expected_attachments): + attachments_to_recover = instance.attachments.filter( + deleted_at__isnull=False, + name__in=expected_attachments) + for attachment in attachments_to_recover: + attachment.deleted_at = None + attachment.deleted_by = None + attachment.save() + + if stdout: + stdout.write( + f'Recovered {attachment.name} ID: {attachment.id}') + # Regenerate instance JSON + instance.json = instance.get_full_dict(load_existing=False) + instance.save() + + +class Command(BaseCommand): + """ + Management command used to recover wrongfully deleted + attachments. + """ + help = 'Restore wrongly deleted attachments' + + def add_arguments(self, parser): + parser.add_argument('-f', '--form', dest='form_id', type=int) + + def handle(self, *args, **options): + form_id = options.get('form_id') + recover_deleted_attachments(form_id, self.stdout) diff --git a/onadata/apps/logger/tests/management/__init__.py b/onadata/apps/logger/tests/management/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/onadata/apps/logger/tests/management/commands/__init__.py b/onadata/apps/logger/tests/management/commands/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/onadata/apps/logger/tests/management/commands/test_recover_deleted_attachments.py b/onadata/apps/logger/tests/management/commands/test_recover_deleted_attachments.py new file mode 100644 index 0000000000..03ba6bb992 --- /dev/null +++ b/onadata/apps/logger/tests/management/commands/test_recover_deleted_attachments.py @@ -0,0 +1,74 @@ +""" +Module containing the tests for the recover_deleted_attachments +management command +""" +from io import BytesIO +from datetime import datetime + +from django.conf import settings + +from onadata.apps.main.tests.test_base import TestBase +from onadata.apps.logger.import_tools import django_file +from onadata.apps.logger.management.commands.recover_deleted_attachments \ + import recover_deleted_attachments +from onadata.libs.utils.logger_tools import create_instance + + +class TestRecoverDeletedAttachments(TestBase): + """TestRecoverDeletedAttachments Class""" + # pylint: disable=invalid-name + def test_recovers_wrongly_deleted_attachments(self): + """ + Test that the command recovers the correct + attachment + """ + md = """ + | survey | | | | + | | type | name | label | + | | file | file | File | + | | image | image | Image | + """ # pylint: disable=invalid-name + self._create_user_and_login() + xform = self._publish_markdown(md, self.user) + + xml_string = f""" + + + uuid:UJ5jz4EszdgH8uhy8nss1AsKaqBPO5VN7 + + Health_2011_03_13.xml_2011-03-15_20-30-28.xml + 1300221157303.jpg + + """ + media_root = (f'{settings.PROJECT_ROOT}/apps/logger/tests/Health' + '_2011_03_13.xml_2011-03-15_20-30-28/') + image_media = django_file( + path=f'{media_root}1300221157303.jpg', field_name='image', + content_type='image/jpeg') + file_media = django_file( + path=f'{media_root}Health_2011_03_13.xml_2011-03-15_20-30-28.xml', + field_name='file', content_type='text/xml') + instance = create_instance( + self.user.username, + BytesIO(xml_string.strip().encode('utf-8')), + media_files=[file_media, image_media]) + self.assertEqual( + instance.attachments.filter(deleted_at__isnull=True).count(), 2) + attachment = instance.attachments.first() + + # Soft delete attachment + attachment.deleted_at = datetime.now() + attachment.deleted_by = self.user + attachment.save() + + self.assertEqual( + instance.attachments.filter(deleted_at__isnull=True).count(), 1) + + # Attempt recovery of attachment + recover_deleted_attachments(form_id=instance.xform.id) + + self.assertEqual( + instance.attachments.filter(deleted_at__isnull=True).count(), 2) + attachment.refresh_from_db() + self.assertIsNone(attachment.deleted_at) + self.assertIsNone(attachment.deleted_by) diff --git a/onadata/libs/tests/utils/test_logger_tools.py b/onadata/libs/tests/utils/test_logger_tools.py index ad0f8be87b..e8f227467d 100644 --- a/onadata/libs/tests/utils/test_logger_tools.py +++ b/onadata/libs/tests/utils/test_logger_tools.py @@ -247,7 +247,8 @@ def test_replaced_attachments_not_tracked(self): md = """ | survey | | | | | | type | name | label | - | | image | image1 | Photo | + | | file | file | File | + | | image | image | Image | """ self._create_user_and_login() self.xform = self._publish_markdown(md, self.user) @@ -257,21 +258,28 @@ def test_replaced_attachments_not_tracked(self): uuid:UJ5jz4EszdgH8uhy8nss1AsKaqBPO5VN7 - 1300221157303.jpg + Health_2011_03_13.xml_2011-03-15_20-30-28.xml + 1300221157303.jpg """.format(self.xform.id_string) - file_path = "{}/apps/logger/tests/Health_2011_03_13."\ - "xml_2011-03-15_20-30-28/1300221157303"\ - ".jpg".format(settings.PROJECT_ROOT) - media_file = django_file( - path=file_path, field_name="image1", content_type="image/jpeg") + media_root = (f'{settings.PROJECT_ROOT}/apps/logger/tests/Health' + '_2011_03_13.xml_2011-03-15_20-30-28/') + image_media = django_file( + path=f'{media_root}1300221157303.jpg', field_name='image', + content_type='image/jpeg') + file_media = django_file( + path=f'{media_root}Health_2011_03_13.xml_2011-03-15_20-30-28.xml', + field_name='file', content_type='text/xml') instance = create_instance( self.user.username, BytesIO(xml_string.strip().encode('utf-8')), - media_files=[media_file]) + media_files=[file_media, image_media]) self.assertTrue(instance.json[MEDIA_ALL_RECEIVED]) - self.assertEquals(instance.json[TOTAL_MEDIA], 1) - self.assertEquals(instance.json[MEDIA_COUNT], 1) + self.assertEqual( + instance.attachments.filter(deleted_at__isnull=True).count(), + 2) + self.assertEquals(instance.json[TOTAL_MEDIA], 2) + self.assertEquals(instance.json[MEDIA_COUNT], 2) self.assertEquals(instance.json[TOTAL_MEDIA], instance.total_media) self.assertEquals(instance.json[MEDIA_COUNT], instance.media_count) self.assertEquals(instance.json[MEDIA_ALL_RECEIVED], @@ -304,6 +312,7 @@ def test_replaced_attachments_not_tracked(self): instance2 = Instance.objects.get(pk=instance.pk) self.assertTrue(instance2.json[MEDIA_ALL_RECEIVED]) # Test that only one attachment is recognised for this submission + # Since the file is no longer present in the submission self.assertEquals(instance2.json[TOTAL_MEDIA], 1) self.assertEquals(instance2.json[MEDIA_COUNT], 1) self.assertEquals( diff --git a/onadata/libs/utils/common_tags.py b/onadata/libs/utils/common_tags.py index de7bbfa669..3147a2c7df 100644 --- a/onadata/libs/utils/common_tags.py +++ b/onadata/libs/utils/common_tags.py @@ -90,7 +90,7 @@ OWNER_TEAM_NAME = "Owners" API_TOKEN = 'api-token' -KNOWN_MEDIA_TYPES = ['photo', 'image', 'audio', 'video'] +KNOWN_MEDIA_TYPES = ['photo', 'image', 'audio', 'video', 'file'] MEDIA_FILE_TYPES = { "image": ["image/png", "image/jpeg", "image/jpg"], "audio": ["audio/mp3", "audio/mp4"],