diff --git a/dev/json_ld/shacl.clj b/dev/json_ld/shacl.clj index 397a3b15e..7a058b59d 100644 --- a/dev/json_ld/shacl.clj +++ b/dev/json_ld/shacl.clj @@ -18,7 +18,6 @@ [fluree.db.util.log :as log] [fluree.db.index :as index] [criterium.core :as criterium] - [clojure.data.avl :as avl] [clojure.tools.reader.edn :as edn])) diff --git a/dev/user.clj b/dev/user.clj index a8cfc0d44..bac062570 100644 --- a/dev/user.clj +++ b/dev/user.clj @@ -10,7 +10,7 @@ [fluree.db.flake :as flake] [fluree.db.util.json :as json] [fluree.db.serde.json :as serdejson] - [fluree.db.storage.core :as storage] + [fluree.db.storage :as storage] [fluree.db.query.fql :as fql] [fluree.db.query.range :as query-range] [fluree.db.dbproto :as dbproto] diff --git a/src/fluree/db/conn/file.cljc b/src/fluree/db/conn/file.cljc index be7919c43..d3b7f0318 100644 --- a/src/fluree/db/conn/file.cljc +++ b/src/fluree/db/conn/file.cljc @@ -10,7 +10,7 @@ [fluree.db.conn.cache :as conn-cache] [fluree.db.conn.state-machine :as state-machine] [fluree.db.util.log :as log :include-macros true] - [fluree.db.storage.core :as storage] + [fluree.db.storage :as storage] [fluree.db.indexer.default :as idx-default] [fluree.db.serde.json :refer [json-serde]] #?@(:cljs [["fs" :as fs] @@ -255,7 +255,7 @@ [conn {:keys [id leaf tempid] :as node}] (let [cache-key [::resolve id tempid]] (if (= :empty id) - (storage/resolve-empty-leaf node) + (storage/resolve-empty-node node) (conn-cache/lru-lookup lru-cache-atom cache-key diff --git a/src/fluree/db/conn/ipfs.cljc b/src/fluree/db/conn/ipfs.cljc index 9dbcf6675..392260162 100644 --- a/src/fluree/db/conn/ipfs.cljc +++ b/src/fluree/db/conn/ipfs.cljc @@ -1,5 +1,5 @@ (ns fluree.db.conn.ipfs - (:require [fluree.db.storage.core :as storage] + (:require [fluree.db.storage :as storage] [fluree.db.index :as index] [fluree.db.util.context :as ctx-util] [fluree.db.util.core :as util :refer [exception?]] @@ -131,7 +131,7 @@ [conn {:keys [id leaf tempid] :as node}] (let [cache-key [::resolve id tempid]] (if (= :empty id) - (storage/resolve-empty-leaf node) + (storage/resolve-empty-node node) (conn-cache/lru-lookup lru-cache-atom cache-key diff --git a/src/fluree/db/conn/memory.cljc b/src/fluree/db/conn/memory.cljc index a4b06809f..09234a045 100644 --- a/src/fluree/db/conn/memory.cljc +++ b/src/fluree/db/conn/memory.cljc @@ -1,6 +1,6 @@ (ns fluree.db.conn.memory (:require [clojure.core.async :as async :refer [go]] - [fluree.db.storage.core :as storage] + [fluree.db.storage :as storage] [fluree.db.index :as index] [fluree.db.util.context :as ctx-util] [fluree.db.util.core :as util] @@ -163,7 +163,7 @@ [_ node] ;; all root index nodes will be empty - (storage/resolve-empty-leaf node)) + (storage/resolve-empty-node node)) #?@(:clj [full-text/IndexConnection diff --git a/src/fluree/db/conn/s3.clj b/src/fluree/db/conn/s3.clj index 5de9f3d69..db498a475 100644 --- a/src/fluree/db/conn/s3.clj +++ b/src/fluree/db/conn/s3.clj @@ -12,7 +12,7 @@ [fluree.db.indexer.default :as idx-default] [fluree.db.ledger.proto :as ledger-proto] [fluree.db.serde.json :refer [json-serde]] - [fluree.db.storage.core :as storage] + [fluree.db.storage :as storage] [fluree.db.util.context :as ctx-util] [fluree.db.util.json :as json] [fluree.db.util.log :as log] @@ -224,7 +224,7 @@ (resolve [conn {:keys [id leaf tempid] :as node}] (let [cache-key [::resolve id tempid]] (if (= :empty id) - (storage/resolve-empty-leaf node) + (storage/resolve-empty-node node) (conn-cache/lru-lookup lru-cache-atom cache-key (fn [_] (storage/resolve-index-node diff --git a/src/fluree/db/db/json_ld.cljc b/src/fluree/db/db/json_ld.cljc index 6827b9c74..60df88b46 100644 --- a/src/fluree/db/db/json_ld.cljc +++ b/src/fluree/db/db/json_ld.cljc @@ -313,11 +313,11 @@ opst-cmp :opst tspo-cmp :tspo} index/default-comparators - spot (index/empty-branch method alias spot-cmp) - psot (index/empty-branch method alias psot-cmp) - post (index/empty-branch method alias post-cmp) - opst (index/empty-branch method alias opst-cmp) - tspo (index/empty-branch method alias tspo-cmp) + spot (index/empty-branch alias spot-cmp) + psot (index/empty-branch alias psot-cmp) + post (index/empty-branch alias post-cmp) + opst (index/empty-branch alias opst-cmp) + tspo (index/empty-branch alias tspo-cmp) stats {:flakes 0, :size 0, :indexed 0} schema (vocab/base-schema) branch (branch/branch-meta ledger) @@ -327,7 +327,7 @@ (map->JsonLdDb {:ledger ledger :conn conn :method method - :alias alias + :ledger-alias alias :branch (:name branch) :commit (:commit branch) :t 0 diff --git a/src/fluree/db/flake.cljc b/src/fluree/db/flake.cljc index 7a3f080b1..cb191d06a 100644 --- a/src/fluree/db/flake.cljc +++ b/src/fluree/db/flake.cljc @@ -61,6 +61,18 @@ (def ^:const MIN-PREDICATE-ID (min-subject-id const/$_predicate)) (def ^:const MAX-PREDICATE-ID (max-subject-id const/$_predicate)) +(def ^:const min-s util/max-long) +(def ^:const max-s util/min-long) +(def ^:const min-p 0) +(def ^:const max-p MAX-PREDICATE-ID) +(def ^:const min-dt util/min-integer) +(def ^:const max-dt util/max-integer) +(def ^:const min-t 0) +(def ^:const max-t util/min-long) +(def ^:const min-op false) +(def ^:const max-op true) +(def ^:const min-meta util/min-integer) +(def ^:const max-meta util/max-integer) (defn sid->cid "Will return a collection id from a subject-id." @@ -228,8 +240,10 @@ [(s flake) (p flake) (o flake) (dt flake) (t flake) (op flake) (m flake)]) (def maximum - "The largest flake possible" - (->Flake util/max-long 0 util/max-long const/$xsd:decimal 0 true nil)) + (->Flake max-s max-p max-s max-dt max-t max-op max-meta)) + +(def minimum + (->Flake min-s min-p min-s min-dt min-t min-op min-meta)) (defn- assoc-flake "Assoc for Flakes" @@ -417,28 +431,6 @@ (cmp-bool (op f1) (op f2)) (cmp-meta (m f1) (m f2)))) - -(defn cmp-flakes-history - "Note this is not suitable for a set, only a vector/list." - [f1 f2] - (combine-cmp - (cmp-long (t f1) (t f2)) - #?(:clj (Boolean/compare (op f2) (op f1)) - :cljs (compare (op f2) (op f1))))) - - -(defn cmp-history-quick-reverse-sort - "Sorts by transaction time in ascending order (newest first), then by - the boolean operation descending so assertions (true) come before retractions (false) - so that we can 're-play' the log in reverse order to come up with historical states. - Suitable only for sorting a vector, not a sorted set." - [f1 f2] - (combine-cmp - (cmp-long (t f1) (t f2)) - #?(:clj (Boolean/compare (op f2) (op f1)) - :cljs (compare (op f2) (op f1))))) - - (defn flip-flake "Takes a flake and returns one with the provided block and op flipped from true/false. Don't over-ride no-history, even if no-history for this predicate has changed. New inserts @@ -448,35 +440,6 @@ ([flake t] (->Flake (s flake) (p flake) (o flake) (dt flake) t (not (op flake)) (m flake)))) -(defn slice - "From and to are Flakes" - [ss from to] - (cond - (and from to) (avl/subrange ss >= from <= to) - (nil? from) (avl/subrange ss <= to) - (nil? to) (avl/subrange ss >= from) - :else (throw (ex-info "Unexpected error performing slice, both from and to conditions are nil. Please report." - {:status 500 - :error :db/unexpected-error})))) - -(defn match-spot - "Returns all matching flakes to a specific subject, and optionaly also a predicate if provided - Must be provided with subject/predicate integer ids, no lookups are performed." - [ss sid pid] - (if pid - (avl/subrange ss >= (->Flake sid pid nil -1 nil nil nil) - <= (->Flake sid (inc pid) nil util/max-long nil nil nil)) - (avl/subrange ss > (->Flake (inc sid) MAX-COLL-SUBJECTS nil nil nil nil nil) - < (->Flake (dec sid) -1 nil nil nil nil nil)))) - - -(defn match-post - "Returns all matching flakes to a predicate + object match." - [ss pid o dt] - (avl/subrange ss - >= (->Flake util/max-long pid o dt nil nil nil) - <= (->Flake 0 pid o dt nil nil nil))) - (defn match-tspo "Returns all matching flakes to a specific 't' value." [ss t] @@ -484,20 +447,15 @@ >= (->Flake util/max-long nil nil nil t nil nil) <= (->Flake util/min-long nil nil nil t nil nil))) -(defn lookup - [ss start-flake end-flake] - (avl/subrange ss >= start-flake <= end-flake)) - (defn subrange ([ss test flake] (avl/subrange ss test flake)) ([ss start-test start-flake end-test end-flake] (avl/subrange ss start-test start-flake end-test end-flake))) - -(defn split-at - [n ss] - (avl/split-at n ss)) +(defn nearest + [ss test f] + (avl/nearest ss test f)) (defn lower-than-all? [f ss] @@ -511,14 +469,6 @@ (and (nil? e) (empty? upper)))) -(defn split-by-flake - "Splits a sorted set at a given flake. If there is an exact match for flake, - puts it in the left-side. Primarily for use with last-flake." - [f ss] - (let [[l e r] (avl/split-key f ss)] - [(if e (conj l e) l) r])) - - (defn sorted-set-by [comparator & flakes] (apply avl/sorted-set-by comparator flakes)) @@ -549,6 +499,23 @@ [ss to-remove] (transient-reduce disj! ss to-remove)) +(defn revise + "Changes the composition of the sorted set `ss` by adding all the flakes in the + `to-add` collection and removing all flakes in the `to-remove` collection." + [ss to-add to-remove] + (let [trans (transient ss) + removed (loop [[f & r] to-remove + t-set trans] + (if f + (recur r (disj! t-set f)) + t-set)) + added (loop [[f & r] to-add + t-set removed] + (if f + (recur r (conj! t-set f)) + t-set))] + (persistent! added))) + (defn assoc-all [sm entries] (transient-reduce (fn [m [k v]] @@ -607,22 +574,3 @@ "Returns approx number of bytes in a collection of flakes." [flakes] (reduce (fn [size f] (+ size (size-flake f))) 0 flakes)) - - -(defn size-kb - "Like size-bytes, but kb. - Rounds down for simplicity, as bytes is just an estimate anyhow." - [flakes] - (-> (size-bytes flakes) - (/ 1000) - (double) - (Math/round))) - - -(defn take - "Takes n flakes from a sorted flake set, retaining the set itself." - [n flake-set] - (if (>= n (count flake-set)) - flake-set - (let [k (nth flake-set n)] - (first (avl/split-key k flake-set))))) diff --git a/src/fluree/db/index.cljc b/src/fluree/db/index.cljc index 60408da4a..cc51477ed 100644 --- a/src/fluree/db/index.cljc +++ b/src/fluree/db/index.cljc @@ -1,7 +1,6 @@ (ns fluree.db.index (:refer-clojure :exclude [resolve]) - (:require [clojure.data.avl :as avl] - [fluree.db.flake :as flake] + (:require [fluree.db.flake :as flake] #?(:clj [clojure.core.async :refer [chan go !] :as async] :cljs [cljs.core.async :refer [chan go !] :as async]) [fluree.db.util.async :refer [! error-ch e))))) - (defn resolved? "Returns `true` if the data associated with the index node map `node` is fully resolved from storage" @@ -55,6 +44,11 @@ (leaf? node) (not (nil? (:flakes node))) (branch? node) (not (nil? (:children node))))) +(defn resolved-leaf? + [node] + (and (leaf? node) + (resolved? node))) + (defn unresolve "Clear the populated child node attributes from the supplied `node` map if it represents a branch, or the populated flakes if `node` represents a leaf." @@ -63,27 +57,6 @@ (leaf? node) (dissoc node :flakes) (branch? node) (dissoc node :children))) -(defn lookup - [branch flake] - (when (and (branch? branch) - (resolved? branch)) - (let [{:keys [children]} branch] - (-> children - (avl/nearest <= flake) - (or (first children)) - val)))) - -(defn lookup-leaf - [r branch flake] - (go-try - (when (and (branch? branch) - (resolved? branch)) - (loop [child (lookup branch flake)] - (if (leaf? child) - child - (recur ( leaf @@ -109,33 +82,25 @@ (assoc new-leaf :first new-first))) (defn empty-leaf - "Returns a blank leaf node map for the provided `network`, `ledger-id`, and index + "Returns a blank leaf node map for the provided `ledger-alias` and index comparator `cmp`." - [network ledger-id cmp] - {:comparator cmp - :network network - :ledger-id ledger-id - :id :empty - :tempid (random-uuid) - :leaf true - :first flake/maximum - :rhs nil - :size 0 - :t 0 - :leftmost? true}) - -(defn new-leaf - [network ledger-id cmp flakes] - (let [empty-set (flake/sorted-set-by cmp)] - (-> (empty-leaf network ledger-id cmp) - (assoc :flakes empty-set) - (add-flakes flakes)))) + [ledger-alias cmp] + {:comparator cmp + :ledger-alias ledger-alias + :id :empty + :tempid (random-uuid) + :leaf true + :first flake/maximum + :rhs nil + :size 0 + :t 0 + :leftmost? true}) (defn descendant? "Checks if the `node` passed in the second argument is a descendant of the `branch` passed in the first argument" [{:keys [rhs leftmost?], cmp :comparator, first-flake :first, :as branch} - {node-first :first, node-rhs :rhs, :as node}] + {node-first :first, node-rhs :rhs, :as _node}] (if-not (branch? branch) false (and (or leftmost? @@ -160,37 +125,21 @@ (defn empty-branch "Returns a blank branch node which contains a single empty leaf node for the - provided `network`, `ledger-id`, and index comparator `cmp`." - [network ledger-id cmp] - (let [child-node (empty-leaf network ledger-id cmp) + provided `ledger-alias` and index comparator `cmp`." + [ledger-alias cmp] + (let [child-node (empty-leaf ledger-alias cmp) children (child-map cmp child-node)] - {:comparator cmp - :network network - :ledger-id ledger-id - :id :empty - :tempid (random-uuid) - :leaf false - :first flake/maximum - :rhs nil - :children children - :size 0 - :t 0 - :leftmost? true})) - -(defn reset-children - [{:keys [comparator size] :as branch} new-child-nodes] - (let [new-kids (apply child-map comparator new-child-nodes) - new-first (or (some-> new-kids first key) - flake/maximum) - new-size (->> new-child-nodes - (map :size) - (reduce + size))] - (assoc branch :first new-first, :size new-size, :children new-kids))) - -(defn new-branch - [network ledger-id cmp child-nodes] - (-> (empty-branch network ledger-id cmp) - (reset-children child-nodes))) + {:comparator cmp + :ledger-alias ledger-alias + :id :empty + :tempid (random-uuid) + :leaf false + :first flake/maximum + :rhs nil + :children children + :size 0 + :t 0 + :leftmost? true})) (defn after-t? "Returns `true` if `flake` has a transaction value after the provided `t`" @@ -217,20 +166,20 @@ (flake/disj-all flakes))) (defn novelty-subrange - [{:keys [rhs leftmost?], first-flake :first, :as node} through-t novelty] + [{:keys [rhs leftmost?], first-flake :first, :as _node} through-t novelty] (log/trace "novelty-subrange: first-flake:" first-flake "\nrhs:" rhs "\nleftmost?" leftmost?) (let [subrange (cond ;; standard case: both left and right boundaries (and rhs (not leftmost?)) - (avl/subrange novelty > first-flake <= rhs) + (flake/subrange novelty > first-flake <= rhs) ;; right only boundary (and rhs leftmost?) - (avl/subrange novelty <= rhs) + (flake/subrange novelty <= rhs) ;; left only boundary (and (nil? rhs) (not leftmost?)) - (avl/subrange novelty > first-flake) + (flake/subrange novelty > first-flake) ;; no boundary (and (nil? rhs) leftmost?) @@ -279,7 +228,7 @@ (defrecord TRangeResolver [node-resolver novelty from-t to-t] Resolver - (resolve [_ {:keys [id tempid tt-id] :as node}] + (resolve [_ node] (if (branch? node) (resolve node-resolver node) (resolve-t-range node-resolver node novelty from-t to-t)))) @@ -355,20 +304,80 @@ [node] (-> node ::expanded true?)) +(defn trim-leaf + "Remove flakes from the index leaf node `leaf` that are outside of the interval + defined by `start-flake` and `end-flake`. nil values for either `start-flake` + or `end-flake` makes that side of the interval unlimited." + [leaf start-flake end-flake] + (cond + (and start-flake end-flake) + (update leaf :flakes flake/subrange >= start-flake <= end-flake) + + start-flake + (update leaf :flakes flake/subrange >= start-flake) + + end-flake + (update leaf :flakes flake/subrange <= end-flake) + + :else + leaf)) + +(defn trim-branch + "Remove child nodes from the index branch node `branch` that do not contain + flakes in the interval defined by `start-flake` and `end-flake`. nil values + for either `start-flake` or `end-flake` makes that side of the interval + unlimited." + [{:keys [children] :as branch} start-flake end-flake] + (let [start-key (some->> start-flake (flake/nearest children <=) key) + end-key (some->> end-flake (flake/nearest children <=) key)] + (cond + (and start-key end-key) + (update branch :children flake/subrange >= start-key <= end-key) + + start-key + (update branch :children flake/subrange >= start-key) + + end-key + (update branch :children flake/subrange <= end-key) + + :else + branch))) + +(defn trim-node + "Remove flakes or children from the index leaf or branch node `node` that are + outside of the interval defined by `start-flake` and `end-flake`. nil values + for either `start-flake` or `end-flake` makes that side of the interval + unlimited." + [node start-flake end-flake] + (if (leaf? node) + (trim-leaf node start-flake end-flake) + (trim-branch node start-flake end-flake))) + +(defn try-resolve + [r start-flake end-flake error-ch node] + (go + (try* (let [resolved (! error-ch e))))) + (defn resolve-when - [r resolve? error-ch node] + [r start-flake end-flake resolve? error-ch node] (if (resolve? node) - (try-resolve r error-ch node) + (try-resolve r start-flake end-flake error-ch node) (doto (chan) (async/put! node)))) (defn resolve-children-when - [r resolve? error-ch branch] + [r start-flake end-flake resolve? error-ch branch] (if (resolved? branch) (->> branch :children (map (fn [[_ child]] - (resolve-when r resolve? error-ch child))) + (resolve-when r start-flake end-flake resolve? error-ch child))) (async/map vector)) (go []))) @@ -377,29 +386,35 @@ descended from `root` in depth-first order. `resolve?` is a boolean function that will be applied to each node to determine whether or not the data associated with that node will be resolved from disk using the supplied - `Resolver` `r`. `include?` is a boolean function that will be applied to each - node to determine if it will be included in the final output node stream, `n` - is an optional parameter specifying the number of nodes to load concurrently, - and `xf` is an optional transducer that will transform the output stream if - supplied." - ([r root resolve? include? error-ch] - (tree-chan r root resolve? include? 1 identity error-ch)) - ([r root resolve? include? n xf error-ch] + `Resolver` `r`. `start-flake` and `end-flake` are flakes for which only nodes + that contain flakes within the interval defined by them will be considered, + `n` is an optional parameter specifying the number of nodes to load + concurrently, and `xf` is an optional transducer that will transform the + output stream if supplied." + ([r root resolve? error-ch] + (tree-chan r root resolve? 1 identity error-ch)) + ([r root resolve? n xf error-ch] + (tree-chan r root nil nil resolve? n xf error-ch)) + ([r root start-flake end-flake resolve? n xf error-ch] (let [out (chan n xf)] (go - (let [root-node (! out (unmark-expanded node))) + (do (>! out (unmark-expanded node)) (recur stack*)) - (let [children ( stack* (conj (mark-expanded node)) - (into (rseq children)))] + (into (rseq children)))] ; reverse children + ; to ensure final + ; nodes are in + ; depth-first order (recur stack**)))))) (async/close! out))) out))) diff --git a/src/fluree/db/indexer/default.cljc b/src/fluree/db/indexer/default.cljc index a565eabae..fb1466c0b 100644 --- a/src/fluree/db/indexer/default.cljc +++ b/src/fluree/db/indexer/default.cljc @@ -1,7 +1,7 @@ (ns fluree.db.indexer.default (:require [fluree.db.indexer.proto :as idx-proto] [fluree.db.index :as index] - [fluree.db.storage.core :as storage] + [fluree.db.storage :as storage] [fluree.db.flake :as flake] [fluree.db.util.core :as util #?(:clj :refer :cljs :refer-macros) [try* catch*]] [clojure.core.async :as async] @@ -213,7 +213,7 @@ (if (overflow-leaf? leaf) (let [target-size (/ *overflow-bytes* 2)] (log/debug "Rebalancing index leaf:" - (select-keys leaf [:id :network :ledger-id])) + (select-keys leaf [:id :ledger-alias])) (loop [[f & r] flakes cur-size 0 cur-first f @@ -325,7 +325,7 @@ collection purposes" [{:keys [id] :as node}] (cond-> node - (index/resolved? node) (assoc ::old-id id))) + (index/resolved? node) (assoc ::old-id id))) (defn update-branch-ids "When using IPFS, we don't know what the leaf id will be until written, therefore @@ -345,7 +345,7 @@ "Writes `node` to storage, and puts any errors onto the `error-ch`" [db idx node error-ch updated-ids] (let [node (dissoc node ::old-id) - display-node (select-keys node [:id :ledger-id])] + display-node (select-keys node [:id :ledger-alias])] (async/go (try* (if (index/leaf? node) @@ -402,7 +402,7 @@ novel? (fn [node] (or (seq remove-preds) (seq (index/novelty-subrange node t novelty))))] - (->> (index/tree-chan conn root novel? (constantly true) 1 refresh-xf error-ch) + (->> (index/tree-chan conn root novel? 1 refresh-xf error-ch) (write-resolved-nodes db idx changes-ch error-ch)))) (defn extract-root @@ -451,13 +451,12 @@ (defn refresh [indexer - {:keys [ecount novelty t network ledger-id] :as db} + {:keys [ecount novelty t ledger-alias] :as db} {:keys [remove-preds changes-ch]}] (go-try (let [start-time-ms (util/current-time-millis) novelty-size (:size novelty) - init-stats {:network network - :ledger-id ledger-id + init-stats {:ledger-alias ledger-alias :t t :novelty-size novelty-size :start-time (util/current-time-iso)}] diff --git a/src/fluree/db/json_ld/commit_data.cljc b/src/fluree/db/json_ld/commit_data.cljc index c86b4f284..2caaa07d1 100644 --- a/src/fluree/db/json_ld/commit_data.cljc +++ b/src/fluree/db/json_ld/commit_data.cljc @@ -364,11 +364,6 @@ [flakes] (filter ref? flakes)) -(defn update-novelty-idx - [novelty-idx add remove] - (-> (reduce disj novelty-idx remove) - (into add))) - (defn update-novelty ([db add] (update-novelty db add [])) @@ -383,11 +378,11 @@ add (+ (flake/size-bytes add)) rem (- (flake/size-bytes rem)))] (-> db - (update-in [:novelty :spot] update-novelty-idx add rem) - (update-in [:novelty :psot] update-novelty-idx add rem) - (update-in [:novelty :post] update-novelty-idx add rem) - (update-in [:novelty :opst] update-novelty-idx ref-add ref-rem) - (update-in [:novelty :tspo] update-novelty-idx add rem) + (update-in [:novelty :spot] flake/revise add rem) + (update-in [:novelty :psot] flake/revise add rem) + (update-in [:novelty :post] flake/revise add rem) + (update-in [:novelty :opst] flake/revise ref-add ref-rem) + (update-in [:novelty :tspo] flake/revise add rem) (update-in [:novelty :size] + flake-size) (update-in [:stats :size] + flake-size) (update-in [:stats :flakes] + flake-count))))) @@ -424,7 +419,7 @@ children* (reduce-kv (fn [children* k v] (assoc children* k (assoc v :tt-id tt-id))) - {} children)] + (empty children) children)] (assoc db* idx (assoc node :tt-id tt-id :children children*)))) db indexes) diff --git a/src/fluree/db/json_ld/reify.cljc b/src/fluree/db/json_ld/reify.cljc index 32b4b9202..882a828cd 100644 --- a/src/fluree/db/json_ld/reify.cljc +++ b/src/fluree/db/json_ld/reify.cljc @@ -4,9 +4,10 @@ [fluree.db.constants :as const] [fluree.db.json-ld.ledger :as jld-ledger] [fluree.db.json-ld.vocab :as vocab] + [fluree.db.util.core :as util] [fluree.db.util.async :refer [!] :as async] - :cljs [cljs.core.async :refer [chan !] :refer-macros [go go-loop] :as async]) + #?(:clj [clojure.core.async :refer [chan go >!] :as async] + :cljs [cljs.core.async :refer [chan >!] :refer-macros [go] :as async]) [fluree.db.permissions-validate :as perm-validate] [fluree.db.util.async :refer [= test) util/min-integer util/max-integer))] (flake/create s p o' dt t op m'))) -(defn resolved-leaf? - [node] - (and (index/leaf? node) - (index/resolved? node))) - (defn intersects-range? "Returns true if the supplied `node` contains flakes between the `lower` and `upper` flakes, according to the `node`'s comparator." @@ -125,19 +119,15 @@ map. The result of the transformation will be a stream of collections of flakes from the leaf nodes in the input stream, with one flake collection for each input leaf." - [{:keys [start-flake start-test end-flake end-test flake-xf] :as opts}] - (let [query-xf (comp (map :flakes) - (map (fn [flakes] - (flake/subrange flakes - start-test start-flake - end-test end-flake))) - (map (fn [flakes] - (into [] (query-filter opts) flakes))))] - (if flake-xf - (let [slice-xf (map (fn [flakes] - (sequence flake-xf flakes)))] - (comp query-xf slice-xf)) - query-xf))) + [{:keys [flake-xf] :as opts}] + (let [flake-xfs (cond-> [(query-filter opts)] + flake-xf (conj flake-xf)) + flake-xf* (apply comp flake-xfs) + query-xf (comp (filter index/resolved-leaf?) + (map :flakes) + (map (fn [flakes] + (into [] flake-xf* flakes))))] + query-xf)) (defn resolve-flake-slices "Returns a channel that will contain a stream of chunked flake collections that @@ -146,12 +136,8 @@ [{:keys [lru-cache-atom] :as conn} root novelty error-ch {:keys [from-t to-t start-flake end-flake] :as opts}] (let [resolver (index/->CachedTRangeResolver conn novelty from-t to-t lru-cache-atom) - cmp (:comparator root) - range-set (flake/sorted-set-by cmp start-flake end-flake) - in-range? (fn [node] - (intersects-range? node range-set)) query-xf (extract-query-flakes opts)] - (index/tree-chan resolver root in-range? resolved-leaf? 1 query-xf error-ch))) + (index/tree-chan resolver root start-flake end-flake any? 1 query-xf error-ch))) (defn unauthorized? [f] @@ -293,7 +279,8 @@ :end-test end-test :end-flake end-flake})] (go-try - (let [history-ch (->> (index/tree-chan resolver idx-root in-range? resolved-leaf? 1 query-xf error-ch) + (let [history-ch (->> (index/tree-chan resolver idx-root start-flake end-flake + in-range? 1 query-xf error-ch) (filter-authorized db start-flake end-flake error-ch) (into-page limit offset flake-limit))] (async/alt! @@ -368,75 +355,3 @@ (throw e)) range-ch ([idx-range] idx-range))))))) - -(defn non-nil-non-boolean? - [o] - (and (not (nil? o)) - (not (boolean? o)))) - -(defn tag-string? - [possible-tag] - (re-find #"^[a-zA-Z0-9-_]*/[a-zA-Z0-9-_]*:[a-zA-Z0-9-]*$" possible-tag)) - -(def ^:const tag-sid-start (flake/min-subject-id const/$_tag)) -(def ^:const tag-sid-end (flake/max-subject-id const/$_tag)) - -(defn is-tag-flake? - "Returns true if flake is a root setting flake." - [f] - (<= tag-sid-start (flake/o f) tag-sid-end)) - - -(defn coerce-tag-flakes - [db flakes] - (async/go-loop [[flake & r] flakes - acc []] - (if flake - (if (is-tag-flake? flake) - (let [[s p o _ t op m] (flake/Flake->parts flake) - o* (= [(flake/max-subject-id id)] - <= [(flake/min-subject-id id)] - opts)) - (throw (ex-info (str "Invalid collection name: " (pr-str name)) - {:status 400 - :error :db/invalid-collection}))) - (catch* e e))))) - -(defn _block-or_tx-collection - "Returns spot index range for only the requested collection." - [db opts] - (index-range db :spot > [0] <= [util/min-long] opts)) - -(defn txn-from-flakes - "Returns vector of transactions from a set of flakes. - Each transaction is a map with the following keys: - 1. db - the associated ledger - 2. tx - a map containing all transaction data in the original cmd - 3. nonce - the nonce - 4. auth - the authority that submitted the transaction - 5. expire - expiration" - [flakes] - (loop [[flake' & r] flakes result* []] - (if (nil? flake') - result* - (let [obj (flake/o flake') - cmd-map (try* - (json/parse obj) - (catch* e nil)) ; log an error if transaction is not parsable? - {:keys [type db tx nonce auth expire]} cmd-map] - (recur r - (if (= type "tx") - (conj result* {:db db :tx tx :nonce nonce :auth auth :expire expire}) - result*)))))) diff --git a/src/fluree/db/query/sql.cljc b/src/fluree/db/query/sql.cljc index 58e147074..cd1676aca 100644 --- a/src/fluree/db/query/sql.cljc +++ b/src/fluree/db/query/sql.cljc @@ -491,5 +491,5 @@ sql parse-rule first - (select-keys [:select :selectDistinct :selectOne :where :block :prefixes + (select-keys [:select :selectDistinct :selectOne :where :prefixes :vars :opts]))) diff --git a/src/fluree/db/query/subject_crawl/common.cljc b/src/fluree/db/query/subject_crawl/common.cljc index 94b84400a..20509aa5d 100644 --- a/src/fluree/db/query/subject_crawl/common.cljc +++ b/src/fluree/db/query/subject_crawl/common.cljc @@ -2,6 +2,7 @@ (:require [clojure.core.async :refer [go] :as async] [fluree.db.util.async :refer [ [(map :flakes) + (apply comp (cond-> [(filter index/resolved-leaf?) + (map :flakes) (map (fn [flakes] (flake/subrange flakes start-test start-flake diff --git a/src/fluree/db/query/subject_crawl/subject.cljc b/src/fluree/db/query/subject_crawl/subject.cljc index 84c0d6644..dc140d9e7 100644 --- a/src/fluree/db/query/subject_crawl/subject.cljc +++ b/src/fluree/db/query/subject_crawl/subject.cljc @@ -55,7 +55,7 @@ (map (fn [flakes] (filter filter-fn flakes))))}) resolver (index/->CachedTRangeResolver conn (get novelty idx*) t t (:lru-cache-atom conn)) - tree-chan (index/tree-chan resolver idx-root in-range? query-range/resolved-leaf? 1 query-xf error-ch)] + tree-chan (index/tree-chan resolver idx-root in-range? 1 query-xf error-ch)] (async/go-loop [] (let [next-chunk (Flake) (:first index-data) (update :first flake/parts->Flake) true (assoc :comparator cmp - :network network - :ledger-id ledger-id + :ledger-alias ledger-alias :t t :leftmost? true)))) @@ -153,9 +152,9 @@ (defn read-garbage "Returns garbage file data for a given index t." - [conn network ledger-id t] + [conn ledger-alias t] (go-try - (let [key (ledger-garbage-key network ledger-id t) + (let [key (ledger-garbage-key ledger-alias t) data ( branch-metadata @@ -224,7 +223,7 @@ (defn resolve-index-node ([conn node] - (resolve-index-node node nil)) + (resolve-index-node conn node nil)) ([conn {:keys [comparator leaf] :as node} error-fn] (assert comparator "Cannot resolve index node; configuration does not have a comparator.") (let [return-ch (async/chan)] @@ -252,3 +251,21 @@ empty-node (assoc node :flakes empty-set)] (async/put! ch empty-node) ch)) + +(defn resolve-empty-branch + [{:keys [comparator ledger-alias] :as node}] + (let [ch (async/chan) + child (index/empty-leaf ledger-alias comparator) + children (flake/sorted-map-by comparator child) + empty-node (assoc node :children children)] + (async/put! ch empty-node) + ch)) + +(defn resolve-empty-node + [node] + (if (index/resolved? node) + (doto (async/chan) + (async/put! node)) + (if (index/leaf? node) + (resolve-empty-leaf node) + (resolve-empty-branch node)))) diff --git a/src/fluree/db/util/pprint.clj b/src/fluree/db/util/pprint.clj index 3d8a2ea51..dba1c367f 100644 --- a/src/fluree/db/util/pprint.clj +++ b/src/fluree/db/util/pprint.clj @@ -17,7 +17,7 @@ str-vec (if (index/leaf? index) (let [node-count (count (:flakes index))] (swap! count-atom + node-count) - (conj str-vec (str ":" node-count))) + (conj str-vec (str ":" (:t index) "-" node-count))) str-vec) first-flake (:first index) main-str (apply str str-vec) diff --git a/src/fluree/db/util/tx.clj b/src/fluree/db/util/tx.clj deleted file mode 100644 index f812ea2c9..000000000 --- a/src/fluree/db/util/tx.clj +++ /dev/null @@ -1,152 +0,0 @@ -(ns fluree.db.util.tx - (:require [fluree.db.util.json :as json] - [fluree.crypto :as crypto] - [clojure.string :as str] - [fluree.db.util.async :refer [ (try (json/parse cmd) - (catch Exception _ - (throw (ex-info (format "Transaction %s is not valid JSON, ignoring." id) - {:status 400 :error :db/invalid-transaction})))) - (assoc :txid id - :cmd cmd - :sig sig - :signed signed) - util/without-nils) - _ (log/trace "Validating command:" cmd-map) - sig-authority (try (crypto/account-id-from-message (or signed cmd) sig) - (catch Exception _ - (throw (ex-info (format "Transaction %s has an invalid signature." id) - {:status 400 :error :db/invalid-signature})))) - ;; merge everything together into one map for transaction. - current-time (System/currentTimeMillis) - {:keys [auth authority expire]} cmd-map - expired? (and expire (< expire current-time)) - _ (when expired? - (throw (ex-info (format "Transaction %s is expired. Current time: %s expire time: %s." id current-time expire) - {:status 400 :error :db/expired-transaction}))) - cmd-map* (cond - (and (nil? auth) (nil? authority)) - (assoc cmd-map :auth sig-authority) - - (and (nil? auth) authority) - (throw (ex-info (format "Transaction %s invalid. An authority without an auth is not allowed." id) - {:status 400 :error :db/missing-auth})) - - (and auth authority) - (if (= authority sig-authority) - cmd-map - (throw (ex-info (format "Transaction %s is invalid. Signing authority: %s does not match command authority: %s." id sig-authority authority) - {:status 400 :error :db/invalid-authority}))) - - (and auth (nil? authority)) - (if (= auth sig-authority) - cmd-map - (assoc cmd-map :authority sig-authority)))] - cmd-map*)) - - -(defn gen-tx-hash - "From a list of transaction flakes, returns the sha3 hash. - - Note, this assumes the _tx/hash flake is NOT included in this list, - else the resulting hash will be different from the one that would have - been computed when performing the transaction." - ([tx-flakes] - ;; sort in block sort order as defined by fluree.db.flake/cmp-flakes-block - (-> (apply flake/sorted-set-by flake/cmp-flakes-block tx-flakes) - (gen-tx-hash true))) - ([tx-flakes sorted?] - (if-not sorted? - (gen-tx-hash tx-flakes) - (->> tx-flakes - (mapv #(let [f %] - (vector (flake/s f) (flake/p f) (flake/o f) (flake/t f) (flake/op f) (flake/m f)))) - (json/stringify) - (crypto/sha3-256))))) - - -;;; -;;; Block merkle root calculation -;;; - -(defn- exp [x n] - (loop [acc 1 n n] - (if (zero? n) - acc - (recur (long (* x acc)) (dec n))))) ; long keeps recur arg primitive - -(defn- find-closest-power-2 - [n] - (loop [i 1] - (if (>= (exp 2 i) n) - (exp 2 i) - (recur (inc i))))) - -(defn- generate-hashes - [cmds] - (loop [[f s & r] cmds - acc []] - (let [hash (crypto/sha2-256 (str f s)) - acc* (conj acc hash)] - (if r - (recur r acc*) - acc*)))) - -(defn generate-merkle-root - "hashes should already be in the correct order." - [& hashes] - (let [count-cmds (count hashes) - repeat-last (- count-cmds (find-closest-power-2 count-cmds)) - leaves-ordrd (concat hashes (repeat repeat-last (last hashes)))] - (loop [merkle-results (apply generate-hashes leaves-ordrd)] - (if (> 1 (count merkle-results)) - (recur (apply generate-hashes merkle-results)) - (first merkle-results))))) - - -;; TODO - moved this from the original transact namespace. Need to look at how this special treatment is handled -;; and verify it is being done in a reasonable way. -(defn create-new-ledger-tx - [tx-map] - (let [{:keys [ledger alias auth doc fork forkBlock]} tx-map - ledger-name (if (sequential? ledger) - (str (first ledger) "/" (second ledger)) - (str/replace ledger "/$" "/")) - tx (util/without-nils - {:_id "db$newdb" - :_action :insert - :id ledger-name - :alias (or alias ledger-name) - :root auth - :doc doc - :fork fork - :forkBlock forkBlock})] - [tx])) - - -(defn make-candidate-db - "Assigns a tempid to all index roots, which ensures caching for this candidate db - is independent from any 'official' db with the same block." - [db] - (let [tempid (random-uuid) - indexes [:spot :psot :post :opst]] - (reduce - (fn [db idx] - (let [index (assoc (get db idx) :tempid tempid)] - (assoc db idx index))) - db indexes)))