Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-21.0] VReplication: Support reversing read-only traffic in vtctldclient (#16920) #17015

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,11 @@ func executeOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletPro

// assertQueryExecutesOnTablet runs query through executeOnTablet and checks
// that the second count it returns is exactly one more than the first
// (presumably the number of matchQuery hits seen on the tablet before and
// after the query ran — confirm against executeOnTablet).
//
// The routing rules are fetched up front with the vtctldclient GetRoutingRules
// command so that they can be included in the failure message.
// NOTE(review): this span is rendered from a diff; the assert.Equalf check and
// the require.Equalf check may be the removed and added versions of the same
// line — confirm against the merged file.
func assertQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) {
t.Helper()
// Capture the routing rules before running the query so a failure can show them.
rr, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules")
require.NoError(t, err)
count0, body0, count1, body1 := executeOnTablet(t, conn, tablet, ksName, query, matchQuery)
assert.Equalf(t, count0+1, count1, "query %q did not execute in target;\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\n", query, matchQuery, body0, body1)
require.Equalf(t, count0+1, count1, "query %q did not execute on destination %s (%s-%d);\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\nrouting rules:\n%s\n\n",
query, ksName, tablet.Cell, tablet.TabletUID, matchQuery, body0, body1, rr)
}

func assertQueryDoesNotExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@ import (
)

func TestMoveTablesBuffering(t *testing.T) {
defaultRdonly = 1
ogReplicas := defaultReplicas
ogRdOnly := defaultRdonly
defer func() {
defaultReplicas = ogReplicas
defaultRdonly = ogRdOnly
}()
defaultRdonly = 0
defaultReplicas = 0
vc = setupMinimalCluster(t)
defer vc.TearDown()

Expand Down
129 changes: 94 additions & 35 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"math/rand/v2"
"net"
"slices"
"strconv"
"strings"
"sync"
Expand All @@ -35,9 +36,11 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/wrangler"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

Expand Down Expand Up @@ -169,9 +172,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
args = append(args, "--tablet-types", tabletTypes)
}
args = append(args, "--action_timeout=10m") // At this point something is up so fail the test
if debugMode {
t.Logf("Executing workflow command: vtctldclient %v", strings.Join(args, " "))
}
t.Logf("Executing workflow command: vtctldclient %s", strings.Join(args, " "))
output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...)
lastOutput = output
if err != nil {
Expand Down Expand Up @@ -334,33 +335,44 @@ func tstWorkflowCancel(t *testing.T) error {
return tstWorkflowAction(t, workflowActionCancel, "", "")
}

// validateReadsRoute sends a read query through a vtgate connection and uses
// assertQueryExecutesOnTablet to check that it was executed on the given
// tablet. A nil tablet makes the call a no-op.
// NOTE(review): this span is rendered from a diff, so two signatures and two
// ways of building the destination (per keyspace:shard@type in a loop, and
// per keyspace@type) appear together — confirm against the merged file which
// lines are current.
func validateReadsRoute(t *testing.T, tabletTypes string, tablet *cluster.VttabletProcess) {
func validateReadsRoute(t *testing.T, tabletType string, tablet *cluster.VttabletProcess) {
if tablet == nil {
return
}
if tabletTypes == "" {
tabletTypes = "replica,rdonly"
}
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
for _, tt := range []string{"replica", "rdonly"} {
destination := fmt.Sprintf("%s:%s@%s", tablet.Keyspace, tablet.Shard, tt)
if strings.Contains(tabletTypes, tt) {
readQuery := "select cid from customer limit 10"
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1")
}
}
// We do NOT want to target a shard as that goes around the routing rules and
// defeats the purpose here. We are using a query w/o a WHERE clause so for
// sharded keyspaces it should hit all shards as a SCATTER query. So all we
// care about is the keyspace and tablet type.
destination := fmt.Sprintf("%s@%s", tablet.Keyspace, strings.ToLower(tabletType))
readQuery := "select cid from customer limit 50"
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1")
}

// validateReadsRouteToSource parses the comma-separated tabletTypes and, for
// each of REPLICA and RDONLY that is present, requires the corresponding
// source tablet (sourceReplicaTab / sourceRdonlyTab) to be non-nil and then
// calls validateReadsRoute for that tablet type against it.
// NOTE(review): this span is rendered from a diff; the leading
// `if sourceReplicaTab != nil` check and the call under it look like the
// superseded implementation — confirm against the merged file.
func validateReadsRouteToSource(t *testing.T, tabletTypes string) {
if sourceReplicaTab != nil {
validateReadsRoute(t, tabletTypes, sourceReplicaTab)
tt, err := topoproto.ParseTabletTypes(tabletTypes)
require.NoError(t, err)
if slices.Contains(tt, topodatapb.TabletType_REPLICA) {
// Fail rather than silently skip when the expected tablet is missing.
require.NotNil(t, sourceReplicaTab)
validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), sourceReplicaTab)
}
if slices.Contains(tt, topodatapb.TabletType_RDONLY) {
// Fail rather than silently skip when the expected tablet is missing.
require.NotNil(t, sourceRdonlyTab)
validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), sourceRdonlyTab)
}
}

// validateReadsRouteToTarget parses the comma-separated tabletTypes and, for
// each of REPLICA and RDONLY that is present, requires the corresponding
// target tablet (targetReplicaTab1 / targetRdonlyTab1) to be non-nil and then
// calls validateReadsRoute for that tablet type against it.
// NOTE(review): this span is rendered from a diff; the leading
// `if targetReplicaTab1 != nil` check and the call under it look like the
// superseded implementation — confirm against the merged file.
func validateReadsRouteToTarget(t *testing.T, tabletTypes string) {
if targetReplicaTab1 != nil {
validateReadsRoute(t, tabletTypes, targetReplicaTab1)
tt, err := topoproto.ParseTabletTypes(tabletTypes)
require.NoError(t, err)
if slices.Contains(tt, topodatapb.TabletType_REPLICA) {
// Fail rather than silently skip when the expected tablet is missing.
require.NotNil(t, targetReplicaTab1)
validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), targetReplicaTab1)
}
if slices.Contains(tt, topodatapb.TabletType_RDONLY) {
// Fail rather than silently skip when the expected tablet is missing.
require.NotNil(t, targetRdonlyTab1)
validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), targetRdonlyTab1)
}
}

Expand Down Expand Up @@ -411,6 +423,13 @@ func getCurrentStatus(t *testing.T) string {
// but CI currently fails on creating multiple clusters even after the previous ones are torn down

func TestBasicV2Workflows(t *testing.T) {
ogReplicas := defaultReplicas
ogRdOnly := defaultRdonly
defer func() {
defaultReplicas = ogReplicas
defaultRdonly = ogRdOnly
}()
defaultReplicas = 1
defaultRdonly = 1
extraVTTabletArgs = []string{
parallelInsertWorkers,
Expand Down Expand Up @@ -664,7 +683,7 @@ func testMoveTablesV2Workflow(t *testing.T) {
// If it's not then we'll get an error as the table doesn't exist in the vschema
createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431")
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

// Verify that we've properly ignored any internal operational tables
Expand Down Expand Up @@ -725,6 +744,12 @@ func testPartialSwitches(t *testing.T) {
tstWorkflowSwitchReads(t, "", "")
checkStates(t, nextState, nextState) // idempotency

tstWorkflowReverseReads(t, "replica,rdonly", "")
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)

tstWorkflowSwitchReads(t, "", "")
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)

tstWorkflowSwitchWrites(t)
currentState = nextState
nextState = wrangler.WorkflowStateAllSwitched
Expand Down Expand Up @@ -771,12 +796,12 @@ func testRestOfWorkflow(t *testing.T) {
waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchReads(t, "", "")
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)
validateReadsRouteToTarget(t, "replica")
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToSource(t)

tstWorkflowSwitchWrites(t)
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateAllSwitched)
validateReadsRouteToTarget(t, "replica")
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToTarget(t)

// this function is called for both MoveTables and Reshard, so the reverse workflows exist in different keyspaces
Expand All @@ -787,42 +812,45 @@ func testRestOfWorkflow(t *testing.T) {
waitForLowLag(t, keyspace, "wf1_reverse")
tstWorkflowReverseReads(t, "", "")
checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched)
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToTarget(t)

tstWorkflowReverseWrites(t)
checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchWrites(t)
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateWritesSwitched)
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToTarget(t)

waitForLowLag(t, keyspace, "wf1_reverse")
tstWorkflowReverseWrites(t)
validateReadsRouteToSource(t, "replica")
checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchReads(t, "", "")
validateReadsRouteToTarget(t, "replica")
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToSource(t)

tstWorkflowReverseReads(t, "", "")
validateReadsRouteToSource(t, "replica")
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

tstWorkflowSwitchReadsAndWrites(t)
validateReadsRouteToTarget(t, "replica")
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToTarget(t)
waitForLowLag(t, keyspace, "wf1_reverse")
tstWorkflowReverseReadsAndWrites(t)
validateReadsRoute(t, "rdonly", sourceRdonlyTab)
validateReadsRouteToSource(t, "replica")
checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

// trying to complete an unswitched workflow should error
Expand All @@ -835,8 +863,7 @@ func testRestOfWorkflow(t *testing.T) {
waitForLowLag(t, "customer", "customer_name")
waitForLowLag(t, "customer", "enterprise_customer")
tstWorkflowSwitchReadsAndWrites(t)
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
validateReadsRouteToTarget(t, "replica")
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToTarget(t)

err = tstWorkflowComplete(t)
Expand Down Expand Up @@ -899,7 +926,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster {

zone1 := vc.Cells["zone1"]

vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)

verifyClusterHealth(t, vc)
insertInitialData(t)
Expand All @@ -912,7 +939,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster {
func setupMinimalCustomerKeyspace(t *testing.T) map[string]*cluster.VttabletProcess {
tablets := make(map[string]*cluster.VttabletProcess)
if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, "customer", "-80,80-",
customerVSchema, customerSchema, 0, 0, 200, nil); err != nil {
customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200, nil); err != nil {
t.Fatal(err)
}
defaultCell := vc.Cells[vc.CellNames[0]]
Expand Down Expand Up @@ -1048,6 +1075,7 @@ func createAdditionalCustomerShards(t *testing.T, shards string) {
targetTab2 = custKs.Shards["80-c0"].Tablets["zone1-600"].Vttablet
targetTab1 = custKs.Shards["40-80"].Tablets["zone1-500"].Vttablet
targetReplicaTab1 = custKs.Shards["-40"].Tablets["zone1-401"].Vttablet
targetRdonlyTab1 = custKs.Shards["-40"].Tablets["zone1-402"].Vttablet

sourceTab = custKs.Shards["-80"].Tablets["zone1-200"].Vttablet
sourceReplicaTab = custKs.Shards["-80"].Tablets["zone1-201"].Vttablet
Expand All @@ -1059,3 +1087,34 @@ func tstApplySchemaOnlineDDL(t *testing.T, sql string, keyspace string) {
"--sql", sql, keyspace)
require.NoError(t, err, fmt.Sprintf("ApplySchema Error: %s", err))
}

// validateTableRoutingRule checks the current routing rules (as returned by
// getRoutingRules) for the rule whose FromTable is "fromKeyspace.table", or
// "fromKeyspace.table@tabletType" when tabletType is set and is not primary.
//
// The check passes when no such rule exists at all, or when at least one such
// rule's first ToTables entry is "toKeyspace.table". It fails when the rule
// exists but none of the matching entries route to toKeyspace, when a matching
// rule has no ToTables, or when its first ToTables entry is not of the form
// "keyspace.table" with the expected table name.
//
// tabletType is compared case-insensitively and with surrounding whitespace
// removed.
func validateTableRoutingRule(t *testing.T, table, tabletType, fromKeyspace, toKeyspace string) {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	tabletType = strings.ToLower(strings.TrimSpace(tabletType))

	// The rule key does not depend on the rule being inspected, so build it once.
	fromRule := fmt.Sprintf("%s.%s", fromKeyspace, table)
	if tabletType != "" && tabletType != "primary" {
		fromRule = fmt.Sprintf("%s@%s", fromRule, tabletType)
	}

	rr := getRoutingRules(t)
	// We set matched = true by default because it is possible, if --no-routing-rules is set while creating
	// a workflow, that the routing rules are empty when the workflow starts.
	// We set it to false below when the rule is found, but before matching the routed keyspace.
	matched := true
	for _, r := range rr.GetRules() {
		if r.FromTable != fromRule {
			continue
		}
		// We found the rule, so we can set matched to false here and check for the routed keyspace below.
		matched = false
		require.NotEmpty(t, r.ToTables)
		// The ToTables value is of the form "routedKeyspace.table".
		routedKeyspace, routedTable, ok := strings.Cut(r.ToTables[0], ".")
		require.True(t, ok)
		require.Equal(t, table, routedTable)
		if routedKeyspace == toKeyspace {
			// We found the rule, the table and keyspace matches, so our search is done.
			matched = true
			break
		}
	}
	require.Truef(t, matched, "routing rule for %s.%s from %s to %s not found", fromKeyspace, table, tabletType, toKeyspace)
}
Loading
Loading