Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MergeSVEvidence #7695

Merged
merged 1 commit into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.broadinstitute.hellbender.utils.IndexUtils;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.codecs.FeaturesHeader;
import org.broadinstitute.hellbender.tools.sv.SVFeaturesHeader;
import org.broadinstitute.hellbender.utils.gcs.BucketUtils;
import org.broadinstitute.hellbender.utils.io.BlockCompressedIntervalStream.Reader;
import org.broadinstitute.hellbender.utils.io.IOUtils;
Expand Down Expand Up @@ -473,8 +473,8 @@ protected static FeatureReader<VariantContext> getGenomicsDBFeatureReader(final
public SAMSequenceDictionary getSequenceDictionary() {
SAMSequenceDictionary dict = null;
final Object header = getHeader();
if ( header instanceof FeaturesHeader ) {
dict = ((FeaturesHeader)header).getDictionary();
if ( header instanceof SVFeaturesHeader ) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the rationale behind the FeaturesHeader -> SVFeaturesHeader migration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why FeaturesHeader --> SVFeaturesHeader? It just seemed to me that the name was too generic -- suggesting that it applied to all Features -- and, in particular, that it could easily be confused with the FeatureCodecHeader. I just wanted to make the name suggest to which Feature sub-types it applied.

dict = ((SVFeaturesHeader)header).getDictionary();
} else if (header instanceof VCFHeader) {
dict = ((VCFHeader) header).getSequenceDictionary();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.broadinstitute.hellbender.engine;

import com.google.common.annotations.VisibleForTesting;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.samtools.util.Locatable;
import htsjdk.tribble.Feature;
Expand Down Expand Up @@ -43,7 +42,7 @@
* by the user on the command line, determines the type of the file and the codec required to decode it,
* creates a FeatureDataSource for that file, and adds it to a query-able resource pool.
*
* Clients can then call {@link #getFeatures(FeatureInput, SimpleInterval)} to query the data source for
* Clients can then call {@link #getFeatures(FeatureInput, Locatable)} to query the data source for
* a particular FeatureInput over a specific interval.
*/
public final class FeatureManager implements AutoCloseable {
Expand All @@ -64,7 +63,7 @@ public final class FeatureManager implements AutoCloseable {
*/
private static final Class<FeatureInput> FEATURE_ARGUMENT_CLASS = FeatureInput.class;

/**
/*
* At startup, walk through the packages in codec packages, and save any (concrete) FeatureCodecs discovered
* in DISCOVERED_CODECS
*/
Expand Down Expand Up @@ -333,6 +332,13 @@ public List<SAMSequenceDictionary> getAllSequenceDictionaries() {
.collect(Collectors.toList());
}

/**
* Get all the sources managed by this FeatureManager.
*/
public Set<FeatureInput<? extends Feature>> getAllInputs() {
return featureSources.keySet();
}

/**
* Given a FeatureInput argument field from our tool, queries the data source for that FeatureInput
* over the specified interval, and returns a List of the Features overlapping that interval from
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
package org.broadinstitute.hellbender.engine;

import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.samtools.SAMSequenceRecord;
import htsjdk.tribble.Feature;
import htsjdk.variant.vcf.VCFHeader;
import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
import org.broadinstitute.hellbender.engine.filters.CountingReadFilter;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.IntervalUtils;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.tools.sv.SVFeaturesHeader;

import java.util.*;

/**
* A MultiFeatureWalker is a tool that presents one {@link Feature} at a time in sorted order from
* multiple sources of Features. The input files for each feature must be sorted by locus.
*
* To use this walker you need only implement the abstract
* {@link #apply(F, Object, ReadsContext, ReferenceContext)} method in a class that declares
* a collection of FeatureInputs as an argument.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you take my suggestion below to override onStartup() instead of onTraversalStart(), add to the docs here:

and may optionally implement {@link #onTraversalStart()}, {@link #onTraversalSuccess()}, and/or {@link #closeTool()}.

which is the usual pattern for Walker classes

* You may optionally implement {@link #onTraversalStart()}, {@link #onTraversalSuccess()},
* and/or {@link #closeTool()}.
*/
public abstract class MultiFeatureWalker<F extends Feature> extends WalkerBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add an ExampleMultiFeatureWalker in org.broadinstitute.hellbender.tools.examples + an ExampleMultiFeatureWalkerIntegrationTest, modeled after the existing ExampleFeatureWalker and ExampleFeatureWalkerIntegrationTest? We generally try to do this for each new traversal added to GATK both as examples for tool authors and to catch regressions in the Walker classes.


private SAMSequenceDictionary dictionary;
private final Set<String> samples = new TreeSet<>();

@Override
public boolean requiresFeatures(){
return true;
}

@Override
public String getProgressMeterRecordLabel() { return "features"; }

@Override
void initializeFeatures() {
features = new FeatureManager(this, FeatureDataSource.DEFAULT_QUERY_LOOKAHEAD_BASES,
cloudPrefetchBuffer, cloudIndexPrefetchBuffer, getGenomicsDBOptions());
}

/**
* Operations performed just prior to the start of traversal.
*/
@Override
public final void onStartup() {
super.onStartup();
setDictionaryAndSamples();
}

/**
* {@inheritDoc}
*
* Implementation of Feature-based traversal.
*
* NOTE: You should only override {@link #traverse()} if you are writing a new walker base class
* in the engine package that extends this class. It is not meant to be overridden by tools
* outside the engine package.
*/
@Override
public void traverse() {
CountingReadFilter readFilter = makeReadFilter();
final MergingIterator<F> iterator =
new MergingIterator<>(dictionary, features, userIntervals);
while ( iterator.hasNext() ) {
final PQEntry<F> entry = iterator.next();
final F feature = entry.getFeature();
final SimpleInterval featureInterval = new SimpleInterval(feature);
apply(feature,
entry.getHeader(),
new ReadsContext(reads, featureInterval, readFilter),
new ReferenceContext(reference, featureInterval));
progressMeter.update(feature);
}
}

/**
* Process an individual feature.
* In general, subclasses should simply stream their output from apply(), and maintain as little
* internal state as possible.
*
* @param feature Current Feature being processed.
* @param header Header object for the source from which the feature was drawn (may be null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add javadoc for the all-important ReadsContext and ReferenceContext args

* @param readsContext An object that allows querying for the reads the overlap the feature
* @param referenceContext An object that allows querying for the reference sequence associated with the feature
*/
public abstract void apply( final F feature,
final Object header,
final ReadsContext readsContext,
final ReferenceContext referenceContext );

/**
* Get the dictionary we settled on
*/
public SAMSequenceDictionary getDictionary() { return dictionary; }

/**
* Get the list of sample names we accumulated
*/
public Set<String> getSampleNames() { return Collections.unmodifiableSet(samples); }

/**
* Choose the most comprehensive dictionary available (see betterDictionary method below),
* and concatenate the sample names available from each feature input.
* Each feature input may have its own dictionary, and the user can specify an additional master
* dictionary, reference dictionary, and reads dictionary. This method makes certain that all
* of these dictionaries are consistent with regard to contig name and order. It's OK if one
* dictionary is a subset of another: we'll choose the most comprehensive dictionary.
* (Can't use the getBestAvailableSequenceDictionary method -- it throws if there are multiple
* dictionaries available.)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could consider pushing some more intelligent behavior up into getBestAvailableSequenceDictionary() to handle the case of multiple Feature dictionaries in a sensible way. We don't have to do that in this PR though -- recommend filing a separate ticket nominating the logic in MultiFeatureWalker for eventual promotion up into GATKTool

*/
private void setDictionaryAndSamples() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some documentation with an overview of the logic here?

DictSource dictSource = new DictSource(getMasterSequenceDictionary(),
StandardArgumentDefinitions.SEQUENCE_DICTIONARY_NAME);
if ( hasReference() ) {
final DictSource refDictSource = new DictSource(reference.getSequenceDictionary(),
StandardArgumentDefinitions.REFERENCE_LONG_NAME);
dictSource = betterDictionary(refDictSource, dictSource);
}
if ( hasReads() ) {
final DictSource readsDictSource = new DictSource(reads.getSequenceDictionary(), "read-source");
dictSource = betterDictionary(readsDictSource, dictSource);
}
for ( final FeatureInput<? extends Feature> input : features.getAllInputs() ) {
final Object header = features.getHeader(input);
if ( header instanceof SVFeaturesHeader ) {
final SVFeaturesHeader svFeaturesHeader = (SVFeaturesHeader)header;
final DictSource featureDictSource = new DictSource(svFeaturesHeader.getDictionary(),
input.getName());
dictSource = betterDictionary(featureDictSource, dictSource);
final List<String> sampleNames = svFeaturesHeader.getSampleNames();
if ( sampleNames != null ) {
samples.addAll(svFeaturesHeader.getSampleNames());
}
} else if (header instanceof VCFHeader ) {
final VCFHeader vcfHeader = (VCFHeader)header;
final DictSource featureDictSource = new DictSource(vcfHeader.getSequenceDictionary(),
input.getName());
dictSource = betterDictionary(featureDictSource, dictSource);
samples.addAll(vcfHeader.getSampleNamesInOrder());
}
}
if ( dictSource.getDictionary() == null ) {
throw new UserException("No dictionary found. Provide one as --" +
StandardArgumentDefinitions.SEQUENCE_DICTIONARY_NAME + " or --" +
StandardArgumentDefinitions.REFERENCE_LONG_NAME + ".");
}
dictionary = dictSource.getDictionary();
}

/**
* Makes sure that the two dictionaries are consistent with regard to contig names and order.
* Returns the more comprehensive (larger) dictionary if they're consistent.
*/
private static DictSource betterDictionary( final DictSource newDict,
final DictSource curDict ) {
if ( curDict.getDictionary() == null ) return newDict;
if ( newDict.getDictionary() == null ) return curDict;
final DictSource smallDict;
final DictSource largeDict;
if ( newDict.getDictionary().size() <= curDict.getDictionary().size() ) {
smallDict = newDict;
largeDict = curDict;
} else {
smallDict = curDict;
largeDict = newDict;
}
int lastIdx = -1;
final SAMSequenceDictionary largeDictionary = largeDict.getDictionary();
for ( final SAMSequenceRecord rec : smallDict.getDictionary().getSequences() ) {
final int newIdx = largeDictionary.getSequenceIndex(rec.getContig());
if ( newIdx == -1 ) {
throw new UserException("Contig " + rec.getContig() + " in the dictionary read from " +
smallDict.getSource() + " does not appear in the larger dictionary read from " +
largeDict.getSource());
}
if ( newIdx <= lastIdx ) {
final String prevContig = largeDictionary.getSequence(lastIdx).getContig();
throw new UserException("Contigs out of order: Contig " + rec.getContig() +
" comes before contig " + prevContig + " in the dictionary read from " +
largeDict.getSource() + ", but follows it in the dictionary read from " +
smallDict.getSource());
}
lastIdx = newIdx;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't you just call SequenceDictionaryUtils.supersets() here instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't use SequenceDictionaryUtils.supersets(). That method doesn't test whether the like-named contigs have the same order. We're using the dictionary to get the collation sequence so that we can present the features in sorted order. It's the often revisited problem that Features, SimpleIntervals, etc. aren't comparable.

}
return largeDict;
}

public static final class DictSource {
private final SAMSequenceDictionary dictionary;
private final String source;

public DictSource( final SAMSequenceDictionary dictionary, final String source ) {
this.dictionary = dictionary;
this.source = source;
}

public SAMSequenceDictionary getDictionary() { return dictionary; }
public String getSource() { return source; }
}

public static final class MergingIterator<F extends Feature> implements Iterator<PQEntry<F>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also a quick doc here

final SAMSequenceDictionary dictionary;
final PriorityQueue<PQEntry<F>> priorityQueue;

@SuppressWarnings("unchecked")
public MergingIterator( final SAMSequenceDictionary dictionary,
final FeatureManager featureManager,
final List<SimpleInterval> intervals ) {
final Set<FeatureInput<? extends Feature>> inputs = featureManager.getAllInputs();
this.dictionary = dictionary;
this.priorityQueue = new PriorityQueue<>(inputs.size());
for ( final FeatureInput<? extends Feature> input : inputs ) {
final Iterator<F> iterator =
(Iterator<F>)featureManager.getFeatureIterator(input, intervals);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you add ExampleMultiFeatureWalkerIntegrationTest, make sure to include a test that uses the -L argument to confirm that interval subsetting is hooked up properly to this traversal (which it does appear to be).

final Object header = featureManager.getHeader(input);
addEntry(new PQContext<>(iterator, dictionary, header));
}
}

@Override
public boolean hasNext() {
return !priorityQueue.isEmpty();
}

@Override
public PQEntry<F> next() {
final PQEntry<F> entry = priorityQueue.poll();
if ( entry == null ) {
throw new NoSuchElementException("iterator is exhausted");
}
final PQEntry<F> newEntry = addEntry(entry.getContext());
if ( newEntry != null ) {
if ( newEntry.compareTo(entry) < 0 ) {
final Feature feature = newEntry.getFeature();
throw new UserException("inputs are not sorted at " +
feature.getContig() + ":" + feature.getStart());
}
}
return entry;
}

private PQEntry<F> addEntry( final PQContext<F> context ) {
final Iterator<F> iterator = context.getIterator();
if ( !iterator.hasNext() ) {
return null;
}
final PQEntry<F> entry = new PQEntry<>(context, iterator.next());
priorityQueue.add(entry);
return entry;
}
}

public static final class PQContext<F extends Feature> {
private final Iterator<F> iterator;
private final SAMSequenceDictionary dictionary;
private final Object header;

public PQContext( final Iterator<F> iterator,
final SAMSequenceDictionary dictionary,
final Object header ) {
this.iterator = iterator;
this.dictionary = dictionary;
this.header = header;
}

public Iterator<F> getIterator() { return iterator; }
public SAMSequenceDictionary getDictionary() { return dictionary; }
public Object getHeader() { return header; }
}

public static final class PQEntry<F extends Feature> implements Comparable<PQEntry<F>> {
private final PQContext<F> context;
private final F feature;

public PQEntry( final PQContext<F> context, final F feature ) {
this.context = context;
this.feature = feature;
}

public PQContext<F> getContext() { return context; }
public F getFeature() { return feature; }
public Object getHeader() { return context.getHeader(); }

@Override
public int compareTo( PQEntry<F> entry ) {
return IntervalUtils.compareLocatables(feature, entry.getFeature(), context.getDictionary());
}
}
}
Loading