Skip to content

Commit

Permalink
Correct & improve on e2e and unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Mar 29, 2022
1 parent 69566f8 commit 6c8655f
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 93 deletions.
8 changes: 6 additions & 2 deletions go/test/endtoend/cluster/topo_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,12 @@ func (topo *TopoProcess) IsHealthy() bool {
}

func (topo *TopoProcess) removeTopoDirectories(Cell string) {
_ = topo.ManageTopoDir("rmdir", "/vitess/global")
_ = topo.ManageTopoDir("rmdir", "/vitess/"+Cell)
if err := topo.ManageTopoDir("rmdir", "/vitess/global"); err != nil {
log.Errorf("Failed to remove global topo directory: %v", err)
}
if err := topo.ManageTopoDir("rmdir", "/vitess/"+Cell); err != nil {
log.Errorf("Failed to remove local topo directory: %v", err)
}
}

// ManageTopoDir creates global and zone in etcd2
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/cluster/vtctlclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type VtctlClientParams struct {
MigrationContext string
SkipPreflight bool
UUIDList string
CallerId string
CallerID string
}

// InitShardPrimary executes vtctlclient command to make specified tablet the primary for the shard.
Expand Down Expand Up @@ -91,8 +91,8 @@ func (vtctlclient *VtctlClientProcess) ApplySchemaWithOutput(Keyspace string, SQ
args = append(args, "--skip_preflight")
}

if params.CallerId != "" {
args = append(args, "--caller_id", params.CallerId)
if params.CallerID != "" {
args = append(args, "--caller_id", params.CallerID)
}
args = append(args, Keyspace)
return vtctlclient.ExecuteCommandWithOutput(args...)
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/onlineddl/ghost/onlineddl_ghost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
}
} else {
var err error
uuid, err = clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, sqlQuery, cluster.VtctlClientParams{DDLStrategy: ddlStrategy, CallerId: callerID})
uuid, err = clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, sqlQuery, cluster.VtctlClientParams{DDLStrategy: ddlStrategy, CallerID: callerID})
assert.NoError(t, err)
}
uuid = strings.TrimSpace(uuid)
Expand Down
92 changes: 63 additions & 29 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,43 @@ func throttlerCheckSelf(tablet *cluster.VttabletProcess, app string) (resp *http
return resp, respBody, err
}

func TestVreplicationCopyThrottling(t *testing.T) {
workflow := "copy-throttling"
cell := "zone1"
table := "customer"
shard := "0"
vc = NewVitessCluster(t, "TestVreplicationCopyThrottling", []string{cell}, mainClusterConfig)
defer vc.TearDown(t)
defaultCell = vc.Cells[cell]
// To test vstreamer source throttling for the MoveTables operation
maxSourceTrxHistory := 5
extraVTTabletArgs = []string{
fmt.Sprintf("--vreplication_copy_phase_max_innodb_history_list_length=%d", maxSourceTrxHistory),
}

if _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, sourceKs, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil); err != nil {
t.Fatal(err)
}
if _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, targetKs, shard, "", "", 0, 0, 200, nil); err != nil {
t.Fatal(err)
}
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", sourceKs, shard), 1)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", targetKs, shard), 1)

// Confirm that the initial copy table phase does not proceed until the source tablet(s)
// have an InnoDB History List length that is less than specified in the tablet's config.
// We update rows in a table not part of the MoveTables operation so that we're not blocking
// on the LOCK TABLE call but rather the InnoDB History List length.
trxConn := createSourceInnoDBRowHistory(t, sourceKs, maxSourceTrxHistory)
// We need to force primary tablet types as the history list has been increased on the source primary
moveTablesWithTabletTypes(t, defaultCell.Name, workflow, sourceKs, targetKs, table, "primary")
verifySourceTabletThrottling(t, targetKs, workflow)
deleteSourceInnoDBRowHistory(t, trxConn)
trxConn.Close()
}

func TestBasicVreplicationWorkflow(t *testing.T) {
sourceKsOpts["DBTypeVersion"] = "mysql-5.7"
targetKsOpts["DBTypeVersion"] = "mysql-5.7"
Expand All @@ -125,16 +162,6 @@ func testBasicVreplicationWorkflow(t *testing.T) {

defer vc.TearDown(t)

// To test vstreamer source throttling for the MoveTables operation
maxSourceTrxHistory := 1
maxSourceRplLag := 1
transactionTimeout := 60
extraVTTabletArgs = []string{
fmt.Sprintf("--vreplication_copy_phase_max_innodb_history_list_length=%d", maxSourceTrxHistory),
fmt.Sprintf("--vreplication_copy_phase_max_mysql_replication_lag=%d", maxSourceRplLag),
fmt.Sprintf("--queryserver-config-transaction-timeout=%d", transactionTimeout),
}

defaultCell = vc.Cells[defaultCellName]
vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, sourceKsOpts)
vtgate = defaultCell.Vtgates[0]
Expand Down Expand Up @@ -375,20 +402,11 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
custKs := vc.Cells[defaultCell.Name].Keyspaces["customer"]

tables := "customer,Lead,Lead-1,db_order_test"
moveTables(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables)

customerTab1 := custKs.Shards["-80"].Tablets["zone1-200"].Vttablet
customerTab2 := custKs.Shards["80-"].Tablets["zone1-300"].Vttablet

// Confirm that the initial copy table phase does not proceed until the source tablet(s)
// have an InnoDB History List length that is less than specified in the tablet's config.
// We update rows in a table not part of the MoveTables operation so that we're not blocking
// on the LOCK TABLE call but rather the InnoDB History List length.
trxConn := createSourceInnoDBRowHistory(t, sourceKs)
// We need to force primary tablet types as the history list has been increased on the source primary
moveTablesWithTabletTypes(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables, "primary")
verifySourceTabletThrottling(t, targetKs, workflow)
deleteSourceInnoDBRowHistory(t, trxConn)
trxConn.Close()

catchup(t, customerTab1, workflow, "MoveTables")
catchup(t, customerTab2, workflow, "MoveTables")

Expand Down Expand Up @@ -525,7 +543,7 @@ func validateRollupReplicates(t *testing.T) {
}

func verifySourceTabletThrottling(t *testing.T, targetKS, workflow string) {
tDuration := time.Duration(30 * time.Second)
tDuration := time.Duration(15 * time.Second)
ticker := time.NewTicker(5 * time.Second)
timer := time.NewTimer(tDuration)
ksWorkflow := fmt.Sprintf("%s.%s", targetKS, workflow)
Expand Down Expand Up @@ -1139,15 +1157,31 @@ func dropSources(t *testing.T, ksWorkflow string) {
// createSourceInnoDBRowHistory generates at least maxSourceTrxHistory rollback segment entries.
// This allows us to confirm two behaviors:
// 1. MoveTables blocks on starting its first copy phase until we rollback
// 2. All other workflows continue to work w/o issue with this MVCC history in place
// 2. All other workflows continue to work w/o issue with this MVCC history in place (not used yet)
// Returns a db connection used for the transaction which you can use for follow-up
// work, such as rolling it back directly or using the deleteSourceInnoDBRowHistory call.
func createSourceInnoDBRowHistory(t *testing.T, sourceKS string) *mysql.Conn {
dbConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
execQuery(t, dbConn, "use "+sourceKS)
execQuery(t, dbConn, "start transaction")
execQuery(t, dbConn, "update product set pid = pid+1000")
return dbConn
func createSourceInnoDBRowHistory(t *testing.T, sourceKS string, neededTrxHistory int) *mysql.Conn {
dbConn1 := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
dbConn2 := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
execQuery(t, dbConn1, "use "+sourceKS)
execQuery(t, dbConn2, "use "+sourceKS)
offset := 1000
insertStmt := strings.Builder{}
for i := offset; i <= (neededTrxHistory*10)+offset; i++ {
if i == offset {
insertStmt.WriteString(fmt.Sprintf("insert into product (pid, description) values (%d, 'test')", i))
} else {
insertStmt.WriteString(fmt.Sprintf(", (%d, 'test')", i))
}
}
execQuery(t, dbConn2, "start transaction")
execQuery(t, dbConn2, "select count(*) from product")
execQuery(t, dbConn1, insertStmt.String())
execQuery(t, dbConn2, "rollback")
execQuery(t, dbConn2, "start transaction")
execQuery(t, dbConn2, "select count(*) from product")
execQuery(t, dbConn1, fmt.Sprintf("delete from product where pid >= %d and pid < %d", offset, offset+10000))
return dbConn2
}

func deleteSourceInnoDBRowHistory(t *testing.T, dbConn *mysql.Conn) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func TestPlayerCopyBigTable(t *testing.T) {
defer func() { *copyPhaseDuration = savedCopyPhaseDuration }()

savedWaitRetryTime := waitRetryTime
// waitRetry time shoulw be very low to cause the wait loop to execute multipel times.
// waitRetry time should be very low to cause the wait loop to execute multiple times.
waitRetryTime = 10 * time.Millisecond
defer func() { waitRetryTime = savedWaitRetryTime }()

Expand Down Expand Up @@ -676,7 +676,7 @@ func TestPlayerCopyWildcardRule(t *testing.T) {
defer func() { *copyPhaseDuration = savedCopyPhaseDuration }()

savedWaitRetryTime := waitRetryTime
// waitRetry time shoulw be very low to cause the wait loop to execute multipel times.
// waitRetry time should be very low to cause the wait loop to execute multipel times.
waitRetryTime = 10 * time.Millisecond
defer func() { waitRetryTime = savedWaitRetryTime }()

Expand Down
55 changes: 0 additions & 55 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,58 +508,3 @@ func (uvs *uvstreamer) setReplicationLagSeconds(sbm int64) {
defer uvs.mu.Unlock()
uvs.ReplicationLagSeconds = sbm
}

// getTrxHistoryLen attempts to query InnoDB's current transaction rollback segment's history
// list length. If the value cannot be determined for any reason then -1 is returned, which means
// "unknown".
func (uvs *uvstreamer) getTrxHistoryLen() int64 {
histLen := int64(-1)
conn, err := uvs.cp.Connect(uvs.ctx)
if err != nil {
return histLen
}
defer conn.Close()

res, err := conn.ExecuteFetch(trxHistoryLenQuery, 1, false)
if err != nil || len(res.Rows) != 1 || res.Rows[0] == nil {
return histLen
}
histLen, _ = res.Rows[0][0].ToInt64()
return histLen
}

// getReplicationLag attempts to get the seconds_behind_master value.
// If the value cannot be determined for any reason then -1 is returned, which
// means "unknown" or "irrelevant" (meaning it's not actively replicating).
func (uvs *uvstreamer) getReplicationLag() int64 {
lagSecs := int64(-1)
conn, err := uvs.cp.Connect(uvs.ctx)
if err != nil {
return lagSecs
}
defer conn.Close()

res, err := conn.ExecuteFetch(replicaLagQuery, 1, true)
if err != nil || len(res.Rows) != 1 || res.Rows[0] == nil {
return lagSecs
}
row := res.Named().Row()
return row.AsInt64("Seconds_Behind_Master", -1)
}

// getSourceEndpoint returns the host:port value for the vstreamer (MySQL) instance
func (uvs *uvstreamer) getEndpoint() (string, error) {
conn, err := uvs.cp.Connect(uvs.ctx)
if err != nil {
return "", err
}
defer conn.Close()

res, err := conn.ExecuteFetch(hostQuery, 1, false)
if err != nil || len(res.Rows) != 1 || res.Rows[0] == nil {
return "", vterrors.Wrap(err, "could not get vstreamer endpoint")
}
host := res.Rows[0][0].ToString()
port, _ := res.Rows[0][1].ToInt64()
return fmt.Sprintf("%s:%d", host, port), nil
}
2 changes: 1 addition & 1 deletion test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1060,7 +1060,7 @@
},
"vreplication_basic": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestBasicVreplicationWorkflow"],
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "^(TestVreplicationCopyThrottling|TestBasicVreplicationWorkflow)$", "-keep-data=false"],
"Command": [],
"Manual": false,
"Shard": "vreplication_basic",
Expand Down

0 comments on commit 6c8655f

Please sign in to comment.