Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Demultiplexed FASTQ support #23

Merged
merged 16 commits into from
Sep 6, 2023
9 changes: 5 additions & 4 deletions src/main/scala/org/broadinstitute/gpp/poolq3/PoolQ.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ object PoolQ {
config.skipShortReads
)

val colBarcodePolicy =
BarcodePolicy(config.colBarcodePolicyStr, colReferenceData.barcodeLength, config.skipShortReads)
val colBarcodePolicyOpt =
config.colBarcodePolicyStr.map(pol => BarcodePolicy(pol, colReferenceData.barcodeLength, config.skipShortReads))

val umiInfo = (config.input.umiReference, config.umiBarcodePolicyStr).mapN { (r, p) =>
log.info("Reading UMI reference data")
Expand All @@ -102,10 +102,11 @@ object PoolQ {
)

log.info("Building column reference")
val colBarcodeLength = colBarcodePolicyOpt.map(_.length).getOrElse(colReferenceData.barcodeLength)
val colReference: Reference =
referenceFor(
config.colMatchFn,
ReferenceData.truncator(colBarcodePolicy.length),
ReferenceData.truncator(colBarcodeLength),
config.countAmbiguous,
colReferenceData.mappings
)
Expand All @@ -116,7 +117,7 @@ object PoolQ {
}

val barcodes: CloseableIterable[Barcodes] =
barcodeSource(config.input, rowBarcodePolicy, revRowBarcodePolicyOpt, colBarcodePolicy, umiInfo.map(_._2))
barcodeSource(config.input, rowBarcodePolicy, revRowBarcodePolicyOpt, colBarcodePolicyOpt, umiInfo.map(_._2))

lazy val unexpectedSequenceCacheDir: Option[Path] =
if (config.skipUnexpectedSequenceReport) None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ final case class PoolQConfig(
countAmbiguous: Boolean = false,
rowBarcodePolicyStr: String = "",
reverseRowBarcodePolicyStr: Option[String] = None,
colBarcodePolicyStr: String = "",
colBarcodePolicyStr: Option[String] = None,
umiBarcodePolicyStr: Option[String] = None,
skipUnexpectedSequenceReport: Boolean = false,
unexpectedSequenceCacheDir: Option[Path] = None,
Expand Down Expand Up @@ -197,7 +197,7 @@ object PoolQConfig {
}

val _ = opt[String]("col-barcode-policy").valueName("<barcode-policy>").action { (p, c) =>
c.copy(colBarcodePolicyStr = p)
c.copy(colBarcodePolicyStr = Some(p))
}

val _ = opt[String]("umi-barcode-policy").valueName("<barcode-policy>").action { (p, c) =>
Expand Down Expand Up @@ -339,7 +339,7 @@ object PoolQConfig {
}
args += (("row-barcode-policy", config.rowBarcodePolicyStr))
config.reverseRowBarcodePolicyStr.foreach(p => args += (("rev-row-barcode-policy", p)))
args += (("col-barcode-policy", config.colBarcodePolicyStr))
config.colBarcodePolicyStr.foreach(pol => args += (("col-barcode-policy", pol)))
umiInfo.map(_._2).foreach(str => args += (("umi-barcode-policy", str)))

// deal with the unexpected sequence options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@ object ReadsSource {
final case class SelfContained(paths: Nel[Path]) extends ReadsSource
final case class Split(index: Nel[Path], forward: Nel[Path]) extends ReadsSource
final case class PairedEnd(index: Nel[Path], forward: Nel[Path], reverse: Nel[Path]) extends ReadsSource
final case class Dmuxed(read1: Nel[(Option[String], Path)]) extends ReadsSource

final case class DmuxedPairedEnd(read1: Nel[(Option[String], Path)], read2: Nel[(Option[String], Path)])
extends ReadsSource

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2022 The Broad Institute, Inc. All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
*/
package org.broadinstitute.gpp.poolq3.barcode

import org.broadinstitute.gpp.poolq3.parser.{CloseableIterable, CloseableIterator, DmuxedIterable}
import org.broadinstitute.gpp.poolq3.types.Read

final class DmuxedBarcodeSource(parser: DmuxedIterable, rowPolicy: BarcodePolicy, umiPolicyOpt: Option[BarcodePolicy])
extends CloseableIterable[Barcodes] {

private def colBarcodeOpt = parser.indexBarcode.map(bc => FoundBarcode(bc.toCharArray, 0))
mtomko marked this conversation as resolved.
Show resolved Hide resolved

private[this] class BarcodeIterator(iterator: CloseableIterator[Read]) extends CloseableIterator[Barcodes] {
override def hasNext: Boolean = iterator.hasNext

override def next(): Barcodes = {
val nextRead = iterator.next()
val rowBarcodeOpt = rowPolicy.find(nextRead)
val umiBarcodeOpt = umiPolicyOpt.flatMap(_.find(nextRead))
Barcodes(rowBarcodeOpt, None, colBarcodeOpt, umiBarcodeOpt)
}

override def close(): Unit = iterator.close()
}

override def iterator: CloseableIterator[Barcodes] = new BarcodeIterator(parser.iterator)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2022 The Broad Institute, Inc. All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
*/
package org.broadinstitute.gpp.poolq3.barcode

import org.broadinstitute.gpp.poolq3.parser.{CloseableIterable, CloseableIterator, DmuxedIterable}
import org.broadinstitute.gpp.poolq3.types.{Read, ReadIdCheckPolicy}

class DmuxedPairedEndBarcodeSource(
rowParser: DmuxedIterable,
revRowParser: DmuxedIterable,
rowPolicy: BarcodePolicy,
revRowPolicy: BarcodePolicy,
umiPolicyOpt: Option[BarcodePolicy],
readIdCheckPolicy: ReadIdCheckPolicy
) extends CloseableIterable[Barcodes] {

private def colBarcodeOpt = rowParser.indexBarcode.map(bc => FoundBarcode(bc.toCharArray, 0))
mtomko marked this conversation as resolved.
Show resolved Hide resolved

private[this] class BarcodeIterator(rowIterator: CloseableIterator[Read], revRowIterator: CloseableIterator[Read])
extends CloseableIterator[Barcodes] {

final override def hasNext: Boolean = rowIterator.hasNext && revRowIterator.hasNext

final override def next(): Barcodes = {
val nextRow = rowIterator.next()
val nextRevRow = revRowIterator.next()
readIdCheckPolicy.check(nextRow, nextRevRow)
val rowBarcodeOpt = rowPolicy.find(nextRow)
val revRowBarcodeOpt = revRowPolicy.find(nextRevRow)
val umiBarcodeOpt = umiPolicyOpt.flatMap(_.find(nextRow))
Barcodes(rowBarcodeOpt, revRowBarcodeOpt, colBarcodeOpt, umiBarcodeOpt)
}

final override def close(): Unit =
try rowIterator.close()
finally revRowIterator.close()

}

override def iterator: CloseableIterator[Barcodes] =
new BarcodeIterator(rowParser.iterator, revRowParser.iterator)

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ final class ThreeFileBarcodeSource(

final override def close(): Unit =
try rowIterator.close()
finally colIterator.close()
finally
try revRowIterator.close()
finally colIterator.close()

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package object barcode {
config: PoolQInput,
rowBarcodePolicy: BarcodePolicy,
revRowBarcodePolicyOpt: Option[BarcodePolicy],
colBarcodePolicy: BarcodePolicy,
colBarcodePolicyOpt: Option[BarcodePolicy],
umiBarcodePolicyOpt: Option[BarcodePolicy]
): CloseableIterable[Barcodes] =
(config.readsSource, revRowBarcodePolicyOpt) match {
case (ReadsSource.Split(index, forward), None) =>
(config.readsSource, revRowBarcodePolicyOpt, colBarcodePolicyOpt) match {
case (ReadsSource.Split(index, forward), None, Some(colBarcodePolicy)) =>
new TwoFileBarcodeSource(
parserFor(forward.toList),
parserFor(index.toList),
Expand All @@ -32,7 +32,7 @@ package object barcode {
umiBarcodePolicyOpt,
config.readIdCheckPolicy
)
case (ReadsSource.PairedEnd(index, forward, reverse), Some(revRowBarcodePolicy)) =>
case (ReadsSource.PairedEnd(index, forward, reverse), Some(revRowBarcodePolicy), Some(colBarcodePolicy)) =>
new ThreeFileBarcodeSource(
parserFor(forward.toList),
parserFor(reverse.toList),
Expand All @@ -43,7 +43,7 @@ package object barcode {
umiBarcodePolicyOpt,
config.readIdCheckPolicy
)
case (ReadsSource.SelfContained(paths), None) =>
case (ReadsSource.SelfContained(paths), None, Some(colBarcodePolicy)) =>
new SingleFileBarcodeSource(parserFor(paths.toList), rowBarcodePolicy, colBarcodePolicy, umiBarcodePolicyOpt)
case _ =>
throw new IllegalArgumentException("Incompatible reads and barcode policy settings")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
*/
package org.broadinstitute.gpp.poolq3.parser

import java.nio.file.Path

import scala.collection.mutable

import org.broadinstitute.gpp.poolq3.types.Read

abstract class CloseableIterable[A] extends Iterable[A] {
override def iterator: CloseableIterator[A]
}
Expand All @@ -23,3 +29,63 @@ object CloseableIterable {
}

}

abstract class DmuxedIterable extends CloseableIterable[Read] {

/** `Some(barcode)` or else `None` if unmatched */
def indexBarcode: Option[String]
}

object DmuxedIterable {

def apply(iterable: Iterable[(Option[String], Path)], parserFor: Path => CloseableIterator[Read]): DmuxedIterable =
new DmuxedIterableImpl(iterable, parserFor)

def apply(data: List[(Option[String], List[String])]): DmuxedIterable = {
val data2: List[(Option[String], List[Read])] = data.map { case (bco, seqs) =>
(bco, seqs.zipWithIndex.map { case (seq, i) => Read(i.toString, seq) })
}
new DmuxedIterableImpl(data2, CloseableIterator.ofList)
}

private class DmuxedIterableImpl[A](src: Iterable[(Option[String], A)], makeIterator: A => CloseableIterator[Read])
extends DmuxedIterable {

private val queue: mutable.Queue[(Option[String], A)] = mutable.Queue.from(src)

var current: CloseableIterator[Read] = _

var indexBarcode: Option[String] = _

override def iterator: CloseableIterator[Read] = new CloseableIterator[Read] {

override def hasNext: Boolean = {
var currentHasNext = if (current == null) false else current.hasNext
while (!currentHasNext && queue.nonEmpty) {
val head = queue.dequeue()
if (head != null) {
val old = current
indexBarcode = head._1
current = makeIterator(head._2)
if (old != null) {
old.close()
}
currentHasNext = current.hasNext
}
}
currentHasNext
}

override def next(): Read =
if (current == null) throw new NoSuchElementException
else current.next()

override def close(): Unit = {
Option(current).foreach(_.close())
}

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,15 @@ package org.broadinstitute.gpp.poolq3.parser
import java.io.Closeable

abstract class CloseableIterator[A] extends Iterator[A] with Closeable

object CloseableIterator {

/** A convenience implementation used for testing */
def ofList[A](xs: List[A]): CloseableIterator[A] = new CloseableIterator[A] {
val iter = xs.iterator
override def close(): Unit = ()
override def hasNext: Boolean = iter.hasNext
override def next(): A = iter.next()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2022 The Broad Institute, Inc. All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
*/
package org.broadinstitute.gpp.poolq3.barcode

import cats.syntax.all._
import munit.FunSuite
import org.broadinstitute.gpp.poolq3.parser.DmuxedIterable

class DmuxedBarcodeSourceTest extends FunSuite {

private[this] val rowPolicy = BarcodePolicy("FIXED@0", 10, skipShortReads = true)

def fb(s: String) = Barcodes(FoundBarcode(s.toCharArray, 0).some, None, None, None)

def fb(i: String, s: String) =
Barcodes(FoundBarcode(s.toCharArray, 0).some, None, FoundBarcode(i.toCharArray, 0).some, None)

test("it works") {
val iterable = DmuxedIterable(
List(
None -> List("AAAAAAAAAA", "AAAAAAAAAC", "AAAAAAAAAG"),
Some("CTCGAG") -> List("AAAAAAAAAA", "AACCCCGGTT", "AATTGGTTAA")
)
)

val src = new DmuxedBarcodeSource(iterable, rowPolicy, None)
assertEquals(
src.toList,
List(
fb("AAAAAAAAAA"),
fb("AAAAAAAAAC"),
fb("AAAAAAAAAG"),
fb("CTCGAG", "AAAAAAAAAA"),
fb("CTCGAG", "AACCCCGGTT"),
fb("CTCGAG", "AATTGGTTAA")
)
)
}

test("nothing works") {
val iterable = DmuxedIterable(Nil)
val src = new DmuxedBarcodeSource(iterable, rowPolicy, None)
assertEquals(src.toList, Nil)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2022 The Broad Institute, Inc. All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
*/
package org.broadinstitute.gpp.poolq3.barcode

import cats.syntax.all._
import munit.FunSuite
import org.broadinstitute.gpp.poolq3.parser.DmuxedIterable
import org.broadinstitute.gpp.poolq3.types.ReadIdCheckPolicy

class DmuxedPairedEndBarcodeSourceTest extends FunSuite {

private[this] val rowPolicy = BarcodePolicy("FIXED@0", 4, skipShortReads = true)
private[this] val revRowPolicy = BarcodePolicy("FIXED@0", 3, skipShortReads = true)

def fb(r1: String, r2: String) =
Barcodes(FoundBarcode(r1.toCharArray, 0).some, FoundBarcode(r2.toCharArray, 0).some, None, None)

def fb(i: String, r1: String, r2: String) =
Barcodes(
FoundBarcode(r1.toCharArray, 0).some,
FoundBarcode(r2.toCharArray, 0).some,
FoundBarcode(i.toCharArray, 0).some,
None
)

test("it works") {
val iter1 =
DmuxedIterable(List(None -> List("AAAA", "CCCC", "GGGG"), Some("CTCGAG") -> List("TTAA", "CCGG", "AATT")))

val iter2 = DmuxedIterable(List(None -> List("AGA", "CTC", "GAG"), Some("CTCGAG") -> List("TGT", "CAC", "TCT")))

val src = new DmuxedPairedEndBarcodeSource(iter1, iter2, rowPolicy, revRowPolicy, None, ReadIdCheckPolicy.Lax)
assertEquals(
src.toList,
List(
fb("AAAA", "AGA"),
fb("CCCC", "CTC"),
fb("GGGG", "GAG"),
fb("CTCGAG", "TTAA", "TGT"),
fb("CTCGAG", "CCGG", "CAC"),
fb("CTCGAG", "AATT", "TCT")
)
)
}

test("nothing works") {
val i1 = DmuxedIterable(Nil)
val i2 = DmuxedIterable(Nil)
val src = new DmuxedPairedEndBarcodeSource(i1, i2, rowPolicy, revRowPolicy, None, ReadIdCheckPolicy.Illumina)
assertEquals(src.toList, Nil)
}

}
Loading