Added an argument to specify the parts directory for ReadsSparkSink #4666

Merged: jamesemery merged 5 commits into master from je_configureSparkOutDir on May 9, 2018

Conversation

jamesemery (Collaborator)

This should allow the .parts output directory to live somewhere other than next to the final file, which is needed for using local disks with Spark.

Fixes #4636
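In sketch form, a tool wires the new argument through like this. The names and import paths below follow the hunks quoted further down in this conversation and my reading of the GATK codebase; the exact signature in the PR may differ slightly, so treat this as an illustration rather than the PR's code:

    import java.io.IOException;
    import htsjdk.samtools.SAMFileHeader;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSink;
    import org.broadinstitute.hellbender.utils.read.GATKRead;
    import org.broadinstitute.hellbender.utils.read.ReadsWriteFormat;

    public final class PartsDirExample {
        // shardedPartsDir is the new tool-level argument; passing null keeps the old
        // behaviour of writing the temporary parts next to the output as "<output>.parts/".
        static void write(final JavaSparkContext ctx, final String outputPath, final String referencePath,
                          final JavaRDD<GATKRead> reads, final SAMFileHeader header,
                          final boolean shardedOutput, final int numReducers,
                          final String shardedPartsDir) throws IOException {
            ReadsSparkSink.writeReads(ctx, outputPath, referencePath, reads, header,
                    shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE,
                    numReducers, shardedPartsDir);
        }
    }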

codecov-io commented Apr 17, 2018

Codecov Report

Merging #4666 into master will increase coverage by 0.547%.
The diff coverage is 66.667%.

@@              Coverage Diff               @@
##              master    #4666       +/-   ##
==============================================
+ Coverage     79.823%   80.37%   +0.547%     
- Complexity     17328    18189      +861     
==============================================
  Files           1075     1083        +8     
  Lines          62917    66551     +3634     
  Branches       10181    11208     +1027     
==============================================
+ Hits           50222    53487     +3265     
- Misses          8714     8924      +210     
- Partials        3981     4140      +159
Impacted Files                                           | Coverage Δ                 | Complexity Δ
...lbender/tools/spark/pathseq/PathSeqScoreSpark.java    | 57.407% <ø> (ø)            | 7 <0> (ø) ⬇️
...nder/tools/spark/pathseq/PathSeqPipelineSpark.java    | 81.25% <ø> (ø)             | 7 <0> (ø) ⬇️
...stitute/hellbender/engine/spark/GATKSparkTool.java    | 83.495% <100%> (+0.162%)   | 56 <0> (ø) ⬇️
...k/pipelines/BwaAndMarkDuplicatesPipelineSpark.java    | 92.593% <100%> (+13.645%)  | 7 <0> (+3) ⬆️
...ellbender/tools/spark/pathseq/PathSeqBwaSpark.java    | 67.391% <100%> (ø)         | 7 <0> (ø) ⬇️
...ender/engine/spark/datasources/ReadsSparkSink.java    | 73.043% <50%> (-2.411%)    | 26 <2> (+2)
...der/tools/walkers/mutect/M2ArgumentCollection.java    | 96.875% <0%> (-3.125%)     | 3% <0%> (+2%)
...broadinstitute/hellbender/utils/test/BaseTest.java    | 62.921% <0%> (-1.523%)     | 49% <0%> (+16%)
...hellbender/utils/read/markduplicates/ReadsKey.java    | 93.103% <0%> (-1.341%)     | 15% <0%> (+6%)
...itute/hellbender/tools/walkers/bqsr/ApplyBQSR.java    | 90.909% <0%> (-0.758%)     | 11% <0%> (+5%)
... and 56 more


  final JavaRDD<SAMRecord> sortedReads = SparkUtils.sortReads(reads, header, numReducers);
- final String outputPartsDirectory = outputFile + ".parts/";
+ final String outputPartsDirectory = outputPartsDir==null?outputFile + ".parts/":outputPartsDir;
Member

spaces please

Collaborator Author

done

BufferedWriter testfile = new BufferedWriter(new FileWriter(defaultDir.getAbsolutePath() + "/test.txt"));
testfile.write("test");
testfile.close();
Runtime.getRuntime().exec("chmod a-w -R " + defaultDir.getAbsolutePath()+"/");
Member

Why do you have to shell out? Can't you just use defaultDir.setWritable(false)?

Collaborator Author

IntelliJ makes a big fuss about how defaultDir.setWritable(false) will not be respected in the test suite, and I found that it wasn't causing a failure when we tried to write to the directory.
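For context, a minimal standalone illustration of the issue (hypothetical path, not code from this PR): File.setWritable only reports via its boolean return value whether the change was applied at all, and even a successful change may not be enforced for every user (e.g. root), which is roughly what the inspection warns about.

    import java.io.File;

    public final class SetWritableDemo {
        public static void main(String[] args) throws Exception {
            final File dir = new File("/tmp/example.parts");   // hypothetical path
            System.out.println("created: " + dir.mkdir());
            // setWritable returns false when the permission change could not be made;
            // ignoring that return value is what lets a test keep writing where it
            // was expected to fail.
            final boolean applied = dir.setWritable(false, false);
            System.out.println("setWritable applied: " + applied);
            if (!applied) {
                // fall back to an explicit chmod, as the test in this PR ends up doing
                new ProcessBuilder("chmod", "-R", "a-w", dir.getAbsolutePath()).inheritIO().start().waitFor();
            }
        }
    }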


// Make a directory with unusable permissions in place of where the default file will live
final File defaultDir = new File(outputFile.getAbsolutePath() + ".parts/");
defaultDir.mkdir();
Member

you should assert that this returned true

Collaborator Author

done
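(For reference, the kind of assertion being asked for, as a one-line sketch rather than the exact line that ended up in the PR:)

    Assert.assertTrue(defaultDir.mkdir(), "failed to create the default .parts directory");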

assertSingleShardedWritingWorks(inputBam, referenceFile, outputFile.getAbsolutePath(), outputDir.getAbsolutePath());

// Test that the file wasn't deleted when spark cleared its temp directory
Assert.assertEquals(new File(defaultDir.getAbsolutePath() + "/test.txt").exists(), true);
Member

assertTrue is probably better here

Collaborator Author

done
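(i.e. something along these lines, mirroring the assertEquals quoted above:)

    Assert.assertTrue(new File(defaultDir.getAbsolutePath() + "/test.txt").exists());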

final File outputDir = createTempDir(outputFileName + ".someOtherPlace");

// Make a directory with unusable permissions in place of where the default file will live
final File defaultDir = new File(outputFile.getAbsolutePath() + ".parts/");
Member

Extract a constant for this value in ReadsSparkSink and refer to it, or even add a method that takes a file and returns the default parts folder.

Collaborator Author

done
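(The helper ends up being referenced as ReadsSparkSink.getDefaultPartsDirectory(...) in the rewritten test further down; a sketch of what it presumably looks like, inferred from the ".parts/" suffix used throughout this diff rather than copied from the PR:)

    // In ReadsSparkSink (sketch): single source of truth for the default parts location.
    public static String getDefaultPartsDirectory(final String file) {
        return file + ".parts/";
    }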

fullName = OUTPUT_SHARD_DIR_LONG_NAME,
optional = true)
protected String shardedPartsDir = null;

@Argument(doc="For tools that shuffle data or write an output, sets the number of reducers. Defaults to 0, which gives one partition per 10MB of input.",
Member

We probably want a check that makes sure you're not setting this if shardedOutput == true, because it will be confusingly ignored.

Collaborator Author

I added a check in the writer which will throw.
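Roughly along these lines (a sketch; the exact condition, message, and exception type, given that GATK has its own user-facing exception classes, are assumptions here):

    // An explicit parts directory only applies to single-file output, where parts are
    // written and then merged; with sharded output it would be silently ignored, so
    // fail fast instead.
    if (outputPartsDir != null && format == ReadsWriteFormat.SHARDED) {
        throw new IllegalArgumentException(
                "The output parts directory cannot be specified together with sharded output: " + outputPartsDir);
    }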

@@ -180,7 +180,7 @@ private void writeBam(final JavaRDD<GATKRead> reads, final String inputBamPath,
try {
ReadsSparkSink.writeReads(ctx, outputPath, bwaArgs.referencePath, reads, header,
shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE,
- PSUtils.pathseqGetRecommendedNumReducers(inputBamPath, numReducers, getTargetPartitionSize()));
+ PSUtils.pathseqGetRecommendedNumReducers(inputBamPath, numReducers, getTargetPartitionSize()), null);
Member

I think these nulls should all be replaced with references to shardedPartsDir
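Concretely, for the hunk above, the requested change would look roughly like this (same call, with the tool-level shardedPartsDir passed through instead of the hard-coded null):

    ReadsSparkSink.writeReads(ctx, outputPath, bwaArgs.referencePath, reads, header,
            shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE,
            PSUtils.pathseqGetRecommendedNumReducers(inputBamPath, numReducers, getTargetPartitionSize()),
            shardedPartsDir);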

@@ -289,7 +289,7 @@ protected void runTool(final JavaSparkContext ctx) {
final int numPartitions = Math.max(1, (int) (numTotalReads / readsPerPartitionOutput));
final JavaRDD<GATKRead> readsFinalRepartitioned = readsFinal.coalesce(numPartitions, false);
ReadsSparkSink.writeReads(ctx, outputPath, null, readsFinalRepartitioned, header,
- shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE, numPartitions);
+ shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE, numPartitions, null);
Member

likewise

@@ -221,7 +221,7 @@ protected void runTool(final JavaSparkContext ctx) {
if (outputPath != null) {
try {
ReadsSparkSink.writeReads(ctx, outputPath, null, readsFinal, header,
- shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE, recommendedNumReducers);
+ shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE, recommendedNumReducers, null);
Member

same here

@@ -66,7 +66,7 @@ protected void runTool(final JavaSparkContext ctx) {
referenceArguments.getReferencePath().toAbsolutePath().toUri().toString(),
markedReads, bwaEngine.getHeader(),
shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE,
- getRecommendedNumReducers());
+ getRecommendedNumReducers(), null);
Member

and here

lbergelson assigned jamesemery and unassigned lbergelson on Apr 24, 2018
jamesemery assigned lbergelson and unassigned jamesemery on Apr 25, 2018
jamesemery (Collaborator Author)

Responded to a round of comments

}

@Test(dataProvider = "loadReadsBAM", groups = "spark")
public void outputDirectoryTest(String inputBam, String outputFileName, String referenceFile, String outputFileExtension) throws IOException {
Member

I rewrote this:

    @Test(dataProvider = "loadReadsBAM", groups = "spark")
    public void testSpecifyPartsDir(String inputBam, String outputFileName, String referenceFile, String outputFileExtension) throws IOException {
        final File outputFile = createTempFile(outputFileName, outputFileExtension);
        final File nonDefaultShardsDir = createTempDir(outputFileName + ".someOtherPlace");


        final java.nio.file.Path defaultPartsDir = IOUtils.getPath(ReadsSparkSink.getDefaultPartsDirectory(outputFile.getAbsolutePath()));
        final java.nio.file.Path subpath = defaultPartsDir.resolve("subpath");

        // Make a directory with unusable permissions in place of where the default parts file will live
        try {
            final Set<PosixFilePermission> readOnly = EnumSet.of(PosixFilePermission.OWNER_READ);
            final FileAttribute<Set<PosixFilePermission>> readOnlyPermissions = PosixFilePermissions.asFileAttribute(readOnly);

            Files.createDirectory(defaultPartsDir);
            // An empty directory seems to be able to be deleted even with write permissions disabled, so put a file in it
            Files.createFile(subpath, readOnlyPermissions);
            Files.setPosixFilePermissions(defaultPartsDir, readOnly);

            //assert it fails when writing to the default path
            Assert.assertThrows(() -> assertSingleShardedWritingWorks(inputBam, referenceFile, outputFile.getAbsolutePath(), null));

            //show this succeeds when specifying a different path for the parts directory
            assertSingleShardedWritingWorks(inputBam, referenceFile, outputFile.getAbsolutePath(), nonDefaultShardsDir.getAbsolutePath());

            // Test that the file wasn't deleted when spark cleared its temp directory
            Assert.assertTrue(Files.exists(defaultPartsDir));
        } finally {
            try {
                final EnumSet<PosixFilePermission> readWriteExecute = EnumSet.of(PosixFilePermission.OWNER_READ,
                                                                          PosixFilePermission.OWNER_WRITE,
                                                                          PosixFilePermission.OWNER_EXECUTE);
                Files.setPosixFilePermissions(defaultPartsDir, readWriteExecute);
                Files.setPosixFilePermissions(subpath, readWriteExecute);
                Files.deleteIfExists(subpath);
                Files.deleteIfExists(defaultPartsDir);
            } catch (IOException e) {
                System.out.print("Failed to delete test file");
                e.printStackTrace();
            }
        }
    }

Member

@jamesemery I changed it to be more assertive and also to not shell out.

lbergelson assigned jamesemery and unassigned lbergelson on Apr 27, 2018
lbergelson (Member)

@jamesemery Apparently my test doesn't work on Travis. Gaaah.

jamesemery assigned lbergelson and unassigned jamesemery on May 9, 2018
jamesemery (Collaborator Author)

@lbergelson Returned to shelling out.

lbergelson (Member)

so sad

jamesemery merged commit 3bf60ef into master on May 9, 2018
davidbenjamin deleted the je_configureSparkOutDir branch on February 24, 2021