Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Demultiplexed FASTQ support #23

Merged
merged 16 commits into from
Sep 6, 2023
Merged
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 3.7.0
* Support for processing demultiplexed FASTQ files

## 3.6.4
No user-facing changes

Expand Down
322 changes: 192 additions & 130 deletions docs/MANUAL.md

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions src/main/scala/org/broadinstitute/gpp/poolq3/PoolQ.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ object PoolQ {
config.skipShortReads
)

val colBarcodePolicy =
BarcodePolicy(config.colBarcodePolicyStr, colReferenceData.barcodeLength, config.skipShortReads)
val colBarcodePolicyOpt =
config.colBarcodePolicyStr.map(pol => BarcodePolicy(pol, colReferenceData.barcodeLength, config.skipShortReads))

val umiInfo = (config.input.umiReference, config.umiBarcodePolicyStr).mapN { (r, p) =>
log.info("Reading UMI reference data")
Expand All @@ -102,10 +102,11 @@ object PoolQ {
)

log.info("Building column reference")
val colBarcodeLength = colBarcodePolicyOpt.map(_.length).getOrElse(colReferenceData.barcodeLength)
val colReference: Reference =
referenceFor(
config.colMatchFn,
ReferenceData.truncator(colBarcodePolicy.length),
ReferenceData.truncator(colBarcodeLength),
config.countAmbiguous,
colReferenceData.mappings
)
Expand All @@ -116,7 +117,7 @@ object PoolQ {
}

val barcodes: CloseableIterable[Barcodes] =
barcodeSource(config.input, rowBarcodePolicy, revRowBarcodePolicyOpt, colBarcodePolicy, umiInfo.map(_._2))
barcodeSource(config.input, rowBarcodePolicy, revRowBarcodePolicyOpt, colBarcodePolicyOpt, umiInfo.map(_._2))

lazy val unexpectedSequenceCacheDir: Option[Path] =
if (config.skipUnexpectedSequenceReport) None
Expand Down Expand Up @@ -214,7 +215,7 @@ object PoolQ {
rowReferenceData: ReferenceData,
rowBarcodePolicyStr: String,
reverseRowBarcodePolicyStr: Option[String],
reverseRowReads: Option[Path],
reverseRowReads: Option[(Option[String], Path)],
skipShortReads: Boolean
): (BarcodePolicy, Option[BarcodePolicy], Int) =
(reverseRowBarcodePolicyStr, reverseRowReads)
Expand Down
109 changes: 77 additions & 32 deletions src/main/scala/org/broadinstitute/gpp/poolq3/PoolQConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,41 @@ final case class PoolQInput(
colReference: Path = DefaultPath,
umiReference: Option[Path] = None,
globalReference: Option[Path] = None,
rowReads: Option[Path] = None,
reverseRowReads: Option[Path] = None,
rowReads: Option[(Option[String], Path)] = None,
reverseRowReads: Option[(Option[String], Path)] = None,
colReads: Option[Path] = None,
reads: Option[Path] = None,
reads: Option[(Option[String], Path)] = None,
readIdCheckPolicy: ReadIdCheckPolicy = ReadIdCheckPolicy.Strict,
// these are companion to rowReads, reverseRowReads, colReads, and reads
// they are added thusly to retain source compatibility with the old object
addlRowReads: List[Path] = Nil,
addlReverseRowReads: List[Path] = Nil,
addlRowReads: List[(Option[String], Path)] = Nil,
addlReverseRowReads: List[(Option[String], Path)] = Nil,
addlColReads: List[Path] = Nil,
addlReads: List[Path] = Nil
addlReads: List[(Option[String], Path)] = Nil,
demultiplexed: Boolean = false
) {

/** Resolves the configured reads options into a [[ReadsSource]].
  *
  * The combination of `rowReads`, `reverseRowReads`, `colReads`, `reads` and `demultiplexed`
  * selects the source kind:
  *
  *   - `reads` only, not demultiplexed: [[ReadsSource.SelfContained]]
  *   - `rowReads` + `colReads`, not demultiplexed: [[ReadsSource.Split]]
  *   - `rowReads` + `reverseRowReads` + `colReads`, not demultiplexed: [[ReadsSource.PairedEnd]]
  *   - `reads` only, demultiplexed: [[ReadsSource.Dmuxed]]
  *   - `rowReads` + `reverseRowReads`, demultiplexed: [[ReadsSource.DmuxedPairedEnd]]
  *
  * For the non-demultiplexed kinds any barcode attached to a reads file is dropped and only the
  * path is kept; the demultiplexed kinds keep the `(barcode, path)` pairs.
  *
  * @return
  *   `Right` with the resolved source, or `Left` with a `PoolQException` when the options conflict,
  *   the numbers of files differ, or (demultiplexed paired-end) the barcode sequences differ.
  */
def readsSourceE: Either[Exception, ReadsSource] =
  (rowReads, reverseRowReads, colReads, reads, demultiplexed) match {
    case (None, None, None, Some(r), false) =>
      Right(ReadsSource.SelfContained(Nel(r._2, addlReads.view.map(_._2).toList)))

    case (Some(rr), None, Some(cr), None, false) =>
      val rs = ReadsSource.Split(Nel(cr, addlColReads), Nel(rr._2, addlRowReads.view.map(_._2).toList))
      if (rs.forward.length == rs.index.length) Right(rs)
      // only row and column files exist in this mode, so the message must not mention reverse reads
      else Left(PoolQException("Number of row and column reads files must match"))

    case (Some(rr), Some(rrr), Some(cr), None, false) =>
      val rs = ReadsSource.PairedEnd(
        Nel(cr, addlColReads),
        Nel(rr._2, addlRowReads.view.map(_._2).toList),
        Nel(rrr._2, addlReverseRowReads.view.map(_._2).toList)
      )
      if (rs.forward.length == rs.index.length && rs.forward.length == rs.reverse.length) Right(rs)
      else Left(PoolQException("Number of row, column, and reverse reads files must match"))

    case (None, None, None, Some(r), true) =>
      Right(ReadsSource.Dmuxed(Nel(r, addlReads)))

    case (Some(rr), Some(rrr), None, None, true) =>
      val rs = ReadsSource.DmuxedPairedEnd(Nel(rr, addlRowReads), Nel(rrr, addlReverseRowReads))
      // comparing the barcode sequences position by position also enforces equal file counts
      if (rs.read1.map(_._1) == rs.read2.map(_._1)) Right(rs)
      else Left(PoolQException("Row and reverse row reads files must have matching barcodes"))

    case _ => Left(PoolQException("Conflicting input options"))
  }

Expand Down Expand Up @@ -73,7 +85,7 @@ final case class PoolQConfig(
countAmbiguous: Boolean = false,
rowBarcodePolicyStr: String = "",
reverseRowBarcodePolicyStr: Option[String] = None,
colBarcodePolicyStr: String = "",
colBarcodePolicyStr: Option[String] = None,
umiBarcodePolicyStr: Option[String] = None,
skipUnexpectedSequenceReport: Boolean = false,
unexpectedSequenceCacheDir: Option[Path] = None,
Expand All @@ -96,6 +108,8 @@ final case class PoolQConfig(

object PoolQConfig {

private[poolq3] val BarcodePathRegex = "([ACGT]+):(.+)".r

private[poolq3] val DefaultPath = Paths.get(".")

implicit private[this] val readPath: Read[Path] = implicitly[Read[File]].map(_.toPath)
Expand All @@ -107,6 +121,16 @@ object PoolQConfig {
}
}

/** Command-line reader for a comma-separated list of reads files.
  *
  * Each element is either `BARCODE:path` (as recognised by `BarcodePathRegex`) or a bare path;
  * bare paths are returned with no barcode.
  */
implicit private[this] val readBarcodePaths: Read[List[(Option[String], Path)]] =
  implicitly[Read[Seq[String]]].map { rawArgs =>
    rawArgs.iterator.map {
      case BarcodePathRegex(barcode, location) => (Option(barcode), Paths.get(location))
      case plain => (None, Paths.get(plain))
    }.toList
  }

implicit private[this] val readReadIdCheckPolicy: Read[ReadIdCheckPolicy] =
implicitly[Read[String]].map(ReadIdCheckPolicy.forName)

Expand Down Expand Up @@ -143,37 +167,41 @@ object PoolQConfig {
c.copy(input = c.input.copy(globalReference = Some(f.toPath)))
}

val _ = opt[(Path, List[Path])]("row-reads")
val _ = opt[List[(Option[String], Path)]]("row-reads")
.valueName("<files>")
.action { case ((p, ps), c) => c.copy(input = c.input.copy(rowReads = Some(p), addlRowReads = ps)) }
.action { case (ps, c) => c.copy(input = c.input.copy(rowReads = ps.headOption, addlRowReads = ps.drop(1))) }
.text("required if reads are split between two files")
.validate { case (p, ps) => (p :: ps).traverse_(existsAndIsReadable) }
.validate(_.view.map(_._2).toList.traverse_(existsAndIsReadable))

val _ = opt[(Path, List[Path])]("rev-row-reads")
val _ = opt[List[(Option[String], Path)]]("rev-row-reads")
.valueName("<files>")
.action { case ((p, ps), c) =>
c.copy(input = c.input.copy(reverseRowReads = Some(p), addlReverseRowReads = ps))
.action { case (ps, c) =>
c.copy(input = c.input.copy(reverseRowReads = ps.headOption, addlReverseRowReads = ps.drop(1)))
}
.text("required for processing paired-end sequencing data")
.validate { case (p, ps) => (p :: ps).traverse_(existsAndIsReadable) }
.validate(_.view.map(_._2).toList.traverse_(existsAndIsReadable))

val _ = opt[(Path, List[Path])]("col-reads")
.valueName("<files>")
.action { case ((p, ps), c) => c.copy(input = c.input.copy(colReads = Some(p), addlColReads = ps)) }
.text("required if reads are split between two files")
.validate { case (p, ps) => (p :: ps).traverse_(existsAndIsReadable) }

val _ = opt[(Path, List[Path])]("reads")
val _ = opt[List[(Option[String], Path)]]("reads")
.valueName("<files>")
.action { case ((p, ps), c) => c.copy(input = c.input.copy(reads = Some(p), addlReads = ps)) }
.action { case (ps, c) => c.copy(input = c.input.copy(reads = ps.headOption, addlReads = ps.drop(1))) }
.text("required if reads are contained in a single file")
.validate { case (p, ps) => (p :: ps).traverse_(existsAndIsReadable) }
.validate(_.view.map(_._2).toList.traverse_(existsAndIsReadable))

val _ = opt[ReadIdCheckPolicy]("read-id-check-policy")
.valueName("<policy>")
.action((p, c) => c.copy(input = c.input.copy(readIdCheckPolicy = p)))
.text("read ID check policy; one of [lax, strict, illumina]")

val _ = opt[Unit]("demultiplexed")
.action((_, c) => c.copy(input = c.input.copy(demultiplexed = true)))
.text("when true, expects demultiplexed FASTQ files")

val _ = opt[String]("row-matcher")
.valueName("<matcher>")
.action((m, c) => c.copy(rowMatchFn = m))
Expand All @@ -197,7 +225,7 @@ object PoolQConfig {
}

val _ = opt[String]("col-barcode-policy").valueName("<barcode-policy>").action { (p, c) =>
c.copy(colBarcodePolicyStr = p)
c.copy(colBarcodePolicyStr = Some(p))
}

val _ = opt[String]("umi-barcode-policy").valueName("<barcode-policy>").action { (p, c) =>
Expand Down Expand Up @@ -286,11 +314,13 @@ object PoolQConfig {
val _ = opt[Unit]("noop").hidden().action((_, c) => c.copy(noopConsumer = true))

val _ = checkConfig { c =>
val readsCheck = (c.input.reads, c.input.rowReads, c.input.colReads) match {
case (None, None, None) => failure("No reads files specified.")
case (None, None, Some(_)) =>
val readsCheck = (c.input.reads, c.input.rowReads, c.input.colReads, c.input.demultiplexed) match {
case (None, None, None, _) => failure("No reads files specified.")
case (None, None, Some(_), false) =>
failure("Column barcode file specified but no row barcodes file specified.")
case (None, Some(_), None) =>
case (_, _, Some(_), true) =>
failure("Column barcode file specified for demultiplexed reads.")
case (None, Some(_), None, false) =>
failure("Row barcode file specified but no column barcodes file specified.")
case _ => success
}
Expand Down Expand Up @@ -319,16 +349,31 @@ object PoolQConfig {
/** Renders a reads option as `(name, "file1,file2,...")` using only the file-name component of
  * each path; yields `None` when the primary path is absent.
  */
def files(name: String, path: Option[Path], addl: List[Path]): Option[(String, String)] =
  for (first <- path) yield {
    val fileNames = (first :: addl).iterator.map(_.getFileName.toString)
    (name, fileNames.mkString(","))
  }

/** Renders a reads option whose files may carry a barcode as `(name, "entry1,entry2,...")`.
  *
  * Each entry is `BARCODE:fileName` when a barcode is present and just `fileName` otherwise;
  * only the file-name component of each path is used. Yields `None` when the primary entry is
  * absent.
  */
def barcodeFiles(
  name: String,
  path: Option[(Option[String], Path)],
  addl: List[(Option[String], Path)]
): Option[(String, String)] =
  for (first <- path) yield {
    val entries = (first :: addl).iterator.map { case (barcodeOpt, location) =>
      val fileName = location.getFileName.toString
      barcodeOpt match {
        case Some(barcode) => s"$barcode:$fileName"
        case None => fileName
      }
    }
    (name, entries.mkString(","))
  }

// input files
val input = config.input
args += (("row-reference", input.rowReference.getFileName.toString))
args += (("col-reference", input.colReference.getFileName.toString))
umiInfo.map(_._1).foreach(file => args += (("umi-reference", file.getFileName.toString)))
input.globalReference.foreach(file => args += (("global-reference", file.getFileName.toString)))
files("row-reads", input.rowReads, input.addlRowReads).foreach(t => args += t)
files("rev-row-reads", input.reverseRowReads, input.addlReverseRowReads).foreach(t => args += t)
barcodeFiles("row-reads", input.rowReads, input.addlRowReads).foreach(t => args += t)
barcodeFiles("rev-row-reads", input.reverseRowReads, input.addlReverseRowReads).foreach(t => args += t)
files("col-reads", input.colReads, input.addlColReads).foreach(t => args += t)
files("reads", input.reads, input.addlReads).foreach(t => args += t)
barcodeFiles("reads", input.reads, input.addlReads).foreach(t => args += t)
args += (("read-id-check-policy", input.readIdCheckPolicy.name))

// run control
Expand All @@ -339,7 +384,7 @@ object PoolQConfig {
}
args += (("row-barcode-policy", config.rowBarcodePolicyStr))
config.reverseRowBarcodePolicyStr.foreach(p => args += (("rev-row-barcode-policy", p)))
args += (("col-barcode-policy", config.colBarcodePolicyStr))
config.colBarcodePolicyStr.foreach(pol => args += (("col-barcode-policy", pol)))
umiInfo.map(_._2).foreach(str => args += (("umi-barcode-policy", str)))

// deal with the unexpected sequence options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@ object ReadsSource {
/** Reads contained in a single file (one or more such files). */
final case class SelfContained(paths: Nel[Path]) extends ReadsSource
/** Reads split across two files: `index` holds the column reads files, `forward` the row reads files. */
final case class Split(index: Nel[Path], forward: Nel[Path]) extends ReadsSource
/** Paired-end reads: `index` holds the column reads files, `forward` the row reads files and
  * `reverse` the reverse row reads files.
  */
final case class PairedEnd(index: Nel[Path], forward: Nel[Path], reverse: Nel[Path]) extends ReadsSource
/** Demultiplexed reads: each entry pairs an optional barcode with the reads file it labels. */
final case class Dmuxed(read1: Nel[(Option[String], Path)]) extends ReadsSource

/** Demultiplexed paired-end reads: `read1` holds the row reads entries and `read2` the reverse row
  * reads entries, each pairing an optional barcode with a reads file.
  */
final case class DmuxedPairedEnd(read1: Nel[(Option[String], Path)], read2: Nel[(Option[String], Path)])
extends ReadsSource

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2022 The Broad Institute, Inc. All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
*/
package org.broadinstitute.gpp.poolq3.barcode

import org.broadinstitute.gpp.poolq3.parser.{CloseableIterable, CloseableIterator, DmuxedIterable}
import org.broadinstitute.gpp.poolq3.types.Read

/** Produces [[Barcodes]] from demultiplexed reads supplied by a single [[DmuxedIterable]].
  *
  * The column barcode is not located in the read itself: it is taken from `parser.indexBarcode`.
  * The row barcode and the optional UMI barcode are located in each read by their policies.
  *
  * @param parser
  *   source of reads; also reports the index barcode to use as the column barcode
  * @param rowPolicy
  *   policy used to locate the row barcode in each read
  * @param umiPolicyOpt
  *   optional policy used to locate a UMI barcode in each read
  */
final class DmuxedBarcodeSource(parser: DmuxedIterable, rowPolicy: BarcodePolicy, umiPolicyOpt: Option[BarcodePolicy])
    extends CloseableIterable[Barcodes] {

  // Kept as a `def` so the parser is consulted again for every read.
  // NOTE(review): presumably `indexBarcode` changes as the parser moves between files - confirm.
  private def colBarcodeOpt = parser.indexBarcode.map(bc => FoundBarcode(bc.toCharArray, 0))

  /** Adapts the underlying read iterator, turning every read into a [[Barcodes]] value. */
  private[this] class BarcodeIterator(iterator: CloseableIterator[Read]) extends CloseableIterator[Barcodes] {
    override def hasNext: Boolean = iterator.hasNext

    override def next(): Barcodes = {
      val nextRead = iterator.next()
      val rowBarcodeOpt = rowPolicy.find(nextRead)
      val umiBarcodeOpt = umiPolicyOpt.flatMap(_.find(nextRead))
      // there is no reverse-row read in this mode, hence the `None`
      Barcodes(rowBarcodeOpt, None, colBarcodeOpt, umiBarcodeOpt)
    }

    override def close(): Unit = iterator.close()
  }

  override def iterator: CloseableIterator[Barcodes] = new BarcodeIterator(parser.iterator)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2022 The Broad Institute, Inc. All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
*/
package org.broadinstitute.gpp.poolq3.barcode

import org.broadinstitute.gpp.poolq3.parser.{CloseableIterable, CloseableIterator, DmuxedIterable}
import org.broadinstitute.gpp.poolq3.types.{Read, ReadIdCheckPolicy}

/** Produces [[Barcodes]] from demultiplexed paired-end reads supplied by two [[DmuxedIterable]]s
  * that are consumed in lock step.
  *
  * The column barcode is taken from `rowParser.indexBarcode` rather than located in a read. The row
  * barcode and the optional UMI barcode are located in the forward read; the reverse row barcode is
  * located in the reverse read.
  *
  * @param rowParser
  *   source of forward reads; also reports the index barcode to use as the column barcode
  * @param revRowParser
  *   source of reverse reads
  * @param rowPolicy
  *   policy used to locate the row barcode in the forward read
  * @param revRowPolicy
  *   policy used to locate the reverse row barcode in the reverse read
  * @param umiPolicyOpt
  *   optional policy used to locate a UMI barcode in the forward read
  * @param readIdCheckPolicy
  *   check applied to each forward/reverse read pair before barcodes are extracted
  */
class DmuxedPairedEndBarcodeSource(
  rowParser: DmuxedIterable,
  revRowParser: DmuxedIterable,
  rowPolicy: BarcodePolicy,
  revRowPolicy: BarcodePolicy,
  umiPolicyOpt: Option[BarcodePolicy],
  readIdCheckPolicy: ReadIdCheckPolicy
) extends CloseableIterable[Barcodes] {

  // Kept as a `def` so the forward parser is consulted again for every read pair.
  // NOTE(review): presumably `indexBarcode` changes as the parser moves between files - confirm.
  private def colBarcodeOpt = rowParser.indexBarcode.map(bc => FoundBarcode(bc.toCharArray, 0))

  /** Walks both read iterators together, producing one [[Barcodes]] per read pair. */
  private[this] class BarcodeIterator(rowIterator: CloseableIterator[Read], revRowIterator: CloseableIterator[Read])
      extends CloseableIterator[Barcodes] {

    // NOTE(review): iteration ends as soon as either side is exhausted; a length mismatch between
    // the two inputs is not reported here.
    final override def hasNext: Boolean = rowIterator.hasNext && revRowIterator.hasNext

    final override def next(): Barcodes = {
      val nextRow = rowIterator.next()
      val nextRevRow = revRowIterator.next()
      readIdCheckPolicy.check(nextRow, nextRevRow)
      val rowBarcodeOpt = rowPolicy.find(nextRow)
      val revRowBarcodeOpt = revRowPolicy.find(nextRevRow)
      val umiBarcodeOpt = umiPolicyOpt.flatMap(_.find(nextRow))
      Barcodes(rowBarcodeOpt, revRowBarcodeOpt, colBarcodeOpt, umiBarcodeOpt)
    }

    // close the reverse iterator even if closing the forward iterator throws
    final override def close(): Unit =
      try rowIterator.close()
      finally revRowIterator.close()

  }

  override def iterator: CloseableIterator[Barcodes] =
    new BarcodeIterator(rowParser.iterator, revRowParser.iterator)

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ final class ThreeFileBarcodeSource(

/** Closes all three underlying iterators; the nested `try`/`finally` ensures that a failure while
  * closing one of them does not prevent the remaining ones from being closed.
  */
final override def close(): Unit =
  try rowIterator.close()
  finally
    try revRowIterator.close()
    finally colIterator.close()

}

Expand Down
Loading