Merge pull request #80 from qld-gov-au/develop
[QOL-8518] [QOL-8612] Multiple fixes (Redis ignore exceptions, speed improvements, cleanup s3 acl flags)
ThrawnCA authored Feb 11, 2022
2 parents ced8175 + 30a4984 commit 661f720
Showing 12 changed files with 473 additions and 109 deletions.
33 changes: 30 additions & 3 deletions README.rst
@@ -77,9 +77,16 @@ Optional::
# The ckan storage path option must also be set correctly for the fallback to work
ckan.storage_path = path/to/storage/directory

# An optional setting to change the ACL of the uploaded files.
# Default 'public-read'.
ckanext.s3filestore.acl = private

# An optional setting to change the ACL of files previously uploaded
# for the resource under different names.
# 'auto' means use the same visibility as the current version.
# Default 'private'.
ckanext.s3filestore.non_current_acl = auto

# An optional setting to control whether the ACLs of uploaded files
# are updated immediately when the dataset is updated, or queued
# for asynchronous processing. Defaults to True (ie asynchronous).
@@ -120,10 +127,30 @@ Optional::
# The expiry should be longer than the window (not equal);
# otherwise, a URL may expire before a new one is available.
# If either value is zero or negative, then URL caching is disabled.
# 'public_url_cache_window': How long a public (unsigned) URL will be reused.
ckanext.s3filestore.signed_url_expiry = 3600
ckanext.s3filestore.signed_url_cache_window = 1800

ckanext.s3filestore.public_url_cache_window = 86400

# Control how long the ACL of an S3 object is held in cache.
# Uploading a new file refreshes the cached ACL. Default is 86400 (24 hours).
ckanext.s3filestore.acl_cache_window = 2592000

# If set, prior uploads whose filenames no longer match the current
# filename for a resource may be deleted once the specified number of
# days has elapsed since their upload date.
# If less than zero, nothing is deleted. Defaults to -1.
#
# E.g. with delete_non_current_days set to 90:
# - a non-current file uploaded 91 days ago is deleted the next time
#   a job runs on its dataset/resource;
# - a non-current file uploaded 10 days ago becomes eligible for
#   deletion in 80 days.
#
# Note: if S3 Versioning is enabled, deleted files can be recovered
# according to your external versioning policy (just as when a new
# upload reuses the same filename). If Versioning is not enabled,
# deleted files are not recoverable.
ckanext.s3filestore.delete_non_current_days = 90

# Queue used by the s3 plugin; if not set, the `default` queue is used
ckanext.s3filestore.queue = bulk
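
Taken together, the ACL options resolve per object. A minimal sketch of the 'auto' semantics described above (the helper name is illustrative, not the extension's actual code):

def resolve_non_current_acl(config, current_acl):
    # 'auto' mirrors the visibility applied to the current upload;
    # any other value is applied verbatim to non-current objects.
    non_current = config.get('ckanext.s3filestore.non_current_acl', 'private')
    return current_acl if non_current == 'auto' else non_current

# e.g. a public dataset with non_current_acl = auto:
# resolve_non_current_acl({'ckanext.s3filestore.non_current_acl': 'auto'}, 'public-read')
# -> 'public-read'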


33 changes: 33 additions & 0 deletions ckanext/s3filestore/cli_commands.py
@@ -179,6 +179,39 @@ def _to_pairtree_path(path):

_upload_files_to_s3(resource_ids_and_names, resource_ids_and_paths)

def update_all_visibility(self):
    if config.get('ckanext.s3filestore.acl', None) != 'auto':
        print("ckanext.s3filestore.acl must be set to 'auto' to execute update_all_visibility")
        return

    print("Updating the visibility of all datasets")

    packages_ids = []

    with DBConnection(config) as connection:
        packages = connection.execute(text('''
            SELECT DISTINCT package_id
            FROM resource
            WHERE state = 'active'
            AND url IS NOT NULL
            AND url <> ''
            AND url_type = 'upload'
        '''))

        if packages.rowcount:
            for package in packages:
                # package[0] is the package_id column of the result row
                packages_ids.append(package[0])
        else:
            print("No resources found to make visible")

    for package_id in packages_ids:
        try:
            get_action('package_patch')(data_dict={'id': package_id}, context={'ignore_auth': True})
        except Exception as e:
            print("Unable to run package_patch on package_id '{}': {}".format(package_id, e))


def _upload_files_to_s3(resource_ids_and_names, resource_ids_and_paths):
    AWS_BUCKET_NAME = config.get('ckanext.s3filestore.aws_bucket_name')
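
The update_all_visibility command above relies on a useful property of package_patch: patching a dataset with only its id re-runs the package update pipeline, so IPackageController.after_update fires and the plugin re-applies the configured ACLs. A minimal standalone sketch of that trick (the wrapper name is illustrative):

from ckan.plugins.toolkit import get_action

def retrigger_acl_sync(package_id):
    # An id-only patch changes no fields, but still fires after_update,
    # which re-syncs the S3 object ACLs for the package's resources.
    get_action('package_patch')(
        context={'ignore_auth': True},
        data_dict={'id': package_id})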
5 changes: 5 additions & 0 deletions ckanext/s3filestore/click_commands.py
@@ -27,3 +27,8 @@ def upload(identifier):
        commands.upload_pairtree()
    else:
        commands.upload_single(identifier)


@s3.command(short_help=u'Updates the visibility of all existing S3 objects to match current config')
def update_all_visibility():
    S3FilestoreCommands().update_all_visibility()
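
With CKAN's click-based CLI (2.9+), this registers under the `s3` command group; the invocation is typically `ckan -c /path/to/ckan.ini s3 update-all-visibility` (click 7+ maps the function name `update_all_visibility` to the dashed command name). Paster-based deployments route the same operation through `commands.py` below.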
6 changes: 6 additions & 0 deletions ckanext/s3filestore/commands.py
@@ -12,6 +12,10 @@ class TestConnection(toolkit.CkanCommand, S3FilestoreCommands):
    Usage:
        s3 update-all-visibility
            Updates the visibility of all existing S3 objects to match current config
        s3 check-config
            Checks if the configuration entered in the ini file is correct
@@ -43,6 +47,8 @@ def command(self):
    self._load_config()
    if self.args[0] == 'check-config':
        self.check_config()
    elif self.args[0] == 'update-all-visibility':
        self.update_all_visibility()
    elif self.args[0] == 'upload':
        if len(self.args) < 2 or self.args[1] == 'all':
            self.upload_all()
60 changes: 41 additions & 19 deletions ckanext/s3filestore/plugin.py
@@ -1,14 +1,17 @@
# encoding: utf-8
import os

import logging
import six

from ckan import plugins
import ckantoolkit as toolkit
from ckanext.s3filestore import uploader as s3_uploader
from ckan.lib.uploader import ResourceUpload as DefaultResourceUpload,\
    get_resource_uploader

import ckanext.s3filestore.tasks as tasks

from ckanext.s3filestore.redis_helper import RedisHelper

LOG = logging.getLogger(__name__)

@@ -79,24 +82,35 @@ def get_uploader(self, upload_to, old_filename=None):

def after_update(self, context, pkg_dict):
    ''' Update the access of each S3 object to match the package.
    '''
    pkg_id = pkg_dict['id']
    LOG.debug("after_update: Package %s has been updated, notifying resources", pkg_id)

    is_private = pkg_dict.get('private', False)
    is_private_str = six.text_type(is_private)

    redis = RedisHelper()
    cache_private = redis.get(pkg_id + '/private')
    redis.put(pkg_id + '/private', is_private_str, expiry=86400)
    # compare current and previous 'private' flags so we know
    # if visibility has changed
    if cache_private is not None and cache_private == is_private_str:
        LOG.debug("Package %s privacy is unchanged", pkg_id)
        return

    # visibility has changed; update associated S3 objects
    visibility_level = 'private' if is_private else 'public-read'
    async_update = self.async_visibility_update
    if async_update:
        try:
            self.enqueue_resource_visibility_update_job(visibility_level, pkg_id)
        except Exception as e:
            LOG.debug("after_update: Failed to enqueue, updating inline. Error: [%s]", e)
            async_update = False
    if not async_update:
        if 'resources' not in pkg_dict:
            pkg_dict = toolkit.get_action('package_show')(
                context=context, data_dict={'id': pkg_id})
        self.after_update_resource_list_update(visibility_level, pkg_id, pkg_dict)

def after_update_resource_list_update(self, visibility_level, pkg_id, pkg_dict):
@@ -110,20 +124,28 @@ def after_update_resource_list_update(self, visibility_level, pkg_id, pkg_dict):
            target_acl=visibility_level)
    LOG.debug("after_update_resource_list_update: Package %s has been updated, notifying resources finished", pkg_id)

def enqueue_resource_visibility_update_job(self, visibility_level, pkg_id):

    enqueue_args = {
        'fn': tasks.s3_afterUpdatePackage,
        'title': "s3_afterUpdatePackage: setting {} on {}".format(visibility_level, pkg_id),
        'kwargs': {'visibility_level': visibility_level, 'pkg_id': pkg_id},
    }
    if toolkit.check_ckan_version('2.8'):
        ttl = 24 * 60 * 60  # 24 hour ttl
        rq_kwargs = {
            'ttl': ttl
        }
        if toolkit.check_ckan_version('2.9'):
            rq_kwargs['failure_ttl'] = ttl
        enqueue_args['rq_kwargs'] = rq_kwargs

    # Optional setting; if not set, the default queue is used
    queue = toolkit.config.get('ckanext.s3filestore.queue', None)
    if queue:
        enqueue_args['queue'] = queue

    toolkit.enqueue_job(**enqueue_args)
    LOG.debug("enqueue_resource_visibility_update_job: Package %s has been enqueued",
              pkg_id)

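The enqueue-then-fallback flow in after_update is worth seeing in isolation; a condensed sketch (names follow the plugin code above; not a drop-in replacement):

def sync_visibility(plugin, visibility_level, pkg_id, pkg_dict):
    # Prefer the background queue; if enqueueing fails (e.g. the queue
    # backend is down), degrade to a synchronous update so the S3 ACLs
    # still end up matching the package.
    try:
        plugin.enqueue_resource_visibility_update_job(visibility_level, pkg_id)
    except Exception:
        plugin.after_update_resource_list_update(visibility_level, pkg_id, pkg_dict)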
53 changes: 53 additions & 0 deletions ckanext/s3filestore/redis_helper.py
@@ -0,0 +1,53 @@
# encoding: utf-8

import logging
import six

from ckan.lib.redis import connect_to_redis

log = logging.getLogger(__name__)

REDIS_PREFIX = 'ckanext-s3filestore:'


class RedisHelper:

    def _get_cache_key(self, path):
        return REDIS_PREFIX + path

    def get(self, key):
        ''' Get a value from the cache, if available.
        Returned values will be converted to text type instead of bytes.
        '''
        cache_key = self._get_cache_key(key)
        try:
            redis_conn = connect_to_redis()
            cache_value = redis_conn.get(cache_key)
        except Exception as e:
            log.error("Failed to connect to Redis cache: %s", e)
            cache_value = None
        if cache_value is not None and hasattr(six, 'ensure_text'):
            cache_value = six.ensure_text(cache_value)
        return cache_value

    def put(self, key, value, expiry=None):
        ''' Set a value in the cache, if available, with the
        specified expiry. If expiry is None, no action is taken.
        '''
        if expiry:
            cache_key = self._get_cache_key(key)
            try:
                redis_conn = connect_to_redis()
                redis_conn.set(cache_key, value, ex=expiry)
            except Exception as e:
                log.error("Failed to connect to Redis cache: %s", e)

    def delete(self, key):
        ''' Delete a value from the cache, if available.
        '''
        cache_key = self._get_cache_key(key)
        try:
            redis_conn = connect_to_redis()
            redis_conn.delete(cache_key)
        except Exception as e:
            log.error("Failed to connect to Redis cache: %s", e)
26 changes: 19 additions & 7 deletions ckanext/s3filestore/tasks.py
@@ -9,26 +9,38 @@
log = logging.getLogger(__name__)


def s3_afterUpdatePackage(ckan_ini_filepath=None, visibility_level=None, pkg_id=None, pkg_dict=None):
    u'''
    Update the visibility of a package's S3 objects after the package is updated.
    :param string ckan_ini_filepath: Deprecated; accepted for one more release so that jobs already queued with the old signature are not lost.
    :param string visibility_level: the visibility (ACL) to apply
    :param string pkg_id: id of the package whose resources should be updated
    :param dict pkg_dict: Deprecated; accepted for one more release so that jobs already queued with the old signature are not lost.
    :raises Exception: if the job fails.
    '''
    if ckan_ini_filepath:
        toolkit.load_config(ckan_ini_filepath)

    log.info('Starting s3_afterUpdatePackage task: package_id=%r, visibility_level=%s', pkg_id, visibility_level)

    # Do all work in a sub-routine so it can be tested without a job queue.
    # Also put try/except around it, as it is easier to monitor CKAN's log
    # rather than a queue's task status.
    try:
        pkg_dict = toolkit.get_action('package_show')({'ignore_auth': True}, {'id': pkg_id})

        plugin = p.get_plugin("s3filestore")
        plugin.after_update_resource_list_update(visibility_level, pkg_id, pkg_dict)
        log.info('Finished s3_afterUpdatePackage task: package_id=%r, visibility_level=%s', pkg_id, visibility_level)

    except Exception as e:
        if os.environ.get('DEBUG'):
            raise
        # Any problem at all is logged and reraised so that the job queue
        # can log it too
        log.error('Error in s3_afterUpdatePackage task: package_id=%r, visibility_level=%s, error: %s',
                  pkg_id, visibility_level, e)
        raise
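
Because the task now takes keyword arguments and loads its own package data, it can be exercised synchronously, without a worker; a sketch of a direct call (identifier values are illustrative):

from ckanext.s3filestore.tasks import s3_afterUpdatePackage

# Runs the same code path a queue worker would, inline:
s3_afterUpdatePackage(visibility_level='private', pkg_id='test-package')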
40 changes: 39 additions & 1 deletion ckanext/s3filestore/tests/test_plugin.py
@@ -3,6 +3,9 @@
import mock
from parameterized import parameterized

import ckantoolkit as toolkit

from ckanext.s3filestore import tasks
from ckanext.s3filestore.plugin import S3FileStorePlugin


@@ -27,7 +30,7 @@ def test_update_config(self):
    (False, 'public-read')
])
def test_package_after_update(self, is_private, expected_acl):
    ''' S3 object visibility is updated to match package '''
    pkg_dict = {'id': 'test-package',
                'resources': [{'id': 'test-resource'}]}
    if is_private is not None:
@@ -42,3 +45,38 @@ def test_package_after_update(self, is_private, expected_acl):
        self.plugin.after_update({}, pkg_dict)
        mock_uploader.update_visibility.assert_called_once_with(
            'test-resource', target_acl=expected_acl)

def test_enqueueing_visibility_update(self):
    ''' Asynchronous job is created to update object visibility.
    '''
    # ensure that we don't trigger errors
    self.plugin.enqueue_resource_visibility_update_job('private', 'abcde')

    # check that the args were actually passed in
    with mock.patch('rq.Queue.enqueue_call') as enqueue_call:
        self.plugin.enqueue_resource_visibility_update_job('private', 'abcde')
        if toolkit.check_ckan_version(max_version='2.7.99'):
            enqueue_call.assert_called_once_with(
                func=tasks.s3_afterUpdatePackage,
                args=[],
                kwargs={'visibility_level': 'private', 'pkg_id': 'abcde'}
            )
        else:
            from ckan.lib.jobs import DEFAULT_JOB_TIMEOUT
            if toolkit.check_ckan_version('2.9'):
                enqueue_call.assert_called_once_with(
                    func=tasks.s3_afterUpdatePackage,
                    args=[],
                    kwargs={'visibility_level': 'private', 'pkg_id': 'abcde'},
                    timeout=DEFAULT_JOB_TIMEOUT,
                    ttl=86400,
                    failure_ttl=86400
                )
            else:
                enqueue_call.assert_called_once_with(
                    func=tasks.s3_afterUpdatePackage,
                    args=[],
                    kwargs={'visibility_level': 'private', 'pkg_id': 'abcde'},
                    timeout=DEFAULT_JOB_TIMEOUT,
                    ttl=86400
                )
