Atomic Transactions handling with Online DDL #16585

Merged · 14 commits · merged on Aug 28, 2024
Changes from 10 commits
48 changes: 36 additions & 12 deletions go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
@@ -34,7 +34,9 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/syscallutil"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
)

var (
@@ -78,7 +80,7 @@ func TestTwoPCFuzzTest(t *testing.T) {
threads int
updateSets int
timeForTesting time.Duration
clusterDisruptions []func()
clusterDisruptions []func(t *testing.T)
disruptionProbability []int
}{
{
@@ -100,20 +102,20 @@ func TestTwoPCFuzzTest(t *testing.T) {
timeForTesting: 5 * time.Second,
},
{
name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL and Vttablet restart disruptions",
name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL & Vttablet restart, OnlineDDL disruptions",
threads: 15,
updateSets: 15,
timeForTesting: 5 * time.Second,
clusterDisruptions: []func(){prs, ers, mysqlRestarts, vttabletRestarts},
disruptionProbability: []int{5, 5, 5, 5},
clusterDisruptions: []func(t *testing.T){prs, ers, mysqlRestarts, vttabletRestarts, onlineDDLFuzzer},
disruptionProbability: []int{5, 5, 5, 5, 5},
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
conn, closer := start(t)
defer closer()
fz := newFuzzer(tt.threads, tt.updateSets, tt.clusterDisruptions, tt.disruptionProbability)
fz := newFuzzer(t, tt.threads, tt.updateSets, tt.clusterDisruptions, tt.disruptionProbability)

fz.initialize(t, conn)
conn.Close()
@@ -190,6 +192,7 @@ func getThreadIDsForUpdateSetFromFuzzInsert(t *testing.T, conn *mysql.Conn, upda
type fuzzer struct {
threads int
updateSets int
t *testing.T

// shouldStop is an internal state variable, that tells the fuzzer
// whether it should stop or not.
@@ -199,14 +202,15 @@ type fuzzer struct {
// updateRowsVals are the rows that we use to ensure 1 update on each shard with the same increment.
updateRowsVals [][]int
// clusterDisruptions are the cluster level disruptions that can happen in a running cluster.
clusterDisruptions []func()
clusterDisruptions []func(t *testing.T)
// disruptionProbability is the chance for the disruption to happen. We check this every 100 milliseconds.
disruptionProbability []int
}

// newFuzzer creates a new fuzzer struct.
func newFuzzer(threads int, updateSets int, clusterDisruptions []func(), disruptionProbability []int) *fuzzer {
func newFuzzer(t *testing.T, threads int, updateSets int, clusterDisruptions []func(t *testing.T), disruptionProbability []int) *fuzzer {
fz := &fuzzer{
t: t,
threads: threads,
updateSets: updateSets,
wg: sync.WaitGroup{},
@@ -364,7 +368,7 @@ func (fz *fuzzer) runClusterDisruptionThread(t *testing.T) {
func (fz *fuzzer) runClusterDisruption(t *testing.T) {
for idx, prob := range fz.disruptionProbability {
if rand.Intn(100) < prob {
fz.clusterDisruptions[idx]()
fz.clusterDisruptions[idx](fz.t)
return
}
}
@@ -374,7 +378,7 @@ func (fz *fuzzer) runClusterDisruption(t *testing.T) {
Cluster Level Disruptions for the fuzzer
*/

func prs() {
func prs(t *testing.T) {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
@@ -386,7 +390,7 @@ }
}
}

func ers() {
func ers(t *testing.T) {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
@@ -398,7 +402,7 @@ }
}
}

func vttabletRestarts() {
func vttabletRestarts(t *testing.T) {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
@@ -422,7 +426,27 @@ }
}
}

func mysqlRestarts() {
var orderedDDLFuzzer = []string{
"alter table twopc_fuzzer_insert add column extra_col1 varchar(20)",
"alter table twopc_fuzzer_insert add column extra_col2 varchar(20)",
"alter table twopc_fuzzer_insert drop column extra_col1",
"alter table twopc_fuzzer_insert drop column extra_col2",
}

// onlineDDLFuzzer runs an online DDL statement as a fuzzer disruption, ignoring any errors.
func onlineDDLFuzzer(t *testing.T) {
output, err := clusterInstance.VtctldClientProcess.ApplySchemaWithOutput(keyspaceName, orderedDDLFuzzer[count%len(orderedDDLFuzzer)], cluster.ApplySchemaParams{
DDLStrategy: "vitess --force-cut-over-after=1ms",
})
count++
if err != nil {
return
}
fmt.Println("Running online DDL with uuid: ", output)
WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
}

func mysqlRestarts(t *testing.T) {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
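The disruption hook added above is driven by the fuzzer's runClusterDisruption loop: on every check it walks the probability slice in order and fires at most one disruption. A minimal, self-contained sketch of that selection pattern (the disruption funcs here are illustrative stand-ins, not the test's actual callbacks, which take *testing.T):

package main

import (
	"fmt"
	"math/rand"
)

// pickDisruption mirrors the fuzzer's runClusterDisruption loop: probability[idx]
// is a percent chance, entries are checked in order, and at most one disruption
// fires per call.
func pickDisruption(disruptions []func(), probability []int) {
	for idx, prob := range probability {
		if rand.Intn(100) < prob {
			disruptions[idx]()
			return
		}
	}
}

func main() {
	disruptions := []func(){
		func() { fmt.Println("simulated PRS") },
		func() { fmt.Println("simulated online DDL") },
	}
	// The fuzzer evaluates this every 100 milliseconds; here we simply loop.
	for i := 0; i < 100; i++ {
		pickDisruption(disruptions, []int{5, 5})
	}
}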
1 change: 1 addition & 0 deletions go/test/endtoend/transaction/twopc/stress/main_test.go
@@ -71,6 +71,7 @@ func TestMain(m *testing.M) {
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--twopc_enable",
"--twopc_abandon_age", "1",
"--migration_check_interval", "2s",
)

// Start keyspace
106 changes: 95 additions & 11 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
@@ -34,22 +34,25 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/syscallutil"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
)

// TestDisruptions tests that atomic transactions persevere through various disruptions.
func TestDisruptions(t *testing.T) {
testcases := []struct {
disruptionName string
commitDelayTime string
disruption func() error
disruption func(t *testing.T) error
}{
{
disruptionName: "No Disruption",
commitDelayTime: "1",
disruption: func() error {
disruption: func(t *testing.T) error {
return nil
},
},
@@ -68,6 +71,11 @@ func TestDisruptions(t *testing.T) {
commitDelayTime: "5",
disruption: vttabletRestartShard3,
},
{
disruptionName: "OnlineDDL",
commitDelayTime: "20",
disruption: onlineDDL,
},
{
disruptionName: "EmergencyReparentShard",
commitDelayTime: "5",
@@ -119,7 +127,7 @@ func TestDisruptions(t *testing.T) {
}()
}
// Run the disruption.
err := tt.disruption()
err := tt.disruption(t)
require.NoError(t, err)
// Wait for the commit to have returned. We don't actually check for an error in the commit because the user might receive an error.
// But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken.
@@ -145,6 +153,7 @@ func threadToWrite(t *testing.T, ctx context.Context, id int) {
continue
}
_, _ = utils.ExecAllowError(t, conn, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, %d)", id, rand.Intn(10000)))
conn.Close()
}
}

@@ -170,11 +179,13 @@ func waitForResults(t *testing.T, query string, resultExpected string, waitTime
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err == nil {
res := utils.Exec(t, conn, query)
res, _ := utils.ExecAllowError(t, conn, query)
conn.Close()
prevRes = res.Rows
if fmt.Sprintf("%v", res.Rows) == resultExpected {
return
if res != nil {
prevRes = res.Rows
if fmt.Sprintf("%v", res.Rows) == resultExpected {
return
}
}
}
time.Sleep(100 * time.Millisecond)
@@ -187,29 +198,29 @@
Cluster Level Disruptions for the fuzzer
*/

// prsShard3 runs a PRS in shard 3 of the keyspace. It promotes the second tablet to be the new primary.
func prsShard3() error {
func prsShard3(t *testing.T) error {
shard := clusterInstance.Keyspaces[0].Shards[2]
newPrimary := shard.Vttablets[1]
return clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, newPrimary.Alias)
}

// ersShard3 runs an ERS in shard 3 of the keyspace. It promotes the second tablet to be the new primary.
func ersShard3() error {
func ersShard3(t *testing.T) error {
shard := clusterInstance.Keyspaces[0].Shards[2]
newPrimary := shard.Vttablets[1]
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("EmergencyReparentShard", fmt.Sprintf("%s/%s", keyspaceName, shard.Name), "--new-primary", newPrimary.Alias)
return err
}

// vttabletRestartShard3 restarts the first vttablet of the third shard.
func vttabletRestartShard3() error {
func vttabletRestartShard3(t *testing.T) error {
shard := clusterInstance.Keyspaces[0].Shards[2]
tablet := shard.Vttablets[0]
return tablet.RestartOnlyTablet()
}

// mysqlRestartShard3 restarts MySQL on the first tablet of the third shard.
func mysqlRestartShard3() error {
func mysqlRestartShard3(t *testing.T) error {
shard := clusterInstance.Keyspaces[0].Shards[2]
vttablets := shard.Vttablets
tablet := vttablets[0]
@@ -227,3 +238,76 @@
}
return syscallutil.Kill(pid, syscall.SIGKILL)
}

var orderedDDL = []string{
"alter table twopc_t1 add column extra_col1 varchar(20)",
"alter table twopc_t1 add column extra_col2 varchar(20)",
"alter table twopc_t1 add column extra_col3 varchar(20)",
"alter table twopc_t1 add column extra_col4 varchar(20)",
}

var count = 0

// onlineDDL runs an online DDL statement and waits for the migration to complete.
func onlineDDL(t *testing.T) error {
output, err := clusterInstance.VtctldClientProcess.ApplySchemaWithOutput(keyspaceName, orderedDDL[count%len(orderedDDL)], cluster.ApplySchemaParams{
DDLStrategy: "vitess --force-cut-over-after=1ms",
})
require.NoError(t, err)
count++
fmt.Println("uuid: ", output)
status := WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), status)
require.Equal(t, schema.OnlineDDLStatusComplete, status)
return nil
}

func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
shardNames := map[string]bool{}
for _, shard := range shards {
shardNames[shard.Name] = true
}
query := fmt.Sprintf("show vitess_migrations like '%s'", uuid)

statusesMap := map[string]bool{}
for _, status := range expectStatuses {
statusesMap[string(status)] = true
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

lastKnownStatus := ""
for {
countMatchedShards := 0
conn, err := mysql.Connect(ctx, vtParams)
if err != nil {
continue
}
r, err := utils.ExecAllowError(t, conn, query)
conn.Close()
if err != nil {
continue
}
for _, row := range r.Named().Rows {
shardName := row["shard"].ToString()
if !shardNames[shardName] {
// irrelevant shard
continue
}
lastKnownStatus = row["migration_status"].ToString()
if row["migration_uuid"].ToString() == uuid && statusesMap[lastKnownStatus] {
countMatchedShards++
}
}
if countMatchedShards == len(shards) {
return schema.OnlineDDLStatus(lastKnownStatus)
}
select {
case <-ctx.Done():
return schema.OnlineDDLStatus(lastKnownStatus)
case <-ticker.C:
}
}
}
2 changes: 1 addition & 1 deletion go/test/endtoend/transaction/twopc/utils/utils.go
@@ -61,9 +61,9 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) {
return
}
_, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false)
conn.Close()
if err != nil {
fmt.Printf("Error in cleanup deletion - %v\n", err)
conn.Close()
time.Sleep(100 * time.Millisecond)
continue
}
Expand Down
21 changes: 21 additions & 0 deletions go/vt/sqlparser/ast_funcs.go
@@ -2818,3 +2818,24 @@ func (lock Lock) GetHighestOrderLock(newLock Lock) Lock {
func Clone[K SQLNode](x K) K {
return CloneSQLNode(x).(K)
}

// ExtractAllTables returns all the distinct table names in the SQLNode as a slice of strings.
func ExtractAllTables(stmt Statement) []string {
var tables []string
tableMap := make(map[string]any)
_ = Walk(func(node SQLNode) (kontinue bool, err error) {
switch node := node.(type) {
case *AliasedTableExpr:
if tblName, ok := node.Expr.(TableName); ok {
name := String(tblName)
if _, exists := tableMap[name]; !exists {
tableMap[name] = nil
tables = append(tables, name)
}
return false, nil
}
}
return true, nil
}, stmt)
return tables
}
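A short usage sketch for the new ExtractAllTables helper. The parser entry point used here (sqlparser.NewTestParser) is an assumption and may differ between Vitess versions; the point is that each distinct table name is returned once, even when a table appears several times in the statement.

package main

import (
	"fmt"

	"vitess.io/vitess/go/vt/sqlparser"
)

func main() {
	// NewTestParser is assumed as the parser constructor; adjust for your Vitess version.
	parser := sqlparser.NewTestParser()
	stmt, err := parser.Parse("select t1.a, t2.b from t1 join t2 on t1.id = t2.id where t1.id in (select id from t1)")
	if err != nil {
		panic(err)
	}
	// t1 appears twice in the query but is reported once, e.g. [t1 t2].
	fmt.Println(sqlparser.ExtractAllTables(stmt))
}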