Skip to content

Commit

Permalink
Merge pull request #67 from qld-gov-au/develop
Browse files Browse the repository at this point in the history
[QOL-8515] speed improvements to s3 private/public on package level.
  • Loading branch information
duttonw authored Jan 6, 2022
2 parents 2056ad8 + e851a78 commit 49fed84
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 17 deletions.
10 changes: 10 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ Optional::
# An optional setting to change the acl of the uploaded files. Default public-read.
ckanext.s3filestore.acl = private

# An optional setting to control whether the ACLs of uploaded files
# are updated immediately when the dataset is updated, or queued
# for asynchronous processing. Defaults to True (i.e. asynchronous).
# NB: Inline updates occur during the same transaction as the dataset update,
# and may cause significant overhead for datasets with many resources.
ckanext.s3filestore.acl.async_update = False

# An optional setting to specify which addressing style to use.
# This controls whether the bucket name is in the hostname or is
# part of the URL path. Options are 'path', 'virtual', and 'auto';
Expand Down Expand Up @@ -121,6 +128,9 @@ Optional::
ckanext.s3filestore.signed_url_expiry = 3600
ckanext.s3filestore.signed_url_cache_window = 1800

# Queue used by the s3 plugin for its background jobs.
# If not set, the default queue is used. For example:
ckanext.s3filestore.queue = bulk

------------------------
Development Installation
Expand Down
53 changes: 40 additions & 13 deletions ckanext/s3filestore/plugin.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
# encoding: utf-8

import os
import logging

from routes.mapper import SubMapper
from ckan import plugins
import ckantoolkit as toolkit

from ckanext.s3filestore import uploader as s3_uploader
from ckanext.s3filestore.views import\
resource as resource_view, uploads as uploads_view
from ckan.lib.uploader import ResourceUpload as DefaultResourceUpload,\
get_resource_uploader

from ckanext.s3filestore.tasks import s3_afterUpdatePackage

LOG = logging.getLogger(__name__)
toolkit = plugins.toolkit


class S3FileStorePlugin(plugins.SingletonPlugin):

plugins.implements(plugins.IConfigurer)
plugins.implements(plugins.IConfigurable)
plugins.implements(plugins.IUploader)
plugins.implements(plugins.IPackageController, inherit=True)

if plugins.toolkit.check_ckan_version(min_version='2.8.0'):
if toolkit.check_ckan_version(min_version='2.8.0'):
plugins.implements(plugins.IBlueprint)
else:
plugins.implements(plugins.IRoutes, inherit=True)
Expand Down Expand Up @@ -63,6 +65,9 @@ def configure(self, config):
s3_uploader.BaseS3Uploader().get_s3_bucket(
config.get('ckanext.s3filestore.aws_bucket_name'))

self.async_visibility_update = toolkit.asbool(config.get(
'ckanext.s3filestore.acl.async_update', 'True'))

# IUploader

def get_resource_uploader(self, data_dict):
Expand All @@ -77,31 +82,53 @@ def get_uploader(self, upload_to, old_filename=None):

def after_update(self, context, pkg_dict):
''' Update the access of each S3 object to match the package.
'''
'''
# 'id' is required; a missing 'private' flag is treated as public.
pkg_id = pkg_dict['id']
is_private = pkg_dict.get('private', False)
LOG.debug("after_update: Package %s has been updated, notifying resources", pkg_id)

# This is triggered repeatedly in the worker thread from plugins like
# 'validation' and 'archiver', so it needs to be efficient when
# no work is required.
# Only the single most recent activity entry is fetched (limit=1), with
# authorisation checks disabled.
latest_revision = toolkit.get_action('package_activity_list')(
context={'ignore_auth': True}, data_dict={'id': pkg_id, 'limit': 1})
# Return early when the latest activity already records the same privacy flag.
# NOTE(review): this reads a top-level 'private' key from the activity's
# 'data' dict - confirm that is where the activity stores the flag.
if latest_revision and latest_revision[0]['data'].get('private', False) == is_private:
return

# The supplied dict may lack 'resources'; reload the full package if so.
if 'resources' not in pkg_dict:
pkg_dict = toolkit.get_action('package_show')(
context=context, data_dict={'id': pkg_id})

# Map the package privacy flag onto the ACL name handed to the uploader.
visibility_level = 'private' if is_private else 'public-read'
async_update = self.async_visibility_update
if async_update:
try:
self.enqueue_resource_visibility_update_job(visibility_level, pkg_id, pkg_dict)
except Exception as e:
# Any enqueue failure falls back to the inline update below.
LOG.debug("after_update: Could not enqueue due to %s, doing inline", e)
async_update = False
if not async_update:
self.after_update_resource_list_update(visibility_level, pkg_id, pkg_dict)

def after_update_resource_list_update(self, visibility_level, pkg_id, pkg_dict):
# Apply visibility_level to every resource listed in pkg_dict['resources'].
# pkg_id is only used in the log messages here.

LOG.debug("after_update_resource_list_update: Package %s has been updated, notifying resources", pkg_id)
for resource in pkg_dict['resources']:
uploader = get_resource_uploader(resource)
# Uploaders without an update_visibility method are skipped.
if hasattr(uploader, 'update_visibility'):
uploader.update_visibility(
resource['id'],
target_acl=visibility_level)
LOG.debug("after_update: Package %s has been updated, notifying resources finished", pkg_id)
LOG.debug("after_update_resource_list_update: Package %s has been updated, notifying resources finished", pkg_id)

def enqueue_resource_visibility_update_job(self, visibility_level, pkg_id, pkg_dict):
    ''' Queue a background job that applies ``visibility_level`` to the
    resources of a package.

    :param visibility_level: ACL name forwarded to the job
    :param pkg_id: package ID, used in the job title and forwarded to the job
    :param pkg_dict: package dict forwarded to the job
    '''
    ckan_ini_filepath = os.path.abspath(toolkit.config['__file__'])
    # The positional arguments must line up with the job signature:
    # s3_afterUpdatePackage(ckan_ini_filepath, visibility_level, pkg_id, pkg_dict).
    # Leaving out pkg_id makes every queued job fail with a TypeError.
    args = [ckan_ini_filepath, visibility_level, pkg_id, pkg_dict]
    kwargs = {
        'args': args,
        'title': "s3_afterUpdatePackage: setting " + visibility_level + " on " + pkg_id
    }
    # Optional variable, if not set, default queue is used
    queue = toolkit.config.get('ckanext.s3filestore.queue', None)
    if queue:
        kwargs['queue'] = queue

    toolkit.enqueue_job(s3_afterUpdatePackage, **kwargs)
    LOG.debug("enqueue_resource_visibility_update_job: Package %s has been enqueued",
              pkg_id)

# IRoutes
# Ignored on CKAN >= 2.8
Expand Down
34 changes: 34 additions & 0 deletions ckanext/s3filestore/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# encoding: utf-8

import logging
import os

from ckan import plugins as p

toolkit = p.toolkit
log = logging.getLogger(__name__)


def s3_afterUpdatePackage(ckan_ini_filepath, visibility_level, pkg_id, pkg_dict):
    '''
    Background job: apply ``visibility_level`` to the resources of a package.

    Loads the CKAN config from ``ckan_ini_filepath`` when one is given, then
    delegates to the ``s3filestore`` plugin's
    ``after_update_resource_list_update``. Any exception is re-raised.
    '''
    if ckan_ini_filepath:
        toolkit.load_config(ckan_ini_filepath)

    log.info('Starting s3_afterUpdatePackage task: package_id=%r, visibility_level=%s', pkg_id, visibility_level)

    # The real work lives on the plugin so it can be exercised without a
    # job queue; the wrapper here exists so failures show up in CKAN's log
    # as well as in the queue's task status.
    try:
        p.get_plugin("s3filestore").after_update_resource_list_update(
            visibility_level, pkg_id, pkg_dict)
    except Exception as err:
        # With DEBUG set in the environment the error propagates without
        # being logged here; otherwise it is logged first. It is re-raised
        # in both cases so the job queue sees the failure.
        if not os.environ.get('DEBUG'):
            log.error('Error occurred during s3_afterUpdatePackage of package %s: %s',
                      pkg_id, err)
        raise
6 changes: 2 additions & 4 deletions ckanext/s3filestore/tests/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ def _upload_resource(self):

file_path = os.path.join(os.path.dirname(__file__), 'data.csv')
resource = helpers.call_action('resource_create', context={'ignore_auth': True}, package_id='my-dataset',
upload=open(file_path),
url='file.txt')
upload=open(file_path))
return resource

@helpers.change_config('ckan.site_url', 'http://mytest.ckan.net')
Expand Down Expand Up @@ -133,8 +132,7 @@ def _upload_resource(self):

file_path = os.path.join(os.path.dirname(__file__), 'data.csv')
resource = demo.action.resource_create(package_id='my-dataset',
upload=open(file_path),
url='file.txt')
upload=open(file_path))
return resource, demo, app

@helpers.change_config('ckan.site_url', 'http://mytest.ckan.net')
Expand Down
1 change: 1 addition & 0 deletions test.ini
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ ckanext.s3filestore.host_name = http://moto:5000
ckanext.s3filestore.aws_bucket_name = my-bucket
ckanext.s3filestore.aws_storage_path = my-path
ckanext.s3filestore.filesystem_download_fallback = True
ckanext.s3filestore.acl.async_update = False

# Logging configuration
[loggers]
Expand Down

0 comments on commit 49fed84

Please sign in to comment.