Skip to content

Commit

Permalink
Support CRAI index and CRAM random access
Browse files Browse the repository at this point in the history
  • Loading branch information
athos committed Apr 24, 2024
1 parent 4699b7f commit be82cb6
Show file tree
Hide file tree
Showing 13 changed files with 290 additions and 74 deletions.
29 changes: 29 additions & 0 deletions src/cljam/io/crai.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
(ns cljam.io.crai
(:require [cljam.util :as util]
[cljam.util.intervals :as intervals]
[clojure.java.io :as io]
[clojure.string :as str]))

(defn read-index
"Reads a CRAI file `f` and creates an index."
[f refs]
(let [refs (vec refs)]
(with-open [rdr (io/reader (util/compressor-input-stream f))]
(->> (line-seq rdr)
(map (fn [line]
(let [[^long seq-id ^long start ^long span container-offset slice-offset size]
(map #(Long/parseLong %) (str/split line #"\t"))
unmapped? (neg? seq-id)]
{:chr (if unmapped? "*" (:name (nth refs seq-id)))
:start (if unmapped? 0 start)
:end (if unmapped? 0 (+ start span))
:container-offset container-offset
:slice-offset slice-offset
:size size})))
intervals/index-intervals))))

(defn find-overlapping-entries
"Finds and returns all entries from the index that overlap with the specified
region."
[idx chr start end]
(intervals/find-overlap-intervals idx chr start end))
11 changes: 10 additions & 1 deletion src/cljam/io/cram.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns cljam.io.cram
"Alpha - subject to change. Provides functions for reading from a CRAM file."
(:refer-clojure :exclude [indexed?])
(:require [cljam.io.cram.core :as cram]
[cljam.io.protocols :as protocols]
[cljam.io.util :as io-util])
Expand Down Expand Up @@ -36,5 +37,13 @@
(defn read-alignments
"Reads all the alignments from the CRAM file and returns them as a lazy sequence
of record maps."
([rdr]
(protocols/read-alignments rdr))
([rdr region]
(protocols/read-alignments rdr region)))

(defn indexed?
"Returns true if the reader can be randomly accessed, false if not. Note this
function immediately realizes a delayed index."
[rdr]
(protocols/read-alignments rdr))
(protocols/indexed? rdr))
7 changes: 5 additions & 2 deletions src/cljam/io/cram/core.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns cljam.io.cram.core
(:require [cljam.io.cram.seq-resolver :as resolver]
(:require [cljam.io.crai :as crai]
[cljam.io.cram.seq-resolver :as resolver]
[cljam.io.cram.reader :as reader.core]
[cljam.io.sam.util.refs :as util.refs]
[cljam.io.util.byte-buffer :as bb]
Expand All @@ -23,7 +24,8 @@
seq-resolver (some-> reference resolver/seq-resolver)
header (volatile! nil)
refs (delay (util.refs/make-refs @header))
rdr (reader.core/->CRAMReader url ch bb header refs seq-resolver)]
idx (delay (crai/read-index (str f ".crai") @refs))
rdr (reader.core/->CRAMReader url ch bb header refs idx seq-resolver)]
(reader.core/read-file-definition rdr)
(vreset! header (reader.core/read-header rdr))
rdr))
Expand All @@ -40,6 +42,7 @@
rdr' (reader.core/->CRAMReader url ch bb
(delay @(.-header rdr))
(delay @(.-refs rdr))
(delay @(.-index rdr))
seq-resolver)]
(reader.core/read-file-definition rdr')
(reader.core/skip-container rdr')
Expand Down
17 changes: 8 additions & 9 deletions src/cljam/io/cram/decode/record.clj
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@
(+ (long (:pos record)))))

(defn- record-seq
[seq-resolver {:keys [preservation-map]} {:keys [rname pos] :as record} features]
(let [region {:chr rname :start pos :end (::end record)}
[seq-resolver {:keys [preservation-map]} {:keys [rname pos end] :as record} features]
(let [region {:chr rname :start pos :end end}
ref-bases (.getBytes ^String (resolver/resolve-sequence seq-resolver region))
len (long (::len record))
bs (byte-array len (byte (int \N)))
Expand Down Expand Up @@ -243,7 +243,7 @@
(fn [record]
(let [fs (features-decoder)
end (record-end record fs)
record' (assoc record ::end end)]
record' (assoc record :end end)]
(assoc record'
:mapq (MQ)
:seq (record-seq seq-resolver compression-header record' fs)
Expand All @@ -262,7 +262,8 @@
record' (assoc record
:seq (String. (.array bb))
:mapq 0
:cigar "*")]
:cigar "*"
:end (:pos record))]
(if (zero? (bit-and flag 0x01))
record'
(assoc record' :qual (decode-qual len QS))))))
Expand Down Expand Up @@ -310,10 +311,8 @@
:pnext (:pos mate)))

(defn- update-mate-records
[{^long s1 :pos :as r1} {^long s2 :pos :as r2}]
(let [e1 (long (::end r1))
e2 (long (::end r2))
r1' (update-next-mate r1 r2)
[{^long s1 :pos ^long e1 :end :as r1} {^long s2 :pos ^long e2 :end :as r2}]
(let [r1' (update-next-mate r1 r2)
r2' (update-next-mate r2 r1)]
(if (or (sam.flag/unmapped? (:flag r1))
(sam.flag/unmapped? (:flag r2))
Expand Down Expand Up @@ -353,4 +352,4 @@
(dotimes [i n]
(aset records i (record-decoder)))
(resolve-mate-records records)
(map #(dissoc % ::flag ::len ::end ::next-fragment) records)))
(map #(dissoc % ::flag ::len ::next-fragment) records)))
71 changes: 53 additions & 18 deletions src/cljam/io/cram/reader.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
(:require [cljam.io.cram.decode.data-series :as ds]
[cljam.io.cram.decode.record :as record]
[cljam.io.cram.decode.structure :as struct]
[cljam.io.protocols :as protocols])
(:import [java.io Closeable]
[cljam.io.protocols :as protocols]
[cljam.util.intervals :as intervals])
(:import [java.io Closeable FileNotFoundException]
[java.nio Buffer ByteBuffer ByteOrder]
[java.nio.channels FileChannel FileChannel$MapMode]))

(declare read-alignments)
(declare read-alignments read-alignments-in-region)

(deftype CRAMReader [url channel buffer header refs seq-resolver]
(deftype CRAMReader [url channel buffer header refs index seq-resolver]
Closeable
(close [_]
(when seq-resolver
Expand All @@ -19,20 +20,29 @@
(reader-url [_] url)
(read [this]
(protocols/read-alignments this))
#_(read [_ option])
(indexed? [_] false)
(read [this region]
(protocols/read-alignments this region))
(indexed? [_]
(try
@index
true
(catch FileNotFoundException _
false)))
protocols/IAlignmentReader
(read-header [_] @header)
(read-refs [_] @refs)
(read-alignments [this]
(read-alignments this))
#_(read-alignments [_ region])
(read-alignments [this region]
(read-alignments-in-region this region))
#_(read-blocks [_])
#_(read-blocks [_ region])
#_(read-blocks [_ region option])
#_protocols/IRegionReader
#_(read-in-region [_ region])
#_(read-in-region [_ region option]))
protocols/IRegionReader
(read-in-region [this region]
(protocols/read-in-region this region {}))
(read-in-region [this region _]
(protocols/read-alignments this region)))

(defn- read-to-buffer
([rdr] (read-to-buffer rdr nil))
Expand Down Expand Up @@ -65,14 +75,19 @@
ds-decoders
tag-decoders)))

(defn- read-container-records [^CRAMReader rdr ^ByteBuffer bb container-header]
(let [container-header-end (.position bb)
compression-header (struct/decode-compression-header-block bb)]
(->> (:landmarks container-header)
(mapcat
(fn [^long landmark]
(.position ^Buffer bb (+ container-header-end landmark))
(read-slice-records rdr bb compression-header))))))
(defn- read-container-records
([rdr bb container-header]
(read-container-records rdr bb container-header nil))
([^CRAMReader rdr ^ByteBuffer bb container-header idx-entries]
(let [container-header-end (.position bb)
compression-header (struct/decode-compression-header-block bb)]
(->> (if (seq idx-entries)
(map :slice-offset idx-entries)
(:landmarks container-header))
(mapcat
(fn [^long landmark]
(.position ^Buffer bb (+ container-header-end landmark))
(read-slice-records rdr bb compression-header)))))))

(defn- with-next-container-header [^CRAMReader rdr f]
(let [^FileChannel ch (.-channel rdr)
Expand Down Expand Up @@ -119,3 +134,23 @@
(when-let [alns (read-container-with rdr read1)]
(concat alns (lazy-seq (step))))))]
(step))))

(defn- read-alignments-in-region
[^CRAMReader rdr {:keys [chr start end] :or {start 0 end Long/MAX_VALUE}}]
(let [^FileChannel ch (.-channel rdr)
idx @(.-index rdr)
offset->entries (->> (intervals/find-overlap-intervals idx chr start end)
(group-by :container-offset)
(into (sorted-map)))]
(letfn [(read-fn [entries]
(fn [container-header bb]
(read-container-records rdr bb container-header entries)))
(step [[[^long offset entries] & more]]
(when offset
(.position ch offset)
(concat (read-container-with rdr (read-fn entries))
(lazy-seq (step more)))))]
(filter #(and (= (:rname %) chr)
(<= (long (:pos %)) (long end))
(<= (long start) (long (:end %))))
(step offset->entries)))))
1 change: 1 addition & 0 deletions src/cljam/io/util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
#"(?i)\.sam$" :sam
#"(?i)\.bai$" :bai
#"(?i)\.bam$" :bam
#"(?i)\.crai$" :crai
#"(?i)\.cram$" :cram
#"(?i)\.f(ast)?q" :fastq
#"(?i)\.fai$" :fai
Expand Down
Binary file modified test-resources/cram/medium.cram
Binary file not shown.
Binary file added test-resources/cram/medium.cram.crai
Binary file not shown.
118 changes: 118 additions & 0 deletions test/cljam/io/crai_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
(ns cljam.io.crai-test
(:require [cljam.io.crai :as crai]
[cljam.test-common :as common]
[clojure.test :refer [deftest are]]))

(def ^:private test-refs
(->> (concat (range 1 23) ["X" "Y"])
(mapv #(array-map :name (str "chr" %)))))

(deftest read-index-test
(let [idx (crai/read-index common/medium-crai-file test-refs)]
(are [?chr ?start ?end ?expected]
(= ?expected (crai/find-overlapping-entries idx ?chr ?start ?end))
"chr1" 1 Long/MAX_VALUE
[{:chr "chr1"
:start 546609
:end (+ 546609 205262429)
:container-offset 324
:slice-offset 563
:size 22007}
{:chr "chr1"
:start 206547069
:end (+ 206547069 42644506)
:container-offset 324
:slice-offset 22570
:size 7349}]

"chr1" 550000 600000
[{:chr "chr1"
:start 546609
:end (+ 546609 205262429)
:container-offset 324
:slice-offset 563
:size 22007}]

"chr1" 210000000 240000000
[{:chr "chr1"
:start 206547069
:end (+ 206547069 42644506)
:container-offset 324
:slice-offset 22570
:size 7349}]

"chr1" 200000000 210000000
[{:chr "chr1"
:start 546609
:end (+ 546609 205262429)
:container-offset 324
:slice-offset 563
:size 22007}
{:chr "chr1"
:start 206547069
:end (+ 206547069 42644506)
:container-offset 324
:slice-offset 22570
:size 7349}]

"*" 0 0
[{:chr "*"
:start 0
:end 0
:container-offset 354657
:slice-offset 563
:size 23119}
{:chr "*"
:start 0
:end 0
:container-offset 378365
:slice-offset 171
:size 23494}
{:chr "*"
:start 0
:end 0
:container-offset 378365
:slice-offset 23665
:size 23213}
{:chr "*"
:start 0
:end 0
:container-offset 378365
:slice-offset 46878
:size 23051}
{:chr "*"
:start 0
:end 0
:container-offset 378365
:slice-offset 69929
:size 23563}
{:chr "*"
:start 0
:end 0
:container-offset 378365
:slice-offset 93492
:size 24231}
{:chr "*"
:start 0
:end 0
:container-offset 378365
:slice-offset 117723
:size 24078}
{:chr "*"
:start 0
:end 0
:container-offset 378365
:slice-offset 141801
:size 23871}
{:chr "*"
:start 0
:end 0
:container-offset 378365
:slice-offset 165672
:size 24365}
{:chr "*"
:start 0
:end 0
:container-offset 378365
:slice-offset 190037
:size 12326}])))
Loading

0 comments on commit be82cb6

Please sign in to comment.