Skip to content

Commit

Permalink
support single-stmt-write (#101)
Browse files Browse the repository at this point in the history
* support single-stmt-transfer for bank

Signed-off-by: zyguan <[email protected]>

* support single-stmt-write for other workloads

Signed-off-by: zyguan <[email protected]>

---------

Signed-off-by: zyguan <[email protected]>
  • Loading branch information
zyguan authored Mar 28, 2024
1 parent 3df8981 commit fae499b
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 88 deletions.
82 changes: 46 additions & 36 deletions tidb/src/tidb/bank.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@
:to [to (- b2 amount) b2]
:amount amount})

(defn single-stmt-transfer! [conn op]
(let [{:keys [from to amount]} (:value op)]
(c/execute! conn
["update accounts set balance = balance + if(id=?,-?,?) where id=? or (id=? and 1/if(balance>=?,1,0))"
from amount amount to from amount]
{:transaction? false})
(attach-txn-info conn (assoc op :type :ok))))

(defrecord BankClient [conn tbl-created?]
client/Client
(open! [this test node]
Expand Down Expand Up @@ -49,42 +57,44 @@
(catch java.sql.SQLIntegrityConstraintViolationException e nil))))))

(invoke! [this test op]
(with-txn op [c conn {:isolation (util/isolation-level test)
:before-hook (partial c/rand-init-txn! test conn)}]
(try
(case (:f op)
:read (->> (c/query c [(str "select * from accounts")])
(map (juxt :id :balance))
(into (sorted-map))
(assoc op :type :ok, :value))

:transfer
(let [{:keys [from to amount]} (:value op)
b1 (-> c
(c/query [(str "select * from accounts where id = ? "
(:read-lock test)) from]
{:row-fn :balance})
first
(- amount))
b2 (-> c
(c/query [(str "select * from accounts where id = ? "
(:read-lock test))
to]
{:row-fn :balance})
first
(+ amount))]
(cond (neg? b1)
(assoc op :type :fail, :value [:negative from b1])
(neg? b2)
(assoc op :type :fail, :value [:negative to b2])
true
(if (:update-in-place test)
(do (c/execute! c ["update accounts set balance = balance - ? where id = ?" amount from])
(c/execute! c ["update accounts set balance = balance + ? where id = ?" amount to])
(assoc op :type :ok :value (transfer_value from to b1 b2 amount)))
(do (c/update! c :accounts {:balance b1} ["id = ?" from])
(c/update! c :accounts {:balance b2} ["id = ?" to])
(assoc op :type :ok :value (transfer_value from to b1 b2 amount))))))))))
(if (and (= :transfer (:f op)) (:single-stmt-write test))
(with-error-handling op (single-stmt-transfer! conn op))
(with-txn op [c conn {:isolation (util/isolation-level test)
:before-hook (partial c/rand-init-txn! test conn)}]
(try
(case (:f op)
:read (->> (c/query c [(str "select * from accounts")])
(map (juxt :id :balance))
(into (sorted-map))
(assoc op :type :ok, :value))

:transfer
(let [{:keys [from to amount]} (:value op)
b1 (-> c
(c/query [(str "select * from accounts where id = ? "
(:read-lock test)) from]
{:row-fn :balance})
first
(- amount))
b2 (-> c
(c/query [(str "select * from accounts where id = ? "
(:read-lock test))
to]
{:row-fn :balance})
first
(+ amount))]
(cond (neg? b1)
(assoc op :type :fail, :value [:negative from b1])
(neg? b2)
(assoc op :type :fail, :value [:negative to b2])
true
(if (:update-in-place test)
(do (c/execute! c ["update accounts set balance = balance - ? where id = ?" amount from])
(c/execute! c ["update accounts set balance = balance + ? where id = ?" amount to])
(assoc op :type :ok :value (transfer_value from to b1 b2 amount)))
(do (c/update! c :accounts {:balance b1} ["id = ?" from])
(c/update! c :accounts {:balance b2} ["id = ?" to])
(assoc op :type :ok :value (transfer_value from to b1 b2 amount)))))))))))

(teardown! [_ test])

Expand Down
5 changes: 3 additions & 2 deletions tidb/src/tidb/comments.clj
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@
:write (let [[k id] (:value op)
table (id->table table-count id)]
(c/rand-init-txn! test conn)
(c/insert! conn table {:id id, :tkey k})
(assoc op :type :ok))
(c/insert! conn table {:id id, :tkey k}
{:transaction? (not (:single-stmt-write test))})
(c/attach-txn-info conn (assoc op :type :ok)))

:read (with-txn op [c conn {:isolation (util/isolation-level test)}]
(->> (table-names table-count)
Expand Down
3 changes: 3 additions & 0 deletions tidb/src/tidb/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@
[nil "--pd-services" "If true, run pd as mutiple micro services."
:default false]

[nil "--single-stmt-write", "If true, performs write operations in a single statement when possible."
:default false]

[nil "--predicate-read" "If present, try to read using a query over a secondary key, rather than by primary key. Implied by --use-index."
:default false]

Expand Down
75 changes: 42 additions & 33 deletions tidb/src/tidb/monotonic.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
[jepsen.tests.cycle.append :as append]
[tidb [sql :as c :refer :all]
[txn :as txn]
[util :as util]]))
[util :as util]]
[tidb.sql :as c]))

(defn read-key
"Read a specific key's value from the table. Missing values are represented
Expand All @@ -36,6 +37,13 @@
(into (sorted-map))))
;(zipmap ks (map (partial read-key c test) ks)))

(defn single-stmt-inc! [conn op]
(let [k (:value op)
q (str "insert into cycle values (?, ?, 0) "
"on duplicate key update val = values(val)+1")]
(c/execute! conn [q k k] {:transaction? false})
(assoc op :type :ok, :value {})))

(defrecord IncrementClient [conn]
client/Client
(open! [this test node]
Expand All @@ -53,38 +61,39 @@
(c/execute! conn ["alter table cycle cache"]))))

(invoke! [this test op]
(c/with-txn op [c conn {:isolation (util/isolation-level test)
:before-hook (partial c/rand-init-txn! test conn)}]
;(let [c conn]
(case (:f op)
:read (let [v (read-keys c test (shuffle (keys (:value op))))]
(assoc op :type :ok, :value v))
:inc (let [k (:value op)]
(if (:update-in-place test)
; Update directly
(do (when (= [0] (c/execute!
c [(str "update cycle set val = val + 1"
" where pk = ?") k]))
; That failed; insert
(c/insert! c "cycle" {:pk k, :sk k, :val 0}))
; We can't place any constraints on the values since we
; didn't read anything
(assoc op :type :ok, :value {}))

; Update via separate r/w
(let [v (read-key c test k (not= "optimistic" (:txn-mode test)))]
(if (= -1 v)
(c/insert! c "cycle" {:pk k, :sk k, :val 0})
(c/update! c "cycle" {:val (inc v)},
[(str (if (:use-index test) "sk" "pk") " = ?")
k]))
; The monotonic value constraint isn't actually enough to
; capture all the ordering dependencies here: an increment
; from x->y must fall after every read of x, and before
; every read of y, but the monotonic order relation can only
; enforce one of those. We'll return the written value here.
; Still better than nothing.
(assoc op :type :ok :value {k (inc v)})))))))
(if (and (= :inc (:f op)) (:single-stmt-write test))
(c/with-error-handling op (single-stmt-inc! conn op))
(c/with-txn op [c conn {:isolation (util/isolation-level test)
:before-hook (partial c/rand-init-txn! test conn)}]
(case (:f op)
:read (let [v (read-keys c test (shuffle (keys (:value op))))]
(assoc op :type :ok, :value v))
:inc (let [k (:value op)]
(if (:update-in-place test)
; Update directly
(do (when (= [0] (c/execute!
c [(str "update cycle set val = val + 1"
" where pk = ?") k]))
; That failed; insert
(c/insert! c "cycle" {:pk k, :sk k, :val 0}))
; We can't place any constraints on the values since we
; didn't read anything
(assoc op :type :ok, :value {}))

; Update via separate r/w
(let [v (read-key c test k (not= "optimistic" (:txn-mode test)))]
(if (= -1 v)
(c/insert! c "cycle" {:pk k, :sk k, :val 0})
(c/update! c "cycle" {:val (inc v)},
[(str (if (:use-index test) "sk" "pk") " = ?")
k]))
; The monotonic value constraint isn't actually enough to
; capture all the ordering dependencies here: an increment
; from x->y must fall after every read of x, and before
; every read of y, but the monotonic order relation can only
; enforce one of those. We'll return the written value here.
; Still better than nothing.
(assoc op :type :ok :value {k (inc v)}))))))))

(teardown! [this test])

Expand Down
3 changes: 2 additions & 1 deletion tidb/src/tidb/sets.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
(c/with-txn-aborts op
(c/rand-init-txn! test conn)
(case (:f op)
:add (do (c/insert! conn :sets (select-keys op [:value]))
:add (do (c/insert! conn :sets (select-keys op [:value])
{:transaction? (not (:single-stmt-write test))})
(c/attach-txn-info conn (assoc op :type :ok)))

:read (->> (c/query conn ["select * from sets"])
Expand Down
1 change: 1 addition & 0 deletions tidb/src/tidb/sql.clj
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@
(catch java.sql.BatchUpdateException e#
(condp re-find (.getMessage e#)
#"Query timed out" (assoc ~op :type :info, :error :query-timed-out)
#"Division by 0" (assoc ~op :type :fail, :error :division-by-zero)
(throw e#)))

(catch java.sql.SQLNonTransientConnectionException e#
Expand Down
70 changes: 54 additions & 16 deletions tidb/src/tidb/txn.clj
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
(ns tidb.txn
"Client for transactional workloads."
(:require [clojure.tools.logging :refer [info]]
[jepsen [client :as client]
[generator :as gen]]
(:require [clojure.string :as str]
[jepsen [client :as client]]
[tidb.sql :as c :refer :all]
[tidb.util :as util]))

Expand Down Expand Up @@ -49,6 +48,38 @@
k k (str v) (str v)])]
v))]))

(defn txn-type [test table-count txn]
(let [ops (set (map first txn))
tbls (set (map #(table-for table-count (second %)) txn))
single-stmt? (and (:single-stmt-write test)
(= 1 (count ops))
(= 1 (count tbls)))]
(cond
(and single-stmt? (= :w (first ops))) :single-stmt-write
(and single-stmt? (= :append (first ops))) :single-stmt-append
(= 1 (count txn)) :query
:else :txn)))

(defn single-stmt-write! [conn table-count op]
(let [txn (:value op)
table (table-for table-count (second (first txn)))
query (str "insert into " table " (id, sk, val) values "
(str/join ", " (repeat (count txn) "(?, ?, ?)"))
" on duplicate key update val = values(val)")
args (mapcat (fn [[_ k v]] [k k v]) txn)]
(c/execute! conn (into [query] args) {:transaction? false})
(attach-txn-info conn (assoc op :type :ok))))

(defn single-stmt-append! [conn table-count op]
(let [txn (:value op)
table (table-for table-count (second (first txn)))
query (str "insert into " table " (id, sk, val) values "
(str/join ", " (repeat (count txn) "(?, ?, ?)"))
" on duplicate key update val = CONCAT(val, ',', values(val))")
args (mapcat (fn [[_ k v]] [k k (str v)]) txn)]
(c/execute! conn (into [query] args) {:transaction? false})
(attach-txn-info conn (assoc op :type :ok))))

(defrecord Client [conn val-type table-count]
client/Client
(open! [this test node]
Expand All @@ -68,19 +99,26 @@
(c/execute! conn [(str "alter table " (table-name i) " cache")])))))

(invoke! [this test op]
(let [txn (:value op)
use-txn? (< 1 (count txn))]
;use-txn? false]
(if use-txn?
(c/with-txn op [c conn {:isolation (util/isolation-level test)
:before-hook (partial c/rand-init-txn! test conn)}]
(assoc op :type :ok, :value
(mapv (partial mop! c test table-count) txn)))
(c/with-error-handling op
(let [attach-info (if (= :r (-> txn first first)) c/attach-query-info c/attach-txn-info)]
(attach-info conn
(assoc op :type :ok, :value
(mapv (partial mop! conn test table-count) txn))))))))
(let [txn (:value op)]
(condp = (txn-type test table-count txn)

:single-stmt-write
(c/with-error-handling op (single-stmt-write! conn table-count op))

:single-stmt-append
(c/with-error-handling op (single-stmt-append! conn table-count op))

:query
(c/with-error-handling op
(let [attach-info (if (= :r (-> txn first first)) c/attach-query-info c/attach-txn-info)
res (mapv (partial mop! conn test table-count) txn)]
(attach-info conn (assoc op :type :ok, :value res))))

;; else
(c/with-txn op [c conn {:isolation (util/isolation-level test)
:before-hook (partial c/rand-init-txn! test conn)}]
(assoc op :type :ok, :value
(mapv (partial mop! c test table-count) txn))))))

(teardown! [this test])

Expand Down

0 comments on commit fae499b

Please sign in to comment.