Skip to content

Commit

Permalink
Migrate Workflow: Scope vindex names correctly when target and source…
Browse files Browse the repository at this point in the history
… keyspace have different names (#16769)

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps authored Sep 21, 2024
1 parent 502bd61 commit c3bbce2
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 7 deletions.
89 changes: 88 additions & 1 deletion go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package vreplication

import (
"fmt"
"strings"
"testing"

"github.com/tidwall/gjson"

"vitess.io/vitess/go/test/endtoend/cluster"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -165,11 +168,18 @@ func TestVtctlMigrate(t *testing.T) {
// However now we need to create an external Vitess cluster. For this we need a different VTDATAROOT and
// hence the VTDATAROOT env variable gets overwritten.
// Each time we need to create vt processes in the "other" cluster we need to set the appropriate VTDATAROOT
func TestVtctldMigrate(t *testing.T) {
func TestVtctldMigrateUnsharded(t *testing.T) {
vc = NewVitessCluster(t, nil)

oldDefaultReplicas := defaultReplicas
oldDefaultRdonly := defaultRdonly
defaultReplicas = 0
defaultRdonly = 0
defer func() {
defaultReplicas = oldDefaultReplicas
defaultRdonly = oldDefaultRdonly
}()

defer vc.TearDown()

defaultCell := vc.Cells[vc.CellNames[0]]
Expand Down Expand Up @@ -299,3 +309,80 @@ func TestVtctldMigrate(t *testing.T) {
require.Errorf(t, err, "there is no vitess cluster named ext1")
})
}

// TestVtctldMigrate adds a test for a sharded cluster to validate a fix for a bug where the target keyspace name
// doesn't match that of the source cluster. The test migrates from a cluster with keyspace customer to an "external"
// cluster with keyspace rating.
func TestVtctldMigrateSharded(t *testing.T) {
oldDefaultReplicas := defaultReplicas
oldDefaultRdonly := defaultRdonly
defaultReplicas = 1
defaultRdonly = 1
defer func() {
defaultReplicas = oldDefaultReplicas
defaultRdonly = oldDefaultRdonly
}()

setSidecarDBName("_vt")
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
vc = setupCluster(t)
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
defer vc.TearDown()
setupCustomerKeyspace(t)
createMoveTablesWorkflow(t, "customer,Lead,datze,customer2")
tstWorkflowSwitchReadsAndWrites(t)
tstWorkflowComplete(t)

var err error
// create external cluster
extCell := "extcell1"
extCells := []string{extCell}
extVc := NewVitessCluster(t, &clusterOptions{
cells: extCells,
clusterConfig: externalClusterConfig,
})
defer extVc.TearDown()

setupExtKeyspace(t, extVc, "rating", extCell)
err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "-80")
require.NoError(t, err)
err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "80-")
require.NoError(t, err)
verifyClusterHealth(t, extVc)
extVtgateConn := getConnection(t, extVc.ClusterConfig.hostname, extVc.ClusterConfig.vtgateMySQLPort)
defer extVtgateConn.Close()

currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Migrate
var output string
if output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Mount", "register", "--name=external", "--topo-type=etcd2",
fmt.Sprintf("--topo-server=localhost:%d", vc.ClusterConfig.topoPort), "--topo-root=/vitess/global"); err != nil {
require.FailNow(t, "Mount command failed with %+v : %s\n", err, output)
}
ksWorkflow := "rating.e1"
if output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Migrate",
"--target-keyspace", "rating", "--workflow", "e1",
"create", "--source-keyspace", "customer", "--mount-name", "external", "--all-tables", "--cells=zone1",
"--tablet-types=primary,replica"); err != nil {
require.FailNow(t, "Migrate command failed with %+v : %s\n", err, output)
}
waitForWorkflowState(t, extVc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
// this is because currently doVtctldclientVDiff is using the global vc :-( and we want to run a diff on the extVc cluster
vc = extVc
doVtctldclientVDiff(t, "rating", "e1", "zone1", nil)
}

func setupExtKeyspace(t *testing.T, vc *VitessCluster, ksName, cellName string) {
numReplicas := 1
shards := []string{"-80", "80-"}
if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells[cellName]}, ksName, strings.Join(shards, ","),
customerVSchema, customerSchema, numReplicas, 0, 1200, nil); err != nil {
t.Fatal(err)
}
vtgate := vc.Cells[cellName].Vtgates[0]
for _, shard := range shards {
err := cluster.WaitForHealthyShard(vc.VtctldClient, ksName, shard)
require.NoError(t, err)
require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", ksName, shard), numReplicas, waitTimeout))
}
}
15 changes: 14 additions & 1 deletion go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,20 @@ func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *
for _, mappedCol := range mappedCols {
subExprs = append(subExprs, mappedCol)
}
vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
var vindexName string
if mz.workflowType == binlogdatapb.VReplicationWorkflowType_Migrate {
// For a Migrate, if the TargetKeyspace name is different from the SourceKeyspace name, we need to use the
// SourceKeyspace name to determine the vindex since the TargetKeyspace name is not known to the source.
// Note: it is expected that the source and target keyspaces have the same vindex name and data type.
keyspace := mz.ms.TargetKeyspace
if mz.ms.ExternalCluster != "" {
keyspace = mz.ms.SourceKeyspace
}
vindexName = fmt.Sprintf("%s.%s", keyspace, cv.Name)
} else {
vindexName = fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
}

subExprs = append(subExprs, sqlparser.NewStrLiteral(vindexName))
subExprs = append(subExprs, sqlparser.NewStrLiteral(key.KeyRangeString(targetShard.KeyRange)))
inKeyRange := &sqlparser.FuncExpr{
Expand Down
9 changes: 6 additions & 3 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2168,7 +2168,6 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt
if err != nil {
return nil, err
}

// The stream key is target keyspace/tablet alias, e.g. 0/test-0000000100.
// We sort the keys for intuitive and consistent output.
streamKeys := make([]string, 0, len(workflow.ShardStreams))
Expand Down Expand Up @@ -2224,9 +2223,13 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt
return resp, nil
}

// GetCopyProgress returns the progress of all tables being copied in the
// workflow.
// GetCopyProgress returns the progress of all tables being copied in the workflow.
func (s *Server) GetCopyProgress(ctx context.Context, ts *trafficSwitcher, state *State) (*copyProgress, error) {
if ts.workflowType == binlogdatapb.VReplicationWorkflowType_Migrate {
// The logic below expects the source primaries to be in the same cluster as the target.
// For now we don't report progress for Migrate workflows.
return nil, nil
}
getTablesQuery := "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = %d"
getRowCountQuery := "select table_name, table_rows, data_length from information_schema.tables where table_schema = %s and table_name in (%s)"
tables := make(map[string]bool)
Expand Down
14 changes: 13 additions & 1 deletion go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1398,7 +1398,19 @@ func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*top
for _, mappedCol := range mappedCols {
subExprs = append(subExprs, mappedCol)
}
vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
var vindexName string
if mz.getWorkflowType() == binlogdatapb.VReplicationWorkflowType_Migrate {
// For a Migrate, if the TargetKeyspace name is different from the SourceKeyspace name, we need to use the
// SourceKeyspace name to determine the vindex since the TargetKeyspace name is not known to the source.
// Note: it is expected that the source and target keyspaces have the same vindex name and data type.
keyspace := mz.ms.TargetKeyspace
if mz.ms.ExternalCluster != "" {
keyspace = mz.ms.SourceKeyspace
}
vindexName = fmt.Sprintf("%s.%s", keyspace, cv.Name)
} else {
vindexName = fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
}
subExprs = append(subExprs, sqlparser.NewStrLiteral(vindexName))
subExprs = append(subExprs, sqlparser.NewStrLiteral("{{.keyrange}}"))
inKeyRange := &sqlparser.FuncExpr{
Expand Down
2 changes: 1 addition & 1 deletion test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,7 @@
},
"vreplication_materialize": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMaterialize"],
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "Materialize"],
"Command": [],
"Manual": false,
"Shard": "vreplication_partial_movetables_and_materialize",
Expand Down

0 comments on commit c3bbce2

Please sign in to comment.