From d8a8d0c8d406aec6bed8b32015eb3ee01bc1cbd7 Mon Sep 17 00:00:00 2001 From: Tom White Date: Mon, 10 Sep 2018 10:27:34 +0100 Subject: [PATCH 1/5] Upgrade to htsjdk 2.16.1 --- pom.xml | 2 +- .../disq/impl/formats/bam/HeaderlessBamOutputFormat.java | 3 ++- .../disq/impl/formats/bgzf/BGZFCompressionOutputStream.java | 5 +++-- .../org/disq_bio/disq/impl/formats/vcf/VcfOutputFormat.java | 3 ++- .../java/org/disq_bio/disq/impl/formats/vcf/VcfSink.java | 4 +++- 5 files changed, 11 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index 6edf746..6cd4762 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ 1.8 1.8 - 2.16.0 + 2.16.1 2.7.0 2.2.2 diff --git a/src/main/java/org/disq_bio/disq/impl/formats/bam/HeaderlessBamOutputFormat.java b/src/main/java/org/disq_bio/disq/impl/formats/bam/HeaderlessBamOutputFormat.java index 9a83d3d..ca91542 100644 --- a/src/main/java/org/disq_bio/disq/impl/formats/bam/HeaderlessBamOutputFormat.java +++ b/src/main/java/org/disq_bio/disq/impl/formats/bam/HeaderlessBamOutputFormat.java @@ -5,6 +5,7 @@ import htsjdk.samtools.SAMRecord; import htsjdk.samtools.util.BinaryCodec; import htsjdk.samtools.util.BlockCompressedOutputStream; +import java.io.File; import java.io.IOException; import java.io.OutputStream; import org.apache.hadoop.conf.Configuration; @@ -31,7 +32,7 @@ static class BamRecordWriter extends RecordWriter { public BamRecordWriter(Configuration conf, Path file, SAMFileHeader header) throws IOException { this.out = file.getFileSystem(conf).create(file); - BlockCompressedOutputStream compressedOut = new BlockCompressedOutputStream(out, null); + BlockCompressedOutputStream compressedOut = new BlockCompressedOutputStream(out, (File) null); binaryCodec = new BinaryCodec(compressedOut); bamRecordCodec = new BAMRecordCodec(header); bamRecordCodec.setOutputStream(compressedOut); diff --git a/src/main/java/org/disq_bio/disq/impl/formats/bgzf/BGZFCompressionOutputStream.java b/src/main/java/org/disq_bio/disq/impl/formats/bgzf/BGZFCompressionOutputStream.java index e2241a3..b853df3 100644 --- a/src/main/java/org/disq_bio/disq/impl/formats/bgzf/BGZFCompressionOutputStream.java +++ b/src/main/java/org/disq_bio/disq/impl/formats/bgzf/BGZFCompressionOutputStream.java @@ -1,6 +1,7 @@ package org.disq_bio.disq.impl.formats.bgzf; import htsjdk.samtools.util.BlockCompressedOutputStream; +import java.io.File; import java.io.IOException; import java.io.OutputStream; import org.apache.hadoop.io.compress.CompressionOutputStream; @@ -18,7 +19,7 @@ public class BGZFCompressionOutputStream extends CompressionOutputStream { public BGZFCompressionOutputStream(OutputStream out) throws IOException { super(out); - this.output = new BlockCompressedOutputStream(out, null); + this.output = new BlockCompressedOutputStream(out, (File) null); } public void write(int b) throws IOException { @@ -35,7 +36,7 @@ public void finish() throws IOException { public void resetState() throws IOException { output.flush(); - output = new BlockCompressedOutputStream(out, null); + output = new BlockCompressedOutputStream(out, (File) null); } public void close() throws IOException { diff --git a/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfOutputFormat.java b/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfOutputFormat.java index ceabc57..be5ae48 100644 --- a/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfOutputFormat.java +++ b/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfOutputFormat.java @@ -5,6 +5,7 @@ import htsjdk.variant.variantcontext.writer.VariantContextWriter; import htsjdk.variant.variantcontext.writer.VariantContextWriterBuilder; import htsjdk.variant.vcf.VCFHeader; +import java.io.File; import java.io.IOException; import java.io.OutputStream; import org.apache.hadoop.conf.Configuration; @@ -33,7 +34,7 @@ public VcfRecordWriter(Configuration conf, Path file, VCFHeader header, String e boolean compressed = extension.endsWith(BGZFCodec.DEFAULT_EXTENSION) || extension.endsWith(".gz"); if (compressed) { - out = new BlockCompressedOutputStream(out, null); + out = new BlockCompressedOutputStream(out, (File) null); } variantContextWriter = new VariantContextWriterBuilder().clearOptions().setOutputVCFStream(out).build(); diff --git a/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSink.java b/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSink.java index 60ef594..7295b96 100644 --- a/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSink.java +++ b/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSink.java @@ -8,6 +8,7 @@ import htsjdk.variant.variantcontext.writer.VariantContextWriterBuilder; import htsjdk.variant.vcf.VCFEncoder; import htsjdk.variant.vcf.VCFHeader; +import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.util.Iterator; @@ -50,7 +51,8 @@ public void save( String headerFile = tempPartsDirectory + "/header" + (compressed ? BGZFCodec.DEFAULT_EXTENSION : ""); try (OutputStream headerOut = fileSystemWrapper.create(jsc.hadoopConfiguration(), headerFile)) { - OutputStream out = compressed ? new BlockCompressedOutputStream(headerOut, null) : headerOut; + OutputStream out = + compressed ? new BlockCompressedOutputStream(headerOut, (File) null) : headerOut; VariantContextWriter writer = new VariantContextWriterBuilder().clearOptions().setOutputVCFStream(out).build(); writer.writeHeader(vcfHeader); From 9a4eef848258234c22d70f7cb7a1d465d1836143 Mon Sep 17 00:00:00 2001 From: Tom White Date: Mon, 10 Sep 2018 11:00:12 +0100 Subject: [PATCH 2/5] Use all available cores for testing --- README.md | 2 +- src/test/java/org/disq_bio/disq/BaseTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index b5ce5d5..fa40dd0 100644 --- a/README.md +++ b/README.md @@ -246,7 +246,7 @@ The following tests all the [GATK 'large' files](https://github.com/broadinstitu ``` mvn verify \ -Ddisq.test.real.world.files.dir=/home/gatk/src/test/resources/large \ - -Ddisq.test.real.world.files.ref=/homegatk/src/test/resources/large/human_g1k_v37.20.21.fasta \ + -Ddisq.test.real.world.files.ref=/home/gatk/src/test/resources/large/human_g1k_v37.20.21.fasta \ -Ddisq.samtools.bin=/path/to/bin/samtools \ -Ddisq.bcftools.bin=/path/to/bin/bcftools ``` diff --git a/src/test/java/org/disq_bio/disq/BaseTest.java b/src/test/java/org/disq_bio/disq/BaseTest.java index 36a4a79..cc54f2c 100644 --- a/src/test/java/org/disq_bio/disq/BaseTest.java +++ b/src/test/java/org/disq_bio/disq/BaseTest.java @@ -20,7 +20,7 @@ public abstract class BaseTest { public static void setup() { SparkConf sparkConf = new SparkConf(); sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - jsc = new JavaSparkContext("local", "myapp", sparkConf); + jsc = new JavaSparkContext("local[*]", "myapp", sparkConf); } @AfterClass From 8f6dfc1f0f7260ba3bacc05625ea22e7b594495f Mon Sep 17 00:00:00 2001 From: Tom White Date: Tue, 4 Sep 2018 17:34:01 +0100 Subject: [PATCH 3/5] VCFCodec is not thread-safe, so use a separate instance for each partition. --- .../org/disq_bio/disq/impl/formats/vcf/VcfSource.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSource.java b/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSource.java index 1b410e8..ba7e79a 100644 --- a/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSource.java +++ b/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSource.java @@ -14,6 +14,7 @@ import htsjdk.variant.variantcontext.VariantContext; import htsjdk.variant.vcf.VCFCodec; import htsjdk.variant.vcf.VCFHeader; +import htsjdk.variant.vcf.VCFHeaderVersion; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; @@ -96,14 +97,18 @@ public JavaRDD getVariants( } enableBGZFCodecs(conf); - Broadcast vcfCodecBroadcast = jsc.broadcast(getVCFCodec(jsc, path)); + VCFCodec vcfCodec = getVCFCodec(jsc, path); + VCFHeader header = vcfCodec.getHeader(); + VCFHeaderVersion version = vcfCodec.getVersion(); + Broadcast headerBroadcast = jsc.broadcast(header); Broadcast> intervalsBroadcast = intervals == null ? null : jsc.broadcast(intervals); return textFile(jsc, conf, path, intervals) .mapPartitions( (FlatMapFunction, VariantContext>) lines -> { - VCFCodec codec = vcfCodecBroadcast.getValue(); + VCFCodec codec = new VCFCodec(); + codec.setVCFHeader(headerBroadcast.getValue(), version); final OverlapDetector overlapDetector = intervalsBroadcast == null ? null From a62a09fdc3d99b9473eb38c77f01e1e29c9b2cb6 Mon Sep 17 00:00:00 2001 From: Tom White Date: Mon, 10 Sep 2018 16:56:47 +0100 Subject: [PATCH 4/5] Add comments --- src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSource.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSource.java b/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSource.java index ba7e79a..8666cd8 100644 --- a/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSource.java +++ b/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSource.java @@ -99,6 +99,7 @@ public JavaRDD getVariants( VCFCodec vcfCodec = getVCFCodec(jsc, path); VCFHeader header = vcfCodec.getHeader(); + // get the version separately since htsjdk doesn't provide a way to get it from the header VCFHeaderVersion version = vcfCodec.getVersion(); Broadcast headerBroadcast = jsc.broadcast(header); Broadcast> intervalsBroadcast = intervals == null ? null : jsc.broadcast(intervals); @@ -107,6 +108,7 @@ public JavaRDD getVariants( .mapPartitions( (FlatMapFunction, VariantContext>) lines -> { + // VCFCodec is not threadsafe, so create a new one for each task VCFCodec codec = new VCFCodec(); codec.setVCFHeader(headerBroadcast.getValue(), version); final OverlapDetector overlapDetector = From d3370a1d1f135baeb33ca17955c107b1800e50a6 Mon Sep 17 00:00:00 2001 From: Tom White Date: Thu, 13 Sep 2018 11:23:09 +0100 Subject: [PATCH 5/5] Make VCFCodec final --- .../disq_bio/disq/impl/formats/vcf/VcfSource.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSource.java b/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSource.java index 8666cd8..a08935b 100644 --- a/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSource.java +++ b/src/main/java/org/disq_bio/disq/impl/formats/vcf/VcfSource.java @@ -97,19 +97,20 @@ public JavaRDD getVariants( } enableBGZFCodecs(conf); - VCFCodec vcfCodec = getVCFCodec(jsc, path); - VCFHeader header = vcfCodec.getHeader(); + final VCFCodec vcfCodec = getVCFCodec(jsc, path); + final VCFHeader header = vcfCodec.getHeader(); // get the version separately since htsjdk doesn't provide a way to get it from the header - VCFHeaderVersion version = vcfCodec.getVersion(); - Broadcast headerBroadcast = jsc.broadcast(header); - Broadcast> intervalsBroadcast = intervals == null ? null : jsc.broadcast(intervals); + final VCFHeaderVersion version = vcfCodec.getVersion(); + final Broadcast headerBroadcast = jsc.broadcast(header); + final Broadcast> intervalsBroadcast = + intervals == null ? null : jsc.broadcast(intervals); return textFile(jsc, conf, path, intervals) .mapPartitions( (FlatMapFunction, VariantContext>) lines -> { // VCFCodec is not threadsafe, so create a new one for each task - VCFCodec codec = new VCFCodec(); + final VCFCodec codec = new VCFCodec(); codec.setVCFHeader(headerBroadcast.getValue(), version); final OverlapDetector overlapDetector = intervalsBroadcast == null