Ignore unrelated shards in partial movetables workflow status
Signed-off-by: Matt Lord <[email protected]>
mattlord committed Jul 16, 2023
1 parent df479ad commit 8bc817e
Showing 5 changed files with 228 additions and 9 deletions.
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
@@ -150,7 +150,7 @@ func TestUpdateVRWorkflow(t *testing.T) {
OnDdl: binlogdatapb.OnDDLAction_EXEC,
},
query: fmt.Sprintf(`update _vt.vreplication set source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`,
- keyspace, shard, binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_EXEC)], vreplID),
+ keyspace, shard, binlogdatapb.OnDDLAction_EXEC.String(), vreplID),
},
{
name: "update cell,tablet_types,on_ddl",
@@ -161,7 +161,7 @@ func TestUpdateVRWorkflow(t *testing.T) {
OnDdl: binlogdatapb.OnDDLAction_EXEC_IGNORE,
},
query: fmt.Sprintf(`update _vt.vreplication set source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`,
- keyspace, shard, binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_EXEC_IGNORE)], "zone1,zone2,zone3", "rdonly,replica,primary", vreplID),
+ keyspace, shard, binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), "zone1,zone2,zone3", "rdonly,replica,primary", vreplID),
},
}

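
The changes in this file swap the generated name-map lookup for the enum's String() method. A minimal sketch of why the two are interchangeable, assuming standard protoc-gen-go output and a binlogdata import path following the pattern of the other proto imports in this commit:

package main

import (
	"fmt"

	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" // assumed path
)

func main() {
	// Both forms resolve a defined enum value to its name ("EXEC");
	// String() is the shorter, more idiomatic accessor.
	byMap := binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_EXEC)]
	byMethod := binlogdatapb.OnDDLAction_EXEC.String()
	fmt.Println(byMap == byMethod) // true
}
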
4 changes: 2 additions & 2 deletions go/vt/wrangler/materializer_test.go
@@ -45,7 +45,7 @@ const mzSelectIDQuery = "select id from _vt.vreplication where db_name='vt_targe
const mzSelectFrozenQuery = "select 1 from _vt.vreplication where db_name='vt_targetks' and message='FROZEN' and workflow_sub_type != 1"
const mzCheckJournal = "/select val from _vt.resharding_journal where id="

- var defaultOnDDL = binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_IGNORE)]
+ var defaultOnDDL = binlogdatapb.OnDDLAction_IGNORE.String()

func TestMigrateTables(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
@@ -2825,7 +2825,7 @@ func TestMoveTablesDDLFlag(t *testing.T) {

env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{})
- if onDDLAction == binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_IGNORE)] {
+ if onDDLAction == binlogdatapb.OnDDLAction_IGNORE.String() {
// This is the default, and Go does not marshal default values
// for prototext fields, so we use the default insert stmt.
env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{})
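
The comment above leans on prototext semantics: proto3 scalar and enum fields at their zero value are omitted from the marshaled text, and OnDDLAction_IGNORE is 0, so the default on_ddl never appears in the stored source. A rough sketch of that behavior, under the same import-path assumptions as above:

package main

import (
	"fmt"

	"google.golang.org/protobuf/encoding/prototext"

	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" // assumed path
)

func main() {
	bls := &binlogdatapb.BinlogSource{
		Keyspace: "ks1",                           // hypothetical value
		OnDdl:    binlogdatapb.OnDDLAction_IGNORE, // zero value: dropped on marshal
	}
	out, err := prototext.Marshal(bls)
	if err != nil {
		panic(err)
	}
	// Prints something like `keyspace:"ks1"` with no on_ddl field,
	// which is why the test branches to the default insert statement.
	fmt.Println(string(out))
}
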
7 changes: 5 additions & 2 deletions go/vt/wrangler/traffic_switcher.go
@@ -252,10 +252,13 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl

rules := shardRoutingRules.Rules
for _, rule := range rules {
- if rule.ToKeyspace == ts.SourceKeyspaceName() {
+ switch rule.ToKeyspace {
+ case ts.SourceKeyspaceName():
state.ShardsNotYetSwitched = append(state.ShardsNotYetSwitched, rule.Shard)
- } else {
+ case ts.TargetKeyspaceName():
state.ShardsAlreadySwitched = append(state.ShardsAlreadySwitched, rule.Shard)
+ default:
+ // Not a relevant rule.
}
}
} else {
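
This switch is the substance of the fix: a shard routing rule now only counts toward the workflow's status when its ToKeyspace is the move's source (not yet switched) or target (already switched); rules belonging to other keyspaces fall through to the default case and are skipped. A standalone sketch of the classification, using the same ShardRoutingRule fields the tests below exercise (toy keyspace and shard values):

package main

import (
	"fmt"

	"vitess.io/vitess/go/vt/proto/vschema"
)

func main() {
	source, target := "ks1", "ks2"
	rules := []*vschema.ShardRoutingRule{
		{FromKeyspace: "ks2", Shard: "-80", ToKeyspace: "ks1"},     // still routed to the source: not yet switched
		{FromKeyspace: "ks1", Shard: "80-", ToKeyspace: "ks2"},     // routed to the target: already switched
		{FromKeyspace: "wut", Shard: "40-80", ToKeyspace: "bloop"}, // unrelated to this move: skipped
	}
	var notYetSwitched, alreadySwitched []string
	for _, rule := range rules {
		switch rule.ToKeyspace {
		case source:
			notYetSwitched = append(notYetSwitched, rule.Shard)
		case target:
			alreadySwitched = append(alreadySwitched, rule.Shard)
		default:
			// Not a relevant rule.
		}
	}
	fmt.Println(notYetSwitched, alreadySwitched) // [-80] [80-]
}
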
158 changes: 158 additions & 0 deletions go/vt/wrangler/traffic_switcher_env_test.go
@@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/mysql"
@@ -263,6 +264,163 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards,
return tme
}

// newTestTablePartialMigrater creates a test tablet migrater
// specifically for partial or shard by shard migrations.
// The shards must be the same on the source and target, and we
// must be moving a subset of them.
// fmtQuery should be of the form: 'select a, b %s group by a'.
// The test will Sprintf a from clause and where clause as needed.
func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shardsToMove []string, fmtQuery string) *testMigraterEnv {
require.Greater(t, len(shards), 1, "shard by shard migrations can only be done on sharded keyspaces")
tme := &testMigraterEnv{}
tme.ts = memorytopo.NewServer("cell1", "cell2")
tme.wr = New(logutil.NewConsoleLogger(), tme.ts, tmclient.NewTabletManagerClient())
tme.wr.sem = semaphore.NewWeighted(1)
tme.sourceShards = shards
tme.targetShards = shards
tme.tmeDB = fakesqldb.New(t)
expectVDiffQueries(tme.tmeDB)
tabletID := 10
for _, shard := range tme.sourceShards {
tme.sourcePrimaries = append(tme.sourcePrimaries, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_PRIMARY, tme.tmeDB, TabletKeyspaceShard(t, "ks1", shard)))
tabletID += 10

_, sourceKeyRange, err := topo.ValidateShardName(shard)
if err != nil {
t.Fatal(err)
}
tme.sourceKeyRanges = append(tme.sourceKeyRanges, sourceKeyRange)
}
tpChoiceTablet := tme.sourcePrimaries[0].Tablet
tpChoice = &testTabletPickerChoice{
keyspace: tpChoiceTablet.Keyspace,
shard: tpChoiceTablet.Shard,
}
for _, shard := range tme.targetShards {
tme.targetPrimaries = append(tme.targetPrimaries, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_PRIMARY, tme.tmeDB, TabletKeyspaceShard(t, "ks2", shard)))
tabletID += 10

_, targetKeyRange, err := topo.ValidateShardName(shard)
if err != nil {
t.Fatal(err)
}
tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange)
}

vs := &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"hash": {
Type: "hash",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "c1",
Name: "hash",
}},
},
"t2": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "c1",
Name: "hash",
}},
},
},
}
err := tme.ts.SaveVSchema(ctx, "ks1", vs)
require.NoError(t, err)
err = tme.ts.SaveVSchema(ctx, "ks2", vs)
require.NoError(t, err)
err = tme.ts.RebuildSrvVSchema(ctx, nil)
require.NoError(t, err)
err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks1", []string{"cell1"}, false)
require.NoError(t, err)
err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks2", []string{"cell1"}, false)
require.NoError(t, err)

tme.startTablets(t)
tme.createDBClients(ctx, t)
tme.setPrimaryPositions()
now := time.Now().Unix()

for i, shard := range shards {
var streamInfoRows []string
var streamExtInfoRows []string
for _, shardToMove := range shardsToMove {
if shardToMove == shard {
bls := &binlogdatapb.BinlogSource{
Keyspace: "ks1",
Shard: shard,
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t1 where in_keyrange('%s')", shard)),
}, {
Match: "t2",
Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t2 where in_keyrange('%s')", shard)),
}},
},
}
streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls))
streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks1|%d|%d|0|0||||0", i+1, now, now))
}
}
tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult)
tme.dbTargetClients[i].addInvariant(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
streamInfoRows...))
tme.dbTargetClients[i].addInvariant(streamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys",
"int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"),
streamExtInfoRows...))
tme.dbTargetClients[i].addInvariant(reverseStreamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys",
"int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"),
streamExtInfoRows...))
}

for i, shard := range shards {
var streamInfoRows []string
for _, shardToMove := range shardsToMove {
if shardToMove == shard {
bls := &binlogdatapb.BinlogSource{
Keyspace: "ks2",
Shard: shard,
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t1 where in_keyrange('%s')", shard)),
}, {
Match: "t2",
Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t2 where in_keyrange('%s')", shard)),
}},
},
}
streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls))
}
}
tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult)
tme.dbSourceClients[i].addInvariant(reverseStreamInfoKs1, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
streamInfoRows...),
)
}

srr := make(map[string]string, len(shards))
for _, shard := range shards {
srr[fmt.Sprintf("ks2.%s", shard)] = "ks1"
}
err = topotools.SaveShardRoutingRules(ctx, tme.ts, srr)
require.NoError(t, err)

tme.targetKeyspace = "ks2"
return tme
}
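
As the doc comment notes, the helper splices a generated from/where clause into the caller's fmtQuery via nested Sprintf calls. A quick illustration using the 'select * %s' format that the new shard-by-shard test passes in:

package main

import "fmt"

func main() {
	fmtQuery := "select * %s" // as passed by TestMoveTablesShardByShard
	shard := "-80"
	filter := fmt.Sprintf(fmtQuery, fmt.Sprintf("from t1 where in_keyrange('%s')", shard))
	fmt.Println(filter) // select * from t1 where in_keyrange('-80')
}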

func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targetShards []string) *testShardMigraterEnv {
tme := &testShardMigraterEnv{}
tme.ts = memorytopo.NewServer("cell1", "cell2")
64 changes: 61 additions & 3 deletions go/vt/wrangler/workflow_test.go
@@ -31,6 +31,7 @@ import (
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/workflow"

@@ -49,7 +50,7 @@ func getMoveTablesWorkflow(t *testing.T, cells, tabletTypes string) *VReplicatio
Cells: cells,
TabletTypes: tabletTypes,
MaxAllowedTransactionLagSeconds: defaultMaxAllowedTransactionLagSeconds,
- OnDDL: binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_EXEC)],
+ OnDDL: binlogdatapb.OnDDLAction_EXEC.String(),
}
mtwf := &VReplicationWorkflow{
workflowType: MoveTablesWorkflow,
@@ -280,7 +281,7 @@ func TestMoveTablesV2(t *testing.T) {
TabletTypes: "REPLICA,RDONLY,PRIMARY",
Timeout: DefaultActionTimeout,
MaxAllowedTransactionLagSeconds: defaultMaxAllowedTransactionLagSeconds,
- OnDDL: binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_STOP)],
+ OnDDL: binlogdatapb.OnDDLAction_STOP.String(),
}
tme := newTestTableMigrater(ctx, t)
defer tme.stopTablets(t)
@@ -300,6 +301,63 @@
require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
}

// TestMoveTablesShardByShard ensures that shard by shard
// migrations work as expected.
func TestMoveTablesShardByShard(t *testing.T) {
ctx := context.Background()
shards := []string{"-80", "80-"}
shardsToMove := shards[0:1]
p := &VReplicationWorkflowParams{
Workflow: "test",
WorkflowType: MoveTablesWorkflow,
SourceKeyspace: "ks1",
SourceShards: shardsToMove, // shard by shard
TargetShards: shardsToMove, // shard by shard
TargetKeyspace: "ks2",
Tables: "t1,t2",
Cells: "cell1,cell2",
TabletTypes: "REPLICA,RDONLY,PRIMARY",
Timeout: DefaultActionTimeout,
MaxAllowedTransactionLagSeconds: defaultMaxAllowedTransactionLagSeconds,
OnDDL: binlogdatapb.OnDDLAction_STOP.String(),
}
tme := newTestTablePartialMigrater(ctx, t, shards, shards[0:1], "select * %s")
defer tme.stopTablets(t)

// Save some unrelated shard routing rules to be sure that
// they don't interfere in any way.
srr, err := tme.ts.GetShardRoutingRules(ctx)
require.NoError(t, err)
srr.Rules = append(srr.Rules, &vschema.ShardRoutingRule{
FromKeyspace: "wut",
Shard: "40-80",
ToKeyspace: "bloop",
})
err = tme.ts.SaveShardRoutingRules(ctx, srr)
require.NoError(t, err)

wf, err := tme.wr.NewVReplicationWorkflow(ctx, MoveTablesWorkflow, p)
require.NoError(t, err)
require.NotNil(t, wf)
require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
require.True(t, wf.ts.isPartialMigration, "expected partial shard migration")

trafficSwitchResults := fmt.Sprintf("Reads partially switched, for shards: %s. Writes partially switched, for shards: %s",
strings.Join(shardsToMove, ","), strings.Join(shardsToMove, ","))
tme.expectNoPreviousJournals()
expectMoveTablesQueries(t, tme, p)
tme.expectNoPreviousJournals()
require.NoError(t, testSwitchForward(t, wf))
require.Equal(t, trafficSwitchResults, wf.CurrentState())

/* TODO: Figure out why this isn't working...
tme.expectNoPreviousJournals()
tme.expectNoPreviousReverseJournals()
require.NoError(t, testReverse(t, wf))
require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
*/
}

func validateRoutingRuleCount(ctx context.Context, t *testing.T, ts *topo.Server, cnt int) {
rr, err := ts.GetRoutingRules(ctx)
require.NoError(t, err)
@@ -485,7 +543,7 @@ func TestReshardV2(t *testing.T) {
TabletTypes: "replica,rdonly,primary",
Timeout: DefaultActionTimeout,
MaxAllowedTransactionLagSeconds: defaultMaxAllowedTransactionLagSeconds,
- OnDDL: binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_EXEC_IGNORE)],
+ OnDDL: binlogdatapb.OnDDLAction_EXEC_IGNORE.String(),
}
tme := newTestShardMigrater(ctx, t, sourceShards, targetShards)
defer tme.stopTablets(t)
