From e7bda4b5967c66a7a876daac75805af2f8611f7e Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Wed, 21 Aug 2024 12:19:55 +0530 Subject: [PATCH] Atomic Transactions correctness with PRS, ERS and MySQL & Vttablet Restarts (#16553) Signed-off-by: Manan Gupta Signed-off-by: Harshit Gangal Co-authored-by: Harshit Gangal --- .../AtomicTransactionsWithDisruptions.md | 37 +++ .../endtoend/transaction/twopc/main_test.go | 1 + go/test/endtoend/transaction/twopc/schema.sql | 19 +- .../twopc/{fuzzer => stress}/fuzzer_test.go | 159 +++++++++++- .../twopc/{fuzzer => stress}/main_test.go | 14 +- .../twopc/{fuzzer => stress}/schema.sql | 6 + .../transaction/twopc/stress/stress_test.go | 229 ++++++++++++++++++ .../twopc/{fuzzer => stress}/vschema.json | 8 + .../endtoend/transaction/twopc/twopc_test.go | 137 +---------- .../endtoend/transaction/twopc/utils/utils.go | 32 ++- go/vt/vttablet/endtoend/framework/client.go | 2 - go/vt/vttablet/tabletmanager/rpc_actions.go | 13 + .../vttablet/tabletmanager/rpc_replication.go | 9 +- go/vt/vttablet/tabletmanager/tm_init.go | 19 ++ go/vt/vttablet/tabletmanager/tm_state.go | 3 +- go/vt/vttablet/tabletserver/controller.go | 2 + go/vt/vttablet/tabletserver/dt_executor.go | 6 +- .../tabletserver/query_executor_test.go | 3 - go/vt/vttablet/tabletserver/state_manager.go | 7 + .../tabletserver/state_manager_test.go | 5 + .../tabletserver/stateful_connection.go | 4 +- go/vt/vttablet/tabletserver/tabletserver.go | 10 +- .../tabletserver/tabletserver_test.go | 19 +- go/vt/vttablet/tabletserver/twopc.go | 4 +- go/vt/vttablet/tabletserver/tx_engine.go | 159 +++++++----- go/vt/vttablet/tabletserver/tx_prep_pool.go | 31 ++- .../tabletserver/tx_prep_pool_test.go | 18 +- go/vt/vttablet/tabletservermock/controller.go | 3 + test/config.json | 4 +- 29 files changed, 709 insertions(+), 254 deletions(-) create mode 100644 doc/design-docs/AtomicTransactionsWithDisruptions.md rename go/test/endtoend/transaction/twopc/{fuzzer => stress}/fuzzer_test.go (70%) rename go/test/endtoend/transaction/twopc/{fuzzer => stress}/main_test.go (90%) rename go/test/endtoend/transaction/twopc/{fuzzer => stress}/schema.sql (75%) create mode 100644 go/test/endtoend/transaction/twopc/stress/stress_test.go rename go/test/endtoend/transaction/twopc/{fuzzer => stress}/vschema.json (74%) diff --git a/doc/design-docs/AtomicTransactionsWithDisruptions.md b/doc/design-docs/AtomicTransactionsWithDisruptions.md new file mode 100644 index 00000000000..706308b6b2b --- /dev/null +++ b/doc/design-docs/AtomicTransactionsWithDisruptions.md @@ -0,0 +1,37 @@ +# Handling disruptions in atomic transactions + +## Overview + +This document describes how to make atomic transactions resilient in the face of disruptions. The basic design and components involved in an atomic transaction are described in [here](./TwoPhaseCommitDesign.md) The document describes each of the disruptions that can happen in a running cluster and how atomic transactions are engineered to handle them without breaking their guarantee of being atomic. + +## `PlannedReparentShard` and `EmergencyReparentShard` + +For both Planned and Emergency reparents, we call `DemotePrimary` on the primary tablet. For Planned reparent, this call has to succeed, while on Emergency reparent, if the primary is unreachable then this call can fail, and we would still proceed further. + +As part of the `DemotePrimary` flow, when we transition the tablet to a non-serving state, we wait for all the transactions to have completed (in `TxEngine.shutdownLocked()` we have `te.txPool.WaitForEmpty()`). If the user has specified a shutdown grace-period, then after that much time elapses, we go ahead and forcefully kill all running queries. We then also rollback the prepared transactions. It is crucial that we rollback the prepared transactions only after all other writes have been killed, because when we rollback a prepared transaction, it lets go of the locks it was holding. If there were some other conflicting write in progress that hadn't been killed, then it could potentially go through and cause data corruption since we won't be able to prepare the transaction again. All the code to kill queries can be found in `stateManager.terminateAllQueries()`. + +The above outlined steps ensure that we either wait for all prepared transactions to conclude or we rollback them safely so that they can be prepared again on the new primary. + +On the new primary, when we call `PromoteReplica`, we redo all the prepared transactions before we allow any new writes to go through. This ensures that the new primary is in the same state as the old primary was before the reparent. The code for redoing the prepared transactions can be found in `TxEngine.RedoPreparedTransactions()`. + +If everything goes as described above, there is no reason for redoing of prepared transactions to fail. But in case, something unexpected happens and preparing transactions fails, we still allow the vttablet to accept new writes because we decided availability of the tablet is more important. We will however, build tooling and metrics for the users to be notified of these failures and let them handle this in the way they see fit. + +While Planned reparent is an operation where all the processes are running fine, Emergency reparent is called when something has gone wrong with the cluster. Because we call `DemotePrimary` in parallel with `StopReplicationAndBuildStatusMap`, we can run into a case wherein the primary tries to write something to the binlog after all the replicas have stopped replicating. If we were to run without semi-sync, then the primary could potentially commit a prepared transaction, and return a success to the vtgate trying to commit this transaction. The vtgate can then conclude that the transaction is safe to conclude and remove all the metadata information. However, on the new primary since the transaction commit didn't get replicated, it would re-prepare the transaction and would wait for a coordinator to either commit or rollback it, but that would never happen. Essentially we would have a transaction stuck in prepared state on a shard indefinitely. To avoid this situation, it is essential that we run with semi-sync, because this ensures that any write that is acknowledged as a success to the caller, would necessarily have to be replicated to at least one replica. This ensures that the transaction would also already be committed on the new primary. + +## MySQL Restarts + +When MySQL restarts, it loses all the ongoing transactions which includes all the prepared transactions. This is because the transaction logs are not persistent across restarts. This is a MySQL limitation and there is no way to get around this. However, at the Vitess level we must ensure that we can commit the prepared transactions even in case of MySQL restarts without any failures. + +Vttablet has the code to detect MySQL failures and call `stateManager.checkMySQL()` which transitions the tablet to a NotConnected state. This prevents any writes from going through until the vttablet has transitioned back to a serving state. + +However, we cannot rely on `checkMySQL` to ensure that no conflicting writes go through. This is because the time between MySQL restart and the vttablet transitioning to a NotConnected state can be large. During this time, the vttablet would still be accepting writes and some of them could potentially conflict with the prepared transactions. + +To handle this, we rely on the fact that when MySQL restarts, it starts with super-read-only turned on. This means that no writes can go through. It is VTOrc that registers this as an issue and fixes it by calling `UndoDemotePrimary`. As part of that call, before we set MySQL to read-write, we ensure that all the prepared transactions are redone in the read_only state. We use the dba pool (that has admin permissions) to prepare the transactions. This is safe because we know that no conflicting writes can go through until we set MySQL to read-write. The code to set MySQL to read-write after redoing prepared transactions can be found in `TabletManager.redoPreparedTransactionsAndSetReadWrite()`. + +Handling MySQL restarts is the only reason we needed to add the code to redo prepared transactions whenever MySQL transitions from super-read-only to read-write state. Even though, we only need to do this in `UndoDemotePrimary`, it not necessary that it is `UndoDemotePrimary` that sets MySQL to read-write. If the user notices that the tablet is in a read-only state before VTOrc has a chance to fix it, they can manually call `SetReadWrite` on the tablet. +Therefore, the safest option was to always check if we need to redo the prepared transactions whenever MySQL transitions from super-read-only to read-write state. + +## Vttablet Restarts + +When Vttabet restarts, all the previous connections are dropped. It starts in a non-serving state, and then after reading the shard and tablet records from the topo, it transitions to a serving state. +As part of this transition we need to ensure that we redo the prepared transactions before we start accepting any writes. This is done as part of the `TxEngine.transition` function when we transition to an `AcceptingReadWrite` state. We call the same code for redoing the prepared transactions that we called for MySQL restarts, PRS and ERS. diff --git a/go/test/endtoend/transaction/twopc/main_test.go b/go/test/endtoend/transaction/twopc/main_test.go index 4c5e2715563..9a46562d1c7 100644 --- a/go/test/endtoend/transaction/twopc/main_test.go +++ b/go/test/endtoend/transaction/twopc/main_test.go @@ -111,6 +111,7 @@ func start(t *testing.T) (*mysql.Conn, func()) { ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) require.NoError(t, err) + cleanup(t) return conn, func() { conn.Close() diff --git a/go/test/endtoend/transaction/twopc/schema.sql b/go/test/endtoend/transaction/twopc/schema.sql index de9e3ef0656..7c289a03c2a 100644 --- a/go/test/endtoend/transaction/twopc/schema.sql +++ b/go/test/endtoend/transaction/twopc/schema.sql @@ -1,18 +1,21 @@ -create table twopc_user ( - id bigint, +create table twopc_user +( + id bigint, name varchar(64), primary key (id) ) Engine=InnoDB; -create table twopc_music ( - id varchar(64), +create table twopc_music +( + id varchar(64), user_id bigint, - title varchar(64), + title varchar(64), primary key (id) ) Engine=InnoDB; -create table twopc_t1 ( - id bigint, +create table twopc_t1 +( + id bigint, col bigint, - primary key (id, col) + primary key (id) ) Engine=InnoDB; \ No newline at end of file diff --git a/go/test/endtoend/transaction/twopc/fuzzer/fuzzer_test.go b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go similarity index 70% rename from go/test/endtoend/transaction/twopc/fuzzer/fuzzer_test.go rename to go/test/endtoend/transaction/twopc/stress/fuzzer_test.go index ff440164042..e81d0d0d9ab 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer/fuzzer_test.go +++ b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go @@ -14,13 +14,18 @@ See the License for the specific language governing permissions and limitations under the License. */ -package fuzzer +package stress import ( "context" "fmt" + "os" + "path" + "strconv" + "strings" "sync" "sync/atomic" + "syscall" "testing" "time" @@ -28,6 +33,8 @@ import ( "golang.org/x/exp/rand" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/syscallutil" + "vitess.io/vitess/go/vt/log" ) var ( @@ -67,10 +74,12 @@ var ( // Moreover, the threadIDs of rows for a given update set in the 3 shards should be the same to ensure that conflicting transactions got committed in the same exact order. func TestTwoPCFuzzTest(t *testing.T) { testcases := []struct { - name string - threads int - updateSets int - timeForTesting time.Duration + name string + threads int + updateSets int + timeForTesting time.Duration + clusterDisruptions []func() + disruptionProbability []int }{ { name: "Single Thread - Single Set", @@ -90,15 +99,24 @@ func TestTwoPCFuzzTest(t *testing.T) { updateSets: 15, timeForTesting: 5 * time.Second, }, + { + name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL and Vttablet restart disruptions", + threads: 15, + updateSets: 15, + timeForTesting: 5 * time.Second, + clusterDisruptions: []func(){prs, ers, mysqlRestarts, vttabletRestarts}, + disruptionProbability: []int{5, 5, 5, 5}, + }, } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { conn, closer := start(t) defer closer() - fz := newFuzzer(tt.threads, tt.updateSets) + fz := newFuzzer(tt.threads, tt.updateSets, tt.clusterDisruptions, tt.disruptionProbability) fz.initialize(t, conn) + conn.Close() // Start the fuzzer. fz.start(t) @@ -108,8 +126,12 @@ func TestTwoPCFuzzTest(t *testing.T) { // Signal the fuzzer to stop. fz.stop() + // Wait for all transactions to be resolved. + waitForResults(t, fmt.Sprintf(`show unresolved transactions for %v`, keyspaceName), "[]", 30*time.Second) // Verify that all the transactions run were actually atomic and no data issues have occurred. fz.verifyTransactionsWereAtomic(t) + + log.Errorf("Verification complete. All good!") }) } } @@ -176,14 +198,20 @@ type fuzzer struct { wg sync.WaitGroup // updateRowVals are the rows that we use to ensure 1 update on each shard with the same increment. updateRowsVals [][]int + // clusterDisruptions are the cluster level disruptions that can happen in a running cluster. + clusterDisruptions []func() + // disruptionProbability is the chance for the disruption to happen. We check this every 100 milliseconds. + disruptionProbability []int } // newFuzzer creates a new fuzzer struct. -func newFuzzer(threads int, updateSets int) *fuzzer { +func newFuzzer(threads int, updateSets int, clusterDisruptions []func(), disruptionProbability []int) *fuzzer { fz := &fuzzer{ - threads: threads, - updateSets: updateSets, - wg: sync.WaitGroup{}, + threads: threads, + updateSets: updateSets, + wg: sync.WaitGroup{}, + clusterDisruptions: clusterDisruptions, + disruptionProbability: disruptionProbability, } // Initially the fuzzer thread is stopped. fz.shouldStop.Store(true) @@ -202,12 +230,16 @@ func (fz *fuzzer) stop() { func (fz *fuzzer) start(t *testing.T) { // We mark the fuzzer thread to be running now. fz.shouldStop.Store(false) - fz.wg.Add(fz.threads) + // fz.threads is the count of fuzzer threads, and one disruption thread. + fz.wg.Add(fz.threads + 1) for i := 0; i < fz.threads; i++ { go func() { fz.runFuzzerThread(t, i) }() } + go func() { + fz.runClusterDisruptionThread(t) + }() } // runFuzzerThread is used to run a thread of the fuzzer. @@ -308,3 +340,108 @@ func (fz *fuzzer) generateInsertQueries(updateSet int, threadId int) []string { }) return queries } + +// runClusterDisruptionThread runs the cluster level disruptions in a separate thread. +func (fz *fuzzer) runClusterDisruptionThread(t *testing.T) { + // Whenever we finish running this thread, we should mark the thread has stopped. + defer func() { + fz.wg.Done() + }() + + for { + // If disruption thread is marked to be stopped, then we should exit this go routine. + if fz.shouldStop.Load() == true { + return + } + // Run a potential disruption + fz.runClusterDisruption(t) + time.Sleep(100 * time.Millisecond) + } + +} + +// runClusterDisruption tries to run a single cluster disruption. +func (fz *fuzzer) runClusterDisruption(t *testing.T) { + for idx, prob := range fz.disruptionProbability { + if rand.Intn(100) < prob { + fz.clusterDisruptions[idx]() + return + } + } +} + +/* +Cluster Level Disruptions for the fuzzer +*/ + +func prs() { + shards := clusterInstance.Keyspaces[0].Shards + shard := shards[rand.Intn(len(shards))] + vttablets := shard.Vttablets + newPrimary := vttablets[rand.Intn(len(vttablets))] + log.Errorf("Running PRS for - %v/%v with new primary - %v", keyspaceName, shard.Name, newPrimary.Alias) + err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, newPrimary.Alias) + if err != nil { + log.Errorf("error running PRS - %v", err) + } +} + +func ers() { + shards := clusterInstance.Keyspaces[0].Shards + shard := shards[rand.Intn(len(shards))] + vttablets := shard.Vttablets + newPrimary := vttablets[rand.Intn(len(vttablets))] + log.Errorf("Running ERS for - %v/%v with new primary - %v", keyspaceName, shard.Name, newPrimary.Alias) + _, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("EmergencyReparentShard", fmt.Sprintf("%s/%s", keyspaceName, shard.Name), "--new-primary", newPrimary.Alias) + if err != nil { + log.Errorf("error running ERS - %v", err) + } +} + +func vttabletRestarts() { + shards := clusterInstance.Keyspaces[0].Shards + shard := shards[rand.Intn(len(shards))] + vttablets := shard.Vttablets + tablet := vttablets[rand.Intn(len(vttablets))] + log.Errorf("Restarting vttablet for - %v/%v - %v", keyspaceName, shard.Name, tablet.Alias) + err := tablet.VttabletProcess.TearDown() + if err != nil { + log.Errorf("error stopping vttablet - %v", err) + return + } + tablet.VttabletProcess.ServingStatus = "SERVING" + for { + err = tablet.VttabletProcess.Setup() + if err == nil { + return + } + // Sometimes vttablets fail to connect to the topo server due to a minor blip there. + // We don't want to fail the test, so we retry setting up the vttablet. + log.Errorf("error restarting vttablet - %v", err) + time.Sleep(1 * time.Second) + } +} + +func mysqlRestarts() { + shards := clusterInstance.Keyspaces[0].Shards + shard := shards[rand.Intn(len(shards))] + vttablets := shard.Vttablets + tablet := vttablets[rand.Intn(len(vttablets))] + log.Errorf("Restarting MySQL for - %v/%v tablet - %v", keyspaceName, shard.Name, tablet.Alias) + pidFile := path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d/mysql.pid", tablet.TabletUID)) + pidBytes, err := os.ReadFile(pidFile) + if err != nil { + // We can't read the file which means the PID file does not exist + // The server must have stopped + return + } + pid, err := strconv.Atoi(strings.TrimSpace(string(pidBytes))) + if err != nil { + log.Errorf("Error in conversion to integer: %v", err) + return + } + err = syscallutil.Kill(pid, syscall.SIGKILL) + if err != nil { + log.Errorf("Error in killing process: %v", err) + } +} diff --git a/go/test/endtoend/transaction/twopc/fuzzer/main_test.go b/go/test/endtoend/transaction/twopc/stress/main_test.go similarity index 90% rename from go/test/endtoend/transaction/twopc/fuzzer/main_test.go rename to go/test/endtoend/transaction/twopc/stress/main_test.go index e0affde186a..9c7ed28fa1a 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer/main_test.go +++ b/go/test/endtoend/transaction/twopc/stress/main_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package fuzzer +package stress import ( "context" @@ -75,12 +75,13 @@ func TestMain(m *testing.M) { // Start keyspace keyspace := &cluster.Keyspace{ - Name: keyspaceName, - SchemaSQL: SchemaSQL, - VSchema: VSchema, - SidecarDBName: sidecarDBName, + Name: keyspaceName, + SchemaSQL: SchemaSQL, + VSchema: VSchema, + SidecarDBName: sidecarDBName, + DurabilityPolicy: "semi_sync", } - if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 0, false); err != nil { + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil { return 1 } @@ -113,4 +114,5 @@ func cleanup(t *testing.T) { utils.ClearOutTable(t, vtParams, "twopc_fuzzer_insert") utils.ClearOutTable(t, vtParams, "twopc_fuzzer_update") + utils.ClearOutTable(t, vtParams, "twopc_t1") } diff --git a/go/test/endtoend/transaction/twopc/fuzzer/schema.sql b/go/test/endtoend/transaction/twopc/stress/schema.sql similarity index 75% rename from go/test/endtoend/transaction/twopc/fuzzer/schema.sql rename to go/test/endtoend/transaction/twopc/stress/schema.sql index 290da808991..5173166bfd4 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer/schema.sql +++ b/go/test/endtoend/transaction/twopc/stress/schema.sql @@ -12,3 +12,9 @@ create table twopc_fuzzer_insert ( key(col), primary key (id, col) ) Engine=InnoDB; + +create table twopc_t1 ( + id bigint, + col bigint, + primary key (id) +) Engine=InnoDB; diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go new file mode 100644 index 00000000000..9912bdf6e19 --- /dev/null +++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go @@ -0,0 +1,229 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stress + +import ( + "context" + "fmt" + "os" + "path" + "strconv" + "strings" + "sync" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/syscallutil" + twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" + "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/vt/log" +) + +// TestDisruptions tests that atomic transactions persevere through various disruptions. +func TestDisruptions(t *testing.T) { + testcases := []struct { + disruptionName string + commitDelayTime string + disruption func() error + }{ + { + disruptionName: "No Disruption", + commitDelayTime: "1", + disruption: func() error { + return nil + }, + }, + { + disruptionName: "PlannedReparentShard", + commitDelayTime: "5", + disruption: prsShard3, + }, + { + disruptionName: "MySQL Restart", + commitDelayTime: "5", + disruption: mysqlRestartShard3, + }, + { + disruptionName: "Vttablet Restart", + commitDelayTime: "5", + disruption: vttabletRestartShard3, + }, + { + disruptionName: "EmergencyReparentShard", + commitDelayTime: "5", + disruption: ersShard3, + }, + } + for _, tt := range testcases { + t.Run(fmt.Sprintf("%s-%ss delay", tt.disruptionName, tt.commitDelayTime), func(t *testing.T) { + // Reparent all the shards to first tablet being the primary. + reparentToFirstTablet(t) + // cleanup all the old data. + conn, closer := start(t) + defer closer() + // Start an atomic transaction. + utils.Exec(t, conn, "begin") + // Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits + // it is very easy to figure out what value will end up in which shard. + idVals := []int{4, 6, 9} + for _, val := range idVals { + utils.Exec(t, conn, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, 4)", val)) + } + // We want to delay the commit on one of the shards to simulate slow commits on a shard. + twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-") + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard) + twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, tt.commitDelayTime) + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime) + // We will execute a commit in a go routine, because we know it will take some time to complete. + // While the commit is ongoing, we would like to run the disruption. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, err := utils.ExecAllowError(t, conn, "commit") + if err != nil { + log.Errorf("Error in commit - %v", err) + } + }() + // Allow enough time for the commit to have started. + time.Sleep(1 * time.Second) + writeCtx, writeCancel := context.WithCancel(context.Background()) + var writerWg sync.WaitGroup + // Run multiple threads to try to write to the database on the same values of id to ensure that we don't + // allow any writes while the transaction is prepared and not committed. + for i := 0; i < 10; i++ { + writerWg.Add(1) + go func() { + defer writerWg.Done() + threadToWrite(t, writeCtx, idVals[i%3]) + }() + } + // Run the disruption. + err := tt.disruption() + require.NoError(t, err) + // Wait for the commit to have returned. We don't actually check for an error in the commit because the user might receive an error. + // But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken. + wg.Wait() + // Check the data in the table. + waitForResults(t, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 30*time.Second) + writeCancel() + writerWg.Wait() + }) + } +} + +// threadToWrite is a helper function to write to the database in a loop. +func threadToWrite(t *testing.T, ctx context.Context, id int) { + for { + select { + case <-ctx.Done(): + return + default: + } + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { + continue + } + _, _ = utils.ExecAllowError(t, conn, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, %d)", id, rand.Intn(10000))) + } +} + +// reparentToFirstTablet reparents all the shards to first tablet being the primary. +func reparentToFirstTablet(t *testing.T) { + ks := clusterInstance.Keyspaces[0] + for _, shard := range ks.Shards { + primary := shard.Vttablets[0] + err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, primary.Alias) + require.NoError(t, err) + } +} + +// waitForResults waits for the results of the query to be as expected. +func waitForResults(t *testing.T, query string, resultExpected string, waitTime time.Duration) { + timeout := time.After(waitTime) + var prevRes []sqltypes.Row + for { + select { + case <-timeout: + t.Fatalf("didn't reach expected results for %s. Last results - %v", query, prevRes) + default: + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + if err == nil { + res := utils.Exec(t, conn, query) + conn.Close() + prevRes = res.Rows + if fmt.Sprintf("%v", res.Rows) == resultExpected { + return + } + } + time.Sleep(100 * time.Millisecond) + } + } +} + +/* +Cluster Level Disruptions for the fuzzer +*/ + +// prsShard3 runs a PRS in shard 3 of the keyspace. It promotes the second tablet to be the new primary. +func prsShard3() error { + shard := clusterInstance.Keyspaces[0].Shards[2] + newPrimary := shard.Vttablets[1] + return clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, newPrimary.Alias) +} + +// ersShard3 runs a ERS in shard 3 of the keyspace. It promotes the second tablet to be the new primary. +func ersShard3() error { + shard := clusterInstance.Keyspaces[0].Shards[2] + newPrimary := shard.Vttablets[1] + _, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("EmergencyReparentShard", fmt.Sprintf("%s/%s", keyspaceName, shard.Name), "--new-primary", newPrimary.Alias) + return err +} + +// vttabletRestartShard3 restarts the first vttablet of the third shard. +func vttabletRestartShard3() error { + shard := clusterInstance.Keyspaces[0].Shards[2] + tablet := shard.Vttablets[0] + return tablet.RestartOnlyTablet() +} + +// mysqlRestartShard3 restarts MySQL on the first tablet of the third shard. +func mysqlRestartShard3() error { + shard := clusterInstance.Keyspaces[0].Shards[2] + vttablets := shard.Vttablets + tablet := vttablets[0] + log.Errorf("Restarting MySQL for - %v/%v tablet - %v", keyspaceName, shard.Name, tablet.Alias) + pidFile := path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d/mysql.pid", tablet.TabletUID)) + pidBytes, err := os.ReadFile(pidFile) + if err != nil { + // We can't read the file which means the PID file does not exist + // The server must have stopped + return err + } + pid, err := strconv.Atoi(strings.TrimSpace(string(pidBytes))) + if err != nil { + return err + } + return syscallutil.Kill(pid, syscall.SIGKILL) +} diff --git a/go/test/endtoend/transaction/twopc/fuzzer/vschema.json b/go/test/endtoend/transaction/twopc/stress/vschema.json similarity index 74% rename from go/test/endtoend/transaction/twopc/fuzzer/vschema.json rename to go/test/endtoend/transaction/twopc/stress/vschema.json index e3854f8f101..415b5958f54 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer/vschema.json +++ b/go/test/endtoend/transaction/twopc/stress/vschema.json @@ -21,6 +21,14 @@ "name": "reverse_bits" } ] + }, + "twopc_t1": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] } } } \ No newline at end of file diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index 5aab1f5a2e2..ce104fa94ec 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -20,8 +20,6 @@ import ( "context" _ "embed" "fmt" - "os" - "path" "reflect" "sort" "strings" @@ -35,19 +33,14 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" + twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" "vitess.io/vitess/go/test/endtoend/utils" "vitess.io/vitess/go/vt/callerid" - "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/vtgate/vtgateconn" ) -const ( - DebugDelayCommitShard = "VT_DELAY_COMMIT_SHARD" - DebugDelayCommitTime = "VT_DELAY_COMMIT_TIME" -) - // TestDTCommit tests distributed transaction commit for insert, update and delete operations // It verifies the binlog events for the same with transaction state changes and redo statements. func TestDTCommit(t *testing.T) { @@ -996,10 +989,10 @@ func TestReadingUnresolvedTransactions(t *testing.T) { utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)") utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)") // We want to delay the commit on one of the shards to simulate slow commits on a shard. - writeTestCommunicationFile(t, DebugDelayCommitShard, "80-") - defer deleteFile(DebugDelayCommitShard) - writeTestCommunicationFile(t, DebugDelayCommitTime, "5") - defer deleteFile(DebugDelayCommitTime) + twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-") + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard) + twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, "5") + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime) // We will execute a commit in a go routine, because we know it will take some time to complete. // While the commit is ongoing, we would like to check that we see the unresolved transaction. var wg sync.WaitGroup @@ -1008,7 +1001,7 @@ func TestReadingUnresolvedTransactions(t *testing.T) { defer wg.Done() _, err := utils.ExecAllowError(t, conn, "commit") if err != nil { - log.Errorf("Error in commit - %v", err) + fmt.Println("Error in commit: ", err.Error()) } }() // Allow enough time for the commit to have started. @@ -1029,121 +1022,3 @@ func TestReadingUnresolvedTransactions(t *testing.T) { }) } } - -// TestDisruptions tests that atomic transactions persevere through various disruptions. -func TestDisruptions(t *testing.T) { - testcases := []struct { - disruptionName string - commitDelayTime string - disruption func() error - }{ - { - disruptionName: "No Disruption", - commitDelayTime: "1", - disruption: func() error { - return nil - }, - }, - { - disruptionName: "PlannedReparentShard", - commitDelayTime: "5", - disruption: prsShard3, - }, - } - for _, tt := range testcases { - t.Run(fmt.Sprintf("%s-%ss timeout", tt.disruptionName, tt.commitDelayTime), func(t *testing.T) { - // Reparent all the shards to first tablet being the primary. - reparentToFistTablet(t) - // cleanup all the old data. - conn, closer := start(t) - defer closer() - // Start an atomic transaction. - utils.Exec(t, conn, "begin") - // Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits - // it is very easy to figure out what value will end up in which shard. - utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)") - utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)") - utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)") - // We want to delay the commit on one of the shards to simulate slow commits on a shard. - writeTestCommunicationFile(t, DebugDelayCommitShard, "80-") - defer deleteFile(DebugDelayCommitShard) - writeTestCommunicationFile(t, DebugDelayCommitTime, tt.commitDelayTime) - defer deleteFile(DebugDelayCommitTime) - // We will execute a commit in a go routine, because we know it will take some time to complete. - // While the commit is ongoing, we would like to run the disruption. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - _, err := utils.ExecAllowError(t, conn, "commit") - if err != nil { - log.Errorf("Error in commit - %v", err) - } - }() - // Allow enough time for the commit to have started. - time.Sleep(1 * time.Second) - // Run the disruption. - err := tt.disruption() - require.NoError(t, err) - // Wait for the commit to have returned. We don't actually check for an error in the commit because the user might receive an error. - // But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken. - wg.Wait() - // Check the data in the table. - waitForResults(t, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 10*time.Second) - }) - } -} - -// reparentToFistTablet reparents all the shards to first tablet being the primary. -func reparentToFistTablet(t *testing.T) { - ks := clusterInstance.Keyspaces[0] - for _, shard := range ks.Shards { - primary := shard.Vttablets[0] - err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, primary.Alias) - require.NoError(t, err) - } -} - -// writeTestCommunicationFile writes the content to the file with the given name. -// We use these files to coordinate with the vttablets running in the debug mode. -func writeTestCommunicationFile(t *testing.T, fileName string, content string) { - err := os.WriteFile(path.Join(os.Getenv("VTDATAROOT"), fileName), []byte(content), 0644) - require.NoError(t, err) -} - -// deleteFile deletes the file specified. -func deleteFile(fileName string) { - _ = os.Remove(path.Join(os.Getenv("VTDATAROOT"), fileName)) -} - -// waitForResults waits for the results of the query to be as expected. -func waitForResults(t *testing.T, query string, resultExpected string, waitTime time.Duration) { - timeout := time.After(waitTime) - for { - select { - case <-timeout: - t.Fatalf("didn't reach expected results for %s", query) - default: - ctx := context.Background() - conn, err := mysql.Connect(ctx, &vtParams) - require.NoError(t, err) - res := utils.Exec(t, conn, query) - conn.Close() - if fmt.Sprintf("%v", res.Rows) == resultExpected { - return - } - time.Sleep(100 * time.Millisecond) - } - } -} - -/* -Cluster Level Disruptions for the fuzzer -*/ - -// prsShard3 runs a PRS in shard 3 of the keyspace. It promotes the second tablet to be the new primary. -func prsShard3() error { - shard := clusterInstance.Keyspaces[0].Shards[2] - newPrimary := shard.Vttablets[1] - return clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, newPrimary.Alias) -} diff --git a/go/test/endtoend/transaction/twopc/utils/utils.go b/go/test/endtoend/transaction/twopc/utils/utils.go index 7311375ee55..b3b8796accf 100644 --- a/go/test/endtoend/transaction/twopc/utils/utils.go +++ b/go/test/endtoend/transaction/twopc/utils/utils.go @@ -19,12 +19,19 @@ package utils import ( "context" "fmt" + "os" + "path" "testing" + "time" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/vt/log" +) + +const ( + DebugDelayCommitShard = "VT_DELAY_COMMIT_SHARD" + DebugDelayCommitTime = "VT_DELAY_COMMIT_TIME" ) // ClearOutTable deletes everything from a table. Sometimes the table might have more rows than allowed in a single delete query, @@ -33,12 +40,16 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) { ctx := context.Background() for { conn, err := mysql.Connect(ctx, &vtParams) - require.NoError(t, err) + if err != nil { + fmt.Printf("Error in connection - %v\n", err) + continue + } res, err := conn.ExecuteFetch(fmt.Sprintf("SELECT count(*) FROM %v", tableName), 1, false) if err != nil { - log.Errorf("Error in selecting - %v", err) + fmt.Printf("Error in selecting - %v\n", err) conn.Close() + time.Sleep(100 * time.Millisecond) continue } require.Len(t, res.Rows, 1) @@ -51,9 +62,22 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) { } _, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false) if err != nil { - log.Errorf("Error in cleanup deletion - %v", err) + fmt.Printf("Error in cleanup deletion - %v\n", err) conn.Close() + time.Sleep(100 * time.Millisecond) continue } } } + +// WriteTestCommunicationFile writes the content to the file with the given name. +// We use these files to coordinate with the vttablets running in the debug mode. +func WriteTestCommunicationFile(t *testing.T, fileName string, content string) { + err := os.WriteFile(path.Join(os.Getenv("VTDATAROOT"), fileName), []byte(content), 0644) + require.NoError(t, err) +} + +// DeleteFile deletes the file specified. +func DeleteFile(fileName string) { + _ = os.Remove(path.Join(os.Getenv("VTDATAROOT"), fileName)) +} diff --git a/go/vt/vttablet/endtoend/framework/client.go b/go/vt/vttablet/endtoend/framework/client.go index 1cbff71dc25..e4c2aa66066 100644 --- a/go/vt/vttablet/endtoend/framework/client.go +++ b/go/vt/vttablet/endtoend/framework/client.go @@ -187,8 +187,6 @@ func (client *QueryClient) UnresolvedTransactions() ([]*querypb.TransactionMetad // It currently supports only primary->replica and back. func (client *QueryClient) SetServingType(tabletType topodatapb.TabletType) error { err := client.server.SetServingType(tabletType, time.Time{}, true /* serving */, "" /* reason */) - // Wait for TwoPC transition, if necessary - client.server.TwoPCEngineWait() return err } diff --git a/go/vt/vttablet/tabletmanager/rpc_actions.go b/go/vt/vttablet/tabletmanager/rpc_actions.go index 45dd51670ba..21560f9d34b 100644 --- a/go/vt/vttablet/tabletmanager/rpc_actions.go +++ b/go/vt/vttablet/tabletmanager/rpc_actions.go @@ -78,7 +78,20 @@ func (tm *TabletManager) SetReadOnly(ctx context.Context, rdonly bool) error { return err } defer tm.unlock() + superRo, err := tm.MysqlDaemon.IsSuperReadOnly(ctx) + if err != nil { + return err + } + if !rdonly && superRo { + // If super read only is set, then we need to prepare the transactions before setting read_only OFF. + // We need to redo the prepared transactions in read only mode using the dba user to ensure we don't lose them. + // setting read_only OFF will also set super_read_only OFF if it was set. + // If super read only is already off, then we probably called this function from PRS or some other place + // because it is idempotent. We only need to redo prepared transactions the first time we transition from super read only + // to read write. + return tm.redoPreparedTransactionsAndSetReadWrite(ctx) + } return tm.MysqlDaemon.SetReadOnly(ctx, rdonly) } diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 3e745222092..b34e94a16a7 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -544,9 +544,10 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure defer func() { if finalErr != nil && revertPartialFailure && !wasReadOnly { + // We need to redo the prepared transactions in read only mode using the dba user to ensure we don't lose them. // setting read_only OFF will also set super_read_only OFF if it was set - if err := tm.MysqlDaemon.SetReadOnly(ctx, false); err != nil { - log.Warningf("SetReadOnly(false) failed during revert: %v", err) + if err = tm.redoPreparedTransactionsAndSetReadWrite(ctx); err != nil { + log.Warningf("RedoPreparedTransactionsAndSetReadWrite failed during revert: %v", err) } } }() @@ -599,8 +600,8 @@ func (tm *TabletManager) UndoDemotePrimary(ctx context.Context, semiSync bool) e return err } - // Now, set the server read-only false. - if err := tm.MysqlDaemon.SetReadOnly(ctx, false); err != nil { + // We need to redo the prepared transactions in read only mode using the dba user to ensure we don't lose them. + if err = tm.redoPreparedTransactionsAndSetReadWrite(ctx); err != nil { return err } diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index 6046ed99727..2e70596b686 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -50,6 +50,7 @@ import ( "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/flagutil" "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/sets" @@ -751,6 +752,24 @@ func (tm *TabletManager) findMysqlPort(retryInterval time.Duration) { } } +// redoPreparedTransactionsAndSetReadWrite redoes prepared transactions in read-only mode. +// We turn off super read only mode, and then redo the transactions. Finally, we turn off read-only mode to allow for further traffic. +func (tm *TabletManager) redoPreparedTransactionsAndSetReadWrite(ctx context.Context) error { + _, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, false) + if err != nil { + // Ignore the error if the sever doesn't support super read only variable. + // We should just redo the preapred transactions before we set it to read-write. + if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERUnknownSystemVariable { + log.Warningf("server does not know about super_read_only, continuing anyway...") + } else { + return err + } + } + tm.QueryServiceControl.RedoPreparedTransactions() + err = tm.MysqlDaemon.SetReadOnly(ctx, false) + return err +} + func (tm *TabletManager) initTablet(ctx context.Context) error { tablet := tm.Tablet() err := tm.TopoServer.CreateTablet(ctx, tablet) diff --git a/go/vt/vttablet/tabletmanager/tm_state.go b/go/vt/vttablet/tabletmanager/tm_state.go index d9389bf3559..cf56c515cfc 100644 --- a/go/vt/vttablet/tabletmanager/tm_state.go +++ b/go/vt/vttablet/tabletmanager/tm_state.go @@ -214,9 +214,10 @@ func (ts *tmState) ChangeTabletType(ctx context.Context, tabletType topodatapb.T } if action == DBActionSetReadWrite { + // We need to redo the prepared transactions in read only mode using the dba user to ensure we don't lose them. // We call SetReadOnly only after the topo has been updated to avoid // situations where two tablets are primary at the DB level but not at the vitess level - if err := ts.tm.MysqlDaemon.SetReadOnly(ctx, false); err != nil { + if err = ts.tm.redoPreparedTransactionsAndSetReadWrite(ctx); err != nil { return err } } diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index 0336d9a73cc..69d2edbfdc1 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -93,6 +93,8 @@ type Controller interface { // CheckThrottler CheckThrottler(ctx context.Context, appName string, flags *throttle.CheckFlags) *throttle.CheckResult GetThrottlerStatus(ctx context.Context) *throttle.ThrottlerStatus + + RedoPreparedTransactions() } // Ensure TabletServer satisfies Controller interface. diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index 1fd1df12d56..a08cd9dc635 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -221,13 +221,13 @@ func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) error { // If the connection is tainted, we cannot take a commit decision on it. if conn.IsTainted() { dte.inTransaction(func(conn *StatefulConnection) error { - return dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_ROLLBACK) + return dte.te.twoPC.Transition(dte.ctx, conn, dtid, DTStateRollback) }) // return the error, defer call above will roll back the transaction. return vterrors.VT10002("cannot commit the transaction on a reserved connection") } - err = dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_COMMIT) + err = dte.te.twoPC.Transition(dte.ctx, conn, dtid, DTStateCommit) if err != nil { return err } @@ -254,7 +254,7 @@ func (dte *DTExecutor) SetRollback(dtid string, transactionID int64) error { } return dte.inTransaction(func(conn *StatefulConnection) error { - return dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_ROLLBACK) + return dte.te.twoPC.Transition(dte.ctx, conn, dtid, DTStateRollback) }) } diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index 771d9e3479d..cc72c629ddb 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -1532,9 +1532,6 @@ func newTestTabletServer(ctx context.Context, flags executorFlags, db *fakesqldb tsv := NewTabletServer(ctx, vtenv.NewTestEnv(), "TabletServerTest", cfg, memorytopo.NewServer(ctx, ""), &topodatapb.TabletAlias{}, srvTopoCounts) target := &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY} err := tsv.StartService(target, dbconfigs, nil /* mysqld */) - if cfg.TwoPCEnable { - tsv.TwoPCEngineWait() - } if err != nil { panic(err) } diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 308f9165ba6..3fe78457b60 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -164,6 +164,7 @@ type ( AcceptReadWrite() AcceptReadOnly() Close() + RollbackPrepared() } subComponent interface { @@ -610,6 +611,12 @@ func (sm *stateManager) terminateAllQueries(wg *sync.WaitGroup) (cancel func()) log.Infof("Killed all stateless OLTP queries.") sm.statefulql.TerminateAll() log.Infof("Killed all OLTP queries.") + // We can rollback prepared transactions only after we have killed all the write queries in progress. + // This is essential because when we rollback a prepared transaction, it lets go of the locks it was holding. + // If there were some other conflicting write in progress that hadn't been killed, then it could potentially go through + // and cause data corruption since we won't be able to prepare the transaction again. + sm.te.RollbackPrepared() + log.Infof("Rollbacked all prepared transactions") }() return cancel } diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 02896eeefe0..f70e77de710 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -379,6 +379,9 @@ func (te *delayedTxEngine) Close() { time.Sleep(50 * time.Millisecond) } +func (te *delayedTxEngine) RollbackPrepared() { +} + type killableConn struct { id int64 killed atomic.Bool @@ -903,6 +906,8 @@ func (te *testTxEngine) Close() { te.state = testStateClosed } +func (te *testTxEngine) RollbackPrepared() {} + type testSubcomponent struct { testOrderState } diff --git a/go/vt/vttablet/tabletserver/stateful_connection.go b/go/vt/vttablet/tabletserver/stateful_connection.go index c0dc973fa87..91d51677241 100644 --- a/go/vt/vttablet/tabletserver/stateful_connection.go +++ b/go/vt/vttablet/tabletserver/stateful_connection.go @@ -174,7 +174,9 @@ func (sc *StatefulConnection) ReleaseString(reason string) { if sc.dbConn == nil { return } - sc.pool.unregister(sc.ConnID, reason) + if sc.pool != nil { + sc.pool.unregister(sc.ConnID, reason) + } sc.dbConn.Recycle() sc.dbConn = nil sc.logReservedConn() diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index e3e951892b7..62cc5ca32f0 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -476,11 +476,6 @@ func (tsv *TabletServer) TableGC() *gc.TableGC { return tsv.tableGC } -// TwoPCEngineWait waits until the TwoPC engine has been opened, and the redo read -func (tsv *TabletServer) TwoPCEngineWait() { - tsv.te.twoPCReady.Wait() -} - // SchemaEngine returns the SchemaEngine part of TabletServer. func (tsv *TabletServer) SchemaEngine() *schema.Engine { return tsv.se @@ -1692,6 +1687,11 @@ func (tsv *TabletServer) GetThrottlerStatus(ctx context.Context) *throttle.Throt return r } +// RedoPreparedTransactions redoes the prepared transactions. +func (tsv *TabletServer) RedoPreparedTransactions() { + tsv.te.RedoPreparedTransactions() +} + // HandlePanic is part of the queryservice.QueryService interface func (tsv *TabletServer) HandlePanic(err *error) { if x := recover(); x != nil { diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 7ffd201c0a4..7f863e26df7 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -154,9 +154,10 @@ func TestTabletServerPrimaryToReplica(t *testing.T) { // Reuse code from tx_executor_test. _, tsv, db := newTestTxExecutor(t, ctx) // This is required because the test is verifying that we rollback transactions on changing serving type, - // but that only happens immediately if the shut down grace period is not specified. - tsv.te.shutdownGracePeriod = 0 - tsv.sm.shutdownGracePeriod = 0 + // but that only happens when we have a shutdown grace period, otherwise we wait for transactions to be resolved + // indefinitely. + tsv.te.shutdownGracePeriod = 1 + tsv.sm.shutdownGracePeriod = 1 defer tsv.StopService() defer db.Close() target := querypb.Target{TabletType: topodatapb.TabletType_PRIMARY} @@ -200,14 +201,20 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) { _, tsv, db := newTestTxExecutor(t, ctx) defer tsv.StopService() defer db.Close() - tsv.SetServingType(topodatapb.TabletType_REPLICA, time.Time{}, true, "") + // This is required because the test is verifying that we rollback transactions on changing serving type, + // but that only happens when we have a shutdown grace period, otherwise we wait for transactions to be resolved + // indefinitely. + tsv.te.shutdownGracePeriod = 1 + tsv.sm.shutdownGracePeriod = 1 + tsv.SetServingType(topodatapb.TabletType_PRIMARY, time.Time{}, false, "") turnOnTxEngine := func() { tsv.SetServingType(topodatapb.TabletType_PRIMARY, time.Time{}, true, "") - tsv.TwoPCEngineWait() } turnOffTxEngine := func() { - tsv.SetServingType(topodatapb.TabletType_REPLICA, time.Time{}, true, "") + // We can use a transition to PRIMARY non-serving or REPLICA serving to turn off the transaction engine. + // With primary serving, the shutdown of prepared transactions is synchronous, but for the latter its asynchronous. + tsv.SetServingType(topodatapb.TabletType_PRIMARY, time.Time{}, false, "") } tpc := tsv.te.twoPC diff --git a/go/vt/vttablet/tabletserver/twopc.go b/go/vt/vttablet/tabletserver/twopc.go index 0bdf4ac0c91..b3c5ab628c3 100644 --- a/go/vt/vttablet/tabletserver/twopc.go +++ b/go/vt/vttablet/tabletserver/twopc.go @@ -374,7 +374,7 @@ func (tpc *TwoPC) ReadTransaction(ctx context.Context, dtid string) (*querypb.Tr return nil, vterrors.Wrapf(err, "error parsing state for dtid %s", dtid) } result.State = querypb.TransactionState(st) - if result.State < querypb.TransactionState_PREPARE || result.State > querypb.TransactionState_COMMIT { + if result.State < DTStatePrepare || result.State > DTStateCommit { return nil, fmt.Errorf("unexpected state for dtid %s: %v", dtid, result.State) } // A failure in time parsing will show up as a very old time, @@ -427,7 +427,7 @@ func (tpc *TwoPC) ReadAllTransactions(ctx context.Context) ([]*tx.DistributedTx, log.Errorf("Error parsing state for dtid %s: %v.", dtid, err) } protostate := querypb.TransactionState(st) - if protostate < querypb.TransactionState_PREPARE || protostate > querypb.TransactionState_COMMIT { + if protostate < DTStatePrepare || protostate > DTStateCommit { log.Errorf("Unexpected state for dtid %s: %v.", dtid, protostate) } curTx = &tx.DistributedTx{ diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index 33e22e321bc..ea4e0b1e41d 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -87,7 +87,6 @@ type TxEngine struct { txPool *TxPool preparedPool *TxPreparedPool twoPC *TwoPC - twoPCReady sync.WaitGroup dxNotify func() } @@ -128,9 +127,6 @@ func NewTxEngine(env tabletenv.Env, dxNotifier func()) *TxEngine { } // AcceptReadWrite will start accepting all transactions. -// If transitioning from RO mode, transactions are rolled -// back before accepting new transactions. This is to allow -// for 2PC state to be correctly initialized. func (te *TxEngine) AcceptReadWrite() { te.transition(AcceptingReadAndWrite) } @@ -149,37 +145,70 @@ func (te *TxEngine) transition(state txEngineState) { } log.Infof("TxEngine transition: %v", state) - switch te.state { - case AcceptingReadOnly, AcceptingReadAndWrite: + + // When we are transitioning from read write state, we should close all transactions. + if te.state == AcceptingReadAndWrite { te.shutdownLocked() - case NotServing: - // No special action. } te.state = state + if te.twopcEnabled && te.state == AcceptingReadAndWrite { + // If the prepared pool is not open, then we need to redo the prepared transactions + // before we open the transaction engine to accept new writes. + // This check is required because during a Promotion, we would have already setup the prepared pool + // and redid the prepared transactions when we turn super_read_only off. So we don't need to do it again. + if !te.preparedPool.IsOpen() { + // We need to redo prepared transactions here to handle vttablet restarts. + // If MySQL continues to work fine, then we won't end up redoing the prepared transactions as part of any RPC call + // since VTOrc won't call `UndoDemotePrimary`. We need to do them as part of this transition. + te.redoPreparedTransactionsLocked() + } + te.startTransactionWatcher() + } te.txPool.Open(te.env.Config().DB.AppWithDB(), te.env.Config().DB.DbaWithDB(), te.env.Config().DB.AppDebugWithDB()) +} - if te.twopcEnabled && te.state == AcceptingReadAndWrite { - // Set the preparedPool to start accepting connections. - te.preparedPool.shutdown = false - // If there are errors, we choose to raise an alert and - // continue anyway. Serving traffic is considered more important - // than blocking everything for the sake of a few transactions. - // We do this async; so we do not end up blocking writes on - // failover for our setup tasks if using semi-sync replication. - te.twoPCReady.Add(1) - go func() { - defer te.twoPCReady.Done() - if err := te.twoPC.Open(te.env.Config().DB); err != nil { - te.env.Stats().InternalErrors.Add("TwopcOpen", 1) - log.Errorf("Could not open TwoPC engine: %v", err) - } - if err := te.prepareFromRedo(); err != nil { - te.env.Stats().InternalErrors.Add("TwopcResurrection", 1) - log.Errorf("Could not prepare transactions: %v", err) - } - te.startTransactionWatcher() - }() +// RedoPreparedTransactions acquires the state lock and calls redoPreparedTransactionsLocked. +func (te *TxEngine) RedoPreparedTransactions() { + if te.twopcEnabled { + te.stateLock.Lock() + defer te.stateLock.Unlock() + te.redoPreparedTransactionsLocked() + } +} + +// redoPreparedTransactionsLocked redoes the prepared transactions. +// If there are errors, we choose to raise an alert and +// continue anyway. Serving traffic is considered more important +// than blocking everything for the sake of a few transactions. +// We do this async; so we do not end up blocking writes on +// failover for our setup tasks if using semi-sync replication. +func (te *TxEngine) redoPreparedTransactionsLocked() { + oldState := te.state + // We shutdown to ensure no other writes are in progress. + te.shutdownLocked() + defer func() { + te.state = oldState + }() + + if err := te.twoPC.Open(te.env.Config().DB); err != nil { + te.env.Stats().InternalErrors.Add("TwopcOpen", 1) + log.Errorf("Could not open TwoPC engine: %v", err) + return + } + + // We should only open the prepared pool and the transaction pool if the opening of twoPC pool is successful. + // We use the prepared pool being open to know if we need to redo the prepared transactions. + // So if we open the prepared pool and then opening of twoPC fails, we will never end up opening the twoPC pool at all! + // This is why opening prepared pool after the twoPC pool is crucial for correctness. + te.preparedPool.Open() + // We have to defer opening the transaction pool because we call shutdown in the beginning that closes it. + // We want to open the transaction pool after the prepareFromRedo has run. Also, we want this to run even if that fails. + defer te.txPool.Open(te.env.Config().DB.AppWithDB(), te.env.Config().DB.DbaWithDB(), te.env.Config().DB.AppDebugWithDB()) + + if err := te.prepareFromRedo(); err != nil { + te.env.Stats().InternalErrors.Add("TwopcResurrection", 1) + log.Errorf("Could not prepare transactions: %v", err) } } @@ -306,11 +335,6 @@ func (te *TxEngine) shutdownLocked() { te.stateLock.Lock() log.Infof("TxEngine - state lock acquired again") - // Shut down functions are idempotent. - // No need to check if 2pc is enabled. - log.Infof("TxEngine - stop watchdog") - te.stopTransactionWatcher() - poolEmpty := make(chan bool) rollbackDone := make(chan bool) // This goroutine decides if transactions have to be @@ -333,13 +357,6 @@ func (te *TxEngine) shutdownLocked() { // connections. te.txPool.scp.ShutdownNonTx() if te.shutdownGracePeriod <= 0 { - // No grace period was specified. Wait indefinitely for transactions to be concluded. - // TODO(sougou): invoking rollbackPrepared is incorrect here. Prepared statements should - // actually be rolled back last. But this will cause the shutdown to hang because the - // tx pool will never become empty, because the prepared pool is holding on to connections - // from the tx pool. But we plan to deprecate this approach to 2PC. So, this - // should eventually be deleted. - te.rollbackPrepared() log.Info("No grace period specified: performing normal wait.") return } @@ -354,6 +371,9 @@ func (te *TxEngine) shutdownLocked() { log.Info("Transactions completed before grace period: shutting down.") } }() + // It is important to note, that we aren't rolling back prepared transactions here. + // That is happneing in the same place where we are killing queries. This will block + // until either all prepared transactions get resolved or rollbacked. log.Infof("TxEngine - waiting for empty txPool") te.txPool.WaitForEmpty() // If the goroutine is still running, signal that it can exit. @@ -362,10 +382,19 @@ func (te *TxEngine) shutdownLocked() { log.Infof("TxEngine - making sure the goroutine has returned") <-rollbackDone + // We stop the transaction watcher so late, because if the user isn't running + // with any shutdown grace period, we still want the watcher to run while we are waiting + // for resolving transactions. + log.Infof("TxEngine - stop transaction watcher") + te.stopTransactionWatcher() + + // Mark the prepared pool closed. log.Infof("TxEngine - closing the txPool") te.txPool.Close() log.Infof("TxEngine - closing twoPC") te.twoPC.Close() + log.Infof("TxEngine - closing the prepared pool") + te.preparedPool.Close() log.Infof("TxEngine - finished shutdownLocked") } @@ -391,16 +420,17 @@ outer: if txid > maxid { maxid = txid } - conn, _, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + // We need to redo the prepared transactions using a dba user because MySQL might still be in read only mode. + conn, err := te.beginNewDbaConnection(ctx) if err != nil { - allErr.RecordError(err) + allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid)) continue } for _, stmt := range preparedTx.Queries { conn.TxProperties().RecordQuery(stmt) _, err := conn.Exec(ctx, stmt, 1, false) if err != nil { - allErr.RecordError(err) + allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid)) te.txPool.RollbackAndRelease(ctx, conn) continue outer } @@ -409,7 +439,7 @@ outer: // we don't want to write again to the redo log. err = te.preparedPool.Put(conn, preparedTx.Dtid) if err != nil { - allErr.RecordError(err) + allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid)) continue } } @@ -428,21 +458,21 @@ outer: return allErr.Error() } -// shutdownTransactions rolls back all open transactions -// including the prepared ones. -// This is used for transitioning from a primary to a non-primary -// serving type. +// shutdownTransactions rolls back all open transactions that are idol. +// These are transactions that are open but no write is executing on them right now. +// By definition, prepared transactions aren't part of them since these are transactions on which +// the user has issued a commit command. These transactions are rollbacked elsewhere when we kill all writes. +// This is used for transitioning from a primary to a non-primary serving type. func (te *TxEngine) shutdownTransactions() { - te.rollbackPrepared() ctx := tabletenv.LocalContext() - // The order of rollbacks is currently not material because - // we don't allow new statements or commits during - // this function. In case of any such change, this will - // have to be revisited. te.txPool.Shutdown(ctx) } -func (te *TxEngine) rollbackPrepared() { +// RollbackPrepared rollbacks all the prepared transactions. +// This should only be called after we are certain no other writes are in progress. +// If there were some other conflicting write in progress that hadn't been killed, then it could potentially go through +// and cause data corruption since we won't be able to prepare the transaction again. +func (te *TxEngine) RollbackPrepared() { ctx := tabletenv.LocalContext() for _, conn := range te.preparedPool.FetchAllForRollback() { te.txPool.Rollback(ctx, conn) @@ -581,3 +611,22 @@ func (te *TxEngine) Release(connID int64) error { return nil } + +// beginNewDbaConnection gets a new dba connection and starts a transaction in it. +// This should only be used to redo prepared transactions. All the other writes should use the normal pool. +func (te *TxEngine) beginNewDbaConnection(ctx context.Context) (*StatefulConnection, error) { + dbConn, err := connpool.NewConn(ctx, te.env.Config().DB.DbaWithDB(), nil, nil, te.env) + if err != nil { + return nil, err + } + + sc := &StatefulConnection{ + dbConn: &connpool.PooledConn{ + Conn: dbConn, + }, + env: te.env, + } + + _, _, err = te.txPool.begin(ctx, nil, false, sc, nil) + return sc, err +} diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool.go b/go/vt/vttablet/tabletserver/tx_prep_pool.go index d5376172856..c801e208e33 100644 --- a/go/vt/vttablet/tabletserver/tx_prep_pool.go +++ b/go/vt/vttablet/tabletserver/tx_prep_pool.go @@ -36,8 +36,8 @@ type TxPreparedPool struct { mu sync.Mutex conns map[string]*StatefulConnection reserved map[string]error - // shutdown tells if the prepared pool has been drained and shutdown. - shutdown bool + // open tells if the prepared pool is open for accepting transactions. + open bool capacity int } @@ -60,7 +60,7 @@ func (pp *TxPreparedPool) Put(c *StatefulConnection, dtid string) error { pp.mu.Lock() defer pp.mu.Unlock() // If the pool is shutdown, we don't accept new prepared transactions. - if pp.shutdown { + if !pp.open { return vterrors.VT09025("pool is shutdown") } if _, ok := pp.reserved[dtid]; ok { @@ -93,6 +93,27 @@ func (pp *TxPreparedPool) FetchForRollback(dtid string) *StatefulConnection { return c } +// Open marks the prepared pool open for use. +func (pp *TxPreparedPool) Open() { + pp.mu.Lock() + defer pp.mu.Unlock() + pp.open = true +} + +// Close marks the prepared pool closed. +func (pp *TxPreparedPool) Close() { + pp.mu.Lock() + defer pp.mu.Unlock() + pp.open = false +} + +// IsOpen checks if the prepared pool is open for use. +func (pp *TxPreparedPool) IsOpen() bool { + pp.mu.Lock() + defer pp.mu.Unlock() + return pp.open +} + // FetchForCommit returns the connection for commit. Before returning, // it remembers the dtid in its reserved list as "committing". If // the dtid is already in the reserved list, it returns an error. @@ -105,7 +126,7 @@ func (pp *TxPreparedPool) FetchForCommit(dtid string) (*StatefulConnection, erro defer pp.mu.Unlock() // If the pool is shutdown, we don't have any connections to return. // That however doesn't mean this transaction was committed, it could very well have been rollbacked. - if pp.shutdown { + if !pp.open { return nil, vterrors.VT09025("pool is shutdown") } if err, ok := pp.reserved[dtid]; ok { @@ -139,7 +160,7 @@ func (pp *TxPreparedPool) Forget(dtid string) { func (pp *TxPreparedPool) FetchAllForRollback() []*StatefulConnection { pp.mu.Lock() defer pp.mu.Unlock() - pp.shutdown = true + pp.open = false conns := make([]*StatefulConnection, 0, len(pp.conns)) for _, c := range pp.conns { conns = append(conns, c) diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool_test.go b/go/vt/vttablet/tabletserver/tx_prep_pool_test.go index 42e2b800e0e..43c0c022b13 100644 --- a/go/vt/vttablet/tabletserver/tx_prep_pool_test.go +++ b/go/vt/vttablet/tabletserver/tx_prep_pool_test.go @@ -24,13 +24,13 @@ import ( ) func TestEmptyPrep(t *testing.T) { - pp := NewTxPreparedPool(0) + pp := createAndOpenPreparedPool(0) err := pp.Put(nil, "aa") require.ErrorContains(t, err, "prepared transactions exceeded limit: 0") } func TestPrepPut(t *testing.T) { - pp := NewTxPreparedPool(2) + pp := createAndOpenPreparedPool(2) err := pp.Put(nil, "aa") require.NoError(t, err) err = pp.Put(nil, "bb") @@ -50,7 +50,7 @@ func TestPrepPut(t *testing.T) { } func TestPrepFetchForRollback(t *testing.T) { - pp := NewTxPreparedPool(2) + pp := createAndOpenPreparedPool(2) conn := &StatefulConnection{} pp.Put(conn, "aa") got := pp.FetchForRollback("bb") @@ -68,7 +68,7 @@ func TestPrepFetchForRollback(t *testing.T) { } func TestPrepFetchForCommit(t *testing.T) { - pp := NewTxPreparedPool(2) + pp := createAndOpenPreparedPool(2) conn := &StatefulConnection{} got, err := pp.FetchForCommit("aa") require.NoError(t, err) @@ -97,7 +97,7 @@ func TestPrepFetchForCommit(t *testing.T) { } func TestPrepFetchAll(t *testing.T) { - pp := NewTxPreparedPool(2) + pp := createAndOpenPreparedPool(2) conn1 := &StatefulConnection{} conn2 := &StatefulConnection{} pp.Put(conn1, "aa") @@ -108,3 +108,11 @@ func TestPrepFetchAll(t *testing.T) { _, err := pp.FetchForCommit("aa") require.ErrorContains(t, err, "pool is shutdown") } + +// createAndOpenPreparedPool creates a new transaction prepared pool and opens it. +// Used as a helper function for testing. +func createAndOpenPreparedPool(capacity int) *TxPreparedPool { + pp := NewTxPreparedPool(capacity) + pp.Open() + return pp +} diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go index 7c7055b3e15..52bb71abcd9 100644 --- a/go/vt/vttablet/tabletservermock/controller.go +++ b/go/vt/vttablet/tabletservermock/controller.go @@ -226,6 +226,9 @@ func (tqsc *Controller) GetThrottlerStatus(ctx context.Context) *throttle.Thrott return nil } +// RedoPreparedTransactions is part of the tabletserver.Controller interface +func (tqsc *Controller) RedoPreparedTransactions() {} + // EnterLameduck implements tabletserver.Controller. func (tqsc *Controller) EnterLameduck() { tqsc.mu.Lock() diff --git a/test/config.json b/test/config.json index 49f77e1b7fb..f1a8f1bcf74 100644 --- a/test/config.json +++ b/test/config.json @@ -842,9 +842,9 @@ "RetryMax": 1, "Tags": [] }, - "vtgate_transaction_twopc_fuzzer": { + "vtgate_transaction_twopc_stress": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/transaction/twopc/fuzzer"], + "Args": ["vitess.io/vitess/go/test/endtoend/transaction/twopc/stress"], "Command": [], "Manual": false, "Shard": "vtgate_transaction",