Skip to content

Commit

Permalink
[latest-17.0](vitessio#3181): CherryPick(vitessio#14008): MoveTables …
Browse files Browse the repository at this point in the history
…Cancel: drop denied tables on target when dropping source/target tables (vitessio#3276)

* backport of 3181

* Fix conflicts. Modify tests to revert changes made in v18

Signed-off-by: Rohit Nayak <[email protected]>

---------

Signed-off-by: Rohit Nayak <[email protected]>
Co-authored-by: Rohit Nayak <[email protected]>
  • Loading branch information
planetscale-actions-bot and rohit-nayak-ps authored Oct 2, 2023
1 parent 3851626 commit 5ea1c24
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 24 deletions.
12 changes: 11 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,17 @@ func checkIfTableExists(t *testing.T, vc *VitessCluster, tabletAlias string, tab
return found, nil
}

func checkIfDenyListExists(t *testing.T, vc *VitessCluster, ksShard string, table string) (bool, error) {
func validateTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table string, mustExist bool) {
found, err := isTableInDenyList(t, vc, ksShard, table)
require.NoError(t, err)
if mustExist {
require.True(t, found, "Table %s not found in deny list", table)
} else {
require.False(t, found, "Table %s found in deny list", table)
}
}

func isTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table string) (bool, error) {
var output string
var err error
found := false
Expand Down
45 changes: 44 additions & 1 deletion go/test/endtoend/vreplication/partial_movetables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,46 @@ import (
"vitess.io/vitess/go/vt/wrangler"
)

// testCancel() starts and cancels a partial MoveTables for one of the shards which will be actually moved later on.
// Before canceling, we first switch traffic to the target keyspace and then reverse it back to the source keyspace.
// This tests that artifacts are being properly cleaned up when a MoveTables ia canceled.
func testCancel(t *testing.T) {
targetKeyspace := "customer2"
sourceKeyspace := "customer"
wfName := "partial80DashForCancel"
// We use a different table in this MoveTables than the subsequent one, so that setting up of the artifacts
// while creating MoveTables do not paper over any issues with cleaning up artifacts when MoveTables is canceled.
// Ref: https://github.com/vitessio/vitess/issues/13998
table := "customer2"
shard := "80-"
// start the partial movetables for 80-
err := tstWorkflowExec(t, defaultCellName, wfName, sourceKeyspace, targetKeyspace,
table, workflowActionCreate, "", shard, "")
require.NoError(t, err)

checkDenyList := func(keyspace string, expected bool) {
validateTableInDenyList(t, vc, fmt.Sprintf("%s:%s", keyspace, shard), table, expected)
}
targetTab1 = vc.getPrimaryTablet(t, targetKeyspace, shard)
catchup(t, targetTab1, wfName, "")

checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, false)

require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKeyspace, "", workflowActionSwitchTraffic, "", "", ""))
checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, true)

require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKeyspace, "", workflowActionReverseTraffic, "", "", ""))
checkDenyList(targetKeyspace, true)
checkDenyList(sourceKeyspace, false)

require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKeyspace, "", workflowActionCancel, "", "", ""))
checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, false)

}

// TestPartialMoveTablesBasic tests partial move tables by moving each
// customer shard -- -80,80- -- once a a time to customer2.
func TestPartialMoveTablesBasic(t *testing.T) {
Expand Down Expand Up @@ -58,7 +98,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {

// Move customer table from unsharded product keyspace to
// sharded customer keyspace.
createMoveTablesWorkflow(t, "customer")
createMoveTablesWorkflow(t, "customer,customer2")
tstWorkflowSwitchReadsAndWrites(t)
tstWorkflowComplete(t)

Expand All @@ -79,6 +119,9 @@ func TestPartialMoveTablesBasic(t *testing.T) {
// move tables for one of the two shards: 80-.
defaultRdonly = 0
setupCustomer2Keyspace(t)

testCancel(t)

currentWorkflowType = wrangler.MoveTablesWorkflow
wfName := "partial80Dash"
sourceKs := "customer"
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func TestMultiCellVreplicationWorkflow(t *testing.T) {
verifyClusterHealth(t, vc)
insertInitialData(t)
shardCustomer(t, true, []*Cell{cell1, cell2}, cell2.Name, true)
checkIfDenyListExists(t, vc, "product:0", "customer")
isTableInDenyList(t, vc, "product:0", "customer")
// we tag along this test so as not to create the overhead of creating another cluster
testVStreamCellFlag(t)
}
Expand Down Expand Up @@ -854,13 +854,13 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
switchWrites(t, workflowType, ksWorkflow, false)

var exists bool
exists, err = checkIfDenyListExists(t, vc, "product:0", "customer")
exists, err = isTableInDenyList(t, vc, "product:0", "customer")
require.NoError(t, err, "Error getting denylist for customer:0")
require.True(t, exists)

moveTablesAction(t, "Complete", allCellNames, workflow, sourceKs, targetKs, tables)

exists, err = checkIfDenyListExists(t, vc, "product:0", "customer")
exists, err = isTableInDenyList(t, vc, "product:0", "customer")
require.NoError(t, err, "Error getting denylist for customer:0")
require.False(t, exists)

Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func (si *ShardInfo) GetTabletControl(tabletType topodatapb.TabletType) *topodat
return nil
}

// UpdateSourceDeniedTables will add or remove the listed tables
// UpdateDeniedTables will add or remove the listed tables
// in the shard record's TabletControl structures. Note we don't
// support a lot of the corner cases:
// - only support one table list per shard. If we encounter a different
Expand All @@ -421,7 +421,7 @@ func (si *ShardInfo) GetTabletControl(tabletType topodatapb.TabletType) *topodat
// because it's not used in the same context (vertical vs horizontal sharding)
//
// This function should be called while holding the keyspace lock.
func (si *ShardInfo) UpdateSourceDeniedTables(ctx context.Context, tabletType topodatapb.TabletType, cells []string, remove bool, tables []string) error {
func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodatapb.TabletType, cells []string, remove bool, tables []string) error {
if err := CheckKeyspaceLocked(ctx, si.keyspace); err != nil {
return err
}
Expand Down
20 changes: 10 additions & 10 deletions go/vt/topo/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ func lockedKeyspaceContext(keyspace string) context.Context {
}

func addToDenyList(ctx context.Context, si *ShardInfo, tabletType topodatapb.TabletType, cells, tables []string) error {
if err := si.UpdateSourceDeniedTables(ctx, tabletType, cells, false, tables); err != nil {
if err := si.UpdateDeniedTables(ctx, tabletType, cells, false, tables); err != nil {
return err
}
return nil
}

func removeFromDenyList(ctx context.Context, si *ShardInfo, tabletType topodatapb.TabletType, cells, tables []string) error {
if err := si.UpdateSourceDeniedTables(ctx, tabletType, cells, true, tables); err != nil {
if err := si.UpdateDeniedTables(ctx, tabletType, cells, true, tables); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -161,13 +161,13 @@ func TestUpdateSourceDeniedTables(t *testing.T) {

// check we enforce the keyspace lock
ctx := context.Background()
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, nil, false, nil); err == nil || err.Error() != "keyspace ks is not locked (no locksInfo)" {
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, nil, false, nil); err == nil || err.Error() != "keyspace ks is not locked (no locksInfo)" {
t.Fatalf("unlocked keyspace produced wrong error: %v", err)
}
ctx = lockedKeyspaceContext("ks")

// add one cell
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
{
TabletType: topodatapb.TabletType_RDONLY,
Cells: []string{"first"},
Expand All @@ -178,20 +178,20 @@ func TestUpdateSourceDeniedTables(t *testing.T) {
}

// remove that cell, going back
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, true, nil); err != nil || len(si.TabletControls) != 0 {
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, true, nil); err != nil || len(si.TabletControls) != 0 {
t.Fatalf("going back should have remove the record: %v", si)
}

// re-add a cell, then another with different table list to
// make sure it fails
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil {
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil {
t.Fatalf("one cell add failed: %v", si)
}
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t2", "t3"}); err == nil || err.Error() != "trying to use two different sets of denied tables for shard ks/sh: [t1 t2] and [t2 t3]" {
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t2", "t3"}); err == nil || err.Error() != "trying to use two different sets of denied tables for shard ks/sh: [t1 t2] and [t2 t3]" {
t.Fatalf("different table list should fail: %v", err)
}
// add another cell, see the list grow
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
{
TabletType: topodatapb.TabletType_RDONLY,
Cells: []string{"first", "second"},
Expand All @@ -202,7 +202,7 @@ func TestUpdateSourceDeniedTables(t *testing.T) {
}

// add all cells, see the list grow to all
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first", "second", "third"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first", "second", "third"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
{
TabletType: topodatapb.TabletType_RDONLY,
Cells: []string{"first", "second", "third"},
Expand All @@ -213,7 +213,7 @@ func TestUpdateSourceDeniedTables(t *testing.T) {
}

// remove one cell from the full list
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, true, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, true, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
{
TabletType: topodatapb.TabletType_RDONLY,
Cells: []string{"first", "third"},
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2957,7 +2957,7 @@ func (s *VtctldServer) SetShardTabletControl(ctx context.Context, req *vtctldata
defer unlock(&err)

si, err := s.ts.UpdateShardFields(ctx, req.Keyspace, req.Shard, func(si *topo.ShardInfo) error {
return si.UpdateSourceDeniedTables(ctx, req.TabletType, req.Cells, req.Remove, req.DeniedTables)
return si.UpdateDeniedTables(ctx, req.TabletType, req.Cells, req.Remove, req.DeniedTables)
})

switch {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (target *MigrationTarget) GetPrimary() *topo.TabletInfo {

// BuildTargets collects MigrationTargets and other metadata (see TargetInfo)
// from a workflow in the target keyspace.
//

// It returns ErrNoStreams if there are no targets found for the workflow.
func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string) (*TargetInfo, error) {
targetShards, err := ts.GetShardNames(ctx, targetKeyspace)
Expand Down
4 changes: 4 additions & 0 deletions go/vt/wrangler/switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (r *switcher) dropSourceDeniedTables(ctx context.Context) error {
return r.ts.dropSourceDeniedTables(ctx)
}

func (r *switcher) dropTargetDeniedTables(ctx context.Context) error {
return r.ts.dropTargetDeniedTables(ctx)
}

func (r *switcher) validateWorkflowHasCompleted(ctx context.Context) error {
return r.ts.validateWorkflowHasCompleted(ctx)
}
Expand Down
12 changes: 12 additions & 0 deletions go/vt/wrangler/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,18 @@ func (dr *switcherDryRun) dropSourceDeniedTables(ctx context.Context) error {
return nil
}

func (dr *switcherDryRun) dropTargetDeniedTables(ctx context.Context) error {
logs := make([]string, 0)
for _, si := range dr.ts.TargetShards() {
logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s Tablet %d", si.Keyspace(), si.ShardName(), si.PrimaryAlias.Uid))
}
if len(logs) > 0 {
dr.drLog.Log(fmt.Sprintf("Denied tables [%s] will be removed from:", strings.Join(dr.ts.Tables(), ",")))
dr.drLog.LogSlice(logs)
}
return nil
}

func (dr *switcherDryRun) logs() *[]string {
return &dr.drLog.logs
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/wrangler/switcher_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type iswitcher interface {
removeSourceTables(ctx context.Context, removalType workflow.TableRemovalType) error
dropSourceShards(ctx context.Context) error
dropSourceDeniedTables(ctx context.Context) error
dropTargetDeniedTables(ctx context.Context) error
freezeTargetVReplication(ctx context.Context) error
dropSourceReverseVReplicationStreams(ctx context.Context) error
dropTargetVReplicationStreams(ctx context.Context) error
Expand Down
26 changes: 23 additions & 3 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,9 @@ func (wr *Wrangler) DropTargets(ctx context.Context, targetKeyspace, workflow st
if err := sw.dropSourceDeniedTables(ctx); err != nil {
return nil, err
}
if err := sw.dropTargetDeniedTables(ctx); err != nil {
return nil, err
}
case binlogdatapb.MigrationType_SHARDS:
log.Infof("Removing target shards")
if err := sw.dropTargetShards(ctx); err != nil {
Expand Down Expand Up @@ -835,6 +838,9 @@ func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflowNam
if err := sw.dropSourceDeniedTables(ctx); err != nil {
return nil, err
}
if err := sw.dropTargetDeniedTables(ctx); err != nil {
return nil, err
}

case binlogdatapb.MigrationType_SHARDS:
log.Infof("Removing shards")
Expand Down Expand Up @@ -1193,7 +1199,7 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {
func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access accessType) error {
err := ts.ForAllSources(func(source *workflow.MigrationSource) error {
if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error {
return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables())
return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables())
}); err != nil {
return err
}
Expand Down Expand Up @@ -1498,7 +1504,7 @@ func (ts *trafficSwitcher) allowTargetWrites(ctx context.Context) error {
func (ts *trafficSwitcher) allowTableTargetWrites(ctx context.Context) error {
return ts.ForAllTargets(func(target *workflow.MigrationTarget) error {
if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error {
return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables())
return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables())
}); err != nil {
return err
}
Expand Down Expand Up @@ -1641,7 +1647,7 @@ func (ts *trafficSwitcher) TargetShards() []*topo.ShardInfo {
func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error {
return ts.ForAllSources(func(source *workflow.MigrationSource) error {
if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error {
return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables())
return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables())
}); err != nil {
return err
}
Expand All @@ -1652,6 +1658,20 @@ func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error {
})
}

func (ts *trafficSwitcher) dropTargetDeniedTables(ctx context.Context) error {
return ts.ForAllTargets(func(target *workflow.MigrationTarget) error {
if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error {
return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables())
}); err != nil {
return err
}
rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
_, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger())
return err
})
}

func (ts *trafficSwitcher) validateWorkflowHasCompleted(ctx context.Context) error {
return doValidateWorkflowHasCompleted(ctx, ts)
}
Expand Down
12 changes: 10 additions & 2 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,11 @@ func testTableMigrateOneToMany(t *testing.T, keepData, keepRoutingRules bool) {
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t1",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2",
"Denied tables [t1,t2] will be removed from:",
" Keyspace ks1 Shard 0 Tablet 10")
" Keyspace ks1 Shard 0 Tablet 10",
"Denied tables [t1,t2] will be removed from:",
" Keyspace ks2 Shard -80 Tablet 20",
" Keyspace ks2 Shard 80- Tablet 30",
)
}
wantdryRunDropSources = append(wantdryRunDropSources, "Delete reverse vreplication streams on source:",
" Keyspace ks1 Shard 0 Workflow test_reverse DbName vt_ks1 Tablet 10",
Expand Down Expand Up @@ -919,7 +923,11 @@ func testTableMigrateOneToMany(t *testing.T, keepData, keepRoutingRules bool) {
"Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t1",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2",
"Denied tables [t1,t2] will be removed from:",
" Keyspace ks1 Shard 0 Tablet 10")
" Keyspace ks1 Shard 0 Tablet 10",
"Denied tables [t1,t2] will be removed from:",
" Keyspace ks2 Shard -80 Tablet 20",
" Keyspace ks2 Shard 80- Tablet 30",
)
}
wantdryRunRenameSources = append(wantdryRunRenameSources, "Delete reverse vreplication streams on source:",
" Keyspace ks1 Shard 0 Workflow test_reverse DbName vt_ks1 Tablet 10",
Expand Down

0 comments on commit 5ea1c24

Please sign in to comment.