Improve join duplicate detection
pditommaso committed Jul 19, 2020
1 parent 05c358c commit 33eb9ef
Showing 2 changed files with 33 additions and 18 deletions.
33 changes: 21 additions & 12 deletions modules/nextflow/src/main/groovy/nextflow/extension/JoinOp.groovy
@@ -129,8 +129,8 @@ class JoinOp {
result.onComplete = {
if( stopCount.decrementAndGet()==0 && !failed ) {
try {
if( remainder )
remainder0(buffer,size,target)
if( remainder || failOnDuplicate )
checkRemainder(buffer,size,target)
if( failOnMismatch )
checkForMismatch(buffer)
}
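
The hook above now triggers the buffer scan whenever failOnDuplicate is set, not only when a remainder output was requested, so duplicate keys are reported even for a plain join (the old test further down shows that ['X', 1], ['X', 3] joined against ['X', 2] previously produced [X, 1, 2] without complaint). A hedged pipeline-level sketch of the behaviour this change targets, reusing the data of the new test and assuming the documented failOnDuplicate join option:

    // Nextflow DSL sketch, not part of this commit
    ch1 = Channel.of(['X', 1], ['X', 3])
    ch2 = Channel.of(['X', 2])
    ch1.join(ch2, failOnDuplicate: true).view()
    // expected: the run aborts with
    // "Detected join operation duplicate emission on left channel -- offending element: key=X; value=3"
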
@@ -173,6 +173,9 @@ class JoinOp {
// get the index key for this object
final item0 = DataflowHelper.makeKey(pivot, data)

// check for unique keys
checkForDuplicate(item0.keys, item0.values, index, false)

// given a key we expect to receive on object with the same key on each channel
Map<Integer,List> channels = buffer.get(item0.keys)
if( channels==null ) {
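
The new probe runs before the incoming tuple is buffered: DataflowHelper.makeKey splits the tuple into its join key(s) and the remaining values according to the pivot indexes, and the keys are then checked against the set of already-emitted keys. A rough standalone illustration of that split (assumed behaviour, not the actual DataflowHelper implementation):

    // illustration only: with pivot [0] the first element is the join key,
    // the rest are the values carried alongside it
    def splitByPivot = { List<Integer> pivot, List tuple ->
        def keys   = pivot.collect { tuple[it] }
        def values = (0..<tuple.size()).findAll { !(it in pivot) }.collect { tuple[it] }
        [keys: keys, values: values]
    }
    assert splitByPivot([0], ['X', 1]) == [keys: ['X'], values: [1]]
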
@@ -213,11 +216,12 @@ class JoinOp {
}
}

checkForDuplicate(item0.keys, result)
// track unique keys
checkForDuplicate(item0.keys, item0.values, index, true)
return result
}

private final void remainder0( Map<Object,Map<Integer,List>> buffers, int count, DataflowWriteChannel target ) {
private final void checkRemainder(Map<Object,Map<Integer,List>> buffers, int count, DataflowWriteChannel target ) {
log.trace "Operator `join` remainder buffer: ${-> buffers}"

for( Object key : buffers.keySet() ) {
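
Taken together with the arrival-time probe above and the buffer scan in the next hunk, the operator applies a two-phase contract to a single uniqueKeys set: a containment-only probe (add=false) when an element arrives or when leftovers are scanned, and an insert (add=true) once a fully matched tuple is emitted. A minimal standalone sketch of that contract, using a plain HashSet and IllegalStateException in place of the operator's state and AbortOperationException:

    def uniqueKeys = new HashSet()
    def checkForDuplicate = { key, value, int dir, boolean add ->
        def dup = add ? !uniqueKeys.add(key) : uniqueKeys.contains(key)
        if( dup )
            throw new IllegalStateException("Duplicate emission on ${dir==0 ? 'left' : 'right'} channel: key=$key; value=$value")
    }
    checkForDuplicate(['X'], [1], 0, false)    // first arrival of key X: unknown, passes
    checkForDuplicate(['X'], [1, 2], 0, true)  // matched tuple emitted: X is recorded
    try {
        checkForDuplicate(['X'], [3], 0, false) // a later ['X', 3] arrival now fails the probe
        assert false
    }
    catch( IllegalStateException e ) {
        assert e.message.startsWith('Duplicate emission on left channel')
    }
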
@@ -232,6 +236,9 @@
for( int i=0; i<count; i++ ) {
List values = entry[i]
if( values ) {
// track unique keys
checkForDuplicate(key, values[0], i,false)

addToList(result, values[0])
fill |= true
values.remove(0)
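
During the completion scan, checkForDuplicate is applied to the first buffered element of each channel with add=false, so a key that was already emitted as a full match but still has leftovers in the buffer is flagged. For orientation, a rough sketch of the buffer shape implied by the Map<Object,Map<Integer,List>> signature (assumed layout, for illustration only):

    // key -> channel index -> list of buffered value lists;
    // here key 'Y' was only ever seen on the left channel, with value 2
    def buffer = [ (['Y']): [ 0: [[2]] ] ]
    assert buffer[['Y']][0] == [[2]]
    assert !buffer[['Y']][1]   // nothing buffered for the right channel
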
@@ -243,8 +250,8 @@

if( fill ) {
final value = singleton() ? result[0] : result
checkForDuplicate(key,value)
target.bind(value)
// bind value to target channel
if( remainder ) target.bind(value)
}
else
break
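
Binding the padded tuple to the output is now gated on the remainder flag, because the same scan also runs when only failOnDuplicate is enabled. For reference, a hedged sketch of the user-facing remainder behaviour (assuming the documented remainder join option, not part of this commit):

    left  = Channel.of(['X', 1], ['Y', 2])
    right = Channel.of(['X', 10])
    left.join(right, remainder: true).view()
    // expected output (order not guaranteed):
    //   [X, 1, 10]
    //   [Y, 2, null]
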
@@ -253,21 +260,23 @@
}
}

protected void checkForDuplicate( key, value ) {
if( failOnDuplicate && !uniqueKeys.add(key) )
throw new AbortOperationException("Detected join operation duplicate element -- offending key=${csv0(key,',')}; value=${csv0(value,',')}")
protected void checkForDuplicate( key, value, int dir, boolean add ) {
if( failOnDuplicate && ( (add && !uniqueKeys.add(key)) || (!add && uniqueKeys.contains(key)) ) ) {
final msg = "Detected join operation duplicate emission on ${dir==0 ? 'left' : 'right'} channel -- offending element: key=${csv0(key,',')}; value=${csv0(value,',')}"
throw new AbortOperationException(msg)
}
}

protected void checkForMismatch( Map<Object,Map<Integer,List>> buffers ) {
final result = new HashMap<Object,List>()
for( Object key : buffers.keySet() ) {
Map<Integer,List> el = buffers.get(key)
final reminder = el.entrySet()
if( !reminder )
final remainder0 = el.entrySet()
if( !remainder0 )
continue

result[key] = []
for( Map.Entry entry : reminder ) {
for( Map.Entry entry : remainder0 ) {
result[key].add(csv0(entry.value,','))
}
}
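
checkForMismatch itself is functionally untouched; the local reminder variable is renamed to remainder0, fixing the spelling. It still collects, per key, the entries left dangling on a single channel when failOnMismatch is enabled. A hedged usage sketch of that option (assuming the documented failOnMismatch join option, not part of this commit):

    left  = Channel.of(['X', 1], ['Y', 2])
    right = Channel.of(['X', 10])
    left.join(right, failOnMismatch: true).view()
    // expected: the run aborts reporting that key Y was present on one channel only
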
18 changes: 12 additions & 6 deletions modules/nextflow/src/test/groovy/nextflow/extension/JoinOpTest.groovy
@@ -168,7 +168,7 @@ class JoinOpTest extends Specification {
}


def 'should join pair with singleton and reminder' () {
def 'should join pair with singleton and remainder' () {

when:
def left = Channel.from(['P', 0], ['X', 1], ['Y', 2], ['Z', 3])
@@ -296,10 +296,11 @@ class JoinOpTest extends Specification {
await(sess)
then:
sess.isAborted()
sess.getError().message.startsWith('Detected join operation duplicate element -- offending key=X')
and:
sess.error.message ==~ /Detected join operation duplicate emission on (left|right) channel -- offending element: key=X; value=(3|4|5|6)/
}
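
The assertion above is a pattern match rather than an exact string: which channel first trips the duplicate check, and which value is reported as the offending element, depends on the non-deterministic interleaving of channel events, so any of the listed combinations is accepted.
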

def 'should fail on duplicate with reminder' () {
def 'should fail on duplicate with remainder' () {
given:
def ch1 = (DataflowReadChannel) Channel.of(['X', 1], ['X', 3])
def ch2 = (DataflowReadChannel) Channel.of(['X', 2])
@@ -313,22 +314,27 @@
await(sess)
then:
sess.isAborted()
sess.getError().message.startsWith('Detected join operation duplicate element -- offending key=X')
sess.getError().message == 'Detected join operation duplicate emission on left channel -- offending element: key=X; value=3'
}

def 'should not fail on duplicate without reminder' () {
def 'should fail on duplicate without remainder' () {
given:
def ch1 = (DataflowReadChannel) Channel.of(['X', 1], ['X', 3])
def ch2 = (DataflowReadChannel) Channel.of(['X', 2])
and:
def sess = Global.session as Session

when:
def op = new JoinOp(ch1, ch2, [failOnDuplicate:true])
def result = op.apply().toList().getVal()
then:
result == [ ['X',1,2] ]
await(sess)
then:
sess.isAborted()
sess.getError().message == 'Detected join operation duplicate emission on left channel -- offending element: key=X; value=3'
}


protected void await(Session session) {
def begin = System.currentTimeMillis()
while( !session.isAborted() && System.currentTimeMillis()-begin<5_000 )
