Add support for re-serializing (sharding and reassembling) CRAM containers. #1609

Open. Wants to merge 1 commit into base: master.
92 changes: 92 additions & 0 deletions src/main/java/htsjdk/samtools/CRAMContainerStreamRewriter.java
@@ -0,0 +1,92 @@
package htsjdk.samtools;

import htsjdk.samtools.cram.build.CramIO;
import htsjdk.samtools.cram.structure.Container;
import htsjdk.samtools.cram.structure.CramHeader;
import htsjdk.samtools.util.RuntimeIOException;

import java.io.IOException;
import java.io.OutputStream;

/**
* Rewrite a series of containers to a new stream. The CRAM file header and the SAMFileHeader container are
* automatically written to the stream when this class is instantiated. An EOF container is automatically written when
* {@link #finish()} is called.
*/
public class CRAMContainerStreamRewriter {
private final OutputStream outputStream;
private final String outputStreamIdentifier;
private final CramHeader cramHeader;
private final SAMFileHeader samFileHeader;
Review comment (Member): This field is assigned but never accessed.

private final CRAMIndexer cramIndexer;

private long streamOffset = 0L;
private long recordCounter = 0L;

/**
* Create a CRAMContainerStreamRewriter for writing a series of CRAM containers into an output
* stream, with an optional output index.
*
* @param outputStream where to write the CRAM stream.
* @param cramHeader {@link CramHeader} to be written at the start of the stream.
* @param samFileHeader {@link SAMFileHeader} to be used. Sort order is determined by the sortOrder property of this arg.
* @param outputStreamIdentifier used for display in error messages.
* @param indexer CRAM indexer. Can be null if no index is required.
*/
public CRAMContainerStreamRewriter(
final OutputStream outputStream,
final CramHeader cramHeader,
final SAMFileHeader samFileHeader,
final String outputStreamIdentifier,
final CRAMIndexer indexer) {
this.outputStream = outputStream;
this.cramHeader = cramHeader;
this.samFileHeader = samFileHeader;
this.outputStreamIdentifier = outputStreamIdentifier;
this.cramIndexer = indexer;

streamOffset = CramIO.writeCramHeader(cramHeader, outputStream);
streamOffset += Container.writeSAMFileHeaderContainer(cramHeader.getCRAMVersion(), samFileHeader, outputStream);
}

/**
* Writes a container to a stream, updating the (stream-relative) global record counter and byte offsets.
*
* Since this method mutates the values in the container, the container is no longer valid in the context
* of the stream from which it originated.
*
* @param container the container to emit to the stream. The container must conform to the version and sort
* order specified in the CRAM header and SAM header provided to the constructor
* {@link #CRAMContainerStreamRewriter(OutputStream, CramHeader, SAMFileHeader, String, CRAMIndexer)}.
* All the containers serialized to a single stream using this method must have originated from the
* same original context (i.e., the same input stream), obtained via {@link htsjdk.samtools.cram.build.CramContainerIterator}.
*/
public void rewriteContainer(final Container container) {
// update the container and slices with the correct global record counter and byte offsets
// (required for indexing)
container.relocateContainer(recordCounter, streamOffset);

// re-serialize the entire container and slice(s), block by block
streamOffset += container.write(cramHeader.getCRAMVersion(), outputStream);
recordCounter += container.getContainerHeader().getNumberOfRecords();

if (cramIndexer != null) {
cramIndexer.processContainer(container, ValidationStringency.SILENT);
}
}

/**
* Finish writing to the stream. Writes the terminating EOF container, flushes the stream, and finishes the index if an indexer was provided.
*/
public void finish() {
try {
CramIO.writeCramEOF(cramHeader.getCRAMVersion(), outputStream);
outputStream.flush();
if (cramIndexer != null) {
cramIndexer.finish();
}
} catch (final IOException e) {
throw new RuntimeIOException(String.format("IOException closing stream for %s", outputStreamIdentifier), e);
Review comment (Member): Is the output stream actually closed here? It seems like it's just flushed.

}
}

}
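For illustration, a minimal sketch of how the rewriter might be driven to reassemble containers from an existing CRAM into a new stream (sharding would use one rewriter per output shard). The input/output paths are placeholders, no index is written, and the sketch assumes htsjdk.samtools.cram.build.CramContainerIterator can be constructed from an InputStream, exposes the CRAM header and SAMFileHeader it reads via getCramHeader() and getSamFileHeader(), and stops before the terminal EOF container (the isEOF() guard is defensive):

import htsjdk.samtools.CRAMContainerStreamRewriter;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.cram.build.CramContainerIterator;
import htsjdk.samtools.cram.structure.Container;
import htsjdk.samtools.cram.structure.CramHeader;
import htsjdk.samtools.util.RuntimeIOException;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

public final class CramReassembleSketch {
    public static void main(final String[] args) {
        // Hypothetical paths; no index is requested (null indexer).
        try (final InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get("input.cram")));
             final OutputStream out = new BufferedOutputStream(Files.newOutputStream(Paths.get("output.cram")))) {
            final CramContainerIterator containerIterator = new CramContainerIterator(in);
            final CramHeader cramHeader = containerIterator.getCramHeader();
            final SAMFileHeader samFileHeader = containerIterator.getSamFileHeader();

            // Instantiation writes the CRAM header and the SAMFileHeader container up front.
            final CRAMContainerStreamRewriter rewriter = new CRAMContainerStreamRewriter(
                    out, cramHeader, samFileHeader, "output.cram", null);

            while (containerIterator.hasNext()) {
                final Container container = containerIterator.next();
                if (!container.isEOF()) { // finish() emits its own EOF container
                    rewriter.rewriteContainer(container);
                }
            }
            rewriter.finish(); // writes the EOF container and flushes the stream
        } catch (final IOException e) {
            throw new RuntimeIOException(e);
        }
    }
}

To also produce an index, an indexer writing to a side stream (presumably a CRAI or BAI indexer) would be passed in place of null: rewriteContainer feeds each relocated container to it, and finish() finalizes it.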
1 change: 0 additions & 1 deletion src/main/java/htsjdk/samtools/CRAMIndexer.java
@@ -1,6 +1,5 @@
package htsjdk.samtools;

import htsjdk.samtools.cram.structure.CompressorCache;
import htsjdk.samtools.cram.structure.Container;

/**
25 changes: 24 additions & 1 deletion src/main/java/htsjdk/samtools/cram/structure/Container.java
@@ -49,7 +49,7 @@ public class Container {
private final List<Slice> slices;

// container's byte offset from the start of the containing stream, used for indexing
private final long containerByteOffset;
private long containerByteOffset;

/**
* Create a Container with a {@link ReferenceContext} derived from its {@link Slice}s.
@@ -190,6 +190,7 @@ public int write(final CRAMVersion cramVersion, final OutputStream outputStream)
// landmark 0 = byte length of the compression header
// landmarks after 0 = byte length of the compression header plus all slices before this one
landmarks.add(tempOutputStream.size());
slice.byteOffsetOfContainer = containerByteOffset;
Review comment (Member): Why is this updated again here when it's already set by relocate?

slice.write(cramVersion, tempOutputStream);
}
getContainerHeader().setLandmarks(landmarks);
@@ -335,6 +336,28 @@ public List<SAMRecord> getSAMRecords(
public CompressionHeader getCompressionHeader() { return compressionHeader; }
public AlignmentContext getAlignmentContext() { return containerHeader.getAlignmentContext(); }
public long getContainerByteOffset() { return containerByteOffset; }

/**
* Update the stream-relative values (global record counter and stream byte offset) for this
* container. For use when re-serializing a container that has been read from an existing stream
* into a new stream. This method mutates the container and its slices - the container is no
* longer valid in the context of its original stream.
*
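* For example, if this container is relocated to global record counter 1000 and it holds two slices
* of 400 and 350 records, the slices receive counters 1000 and 1400 respectively, and this method
* returns 1750, which the caller can use as the counter for the next container written to the stream.
*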
* @param containerRecordCounter the new global record counter for this container
* @param streamByteOffset the new stream byte offset for this container
* @return the updated global record counter
*/
public long relocateContainer(final long containerRecordCounter, final long streamByteOffset) {
Review comment (Member): Is it necessary to mutate the container? It was previously effectively immutable, wasn't it? It seems like it would be nicer to copy it, since I assume the container/slice objects are fairly lightweight and you could just pass through the reference to the heavy data.
Not a blocker if you think this is necessary for efficiency reasons, or if copying a container is very annoying.

this.containerByteOffset = streamByteOffset;
this.getContainerHeader().setGlobalRecordCounter(containerRecordCounter);

long sliceRecordCounter = containerRecordCounter;
for (final Slice slice : getSlices()) {
sliceRecordCounter = slice.relocateSlice(sliceRecordCounter, streamByteOffset);
}
return sliceRecordCounter;
}

public List<Slice> getSlices() { return slices; }
public boolean isEOF() {
return containerHeader.isEOF() && (getSlices() == null || getSlices().size() == 0);
src/main/java/htsjdk/samtools/cram/structure/ContainerHeader.java
@@ -42,7 +42,7 @@ public class ContainerHeader {
// total length of all blocks in this container (total length of this container, minus the Container Header).
private final AlignmentContext alignmentContext;
private final int recordCount;
private final long globalRecordCounter;
private long globalRecordCounter;
private final long baseCount;
private final int blockCount;

@@ -249,4 +249,8 @@ public boolean isEOF() {
return v3 || v2;
}

void setGlobalRecordCounter(final long recordCounter) {
this.globalRecordCounter = recordCounter;
Review comment (Member): Again, unfortunate to make this mutable.

}

}
20 changes: 18 additions & 2 deletions src/main/java/htsjdk/samtools/cram/structure/Slice.java
@@ -67,7 +67,7 @@ public class Slice {
// Slice header components as defined in the spec
private final AlignmentContext alignmentContext; // ref sequence, alignment start and span
private final int nRecords;
private final long globalRecordCounter;
private long globalRecordCounter;
private final int nSliceBlocks; // includes the core block and external blocks, but not the header block
private List<Integer> contentIDs;
private int embeddedReferenceBlockContentID = EMBEDDED_REFERENCE_ABSENT_CONTENT_ID;
@@ -78,7 +78,7 @@ public class Slice {

private final CompressionHeader compressionHeader;
private final SliceBlocks sliceBlocks;
private final long byteOffsetOfContainer;
public long byteOffsetOfContainer;
Review comment (Member): Why make this public? It seems like it's already updated through relocate.


private Block sliceHeaderBlock;

@@ -518,6 +518,22 @@ public void normalizeCRAMRecords(final List<CRAMCompressionRecord> cramCompressi
}
}

/**
* Update the stream-relative values (global record counter and container stream byte offset) for
* this slice. For use when re-serializing a container that has been read from an existing stream
* into a new stream. This method mutates the container and its slices - the container is no
* longer valid in the context of its original stream.
*
* @param sliceRecordCounter the new global record counter for this slice
* @param containerByteOffset the new stream byte offset of this slice's enclosing container
* @return the updated global record counter
*/
long relocateSlice(final long sliceRecordCounter, final long containerByteOffset) {
this.byteOffsetOfContainer = containerByteOffset;
this.globalRecordCounter = sliceRecordCounter;
return sliceRecordCounter + getNumberOfRecords();
}

private int getReferenceOffset(final boolean hasEmbeddedReference) {
final ReferenceContext sliceReferenceContext = getAlignmentContext().getReferenceContext();
return sliceReferenceContext.isMappedSingleRef() && hasEmbeddedReference ?