More fixes related to partial traffic handling
Signed-off-by: Matt Lord <[email protected]>
mattlord committed Jul 17, 2023
1 parent 8bc817e commit 12d4261
Showing 3 changed files with 59 additions and 56 deletions.
go/vt/wrangler/traffic_switcher.go (38 changes: 21 additions & 17 deletions)
@@ -218,8 +218,8 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
     }
 
     var (
-        reverse  bool
-        keyspace string
+        reverse        bool
+        sourceKeyspace string
     )
 
     // We reverse writes by using the source_keyspace.workflowname_reverse workflow
@@ -229,17 +229,19 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
     // source to check if writes have been switched.
     if strings.HasSuffix(workflowName, "_reverse") {
         reverse = true
-        keyspace = state.SourceKeyspace
+        // Flip the source and target keyspaces.
+        sourceKeyspace = state.TargetKeyspace
+        targetKeyspace = state.SourceKeyspace
         workflowName = workflow.ReverseWorkflowName(workflowName)
     } else {
-        keyspace = targetKeyspace
+        sourceKeyspace = state.SourceKeyspace
     }
     if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
         state.WorkflowType = workflow.TypeMoveTables
 
         // We assume a consistent state, so only choose routing rule for one table.
         if len(ts.Tables()) == 0 {
-            return nil, nil, fmt.Errorf("no tables in workflow %s.%s", keyspace, workflowName)
+            return nil, nil, fmt.Errorf("no tables in workflow %s.%s", targetKeyspace, workflowName)
 
         }
         table := ts.Tables()[0]
@@ -253,21 +255,21 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
         rules := shardRoutingRules.Rules
         for _, rule := range rules {
             switch rule.ToKeyspace {
-            case ts.SourceKeyspaceName():
+            case sourceKeyspace:
                 state.ShardsNotYetSwitched = append(state.ShardsNotYetSwitched, rule.Shard)
-            case ts.TargetKeyspaceName():
+            case targetKeyspace:
                 state.ShardsAlreadySwitched = append(state.ShardsAlreadySwitched, rule.Shard)
             default:
                 // Not a relevant rule.
             }
         }
     } else {
-        state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, keyspace, table, topodatapb.TabletType_RDONLY)
+        state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_RDONLY)
         if err != nil {
             return nil, nil, err
         }
 
-        state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, keyspace, table, topodatapb.TabletType_REPLICA)
+        state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_REPLICA)
         if err != nil {
             return nil, nil, err
         }
@@ -278,7 +280,7 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
         for _, table := range ts.Tables() {
             rr := globalRules[table]
             // if a rule exists for the table and points to the target keyspace, writes have been switched
-            if len(rr) > 0 && rr[0] == fmt.Sprintf("%s.%s", keyspace, table) {
+            if len(rr) > 0 && rr[0] == fmt.Sprintf("%s.%s", targetKeyspace, table) {
                 state.WritesSwitched = true
                 break
             }
@@ -295,12 +297,12 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
             shard = ts.SourceShards()[0]
         }
 
-        state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, keyspace, shard, topodatapb.TabletType_RDONLY)
+        state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, targetKeyspace, shard, topodatapb.TabletType_RDONLY)
         if err != nil {
             return nil, nil, err
         }
 
-        state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, keyspace, shard, topodatapb.TabletType_REPLICA)
+        state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, targetKeyspace, shard, topodatapb.TabletType_REPLICA)
         if err != nil {
             return nil, nil, err
         }
@@ -333,11 +335,13 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflowNam
     if servedType != topodatapb.TabletType_REPLICA && servedType != topodatapb.TabletType_RDONLY {
         return nil, fmt.Errorf("tablet type must be REPLICA or RDONLY: %v", servedType)
     }
-    if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_REPLICA && len(ws.ReplicaCellsSwitched) == 0 {
-        return nil, fmt.Errorf("requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched")
-    }
-    if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_RDONLY && len(ws.RdonlyCellsSwitched) == 0 {
-        return nil, fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")
+    if !ts.isPartialMigration { // shard level traffic switching is all or nothing
+        if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_REPLICA && len(ws.ReplicaCellsSwitched) == 0 {
+            return nil, fmt.Errorf("requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched")
+        }
+        if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_RDONLY && len(ws.RdonlyCellsSwitched) == 0 {
+            return nil, fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")
+        }
     }
     switch servedType {
     case topodatapb.TabletType_REPLICA:
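For context on the getWorkflowState change above: the switched-shard bookkeeping reduces to bucketing each shard routing rule by its ToKeyspace, and the fix swaps the ts.SourceKeyspaceName()/ts.TargetKeyspaceName() calls for locals that are flipped for _reverse workflows. A minimal, self-contained sketch of that bucketing follows; shardRoutingRule and the keyspace names here are illustrative stand-ins, not the real topo types.

package main

import "fmt"

// shardRoutingRule is a hypothetical stand-in for the topo ShardRoutingRule
// proto referenced in the diff above.
type shardRoutingRule struct {
    Shard      string
    ToKeyspace string
}

// bucketShards mirrors the switch in getWorkflowState: a rule still pointing
// at the source keyspace means the shard has not been switched yet, while a
// rule pointing at the target keyspace means it already has.
func bucketShards(rules []shardRoutingRule, sourceKeyspace, targetKeyspace string) (notYet, already []string) {
    for _, rule := range rules {
        switch rule.ToKeyspace {
        case sourceKeyspace:
            notYet = append(notYet, rule.Shard)
        case targetKeyspace:
            already = append(already, rule.Shard)
        default:
            // Not a relevant rule.
        }
    }
    return notYet, already
}

func main() {
    rules := []shardRoutingRule{
        {Shard: "-80", ToKeyspace: "ks2"}, // already switched to the target
        {Shard: "80-", ToKeyspace: "ks1"}, // still pointing at the source
    }
    notYet, already := bucketShards(rules, "ks1", "ks2")
    fmt.Println("not yet switched:", notYet, "already switched:", already)
}

Run as written, this prints "not yet switched: [80-] already switched: [-80]", the same ShardsNotYetSwitched/ShardsAlreadySwitched split the wrangler reports for a partial migration.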
go/vt/wrangler/traffic_switcher_env_test.go (51 changes: 22 additions & 29 deletions)
@@ -346,9 +346,9 @@ func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shar
     now := time.Now().Unix()
 
     for i, shard := range shards {
+        var streamInfoRows []string
+        var streamExtInfoRows []string
         for _, shardToMove := range shardsToMove {
-            var streamInfoRows []string
-            var streamExtInfoRows []string
             if shardToMove == shard {
                 bls := &binlogdatapb.BinlogSource{
                     Keyspace: "ks1",
@@ -366,25 +366,25 @@ func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shar
                 streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls))
                 streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks1|%d|%d|0|0||||0", i+1, now, now))
             }
-            tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult)
-            tme.dbTargetClients[i].addInvariant(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
-                "id|source|message|cell|tablet_types",
-                "int64|varchar|varchar|varchar|varchar"),
-                streamInfoRows...))
-            tme.dbTargetClients[i].addInvariant(streamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
-                "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys",
-                "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"),
-                streamExtInfoRows...))
-            tme.dbTargetClients[i].addInvariant(reverseStreamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
-                "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys",
-                "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"),
-                streamExtInfoRows...))
         }
+        tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult)
+        tme.dbTargetClients[i].addInvariant(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
+            "id|source|message|cell|tablet_types",
+            "int64|varchar|varchar|varchar|varchar"),
+            streamInfoRows...))
+        tme.dbTargetClients[i].addInvariant(streamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
+            "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys",
+            "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"),
+            streamExtInfoRows...))
+        tme.dbTargetClients[i].addInvariant(reverseStreamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
+            "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys",
+            "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"),
+            streamExtInfoRows...))
     }
 
     for i, shard := range shards {
+        var streamInfoRows []string
         for _, shardToMove := range shardsToMove {
-            var streamInfoRows []string
             if shardToMove == shard {
                 bls := &binlogdatapb.BinlogSource{
                     Keyspace: "ks2",
@@ -400,23 +400,16 @@ func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shar
                     },
                 }
                 streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls))
-                tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult)
             }
-            tme.dbSourceClients[i].addInvariant(reverseStreamInfoKs1, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
-                "id|source|message|cell|tablet_types",
-                "int64|varchar|varchar|varchar|varchar"),
-                streamInfoRows...),
-            )
         }
+        tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult)
+        tme.dbSourceClients[i].addInvariant(reverseStreamInfoKs1, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
+            "id|source|message|cell|tablet_types",
+            "int64|varchar|varchar|varchar|varchar"),
+            streamInfoRows...),
+        )
     }
 
-    srr := make(map[string]string, len(shards))
-    for _, shard := range shards {
-        srr[fmt.Sprintf("ks2.%s", shard)] = "ks1"
-    }
-    err = topotools.SaveShardRoutingRules(ctx, tme.ts, srr)
-    require.NoError(t, err)
-
     tme.targetKeyspace = "ks2"
     return tme
 }
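The test-fixture restructuring above follows an accumulate-then-register pattern: the expected stream rows are collected across the entire inner loop, and each fake DB client's invariants are registered exactly once per target shard, instead of being re-registered (possibly with an incomplete row set) on every shardToMove iteration. A stripped-down, runnable sketch of the pattern, with registerInvariant as a hypothetical stand-in for addInvariant:

package main

import "fmt"

// registerInvariant is a hypothetical stand-in for the fake DB client's
// addInvariant method in the test env above.
func registerInvariant(client int, rows []string) {
    fmt.Printf("client %d expects %d stream row(s): %v\n", client, len(rows), rows)
}

func main() {
    shards := []string{"-80", "80-"}
    shardsToMove := []string{"-80"}

    for i, shard := range shards {
        // Declared once per target shard, so rows accumulate across the
        // whole inner loop instead of being reset on every iteration.
        var streamInfoRows []string
        for _, shardToMove := range shardsToMove {
            if shardToMove == shard {
                streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|...", i+1))
            }
        }
        // Registered once per client with the complete row set, rather than
        // once per inner-loop iteration.
        registerInvariant(i, streamInfoRows)
    }
}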
go/vt/wrangler/workflow_test.go (26 changes: 16 additions & 10 deletions)
@@ -303,6 +303,8 @@ func TestMoveTablesV2(t *testing.T) {
 
 // TestMoveTablesShardByShard ensures that shard by shard
 // migrations work as expected.
+// This test moves tables from one sharded keyspace (ks1) to
+// another sharded keyspace (ks2), but only for the -80 shard.
 func TestMoveTablesShardByShard(t *testing.T) {
     ctx := context.Background()
     shards := []string{"-80", "80-"}
@@ -328,11 +330,18 @@ func TestMoveTablesShardByShard(t *testing.T) {
     // they don't interfere in any way.
     srr, err := tme.ts.GetShardRoutingRules(ctx)
     require.NoError(t, err)
-    srr.Rules = append(srr.Rules, &vschema.ShardRoutingRule{
-        FromKeyspace: "wut",
-        Shard:        "40-80",
-        ToKeyspace:   "bloop",
-    })
+    srr.Rules = append(srr.Rules, []*vschema.ShardRoutingRule{
+        {
+            FromKeyspace: "wut",
+            Shard:        "40-80",
+            ToKeyspace:   "bloop",
+        },
+        {
+            FromKeyspace: "haylo",
+            Shard:        "-80",
+            ToKeyspace:   "blarg",
+        },
+    }...)
     err = tme.ts.SaveShardRoutingRules(ctx, srr)
     require.NoError(t, err)
 
@@ -342,20 +351,17 @@ func TestMoveTablesShardByShard(t *testing.T) {
     require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
     require.True(t, wf.ts.isPartialMigration, "expected partial shard migration")
 
-    trafficSwitchResults := fmt.Sprintf("Reads partially switched, for shards: %s. Writes partially switched, for shards: %s",
-        strings.Join(shardsToMove, ","), strings.Join(shardsToMove, ","))
     tme.expectNoPreviousJournals()
     expectMoveTablesQueries(t, tme, p)
     tme.expectNoPreviousJournals()
     require.NoError(t, testSwitchForward(t, wf))
-    require.Equal(t, trafficSwitchResults, wf.CurrentState())
+    require.Equal(t, "All Reads Switched. All Writes Switched", wf.CurrentState())
+    require.NoError(t, err)
 
-    /* TODO: Figure out why this isn't working...
     tme.expectNoPreviousJournals()
     tme.expectNoPreviousReverseJournals()
     require.NoError(t, testReverse(t, wf))
     require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
-    */
 }
 
 func validateRoutingRuleCount(ctx context.Context, t *testing.T, ts *topo.Server, cnt int) {
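The workflow_test change above seeds two unrelated shard routing rules in a single append so the test can verify that a shard-by-shard MoveTables leaves rules for untouched keyspaces alone. A self-contained sketch of that append(dst, src...) spread idiom, with a plain struct standing in for *vschema.ShardRoutingRule:

package main

import "fmt"

// rule is a plain stand-in for *vschema.ShardRoutingRule.
type rule struct {
    FromKeyspace, Shard, ToKeyspace string
}

func main() {
    var rules []rule
    // Append several rules in one call by spreading a slice literal,
    // mirroring the test change above.
    rules = append(rules, []rule{
        {FromKeyspace: "wut", Shard: "40-80", ToKeyspace: "bloop"},
        {FromKeyspace: "haylo", Shard: "-80", ToKeyspace: "blarg"},
    }...)
    fmt.Println("seeded", len(rules), "unrelated rules:", rules)
}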
