Merge pull request #1488 from broadinstitute/cn_spark_write_cram
Write CRAM on Spark.
cmnbroad committed Mar 3, 2016
2 parents 44f4a1e + 55faeb8 · commit 3176a26
Showing 11 changed files with 259 additions and 195 deletions.
@@ -226,12 +226,15 @@ public JavaRDD<GATKRead> getUnfilteredReads() {
     /**
      * Writes the reads from a {@link JavaRDD} to an output file.
      * @param ctx the JavaSparkContext to write.
-     * @param outputFile path to the output bam.
+     * @param outputFile path to the output bam/cram.
      * @param reads reads to write.
      */
     public void writeReads(final JavaSparkContext ctx, final String outputFile, JavaRDD<GATKRead> reads) {
         try {
-            ReadsSparkSink.writeReads(ctx, outputFile, reads, readsHeader, shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE, getRecommendedNumReducers());
+            ReadsSparkSink.writeReads(ctx, outputFile,
+                    hasReference() ? referenceArguments.getReferenceFile().getAbsolutePath() : null,
+                    reads, readsHeader, shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE,
+                    getRecommendedNumReducers());
         } catch (IOException e) {
             throw new GATKException("unable to write bam: " + e);
         }
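For orientation, a minimal round-trip sketch built only on the signatures visible in this commit (the six-argument writeReads overload used by the tests below, without a reducer count). The package imports, the SparkContextFactory helper, and the file paths are assumptions for illustration, not part of the diff:

import htsjdk.samtools.SAMFileHeader;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.broadinstitute.hellbender.engine.spark.SparkContextFactory;
import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSink;
import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSource;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.ReadsWriteFormat;

public class CramRoundTripSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical paths: CRAM needs its reference FASTA on the read side,
        // and, new in this commit, on the write side as well.
        final String input = "reads.cram";
        final String reference = "reference.fasta";
        final String output = "out.cram";

        final JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
        final ReadsSparkSource source = new ReadsSparkSource(ctx);
        final SAMFileHeader header = source.getHeader(input, reference, null);
        final JavaRDD<GATKRead> reads = source.getParallelReads(input, reference);

        // The third argument is the newly threaded-through reference path;
        // it may be null when the output is a BAM.
        ReadsSparkSink.writeReads(ctx, output, reference, reads, header, ReadsWriteFormat.SINGLE);
    }
}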

Large diffs are not rendered by default.

@@ -3,6 +3,8 @@
 import com.google.api.services.storage.Storage;
 import com.google.cloud.genomics.dataflow.readers.bam.BAMIO;
 import htsjdk.samtools.*;
+import htsjdk.samtools.cram.build.CramIO;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -199,7 +201,7 @@ public boolean accept(Path path) {
             }
             path = bamFiles[0].getPath(); // Hadoop-BAM writes the same header to each shard, so use the first one
         }
-        setHadoopBAMConfigurationProperties(path.getName(), referencePath);
+        setHadoopBAMConfigurationProperties(filePath, referencePath);
         return SAMHeaderReader.readSAMHeaderFrom(path, ctx.hadoopConfiguration());
     } catch (IOException e) {
         throw new UserException("Failed to read bam header from " + filePath + "\n Caused by:" + e.getMessage(), e);
@@ -223,7 +225,8 @@ private void setHadoopBAMConfigurationProperties(final String inputName, final S
     final Configuration conf = ctx.hadoopConfiguration();
     conf.set(SAMHeaderReader.VALIDATION_STRINGENCY_PROPERTY, validationStringency.name());

-    if (!IOUtils.isCramFileName(inputName)) { // only set the reference for CRAM input
+    if (!IOUtils.isCramFileName(inputName)) {
+        // only set the reference for CRAM input
         conf.unset(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY);
     }
     else {
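Two details in this hunk are easy to miss. The header reader now passes the original filePath instead of path.getName(): for sharded input, path points at the first shard (a generic Hadoop part-file name), which would defeat extension-based CRAM detection. And the reference property is now explicitly unset for non-CRAM input, so a value left on the shared Hadoop Configuration by an earlier CRAM read cannot leak into a BAM read. A sketch of the resulting branch logic; the else body is elided in the rendered diff, so the property-setting line below is an assumption:

// Sketch only: inputName is the user-supplied path, so the extension check
// works even when the data itself lives in generically named shards.
if (!IOUtils.isCramFileName(inputName)) {
    // Not CRAM: clear any reference source a previous CRAM read may have
    // left behind on this shared Configuration.
    conf.unset(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY);
} else {
    // CRAM: presumably install the reference here (assumed body; the
    // rendered diff cuts off at the else).
    conf.set(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY, referencePath);
}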
@@ -4,6 +4,7 @@
 import htsjdk.samtools.SAMFileHeader;
 import htsjdk.samtools.SAMRecord;
 import htsjdk.samtools.SAMRecordCoordinateComparator;
+import htsjdk.samtools.ValidationStringency;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,14 +50,16 @@ private void shutdownMiniCluster() {
     @DataProvider(name = "loadReadsBAM")
     public Object[][] loadReadsBAM() {
         return new Object[][]{
-                {testDataDir + "tools/BQSR/HiSeq.1mb.1RG.2k_lines.bam", "ReadsSparkSinkUnitTest1", ".bam"},
-                {testDataDir + "tools/BQSR/expected.HiSeq.1mb.1RG.2k_lines.alternate.recalibrated.DIQ.bam", "ReadsSparkSinkUnitTest2", ".bam"},
+                {testDataDir + "tools/BQSR/HiSeq.1mb.1RG.2k_lines.bam", "ReadsSparkSinkUnitTest1", null, ".bam"},
+                {testDataDir + "tools/BQSR/expected.HiSeq.1mb.1RG.2k_lines.alternate.recalibrated.DIQ.bam", "ReadsSparkSinkUnitTest2", null, ".bam"},

                 // This file has unmapped reads that are set to the position of their mates -- the ordering check
                 // in the tests below will fail if our ordering of these reads relative to the mapped reads
                 // is not consistent with the definition of coordinate sorting as defined in
                 // htsjdk.samtools.SAMRecordCoordinateComparator
-                {testDataDir + "tools/BQSR/CEUTrio.HiSeq.WGS.b37.ch20.1m-1m1k.NA12878.bam", "ReadsSparkSinkUnitTest3", ".bam"},
+                {testDataDir + "tools/BQSR/CEUTrio.HiSeq.WGS.b37.ch20.1m-1m1k.NA12878.bam", "ReadsSparkSinkUnitTest3", null, ".bam"},
+                {testDataDir + "tools/BQSR/NA12878.chr17_69k_70k.dictFix.cram", "ReadsSparkSinkUnitTest5",
+                        publicTestDir + "human_g1k_v37.chr17_1Mb.fasta", ".cram"},
         };
     }

@@ -82,45 +85,44 @@ public Object[][] loadReadsADAM() {
     }

     @Test(dataProvider = "loadReadsBAM", groups = "spark")
-    public void readsSinkTest(String inputBam, String outputFileName, String outputFileExtension) throws IOException {
+    public void readsSinkTest(String inputBam, String outputFileName, String referenceFile, String outputFileExtension) throws IOException {
         final File outputFile = createTempFile(outputFileName, outputFileExtension);
-        assertSingleShardedWritingWorks(inputBam, outputFile.getAbsolutePath());
+        assertSingleShardedWritingWorks(inputBam, referenceFile, outputFile.getAbsolutePath());
     }

     @Test(dataProvider = "loadReadsBAM", groups = "spark")
-    public void readsSinkHDFSTest(String inputBam, String outputFileName, String outputFileExtension) throws IOException {
+    public void readsSinkHDFSTest(String inputBam, String outputFileName, String referenceFileName, String outputFileExtension) throws IOException {
         final String outputHDFSPath = MiniClusterUtils.getTempPath(cluster, outputFileName, outputFileExtension).toString();
         Assert.assertTrue(BucketUtils.isHadoopUrl(outputHDFSPath));
-        assertSingleShardedWritingWorks(inputBam, outputHDFSPath);
+        assertSingleShardedWritingWorks(inputBam, referenceFileName, outputHDFSPath);
     }

     @Test(dataProvider = "loadReadsBAM", groups = "spark")
-    public void testWritingToAnExistingFileHDFS(String inputBam, String outputFileName, String outputFileExtension) throws IOException {
+    public void testWritingToAnExistingFileHDFS(String inputBam, String outputFileName, String referenceFileName, String outputFileExtension) throws IOException {
         final Path outputPath = MiniClusterUtils.getTempPath(cluster, outputFileName, outputFileExtension);
         final FileSystem fs = outputPath.getFileSystem(new Configuration());
         Assert.assertTrue(fs.createNewFile(outputPath));
         Assert.assertTrue(fs.exists(outputPath));
-        assertSingleShardedWritingWorks(inputBam, outputPath.toString());
-
+        assertSingleShardedWritingWorks(inputBam, referenceFileName, outputPath.toString());
     }

     @Test
     public void testWritingToFileURL() throws IOException {
         String inputBam = testDataDir + "tools/BQSR/HiSeq.1mb.1RG.2k_lines.bam";
         String outputUrl = "file:///" + createTempFile("ReadsSparkSinkUnitTest1", ".bam").getAbsolutePath();
-        assertSingleShardedWritingWorks(inputBam, outputUrl);
+        assertSingleShardedWritingWorks(inputBam, null, outputUrl);
     }

-    private void assertSingleShardedWritingWorks(String inputBam, String outputPath) throws IOException {
+    private void assertSingleShardedWritingWorks(String inputBam, String referenceFile, String outputPath) throws IOException {
         JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();

         ReadsSparkSource readSource = new ReadsSparkSource(ctx);
-        JavaRDD<GATKRead> rddParallelReads = readSource.getParallelReads(inputBam, null);
-        SAMFileHeader header = readSource.getHeader(inputBam, null, null);
+        JavaRDD<GATKRead> rddParallelReads = readSource.getParallelReads(inputBam, referenceFile);
+        SAMFileHeader header = readSource.getHeader(inputBam, referenceFile, null);

-        ReadsSparkSink.writeReads(ctx, outputPath, rddParallelReads, header, ReadsWriteFormat.SINGLE);
+        ReadsSparkSink.writeReads(ctx, outputPath, referenceFile, rddParallelReads, header, ReadsWriteFormat.SINGLE);

-        JavaRDD<GATKRead> rddParallelReads2 = readSource.getParallelReads(outputPath, null);
+        JavaRDD<GATKRead> rddParallelReads2 = readSource.getParallelReads(outputPath, referenceFile);
         final List<GATKRead> writtenReads = rddParallelReads2.collect();

         assertReadsAreSorted(header, writtenReads);
@@ -140,23 +142,23 @@ private static void assertReadsAreSorted(SAMFileHeader header, List<GATKRead> wr
     }

     @Test(dataProvider = "loadReadsBAM", groups = "spark")
-    public void readsSinkShardedTest(String inputBam, String outputFileName, String outputFileExtension) throws IOException {
+    public void readsSinkShardedTest(String inputBam, String outputFileName, String referenceFile, String outputFileExtension) throws IOException {
         final File outputFile = createTempFile(outputFileName, outputFileExtension);
         JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();

         ReadsSparkSource readSource = new ReadsSparkSource(ctx);
-        JavaRDD<GATKRead> rddParallelReads = readSource.getParallelReads(inputBam, null);
+        JavaRDD<GATKRead> rddParallelReads = readSource.getParallelReads(inputBam, referenceFile);
         rddParallelReads = rddParallelReads.repartition(2); // ensure that the output is in two shards
-        SAMFileHeader header = readSource.getHeader(inputBam, null, null);
+        SAMFileHeader header = readSource.getHeader(inputBam, referenceFile, null);

-        ReadsSparkSink.writeReads(ctx, outputFile.getAbsolutePath(), rddParallelReads, header, ReadsWriteFormat.SHARDED);
+        ReadsSparkSink.writeReads(ctx, outputFile.getAbsolutePath(), referenceFile, rddParallelReads, header, ReadsWriteFormat.SHARDED);
         int shards = outputFile.listFiles((dir, name) -> !name.startsWith(".") && !name.startsWith("_")).length;
         Assert.assertEquals(shards, 2);
         // check that no local .crc files are created
         int crcs = outputFile.listFiles((dir, name) -> name.startsWith(".") && name.endsWith(".crc")).length;
         Assert.assertEquals(crcs, 0);

-        JavaRDD<GATKRead> rddParallelReads2 = readSource.getParallelReads(outputFile.getAbsolutePath(), null);
+        JavaRDD<GATKRead> rddParallelReads2 = readSource.getParallelReads(outputFile.getAbsolutePath(), referenceFile);
         // reads are not globally sorted, so don't test that
         Assert.assertEquals(rddParallelReads.count(), rddParallelReads2.count());
     }
@@ -176,7 +178,7 @@ public void readsSinkADAMTest(String inputBam, String outputDirectoryName) throw
                 .filter(r -> !r.isUnmapped()); // filter out unmapped reads (see comment below)
         SAMFileHeader header = readSource.getHeader(inputBam, null, null);

-        ReadsSparkSink.writeReads(ctx, outputDirectory.getAbsolutePath(), rddParallelReads, header, ReadsWriteFormat.ADAM);
+        ReadsSparkSink.writeReads(ctx, outputDirectory.getAbsolutePath(), null, rddParallelReads, header, ReadsWriteFormat.ADAM);

         JavaRDD<GATKRead> rddParallelReads2 = readSource.getADAMReads(outputDirectory.getAbsolutePath(), null, header);
         Assert.assertEquals(rddParallelReads.count(), rddParallelReads2.count());
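As a spot check outside Spark, a written CRAM can be opened back with plain htsjdk, which, like the Spark source above, needs the reference the writer used. A minimal sketch; the file paths are hypothetical and the calls are standard htsjdk SamReaderFactory methods:

import java.io.File;
import java.io.IOException;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.SamReader;
import htsjdk.samtools.SamReaderFactory;
import htsjdk.samtools.ValidationStringency;

public class CramSpotCheck {
    public static void main(String[] args) throws IOException {
        // CRAM decoding requires the reference; a BAM would open without it.
        try (final SamReader reader = SamReaderFactory.makeDefault()
                .validationStringency(ValidationStringency.SILENT)
                .referenceSequence(new File("human_g1k_v37.chr17_1Mb.fasta"))
                .open(new File("out.cram"))) {
            long n = 0;
            for (final SAMRecord read : reader) {
                n++;
            }
            System.out.println(n + " records read back");
        }
    }
}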
