Write CRAM on Spark. #1488

Merged: 1 commit, Mar 3, 2016
@@ -226,12 +226,15 @@ public JavaRDD<GATKRead> getUnfilteredReads() {
/**
* Writes the reads from a {@link JavaRDD} to an output file.
* @param ctx the JavaSparkContext to write.
- * @param outputFile path to the output bam.
+ * @param outputFile path to the output bam/cram.
* @param reads reads to write.
*/
public void writeReads(final JavaSparkContext ctx, final String outputFile, JavaRDD<GATKRead> reads) {
try {
- ReadsSparkSink.writeReads(ctx, outputFile, reads, readsHeader, shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE, getRecommendedNumReducers());
+ ReadsSparkSink.writeReads(ctx, outputFile,
+     hasReference() ? referenceArguments.getReferenceFile().getAbsolutePath() : null,
+     reads, readsHeader, shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE,
+     getRecommendedNumReducers());
} catch (IOException e) {
throw new GATKException("unable to write bam: " + e);
}
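For orientation, here is a minimal sketch of the call pattern this hunk introduces, written only against the two ReadsSparkSink.writeReads overloads visible in this PR (the 7-argument form above and the 6-argument form used in the tests below). The helper name, output path, and reference path are illustrative assumptions, and the imports are the ones already present in the files shown here:

    // Hypothetical caller of the new overload: for CRAM output a reference FASTA must be
    // supplied; for BAM output the reference argument may simply be null.
    private void writeReadsAsCram(final JavaSparkContext ctx, final JavaRDD<GATKRead> reads,
                                  final SAMFileHeader header) {
        final String output = "reads.cram";             // assumed output path
        final String reference = "human_g1k_v37.fasta"; // assumed reference FASTA
        try {
            ReadsSparkSink.writeReads(ctx, output, reference, reads, header,
                    ReadsWriteFormat.SINGLE);           // 6-arg overload, as in the tests below
        } catch (final IOException e) {
            throw new GATKException("unable to write cram: " + e);
        }
    }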

Large diffs are not rendered by default.

@@ -3,6 +3,8 @@
import com.google.api.services.storage.Storage;
import com.google.cloud.genomics.dataflow.readers.bam.BAMIO;
import htsjdk.samtools.*;
+ import htsjdk.samtools.cram.build.CramIO;
+ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -199,7 +201,7 @@ public boolean accept(Path path) {
}
path = bamFiles[0].getPath(); // Hadoop-BAM writes the same header to each shard, so use the first one
}
- setHadoopBAMConfigurationProperties(path.getName(), referencePath);
+ setHadoopBAMConfigurationProperties(filePath, referencePath);
return SAMHeaderReader.readSAMHeaderFrom(path, ctx.hadoopConfiguration());
} catch (IOException e) {
throw new UserException("Failed to read bam header from " + filePath + "\n Caused by:" + e.getMessage(), e);
@@ -223,7 +225,8 @@ private void setHadoopBAMConfigurationProperties(final String inputName, final S
final Configuration conf = ctx.hadoopConfiguration();
conf.set(SAMHeaderReader.VALIDATION_STRINGENCY_PROPERTY, validationStringency.name());

- if (!IOUtils.isCramFileName(inputName)) { // only set the reference for CRAM input
+ if (!IOUtils.isCramFileName(inputName)) {
+     // only set the reference for CRAM input
conf.unset(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY);
}
else {
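Two related adjustments here: the header-reading path above now passes the caller-supplied filePath (rather than the name of the first shard) into setHadoopBAMConfigurationProperties, so the CRAM-extension check sees the path the user actually asked for, and that method only keeps the Hadoop-BAM CRAM reference property when the input really is a CRAM. A hedged sketch of that property handling, with the helper name and null guard being assumptions (only CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY, IOUtils.isCramFileName, and the Hadoop Configuration calls come from this diff):

    // Set or clear the Hadoop-BAM CRAM reference property depending on the input's extension.
    private static void configureCramReference(final Configuration conf,
                                               final String inputPath,
                                               final String referencePath) {
        if (!IOUtils.isCramFileName(inputPath)) {
            // not CRAM: make sure no stale reference property is left on the configuration
            conf.unset(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY);
        } else {
            // CRAM: Hadoop-BAM needs a reference to decode the records
            if (referencePath == null) {
                throw new IllegalArgumentException("a reference is required for CRAM input"); // assumed guard
            }
            conf.set(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY, referencePath);
        }
    }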
@@ -4,6 +4,7 @@
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.SAMRecordCoordinateComparator;
+ import htsjdk.samtools.ValidationStringency;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -49,14 +50,16 @@ private void shutdownMiniCluster() {
@DataProvider(name = "loadReadsBAM")
public Object[][] loadReadsBAM() {
return new Object[][]{
{testDataDir + "tools/BQSR/HiSeq.1mb.1RG.2k_lines.bam", "ReadsSparkSinkUnitTest1", ".bam"},
{testDataDir + "tools/BQSR/expected.HiSeq.1mb.1RG.2k_lines.alternate.recalibrated.DIQ.bam", "ReadsSparkSinkUnitTest2", ".bam"},
{testDataDir + "tools/BQSR/HiSeq.1mb.1RG.2k_lines.bam", "ReadsSparkSinkUnitTest1", null, ".bam"},
{testDataDir + "tools/BQSR/expected.HiSeq.1mb.1RG.2k_lines.alternate.recalibrated.DIQ.bam", "ReadsSparkSinkUnitTest2", null, ".bam"},

// This file has unmapped reads that are set to the position of their mates -- the ordering check
// in the tests below will fail if our ordering of these reads relative to the mapped reads
// is not consistent with the definition of coordinate sorting as defined in
// htsjdk.samtools.SAMRecordCoordinateComparator
{testDataDir + "tools/BQSR/CEUTrio.HiSeq.WGS.b37.ch20.1m-1m1k.NA12878.bam", "ReadsSparkSinkUnitTest3", ".bam"},
{testDataDir + "tools/BQSR/CEUTrio.HiSeq.WGS.b37.ch20.1m-1m1k.NA12878.bam", "ReadsSparkSinkUnitTest3", null, ".bam"},
{testDataDir + "tools/BQSR/NA12878.chr17_69k_70k.dictFix.cram", "ReadsSparkSinkUnitTest5",
publicTestDir + "human_g1k_v37.chr17_1Mb.fasta", ".cram"},
};
}
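The ordering concern described in the comment inside the data provider above is what assertReadsAreSorted (whose body sits outside this hunk) verifies after writing. A hedged sketch of that kind of check, with the helper name assumed and htsjdk's SAMRecordCoordinateComparator doing the comparison:

    // Assert that a list of SAMRecords is non-decreasing under coordinate order,
    // which is how the tests define "sorted" for reads written back from Spark.
    private static void assertCoordinateSorted(final List<SAMRecord> records) {
        final SAMRecordCoordinateComparator comparator = new SAMRecordCoordinateComparator();
        for (int i = 0; i + 1 < records.size(); i++) {
            Assert.assertTrue(comparator.compare(records.get(i), records.get(i + 1)) <= 0,
                    "reads are out of coordinate order at index " + i);
        }
    }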

@@ -82,45 +85,44 @@ public Object[][] loadReadsADAM() {
}

@Test(dataProvider = "loadReadsBAM", groups = "spark")
- public void readsSinkTest(String inputBam, String outputFileName, String outputFileExtension) throws IOException {
+ public void readsSinkTest(String inputBam, String outputFileName, String referenceFile, String outputFileExtension) throws IOException {
final File outputFile = createTempFile(outputFileName, outputFileExtension);
- assertSingleShardedWritingWorks(inputBam, outputFile.getAbsolutePath());
+ assertSingleShardedWritingWorks(inputBam, referenceFile, outputFile.getAbsolutePath());
}

@Test(dataProvider = "loadReadsBAM", groups = "spark")
- public void readsSinkHDFSTest(String inputBam, String outputFileName, String outputFileExtension) throws IOException {
+ public void readsSinkHDFSTest(String inputBam, String outputFileName, String referenceFileName, String outputFileExtension) throws IOException {
final String outputHDFSPath = MiniClusterUtils.getTempPath(cluster, outputFileName, outputFileExtension).toString();
Assert.assertTrue(BucketUtils.isHadoopUrl(outputHDFSPath));
- assertSingleShardedWritingWorks(inputBam, outputHDFSPath);
+ assertSingleShardedWritingWorks(inputBam, referenceFileName, outputHDFSPath);
}

@Test(dataProvider = "loadReadsBAM", groups = "spark")
- public void testWritingToAnExistingFileHDFS(String inputBam, String outputFileName, String outputFileExtension) throws IOException {
+ public void testWritingToAnExistingFileHDFS(String inputBam, String outputFileName, String referenceFileName, String outputFileExtension) throws IOException {
final Path outputPath = MiniClusterUtils.getTempPath(cluster, outputFileName, outputFileExtension);
final FileSystem fs = outputPath.getFileSystem(new Configuration());
Assert.assertTrue(fs.createNewFile(outputPath));
Assert.assertTrue(fs.exists(outputPath));
- assertSingleShardedWritingWorks(inputBam, outputPath.toString());
-
+ assertSingleShardedWritingWorks(inputBam, referenceFileName, outputPath.toString());
}

@Test
public void testWritingToFileURL() throws IOException {
String inputBam = testDataDir + "tools/BQSR/HiSeq.1mb.1RG.2k_lines.bam";
String outputUrl = "file:///" + createTempFile("ReadsSparkSinkUnitTest1", ".bam").getAbsolutePath();
- assertSingleShardedWritingWorks(inputBam, outputUrl);
+ assertSingleShardedWritingWorks(inputBam, null, outputUrl);
}

- private void assertSingleShardedWritingWorks(String inputBam, String outputPath) throws IOException {
+ private void assertSingleShardedWritingWorks(String inputBam, String referenceFile, String outputPath) throws IOException {
JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();

ReadsSparkSource readSource = new ReadsSparkSource(ctx);
- JavaRDD<GATKRead> rddParallelReads = readSource.getParallelReads(inputBam, null);
- SAMFileHeader header = readSource.getHeader(inputBam, null, null);
+ JavaRDD<GATKRead> rddParallelReads = readSource.getParallelReads(inputBam, referenceFile);
+ SAMFileHeader header = readSource.getHeader(inputBam, referenceFile, null);

- ReadsSparkSink.writeReads(ctx, outputPath, rddParallelReads, header, ReadsWriteFormat.SINGLE);
+ ReadsSparkSink.writeReads(ctx, outputPath, referenceFile, rddParallelReads, header, ReadsWriteFormat.SINGLE);

- JavaRDD<GATKRead> rddParallelReads2 = readSource.getParallelReads(outputPath, null);
+ JavaRDD<GATKRead> rddParallelReads2 = readSource.getParallelReads(outputPath, referenceFile);
final List<GATKRead> writtenReads = rddParallelReads2.collect();

assertReadsAreSorted(header, writtenReads);
@@ -140,23 +142,23 @@ private static void assertReadsAreSorted(SAMFileHeader header, List<GATKRead> wr
}

@Test(dataProvider = "loadReadsBAM", groups = "spark")
- public void readsSinkShardedTest(String inputBam, String outputFileName, String outputFileExtension) throws IOException {
+ public void readsSinkShardedTest(String inputBam, String outputFileName, String referenceFile, String outputFileExtension) throws IOException {
final File outputFile = createTempFile(outputFileName, outputFileExtension);
JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();

ReadsSparkSource readSource = new ReadsSparkSource(ctx);
- JavaRDD<GATKRead> rddParallelReads = readSource.getParallelReads(inputBam, null);
+ JavaRDD<GATKRead> rddParallelReads = readSource.getParallelReads(inputBam, referenceFile);
rddParallelReads = rddParallelReads.repartition(2); // ensure that the output is in two shards
- SAMFileHeader header = readSource.getHeader(inputBam, null, null);
+ SAMFileHeader header = readSource.getHeader(inputBam, referenceFile, null);

- ReadsSparkSink.writeReads(ctx, outputFile.getAbsolutePath(), rddParallelReads, header, ReadsWriteFormat.SHARDED);
+ ReadsSparkSink.writeReads(ctx, outputFile.getAbsolutePath(), referenceFile, rddParallelReads, header, ReadsWriteFormat.SHARDED);
int shards = outputFile.listFiles((dir, name) -> !name.startsWith(".") && !name.startsWith("_")).length;
Assert.assertEquals(shards, 2);
// check that no local .crc files are created
int crcs = outputFile.listFiles((dir, name) -> name.startsWith(".") && name.endsWith(".crc")).length;
Assert.assertEquals(crcs, 0);

- JavaRDD<GATKRead> rddParallelReads2 = readSource.getParallelReads(outputFile.getAbsolutePath(), null);
+ JavaRDD<GATKRead> rddParallelReads2 = readSource.getParallelReads(outputFile.getAbsolutePath(), referenceFile);
// reads are not globally sorted, so don't test that
Assert.assertEquals(rddParallelReads.count(), rddParallelReads2.count());
}
@@ -176,7 +178,7 @@ public void readsSinkADAMTest(String inputBam, String outputDirectoryName) throw
.filter(r -> !r.isUnmapped()); // filter out unmapped reads (see comment below)
SAMFileHeader header = readSource.getHeader(inputBam, null, null);

- ReadsSparkSink.writeReads(ctx, outputDirectory.getAbsolutePath(), rddParallelReads, header, ReadsWriteFormat.ADAM);
+ ReadsSparkSink.writeReads(ctx, outputDirectory.getAbsolutePath(), null, rddParallelReads, header, ReadsWriteFormat.ADAM);

JavaRDD<GATKRead> rddParallelReads2 = readSource.getADAMReads(outputDirectory.getAbsolutePath(), null, header);
Assert.assertEquals(rddParallelReads.count(), rddParallelReads2.count());