Skip to content

Commit

Permalink
add flows version of cfde-ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
David Kelly committed Oct 20, 2023
1 parent 848ec25 commit 939d56e
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 0 deletions.
Empty file.
74 changes: 74 additions & 0 deletions tools/cfde-ingest/cfde_ingest/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Globus Auth scopes requested when minting application tokens.
deriva_scope = "https://auth.globus.org/scopes/app.nih-cfde.org/deriva_all"
transfer_scope = "urn:globus:auth:scope:transfer.api.globus.org:all"

# Confidential-client credentials. The values below are placeholders and must
# be replaced with the real client id / secret at deployment time.
globus_cc_app = "KEY_HERE"
globus_secret = "SECRET_HERE"

# Directory prefixes on the GCS collection: submissions arrive under the
# short-term prefix and are renamed into the long-term prefix.
short_term_storage = "/CFDE/data/"
long_term_storage = "/CFDE/public/"

# Regex of URLs that are allowed to receive the GCS HTTPS bearer token.
allowed_gcs_https_hosts = r"https://[^/]*[.]data[.]globus[.]org/.*"

# Names of the supported deployments; each one has a same-named settings
# dict defined in this module.
environments = ["dev", "staging", "prod"]

# Per-environment settings: the DERIVA server hostname and the UUID of the
# Globus Connect Server endpoint that holds the submitted archives.
dev = dict(
    server="app-dev.nih-cfde.org",
    gcs_endpoint="36530efa-a1e3-45dc-a6e7-9560a8e9ac49",
)
staging = dict(
    server="app-staging.nih-cfde.org",
    gcs_endpoint="922ee14d-49b7-4d69-8f1c-8e2ff8207542",
)
prod = dict(
    server="app.nih-cfde.org",
    gcs_endpoint="d4c89edc-a22c-4bc3-bfa2-bca5fd19b404",
)

# Logging configuration in the ``logging.config.dictConfig`` schema.
# Every configured logger writes to both the console and CloudWatch and does
# not propagate to the root logger; only the level differs per logger.
logging = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "basic": {
            "format": "[{asctime}] [{levelname}] {name}: {message}",
            "style": "{",
            "datefmt": "%Y-%m-%d %H:%M:%S",
        },
        # No timestamp in this format, unlike "basic".
        "cloudwatch": {
            "format": "[{levelname}] {name}: {message}",
            "style": "{",
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "DEBUG",
            "formatter": "basic",
        },
        "cloudwatch": {
            "class": "watchtower.CloudWatchLogHandler",
            "level": "DEBUG",
            "formatter": "cloudwatch",
            "log_group_name": "cfde-ingest",
            "log_stream_name": "{strftime:%Y-%m-%d}",
            "send_interval": 10,
        },
    },
    "loggers": {
        logger_name: {
            "level": logger_level,
            # A fresh list per logger so the entries never share state.
            "handlers": ["console", "cloudwatch"],
            "propagate": False,
        }
        for logger_name, logger_level in (
            ("cfde_ingest", "INFO"),
            ("cfde_deriva", "INFO"),
            ("bdbag", "WARNING"),
            ("globus_sdk", "WARNING"),
        )
    },
}
118 changes: 118 additions & 0 deletions tools/cfde-ingest/cfde_ingest/ingest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import cfde_ingest.config as config
import datetime
import globus_sdk
import logging
import os
import urllib.parse
import urllib.request
from cfde_deriva.registry import Registry, WebauthnUser, WebauthnAttribute
from cfde_deriva.submission import Submission
from globus_sdk import GlobusError
from deriva.core import DerivaServer, DEFAULT_SESSION_CONFIG


class IngestClient:
    """Archive a submitted dataset and ingest it into a CFDE DERIVA server.

    The client is bound to one submission: a user (``userinfo``), a target
    deployment (``environment``), a DCC (``dcc_id``) and an ``action_id``
    that doubles as the submission id.
    """

    def __init__(self, userinfo=None, environment=None, dcc_id=None, action_id=None):
        """Validate the environment and parse the submitting user.

        Args:
            userinfo: Mapping with a ``client`` mapping (``id`` and
                ``display_name`` required, ``full_name`` / ``email``
                optional) and an ``attributes`` list of mappings with ``id``
                and optional ``display_name``.
            environment: One of ``config.environments``.
            dcc_id: Identifier of the submitting DCC.
            action_id: Identifier of this ingest run.

        Raises:
            ValueError: If ``environment`` is not a configured environment.
            TypeError, KeyError: If ``userinfo`` is missing or malformed.
        """
        self.logger = logging.getLogger("cfde_ingest")
        self.action_id = action_id
        self.dcc_id = dcc_id
        self.userinfo = userinfo

        if environment not in config.environments:
            raise ValueError(f'Invalid environment {environment}')
        self.environment = environment

        try:
            self.webauthnuser = self.__webauthnuser_from_userinfo(userinfo)
        except (TypeError, KeyError):
            self.logger.exception("Error while parsing user info")
            raise

        # userinfo['client'] is known to be well-formed here because the
        # parse above succeeded. Email is optional (see the parser below),
        # so it must not be subscripted directly.
        client = userinfo['client']
        self.logger.info(f"Initiated IngestClient: action_id={action_id}, "
                         f"user={client['display_name']}, "
                         f"email={client.get('email')}, dcc={self.dcc_id}")

    @staticmethod
    def __webauthnuser_from_userinfo(userinfo):
        """Build a ``WebauthnUser`` from the ``userinfo`` mapping.

        Raises:
            TypeError, KeyError: If a required key is absent or ``userinfo``
                is not subscriptable.
        """
        web_authn_user = WebauthnUser(
            userinfo['client']['id'],
            userinfo['client']['display_name'],
            userinfo['client'].get('full_name'),
            userinfo['client'].get('email'),
            [
                WebauthnAttribute(attr['id'], attr.get('display_name', 'unknown'))
                for attr in userinfo['attributes']
            ]
        )
        return web_authn_user

    def move_to_protected_location(self, transfer_url):
        """Move a user-submitted dataset to a read-only location.

        The file is renamed on the environment's GCS endpoint from the
        short-term storage prefix into the DCC's directory under the
        long-term storage prefix, using a timestamp and the action id as the
        new base name and keeping the original extension.

        Args:
            transfer_url: URL of the submitted file; its path must start with
                ``config.short_term_storage``.

        Returns:
            The URL of the file at its new location (same scheme and host).

        Raises:
            ValueError: If the URL path is outside the short-term storage
                prefix.
            GlobusError: If the rename fails.
        """
        parsed_url = urllib.parse.urlparse(transfer_url)

        # Validate the input before requesting a token or building any
        # client, so a bad URL fails fast.
        if not parsed_url.path.startswith(config.short_term_storage):
            raise ValueError(f"Transfer requested from non-staging directory: {transfer_url}")

        action_id = self.action_id
        # Assumes dcc_id has the form "<prefix>:<name>" -- TODO confirm.
        dcc_name = self.dcc_id.split(':')[1]
        dcc_dir = os.path.join(config.long_term_storage, dcc_name)
        old_ext = os.path.splitext(parsed_url.path)[1]
        new_filename = f"{datetime.datetime.now().isoformat()}-{action_id}{old_ext}"
        new_dataset_path = os.path.join(dcc_dir, new_filename)
        gcs_endpoint = getattr(config, self.environment)["gcs_endpoint"]

        transfer_token = self.__get_app_token(config.transfer_scope)
        auth = globus_sdk.AccessTokenAuthorizer(transfer_token)
        tc = globus_sdk.TransferClient(authorizer=auth)

        try:
            tc.operation_rename(gcs_endpoint, parsed_url.path, new_dataset_path)
        except GlobusError:
            self.logger.exception(f"Failed to rename {parsed_url.path} to {new_dataset_path}")
            raise

        url = urllib.parse.urlunparse((parsed_url.scheme, parsed_url.netloc, new_dataset_path,
                                       "", "", ""))
        return url

    @staticmethod
    def __get_app_token(scope):
        """Return an access token for ``scope`` using the app's client credentials."""
        cc_app = globus_sdk.ConfidentialAppAuthClient(
            client_id=config.globus_cc_app,
            client_secret=config.globus_secret)
        access_token = globus_sdk.ClientCredentialsAuthorizer(
            scopes=scope,
            confidential_client=cc_app)
        return access_token.access_token

    def deriva_ingest(self, archive_url):
        """Run the DERIVA ingest for the archive at ``archive_url``.

        Args:
            archive_url: URL of the archived dataset to ingest.

        Returns:
            A dict with ``success`` (True when the datapackage status is
            ``cfde_registry_dp_status:content-ready``), ``error`` (the
            datapackage ``diagnostics``, possibly ``None``) and
            ``review_browse_url``.
        """
        servername = getattr(config, self.environment)["server"]
        credential = {"bearer-token": self.__get_app_token(config.deriva_scope)}
        session_config = DEFAULT_SESSION_CONFIG.copy()
        session_config["allow_retry_on_all_methods"] = True
        registry = Registry('https', servername, credentials=credential, session_config=session_config)
        server = DerivaServer('https', servername, credential, session_config=session_config)
        submission_id = self.action_id
        registry.validate_dcc_id(self.dcc_id, self.webauthnuser)

        # The header map keeps our https_token from being sent to non-Globus
        # URLs. The pattern MUST match the archive URL, otherwise the
        # Submission() client will download the Globus GCS Auth login page
        # instead of the archive. r"https://[^/]*[.]data[.]globus[.]org/.*"
        # matches most GCS HTTPS hosts, but if a custom domain is used
        # config.allowed_gcs_https_hosts MUST be updated to match it.
        globus_ep = getattr(config, self.environment)["gcs_endpoint"]
        https_token = self.__get_app_token(f'https://auth.globus.org/scopes/{globus_ep}/https')
        header_map = {
            config.allowed_gcs_https_hosts: {"Authorization": f"Bearer {https_token}"}
        }

        submission = Submission(server, registry, submission_id, self.dcc_id, archive_url,
                                self.webauthnuser, archive_headers_map=header_map)
        submission.ingest()

        md = registry.get_datapackage(submission_id)
        success = md["status"] == 'cfde_registry_dp_status:content-ready'
        error = md.get('diagnostics')
        result = {"success": success,
                  "error": error,
                  "review_browse_url": md["review_browse_url"]}
        return result
31 changes: 31 additions & 0 deletions tools/cfde-ingest/ingest_funcx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@

def ingest(userinfo=None, url=None, dcc_id=None, environment=None):
    """Archive a submitted dataset and ingest it, reporting the outcome as a dict.

    All imports are function-local (presumably so the function can be shipped
    to and executed by a remote funcX worker -- confirm against deployment).

    Args:
        userinfo: User description passed through to ``IngestClient``.
        url: URL of the submitted dataset in short-term storage.
        dcc_id: Identifier of the submitting DCC.
        environment: Name of the target deployment.

    Returns:
        A dict with keys ``success``, ``error``, ``review_browse_url``,
        ``archive_url`` and ``submission_id``. This function does not raise
        on failure: any exception is caught and reported with
        ``success=False`` and the formatted traceback in ``error``.
    """
    import uuid

    # Generate the id before entering the try block so the error payload can
    # always reference it, even when a later import or setup step fails.
    action_id = str(uuid.uuid1())

    try:
        import cfde_ingest.config
        import logging.config
        from cfde_ingest.ingest import IngestClient

        # Give every run its own CloudWatch stream, named after the action id.
        cfde_ingest.config.logging["handlers"]["cloudwatch"]["log_stream_name"] = action_id
        logging.config.dictConfig(cfde_ingest.config.logging)
        client = IngestClient(userinfo=userinfo,
                              environment=environment,
                              dcc_id=dcc_id,
                              action_id=action_id)
        archive_url = client.move_to_protected_location(transfer_url=url)
        result = client.deriva_ingest(archive_url=archive_url)
        result["archive_url"] = archive_url
        result["submission_id"] = action_id
        return result

    except Exception:
        import traceback
        return {
            "success": False,
            "error": traceback.format_exc(),
            "review_browse_url": None,
            "archive_url": None,
            "submission_id": action_id
        }

0 comments on commit 939d56e

Please sign in to comment.