Commit
feat(235) explicit output uri (#237)
#235 and friends

---------

Co-authored-by: Dr. Ernie Prabhakar <[email protected]>
drernie and drernie authored Sep 12, 2024
1 parent 84e1601 commit 27d2f76
Showing 20 changed files with 647 additions and 240 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/mega-linter.yml
@@ -29,7 +29,7 @@ jobs:
- name: Checkout Code
uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4
with:
token: ${{ secrets.PAT || secrets.GITHUB_TOKEN }}
token: ${{ secrets.GITHUB_TOKEN }} # secrets.PAT ||
fetch-depth: 0 # If you use VALIDATE_ALL_CODEBASE = true, you can remove this line to improve performances

# MegaLinter
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
@@ -20,7 +20,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
os: [windows-latest, ubuntu-latest, macos-latest]
java_version: [11, 17, 19]
runs-on: ${{ matrix.os }}

@@ -60,14 +60,14 @@ jobs:
with:
name: nf-quilt-test-reports-${{ matrix.os }}-${{ matrix.java_version }}
path: |
D:\a\nf-quilt\nf-quilt\plugins\nf-quilt\build\reports\tests\test\
D:\a\nf-quilt\nf-quilt\plugins\nf-quilt\build\reports\
overwrite: true
- name: Archive production artifacts (Linux and MacOS)
uses: actions/upload-artifact@v4
if: ${{ always() && matrix.os != 'windows-latest' }}
with:
name: nf-quilt-test-reports-${{ matrix.os }}-${{ matrix.java_version }}
path: |
${{ github.workspace }}/plugins/nf-quilt/build/reports/tests/test/
${{ github.workspace }}/plugins/nf-quilt/build/reports/
overwrite: true

31 changes: 29 additions & 2 deletions CHANGELOG.md
@@ -1,8 +1,35 @@
# Changelog

## [0.8.1] UNRELEASED
## [0.8.6] 2024-09-11

- Fix bug in path extraction from S3 URIs
- Fix addOverlay bug on subfolders
- Fix Windows tests
- Improve test coverage
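
The subfolder fix relates to how an overlay's path inside the package is derived from the published destination. The following Java sketch is modeled on the `pkgRelative` helper in this commit's `QuiltObserver.groovy`; it is an illustration, not the plugin's code. The class name `PkgRelativeSketch` and the helper `toForwardSlashes` are hypothetical, and `toForwardSlashes` is only an assumed stand-in for `QuiltPackage.osConvert`, whose body is not shown in this diff. The sketch also takes the destination as a plain string rather than a `Path`.

```java
final class PkgRelativeSketch {

    // Assumption: stands in for QuiltPackage.osConvert (not shown in the diff).
    // Here it only normalizes Windows separators to forward slashes.
    static String toForwardSlashes(String s) {
        return s.replace('\\', '/');
    }

    // Returns the part of dest that follows "<pkgKey>/", or null when the
    // key does not occur in dest or nothing follows it.
    static String pkgRelative(String pkgKey, String dest) {
        String destString = toForwardSlashes(dest);
        String key = toForwardSlashes(pkgKey);
        int index = destString.indexOf(key);
        // Skip the key itself plus the separator that follows it.
        int start = index + key.length() + 1;
        if (index >= 0 && start < destString.length()) {
            return destString.substring(start);
        }
        return null;
    }
}
```

Because the whole remainder after the key is kept, a file published into a subfolder keeps its subfolder in the returned relative path.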

## [0.8.5] 2024-09-10a

- Error with packaging subfolders on S3 overlay
- Improved overlay debugging

## [0.8.4] 2024-09-10

- Fix bug with unrecognized output URIs

## [0.8.3] 2024-09-08

- Fix Windows bug with overlay files

## [0.8.2] 2024-09-07

- Use copyFile rather than writeString for overlay files [requires Nextflow 23 or later]
- Restore README and quilt_summarize to output

## [0.8.1] 2024-09-05

- Get output URI directly from params
- Add `dest` parameter to Quilt URIs inferred from S3 URIs
- Specify `outputPrefixes` using `quilt` section of `nextflow.config`
- Stop proactively installing packages
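
To make the "output URI directly from params" item concrete, here is a simplified Java sketch of the parameter filter, loosely following `findOutputParams` in this commit's `QuiltObserver.groovy`. The class name `OutputParamSketch` is hypothetical. Unlike the Groovy method, the sketch only collects matching parameters; it does not convert S3 URIs or key the result by package.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class OutputParamSketch {

    // Keeps parameters whose key starts with one of the prefixes, contains
    // no '-', and whose value uses the s3 or quilt+s3 scheme.
    static Map<String, String> findOutputParams(Map<String, ?> params, List<String> prefixes) {
        Map<String, String> found = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : params.entrySet()) {
            String key = entry.getKey();
            String value = String.valueOf(entry.getValue());
            boolean keyMatches = !key.contains("-")
                    && prefixes.stream().anyMatch(key::startsWith);
            if (!keyMatches) {
                continue;
            }
            String[] splits = value.split(":");
            if (splits.length < 2) {
                continue; // no scheme, so not a URI we recognize
            }
            String scheme = splits[0];
            if (scheme.equals("s3") || scheme.equals("quilt+s3")) {
                found.put(key, value);
            }
        }
        return found;
    }
}
```

With the default prefixes `pub` and `out` from the diff, a parameter such as `outdir` with an `s3://` value is kept, while `publish_dir_mode` (no scheme in its value) and `out-dir` (contains `-`) are skipped.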

## [0.8.0] 2024-08-31

21 changes: 17 additions & 4 deletions README.md
@@ -6,9 +6,22 @@ Nextflow plugin for reading and writing Quilt packages as a FileSystem
developed by [Quilt Data](https://quiltdata.com/) that enables you to read and write directly
to Quilt packages using `quilt+s3` URIs wherever your Nextflow pipeline currently uses `s3` URIs.

In v0.8+, the plugin can even be used with "native" URIs, and it will automatically register a Quilt package at the root of the bucket.
## NEW: Use nf-quilt plugin with existing S3 URIs

Inspired by the original [`nf-quilt`](https://github.com/nextflow-io/nf-quilt) plugin (v0.2.0) developed by Seqera labs.
In v0.8+, the plugin can even be used with "native" S3 URIs. You can continue using your existing S3 URIs, and Nextflow will write the data out as usual. However, simply by adding the `nf-quilt` plugin, you can also "overlay" that data with a Quilt package containing all the metadata from that run.

For example:

```shell
nextflow run nf-core/rnaseq -plugins nf-quilt --outdir "s3://quilt-example-bucket/test/nf_quilt_rnaseq"
# other parameters omitted for brevity
```

will automatically create the package:

```url
quilt+s3://quilt-example-bucket#package=test/nf_quilt_rnaseq
```
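
As a rough illustration of how such a package URI can be derived from an S3 URI, the Java sketch below is modeled on the `quiltURIfromS3` method added to `QuiltObserver.groovy` in this commit. It is a simplified port under stated assumptions: the class name `QuiltUriSketch` is hypothetical, the input is required to start with `s3://`, and the extra colon check in the Groovy version is omitted. Like the Groovy method, it appends a `dest` parameter, which the README example above leaves out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class QuiltUriSketch {

    // Maps s3://bucket/.../prefix/suffix to
    // quilt+s3://bucket#package=prefix%2fsuffix&dest=<full key, %2f-joined>
    static String quiltUriFromS3(String s3uri) {
        String[] parts = s3uri.split("/");
        // Expect ["s3:", "", bucket, key parts...]
        if (parts.length < 3 || !parts[0].equals("s3:") || !parts[1].isEmpty()) {
            throw new IllegalArgumentException("Invalid s3uri: " + s3uri);
        }
        String bucket = parts[2];
        List<String> keys = new ArrayList<>(Arrays.asList(parts).subList(3, parts.length));
        String dest = keys.isEmpty() ? "/" : String.join("%2f", keys);
        // The last two key components become the package name; the defaults
        // are the same placeholder strings used in the Groovy method.
        String suffix = keys.size() > 1 ? keys.remove(keys.size() - 1) : "default_suffix";
        String prefix = !keys.isEmpty() ? keys.remove(keys.size() - 1) : "default_prefix";
        return "quilt+s3://" + bucket + "#package=" + prefix + "%2f" + suffix + "&dest=" + dest;
    }
}
```

Note that the package name is taken from the last two components of the key, so deeper output paths still yield a two-part package name while `dest` records the full key.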

## I. Using the nf-quilt plugin in Production

@@ -80,8 +93,8 @@ From the command-line, do, e.g.:
```bash
# export NXF_VER=23.04.3
export LOG4J_DEBUG=true # for verbose logging
export NXF_PLUGINS_TEST_REPOSITORY=https://github.com/quiltdata/nf-quilt/releases/download/0.8.0/nf-quilt-0.8.0-meta.json
nextflow run main.nf -plugins nf-quilt@0.8.0
export NXF_PLUGINS_TEST_REPOSITORY=https://github.com/quiltdata/nf-quilt/releases/download/0.8.6/nf-quilt-0.8.6-meta.json
nextflow run main.nf -plugins nf-quilt@0.8.6
```

For Tower, you can use the "Pre-run script" to set the environment variables.
2 changes: 1 addition & 1 deletion plugins/nf-quilt/build.gradle
@@ -125,7 +125,7 @@ jacocoTestCoverageVerification {
violationRules {
rule {
limit {
minimum = 0.65
minimum = 0.7
}
}

191 changes: 124 additions & 67 deletions plugins/nf-quilt/src/main/nextflow/quilt/QuiltObserver.groovy
@@ -17,6 +17,7 @@ package nextflow.quilt

import nextflow.Session
import nextflow.quilt.jep.QuiltParser
import nextflow.quilt.jep.QuiltPackage
import nextflow.quilt.nio.QuiltPath
import nextflow.quilt.nio.QuiltPathFactory
import nextflow.trace.TraceObserver
@@ -38,10 +39,11 @@ import groovy.util.logging.Slf4j
class QuiltObserver implements TraceObserver {

private Session session
final private Map<String,String> uniqueURIs = [:]
final private Map<String,String> publishedURIs = [:]
private String workDir

// Is this overkill? Do we only ever have one output package per run?
// Is this overkill? Do we only ever have one output URI per run?
private String[] outputPrefixes = ['pub', 'out']
final private Map<String,String> outputURIs = [:]
final private Map<String, Map<String, Path>> packageOverlays = [:]
final private Lock lock = new ReentrantLock() // Need this because of threads

@@ -57,107 +59,162 @@ class QuiltObserver implements TraceObserver {
return null
}

static String pkgKey(QuiltPath path) {
return "${path.getBucket()}/${path.getPackageName()}"
static String quiltURIfromS3(String s3uri) {
log.debug("quiltURIfromS3: $s3uri")
String[] partsArray = s3uri.split('/')
List<String> parts = new ArrayList(partsArray.toList())
// parts.eachWithIndex { p, i -> println("quiltURIfromS3.parts[$i]: $p") }

if (parts.size() < 2) {
throw new IllegalArgumentException("Invalid s3uri[${parts.size()}]: $parts")
}
parts = parts.drop(2)
if (parts[0].endsWith(':')) {
parts = parts.drop(1)
}
String bucket = parts.remove(0)
String dest = parts.join('%2f')
String suffix = parts.size() > 1 ? parts.removeLast() : 'default_suffix'
String prefix = parts.size() > 0 ? parts.removeLast() : 'default_prefix'
String base = "quilt+s3://${bucket}#package=${prefix}%2f${suffix}"
String uri = base + '&dest=' + ((dest) ?: '/')
return uri
}

static String pathless(String uri) {
return uri.replaceFirst(/&path=[^&]+/, '')
static String pkgKey(QuiltPath path) {
return QuiltPackage.osConvert("${path.getBucket()}/${path.getPackageName()}")
}

String checkPath(QuiltPath path, boolean published = false) {
log.debug("checkPath[$path] published[$published]")
String key = pkgKey(path)
String uri = pathless(path.toUriString())
// only keep the longest pathless URI for each key
if (uniqueURIs[key]?.length() < uri.length()) {
uniqueURIs[key] = uri
}
if (published) {
publishedURIs[key] = uniqueURIs[key]
void findOutputParams(Map<String, Object> params) {
log.debug("findOutputParams[$params]")
params.each { key, value ->
String uri = "$value"
if (outputPrefixes.any { key.startsWith(it) && !key.contains('-') }) {
String[] splits = uri.split(':')
if (splits.size() < 2) {
log.debug("Unrecognized URI[$uri] for key[$key] matching $outputPrefixes")
return
}
String scheme = splits[0]
if (scheme == 's3') {
uri = quiltURIfromS3(uri)
} else if (scheme != 'quilt+s3') {
log.warn("Unrecognized scheme:$scheme for output URI[$key]: $uri")
return
}
QuiltPath path = QuiltPathFactory.parse(uri)
String pkgKey = pkgKey(path)
outputURIs[pkgKey] = uri
}
}
return uniqueURIs[key]
}

String extractPackageURI(Path nonQuiltPath) {
String pathString = nonQuiltPath.toUri()
// println("extractPackageURI.pathString[${nonQuiltPath}] -> $pathString")
String[] partsArray = pathString.split('/')
List<String> parts = new ArrayList(partsArray.toList())
// parts.eachWithIndex { p, i -> println("extractPackageURI.parts[$i]: $p") }

if (parts.size() < 3) {
throw new IllegalArgumentException("Invalid pathString: $pathString ($nonQuiltPath)")
void checkConfig(Map<String, Map<String,Object>> config) {
Object prefixes = config.get('quilt')?.get('outputPrefixes')
if (prefixes) {
outputPrefixes = prefixes as String[]
}
parts = parts.drop(3)
if (parts[0].endsWith(':')) {
parts = parts.drop(1)
}

String workRelative(Path src) {
Path source = src.toAbsolutePath().normalize()
Path workDir = session.workDir.toAbsolutePath().normalize()
try {
Path subPath = workDir.relativize(source)
// drop first two components, which are the workDir
Path relPath = subPath.subpath(2, subPath.getNameCount())
return relPath.toString()
} catch (IllegalArgumentException e) {
log.error("workRelative.fallback: $e")
log.warn("Cannot relativize source:${source.getClass()} to workDir:${workDir.getClass()}")
return source.toString()
}
String bucket = parts.remove(0)
String file_path = parts.remove(parts.size() - 1)
String prefix = parts.size() > 0 ? parts.remove(0) : 'default_prefix'
String suffix = parts.size() > 0 ? parts.remove(0) : 'default_suffix'
if (parts.size() > 0) {
String folder_path = parts.join('/')
file_path = folder_path + '/' + file_path
}

String pkgRelative(String key, Path dest) {
String destString = QuiltPackage.osConvert(dest.toAbsolutePath().normalize().toString())
String pkgKey = QuiltPackage.osConvert(key)
// find pkgKey in destination.toString()
int index = destString.indexOf(pkgKey)
println("pkgRelative[$index]: $pkgKey in $destString")
// return the portion after the end of pkgKey
int len = index + pkgKey.length() + 1
if (index >= 0 && len < destString.length()) {
return destString.substring(len)
}
return null
}

// TODO: should overlay packages always force to new versions?
String base = "quilt+s3://${bucket}#package=${prefix}%2f${suffix}"
String uri = "${base}&path=${file_path}"

String key = pkgKey(QuiltPathFactory.parse(uri))
Map<String, Path> current = packageOverlays.get(key, [:]) as Map<String, Path>
current[file_path] = nonQuiltPath
lock.withLock {
uniqueURIs[key] = base
publishedURIs[key] = base
packageOverlays[key] = current
String addOverlay(String pkgKey, Path dest, Path source) {
lock.lock()
try {
Map<String, Path> overlays = packageOverlays.get(pkgKey, [:]) as Map<String, Path>
String relPath = pkgRelative(pkgKey, dest)
println("addOverlay.relPath: $relPath")
log.debug("addOverlay[$relPath] = dest:$dest <= source:$source")
overlays[relPath] = source
packageOverlays[pkgKey] = overlays
return relPath
} finally {
lock.unlock()
}
return uri
return null
}

void checkParams(Map params) {
log.debug("checkParams[$params]")
params.each { k, value ->
String uri = "$value"
if (uri.startsWith(QuiltParser.SCHEME)) {
log.debug("checkParams.uri[$k]: $uri")
QuiltPath path = QuiltPathFactory.parse(uri)
checkPath(path)
boolean confirmQuiltPath(QuiltPath qPath) {
log.debug("confirmQuiltPath[$qPath]")
String key = pkgKey(qPath)
log.debug("confirmQuiltPath: key[$key] in outputURIs[${outputURIs.size()}]: $outputURIs")
return outputURIs.containsKey(key) ? true : false
}

boolean canOverlayPath(Path dest, Path source) {
log.debug("canOverlayPath[$dest] <- $source")
Set<String> keys = outputURIs.keySet()
for (String key : keys) {
if (dest.toString().contains(key)) {
log.debug("canOverlayPath: matched key[$key] to $dest")
addOverlay(key, dest, source)
return true
}
}
log.error("canOverlayPath: no key found for $dest in $keys")
return false
}

@Override
void onFlowCreate(Session session) {
log.debug("`onFlowCreate` $this")
this.session = session
checkParams(session.getParams())
this.workDir = session.config.workDir
findOutputParams(session.getParams())
checkConfig(session.config)
}

// NOTE: TraceFileObserver calls onFilePublish _before_ onFlowCreate
@Override
void onFilePublish(Path destination, Path source) {
// Path source may be null, won't work with older versions of Nextflow
log.debug("onFilePublish.Path[$destination] <- $source")
if (!outputURIs) {
// NOTE: TraceFileObserver calls onFilePublish _before_ onFlowCreate
log.debug('onFilePublish: no outputURIs yet')
return
}
QuiltPath qPath = asQuiltPath(destination)
if (qPath) {
checkPath(qPath, true)
} else {
String uri = extractPackageURI(destination)
log.debug("onFilePublish.NonQuiltPath[$destination]: $uri")
boolean ok = (qPath != null) ? confirmQuiltPath(qPath) : canOverlayPath(destination, source)
if (!ok) {
log.error("onFilePublish: no match for $destination")
}
}

@Override
void onFlowComplete() {
log.debug("onFlowComplete.publishedURIs[${publishedURIs.size()}]: $publishedURIs")
log.debug("onFlowComplete.outputURIs[${outputURIs.size()}]: $outputURIs")
// create QuiltProduct for each unique package URI
publishedURIs.each { key, uri ->
outputURIs.each { key, uri ->
QuiltPath path = QuiltPathFactory.parse(uri)
Map<String, Path> overlays = packageOverlays.get(key, [:]) as Map<String, Path>
// log.debug("onFlowComplete.pkg: $path overlays[${overlays?.size()}]: $overlays")
log.debug("onFlowComplete.pkg: $path overlays[${overlays?.size()}]: $overlays")
new QuiltProduct(path, session, overlays)
}
}
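
For readers less familiar with `java.nio.file.Path`, the `workRelative` method in the diff above can be sketched in plain Java as follows. This is an illustrative port, not the plugin's code: the class name `WorkRelativeSketch` is hypothetical, and the work directory is passed in as an argument instead of being read from the Nextflow `Session`.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class WorkRelativeSketch {

    // Relativizes src against workDir and drops the first two path
    // components of the result, as the Groovy method does. Falls back to
    // the absolute source path when that is not possible.
    static String workRelative(Path workDir, Path src) {
        Path source = src.toAbsolutePath().normalize();
        Path work = workDir.toAbsolutePath().normalize();
        try {
            Path subPath = work.relativize(source);
            // subpath throws IllegalArgumentException when fewer than
            // three components remain after relativizing.
            return subPath.subpath(2, subPath.getNameCount()).toString();
        } catch (IllegalArgumentException e) {
            return source.toString();
        }
    }

    public static void main(String[] args) {
        Path work = Paths.get("/tmp/work");
        Path file = Paths.get("/tmp/work/ab/cdef/results/out.txt");
        System.out.println(workRelative(work, file));
    }
}
```

The fallback branch also covers paths from a different file system provider, since `ProviderMismatchException` is a subclass of `IllegalArgumentException`.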