disq support for coverage (#146)
* disq support for coverage

* added unit tests for disq

* disq support for coverage

* added unit tests for disq

* Fixing bugs in reading BAMs and CRAMs

* Fixing GKL logging

* Disable tests in publishing phase

* Changing default ValidationStringency to LENIENT

* Changing default ValidationStringency to SILENT

* Docs update
mwiewior authored Apr 14, 2019
1 parent b7719a8 commit 793a585
Showing 26 changed files with 373 additions and 1,330 deletions.
2 changes: 1 addition & 1 deletion Jenkinsfile
@@ -116,7 +116,7 @@ node {

echo "branch: ${env.BRANCH_NAME}"
echo 'Publishing to ZSI-BIO snapshots repository....'
sh "SBT_OPTS='-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=2G -Xmx2G' ${tool name: 'sbt-0.13.15', type: 'org.jvnet.hudson.plugins.SbtPluginBuilder$SbtInstallation'}/bin/sbt publish"
sh "SBT_OPTS='-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=2G -Xmx2G' ${tool name: 'sbt-0.13.15', type: 'org.jvnet.hudson.plugins.SbtPluginBuilder$SbtInstallation'}/bin/sbt 'set test in publish := {}' publish"


}
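For context, the publish step above disables tests by overriding the test task inline; a minimal sketch of the equivalent build.sbt setting (sbt 0.13 syntax, the same override that releasing/release.sh below passes on the command line):

// skip the test task when publishing; equivalent to
// sbt "set test in publish := {}" publish
test in publish := {}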
23 changes: 19 additions & 4 deletions build.sbt
@@ -1,22 +1,24 @@
import sbtassembly.AssemblyPlugin.autoImport.ShadeRule

import scala.util.Properties

name := """bdg-sequila"""

version := "0.5.3-spark-2.4.0-SNAPSHOT"
version := "0.5.4-spark-2.4.1-SNAPSHOT"

organization := "org.biodatageeks"

scalaVersion := "2.11.8"

val DEFAULT_SPARK_2_VERSION = "2.4.0"
val DEFAULT_SPARK_2_VERSION = "2.4.1"
val DEFAULT_HADOOP_VERSION = "2.6.5"


lazy val sparkVersion = Properties.envOrElse("SPARK_VERSION", DEFAULT_SPARK_2_VERSION)
lazy val hadoopVersion = Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HADOOP_VERSION)



libraryDependencies += "org.seqdoop" % "hadoop-bam" % "7.10.0"

dependencyOverrides += "com.google.guava" % "guava" % "15.0"
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % hadoopVersion
@@ -39,7 +41,7 @@ libraryDependencies += "org.rogach" %% "scallop" % "3.1.2"

libraryDependencies += "org.hammerlab.bdg-utils" %% "cli" % "0.3.0"

libraryDependencies += "com.github.samtools" % "htsjdk" % "2.18.2"
libraryDependencies += "com.github.samtools" % "htsjdk" % "2.19.0"


libraryDependencies += "ch.cern.sparkmeasure" %% "spark-measure" % "0.13" excludeAll (ExclusionRule("org.apache.hadoop"))
@@ -61,6 +63,8 @@ libraryDependencies += "org.apache.derby" % "derbyclient" % "10.14.2.0"

libraryDependencies += "org.biodatageeks" % "bdg-performance_2.11" % "0.2-SNAPSHOT" excludeAll (ExclusionRule("org.apache.hadoop"))

libraryDependencies += "org.disq-bio" % "disq" % "0.3.0"




@@ -94,6 +98,17 @@ resolvers ++= Seq(
"Hortonworks" at "http://repo.hortonworks.com/content/repositories/releases/"
)

//logLevel in assembly := Level.Debug

//fix for htsjdk patch in hadoop-bam and disq: both ship their own htsjdk.samtools.SAMRecordHelper, so rename each copy to avoid a class clash in the assembly jar
assemblyShadeRules in assembly := Seq(
ShadeRule.rename("htsjdk.samtools.SAMRecordHelper" -> "htsjdk.samtools.SAMRecordHelperDisq").inLibrary("org.disq-bio" % "disq" % "0.3.0"),
ShadeRule.rename("htsjdk.samtools.SAMRecordHelper" -> "htsjdk.samtools.SAMRecordHelperHadoopBAM").inLibrary("org.seqdoop" % "hadoop-bam" % "7.10.0")

)




assemblyMergeStrategy in assembly := {
case PathList("org", "apache", xs@_*) => MergeStrategy.first
2 changes: 2 additions & 0 deletions docs/source/fileformats/fileformats.rst
@@ -307,6 +307,8 @@ In order to start using optimized Intel inflater library you need simply to set
ss.sqlContext.setConf("spark.biodatageeks.bam.useGKLInflate","true")
ss.sql(...)
Swappable alignment file reading mechanism
====================================================================
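A minimal sketch of selecting the reader implementation (assuming the ``spark.biodatageeks.readAligment.method`` parameter from the release notes; the values handled in the source are ``hadoopBAM``, ``sparkBAM`` and ``disq``, matched case-insensitively, with ``hadoopBAM`` as the default):

.. code-block:: scala

    ss.sqlContext.setConf("spark.biodatageeks.readAligment.method", "disq")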
ADAM
8 changes: 7 additions & 1 deletion docs/source/index.rst
@@ -32,7 +32,7 @@ SeQuiLa is an ANSI-SQL compliant solution for efficient genomic intervals queryi

* SeQuiLa is scalable:

- implemented in Scala in Apache Spark 2.2 environment
- implemented in Scala in Apache Spark 2.4.x environment
- can be run on single computer (locally) or Hadoop cluster using YARN


@@ -86,6 +86,12 @@ Example.scala
.. rubric:: Release notes:

0.5.4 (2019-04-13)
- support for Apache Spark: 2.4.1 and HDP 2.3.2.3.1.0.0-78
- support for `disq <https://github.com/disq-bio/disq>`_ for reading BAM and CRAM files in coverage calculations
- `swappable alignment file read mechanism <fileformats/fileformats.html#swappable-alignment-file-reading-mechanism>`_ (``spark.biodatageeks.readAligment.method`` parameter defaults to "hadoopBAM")
- support for `long reads <usecases/usecases.html#nanopore-long-reads-from-wgs-analyses>`_ (e.g. Nanopore) using disq library

0.5

- new result type (fixed length windows) for depth of coverage calculations
49 changes: 49 additions & 0 deletions docs/source/usecases/usecases.rst
@@ -707,3 +707,52 @@ Simple Multisample analyses
.option("header", "true")
.option("delimiter", "\t")
.csv("/data/input/fc.txt")
Nanopore long reads from WGS analyses
#####################################

.. code-block:: bash
imwiewior@cdh00:/data/work/nanopore_bam/minimap> hdfs dfs -du -h /data/granges/nanopore/* | grep sorted
130.9 G 261.9 G /data/granges/nanopore/NA12878-Albacore2.1.sorted.bam
126.5 G 253.0 G /data/granges/nanopore/rel5-guppy-0.3.0-chunk10k.sorted.bam
51.2 M 102.4 M /data/granges/nanopore/rel5-guppy-0.3.0-chunk10k.sorted.bam.bai
.. code-block:: scala
import org.apache.spark.sql.SequilaSession
import org.biodatageeks.utils.{SequilaRegister, UDFRegister,BDGInternalParams}
val ss = SequilaSession(spark)
SequilaRegister.register(ss)
/*enable disq support*/
ss.sqlContext.setConf("spark.biodatageeks.readAligment.method", "disq")
/* WGS -bases-blocks*/
ss.sql("""
CREATE TABLE IF NOT EXISTS reads_nanopore
USING org.biodatageeks.datasources.BAM.BAMDataSource
OPTIONS(path '/data/granges/nanopore/*sorted*.bam')
""".stripMargin)
ss.sql("select distinct sampleId from reads_nanopore").show
+--------------------------------+
| sampleId|
+--------------------------------+
| NA12878-Albacore2.1.sorted|
|rel5-guppy-0.3.0-chunk10k.sorted|
+--------------------------------+
/*Albacore mapper*/
spark.time{
ss.sql(s"SELECT * FROM bdg_coverage('reads_nanopore','NA12878-Albacore2.1.sorted', 'blocks')").write.format("parquet").save("/tmp/NA12878-Albacore2.1.sorted.parquet")}
/*guppy mapper*/
spark.time{
ss.sql(s"SELECT * FROM bdg_coverage('reads_nanopore','rel5-guppy-0.3.0-chunk10k.sorted', 'blocks')").write.format("parquet").save("/tmp/rel5-guppy-0.3.0-chunk10k.sorted.parquet")}
7 changes: 7 additions & 0 deletions releasing/release.sh
@@ -0,0 +1,7 @@
#!/usr/bin/env bash

#publish
sbt "set test in publish := {}" clean publish

#assembly
sbt "set test in assembly := {}" clean assembly
@@ -42,6 +42,7 @@ case class BDGSAMRecord(sampleId: String,
trait BDGAlignFileReaderWriter [T <: BDGAlignInputFormat]{



// val bdgSerialize = new BDGSerializer()
//val serializer = new BDGFastSerializer()
val confMap = new mutable.HashMap[String,String]()
@@ -104,33 +105,67 @@ trait BDGAlignFileReaderWriter [T <: BDGAlignInputFormat]{

spark
.sparkContext
.hadoopConfiguration.set(SAMHeaderReader.VALIDATION_STRINGENCY_PROPERTY, ValidationStringency.SILENT.toString)
.hadoopConfiguration.set(SAMHeaderReader.VALIDATION_STRINGENCY_PROPERTY, ValidationStringency.LENIENT.toString)
}

def readBAMFile(@transient sqlContext: SQLContext, path: String)(implicit c: ClassTag[T]) = {
def readBAMFile(@transient sqlContext: SQLContext, path: String, refPath: Option[String] = None)(implicit c: ClassTag[T]) = {

val logger = Logger.getLogger(this.getClass.getCanonicalName)
setLocalConf(sqlContext)
setConf("spark.biodatageeks.bam.intervals","") //FIXME: disabled PP
setHadoopConf(sqlContext)




val spark = sqlContext
.sparkSession
val resolvedPath = BDGTableFuncs.getExactSamplePath(spark,path)
// val folderPath = BDGTableFuncs.getParentFolderPath(spark,path)
logger.info(s"######## Reading ${resolvedPath} or ${path}")
val alignReadMethod = spark.sqlContext.getConf(BDGInternalParams.IOReadAlignmentMethod,"hadoopBAM").toLowerCase
logger.info(s"######## Using ${alignReadMethod} for reading alignment files.")

alignReadMethod match {
case "hadoopbam" => {
logger.info(s"Using Intel GKL inflater: ${BDGInternalParams.UseIntelGKL}")
spark.sparkContext
.newAPIHadoopFile[LongWritable, SAMRecordWritable, T](path)
.map(r => r._2.get())
}
case "sparkbam" => {
import spark_bam._, hammerlab.path._
val bamPath = Path(resolvedPath)
spark
.sparkContext
.loadReads(bamPath)
}

if(!spark.sqlContext.getConf("spark.biodatageeks.bam.useSparkBAM","false").toBoolean)
spark.sparkContext
.newAPIHadoopFile[LongWritable, SAMRecordWritable, T](path)
.map(r => r._2.get())
else{
import spark_bam._, hammerlab.path._
val bamPath = Path(resolvedPath)
spark
.sparkContext
.loadReads(bamPath)
case "disq" => {
import org.disq_bio.disq.HtsjdkReadsRddStorage

refPath match {
case Some(ref) => {
HtsjdkReadsRddStorage
.makeDefault(sqlContext.sparkContext)
.validationStringency(ValidationStringency.LENIENT)
.referenceSourcePath(ref)
.read(resolvedPath)
.getReads
.rdd
}
case None => {
HtsjdkReadsRddStorage
.makeDefault(sqlContext.sparkContext)
.validationStringency(ValidationStringency.LENIENT)
.read(resolvedPath)
.getReads
.rdd
}
}
}
}


}


@@ -275,7 +310,7 @@ class BDGAlignmentRelation[T <:BDGAlignInputFormat](path:String, refPath:Option[
sqlContext
.sparkContext
.hadoopConfiguration
.set(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY,p)
.set(CRAMBDGInputFormat.REFERENCE_SOURCE_PATH_PROPERTY,p)
}
case _ => None
}
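For context, a minimal sketch of the disq read path added in readBAMFile above, wrapped as a hypothetical standalone helper (HtsjdkReadsRddStorage is the disq 0.3.0 API used in the diff; a reference path is needed to decode CRAM input):

import htsjdk.samtools.ValidationStringency
import org.apache.spark.SparkContext
import org.disq_bio.disq.HtsjdkReadsRddStorage

// hypothetical helper mirroring the refPath match in readBAMFile
def readWithDisq(sc: SparkContext, path: String, refPath: Option[String]) = {
  val storage = HtsjdkReadsRddStorage
    .makeDefault(sc)
    .validationStringency(ValidationStringency.LENIENT)
  val configured = refPath match {
    case Some(ref) => storage.referenceSourcePath(ref) // CRAM needs its reference
    case None => storage
  }
  configured.read(path).getReads.rdd // RDD[SAMRecord]
}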
@@ -307,7 +307,6 @@ private List<InputSplit> filterByInterval(List<InputSplit> splits, Configuration
.setOption(SamReaderFactory.Option.EAGERLY_DECODE, false)
.setUseAsyncIo(false);
if(conf.get(INFLATE_FACTORY) != null && conf.get(INFLATE_FACTORY).equalsIgnoreCase("intel_gkl")){
System.out.println("Using Intel GKL Inflater");
IntelInflaterFactory intelDeflaterFactory = new IntelInflaterFactory();
readerFactory.inflaterFactory(intelDeflaterFactory);
}
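For context, a minimal sketch of switching the Intel GKL inflater on from a session (using the spark.biodatageeks.bam.useGKLInflate parameter shown in the fileformats docs above):

ss.sqlContext.setConf("spark.biodatageeks.bam.useGKLInflate", "true")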
75 changes: 0 additions & 75 deletions src/main/scala/org/biodatageeks/inputformats/BDGContainer.java

This file was deleted.

@@ -83,7 +83,7 @@ private static long nextContainerOffset(List<Long> containerOffsets, long positi

@Override
public RecordReader<LongWritable, SAMRecordWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
RecordReader<LongWritable, SAMRecordWritable> rr = new org.seqdoop.hadoop_bam.CRAMBDGRecordReader();
RecordReader<LongWritable, SAMRecordWritable> rr = new CRAMRecordReader();
rr.initialize(split, context);
return rr;
}