Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/large groups #263

Merged
merged 32 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
446fc4b
order and group results explicitly after where search
zonotope Nov 15, 2022
e41ed24
Merge remote-tracking branch 'origin/main' into feature/group-by
zonotope Nov 16, 2022
d8f416d
put errors on error channel in process-where-item
zonotope Nov 16, 2022
d4f1ebf
process select results in async pipeline
zonotope Nov 16, 2022
9610e29
uncomment test
zonotope Nov 17, 2022
719bd58
wip: group results before ordering
zonotope Nov 18, 2022
2000263
remove commented code
zonotope Nov 21, 2022
8784699
don't process grouped vars specially; they're all grouped now
zonotope Nov 21, 2022
db3001b
remove println
zonotope Nov 21, 2022
4104657
add benchmarking tool in development profile
zonotope Nov 24, 2022
59c18e1
use peek instead of last to optimally get last element from vector
zonotope Nov 24, 2022
4cb6a59
add another (currently failing) test for groupBy
zonotope Nov 23, 2022
68e481a
fix larger groupings
zonotope Nov 24, 2022
310aed8
make fql-test clojure only
zonotope Nov 24, 2022
7499e59
Merge branch 'fix/large-groups' into feature/group-by
zonotope Nov 24, 2022
8f90578
don't pass group-by to order-by update fn
zonotope Nov 25, 2022
a5a2217
Revert "don't process grouped vars specially; they're all grouped now"
zonotope Nov 26, 2022
1894cae
separate grouping vars from the vars (and aggregates) that are grouped
zonotope Nov 26, 2022
30af319
remove unused/commented code
zonotope Nov 26, 2022
d350c20
calculate groupings directly instead of with closures
zonotope Nov 27, 2022
57831f3
clean up some unused code
zonotope Nov 27, 2022
66f9a38
use transduce with cat instead of double reduce
zonotope Nov 27, 2022
74d74ae
don't use closure to collect groups
zonotope Nov 27, 2022
e1e7dba
add implicit grouping test
zonotope Nov 28, 2022
bd35045
aggregate with implicit grouping
zonotope Nov 28, 2022
b102143
fix broken test
zonotope Nov 28, 2022
bcc70ee
order groups
zonotope Nov 28, 2022
fe338e6
only run aggregate test in clj for now
zonotope Nov 28, 2022
d09c42f
cleanup
zonotope Nov 28, 2022
f7f5f5d
Merge pull request #265 from fluree/feature/group-by
zonotope Nov 28, 2022
c96c4fd
remove unused code
zonotope Nov 28, 2022
451f5d2
correct indentation
zonotope Nov 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
:dev
{:extra-paths ["dev" "test" "dev-resources" "src-cljs" "src-nodejs" "src-docs"]
:extra-deps {org.clojure/tools.namespace {:mvn/version "1.3.0"}
criterium/criterium {:mvn/version "0.4.6"}
figwheel-sidecar/figwheel-sidecar {:mvn/version "0.5.20"}
thheller/shadow-cljs {:mvn/version "2.20.7"}}}

Expand Down
3 changes: 2 additions & 1 deletion dev/user.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
[fluree.db.constants :as const]
[fluree.db.query.schema :as schema]
[clojure.string :as str]
[criterium.core :refer [bench]]
;cljs
[figwheel-sidecar.repl-api :as ra]))

Expand Down Expand Up @@ -149,4 +150,4 @@
(async/<!! (fluree.db.api/block-query-async my-conn "test/password" my-query)))

(fluree.db.api/close my-conn)
)
)
4 changes: 2 additions & 2 deletions src/fluree/db/json_ld/transact.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@
(let [{:keys [delete] :as parsed-query} (q-parse/parse db json-ld)
fuel (volatile! 0)
error-ch (async/chan)
where-ch (compound/where parsed-query error-ch fuel max-fuel db)
where-ch (compound/where db parsed-query fuel max-fuel error-ch)
where-results (loop [results []]
(if-let [next-res (async/<! where-ch)]
(recur (into results next-res))
Expand Down Expand Up @@ -444,4 +444,4 @@
(if (and (contains? json-ld :delete)
(contains? json-ld :where))
(delete db util/max-integer json-ld)
(insert db json-ld opts)))
(insert db json-ld opts)))
125 changes: 17 additions & 108 deletions src/fluree/db/query/analytical_parse.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,8 @@
(when-not (every? #(variable-in-where? % where) group-symbols)
(throw (ex-info (str "Group by includes variable(s) not specified in the where clause: " group-by)
{:status 400 :error :db/invalid-query})))
(cond-> (assoc parsed-query :group-by {:input group-by
:parsed (mapv (fn [sym] {:variable sym}) group-symbols)})
(not order-by) (add-order-by group-symbols))))
(assoc parsed-query :group-by {:input group-by
:parsed (mapv (fn [sym] {:variable sym}) group-symbols)})))


(defn get-limit
Expand Down Expand Up @@ -948,13 +947,15 @@

If group-by is used, grouping can re-order output so utilize out-vars from that
as opposed to the last where statement."
[{:keys [spec] :as select} where {group-out-vars :out-vars, grouped-vars :grouped-vars, :as _group-by}]
[{:keys [aggregates spec] :as select} where {group-out-vars :out-vars :as group-by}]
(let [last-where (last where)
out-vars (or group-out-vars
(:out-vars last-where))
grouped-vars (set/union (:grouped-vars group-by)
(into #{} (map :variable) aggregates))
{:keys [all]} (:vars last-where) ;; the last where statement has an aggregation of all variables
spec* (cond->> (mapv #(update-position+type % out-vars all) spec)
grouped-vars (mapv #(if (grouped-vars (:variable %))
grouped-vars (mapv #(if (contains? grouped-vars (:variable %))
(assoc % :grouped? true)
%)))]
(assoc select :spec spec*)))
Expand Down Expand Up @@ -1013,101 +1014,16 @@

(defn update-order-by
"Updates order-by, if applicable, with final where clause positions of items."
[{:keys [parsed] :as order-by} group-by where]
[{:keys [parsed] :as order-by} {group-out-vars :out-vars, :as _group-by} where]
(when order-by
(let [{:keys [out-vars vars] :as _last-where} (last where)
(let [{:keys [vars] :as last-where} (last where)
out-vars (or group-out-vars
(:out-vars last-where))
parsed* (mapv #(update-position+type % out-vars (:all vars)) parsed)
order-by* (assoc order-by :parsed parsed*)
comparator (build-order-fn order-by*)]
(assoc order-by* :comparator comparator))))


(defn grouped-vals-result-fn
  "Returns a function that consolidates a single group's result, or nil when
  `extraction-positions` is empty.

  The returned function expects one group as a vector whose leading elements
  are the grouping values and whose final element is the block of grouped rows
  (the shape `lazy-group-by` produces with `(conj grouping-vals rows)`). The
  grouping values are kept in place and the trailing block is replaced by one
  vector per grouped position, i.e. the block transposed:

    [:g [[:a :b :c] [:a1 :b1 :c1] [:a2 :b2 :c2]]]
    ==> [:g [:a :a1 :a2] [:b :b1 :b2] [:c :c1 :c2]]

  Only the count of `extraction-positions` is used; each row is assumed to
  already hold exactly the grouped values, in order."
  [extraction-positions]
  (let [n-positions (count extraction-positions)]
    ;; NOTE(review): previously only the 1- and 2-position cases honoured the
    ;; [grouping-vals... rows] shape; 3+ positions iterated over the whole
    ;; input and dropped the grouping values. Every arity also stopped at the
    ;; first nil/false value in the first column. Both are handled uniformly
    ;; here -- confirm no caller relied on the bare-transpose behaviour.
    (when (pos? n-positions)
      (fn [group-results]
        (let [grouping-vals (pop group-results)
              ;; peek is O(1) on vectors, unlike last
              grouped-rows  (peek group-results)]
          (into grouping-vals
                ;; one output column per grouped position; nil default mirrors
                ;; what destructuring a short row would have produced
                (map (fn [idx] (mapv #(nth % idx nil) grouped-rows)))
                (range n-positions)))))))


(defn lazy-group-by
  "Lazily partitions already-sorted `results` into groups.

  Consecutive results whose `grouping-fn` values are equal form one group.
  Each emitted element is the grouping value with a vector of the group's
  `grouped-vals-fn` values conj'd onto its end. A group is only built when the
  corresponding element of the returned lazy seq is realized, so a consumer
  that stops early never pays for later groups."
  [grouping-fn grouped-vals-fn results]
  (lazy-seq
    (when-let [pending (seq results)]
      (let [head      (first pending)
            group-key (grouping-fn head)]
        (loop [remaining (rest pending)
               members   [(grouped-vals-fn head)]]
          (let [candidate (first remaining)]
            (cond
              ;; input exhausted (or a falsy result): emit the final group
              (not candidate)
              (cons (conj group-key members)
                    (lazy-group-by grouping-fn grouped-vals-fn (lazy-seq nil)))

              ;; still inside the current group: accumulate and keep scanning
              (= group-key (grouping-fn candidate))
              (recur (next remaining) (conj members (grouped-vals-fn candidate)))

              ;; key changed: emit this group, defer the rest
              :else
              (cons (conj group-key members)
                    (lazy-group-by grouping-fn grouped-vals-fn (lazy-seq remaining))))))))))


(defn update-group-by
"Updates group-by, if applicable, with final where clause positions of items."
[{:keys [parsed] :as group-by} where]
Expand All @@ -1116,22 +1032,15 @@
parsed* (mapv #(update-position+type % out-vars all) parsed)
group-by* (assoc group-by :parsed parsed*)
grouped-positions (mapv :in-n parsed*) ;; returns 'n' positions of values used for grouping
partition-fn (build-vec-extraction-fn grouped-positions) ;; returns fn containing only grouping vals, used like a 'partition-by' fn
grouped-val-positions (filterv ;; returns 'n' positions of values that are being grouped
(complement (set grouped-positions))
(range (count out-vars)))
grouped-vals-fn (build-vec-extraction-fn grouped-val-positions) ;; returns fn containing only values being grouped (excludes grouping vals)
;; grouping fn takes sorted results, and partitions results by group-by vars returning only the values being grouped.
;; we don't yet merge all results together as that work is unnecessary if using an offset, or limit
grouping-fn (fn [results]
(lazy-group-by partition-fn grouped-vals-fn results))
;; group-finish-fn takes final results and merges results together
group-finish-fn (grouped-vals-result-fn grouped-val-positions)
grouped-out-vars (into (mapv :variable parsed) (map #(nth out-vars %) grouped-val-positions))]
(assoc group-by* :out-vars grouped-out-vars ;; grouping can change output variable ordering, as all grouped vars come first then groupings appended to end
:grouped-vars (into #{} (map #(nth out-vars %) grouped-val-positions)) ;; these are the variable names in the output that are grouped
:grouping-fn grouping-fn
:group-finish-fn group-finish-fn))))
(assoc group-by*
:out-vars grouped-out-vars ;; grouping can change output variable ordering, as all grouped vars come first then groupings appended to end
:grouping-positions grouped-positions
:grouped-val-positions grouped-val-positions
:grouped-vars (into #{} (map #(nth out-vars %) grouped-val-positions))))))


(defn get-clause-vars
Expand Down Expand Up @@ -1247,8 +1156,8 @@
(union/order-out-vars out-vars last-clause order-by)
(order-out-vars out-vars last-clause order-by))
where* (where-meta-reverse where* select-out-vars)
order-by* (update-order-by order-by group-by where*)
group-by* (update-group-by group-by where*)]
group-by* (update-group-by group-by where*)
order-by* (update-order-by order-by group-by* where*)]
(cond-> (assoc parsed-query :where where*
:order-by order-by*
:group-by group-by*)
Expand Down
24 changes: 2 additions & 22 deletions src/fluree/db/query/compound.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@
out-ch))


(defn resolve-where-clause
[{:keys [t] :as db} {:keys [where vars] :as _parsed-query} error-ch fuel max-fuel]
(defn where
[{:keys [t] :as db} {:keys [where vars] :as _parsed-query} fuel max-fuel error-ch]
(let [initial-chan (get-clause-res db nil (first where) t vars fuel max-fuel error-ch)]
(loop [[clause & r] (rest where)
prev-chan initial-chan]
Expand All @@ -236,23 +236,3 @@
:union (process-union db prev-chan error-ch clause t))]
(recur r out-chan))
prev-chan))))

(defn order+group-results
  "Drains `results-ch`, concatenating every batch it yields into one vector,
  sorts the combined results with the order-by `comparator` and, when the
  group-by map supplies a `grouping-fn`, applies it to the sorted results.

  Sorting needs the complete result set, so nothing is produced until
  `results-ch` closes. Returns the channel of the go block, which yields the
  final collection. `error-ch`, `fuel` and `max-fuel` are accepted but not
  used by this function."
  [results-ch error-ch fuel max-fuel {:keys [comparator] :as _order-by} {:keys [grouping-fn] :as _group-by}]
  (async/go
    (let [collected (loop [acc []]
                      (let [batch (async/<! results-ch)]
                        (if batch
                          (recur (into acc batch))
                          acc)))
          ordered   (sort comparator collected)]
      (if grouping-fn
        (grouping-fn ordered)
        ordered))))


(defn where
  "Resolves the where clause of `parsed-query` against `db` and returns the
  resulting channel. When the query has an order-by, the resolved results are
  additionally passed through `order+group-results` (together with any
  group-by) and that channel is returned instead."
  [parsed-query error-ch fuel max-fuel db]
  (let [resolved-ch (resolve-where-clause db parsed-query error-ch fuel max-fuel)
        {:keys [order-by group-by]} parsed-query]
    (if order-by
      (order+group-results resolved-ch error-ch fuel max-fuel order-by group-by)
      resolved-ch)))
Loading