
Commit

Merge pull request #1685 from geosolutions-it/session_owned
Session owned
mattiagiupponi authored Dec 22, 2023
2 parents d6f0d4a + 922ac87 commit 822162f
Showing 10 changed files with 455 additions and 397 deletions.
7 changes: 7 additions & 0 deletions geonode/geoserver/helpers.py
@@ -77,6 +77,7 @@
http_client,
get_legend_url,
is_monochromatic_image,
keep_alive,
set_resource_default_links)

from .security import set_geowebcache_invalidate_cache
@@ -1263,11 +1264,13 @@ def save_style(gs_style, layer):
sld_name = gs_style.name
sld_body = gs_style.sld_body
if not gs_style.workspace:
keep_alive()
logger.debug(f"save_style: creating style in geoserver {layer.workspace}:{style_name}")
gs_style = gs_catalog.create_style(
style_name, sld_body,
raw=True, overwrite=True,
workspace=layer.workspace)
keep_alive()

style = None
try:
@@ -2073,7 +2076,11 @@ def sync_instance_with_geoserver(
logger.debug(f"SYNC: CLEAR GEOSERVER CATALOG")

# Let's reset the connections first
keep_alive()
gs_catalog._cache.clear()

keep_alive()

#gs_catalog.reset()

gs_resource = None
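The helpers.py hunks above all apply one pattern: call keep_alive() to renew a possibly stale GeoServer session immediately before touching gs_catalog (creating a style, clearing the catalog cache). Below is a minimal illustrative sketch of that pattern as caller code; it is not part of the commit, and the import paths are assumptions, since the hunk only shows keep_alive added to an existing import list.

from geonode.utils import keep_alive                 # assumed location of keep_alive
from geonode.geoserver.helpers import gs_catalog     # assumed location of the catalog handle

def create_style_with_fresh_session(style_name, sld_body, workspace):
    # Renew the GeoServer session before the catalog call, mirroring save_style() above.
    keep_alive()
    return gs_catalog.create_style(
        style_name, sld_body,
        raw=True, overwrite=True,
        workspace=workspace)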
1 change: 1 addition & 0 deletions geonode/upload/api/views.py
@@ -18,6 +18,7 @@
#########################################################################
import json
from urllib.parse import parse_qsl, urlparse
from django.conf import settings
from dynamic_rest.viewsets import DynamicModelViewSet
from dynamic_rest.filters import DynamicFilterBackend, DynamicSortingFilter

19 changes: 19 additions & 0 deletions geonode/upload/migrations/0037_auto_20231222_1036.py
@@ -0,0 +1,19 @@
# Generated by Django 2.2.28 on 2023-12-22 10:36

import django.core.validators
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('upload', '0036_auto_20221019_0937'),
]

operations = [
migrations.AlterField(
model_name='uploadsizelimit',
name='max_size',
field=models.BigIntegerField(default=104857600, help_text='The maximum file size allowed for upload (bytes).', validators=[django.core.validators.MinValueValidator(limit_value=0)]),
),
]
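For context (not part of the migration): the new default of 104857600 bytes is exactly 100 MiB, and moving max_size to a BigIntegerField presumably lifts the 2,147,483,647-byte ceiling of a 32-bit integer column. A quick arithmetic check:

DEFAULT_MAX_UPLOAD_SIZE = 100 * 1024 * 1024   # 104857600 bytes == 100 MiB
INT32_MAX = 2**31 - 1                         # 2147483647, the 32-bit integer ceiling

assert DEFAULT_MAX_UPLOAD_SIZE == 104857600
assert DEFAULT_MAX_UPLOAD_SIZE < INT32_MAX    # the default itself fits; multi-GiB limits need the wider field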
8 changes: 5 additions & 3 deletions geonode/upload/models.py
@@ -53,12 +53,13 @@ def invalidate_from_session(self, upload_session):
import_id=upload_session.import_session.id
).update(state=Upload.STATE_INVALID)

def update_from_session(self, upload_session, layer=None):
def update_from_session(self, upload_session, layer=None, owned=False):
upload_session.owned = owned
return self.get(
user=upload_session.user,
name=upload_session.name,
import_id=upload_session.import_session.id).update_from_session(
upload_session, layer=layer)
upload_session, layer=layer, owned=owned)

def create_from_session(self, user, import_session):
return self.create(
@@ -140,7 +141,8 @@ def get_session(self):
return pickle.loads(
base64.decodebytes(self.session.encode('UTF-8')))

def update_from_session(self, upload_session, layer=None):
def update_from_session(self, upload_session, layer=None, owned=False):
upload_session.owned = owned
self.session = base64.encodebytes(pickle.dumps(upload_session)).decode('UTF-8')
self.name = upload_session.name
self.user = upload_session.user
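The models.py change threads a new owned flag through UploadManager.update_from_session() and stores it on the pickled upload session. A hedged usage sketch, not from the commit (upload_session, saved_layer and upload are illustrative names):

# Mark the session as still owned so stale-session cleanup will skip it.
Upload.objects.update_from_session(upload_session, layer=saved_layer, owned=True)

# The flag travels with the pickled session and can be read back later,
# as tasks.py does via _upload.get_session.owned.
is_owned = upload.get_session.owned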
24 changes: 17 additions & 7 deletions geonode/upload/tasks.py
@@ -61,9 +61,13 @@ def finalize_incomplete_session_uploads(self, *args, **kwargs):
_upload_tasks = []
_upload_ids_expired = []

_is_owned = kwargs.get("owned", True)
# Check first if we need to delete stale sessions
expiry_time = now() - timedelta(hours=UPLOAD_SESSION_EXPIRY_HOURS)
for _upload in Upload.objects.exclude(state=Upload.STATE_PROCESSED).exclude(date__gt=expiry_time):
if _upload.get_session.owned:
logger.info(f"Upload session is still owned, skipping...!")
continue
_upload.set_processing_state(Upload.STATE_INVALID)
_upload_ids_expired.append(_upload.id)
_upload_tasks.append(
@@ -73,7 +77,7 @@ def finalize_incomplete_session_uploads(self, *args, **kwargs):
)

upload_workflow_finalizer = _upload_workflow_finalizer.signature(
args=('_upload_session_cleanup', _upload_ids_expired,),
args=('_upload_session_cleanup', _upload_ids_expired, ),
immutable=True
).on_error(
_upload_workflow_error.signature(
@@ -97,7 +101,7 @@ def finalize_incomplete_session_uploads(self, *args, **kwargs):
logger.exception(e)
session = None

if session:
if session and (not settings.ASYNC_SIGNALS or not _upload.get_session.owned):
_upload_ids.append(_upload.id)
_upload_tasks.append(
_update_upload_session_state.signature(
@@ -109,11 +113,11 @@ def finalize_incomplete_session_uploads(self, *args, **kwargs):
_upload.set_processing_state(Upload.STATE_PROCESSED)

upload_workflow_finalizer = _upload_workflow_finalizer.signature(
args=('_update_upload_session_state', _upload_ids,),
args=('_update_upload_session_state', _upload_ids, _is_owned),
immutable=True
).on_error(
_upload_workflow_error.signature(
args=('_update_upload_session_state', _upload_ids,),
args=('_update_upload_session_state', _upload_ids, _is_owned),
immutable=True
)
)
@@ -132,7 +136,7 @@ def finalize_incomplete_session_uploads(self, *args, **kwargs):
queue='upload',
acks_late=False,
ignore_result=False)
def _upload_workflow_finalizer(self, task_name: str, upload_ids: list):
def _upload_workflow_finalizer(self, task_name: str, upload_ids: list, is_owned = False):
"""Task invoked at 'upload_workflow.chord' end in the case everything went well.
"""
if upload_ids:
@@ -160,7 +164,7 @@ def _upload_workflow_error(self, task_name: str, upload_ids: list):
queue='upload',
acks_late=False,
ignore_result=False)
def _update_upload_session_state(self, upload_session_id: int):
def _update_upload_session_state(self, upload_session_id: int, _is_owned: bool = False):
"""Task invoked by 'upload_workflow.chord' in order to process all the 'PENDING' Upload tasks."""
lock_id = f'_update_upload_session_state-{upload_session_id}'
with AcquireLock(lock_id, blocking=None, title='_update_upload_session_state') as lock:
@@ -170,7 +174,11 @@ def _update_upload_session_state(self, upload_session_id: int):
if not session or session.state != Upload.STATE_COMPLETE:
session = gs_uploader.get_session(_upload.import_id)

if session:
logger.error(f"Session owned value: {_upload.get_session.owned} _is_owned value: {_is_owned}, upload_session_id: {upload_session_id}")
_is_owned = not _upload.get_session.owned or _is_owned
logger.error(f"Session is owned: {_is_owned} upload_session_id: {upload_session_id}")

if session and not _is_owned:
try:
_response = next_step_response(None, _upload.get_session)
_upload.refresh_from_db()
@@ -215,6 +223,8 @@ def _update_upload_session_state(self, upload_session_id: int):
if _upload.state not in (Upload.STATE_COMPLETE, Upload.STATE_PROCESSED):
_upload.set_processing_state(Upload.STATE_INVALID)
logger.error(f"Upload {upload_session_id} deleted with state {_upload.state}.")
elif _is_owned:
pass
elif _upload.state != Upload.STATE_PROCESSED:
_upload.set_processing_state(Upload.STATE_INVALID)
logger.error(f"Unable to find the Importer Session - Upload {upload_session_id} deleted with state {_upload.state}.")
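In the tasks.py changes above, finalize_incomplete_session_uploads reads an owned kwarg (kwargs.get("owned", True)), skips expired uploads whose pickled session is still flagged as owned, and threads the flag into the chord's finalizer signatures and into _update_upload_session_state. A hedged sketch of overriding that kwarg when triggering the task; the Celery invocation style is an assumption, not shown in the diff:

from geonode.upload.tasks import finalize_incomplete_session_uploads

# The task reads kwargs.get("owned", True); passing owned=False overrides the default.
finalize_incomplete_session_uploads.apply_async(kwargs={"owned": False})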
(diff for the remaining 5 changed files not loaded on this page)
