Skip to content

Commit

Permalink
Merge pull request #362 from fluree/feature/history-query-improvements
Browse files Browse the repository at this point in the history
history query improvements
  • Loading branch information
dpetran authored Feb 6, 2023
2 parents f877077 + 52e1d54 commit 10cec7e
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 116 deletions.
2 changes: 1 addition & 1 deletion src/data_readers.clj
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{Flake fluree.db.flake/parts->Flake}
{Flake fluree.db.flake/parts->Flake}
98 changes: 68 additions & 30 deletions src/fluree/db/api/query.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
[fluree.json-ld :as json-ld]
[fluree.db.db.json-ld :as jld-db]
[malli.core :as m]
[fluree.db.util.log :as log]))
[fluree.db.util.log :as log]
[fluree.db.constants :as const]
[fluree.db.datatype :as datatype]))

#?(:clj (set! *warn-on-reflection* true))

Expand All @@ -36,38 +38,66 @@
(re-find #"[a-z0-9]+/[a-z0-9]+" ledger-id))


(defn t-flakes->json-ld
[db compact cache fuel error-ch t-flakes]

(defn s-flakes->json-ld
[db cache compact fuel error-ch s-flakes]
(async/go
(try*
(let [assert-flakes (not-empty (filter flake/op t-flakes))
retract-flakes (not-empty (filter (complement flake/op) t-flakes))

asserts-chan (json-ld-resp/flakes->res db cache compact fuel 1000000
{:wildcard? true, :depth 0}
0 assert-flakes)
retracts-chan (json-ld-resp/flakes->res db cache compact fuel 1000000
{:wildcard? true, :depth 0}
0 retract-flakes)

asserts (<? asserts-chan)
retracts (<? retracts-chan)

;; t is always positive for users
result (cond-> {:t (- (flake/t (first t-flakes)))}
asserts (assoc :assert asserts)
retracts (assoc :retract retracts))]
result)
(let [json-chan (json-ld-resp/flakes->res db cache compact fuel 1000000
{:wildcard? true, :depth 0}
0 s-flakes)]
(-> (<? json-chan)
;; add the id in case the iri flake isn't present in s-flakes
(assoc :id (json-ld/compact (<? (dbproto/-iri db (flake/s (first s-flakes)))) compact))))
(catch* e
(log/error e "Error converting history flakes.")
(async/>! error-ch e)))))

(defn t-flakes->json-ld
[db compact cache fuel error-ch t-flakes]
(go-try
(let [{assert-flakes true
retract-flakes false} (group-by flake/op t-flakes)

s-flake-partitions (fn [flakes]
(->> flakes
(group-by flake/s)
(vals)
(async/to-chan!)))

s-asserts-ch (s-flake-partitions assert-flakes)
s-retracts-ch (s-flake-partitions retract-flakes)

s-asserts-out-ch (async/chan)
s-retracts-out-ch (async/chan)

s-asserts-json-ch (async/into [] s-asserts-out-ch)
s-retracts-json-ch (async/into [] s-retracts-out-ch)]
;; process asserts
(async/pipeline-async 2
s-asserts-out-ch
(fn [assert-flakes ch]
(-> (s-flakes->json-ld db cache compact fuel error-ch assert-flakes)
(async/pipe ch)))
s-asserts-ch)
;; process retracts
(async/pipeline-async 2
s-retracts-out-ch
(fn [retract-flakes ch]
(-> (s-flakes->json-ld db cache compact fuel error-ch retract-flakes)
(async/pipe ch)))
s-retracts-ch)
{(json-ld/compact const/iri-t compact) (- (flake/t (first t-flakes)))
(json-ld/compact const/iri-assert compact) (async/<! s-asserts-json-ch)
(json-ld/compact const/iri-retract compact) (async/<! s-retracts-json-ch)})))

(defn history-flakes->json-ld
[db q flakes]
(go-try
(let [fuel (volatile! 0)
cache (volatile! {})
compact (json-ld/compact-fn (fql-parse/parse-context q db))
context (fql-parse/parse-context q db)
compact (json-ld/compact-fn context)

error-ch (async/chan)
out-ch (async/chan)
Expand Down Expand Up @@ -122,12 +152,16 @@
[:t {:optional true}
[:and
[:map
[:from {:optional true} pos-int?]
[:to {:optional true} pos-int?]]
[:from {:optional true} [:or
pos-int?
datatype/iso8601-datetime-re]]
[:to {:optional true} [:or
pos-int?
datatype/iso8601-datetime-re]]]
[:fn {:error/message "Either \"from\" or \"to\" `t` keys must be provided."}
(fn [{:keys [from to]}] (or from to))]
[:fn {:error/message "\"from\" value must be less than or equal to \"to\" value."}
(fn [{:keys [from to]}] (if (and from to)
(fn [{:keys [from to]}] (if (and from to (number? from) (number? to))
(<= from to)
true))]]]])

Expand Down Expand Up @@ -172,11 +206,15 @@
[pattern idx] (get-history-pattern query)

;; from and to are positive ints, need to convert to negative or fill in default values
{:keys [from to]} t
[from-t to-t] [(if from (- from) -1) (if to (- to) (:t db))]

flakes (<? (query-range/time-range db idx = pattern {:from-t from-t :to-t to-t}))
results (<? (history-flakes->json-ld db query-map flakes))]
{:keys [from to]} t
[from-t to-t] [(cond (string? from) (<? (time-travel/datetime->t db from))
(number? from) (- from)
:else -1)
(cond (string? to) (<? (time-travel/datetime->t db to))
(number? to) (- to)
:else (:t db))]
flakes (<? (query-range/time-range db idx = pattern {:from-t from-t :to-t to-t}))
results (<? (history-flakes->json-ld db query-map flakes))]
results))))

(defn query
Expand Down
53 changes: 45 additions & 8 deletions src/fluree/db/index.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,12 @@
(defn after-t?
"Returns `true` if `flake` has a transaction value after the provided `t`"
[t flake]
(-> flake flake/t (< t)))
(< (flake/t flake) t))

(defn before-t?
"Returns `true` if `flake` has a transaction value before the provided `t`"
[t flake]
(> (flake/t flake) t))

(defn filter-after
"Returns a sequence containing only flakes from the flake set `flakes` with
Expand Down Expand Up @@ -235,14 +240,16 @@

(defn stale-by
"Returns a sequence of flakes from the sorted set `flakes` that are out of date
by the transaction `t` because `flakes` contains another flake with the same
subject and predicate and a transaction value later than that flake but on or
before `t`."
[t flakes]
by the transaction `from-t` because `flakes` contains another flake with the same
subject and predicate and a t-value later than that flake but on or before `from-t`."
[from-t flakes]
(->> flakes
(filter (complement (partial after-t? t)))
(remove (partial after-t? from-t))
(partition-by (juxt flake/s flake/p flake/o))
(mapcat (fn [flakes]
;; if the last flake for a subject/predicate/object combo is an assert,
;; then everything before that is stale (object is necessary for
;; multicardinality flakes)
(let [last-flake (last flakes)]
(if (flake/op last-flake)
(butlast flakes)
Expand All @@ -266,11 +273,41 @@
(if (branch? node)
(resolve node-resolver node)
(async-cache
[::t-range id tempid tt-id from-t to-t]
[::t-range id tempid tt-id from-t to-t]
(fn [_]
(go-try
(let [resolved (<? (resolve node-resolver node))
flakes (t-range resolved novelty from-t to-t)]
(-> resolved
(dissoc :t)
(assoc :from-t from-t
:to-t to-t
:flakes flakes)))))))))

(defn history-t-range
"Returns a sorted set of flakes between the transactions `from-t` and `to-t`."
([{:keys [flakes] leaf-t :t :as leaf} novelty from-t to-t]
(let [latest (cond-> flakes
(> leaf-t to-t)
(flake/conj-all (novelty-subrange leaf to-t novelty)))
;; flakes that happen after to-t
subsequent (filter-after to-t latest)
;; flakes that happen before from-t
previous (filter (partial before-t? from-t) latest)
out-of-range (concat subsequent previous)]
(flake/disj-all latest out-of-range))))

(defrecord CachedHistoryRangeResolver [node-resolver novelty from-t to-t async-cache]
Resolver
(resolve [_ {:keys [id tempid tt-id] :as node}]
(if (branch? node)
(resolve node-resolver node)
(async-cache
[::history-t-range id tempid tt-id from-t to-t]
(fn [_]
(go-try
(let [resolved (<? (resolve node-resolver node))
flakes (t-range resolved novelty from-t to-t)]
flakes (history-t-range resolved novelty from-t to-t)]
(-> resolved
(dissoc :t)
(assoc :from-t from-t
Expand Down
62 changes: 32 additions & 30 deletions src/fluree/db/query/range.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,6 @@
(defn time-range
"Range query across an index.
Uses a DB, but in the future support supplying a connection and db name, as we
don't need a 't'
Ranges take the natural numeric sort orders, but all results will return in
reverse order (newest subjects and predicates first).
Expand All @@ -264,42 +261,47 @@
:to-t - stop transaction - can be null, which pulls full history
:xform - xform applied to each result individually. This is not used
when :chan is supplied.
:limit - max number of flakes to return"
([db idx] (time-range db idx {}))
([db idx opts] (time-range db idx >= (min-match idx) <= (max-match idx) opts))
([db idx test match] (time-range db idx test match {}))
:flake-limit - max number of flakes to return"
([db idx test match opts]
(let [[start-test start-match end-test end-match]
(expand-range-interval idx test match)]
(time-range db idx start-test start-match end-test end-match opts)))
([db idx start-test start-match end-test end-match]
(time-range db idx start-test start-match end-test end-match {}))
([{t :t :as db} idx start-test start-match end-test end-match opts]
(let [{:keys [limit from-t to-t]
([{:keys [t conn ] :as db} idx start-test start-match end-test end-match opts]
(let [{:keys [limit offset flake-limit from-t to-t]
:or {from-t t, to-t t}}
opts

start-parts (match->flake-parts db idx start-match)
end-parts (match->flake-parts db idx end-match)]
end-parts (match->flake-parts db idx end-match)

start-flake (apply resolve-match-flake start-test start-parts)
end-flake (apply resolve-match-flake end-test end-parts)
error-ch (chan)

;; index-range*
idx-root (get db idx)
idx-cmp (get-in db [:comparators idx])
novelty (get-in db [:novelty idx])

;; resolve-flake-slices
resolver (index/->CachedHistoryRangeResolver conn novelty from-t to-t (:async-cache conn))
cmp (:comparator idx-root)
range-set (flake/sorted-set-by cmp start-flake end-flake)
in-range? (fn [node] (intersects-range? node range-set))
query-xf (extract-query-flakes {:idx idx
:start-test start-test
:start-flake start-flake
:end-test end-test
:end-flake end-flake})]
(go-try
(let [start-flake (apply resolve-match-flake start-test start-parts)
end-flake (apply resolve-match-flake end-test end-parts)
error-ch (chan)
range-ch (index-range* db
error-ch
{:idx idx
:from-t from-t
:to-t to-t
:start-test start-test
:start-flake start-flake
:end-test end-test
:end-flake end-flake
:flake-limit limit})]
(async/alt!
error-ch ([e]
(throw e))
range-ch ([hist-range]
hist-range)))))))
(let [history-ch (->> (index/tree-chan resolver idx-root in-range? resolved-leaf? 1 query-xf error-ch)
(filter-authorized db start-flake end-flake error-ch)
(into-page limit offset flake-limit))]
(async/alt!
error-ch ([e]
(throw e))
history-ch ([hist-range]
hist-range)))))))

(defn index-range
"Range query across an index as of a 't' defined by the db.
Expand Down
Loading

0 comments on commit 10cec7e

Please sign in to comment.