diff --git a/onadata/apps/api/tests/viewsets/test_media_viewset.py b/onadata/apps/api/tests/viewsets/test_media_viewset.py index 53fadac3c4..31a674c155 100644 --- a/onadata/apps/api/tests/viewsets/test_media_viewset.py +++ b/onadata/apps/api/tests/viewsets/test_media_viewset.py @@ -1,10 +1,16 @@ import os import urllib + +from django.utils import timezone from mock import MagicMock, patch from onadata.apps.api.tests.viewsets.test_abstract_viewset import TestAbstractViewSet from onadata.apps.api.viewsets.media_viewset import MediaViewSet from onadata.apps.logger.models import Attachment +from onadata.apps.main.models.meta_data import MetaData +from onadata.apps.main.tests.test_base import TestBase +from onadata.libs.models.share_xform import ShareXForm +from onadata.libs.permissions import EditorRole def attachment_url(attachment, suffix=None): @@ -17,10 +23,11 @@ def attachment_url(attachment, suffix=None): return url -class TestMediaViewSet(TestAbstractViewSet): +class TestMediaViewSet(TestAbstractViewSet, TestBase): """ Test the /api/v1/files endpoint """ + def setUp(self): super(TestMediaViewSet, self).setUp() self.retrieve_view = MediaViewSet.as_view({"get": "retrieve"}) @@ -32,10 +39,67 @@ def test_retrieve_view(self): request = self.factory.get( "/", {"filename": self.attachment.media_file.name}, **self.extra ) - response = self.retrieve_view(request, self.attachment.pk) + response = self.retrieve_view(request, pk=self.attachment.pk) + self.assertEqual(response.status_code, 200, response) + self.assertEqual(type(response.content), bytes) + + # test when the submission is soft deleted + self.attachment.instance.deleted_at = timezone.now() + self.attachment.instance.save() + + request = self.factory.get( + "/", {"filename": self.attachment.media_file.name}, **self.extra + ) + response = self.retrieve_view(request, pk=self.attachment.pk) + self.assertEqual(response.status_code, 404, response) + + def test_anon_retrieve_view(self): + """Test that anonymous users shouldn't retrieve media""" + request = self.factory.get("/", {"filename": self.attachment.media_file.name}) + response = self.retrieve_view(request, pk=self.attachment.pk) + self.assertEqual(response.status_code, 404, response) + + def test_retrieve_no_perms(self): + """Test that users without permissions to retrieve media + shouldn't be able to retrieve media + """ + # create new user + new_user = self._create_user("new_user", "new_user") + self.extra = {"HTTP_AUTHORIZATION": f"Token {new_user.auth_token.key}"} + request = self.factory.get( + "/", {"filename": self.attachment.media_file.name}, **self.extra + ) + self.assertTrue(new_user.is_authenticated) + response = self.retrieve_view(request, pk=self.attachment.pk) + # new user shouldn't have perms to download media + self.assertEqual(response.status_code, 404, response) + + def test_returned_media_is_based_on_form_perms(self): + """Test that attachments are returned based on form meta permissions""" + request = self.factory.get( + "/", {"filename": self.attachment.media_file.name}, **self.extra + ) + response = self.retrieve_view(request, pk=self.attachment.pk) self.assertEqual(response.status_code, 200, response) self.assertEqual(type(response.content), bytes) + # Enable meta perms + new_user = self._create_user("new_user", "new_user") + data_value = "editor-minor|dataentry-minor" + MetaData.xform_meta_permission(self.xform, data_value=data_value) + + instance = ShareXForm(self.xform, new_user.username, EditorRole.name) + instance.save() + auth_extra = {"HTTP_AUTHORIZATION": f"Token {new_user.auth_token.key}"} + + # New user should not be able to view media for + # submissions which they did not submit + request = self.factory.get( + "/", {"filename": self.attachment.media_file.name}, **auth_extra + ) + response = self.retrieve_view(request, pk=self.attachment.pk) + self.assertEqual(response.status_code, 404) + @patch("onadata.libs.utils.image_tools.get_storage_class") @patch("onadata.libs.utils.image_tools.boto3.client") def test_retrieve_view_from_s3(self, mock_presigned_urls, mock_get_storage_class): @@ -55,7 +119,7 @@ def test_retrieve_view_from_s3(self, mock_presigned_urls, mock_get_storage_class request = self.factory.get( "/", {"filename": self.attachment.media_file.name}, **self.extra ) - response = self.retrieve_view(request, self.attachment.pk) + response = self.retrieve_view(request, pk=self.attachment.pk) self.assertEqual(response.status_code, 302, response.url) self.assertEqual(response.url, expected_url) @@ -74,13 +138,74 @@ def test_retrieve_view_from_s3(self, mock_presigned_urls, mock_get_storage_class ExpiresIn=3600, ) + @patch("onadata.libs.utils.image_tools.get_storage_class") + @patch("onadata.libs.utils.image_tools.boto3.client") + def test_anon_retrieve_view_from_s3( + self, mock_presigned_urls, mock_get_storage_class + ): + """Test that anonymous user cannot retrieve media from s3""" + expected_url = ( + "https://testing.s3.amazonaws.com/doe/attachments/" + "4_Media_file/media.png?" + "response-content-disposition=attachment%3Bfilename%3media.png&" + "response-content-type=application%2Foctet-stream&" + "AWSAccessKeyId=AKIAJ3XYHHBIJDL7GY7A" + "&Signature=aGhiK%2BLFVeWm%2Fmg3S5zc05g8%3D&Expires=1615554960" + ) + mock_presigned_urls().generate_presigned_url = MagicMock( + return_value=expected_url + ) + mock_get_storage_class()().bucket.name = "onadata" + request = self.factory.get("/", {"filename": self.attachment.media_file.name}) + response = self.retrieve_view(request, pk=self.attachment.pk) + + self.assertEqual(response.status_code, 404, response) + + @patch("onadata.libs.utils.image_tools.get_storage_class") + @patch("onadata.libs.utils.image_tools.boto3.client") + def test_retrieve_view_from_s3_no_perms( + self, mock_presigned_urls, mock_get_storage_class + ): + """Test that authenticated user without correct perms + cannot retrieve media from s3 + """ + expected_url = ( + "https://testing.s3.amazonaws.com/doe/attachments/" + "4_Media_file/media.png?" + "response-content-disposition=attachment%3Bfilename%3media.png&" + "response-content-type=application%2Foctet-stream&" + "AWSAccessKeyId=AKIAJ3XYHHBIJDL7GY7A" + "&Signature=aGhiK%2BLFVeWm%2Fmg3S5zc05g8%3D&Expires=1615554960" + ) + mock_presigned_urls().generate_presigned_url = MagicMock( + return_value=expected_url + ) + mock_get_storage_class()().bucket.name = "onadata" + request = self.factory.get( + "/", {"filename": self.attachment.media_file.name}, **self.extra + ) + response = self.retrieve_view(request, pk=self.attachment.pk) + # owner should be able to retrieve media + self.assertEqual(response.status_code, 302, response) + + # create new user + new_user = self._create_user("new_user", "new_user") + self.extra = {"HTTP_AUTHORIZATION": f"Token {new_user.auth_token.key}"} + + request = self.factory.get( + "/", {"filename": self.attachment.media_file.name}, **self.extra + ) + response = self.retrieve_view(request, pk=self.attachment.pk) + # new user shouldn't have perms to download media + self.assertEqual(response.status_code, 404, response) + def test_retrieve_view_with_suffix(self): request = self.factory.get( "/", {"filename": self.attachment.media_file.name, "suffix": "large"}, **self.extra, ) - response = self.retrieve_view(request, self.attachment.pk) + response = self.retrieve_view(request, pk=self.attachment.pk) self.assertEqual(response.status_code, 302) self.assertTrue(response["Location"], attachment_url(self.attachment)) @@ -92,7 +217,7 @@ def test_handle_image_exception(self, mock_image_url): {"filename": self.attachment.media_file.name, "suffix": "large"}, **self.extra, ) - response = self.retrieve_view(request, self.attachment.pk) + response = self.retrieve_view(request, pk=self.attachment.pk) self.assertEqual(response.status_code, 400) def test_retrieve_view_small(self): @@ -101,7 +226,7 @@ def test_retrieve_view_small(self): {"filename": self.attachment.media_file.name, "suffix": "small"}, **self.extra, ) - response = self.retrieve_view(request, self.attachment.pk) + response = self.retrieve_view(request, pk=self.attachment.pk) self.assertEqual(response.status_code, 302) self.assertTrue(response["Location"], attachment_url(self.attachment, "small")) @@ -111,7 +236,7 @@ def test_retrieve_view_invalid_suffix(self): {"filename": self.attachment.media_file.name, "suffix": "TK"}, **self.extra, ) - response = self.retrieve_view(request, self.attachment.pk) + response = self.retrieve_view(request, pk=self.attachment.pk) self.assertEqual(response.status_code, 404) def test_retrieve_view_invalid_pk(self): @@ -120,12 +245,12 @@ def test_retrieve_view_invalid_pk(self): {"filename": self.attachment.media_file.name, "suffix": "small"}, **self.extra, ) - response = self.retrieve_view(request, "INVALID") + response = self.retrieve_view(request, pk="INVALID") self.assertEqual(response.status_code, 404) def test_retrieve_view_no_filename_param(self): request = self.factory.get("/", **self.extra) - response = self.retrieve_view(request, self.attachment.pk) + response = self.retrieve_view(request, pk=self.attachment.pk) self.assertEqual(response.status_code, 404) def test_retrieve_small_png(self): @@ -160,6 +285,6 @@ def test_retrieve_small_png(self): {"filename": self.attachment.media_file.name, "suffix": "small"}, **self.extra, ) - response = self.retrieve_view(request, self.attachment.pk) + response = self.retrieve_view(request, pk=self.attachment.pk) self.assertEqual(response.status_code, 302) self.assertTrue(response["Location"], attachment_url(self.attachment, "small")) diff --git a/onadata/apps/api/viewsets/media_viewset.py b/onadata/apps/api/viewsets/media_viewset.py index 422296c88f..11827bbb51 100644 --- a/onadata/apps/api/viewsets/media_viewset.py +++ b/onadata/apps/api/viewsets/media_viewset.py @@ -7,14 +7,14 @@ from django.conf import settings from django.http import Http404 from django.http import HttpResponseRedirect -from django.shortcuts import get_object_or_404 from rest_framework.response import Response from rest_framework import viewsets -from rest_framework.permissions import AllowAny from rest_framework.exceptions import ParseError +from onadata.apps.api.permissions import AttachmentObjectPermissions from onadata.apps.logger.models import Attachment +from onadata.libs import filters from onadata.libs.mixins.authenticate_header_mixin import AuthenticateHeaderMixin from onadata.libs.mixins.cache_control_mixin import CacheControlMixin from onadata.libs.mixins.etags_mixin import ETagsMixin @@ -30,14 +30,19 @@ class MediaViewSet( CacheControlMixin, ETagsMixin, BaseViewset, - viewsets.ViewSet, + viewsets.ReadOnlyModelViewSet, ): """A view to redirect to actual attachments url""" - permission_classes = (AllowAny,) + queryset = Attachment.objects.filter( + instance__deleted_at__isnull=True, deleted_at__isnull=True + ) + filter_backends = (filters.AttachmentFilter, filters.AttachmentTypeFilter) + lookup_field = "pk" + permission_classes = (AttachmentObjectPermissions,) # pylint: disable=invalid-name - def retrieve(self, request, pk=None): + def retrieve(self, request, *args, **kwargs): """ Redirect to final attachment url @@ -49,14 +54,14 @@ def retrieve(self, request, pk=None): return HttpResponseRedirect: redirects to final image url """ + pk = kwargs.get("pk") try: int(pk) except ValueError as exc: raise Http404() from exc else: filename = request.query_params.get("filename") - attachments = Attachment.objects.all() - obj = get_object_or_404(attachments, pk=pk) + obj = self.get_object() if obj.media_file.name != filename: raise Http404()