WIP Sipstore addition with unlocked bucket #1119

Open · wants to merge 3 commits into base: master
3 changes: 3 additions & 0 deletions cap/config.py
@@ -642,3 +642,6 @@ def _(x):
# ================
REANA_ACCESS_TOKEN = os.environ.get(
    'APP_REANA_ACCESS_TOKEN', None)

SIPSTORE_DEFAULT_AGENT_JSONSCHEMA = 'sipstore/agent-v0.0.1.json'
SIPSTORE_DEFAULT_BAGIT_JSONSCHEMA = 'sipstore/bagit-v0.0.1.json'
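These two keys are read by invenio-sipstore when it records the agent that created a SIP and when it builds BagIt metadata. A minimal sketch (not part of this diff) of reading them back at runtime, assuming an active Flask application context:

from flask import current_app

def sipstore_schema_paths():
    """Return the configured SIPStore agent and BagIt schema paths."""
    return (current_app.config['SIPSTORE_DEFAULT_AGENT_JSONSCHEMA'],
            current_app.config['SIPSTORE_DEFAULT_BAGIT_JSONSCHEMA'])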
22 changes: 22 additions & 0 deletions cap/jsonschemas/sipstore/agent-v0.0.1.json
@@ -0,0 +1,22 @@
{
  "allow_all": true,
  "experiment": null,
  "fullname": null,
  "is_deposit": false,
  "jsonschema": {
    "type": "object",
    "title": "SIPStore Agent schema.",
    "description": "Information about the user agent creating the SIP.",
    "properties": {
      "orcid": {
        "type": "string"
      },
      "email": {
        "type": "string"
      },
      "ip_address": {
        "type": "string"
      }
    }
  }
}
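For illustration (not part of this diff): the actual JSON Schema lives under the wrapper's "jsonschema" key, so a hypothetical agent payload can be checked with the same Draft4Validator the deposit API already uses. All sample values below are made up:

import json

from jsonschema.validators import Draft4Validator

with open('cap/jsonschemas/sipstore/agent-v0.0.1.json') as f:
    schema = json.load(f)['jsonschema']

agent = {
    'orcid': '0000-0002-1825-0097',  # hypothetical sample values
    'email': 'analyst@example.org',
    'ip_address': '127.0.0.1',
}
Draft4Validator(schema).validate(agent)  # raises ValidationError on failure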
77 changes: 77 additions & 0 deletions cap/jsonschemas/sipstore/bagit-v0.0.1.json
@@ -0,0 +1,77 @@
{
  "allow_all": true,
  "experiment": null,
  "fullname": null,
  "is_deposit": false,
  "jsonschema": {
    "definitions": {
      "file": {
        "type": "object",
        "title": "Archived file information.",
        "description": "JSON describing a single file.",
        "additionalProperties": false,
        "properties": {
          "filepath": {
            "description": "Filepath to the archived file, relative to the archived directory root.",
            "type": "string"
          },
          "fullpath": {
            "description": "Absolute filepath to the file in the archive file system.",
            "type": "string"
          },
          "size": {
            "description": "Size of the file in bytes.",
            "type": "number"
          },
          "checksum": {
            "description": "MD5 checksum of the file. Always starts with the 'md5:' prefix.",
            "type": "string"
          },
          "file_uuid": {
            "description": "UUID of the related FileInstance object. Used for Record's data files only.",
            "type": "string"
          },
          "metadata_id": {
            "description": "ID of the type (SIPMetadataType.id) of the related SIPMetadata object. Used for Record's metadata files only.",
            "type": "number"
          },
          "sipfilepath": {
            "description": "Original SIPFile.filepath value. Used for Record's data files only.",
            "type": "string"
          },
          "filename": {
            "description": "Filename of the SIPFile in the archive. Used for Record's data files only.",
            "type": "string"
          },
          "content": {
            "description": "Text content of the file. Used for BagIt metadata files only.",
            "type": "string"
          },
          "fetched": {
            "description": "Marks whether the given file is fetched from another bag (as specified in 'fetch.txt'). If the key does not exist or is set to false, the file is assumed to be written down in the bag, hence NOT fetched. Used for Record's data files only.",
            "type": "boolean"
          }
        },
        "required": ["filepath", "fullpath", "size", "checksum"]
      }
    },
    "properties": {
      "files": {
        "description": "All files stored in this archive package.",
        "type": "array",
        "items": {
          "$ref": "#/definitions/file"
        }
      }
    }
  }
}
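A validation sketch (not part of this diff) for the schema above: one entry in the "files" array carrying the four required keys, with the checksum using the documented 'md5:' prefix. The "$ref" to "#/definitions/file" resolves within the same document, and the file paths below are hypothetical:

import json

from jsonschema.validators import Draft4Validator

with open('cap/jsonschemas/sipstore/bagit-v0.0.1.json') as f:
    schema = json.load(f)['jsonschema']

bagmeta = {
    'files': [{
        'filepath': 'data/files/dataset.root',
        'fullpath': '/archive/sip-1/data/files/dataset.root',
        'size': 1024,
        'checksum': 'md5:9a0364b9e99bb480dd25e1f0284c8555',
    }]
}
Draft4Validator(schema).validate(bagmeta)  # raises ValidationError on failure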
55 changes: 55 additions & 0 deletions cap/jsonschemas/sipstore/file-v0.0.1.json
@@ -0,0 +1,55 @@
{
  "allow_all": true,
  "experiment": null,
  "fullname": null,
  "is_deposit": false,
  "jsonschema": {
    "properties": {
      "filepath": {
        "description": "Filepath to the archived file, relative to the archived directory root.",
        "type": "string"
      },
      "fullpath": {
        "description": "Absolute filepath to the file in the archive file system.",
        "type": "string"
      },
      "size": {
        "description": "Size of the file in bytes.",
        "type": "number"
      },
      "checksum": {
        "description": "MD5 checksum of the file. Always starts with the 'md5:' prefix.",
        "type": "string"
      },
      "file_uuid": {
        "description": "UUID of the related FileInstance object. Used for Record's data files only.",
        "type": "string"
      },
      "metadata_id": {
        "description": "ID of the type (SIPMetadataType.id) of the related SIPMetadata object. Used for Record's metadata files only.",
        "type": "number"
      },
      "sipfilepath": {
        "description": "Original SIPFile.filepath value. Used for Record's data files only.",
        "type": "string"
      },
      "filename": {
        "description": "Filename of the SIPFile in the archive. Used for Record's data files only.",
        "type": "string"
      },
      "content": {
        "description": "Text content of the file. Used for BagIt metadata files only.",
        "type": "string"
      }
    },
    "required": ["filepath", "fullpath", "size", "checksum"]
  }
}
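This standalone file schema mirrors the "file" definition embedded in bagit-v0.0.1.json, so a minimal entry with just the four required keys validates against both. A sketch (not part of this diff, hypothetical paths):

import json

from jsonschema.validators import Draft4Validator

with open('cap/jsonschemas/sipstore/file-v0.0.1.json') as f:
    schema = json.load(f)['jsonschema']

Draft4Validator(schema).validate({
    'filepath': 'metadata/record-json.json',
    'fullpath': '/archive/sip-1/metadata/record-json.json',
    'size': 512,
    'checksum': 'md5:0cc175b9c0f1b6a831c399e269772661',
})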
159 changes: 125 additions & 34 deletions cap/modules/deposit/api.py
@@ -27,10 +27,10 @@
from __future__ import absolute_import, print_function

import copy
import shutil
import tempfile
from copy import deepcopy
from functools import wraps
from contextlib import contextmanager


import requests
from celery import shared_task
@@ -43,29 +43,36 @@
from invenio_files_rest.errors import MultipartMissingParts
from invenio_files_rest.models import Bucket, FileInstance, ObjectVersion
from invenio_jsonschemas.errors import JSONSchemaNotFound
from invenio_pidstore.errors import PIDDoesNotExistError
from invenio_records.models import RecordMetadata
from invenio_records_files.models import RecordsBuckets
from invenio_rest.errors import FieldError

from invenio_sipstore.api import RecordSIP, SIP as SIPApi
from invenio_sipstore.archivers import BagItArchiver
from invenio_sipstore.models import SIP as SIPModel, \
    RecordSIP as RecordSIPModel

from jsonschema.validators import Draft4Validator, RefResolutionError
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import NoResultFound
from werkzeug.local import LocalProxy

from cap.config import FILES_URL_MAX_SIZE
from cap.modules.records.api import CAPRecord
from cap.modules.repoimporter.repo_importer import RepoImporter
from cap.modules.schemas.models import Schema
from cap.modules.user.errors import DoesNotExistInLDAP
from cap.modules.user.utils import (get_existing_or_register_role,
                                    get_existing_or_register_user)

from .errors import (DepositValidationError, FileUploadError,
from .errors import (ArchivingError, DepositValidationError, FileUploadError,
                     UpdateDepositPermissionsError)
from .fetchers import cap_deposit_fetcher
from .minters import cap_deposit_minter
from .permissions import (AdminDepositPermission, CloneDepositPermission,
                          DepositAdminActionNeed, DepositReadActionNeed,
                          DepositUpdateActionNeed, UpdateDepositPermission)
from .utils import compare_files, task_commit, ensure_content_length

_datastore = LocalProxy(lambda: current_app.extensions['security'].datastore)

@@ -171,6 +178,76 @@ def wrapper(self, *args, **kwargs):
        return method(self, *args, **kwargs)
    return wrapper

    @contextmanager
    def _process_files(self, record_id, data):
        """Snapshot bucket and add its files to the record on publishing."""
        assert not self.files.bucket.locked
        # Snapshot without lock=True, so the bucket stays unlocked and
        # the published record's files remain editable.
        snapshot = self.files.bucket.snapshot()
        data['_files'] = self.files.dumps(bucket=snapshot.id)
        yield data
        db.session.add(RecordsBuckets(
            record_id=record_id, bucket_id=snapshot.id
        ))

    def _publish_edited(self):
        """Publish the deposit after editing."""
        record_pid, record = self.fetch_published()
        self.files.bucket.sync(record.files.bucket)
        if record.revision_id == self['_deposit']['pid']['revision_id']:
            data = dict(self.dumps())
        else:
            data = self.merge_with_published()

        data['$schema'] = self.record_schema
        data['_deposit'] = self['_deposit']
        record = record.__class__(data, model=record.model)
        return record

    def prepare_record_for_sip(self, deposit, create_sip_files=None,
                               is_first_publishing=False):
        """Create a SIP for the published record and queue it for archiving."""
        recid, record = deposit.fetch_published()
        sip_patch_of = None
        if not is_first_publishing:
            # Patch against the most recent SIP of the same record.
            sip_patch_of = (
                db.session.query(SIPModel)
                .join(RecordSIPModel, RecordSIPModel.sip_id == SIPModel.id)
                .filter(RecordSIPModel.pid_id == recid.id)
                .order_by(SIPModel.created.desc())
                .first()
            )

        recordsip = RecordSIP.create(
            recid, record, archivable=True,
            create_sip_files=create_sip_files,
            sip_metadata_type='json',
            user_id=current_user.id,
            agent=None)

        archiver = BagItArchiver(
            recordsip.sip, include_all_previous=(not is_first_publishing),
            patch_of=sip_patch_of)

        archiver.save_bagit_metadata()

        sip = (
            RecordSIPModel.query
            .filter_by(pid_id=recid.id)
            .order_by(RecordSIPModel.created.desc())
            .first().sip
        )

        archive_sip.delay(str(sip.id))

    @mark_as_action
    def permissions(self, pid=None):
        """Permissions action.
@@ -197,7 +274,22 @@ def publish(self, *args, **kwargs):
            if file_.data['checksum'] is None:
                raise MultipartMissingParts()

        return super(CAPDeposit, self).publish(*args, **kwargs)
        try:
            _, last_record = self.fetch_published()
            is_first_publishing = False
            fetched_files = last_record.files
            create_sip_files = not compare_files(fetched_files, self.files)
        except (PIDDoesNotExistError, KeyError):
            is_first_publishing = True
            create_sip_files = bool(self.files)

        deposit = super(CAPDeposit, self).publish(*args, **kwargs)
        self.prepare_record_for_sip(
            deposit,
            create_sip_files=create_sip_files,
            is_first_publishing=is_first_publishing)

        return deposit

    @mark_as_action
    def upload(self, pid=None, *args, **kwargs):
@@ -601,32 +693,31 @@ def download_repo(pid, url, filename):
    task_commit(record, response.raw, filename, total)


def task_commit(record, response, filename, total):
    """Commit file to the record."""
    record.files[filename].file.set_contents(
        response,
        default_location=record.files.bucket.location.uri,
        size=total
    )
    db.session.commit()


def ensure_content_length(
        url, method='GET',
        session=None,
        max_size=FILES_URL_MAX_SIZE or 2**20,
        *args, **kwargs):
    """Add Content-Length when not present."""
    kwargs['stream'] = True
    session = session or requests.Session()
    r = session.request(method, url, *args, **kwargs)
    if 'Content-Length' not in r.headers:
        # stream content into a temporary file so we can get the real size
        spool = tempfile.SpooledTemporaryFile(max_size)
        shutil.copyfileobj(r.raw, spool)
        r.headers['Content-Length'] = str(spool.tell())
        spool.seek(0)
        # replace the original socket with our temporary file
        r.raw._fp.close()
        r.raw._fp = spool
    return r
@shared_task(ignore_result=True, max_retries=6,
             default_retry_delay=4 * 60 * 60)
def archive_sip(sip_uuid):
    """Send the SIP for archiving.

    Retries every 4 hours, six times, which should cover up to 24 hours of
    archiving-system downtime.

    :param sip_uuid: UUID of the SIP to archive.
    :type sip_uuid: str
    """
    try:
        sip = SIPApi(SIPModel.query.get(sip_uuid))
        archiver = BagItArchiver(sip)
        bagmeta = archiver.get_bagit_metadata(sip)
        if bagmeta is None:
            raise ArchivingError(
                'BagIt metadata does not exist for SIP: {0}.'.format(sip.id))
        if sip.archived:
            raise ArchivingError(
                'SIP {0} was already archived.'.format(sip.id))
        archiver.write_all_files()
        sip.archived = True
        db.session.commit()
    except Exception as exc:
        # On ArchivingError (see above), do not retry, but re-raise.
        if not isinstance(exc, ArchivingError):
            archive_sip.retry(exc=exc)
        raise
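The new publish() decides whether fresh SIPFiles are needed by comparing the deposit's files against the last published record's files via compare_files, which is imported from .utils and not shown in this diff. A purely illustrative sketch of what such a helper could look like (the real implementation may differ):

def compare_files(fetched_files, files):
    """Return True when both file sets match by key and checksum."""
    def as_dict(file_iterator):
        return {f.data['key']: f.data['checksum'] for f in file_iterator}
    return as_dict(fetched_files) == as_dict(files)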
4 changes: 4 additions & 0 deletions cap/modules/deposit/errors.py
@@ -28,6 +28,10 @@
from invenio_rest.errors import RESTException


class ArchivingError(Exception):
    """Represents a SIP archiving error that can occur during the task."""


class DepositDoesNotExist(Exception):
    """Deposit with given key does not exist."""

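Since archive_sip re-raises ArchivingError instead of retrying, a caller running the task synchronously can treat it as a permanent failure, while any other exception re-enters the four-hour retry cycle. A usage sketch (not part of this diff):

from cap.modules.deposit.api import archive_sip
from cap.modules.deposit.errors import ArchivingError

def archive_now(sip_uuid):
    """Run the archiving task synchronously, flagging permanent failures."""
    try:
        archive_sip(sip_uuid)  # direct call, bypasses the Celery queue
    except ArchivingError as exc:
        print('Permanent archiving failure: {0}'.format(exc))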