Skip to content

Commit

Permalink
wip properly filter out flakes from before from-t
Browse files Browse the repository at this point in the history
History queries don't have a concept of "stale flakes" - they want all flakes between
`from-t` and `to-t`. The CachedTRangeResolver filters out "stale flakes" but _not_ all of
the flakes from before `from-t`.

This is a brute force remaking of the call stack so that we can get the correct behavior
from `query-range/time-range`.

<Co-authored-by: Marcela Poffald [email protected]>
  • Loading branch information
dpetran committed Feb 2, 2023
1 parent 639a460 commit bb576aa
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 28 deletions.
51 changes: 44 additions & 7 deletions src/fluree/db/index.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,12 @@
(defn after-t?
"Returns `true` if `flake` has a transaction value after the provided `t`"
[t flake]
(-> flake flake/t (< t)))
(< (flake/t flake) t))

(defn before-t?
"Returns `true` if `flake` has a transaction value after the provided `t`"
[t flake]
(> (flake/t flake) t))

(defn filter-after
"Returns a sequence containing only flakes from the flake set `flakes` with
Expand Down Expand Up @@ -235,14 +240,16 @@

(defn stale-by
"Returns a sequence of flakes from the sorted set `flakes` that are out of date
by the transaction `t` because `flakes` contains another flake with the same
subject and predicate and a transaction value later than that flake but on or
before `t`."
[t flakes]
by the transaction `from-t` because `flakes` contains another flake with the same
subject and predicate and a t-value later than that flake but on or before `from-t`."
[from-t flakes]
(->> flakes
(filter (complement (partial after-t? t)))
(remove (partial after-t? from-t))
(partition-by (juxt flake/s flake/p flake/o))
(mapcat (fn [flakes]
;; if the last flake for a subject/predicate/object combo is an assert,
;; then everything before that is stale (object is necessary for
;; multicardinality flakes)
(let [last-flake (last flakes)]
(if (flake/op last-flake)
(butlast flakes)
Expand All @@ -260,7 +267,37 @@
out-of-range (concat stale-flakes subsequent)]
(flake/disj-all latest out-of-range))))

(defn t-range2
"Returns a sorted set of flakes that are not out of date between the
transactions `from-t` and `to-t`."
([{:keys [flakes] leaf-t :t :as leaf} novelty from-t to-t]
(let [latest (cond-> flakes
(> leaf-t to-t)
(flake/conj-all (novelty-subrange leaf to-t novelty)))
;; flakes that happen after to-t
subsequent (filter-after to-t latest)
previous (filter (partial before-t? from-t) latest)
out-of-range (concat subsequent previous)]
(flake/disj-all latest out-of-range))))

(defrecord CachedTRangeResolver [node-resolver novelty from-t to-t async-cache]
Resolver
(resolve [_ {:keys [id tempid tt-id] :as node}]
(if (branch? node)
(resolve node-resolver node)
(async-cache
[::t-range id tempid tt-id from-t to-t]
(fn [_]
(go-try
(let [resolved (<? (resolve node-resolver node))
flakes (t-range resolved novelty from-t to-t)]
(-> resolved
(dissoc :t)
(assoc :from-t from-t
:to-t to-t
:flakes flakes)))))))))

(defrecord CachedTRangeResolver2 [node-resolver novelty from-t to-t async-cache]
Resolver
(resolve [_ {:keys [id tempid tt-id] :as node}]
(if (branch? node)
Expand All @@ -270,7 +307,7 @@
(fn [_]
(go-try
(let [resolved (<? (resolve node-resolver node))
flakes (t-range resolved novelty from-t to-t)]
flakes (t-range2 resolved novelty from-t to-t)]
(-> resolved
(dissoc :t)
(assoc :from-t from-t
Expand Down
29 changes: 28 additions & 1 deletion src/fluree/db/query/range.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,20 @@
query-xf (extract-query-flakes opts)]
(index/tree-chan resolver root in-range? resolved-leaf? 1 query-xf error-ch)))

(defn resolve-flake-slices2
"Returns a channel that will contain a stream of chunked flake collections that
contain the flakes between `start-flake` and `end-flake` and are within the
transaction range starting at `from-t` and ending at `to-t`."
[{:keys [async-cache] :as conn} root novelty error-ch
{:keys [from-t to-t start-flake end-flake] :as opts}]
(let [resolver (index/->CachedTRangeResolver2 conn novelty from-t to-t async-cache)
cmp (:comparator root)
range-set (flake/sorted-set-by cmp start-flake end-flake)
in-range? (fn [node]
(intersects-range? node range-set))
query-xf (extract-query-flakes opts)]
(index/tree-chan resolver root in-range? resolved-leaf? 1 query-xf error-ch)))

(defn unauthorized?
[f]
(= f ::unauthorized))
Expand Down Expand Up @@ -236,6 +250,19 @@
(filter-authorized db start-flake end-flake error-ch)
(into-page limit offset flake-limit))))

(defn index-range*2
"Return a channel that will eventually hold a sorted vector of the range of
flakes from `db` that meet the criteria specified in the `opts` map."
[{:keys [conn] :as db}
error-ch
{:keys [idx start-flake end-flake limit offset flake-limit] :as opts}]
(let [idx-root (get db idx)
idx-cmp (get-in db [:comparators idx])
novelty (get-in db [:novelty idx])]
(->> (resolve-flake-slices2 conn idx-root novelty error-ch opts)
(filter-authorized db start-flake end-flake error-ch)
(into-page limit offset flake-limit))))

(defn expand-range-interval
"Finds the full index or time range interval including the maximum and minimum
tests when only one test is provided"
Expand Down Expand Up @@ -285,7 +312,7 @@
(let [start-flake (apply resolve-match-flake start-test start-parts)
end-flake (apply resolve-match-flake end-test end-parts)
error-ch (chan)
range-ch (index-range* db
range-ch (index-range*2 db
error-ch
{:idx idx
:from-t from-t
Expand Down
54 changes: 34 additions & 20 deletions test/fluree/db/query/history_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@
conn (test-utils/create-conn)
ledger @(fluree/create conn "historytest" {:context {:ex "http://example.org/ns/"}})

db1 @(test-utils/transact ledger {:id :ex/dan
:ex/x "foo-1"
:ex/y "bar-1"})
ts1 (-> db1 :commit :time)
db1 @(test-utils/transact ledger [{:id :ex/dan
:ex/x "foo-1"
:ex/y "bar-1"}
{:id :ex/cat
:ex/x "foo-1"
:ex/y "bar-1"}
{:id :ex/dog
:ex/x "foo-1"
:ex/y "bar-1"}])
db2 @(test-utils/transact ledger {:id :ex/dan
:ex/x "foo-2"
:ex/y "bar-2"})
ts2 (-> db2 :commit :time)
db3 @(test-utils/transact ledger {:id :ex/dan
:ex/x "foo-3"
:ex/y "bar-3"})
Expand Down Expand Up @@ -79,14 +85,17 @@
{:f/t 4
:f/assert [{:ex/x "foo-dog" :id :ex/dog}
{:ex/x "foo-cat" :id :ex/cat}]
:f/retract []}
:f/retract [{:ex/x "foo-1", :id :ex/dog}
{:ex/x "foo-1", :id :ex/cat}]}
{:f/t 3
:f/assert [{:ex/x "foo-3" :id :ex/dan}]
:f/retract [{:ex/x "foo-2" :id :ex/dan}]}
{:f/t 2
:f/assert [{:ex/x "foo-2" :id :ex/dan}]
:f/retract [{:ex/x "foo-1" :id :ex/dan}]}
{:f/t 1 :f/assert [{:ex/x "foo-1" :id :ex/dan}]
{:f/t 1 :f/assert [{:ex/x "foo-1" :id :ex/dog}
{:ex/x "foo-1" :id :ex/cat}
{:ex/x "foo-1" :id :ex/dan}]
:f/retract []}]
@(fluree/history ledger {:history [nil :ex/x]}))))
(testing "three-tuple flake history"
Expand All @@ -104,13 +113,17 @@
@(fluree/history ledger {:history [:ex/dan :ex/x "foo-cat"]}))))

(testing "at-t"
(is (= [{:f/t 3 :f/assert [{:ex/x "foo-3" :id :ex/dan}] :f/retract []}]
(is (= [{:f/t 3
:f/assert [{:ex/x "foo-3" :id :ex/dan}]
:f/retract [{:ex/x "foo-2", :id :ex/dan}]}]
@(fluree/history ledger {:history [:ex/dan :ex/x] :t {:from 3 :to 3}}))))
(testing "from-t"
(is (= [{:f/t 5
:f/assert [{:ex/x "foo-cat" :id :ex/dan}]
:f/retract [{:ex/x "foo-3" :id :ex/dan}]}
{:f/t 3 :f/assert [{:ex/x "foo-3" :id :ex/dan}] :f/retract []}]
{:f/t 3
:f/assert [{:ex/x "foo-3" :id :ex/dan}]
:f/retract [{:ex/x "foo-2" :id :ex/dan}]}]
@(fluree/history ledger {:history [:ex/dan :ex/x] :t {:from 3}}))))
(testing "to-t"
(is (= [{:f/t 3
Expand All @@ -122,24 +135,25 @@
{:f/t 1 :f/assert [{:ex/x "foo-1" :id :ex/dan}] :f/retract []}]
@(fluree/history ledger {:history [:ex/dan :ex/x] :t {:to 3}}))))
(testing "t-range"
(is (= [{:f/t 3
:f/assert [{:ex/x "foo-3" :id :ex/dan}]
:f/retract [{:ex/x "foo-2" :id :ex/dan}]}
{:f/t 2
:f/assert [{:ex/x "foo-2" :id :ex/dan}]
:f/retract [{:ex/x "foo-1" :id :ex/dan}]}
{:f/t 1 :f/assert [{:ex/x "foo-1" :id :ex/dan}] :f/retract []}]
@(fluree/history ledger {:history [:ex/dan :ex/x] :t {:from 1 :to 3}}))))
(is (= [{:f/t 4
:f/assert [{:ex/x "foo-dog" :id :ex/dog} {:ex/x "foo-cat" :id :ex/cat}]
:f/retract [{:ex/x "foo-1" :id :ex/dog} {:ex/x "foo-1" :id :ex/cat}]}
{:f/t 3
:f/assert [{:ex/x "foo-3" :id :ex/dan}]
:f/retract [{:ex/x "foo-2" :id :ex/dan}]}
{:f/t 2
:f/assert [{:ex/x "foo-2" :id :ex/dan}]
:f/retract [{:ex/x "foo-1" :id :ex/dan}]}]
@(fluree/history ledger {:history [nil :ex/x] :t {:from 2 :to 4}}))))
(testing "datetime-t"
(is (= [{:f/t 3
:f/assert [{:ex/x "foo-3" :id :ex/dan}]
:f/retract [{:ex/x "foo-2" :id :ex/dan}]}
{:f/t 2
:f/assert [{:ex/x "foo-2" :id :ex/dan}]
:f/retract [{:ex/x "foo-1" :id :ex/dan}]}
{:f/t 1 :f/assert [{:ex/x "foo-1" :id :ex/dan}] :f/retract []}]
@(fluree/history ledger {:history [nil :ex/x] :t {:from ts1 :to ts3}}))
"does not include t 4 or 5")
:f/retract [{:ex/x "foo-1" :id :ex/dan}]}]
@(fluree/history ledger {:history [nil :ex/x] :t {:from ts2 :to ts3}}))
"does not include t 1, 4, or 5")
(is (= [{:f/t 5 :f/assert [{:ex/x "foo-cat" :id :ex/dan}] :f/retract []}]
@(fluree/history ledger {:history [:ex/dan :ex/x] :t {:from (util/current-time-iso)}}))
"timestamp translates to first t before ts")
Expand Down

0 comments on commit bb576aa

Please sign in to comment.