diff --git a/tidb/src/tidb/bank.clj b/tidb/src/tidb/bank.clj index d88042a22..ce401833b 100644 --- a/tidb/src/tidb/bank.clj +++ b/tidb/src/tidb/bank.clj @@ -17,6 +17,14 @@ :to [to (- b2 amount) b2] :amount amount}) +(defn single-stmt-transfer! [conn op] + (let [{:keys [from to amount]} (:value op)] + (c/execute! conn + ["update accounts set balance = balance + if(id=?,-?,?) where id=? or (id=? and 1/if(balance>=?,1,0))" + from amount amount to from amount] + {:transaction? false}) + (attach-txn-info conn (assoc op :type :ok)))) + (defrecord BankClient [conn tbl-created?] client/Client (open! [this test node] @@ -49,42 +57,44 @@ (catch java.sql.SQLIntegrityConstraintViolationException e nil)))))) (invoke! [this test op] - (with-txn op [c conn {:isolation (util/isolation-level test) - :before-hook (partial c/rand-init-txn! test conn)}] - (try - (case (:f op) - :read (->> (c/query c [(str "select * from accounts")]) - (map (juxt :id :balance)) - (into (sorted-map)) - (assoc op :type :ok, :value)) - - :transfer - (let [{:keys [from to amount]} (:value op) - b1 (-> c - (c/query [(str "select * from accounts where id = ? " - (:read-lock test)) from] - {:row-fn :balance}) - first - (- amount)) - b2 (-> c - (c/query [(str "select * from accounts where id = ? " - (:read-lock test)) - to] - {:row-fn :balance}) - first - (+ amount))] - (cond (neg? b1) - (assoc op :type :fail, :value [:negative from b1]) - (neg? b2) - (assoc op :type :fail, :value [:negative to b2]) - true - (if (:update-in-place test) - (do (c/execute! c ["update accounts set balance = balance - ? where id = ?" amount from]) - (c/execute! c ["update accounts set balance = balance + ? where id = ?" amount to]) - (assoc op :type :ok :value (transfer_value from to b1 b2 amount))) - (do (c/update! c :accounts {:balance b1} ["id = ?" from]) - (c/update! c :accounts {:balance b2} ["id = ?" to]) - (assoc op :type :ok :value (transfer_value from to b1 b2 amount)))))))))) + (if (and (= :transfer (:f op)) (:single-stmt-write test)) + (with-error-handling op (single-stmt-transfer! conn op)) + (with-txn op [c conn {:isolation (util/isolation-level test) + :before-hook (partial c/rand-init-txn! test conn)}] + (try + (case (:f op) + :read (->> (c/query c [(str "select * from accounts")]) + (map (juxt :id :balance)) + (into (sorted-map)) + (assoc op :type :ok, :value)) + + :transfer + (let [{:keys [from to amount]} (:value op) + b1 (-> c + (c/query [(str "select * from accounts where id = ? " + (:read-lock test)) from] + {:row-fn :balance}) + first + (- amount)) + b2 (-> c + (c/query [(str "select * from accounts where id = ? " + (:read-lock test)) + to] + {:row-fn :balance}) + first + (+ amount))] + (cond (neg? b1) + (assoc op :type :fail, :value [:negative from b1]) + (neg? b2) + (assoc op :type :fail, :value [:negative to b2]) + true + (if (:update-in-place test) + (do (c/execute! c ["update accounts set balance = balance - ? where id = ?" amount from]) + (c/execute! c ["update accounts set balance = balance + ? where id = ?" amount to]) + (assoc op :type :ok :value (transfer_value from to b1 b2 amount))) + (do (c/update! c :accounts {:balance b1} ["id = ?" from]) + (c/update! c :accounts {:balance b2} ["id = ?" to]) + (assoc op :type :ok :value (transfer_value from to b1 b2 amount))))))))))) (teardown! [_ test]) diff --git a/tidb/src/tidb/comments.clj b/tidb/src/tidb/comments.clj index d6310adbf..78f2a3d06 100644 --- a/tidb/src/tidb/comments.clj +++ b/tidb/src/tidb/comments.clj @@ -63,8 +63,9 @@ :write (let [[k id] (:value op) table (id->table table-count id)] (c/rand-init-txn! test conn) - (c/insert! conn table {:id id, :tkey k}) - (assoc op :type :ok)) + (c/insert! conn table {:id id, :tkey k} + {:transaction? (not (:single-stmt-write test))}) + (c/attach-txn-info conn (assoc op :type :ok))) :read (with-txn op [c conn {:isolation (util/isolation-level test)}] (->> (table-names table-count) diff --git a/tidb/src/tidb/core.clj b/tidb/src/tidb/core.clj index 833d65658..a6d3b9ced 100644 --- a/tidb/src/tidb/core.clj +++ b/tidb/src/tidb/core.clj @@ -497,6 +497,9 @@ [nil "--pd-services" "If true, run pd as mutiple micro services." :default false] + [nil "--single-stmt-write", "If true, performs write operations in a single statement when possible." + :default false] + [nil "--predicate-read" "If present, try to read using a query over a secondary key, rather than by primary key. Implied by --use-index." :default false] diff --git a/tidb/src/tidb/monotonic.clj b/tidb/src/tidb/monotonic.clj index a0ddecad6..b81f54142 100644 --- a/tidb/src/tidb/monotonic.clj +++ b/tidb/src/tidb/monotonic.clj @@ -14,7 +14,8 @@ [jepsen.tests.cycle.append :as append] [tidb [sql :as c :refer :all] [txn :as txn] - [util :as util]])) + [util :as util]] + [tidb.sql :as c])) (defn read-key "Read a specific key's value from the table. Missing values are represented @@ -36,6 +37,13 @@ (into (sorted-map)))) ;(zipmap ks (map (partial read-key c test) ks))) +(defn single-stmt-inc! [conn op] + (let [k (:value op) + q (str "insert into cycle values (?, ?, 0) " + "on duplicate key update val = values(val)+1")] + (c/execute! conn [q k k] {:transaction? false}) + (assoc op :type :ok, :value {}))) + (defrecord IncrementClient [conn] client/Client (open! [this test node] @@ -53,38 +61,39 @@ (c/execute! conn ["alter table cycle cache"])))) (invoke! [this test op] - (c/with-txn op [c conn {:isolation (util/isolation-level test) - :before-hook (partial c/rand-init-txn! test conn)}] - ;(let [c conn] - (case (:f op) - :read (let [v (read-keys c test (shuffle (keys (:value op))))] - (assoc op :type :ok, :value v)) - :inc (let [k (:value op)] - (if (:update-in-place test) - ; Update directly - (do (when (= [0] (c/execute! - c [(str "update cycle set val = val + 1" - " where pk = ?") k])) - ; That failed; insert - (c/insert! c "cycle" {:pk k, :sk k, :val 0})) - ; We can't place any constraints on the values since we - ; didn't read anything - (assoc op :type :ok, :value {})) - - ; Update via separate r/w - (let [v (read-key c test k (not= "optimistic" (:txn-mode test)))] - (if (= -1 v) - (c/insert! c "cycle" {:pk k, :sk k, :val 0}) - (c/update! c "cycle" {:val (inc v)}, - [(str (if (:use-index test) "sk" "pk") " = ?") - k])) - ; The monotonic value constraint isn't actually enough to - ; capture all the ordering dependencies here: an increment - ; from x->y must fall after every read of x, and before - ; every read of y, but the monotonic order relation can only - ; enforce one of those. We'll return the written value here. - ; Still better than nothing. - (assoc op :type :ok :value {k (inc v)}))))))) + (if (and (= :inc (:f op)) (:single-stmt-write test)) + (c/with-error-handling op (single-stmt-inc! conn op)) + (c/with-txn op [c conn {:isolation (util/isolation-level test) + :before-hook (partial c/rand-init-txn! test conn)}] + (case (:f op) + :read (let [v (read-keys c test (shuffle (keys (:value op))))] + (assoc op :type :ok, :value v)) + :inc (let [k (:value op)] + (if (:update-in-place test) + ; Update directly + (do (when (= [0] (c/execute! + c [(str "update cycle set val = val + 1" + " where pk = ?") k])) + ; That failed; insert + (c/insert! c "cycle" {:pk k, :sk k, :val 0})) + ; We can't place any constraints on the values since we + ; didn't read anything + (assoc op :type :ok, :value {})) + + ; Update via separate r/w + (let [v (read-key c test k (not= "optimistic" (:txn-mode test)))] + (if (= -1 v) + (c/insert! c "cycle" {:pk k, :sk k, :val 0}) + (c/update! c "cycle" {:val (inc v)}, + [(str (if (:use-index test) "sk" "pk") " = ?") + k])) + ; The monotonic value constraint isn't actually enough to + ; capture all the ordering dependencies here: an increment + ; from x->y must fall after every read of x, and before + ; every read of y, but the monotonic order relation can only + ; enforce one of those. We'll return the written value here. + ; Still better than nothing. + (assoc op :type :ok :value {k (inc v)})))))))) (teardown! [this test]) diff --git a/tidb/src/tidb/sets.clj b/tidb/src/tidb/sets.clj index e28b72908..0ed4a14f3 100644 --- a/tidb/src/tidb/sets.clj +++ b/tidb/src/tidb/sets.clj @@ -27,7 +27,8 @@ (c/with-txn-aborts op (c/rand-init-txn! test conn) (case (:f op) - :add (do (c/insert! conn :sets (select-keys op [:value])) + :add (do (c/insert! conn :sets (select-keys op [:value]) + {:transaction? (not (:single-stmt-write test))}) (c/attach-txn-info conn (assoc op :type :ok))) :read (->> (c/query conn ["select * from sets"]) diff --git a/tidb/src/tidb/sql.clj b/tidb/src/tidb/sql.clj index e70b7f5bf..934fd8d1a 100644 --- a/tidb/src/tidb/sql.clj +++ b/tidb/src/tidb/sql.clj @@ -225,6 +225,7 @@ (catch java.sql.BatchUpdateException e# (condp re-find (.getMessage e#) #"Query timed out" (assoc ~op :type :info, :error :query-timed-out) + #"Division by 0" (assoc ~op :type :fail, :error :division-by-zero) (throw e#))) (catch java.sql.SQLNonTransientConnectionException e# diff --git a/tidb/src/tidb/txn.clj b/tidb/src/tidb/txn.clj index 76605ef20..9118c3b63 100644 --- a/tidb/src/tidb/txn.clj +++ b/tidb/src/tidb/txn.clj @@ -1,8 +1,7 @@ (ns tidb.txn "Client for transactional workloads." - (:require [clojure.tools.logging :refer [info]] - [jepsen [client :as client] - [generator :as gen]] + (:require [clojure.string :as str] + [jepsen [client :as client]] [tidb.sql :as c :refer :all] [tidb.util :as util])) @@ -49,6 +48,38 @@ k k (str v) (str v)])] v))])) +(defn txn-type [test table-count txn] + (let [ops (set (map first txn)) + tbls (set (map #(table-for table-count (second %)) txn)) + single-stmt? (and (:single-stmt-write test) + (= 1 (count ops)) + (= 1 (count tbls)))] + (cond + (and single-stmt? (= :w (first ops))) :single-stmt-write + (and single-stmt? (= :append (first ops))) :single-stmt-append + (= 1 (count txn)) :query + :else :txn))) + +(defn single-stmt-write! [conn table-count op] + (let [txn (:value op) + table (table-for table-count (second (first txn))) + query (str "insert into " table " (id, sk, val) values " + (str/join ", " (repeat (count txn) "(?, ?, ?)")) + " on duplicate key update val = values(val)") + args (mapcat (fn [[_ k v]] [k k v]) txn)] + (c/execute! conn (into [query] args) {:transaction? false}) + (attach-txn-info conn (assoc op :type :ok)))) + +(defn single-stmt-append! [conn table-count op] + (let [txn (:value op) + table (table-for table-count (second (first txn))) + query (str "insert into " table " (id, sk, val) values " + (str/join ", " (repeat (count txn) "(?, ?, ?)")) + " on duplicate key update val = CONCAT(val, ',', values(val))") + args (mapcat (fn [[_ k v]] [k k (str v)]) txn)] + (c/execute! conn (into [query] args) {:transaction? false}) + (attach-txn-info conn (assoc op :type :ok)))) + (defrecord Client [conn val-type table-count] client/Client (open! [this test node] @@ -68,19 +99,26 @@ (c/execute! conn [(str "alter table " (table-name i) " cache")]))))) (invoke! [this test op] - (let [txn (:value op) - use-txn? (< 1 (count txn))] - ;use-txn? false] - (if use-txn? - (c/with-txn op [c conn {:isolation (util/isolation-level test) - :before-hook (partial c/rand-init-txn! test conn)}] - (assoc op :type :ok, :value - (mapv (partial mop! c test table-count) txn))) - (c/with-error-handling op - (let [attach-info (if (= :r (-> txn first first)) c/attach-query-info c/attach-txn-info)] - (attach-info conn - (assoc op :type :ok, :value - (mapv (partial mop! conn test table-count) txn)))))))) + (let [txn (:value op)] + (condp = (txn-type test table-count txn) + + :single-stmt-write + (c/with-error-handling op (single-stmt-write! conn table-count op)) + + :single-stmt-append + (c/with-error-handling op (single-stmt-append! conn table-count op)) + + :query + (c/with-error-handling op + (let [attach-info (if (= :r (-> txn first first)) c/attach-query-info c/attach-txn-info) + res (mapv (partial mop! conn test table-count) txn)] + (attach-info conn (assoc op :type :ok, :value res)))) + + ;; else + (c/with-txn op [c conn {:isolation (util/isolation-level test) + :before-hook (partial c/rand-init-txn! test conn)}] + (assoc op :type :ok, :value + (mapv (partial mop! c test table-count) txn)))))) (teardown! [this test])