Commit c41dedf
- Fix: race on upload transaction
(cherry picked from commit 148fb93)
afabiani committed Dec 1, 2021
1 parent 7614b28 commit c41dedf
Showing 6 changed files with 148 additions and 131 deletions.
10 changes: 1 addition & 9 deletions geonode/geoserver/helpers.py
@@ -67,7 +67,7 @@
from owslib.wms import WebMapService
from geonode import GeoNodeException
from geonode.utils import http_client
from geonode.layers.models import Layer, UploadSession, Attribute, Style
from geonode.layers.models import Layer, Attribute, Style
from geonode.layers.enumerations import LAYER_ATTRIBUTE_NUMERIC_DATA_TYPES
from geonode.security.views import _perms_info_json
from geonode.security.utils import set_geowebcache_invalidate_cache
@@ -2120,10 +2120,6 @@ def sync_instance_with_geoserver(
sender=instance.__class__, instance=instance, update_fields=['thumbnail_url'])
return instance

geonode_upload_sessions = UploadSession.objects.filter(resource=instance)
geonode_upload_sessions.update(processed=False)
instance.set_dirty_state()

gs_resource = None
values = None
_tries = 0
@@ -2316,10 +2312,6 @@ def sync_instance_with_geoserver(
except Exception as e:
logger.exception(e)
return None
finally:
geonode_upload_sessions = UploadSession.objects.filter(resource=instance)
geonode_upload_sessions.update(processed=True)
instance.clear_dirty_state()

# Refreshing layer links
logger.debug(f"... Creating Default Resource Links for Layer {instance.title}")
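The hunks above remove the UploadSession/dirty-state bookkeeping from `sync_instance_with_geoserver`: the sync path no longer flags the sessions as unprocessed and hides the resource on entry, nor flips them back in a `finally` block. A minimal sketch of the removed pattern, written as a hypothetical context manager (not part of the commit; assumes a configured GeoNode/Django environment and only uses calls visible in this diff):

```python
from contextlib import contextmanager

from geonode.layers.models import UploadSession


@contextmanager
def upload_session_guard(instance):
    """Hypothetical helper mirroring the bookkeeping removed from sync_instance_with_geoserver."""
    sessions = UploadSession.objects.filter(resource=instance)
    sessions.update(processed=False)  # flag the sessions as still in progress
    instance.set_dirty_state()        # hide the resource while GeoServer is syncing
    try:
        yield instance
    finally:
        sessions.update(processed=True)  # flag the sessions as done
        instance.clear_dirty_state()     # make the resource visible again
```

After this commit that responsibility lives in the upload finalize task (see `geonode/geoserver/tasks.py` below), so a concurrent sync can no longer toggle these flags while an upload is still being finalized.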
17 changes: 15 additions & 2 deletions geonode/geoserver/tasks.py
@@ -215,6 +215,9 @@ def geoserver_finalize_upload(
lock_id = f'{self.request.id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
# Hide the resource until finished
instance.set_dirty_state()

from geonode.upload.models import Upload
upload = Upload.objects.get(import_id=import_id)
upload.layer = instance
@@ -288,9 +291,19 @@ def geoserver_finalize_upload(
shutil.rmtree(tempdir)
except Exception as e:
logger.warning(e)

try:
# Update the upload sessions
geonode_upload_sessions = UploadSession.objects.filter(resource=instance)
geonode_upload_sessions.update(processed=True)
instance.upload_session = geonode_upload_sessions.first()
except Exception as e:
logger.exception(e)
finally:
upload.complete = True
upload.save()
# Show the resource finally finished
upload.set_processing_state(Upload.STATE_PROCESSED)

signals.upload_complete.send(sender=geoserver_finalize_upload, layer=instance)
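Taken together, the two hunks above bracket the whole finalize step: `set_dirty_state()` hides the resource as soon as the task acquires its lock, and the new `try/except/finally` makes sure the `UploadSession` rows are flagged `processed` and the `Upload` reaches `STATE_PROCESSED` before `upload_complete` fires. A condensed, hypothetical sketch of that ordering (not the literal task body; `do_finalize_work` is a placeholder and a configured GeoNode environment is assumed):

```python
import logging

from geonode.layers.models import UploadSession
from geonode.upload.models import Upload

logger = logging.getLogger(__name__)


def finalize_bracket(instance, upload, do_finalize_work):
    """Condensed illustration of the hide -> process -> publish ordering used above."""
    instance.set_dirty_state()                  # hide the resource until finished
    try:
        do_finalize_work(instance, upload)      # placeholder for the GeoServer finalization
        try:
            sessions = UploadSession.objects.filter(resource=instance)
            sessions.update(processed=True)
            instance.upload_session = sessions.first()
        except Exception as e:
            logger.exception(e)                 # session bookkeeping must not block completion
    finally:
        upload.complete = True
        upload.save()
        upload.set_processing_state(Upload.STATE_PROCESSED)  # show the resource again
```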

@@ -303,7 +316,7 @@ def geoserver_finalize_upload(
expires=3600,
acks_late=False,
autoretry_for=(Exception, ),
retry_kwargs={'max_retries': 3, 'countdown': 10},
retry_kwargs={'max_retries': 0, 'countdown': 10},
retry_backoff=True,
retry_backoff_max=700,
retry_jitter=True)
@@ -332,7 +345,7 @@ def geoserver_post_save_layers(
expires=30,
acks_late=False,
autoretry_for=(Exception, ),
retry_kwargs={'max_retries': 3, 'countdown': 10},
retry_kwargs={'max_retries': 0, 'countdown': 10},
retry_backoff=True,
retry_backoff_max=700,
retry_jitter=True)
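The last two hunks switch `retry_kwargs` from `max_retries: 3` to `max_retries: 0` for `geoserver_finalize_upload` and `geoserver_post_save_layers`, so a failing run is not re-queued by Celery's autoretry machinery; recovery is instead left to the periodic `finalize_incomplete_session_uploads` task (see `geonode/upload/__init__.py` and `geonode/upload/tasks.py` below). A minimal, standalone Celery example of that configuration (hypothetical app and task names, in-memory broker for illustration only):

```python
from celery import Celery

app = Celery('example', broker='memory://')


@app.task(
    bind=True,
    autoretry_for=(Exception, ),
    retry_kwargs={'max_retries': 0, 'countdown': 10},
    retry_backoff=True,
    retry_backoff_max=700,
    retry_jitter=True)
def flaky_task(self):
    # With max_retries=0 the autoretry declaration is effectively disabled:
    # the exception propagates and the task is marked as failed instead of
    # being re-queued after the countdown.
    raise RuntimeError("fails once and is not retried")
```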
4 changes: 2 additions & 2 deletions geonode/upload/__init__.py
@@ -51,7 +51,7 @@ def run_setup_hooks(sender, **kwargs):
check_intervals = IntervalSchedule.objects.filter(every=600, period="seconds")
if not check_intervals.exists():
check_interval, _ = IntervalSchedule.objects.get_or_create(
every=600,
every=10,
period="seconds"
)
else:
@@ -77,7 +77,7 @@ def ready(self):
post_migrate.connect(run_setup_hooks, sender=self)
settings.CELERY_BEAT_SCHEDULE['finalize-incomplete-session-resources'] = {
'task': 'geonode.upload.tasks.finalize_incomplete_session_uploads',
'schedule': 60.0,
'schedule': 10.0,
}


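Both scheduling knobs for the session finalizer are tightened here: the `IntervalSchedule` created by `run_setup_hooks` drops from 600 to 10 seconds, and the in-settings `CELERY_BEAT_SCHEDULE` entry drops from 60.0 to 10.0 seconds, so stale or pending uploads are picked up much sooner. Note that the `exists()` check above still filters on `every=600`, so the 10-second interval is only created when the original 600-second one is absent. A short sketch of the two registration paths (assumes `django_celery_beat` is installed and Django settings are configured):

```python
from django.conf import settings
from django_celery_beat.models import IntervalSchedule

# Database-backed schedule consumed by django_celery_beat's DatabaseScheduler.
check_interval, _ = IntervalSchedule.objects.get_or_create(
    every=10,
    period="seconds",
)

# Plain settings-based beat entry, registered when the app is ready().
settings.CELERY_BEAT_SCHEDULE['finalize-incomplete-session-resources'] = {
    'task': 'geonode.upload.tasks.finalize_incomplete_session_uploads',
    'schedule': 10.0,  # seconds
}
```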
7 changes: 1 addition & 6 deletions geonode/upload/models.py
@@ -182,12 +182,7 @@ def progress(self):
return 50.0
elif self.state == Upload.STATE_PROCESSED:
return 100.0
elif self.complete or self.state in (Upload.STATE_COMPLETE, Upload.STATE_RUNNING):
if self.layer and self.layer.processed:
self.set_processing_state(Upload.STATE_PROCESSED)
return 100.0
elif self.state == Upload.STATE_RUNNING:
return 66.0
elif self.state in (Upload.STATE_COMPLETE, Upload.STATE_RUNNING):
return 80.0

def set_resume_url(self, resume_url):
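`Upload.progress` no longer promotes the session to `STATE_PROCESSED` as a side effect of being read; it simply reports a figure for the state it finds, leaving state transitions to the tasks. A hypothetical standalone restatement of the mapping visible in this hunk (the constants stand in for the real `Upload.STATE_*` values; earlier branches of `progress()` are unchanged and omitted):

```python
STATE_RUNNING = 'RUNNING'      # stand-ins for Upload.STATE_* in geonode/upload/models.py
STATE_COMPLETE = 'COMPLETE'
STATE_PROCESSED = 'PROCESSED'


def progress_for(state: str) -> float:
    """Report-only progress figure; no state transitions happen here any more."""
    if state == STATE_PROCESSED:
        return 100.0           # fully processed and published
    if state in (STATE_COMPLETE, STATE_RUNNING):
        return 80.0            # importer done, GeoNode side still working
    return 0.0                 # other states are handled by branches not shown in this hunk
```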
219 changes: 119 additions & 100 deletions geonode/upload/tasks.py
@@ -23,6 +23,7 @@
from gsimporter.api import NotFound

from django.conf import settings
from django.db import transaction
from django.utils.timezone import timedelta, now

from geonode.celery_app import app
@@ -60,72 +61,77 @@ def finalize_incomplete_session_uploads(self, *args, **kwargs):
lock_id = f'{self.request.id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
_upload_ids = []
_upload_tasks = []

# Check first if we need to delete stale sessions
expiry_time = now() - timedelta(hours=UPLOAD_SESSION_EXPIRY_HOURS)
for _upload in Upload.objects.exclude(state=Upload.STATE_PROCESSED).exclude(date__gt=expiry_time):
_upload.set_processing_state(Upload.STATE_INVALID)
_upload_ids.append(_upload.id)
_upload_tasks.append(
_upload_session_cleanup.signature(
args=(_upload.id,)
with transaction.atomic():
_upload_ids = []
_upload_tasks = []

# Check first if we need to delete stale sessions
expiry_time = now() - timedelta(hours=UPLOAD_SESSION_EXPIRY_HOURS)
for _upload in Upload.objects.exclude(state=Upload.STATE_PROCESSED).exclude(date__gt=expiry_time):
_upload.set_processing_state(Upload.STATE_INVALID)
_upload_ids.append(_upload.id)
_upload_tasks.append(
_upload_session_cleanup.signature(
args=(_upload.id,)
)
)
)

upload_workflow_finalizer = _upload_workflow_finalizer.signature(
args=('_upload_session_cleanup', _upload_ids,),
immutable=True
).on_error(
_upload_workflow_error.signature(
upload_workflow_finalizer = _upload_workflow_finalizer.signature(
args=('_upload_session_cleanup', _upload_ids,),
immutable=True
).on_error(
_upload_workflow_error.signature(
args=('_upload_session_cleanup', _upload_ids,),
immutable=True
)
)
)
upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer)
upload_workflow.apply_async()

# Let's finish the valid ones
_processing_states = (
Upload.STATE_RUNNING,
Upload.STATE_INVALID,
Upload.STATE_PROCESSED)
for _upload in Upload.objects.exclude(state__in=_processing_states):
session = None
try:
if not _upload.import_id:
raise NotFound
session = _upload.get_session.import_session
if not session or session.state != Upload.STATE_COMPLETE:
session = gs_uploader.get_session(_upload.import_id)
except (NotFound, Exception) as e:
logger.exception(e)
upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer)
upload_workflow.apply_async()

# Let's finish the valid ones
_exclusion_processing_states = (
Upload.STATE_COMPLETE,
Upload.STATE_PROCESSED)
for _upload in Upload.objects.exclude(state__in=_exclusion_processing_states).exclude(id__in=_upload_ids):
session = None
if _upload.state not in (Upload.STATE_COMPLETE, Upload.STATE_PROCESSED):
_upload.set_processing_state(Upload.STATE_INVALID)
if _upload.layer:
_upload.layer.delete()

if session:
_upload_ids.append(_upload.id)
_upload_tasks.append(
_update_upload_session_state.signature(
args=(_upload.id,)
try:
if not _upload.import_id:
raise NotFound
session = _upload.get_session.import_session
if not session or session.state != Upload.STATE_COMPLETE:
session = gs_uploader.get_session(_upload.import_id)
except (NotFound, Exception) as e:
logger.exception(e)
session = None

if session:
_upload_ids.append(_upload.id)
_upload_tasks.append(
_update_upload_session_state.signature(
args=(_upload.id,)
)
)
)

upload_workflow_finalizer = _upload_workflow_finalizer.signature(
args=('_update_upload_session_state', _upload_ids,),
immutable=True
).on_error(
_upload_workflow_error.signature(
else:
if _upload.state not in (Upload.STATE_COMPLETE, Upload.STATE_PROCESSED):
_upload.set_processing_state(Upload.STATE_INVALID)
_upload_ids.append(_upload.id)
_upload_tasks.append(
_upload_session_cleanup.signature(
args=(_upload.id,)
)
)

upload_workflow_finalizer = _upload_workflow_finalizer.signature(
args=('_update_upload_session_state', _upload_ids,),
immutable=True
).on_error(
_upload_workflow_error.signature(
args=('_update_upload_session_state', _upload_ids,),
immutable=True
)
)
)
upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer)
upload_workflow.apply_async()
upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer)
upload_workflow.apply_async()
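This is the core of the race fix in `finalize_incomplete_session_uploads`: the scan over the `Upload` rows, the state flips, and the construction and dispatch of the per-upload signatures are now wrapped in a single `transaction.atomic()` block, and the work is fanned out as a Celery chord whose finalizer runs only after every member task has finished. A condensed skeleton of that shape (hypothetical function and argument names; assumes a configured Django/Celery environment):

```python
from celery import chord
from django.db import transaction


def dispatch_upload_workflow(stale_uploads, member_task, finalizer_task, error_task, task_label):
    """Condensed skeleton of the scan -> flag -> chord pattern used above."""
    with transaction.atomic():  # the Upload state flips commit (or roll back) together
        upload_ids = [upload.id for upload in stale_uploads]
        member_signatures = [
            member_task.signature(args=(upload_id,)) for upload_id in upload_ids
        ]
        finalizer = finalizer_task.signature(
            args=(task_label, upload_ids,),
            immutable=True,
        ).on_error(
            error_task.signature(args=(task_label, upload_ids,), immutable=True)
        )
        # Header signatures run in parallel; the finalizer body runs after all of them.
        chord(member_signatures, body=finalizer).apply_async()
```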


@app.task(
@@ -163,38 +169,55 @@ def _upload_workflow_error(self, task_name: str, upload_ids: list):
)
def _update_upload_session_state(self, upload_session_id: int):
"""Task invoked by 'upload_workflow.chord' in order to process all the 'PENDING' Upload tasks."""

lock_id = f'{self.request.id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
_upload = Upload.objects.get(id=upload_session_id)
session = _upload.get_session.import_session
if not session or session.state != Upload.STATE_COMPLETE:
session = gs_uploader.get_session(_upload.import_id)

if session:
try:
content = next_step_response(None, _upload.get_session).content
if isinstance(content, bytes):
content = content.decode('UTF-8')
response_json = json.loads(content)
_success = response_json.get('success', False)
_redirect_to = response_json.get('redirect_to', '')
if _success:
if 'upload/final' not in _redirect_to and 'upload/check' not in _redirect_to:
_upload.set_resume_url(_redirect_to)
_upload.set_processing_state(Upload.STATE_WAITING)
else:
if session.state == Upload.STATE_COMPLETE and _upload.state == Upload.STATE_PENDING:
if not _upload.layer or not _upload.layer.processed:
final_step_view(None, _upload.get_session)
_upload.set_processing_state(Upload.STATE_RUNNING)
except (NotFound, Exception) as e:
logger.exception(e)
if _upload.state not in (Upload.STATE_COMPLETE, Upload.STATE_PROCESSED):
_upload.set_processing_state(Upload.STATE_INVALID)
if _upload.layer:
_upload.layer.delete()
_upload = Upload.objects.get(id=upload_session_id)
session = _upload.get_session.import_session
if not session or session.state != Upload.STATE_COMPLETE:
session = gs_uploader.get_session(_upload.import_id)

if session:
try:
_response = next_step_response(None, _upload.get_session)
_upload.refresh_from_db()
_content = _response.content
if isinstance(_content, bytes):
_content = _content.decode('UTF-8')
_response_json = json.loads(_content)
_success = _response_json.get('success', False)
_redirect_to = _response_json.get('redirect_to', '')
if _success:
if 'upload/final' not in _redirect_to and 'upload/check' not in _redirect_to:
_upload.set_resume_url(_redirect_to)
_upload.set_processing_state(Upload.STATE_WAITING)
else:
if session.state == Upload.STATE_COMPLETE and _upload.state == Upload.STATE_PENDING:
if not _upload.layer or not _upload.layer.processed:
_response = final_step_view(None, _upload.get_session)
_upload.refresh_from_db()
_content = _response.content
if isinstance(_content, bytes):
_content = _content.decode('UTF-8')
_response_json = json.loads(_content)
_success = _response_json.get('success', False)
_status = _response_json.get('status', 'error')
if _status == 'error':
# GeoNode Layer creation errored!
_upload.set_processing_state(Upload.STATE_INVALID)
elif _status == 'pending':
# GeoNode Layer not ready yet...
_upload.set_processing_state(Upload.STATE_PENDING)
elif _upload.state != Upload.STATE_PROCESSED:
if _upload.layer and _upload.layer.processed:
# GeoNode Layer successfully processed...
_upload.set_processing_state(Upload.STATE_PROCESSED)
else:
# GeoNode Layer updating...
_upload.set_processing_state(Upload.STATE_RUNNING)
logger.debug(f"Upload {upload_session_id} updated with state {_upload.state}.")
except (NotFound, Exception) as e:
logger.exception(e)
if _upload.state not in (Upload.STATE_COMPLETE, Upload.STATE_PROCESSED):
_upload.set_processing_state(Upload.STATE_INVALID)
logger.error(f"Upload {upload_session_id} deleted with state {_upload.state}.")


@app.task(
@@ -206,15 +229,11 @@ def _update_upload_session_state(self, upload_session_id: int):
)
def _upload_session_cleanup(self, upload_session_id: int):
"""Task invoked by 'upload_workflow.chord' in order to remove and cleanup all the 'INVALID' stale Upload tasks."""

lock_id = f'{self.request.id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
try:
_upload = Upload.objects.get(id=upload_session_id)
if _upload.layer:
_upload.layer.delete()
_upload.delete()
logger.debug(f"Upload {upload_session_id} deleted with state {_upload.state}.")
except Exception as e:
logger.error(f"Upload {upload_session_id} errored with exception {e}.")
try:
_upload = Upload.objects.get(id=upload_session_id)
if _upload.layer:
_upload.layer.delete()
_upload.delete()
logger.debug(f"Upload {upload_session_id} deleted with state {_upload.state}.")
except Exception as e:
logger.error(f"Upload {upload_session_id} errored with exception {e}.")
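`_upload_session_cleanup` keeps its job of deleting the orphaned layer and the `Upload` row for invalid stale sessions, but, like `_update_upload_session_state` above, it appears to drop its own `AcquireLock` wrapper: only the dispatching `finalize_incomplete_session_uploads` task still guards itself with the lock, while the chord members run as plain tasks. For reference, the locking idiom retained on the dispatcher looks roughly like this (import path assumed; `do_work` is a placeholder):

```python
from geonode.tasks.tasks import AcquireLock  # assumed import path for the helper used above


def run_exclusively(task_request, do_work):
    """Sketch of the AcquireLock idiom kept on finalize_incomplete_session_uploads."""
    lock_id = f'{task_request.id}'   # lock keyed by the Celery request id, as in the diff
    with AcquireLock(lock_id) as lock:
        if lock.acquire() is True:   # proceed only if the lock was obtained
            do_work()
```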
