Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix project transfer failure in some circumstances #5135

Merged
merged 4 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kobo/apps/openrosa/apps/logger/models/attachment.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# coding: utf-8
import mimetypes
import os

Expand All @@ -10,6 +9,7 @@
from kpi.deployment_backends.kc_access.storage import (
default_kobocat_storage as default_storage,
)
from kpi.fields.file import ExtendedFileField
from .instance import Instance


Expand Down Expand Up @@ -41,7 +41,7 @@ class Attachment(models.Model):
instance = models.ForeignKey(
Instance, related_name='attachments', on_delete=models.CASCADE
)
media_file = models.FileField(
media_file = ExtendedFileField(
storage=default_storage,
upload_to=upload_to,
max_length=380,
Expand Down
4 changes: 4 additions & 0 deletions kobo/apps/project_ownership/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ class AsyncTaskException(Exception):

class TransferAlreadyProcessedException(Exception):
pass


class TransferStillPendingException(Exception):
pass
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from django.core.management import call_command
from django.core.management.base import BaseCommand

from ...models import (
Transfer,
TransferStatus,
TransferStatusChoices,
TransferStatusTypeChoices,
)
from ...utils import (
move_media_files,
move_attachments,
rewrite_mongo_userform_id,
)


class Command(BaseCommand):
help = (
'Resume project ownership transfers done under `2.024.25` which failed '
'with error: "Project A : previous_owner -> new_owner is not in progress"'
)

def handle(self, *args, **options):

usernames = set()
verbosity = options['verbosity']

for transfer_status in TransferStatus.objects.filter(
status=TransferStatusChoices.FAILED,
status_type=TransferStatusTypeChoices.GLOBAL,
error__icontains='is not in progress',
).iterator():
transfer = transfer_status.transfer
if transfer.asset.pending_delete:
if verbosity:
self.stdout.write(
f'Project `{transfer.asset}` is in trash bin, skip it!'
)
continue

if not self._validate_whether_transfer_can_be_fixed(transfer):
if verbosity:
self.stdout.write(
f'Project `{transfer.asset}` transfer cannot be fixed'
f' automatically'
)
continue

if not transfer.asset.has_deployment:
continue

if verbosity:
self.stdout.write(
f'Resuming `{transfer.asset}` transfer…'
)
self._move_data(transfer)
move_attachments(transfer)
move_media_files(transfer)
if verbosity:
self.stdout.write('\tDone!')
usernames.add(transfer.invite.recipient.username)

# Update attachment storage bytes counters
for username in usernames:
call_command(
'update_attachment_storage_bytes',
verbosity=verbosity,
force=True,
username=username,
)

def _move_data(self, transfer: Transfer):

# Sanity check
asset = transfer.asset
rewrite_mongo_userform_id(transfer)
number_of_submissions = asset.deployment.xform.num_of_submissions
submission_ids = [
s['_id']
for s in asset.deployment.get_submissions(asset.owner, fields=['_id'])
]

if number_of_submissions == (mongo_document_count := len(submission_ids)):
self.stdout.write(f'\tSuccess: {number_of_submissions} submissions moved!')
else:
missing_count = number_of_submissions - mongo_document_count
self.stdout.write(
f'\t⚠️ Only {mongo_document_count} submissions moved, '
f'{missing_count} are missing!'
)

def _validate_whether_transfer_can_be_fixed(self, transfer: Transfer) -> bool:
original_new_owner_id = transfer.invite.recipient_id
current_owner_id = transfer.asset.owner_id

return current_owner_id == original_new_owner_id
33 changes: 20 additions & 13 deletions kobo/apps/project_ownership/models/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def status(self, value: Union[str, tuple[str]]):
global_status.status = value

global_status.save()

self.date_modified = timezone.now()
self.save(update_fields=['date_modified'])
self._update_invite_status()
Expand All @@ -100,6 +101,7 @@ def transfer_project(self):
success = False
try:
if not self.asset.has_deployment:
_rewrite_mongo = False
with transaction.atomic():
self._reassign_project_permissions(update_deployment=False)
self._sent_in_app_messages()
Expand All @@ -113,6 +115,7 @@ def transfer_project(self):
status=TransferStatusChoices.SUCCESS
)
else:
_rewrite_mongo = True
with transaction.atomic():
with kc_transaction_atomic():
deployment = self.asset.deployment
Expand All @@ -129,19 +132,9 @@ def transfer_project(self):

self._sent_in_app_messages()

# Move submissions, media files and attachments in background
# tasks because it can take a while to complete on big projects

# 1) Rewrite `_userform_id` in MongoDB
async_task.delay(
self.pk, TransferStatusTypeChoices.SUBMISSIONS
)

# 2) Move media files to new owner's home directory
async_task.delay(
self.pk, TransferStatusTypeChoices.MEDIA_FILES
)

# Do not delegate anything to Celery before the transaction has
# been validated. Otherwise, Celery could fetch outdated data.
transaction.on_commit(lambda: self._start_async_jobs(_rewrite_mongo))
success = True
finally:
if not success:
Expand Down Expand Up @@ -265,6 +258,20 @@ def _sent_in_app_messages(self):
]
)

def _start_async_jobs(self, rewrite_mongo: bool = True):
# Move submissions, media files and attachments in background
# tasks because it can take a while to complete on big projects
if rewrite_mongo:
# 1) Rewrite `_userform_id` in MongoDB
async_task.delay(
self.pk, TransferStatusTypeChoices.SUBMISSIONS
)

# 2) Move media files to new owner's home directory
async_task.delay(
self.pk, TransferStatusTypeChoices.MEDIA_FILES
)

def _update_invite_status(self):
"""
Update the status of the invite based on the status of each transfer
Expand Down
11 changes: 9 additions & 2 deletions kobo/apps/project_ownership/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from kobo.celery import celery_app
from kpi.utils.mailer import EmailMessage, Mailer
from .exceptions import AsyncTaskException
from .exceptions import AsyncTaskException, TransferStillPendingException
from .models.choices import (
InviteStatusChoices,
TransferStatusChoices,
Expand All @@ -28,6 +28,7 @@
autoretry_for=(
SoftTimeLimitExceeded,
TimeLimitExceeded,
TransferStillPendingException,
),
max_retry=5,
retry_backoff=60,
Expand All @@ -43,8 +44,14 @@ def async_task(transfer_id: int, async_task_type: str):

transfer = Transfer.objects.get(pk=transfer_id)

if transfer.status == TransferStatusChoices.PENDING:
# Sometimes, a race condition occurs: the Celery task starts, but
# `transfer.status` has not been updated fast enough.
# Raise an exception which allows retry.
raise TransferStillPendingException

if transfer.status != TransferStatusChoices.IN_PROGRESS:
raise AsyncTaskException(f'`{transfer}` is not in progress')
raise AsyncTaskException(f'`{transfer}` is not in progress: {transfer.status}')

TransferStatus.update_status(
transfer_id=transfer_id,
Expand Down
23 changes: 11 additions & 12 deletions kobo/apps/project_ownership/tests/api/v2/test_api.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import uuid

from constance.test import override_config
from datetime import timedelta
from dateutil.parser import isoparse
from django.conf import settings
from django.contrib.auth import get_user_model
from django.utils import timezone
from mock import patch, MagicMock
from rest_framework import status
from rest_framework.reverse import reverse
from unittest.mock import ANY

from kobo.apps.project_ownership.models import (
Invite,
Expand All @@ -18,11 +15,11 @@
)
from kobo.apps.project_ownership.tests.utils import MockServiceUsageSerializer
from kobo.apps.trackers.utils import update_nlp_counter

from kpi.constants import PERM_VIEW_ASSET
from kpi.models import Asset
from kpi.tests.base_test_case import BaseAssetTestCase
from kpi.tests.kpi_test_case import KpiTestCase
from kpi.tests.utils.transaction import immediate_on_commit
from kpi.urls.router_api_v2 import URL_NAMESPACE as ROUTER_URL_NAMESPACE


Expand Down Expand Up @@ -432,7 +429,7 @@ def test_account_usage_transferred_to_new_user(self):
response = self.client.get(service_usage_url)
assert response.data == expected_empty_data

# Transfer project from someuser to anotheruser
# Transfer the project from someuser to anotheruser
self.client.login(username='someuser', password='someuser')
payload = {
'recipient': self.absolute_reverse(
Expand All @@ -445,9 +442,10 @@ def test_account_usage_transferred_to_new_user(self):
'kpi.deployment_backends.backends.MockDeploymentBackend.xform',
MagicMock(),
):
response = self.client.post(
self.invite_url, data=payload, format='json'
)
with immediate_on_commit():
response = self.client.post(
self.invite_url, data=payload, format='json'
)
assert response.status_code == status.HTTP_201_CREATED

# someuser should have no usage reported anymore
Expand Down Expand Up @@ -495,7 +493,7 @@ def test_data_accessible_to_new_user(self):
) == 0
)

# Transfer project from someuser to anotheruser
# Transfer the project from someuser to anotheruser
self.client.login(username='someuser', password='someuser')
payload = {
'recipient': self.absolute_reverse(
Expand All @@ -508,9 +506,10 @@ def test_data_accessible_to_new_user(self):
'kpi.deployment_backends.backends.MockDeploymentBackend.xform',
MagicMock(),
):
response = self.client.post(
self.invite_url, data=payload, format='json'
)
with immediate_on_commit():
response = self.client.post(
self.invite_url, data=payload, format='json'
)
assert response.status_code == status.HTTP_201_CREATED

# anotheruser is the owner and should see the project
Expand Down
7 changes: 5 additions & 2 deletions kobo/apps/project_ownership/tests/test_transfer_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from django.test import TestCase

from kpi.models import Asset
from kpi.tests.utils.transaction import immediate_on_commit
from ..models import (
Invite,
InviteStatusChoices,
Expand Down Expand Up @@ -105,9 +106,11 @@ def test_calculated_failed_transfer_status(self):
assert self.invite.status == InviteStatusChoices.FAILED

def test_draft_project_transfer(self):
# when project is a draft, there are no celery tasks called to move
# When a project is a draft, there are no celery tasks called to move
# submissions (and related attachments).
self.transfer.transfer_project()
with immediate_on_commit():
self.transfer.transfer_project()

assert self.transfer.status == TransferStatusChoices.SUCCESS

# However, the status of each async task should still be updated to
Expand Down
25 changes: 25 additions & 0 deletions kpi/tests/utils/transaction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from contextlib import contextmanager
from unittest import mock

from django.contrib.auth.management import DEFAULT_DB_ALIAS


@contextmanager
def immediate_on_commit(using=None):
"""
Context manager executing transaction.on_commit() hooks immediately as
if the connection was in auto-commit mode. This is required when
using a subclass of django.test.TestCase as all tests are wrapped in
a transaction that never gets committed.

Source: https://code.djangoproject.com/ticket/30457#comment:1
"""
immediate_using = DEFAULT_DB_ALIAS if using is None else using

def on_commit(func, using=None):
using = DEFAULT_DB_ALIAS if using is None else using
if using == immediate_using:
func()

with mock.patch('django.db.transaction.on_commit', side_effect=on_commit) as patch:
yield patch
Loading