diff --git a/onadata/libs/tests/utils/test_export_tools.py b/onadata/libs/tests/utils/test_export_tools.py index 6f9b4e7fd4..7041cdf4ce 100644 --- a/onadata/libs/tests/utils/test_export_tools.py +++ b/onadata/libs/tests/utils/test_export_tools.py @@ -6,9 +6,9 @@ import shutil import tempfile import zipfile -from builtins import open from datetime import date, datetime, timedelta +from builtins import open from django.conf import settings from django.contrib.sites.models import Site from django.core.files.storage import default_storage @@ -23,6 +23,7 @@ from onadata.apps.logger.models import Attachment, Instance, XForm from onadata.apps.main.tests.test_base import TestBase from onadata.apps.viewer.models.export import Export +from onadata.apps.viewer.models.parsed_instance import query_data from onadata.libs.serializers.merged_xform_serializer import \ MergedXFormSerializer from onadata.libs.utils.export_builder import (encode_if_str, @@ -630,3 +631,72 @@ def test_get_repeat_index_tags(self): with self.assertRaises(exceptions.ParseError): get_repeat_index_tags('p') + + def test_generate_filtered_attachments_zip_export(self): + """Test media zip file export filters attachments""" + filenames = [ + 'OSMWay234134797.osm', + 'OSMWay34298972.osm', + ] + osm_fixtures_dir = os.path.realpath( + os.path.join( + os.path.dirname(api_tests.__file__), 'fixtures', 'osm')) + paths = [ + os.path.join(osm_fixtures_dir, filename) for filename in filenames + ] + xlsform_path = os.path.join(osm_fixtures_dir, 'osm.xlsx') + self._publish_xls_file_and_set_xform(xlsform_path) + submission_path = os.path.join(osm_fixtures_dir, 'instance_a.xml') + count = Attachment.objects.filter(extension='osm').count() + self._make_submission_w_attachment(submission_path, paths) + self._make_submission_w_attachment(submission_path, paths) + self.assertTrue( + Attachment.objects.filter(extension='osm').count() > count) + + options = { + "extension": Export.ZIP_EXPORT, + "query": u'{"_submission_time": {"$lte": "2019-01-13T00:00:00"}}'} + filter_query = options.get("query") + instance_ids = query_data( + self.xform, fields='["_id"]', query=filter_query) + + export = generate_attachments_zip_export( + Export.ZIP_EXPORT, self.user.username, self.xform.id_string, None, + options) + + self.assertTrue(export.is_successful) + + temp_dir = tempfile.mkdtemp() + zip_file = zipfile.ZipFile(default_storage.path(export.filepath), "r") + zip_file.extractall(temp_dir) + zip_file.close() + + filtered_attachments = Attachment.objects.filter( + instance__xform_id=self.xform.pk).filter( + instance_id__in=[i_id['_id'] for i_id in instance_ids]) + + self.assertNotEqual( + Attachment.objects.count(), filtered_attachments.count()) + + for a in filtered_attachments: + self.assertTrue( + os.path.exists(os.path.join(temp_dir, a.media_file.name))) + shutil.rmtree(temp_dir) + + # export with no query + options.pop('query') + export1 = generate_attachments_zip_export( + Export.ZIP_EXPORT, self.user.username, self.xform.id_string, None, + options) + + self.assertTrue(export1.is_successful) + + temp_dir = tempfile.mkdtemp() + zip_file = zipfile.ZipFile(default_storage.path(export1.filepath), "r") + zip_file.extractall(temp_dir) + zip_file.close() + + for a in Attachment.objects.all(): + self.assertTrue( + os.path.exists(os.path.join(temp_dir, a.media_file.name))) + shutil.rmtree(temp_dir) diff --git a/onadata/libs/utils/export_tools.py b/onadata/libs/utils/export_tools.py index 7baab0ff7c..2237084ebb 100644 --- a/onadata/libs/utils/export_tools.py +++ b/onadata/libs/utils/export_tools.py @@ -9,11 +9,10 @@ import os import re import sys -import builtins from datetime import datetime, timedelta -from future.moves.urllib.parse import urlparse -from future.utils import iteritems +import builtins +import six from django.conf import settings from django.contrib.auth.models import User from django.core.files.base import File @@ -23,8 +22,8 @@ from django.shortcuts import render_to_response from django.utils import timezone from django.utils.translation import ugettext as _ - -import six +from future.moves.urllib.parse import urlparse +from future.utils import iteritems from json2xlsclient.client import Client from rest_framework import exceptions from savReaderWriter import SPSSIOError @@ -404,6 +403,7 @@ def generate_attachments_zip_export(export_type, username, id_string, ext: File extension of the generated export """ export_type = options.get("extension", export_type) + filter_query = options.get("query") if xform is None: xform = XForm.objects.get(user__username=username, id_string=id_string) @@ -413,18 +413,24 @@ def generate_attachments_zip_export(export_type, username, id_string, attachments = Attachment.objects.filter( instance_id__in=[ rec.get('_id') - for rec in dataview.query_data(dataview, all_data=True)], + for rec in dataview.query_data( + dataview, all_data=True, filter_query=filter_query)], instance__deleted_at__isnull=True) else: + instance_ids = query_data(xform, fields='["_id"]', query=filter_query) attachments = Attachment.objects.filter( instance__deleted_at__isnull=True) if xform.is_merged_dataset: attachments = attachments.filter( instance__xform_id__in=[ i for i in xform.mergedxform.xforms.filter( - deleted_at__isnull=True).values_list('id', flat=True)]) + deleted_at__isnull=True).values_list( + 'id', flat=True)]).filter( + instance_id__in=[i_id['_id'] for i_id in instance_ids]) else: - attachments = attachments.filter(instance__xform_id=xform.pk) + attachments = attachments.filter( + instance__xform_id=xform.pk).filter( + instance_id__in=[i_id['_id'] for i_id in instance_ids]) filename = "%s_%s.%s" % (id_string, datetime.now().strftime("%Y_%m_%d_%H_%M_%S"),