Skip to content

Commit

Permalink
Atomic Transactions correctness with PRS, ERS and MySQL & Vttablet Re…
Browse files Browse the repository at this point in the history
…starts (vitessio#16553)

Signed-off-by: Manan Gupta <[email protected]>
Signed-off-by: Harshit Gangal <[email protected]>
Co-authored-by: Harshit Gangal <[email protected]>
  • Loading branch information
2 people authored and venkatraju committed Aug 29, 2024
1 parent a143ba3 commit e7bda4b
Show file tree
Hide file tree
Showing 29 changed files with 709 additions and 254 deletions.
37 changes: 37 additions & 0 deletions doc/design-docs/AtomicTransactionsWithDisruptions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Handling disruptions in atomic transactions

## Overview

This document describes how to make atomic transactions resilient in the face of disruptions. The basic design and components involved in an atomic transaction are described in [here](./TwoPhaseCommitDesign.md) The document describes each of the disruptions that can happen in a running cluster and how atomic transactions are engineered to handle them without breaking their guarantee of being atomic.

## `PlannedReparentShard` and `EmergencyReparentShard`

For both Planned and Emergency reparents, we call `DemotePrimary` on the primary tablet. For Planned reparent, this call has to succeed, while on Emergency reparent, if the primary is unreachable then this call can fail, and we would still proceed further.

As part of the `DemotePrimary` flow, when we transition the tablet to a non-serving state, we wait for all the transactions to have completed (in `TxEngine.shutdownLocked()` we have `te.txPool.WaitForEmpty()`). If the user has specified a shutdown grace-period, then after that much time elapses, we go ahead and forcefully kill all running queries. We then also rollback the prepared transactions. It is crucial that we rollback the prepared transactions only after all other writes have been killed, because when we rollback a prepared transaction, it lets go of the locks it was holding. If there were some other conflicting write in progress that hadn't been killed, then it could potentially go through and cause data corruption since we won't be able to prepare the transaction again. All the code to kill queries can be found in `stateManager.terminateAllQueries()`.

The above outlined steps ensure that we either wait for all prepared transactions to conclude or we rollback them safely so that they can be prepared again on the new primary.

On the new primary, when we call `PromoteReplica`, we redo all the prepared transactions before we allow any new writes to go through. This ensures that the new primary is in the same state as the old primary was before the reparent. The code for redoing the prepared transactions can be found in `TxEngine.RedoPreparedTransactions()`.

If everything goes as described above, there is no reason for redoing of prepared transactions to fail. But in case, something unexpected happens and preparing transactions fails, we still allow the vttablet to accept new writes because we decided availability of the tablet is more important. We will however, build tooling and metrics for the users to be notified of these failures and let them handle this in the way they see fit.

While Planned reparent is an operation where all the processes are running fine, Emergency reparent is called when something has gone wrong with the cluster. Because we call `DemotePrimary` in parallel with `StopReplicationAndBuildStatusMap`, we can run into a case wherein the primary tries to write something to the binlog after all the replicas have stopped replicating. If we were to run without semi-sync, then the primary could potentially commit a prepared transaction, and return a success to the vtgate trying to commit this transaction. The vtgate can then conclude that the transaction is safe to conclude and remove all the metadata information. However, on the new primary since the transaction commit didn't get replicated, it would re-prepare the transaction and would wait for a coordinator to either commit or rollback it, but that would never happen. Essentially we would have a transaction stuck in prepared state on a shard indefinitely. To avoid this situation, it is essential that we run with semi-sync, because this ensures that any write that is acknowledged as a success to the caller, would necessarily have to be replicated to at least one replica. This ensures that the transaction would also already be committed on the new primary.

## MySQL Restarts

When MySQL restarts, it loses all the ongoing transactions which includes all the prepared transactions. This is because the transaction logs are not persistent across restarts. This is a MySQL limitation and there is no way to get around this. However, at the Vitess level we must ensure that we can commit the prepared transactions even in case of MySQL restarts without any failures.

Vttablet has the code to detect MySQL failures and call `stateManager.checkMySQL()` which transitions the tablet to a NotConnected state. This prevents any writes from going through until the vttablet has transitioned back to a serving state.

However, we cannot rely on `checkMySQL` to ensure that no conflicting writes go through. This is because the time between MySQL restart and the vttablet transitioning to a NotConnected state can be large. During this time, the vttablet would still be accepting writes and some of them could potentially conflict with the prepared transactions.

To handle this, we rely on the fact that when MySQL restarts, it starts with super-read-only turned on. This means that no writes can go through. It is VTOrc that registers this as an issue and fixes it by calling `UndoDemotePrimary`. As part of that call, before we set MySQL to read-write, we ensure that all the prepared transactions are redone in the read_only state. We use the dba pool (that has admin permissions) to prepare the transactions. This is safe because we know that no conflicting writes can go through until we set MySQL to read-write. The code to set MySQL to read-write after redoing prepared transactions can be found in `TabletManager.redoPreparedTransactionsAndSetReadWrite()`.

Handling MySQL restarts is the only reason we needed to add the code to redo prepared transactions whenever MySQL transitions from super-read-only to read-write state. Even though, we only need to do this in `UndoDemotePrimary`, it not necessary that it is `UndoDemotePrimary` that sets MySQL to read-write. If the user notices that the tablet is in a read-only state before VTOrc has a chance to fix it, they can manually call `SetReadWrite` on the tablet.
Therefore, the safest option was to always check if we need to redo the prepared transactions whenever MySQL transitions from super-read-only to read-write state.

## Vttablet Restarts

When Vttabet restarts, all the previous connections are dropped. It starts in a non-serving state, and then after reading the shard and tablet records from the topo, it transitions to a serving state.
As part of this transition we need to ensure that we redo the prepared transactions before we start accepting any writes. This is done as part of the `TxEngine.transition` function when we transition to an `AcceptingReadWrite` state. We call the same code for redoing the prepared transactions that we called for MySQL restarts, PRS and ERS.
1 change: 1 addition & 0 deletions go/test/endtoend/transaction/twopc/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func start(t *testing.T) (*mysql.Conn, func()) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
cleanup(t)

return conn, func() {
conn.Close()
Expand Down
19 changes: 11 additions & 8 deletions go/test/endtoend/transaction/twopc/schema.sql
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
create table twopc_user (
id bigint,
create table twopc_user
(
id bigint,
name varchar(64),
primary key (id)
) Engine=InnoDB;

create table twopc_music (
id varchar(64),
create table twopc_music
(
id varchar(64),
user_id bigint,
title varchar(64),
title varchar(64),
primary key (id)
) Engine=InnoDB;

create table twopc_t1 (
id bigint,
create table twopc_t1
(
id bigint,
col bigint,
primary key (id, col)
primary key (id)
) Engine=InnoDB;
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,27 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package fuzzer
package stress

import (
"context"
"fmt"
"os"
"path"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/syscallutil"
"vitess.io/vitess/go/vt/log"
)

var (
Expand Down Expand Up @@ -67,10 +74,12 @@ var (
// Moreover, the threadIDs of rows for a given update set in the 3 shards should be the same to ensure that conflicting transactions got committed in the same exact order.
func TestTwoPCFuzzTest(t *testing.T) {
testcases := []struct {
name string
threads int
updateSets int
timeForTesting time.Duration
name string
threads int
updateSets int
timeForTesting time.Duration
clusterDisruptions []func()
disruptionProbability []int
}{
{
name: "Single Thread - Single Set",
Expand All @@ -90,15 +99,24 @@ func TestTwoPCFuzzTest(t *testing.T) {
updateSets: 15,
timeForTesting: 5 * time.Second,
},
{
name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL and Vttablet restart disruptions",
threads: 15,
updateSets: 15,
timeForTesting: 5 * time.Second,
clusterDisruptions: []func(){prs, ers, mysqlRestarts, vttabletRestarts},
disruptionProbability: []int{5, 5, 5, 5},
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
conn, closer := start(t)
defer closer()
fz := newFuzzer(tt.threads, tt.updateSets)
fz := newFuzzer(tt.threads, tt.updateSets, tt.clusterDisruptions, tt.disruptionProbability)

fz.initialize(t, conn)
conn.Close()
// Start the fuzzer.
fz.start(t)

Expand All @@ -108,8 +126,12 @@ func TestTwoPCFuzzTest(t *testing.T) {
// Signal the fuzzer to stop.
fz.stop()

// Wait for all transactions to be resolved.
waitForResults(t, fmt.Sprintf(`show unresolved transactions for %v`, keyspaceName), "[]", 30*time.Second)
// Verify that all the transactions run were actually atomic and no data issues have occurred.
fz.verifyTransactionsWereAtomic(t)

log.Errorf("Verification complete. All good!")
})
}
}
Expand Down Expand Up @@ -176,14 +198,20 @@ type fuzzer struct {
wg sync.WaitGroup
// updateRowVals are the rows that we use to ensure 1 update on each shard with the same increment.
updateRowsVals [][]int
// clusterDisruptions are the cluster level disruptions that can happen in a running cluster.
clusterDisruptions []func()
// disruptionProbability is the chance for the disruption to happen. We check this every 100 milliseconds.
disruptionProbability []int
}

// newFuzzer creates a new fuzzer struct.
func newFuzzer(threads int, updateSets int) *fuzzer {
func newFuzzer(threads int, updateSets int, clusterDisruptions []func(), disruptionProbability []int) *fuzzer {
fz := &fuzzer{
threads: threads,
updateSets: updateSets,
wg: sync.WaitGroup{},
threads: threads,
updateSets: updateSets,
wg: sync.WaitGroup{},
clusterDisruptions: clusterDisruptions,
disruptionProbability: disruptionProbability,
}
// Initially the fuzzer thread is stopped.
fz.shouldStop.Store(true)
Expand All @@ -202,12 +230,16 @@ func (fz *fuzzer) stop() {
func (fz *fuzzer) start(t *testing.T) {
// We mark the fuzzer thread to be running now.
fz.shouldStop.Store(false)
fz.wg.Add(fz.threads)
// fz.threads is the count of fuzzer threads, and one disruption thread.
fz.wg.Add(fz.threads + 1)
for i := 0; i < fz.threads; i++ {
go func() {
fz.runFuzzerThread(t, i)
}()
}
go func() {
fz.runClusterDisruptionThread(t)
}()
}

// runFuzzerThread is used to run a thread of the fuzzer.
Expand Down Expand Up @@ -308,3 +340,108 @@ func (fz *fuzzer) generateInsertQueries(updateSet int, threadId int) []string {
})
return queries
}

// runClusterDisruptionThread runs the cluster level disruptions in a separate thread.
func (fz *fuzzer) runClusterDisruptionThread(t *testing.T) {
// Whenever we finish running this thread, we should mark the thread has stopped.
defer func() {
fz.wg.Done()
}()

for {
// If disruption thread is marked to be stopped, then we should exit this go routine.
if fz.shouldStop.Load() == true {
return
}
// Run a potential disruption
fz.runClusterDisruption(t)
time.Sleep(100 * time.Millisecond)
}

}

// runClusterDisruption tries to run a single cluster disruption.
func (fz *fuzzer) runClusterDisruption(t *testing.T) {
for idx, prob := range fz.disruptionProbability {
if rand.Intn(100) < prob {
fz.clusterDisruptions[idx]()
return
}
}
}

/*
Cluster Level Disruptions for the fuzzer
*/

func prs() {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
newPrimary := vttablets[rand.Intn(len(vttablets))]
log.Errorf("Running PRS for - %v/%v with new primary - %v", keyspaceName, shard.Name, newPrimary.Alias)
err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, newPrimary.Alias)
if err != nil {
log.Errorf("error running PRS - %v", err)
}
}

func ers() {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
newPrimary := vttablets[rand.Intn(len(vttablets))]
log.Errorf("Running ERS for - %v/%v with new primary - %v", keyspaceName, shard.Name, newPrimary.Alias)
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("EmergencyReparentShard", fmt.Sprintf("%s/%s", keyspaceName, shard.Name), "--new-primary", newPrimary.Alias)
if err != nil {
log.Errorf("error running ERS - %v", err)
}
}

func vttabletRestarts() {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
tablet := vttablets[rand.Intn(len(vttablets))]
log.Errorf("Restarting vttablet for - %v/%v - %v", keyspaceName, shard.Name, tablet.Alias)
err := tablet.VttabletProcess.TearDown()
if err != nil {
log.Errorf("error stopping vttablet - %v", err)
return
}
tablet.VttabletProcess.ServingStatus = "SERVING"
for {
err = tablet.VttabletProcess.Setup()
if err == nil {
return
}
// Sometimes vttablets fail to connect to the topo server due to a minor blip there.
// We don't want to fail the test, so we retry setting up the vttablet.
log.Errorf("error restarting vttablet - %v", err)
time.Sleep(1 * time.Second)
}
}

func mysqlRestarts() {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
tablet := vttablets[rand.Intn(len(vttablets))]
log.Errorf("Restarting MySQL for - %v/%v tablet - %v", keyspaceName, shard.Name, tablet.Alias)
pidFile := path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d/mysql.pid", tablet.TabletUID))
pidBytes, err := os.ReadFile(pidFile)
if err != nil {
// We can't read the file which means the PID file does not exist
// The server must have stopped
return
}
pid, err := strconv.Atoi(strings.TrimSpace(string(pidBytes)))
if err != nil {
log.Errorf("Error in conversion to integer: %v", err)
return
}
err = syscallutil.Kill(pid, syscall.SIGKILL)
if err != nil {
log.Errorf("Error in killing process: %v", err)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package fuzzer
package stress

import (
"context"
Expand Down Expand Up @@ -75,12 +75,13 @@ func TestMain(m *testing.M) {

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
SidecarDBName: sidecarDBName,
Name: keyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
SidecarDBName: sidecarDBName,
DurabilityPolicy: "semi_sync",
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 0, false); err != nil {
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil {
return 1
}

Expand Down Expand Up @@ -113,4 +114,5 @@ func cleanup(t *testing.T) {

utils.ClearOutTable(t, vtParams, "twopc_fuzzer_insert")
utils.ClearOutTable(t, vtParams, "twopc_fuzzer_update")
utils.ClearOutTable(t, vtParams, "twopc_t1")
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,9 @@ create table twopc_fuzzer_insert (
key(col),
primary key (id, col)
) Engine=InnoDB;

create table twopc_t1 (
id bigint,
col bigint,
primary key (id)
) Engine=InnoDB;
Loading

0 comments on commit e7bda4b

Please sign in to comment.