Skip to content

Commit

Permalink
Embed ref seqs if :embed-reference? is set to true
Browse files Browse the repository at this point in the history
  • Loading branch information
athos committed Oct 10, 2024
1 parent 66b4d3e commit 03e8b21
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 167 deletions.
17 changes: 10 additions & 7 deletions src/cljam/io/cram.clj
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,16 @@
a tag keyword, returns a keyword or a set of keywords representing
compression method. It may return another function to add more conditions
for the tag type and/or block encoding.
- create-index?: If true, creates a .crai index file in the course of CRAM
file writing.
- skip-sort-order-check?: When creating a CRAM index for the CRAM file,
the CRAM writer, by default, checks if the header is declared as
`SO:coordinate` and raises an error if not.
If this option is set to true, the CRAM writer will skip the header check
and create an index file regardless of the header declaration."
- create-index?: If set to true, the CRAM writer creates a .crai index file
in the course of CRAM file writing.
- embed-reference?: If set to true, the CRAM writer embeds the reference
sequences into the resulting CRAM file, allowing it to be read without
needing an external reference file.
- skip-sort-order-check?: When creating a CRAM index or embedding reference
sequences into a CRAM file, the CRAM writer, by default, checks if
the header specifies `SO:coordinate` and raises an error if it does not.
If this option is set to true, the CRAM writer will skip this header
check and proceed regardless of the header's sort order declaration."
(^CRAMWriter [f] (writer f {}))
(^CRAMWriter [f option] (cram/writer f option)))

Expand Down
68 changes: 36 additions & 32 deletions src/cljam/io/cram/data_series.clj
Original file line number Diff line number Diff line change
Expand Up @@ -147,42 +147,46 @@

(def ^{:doc "Default encodings for all the data series"}
default-data-series-encodings
{:BF {:content-id 1, :codec :external, :compressor :gzip}
:CF {:content-id 2, :codec :external, :compressor :gzip}
:RI {:content-id 3, :codec :external, :compressor :gzip}
:RL {:content-id 4, :codec :external, :compressor :gzip}
:AP {:content-id 5, :codec :external, :compressor :gzip}
:RG {:content-id 6, :codec :external, :compressor :gzip}
:RN {:content-id 7, :codec :byte-array-stop, :stop-byte (int \tab), :compressor :gzip}
:MF {:content-id 8, :codec :external, :compressor :gzip}
:NS {:content-id 9, :codec :external, :compressor :gzip}
:NP {:content-id 10, :codec :external, :compressor :gzip}
:TS {:content-id 11, :codec :external, :compressor :gzip}
:NF {:content-id 12, :codec :external, :compressor :gzip}
:TL {:content-id 13, :codec :external, :compressor :gzip}
:FN {:content-id 14, :codec :external, :compressor :gzip}
:FC {:content-id 15, :codec :external, :compressor :gzip}
:FP {:content-id 16, :codec :external, :compressor :gzip}
:DL {:content-id 17, :codec :external, :compressor :gzip}
{;; :embedded-ref is a pseudo data series and won't be used for the real encoding
;; except for the value for :compressor, which is used when compressing embedded
;; reference blocks
:embedded-ref {:content-id 1, :codec :external, :compressor :gzip}
:BF {:content-id 2, :codec :external, :compressor :gzip}
:CF {:content-id 3, :codec :external, :compressor :gzip}
:RI {:content-id 4, :codec :external, :compressor :gzip}
:RL {:content-id 5, :codec :external, :compressor :gzip}
:AP {:content-id 6, :codec :external, :compressor :gzip}
:RG {:content-id 7, :codec :external, :compressor :gzip}
:RN {:content-id 8, :codec :byte-array-stop, :stop-byte (int \tab), :compressor :gzip}
:MF {:content-id 9, :codec :external, :compressor :gzip}
:NS {:content-id 10, :codec :external, :compressor :gzip}
:NP {:content-id 11, :codec :external, :compressor :gzip}
:TS {:content-id 12, :codec :external, :compressor :gzip}
:NF {:content-id 13, :codec :external, :compressor :gzip}
:TL {:content-id 14, :codec :external, :compressor :gzip}
:FN {:content-id 15, :codec :external, :compressor :gzip}
:FC {:content-id 16, :codec :external, :compressor :gzip}
:FP {:content-id 17, :codec :external, :compressor :gzip}
:DL {:content-id 18, :codec :external, :compressor :gzip}
:BB {:codec :byte-array-len
:len-encoding {:codec :external, :content-id 18, :compressor :gzip}
:val-encoding {:codec :external, :content-id 19, :compressor :gzip}}
:len-encoding {:codec :external, :content-id 19, :compressor :gzip}
:val-encoding {:codec :external, :content-id 20, :compressor :gzip}}
:QQ {:codec :byte-array-len
:len-encoding {:codec :external, :content-id 20, :compressor :gzip}
:val-encoding {:codec :external, :content-id 21, :compressor :gzip}}
:BS {:content-id 22, :codec :external, :compressor :gzip}
:len-encoding {:codec :external, :content-id 21, :compressor :gzip}
:val-encoding {:codec :external, :content-id 22, :compressor :gzip}}
:BS {:content-id 23, :codec :external, :compressor :gzip}
:IN {:codec :byte-array-len
:len-encoding {:codec :external, :content-id 23, :compressor :gzip}
:val-encoding {:codec :external, :content-id 24, :compressor :gzip}}
:RS {:content-id 25, :codec :external, :compressor :gzip}
:PD {:content-id 26, :codec :external, :compressor :gzip}
:HC {:content-id 27, :codec :external, :compressor :gzip}
:len-encoding {:codec :external, :content-id 24, :compressor :gzip}
:val-encoding {:codec :external, :content-id 25, :compressor :gzip}}
:RS {:content-id 26, :codec :external, :compressor :gzip}
:PD {:content-id 27, :codec :external, :compressor :gzip}
:HC {:content-id 28, :codec :external, :compressor :gzip}
:SC {:codec :byte-array-len
:len-encoding {:codec :external, :content-id 28, :compressor :gzip}
:val-encoding {:codec :external, :content-id 29, :compressor :gzip}}
:MQ {:content-id 30, :codec :external, :compressor :gzip}
:BA {:content-id 31, :codec :external, :compressor :gzip}
:QS {:content-id 32, :codec :external, :compressor :gzip}})
:len-encoding {:codec :external, :content-id 29, :compressor :gzip}
:val-encoding {:codec :external, :content-id 30, :compressor :gzip}}
:MQ {:content-id 31, :codec :external, :compressor :gzip}
:BA {:content-id 32, :codec :external, :compressor :gzip}
:QS {:content-id 33, :codec :external, :compressor :gzip}})

(defn- build-codec-encoder
[{:keys [codec content-id compressor] :as params} data-type content-id->state]
Expand Down
13 changes: 8 additions & 5 deletions src/cljam/io/cram/encode/context.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

(defn make-container-context
"Creates a new container context."
[cram-header seq-resolver]
[cram-header seq-resolver options]
(let [rname->idx (into {}
(map-indexed (fn [i {:keys [SN]}] [SN i]))
(:SQ cram-header))
Expand All @@ -24,14 +24,17 @@
:preservation-map preservation-map
:subst-mat subst-mat
:seq-resolver seq-resolver
:tag-dict-builder tag-dict-builder}))
:tag-dict-builder tag-dict-builder
:options options}))

(defn finalize-container-context
"Finalizes the builders in the container context and returns a new container
context containing those builders' results. This operation must be done before
creating a slice context."
[container-ctx alignment-stats ds-compressor-overrides tag-compressor-overrides]
(let [ds-encodings (-> ds/default-data-series-encodings
[container-ctx alignment-stats]
(let [{:keys [ds-compressor-overrides
tag-compressor-overrides]} (:options container-ctx)
ds-encodings (-> ds/default-data-series-encodings
(ds/apply-ds-compressor-overrides ds-compressor-overrides))
tag-dict (tag-dict/build-tag-dict (:tag-dict-builder container-ctx))
tag-encodings (-> (tag-dict/build-tag-encodings tag-dict)
Expand All @@ -46,7 +49,7 @@
"Creates a slice context for the ith slice from the given container context.
Note that the container context must be finalized with `finalize-container-context`."
[{:keys [alignment-stats ds-encodings tag-encodings] :as container-ctx} i]
(let [ds-encoders (ds/build-data-series-encoders ds-encodings)
(let [ds-encoders (ds/build-data-series-encoders (dissoc ds-encodings :embedded-ref))
tag-encoders (ds/build-tag-encoders tag-encodings)]
(assoc container-ctx
:alignment-stats (nth alignment-stats i)
Expand Down
30 changes: 18 additions & 12 deletions src/cljam/io/cram/encode/partitioning.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
(:import [java.util ArrayList List]))

(defn- slice-record-collector
[header {:keys [^long records-per-slice ^long min-single-ref-slice-size]
:or {records-per-slice 10000, min-single-ref-slice-size 1000}}]
[header
{:keys [^long records-per-slice ^long min-single-ref-slice-size embed-reference?]
:or {records-per-slice 10000, min-single-ref-slice-size 1000}}]
(let [sort-by-coord? (= (sam.header/sort-order header) sam.header/order-coordinate)
;; CRAM files sorted by coord can easily get back from a multi-ref state
;; to a single-ref state. To support this, multi-ref slices should be kept
Expand All @@ -26,15 +27,17 @@

:unmapped
(if (< n-records records-per-slice)
(let [ref-state' (cond (= rname "*") ref-state
sort-by-coord? (throw
(ex-info
(str "Unmapped records "
"must be last")
{}))
:else :multi-ref)]
(.add slice-records aln)
(recur ref-state' more))
(if-let [ref-state' (cond (= rname "*") ref-state
sort-by-coord? (throw
(ex-info
(str "Unmapped records "
"must be last")
{}))
embed-reference? nil
:else :multi-ref)]
(do (.add slice-records aln)
(recur ref-state' more))
[ref-state slice-records alns])
[ref-state slice-records alns])

:multi-ref
Expand All @@ -52,7 +55,10 @@
;; slices, instead of creating and adding a new multi-ref slice
;; to that container, add the accumulated single-ref slice
;; to the container and add the current record to the next slice
(if (and empty-container? (< n-records min-single-ref-slice-size))
(if (and empty-container?
;; reference embedding is not compatible with multi-ref slices
(not embed-reference?)
(< n-records min-single-ref-slice-size))
(do (.add slice-records aln)
(recur :multi-ref more))
[ref-state slice-records alns]))))
Expand Down
7 changes: 5 additions & 2 deletions src/cljam/io/cram/encode/structure.clj
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,13 @@
(defn encode-compression-header-block
"Encodes a compression header block to the given OutputStream."
[out preservation-map subst-mat tag-dict ds-encodings tag-encodings]
(let [bs (with-out-byte-array
;; ensure that ds-encodings does not contain the encoding for the :embedded-ref
;; pseudo data series
(let [ds-encodings' (dissoc ds-encodings :embedded-ref)
bs (with-out-byte-array
(fn [out']
(encode-preservation-map out' preservation-map subst-mat tag-dict)
(encode-data-series-encodings out' ds-encodings)
(encode-data-series-encodings out' ds-encodings')
(encode-tag-encoding-map out' tag-encodings)))]
(encode-block out :raw 1 0 (alength bs) bs)))

Expand Down
119 changes: 79 additions & 40 deletions src/cljam/io/cram/writer.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns cljam.io.cram.writer
(:require [cljam.io.crai :as crai]
[cljam.io.cram.encode.alignment-stats :as stats]
[cljam.io.cram.encode.compressor :as compressor]
[cljam.io.cram.encode.context :as context]
[cljam.io.cram.encode.partitioning :as partition]
[cljam.io.cram.encode.record :as record]
Expand Down Expand Up @@ -41,46 +42,69 @@
(defn write-header
"Writes the CRAM header."
[^CRAMWriter wtr header]
(when (and (.-index-writer wtr)
(not (:skip-sort-order-check? (.-options wtr)))
(not= (sam.header/sort-order header) sam.header/order-coordinate))
(throw
(ex-info "Cannot create CRAM index file for CRAM file not declared as sorted by coordinate"
{:sort-order (sam.header/sort-order header)})))
(struct/encode-cram-header-container (.-stream wtr) header))
(let [opts (.-options wtr)
sort-order (sam.header/sort-order header)]
(when (and (not (= sort-order sam.header/order-coordinate))
(not (:skip-sort-order-check? opts)))
(when (.-index-writer wtr)
(throw
(ex-info (str "Cannot create CRAM index file for CRAM file not declared "
"as sorted by coordinate")
{:sort-order sort-order})))
(when (:embed-reference? opts)
(throw
(ex-info (str "Cannot embed reference sequences for CRAM file not declared "
"as sorted by coordinate")
{:sort-order sort-order}))))
(struct/encode-cram-header-container (.-stream wtr) header)))

(defn- preprocess-records
[cram-header seq-resolver options ^objects container-records]
(let [container-ctx (context/make-container-context cram-header seq-resolver)
{:keys [ds-compressor-overrides tag-compressor-overrides]} options
(let [container-ctx (context/make-container-context cram-header seq-resolver options)
stats (mapv (partial record/preprocess-slice-records container-ctx)
container-records)]
(context/finalize-container-context container-ctx
stats
ds-compressor-overrides
tag-compressor-overrides)))

(defn- generate-blocks [slice-ctx]
(->> (context/encoding-results slice-ctx)
(keep (fn [{:keys [^long raw-size] :as block}]
(when (pos? raw-size)
(update block :data
(fn [^bytes data]
(struct/generate-block (:compressor block) 4
(:content-id block) raw-size
data))))))
;; sort + dedupe by :content-id
(into (sorted-map) (map (juxt :content-id identity)))
vals
(cons {:content-id 0
:data (struct/generate-block :raw 5 0 0 (byte-array 0))})))

(defn- reference-md5
[{:keys [seq-resolver cram-header]} {:keys [^long ri ^long start ^long end]}]
(context/finalize-container-context container-ctx stats)))

(def ^:private ^:const embedded-ref-content-id 1)

(defn- generate-embedded-ref-block [slice-ctx ^bytes embedded-ref]
(let [raw-size (alength embedded-ref)
compr (compressor/compressor (get-in slice-ctx [:ds-encodings :embedded-ref :compressor]))
_ (with-open [^OutputStream os (compressor/compressor-output-stream compr)]
(.write os embedded-ref))
{:keys [compressor data]} (compressor/->compressed-result compr)]
{:content-id embedded-ref-content-id
:data (struct/generate-block compressor 4 embedded-ref-content-id raw-size data)}))

(defn- generate-blocks [slice-ctx ^bytes embedded-ref]
(letfn [(inject-embedded-ref-block [blocks]
(cond->> blocks
embedded-ref
(cons (generate-embedded-ref-block slice-ctx embedded-ref))))]
(->> (context/encoding-results slice-ctx)
(keep (fn [{:keys [^long raw-size] :as block}]
(when (pos? raw-size)
(update block :data
(fn [^bytes data]
(struct/generate-block (:compressor block) 4
(:content-id block) raw-size
data))))))
;; sort + dedupe by :content-id
(into (sorted-map) (map (juxt :content-id identity)))
vals
inject-embedded-ref-block
(cons {:content-id 0
:data (struct/generate-block :raw 5 0 0 (byte-array 0))}))))

(defn- resolve-reference-seq
^bytes [{:keys [seq-resolver cram-header]} {:keys [^long ri ^long start ^long end]}]
(let [chr (:SN (nth (:SQ cram-header) ri))]
(resolver/resolve-sequence seq-resolver chr start end)))

(defn- reference-md5 [slice-ctx ^bytes embedded-ref {:keys [^long ri] :as stats}]
(if (neg? ri)
(byte-array 16)
(let [chr (:SN (nth (:SQ cram-header) ri))
ref-bases (resolver/resolve-sequence seq-resolver chr start end)
(let [ref-bases (or embedded-ref (resolve-reference-seq slice-ctx stats))
md5 (MessageDigest/getInstance "md5")]
(.digest md5 ref-bases))))

Expand All @@ -94,11 +118,14 @@
(defn- generate-slice [slice-ctx counter slice-records]
(record/encode-slice-records slice-ctx slice-records)
(let [stats (:alignment-stats slice-ctx)
blocks (generate-blocks slice-ctx)
ref-md5 (reference-md5 slice-ctx stats)
embedded-ref (when (and (:embed-reference? (:options slice-ctx))
(>= (long (:ri stats)) 0))
(resolve-reference-seq slice-ctx stats))
blocks (generate-blocks slice-ctx embedded-ref)
ref-md5 (reference-md5 slice-ctx embedded-ref stats)
header (assoc (stats->header-base stats)
:counter counter
:embedded-reference -1
:embedded-reference (if embedded-ref embedded-ref-content-id -1)
:reference-md5 ref-md5)
header-block (struct/generate-slice-header-block header blocks)
block-data (mapv :data blocks)]
Expand All @@ -119,9 +146,20 @@
acc)))

(defn- generate-compression-header-block
^bytes [{:keys [preservation-map subst-mat tag-dict ds-encodings tag-encodings]}]
(struct/generate-compression-header-block preservation-map subst-mat tag-dict
ds-encodings tag-encodings))
^bytes
[{:keys [preservation-map subst-mat tag-dict ds-encodings tag-encodings]} slices]
(let [preservation-map' (cond-> preservation-map
;; A reference file is not required when either of
;; the following holds for every slice in the container:
;; - the slice has an embedded reference block
;; - the slice only contains unmapped records
(every? (fn [{:keys [header]}]
(or (>= (long (:embedded-reference header)) 0)
(= (long (:ref-seq-id header)) -1)))
slices)
(assoc :RR false))]
(struct/generate-compression-header-block preservation-map' subst-mat tag-dict
ds-encodings tag-encodings)))

(defn- generate-container-header [container-ctx ^bytes compression-header-block slices]
(let [stats (stats/merge-stats (:alignment-stats container-ctx))
Expand Down Expand Up @@ -175,7 +213,8 @@
(let [container-ctx (preprocess-records cram-header (.-seq-resolver wtr)
(.-options wtr) container-records)
slices (generate-slices container-ctx counter container-records)
compression-header-block (generate-compression-header-block container-ctx)
compression-header-block (generate-compression-header-block container-ctx
slices)
container-header (generate-container-header container-ctx
compression-header-block
slices)
Expand Down
Loading

0 comments on commit 03e8b21

Please sign in to comment.