Skip to content

Commit

Permalink
Fix issues in channel serialization (#112)
Browse files Browse the repository at this point in the history
  • Loading branch information
lukfor committed Aug 30, 2023
1 parent 10cf0a9 commit 8e89425
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 61 deletions.
69 changes: 39 additions & 30 deletions src/main/resources/com/askimed/nf/test/lang/process/WorkflowMock.nf
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ import groovy.json.JsonGenerator.Converter

nextflow.enable.dsl=2


// comes from testflight to find json files
// comes from nf-test to store json files
params.nf_test_output = ""

// process mapping
Expand All @@ -15,7 +14,6 @@ ${mapping}
// include test process
include { ${process} } from '${script}'


// define custom rules for JSON that will be generated.
def jsonOutput =
new JsonGenerator.Options()
Expand All @@ -26,42 +24,53 @@ def jsonOutput =

workflow {

${process}(*input)
//run process
${process}(*input)

if (${process}.output){
// consumes all output channels and stores items in a json
def channel = Channel.empty()
for (def name in ${process}.out.getNames()) {
channel << tuple(name, ${process}.out.getProperty(name))
}

def array = ${process}.out as Object[]
for (def i = 0; i < array.length ; i++) {
channel << tuple(i, array[i])
}
if (${process}.output){

// consumes all named output channels and stores items in a json file
for (def name in ${process}.out.getNames()) {
serializeChannel(name, ${process}.out.getProperty(name), jsonOutput)
}

// consumes all unnamed output channels and stores items in a json file
def array = ${process}.out as Object[]
for (def i = 0; i < array.length ; i++) {
serializeChannel(i, array[i], jsonOutput)
}

channel.subscribe { outputTupel ->
def sortedList = outputTupel[1].toList()
sortedList.subscribe { list ->
def map = new HashMap()
def outputName = outputTupel[0]
map[outputName] = list
new File("\${params.nf_test_output}/output_\${outputName}.json").text = jsonOutput.toJson(map)
}
}
}

}

def serializeChannel(name, channel, jsonOutput) {
def _name = name
println "Process channel \${_name}..."
def list = [ ]
channel.subscribe(
onNext: {
list.add(it)
},
onComplete: {
def map = new HashMap()
map[_name] = list
def filename = "\${params.nf_test_output}/output_\${_name}.json"
new File(filename).text = jsonOutput.toJson(map)
println "Wrote channel \${_name} to \${filename}"
}
)
}


workflow.onComplete {

def result = [
success: workflow.success,
exitStatus: workflow.exitStatus,
errorMessage: workflow.errorMessage,
errorReport: workflow.errorReport
]
def result = [
success: workflow.success,
exitStatus: workflow.exitStatus,
errorMessage: workflow.errorMessage,
errorReport: workflow.errorReport
]
new File("\${params.nf_test_output}/workflow.json").text = jsonOutput.toJson(result)

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ import groovy.json.JsonGenerator.Converter

nextflow.enable.dsl=2


// comes from testflight to find json files
// comes from nf-test to store json files
params.nf_test_output = ""

// process mapping
Expand All @@ -25,42 +24,53 @@ def jsonOutput =

workflow {

${workflow}(*input)

if (${workflow}.output){
// consumes all output channels and stores items in a json
def channel = Channel.empty()
for (def name in ${workflow}.out.getNames()) {
channel << tuple(name, ${workflow}.out.getProperty(name))
}
//run workflow
${workflow}(*input)

if (${workflow}.output){

// consumes all named output channels and stores items in a json file
for (def name in ${workflow}.out.getNames()) {
serializeChannel(name, ${workflow}.out.getProperty(name), jsonOutput)
}

def array = ${workflow}.out as Object[]
for (def i = 0; i < array.length ; i++) {
channel << tuple(i, array[i])
}

channel.subscribe { outputTupel ->
def sortedList = outputTupel[1].toList()
sortedList.subscribe { list ->
def map = new HashMap()
def outputName = outputTupel[0]
map[outputName] = list
new File("\${params.nf_test_output}/output_\${outputName}.json").text = jsonOutput.toJson(map)
}
}
}
// consumes all unnamed output channels and stores items in a json file
def array = ${workflow}.out as Object[]
for (def i = 0; i < array.length ; i++) {
serializeChannel(i, array[i], jsonOutput)
}

}
}


def serializeChannel(name, channel, jsonOutput) {
def _name = name
println "Process channel \${_name}..."
def list = [ ]
channel.subscribe(
onNext: {
list.add(it)
},
onComplete: {
def map = new HashMap()
map[_name] = list
def filename = "\${params.nf_test_output}/output_\${_name}.json"
new File(filename).text = jsonOutput.toJson(map)
println "Wrote channel \${_name} to \${filename}"
}
)
}


workflow.onComplete {

def result = [
success: workflow.success,
exitStatus: workflow.exitStatus,
errorMessage: workflow.errorMessage,
errorReport: workflow.errorReport
]
def result = [
success: workflow.success,
exitStatus: workflow.exitStatus,
errorMessage: workflow.errorMessage,
errorReport: workflow.errorReport
]
new File("\${params.nf_test_output}/workflow.json").text = jsonOutput.toJson(result)

}
17 changes: 17 additions & 0 deletions src/test/java/com/askimed/nf/test/lang/WorkflowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,15 @@ public void testWorkflowWithRelativePath() throws Exception {
assertEquals(0, exitCode);

}

@Test
public void testWorkflowUnamedOutputs() throws Exception {
App app = new App();
int exitCode = app.run(new String[] { "test", "test-data/workflow/unnamed/trial.unnamed.nf.test" });
assertEquals(0, exitCode);

}

@Test
public void testWorkflowWithNoOutputs() throws Exception {

Expand Down Expand Up @@ -88,5 +96,14 @@ public void testParamsIssue34Setup() throws Exception {
assertEquals(0, exitCode);

}

@Test
public void testHangingWorkflowIssue57() throws Exception {

App app = new App();
int exitCode = app.run(new String[] { "test", "test-data/workflow/hanging/meaningless_workflow.nf.test","--debug"});
assertEquals(0, exitCode);

}

}
10 changes: 10 additions & 0 deletions test-data/workflow/hanging/meaningless_workflow.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
workflow PipeWf {
take:
inputCh

main:
inputCh.set { outputCh }

emit:
outputCh
}
28 changes: 28 additions & 0 deletions test-data/workflow/hanging/meaningless_workflow.nf.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
nextflow_workflow {

name "Test workflow"
script "test-data/workflow/hanging/meaningless_workflow.nf"
workflow "PipeWf"

test("PipeWf will hang") {

when {
workflow {
"""
input[0] = Channel.from([
[
["patientID": "patientA"],
'test_file_1.txt'
]
])
"""
}
}

then {
assert workflow.success
}

}

}
30 changes: 30 additions & 0 deletions test-data/workflow/unnamed/trial.unnamed.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env nextflow
nextflow.enable.dsl=2

process sayHello {
input:
val cheers

output:
stdout emit: verbiage
path "*.txt", emit: output_files

script:
"""
echo -n $cheers
echo -n $cheers > ${cheers}.txt
"""
}

workflow trial {
take: things
main:
sayHello(things)
emit:
sayHello.out.verbiage
sayHello.out.output_files
}

workflow {
Channel.from('a','b') | trial
}
26 changes: 26 additions & 0 deletions test-data/workflow/unnamed/trial.unnamed.nf.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
nextflow_workflow {

name "Test workflow"
script "test-data/workflow/unnamed/trial.unnamed.nf"
workflow "trial"

test("Should run without failures") {
when {
params {
outdir = "tests/results"
}
workflow {
"""
input[0] = Channel.of('a','b')
"""
}
}

then {
//check if test case succeeded
assert workflow.success
assert workflow.out[0].size() == 2
assert workflow.out[1].size() == 2
}
}
}

0 comments on commit 8e89425

Please sign in to comment.