diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index 73b34015338..e2ccde0a0e7 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -21,6 +21,7 @@ import ( "fmt" "os" "regexp" + "slices" "strings" "sync" "sync/atomic" @@ -33,6 +34,7 @@ import ( "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" @@ -94,24 +96,68 @@ func newTestEnv(t *testing.T, ctx context.Context, cell string, sourceKeyspace, env.tmc = newTestTMClient(env) env.ws = NewServer(venv, env.ts, env.tmc) + serving := true tabletID := startingSourceTabletUID for _, shardName := range sourceKeyspace.ShardNames { - _ = env.addTablet(t, ctx, tabletID, sourceKeyspace.KeyspaceName, shardName, topodatapb.TabletType_PRIMARY) + _ = env.addTablet(t, ctx, tabletID, sourceKeyspace.KeyspaceName, shardName, topodatapb.TabletType_PRIMARY, serving) tabletID += tabletUIDStep } - if sourceKeyspace.KeyspaceName != targetKeyspace.KeyspaceName { - tabletID = startingTargetTabletUID - for _, shardName := range targetKeyspace.ShardNames { - _ = env.addTablet(t, ctx, tabletID, targetKeyspace.KeyspaceName, shardName, topodatapb.TabletType_PRIMARY) - tabletID += tabletUIDStep - } + + isReshard := func() bool { + return sourceKeyspace.KeyspaceName == targetKeyspace.KeyspaceName && + !slices.Equal(sourceKeyspace.ShardNames, targetKeyspace.ShardNames) } + + if isReshard() { + serving = false + } + tabletID = startingTargetTabletUID + for _, shardName := range targetKeyspace.ShardNames { + _ = env.addTablet(t, ctx, tabletID, targetKeyspace.KeyspaceName, shardName, topodatapb.TabletType_PRIMARY, serving) + tabletID += tabletUIDStep + } + + if isReshard() { + initSrvKeyspace(t, env.ts, targetKeyspace.KeyspaceName, sourceKeyspace.ShardNames, targetKeyspace.ShardNames, []string{cell}) + } + err := env.ts.RebuildSrvVSchema(ctx, nil) require.NoError(t, err) return env } +func initSrvKeyspace(t *testing.T, topo *topo.Server, keyspace string, sources, targets, cells []string) { + ctx := context.Background() + srvKeyspace := &topodatapb.SrvKeyspace{ + Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{}, + } + getPartition := func(t *testing.T, shards []string) *topodatapb.SrvKeyspace_KeyspacePartition { + partition := &topodatapb.SrvKeyspace_KeyspacePartition{ + ServedType: topodatapb.TabletType_PRIMARY, + ShardReferences: []*topodatapb.ShardReference{}, + } + for _, shard := range shards { + keyRange, err := key.ParseShardingSpec(shard) + require.NoError(t, err) + require.Equal(t, 1, len(keyRange)) + partition.ShardReferences = append(partition.ShardReferences, &topodatapb.ShardReference{ + Name: shard, + KeyRange: keyRange[0], + }) + } + return partition + } + srvKeyspace.Partitions = append(srvKeyspace.Partitions, getPartition(t, sources)) + srvKeyspace.Partitions = append(srvKeyspace.Partitions, getPartition(t, targets)) + for _, cell := range cells { + err := topo.UpdateSrvKeyspace(ctx, cell, keyspace, srvKeyspace) + require.NoError(t, err) + } + err := topo.ValidateSrvKeyspace(ctx, keyspace, strings.Join(cells, ",")) + require.NoError(t, err) +} + func (env *testEnv) close() { for _, k := range maps.Values(env.tablets) { for _, t := range maps.Values(k) { @@ -120,7 +166,7 @@ func (env *testEnv) close() { } } -func (env *testEnv) addTablet(t *testing.T, ctx context.Context, id int, keyspace, shard string, tabletType topodatapb.TabletType) *topodatapb.Tablet { +func (env *testEnv) addTablet(t *testing.T, ctx context.Context, id int, keyspace, shard string, tabletType topodatapb.TabletType, serving bool) *topodatapb.Tablet { tablet := &topodatapb.Tablet{ Alias: &topodatapb.TabletAlias{ Cell: env.cell, @@ -143,7 +189,7 @@ func (env *testEnv) addTablet(t *testing.T, ctx context.Context, id int, keyspac if tabletType == topodatapb.TabletType_PRIMARY { _, err = env.ws.ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { si.PrimaryAlias = tablet.Alias - si.IsPrimaryServing = true + si.IsPrimaryServing = serving return nil }) require.NoError(t, err) diff --git a/go/vt/vtctl/workflow/resharder.go b/go/vt/vtctl/workflow/resharder.go index 95fcea3a2a9..4f4ed34963a 100644 --- a/go/vt/vtctl/workflow/resharder.go +++ b/go/vt/vtctl/workflow/resharder.go @@ -99,6 +99,9 @@ func (s *Server) buildResharder(ctx context.Context, keyspace, workflow string, if err != nil { return nil, vterrors.Wrapf(err, "GetShard(%s) failed", shard) } + if si.PrimaryAlias == nil { + return nil, fmt.Errorf("target shard %v has no primary tablet", shard) + } if si.IsPrimaryServing { return nil, fmt.Errorf("target shard %v is in serving state", shard) } diff --git a/go/vt/vtctl/workflow/resharder_test.go b/go/vt/vtctl/workflow/resharder_test.go new file mode 100644 index 00000000000..1bb2f065e0f --- /dev/null +++ b/go/vt/vtctl/workflow/resharder_test.go @@ -0,0 +1,192 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" +) + +const eol = "$" + +func TestReshardCreate(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + workflowName := "wf1" + tableName := "t1" + sourceKeyspaceName := "targetks" + targetKeyspaceName := "targetks" + tabletTypes := []topodatapb.TabletType{ + topodatapb.TabletType_PRIMARY, + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, + } + tabletTypesStr := topoproto.MakeStringTypeCSV(tabletTypes) + schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ + tableName: { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: tableName, + Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), + }, + }, + }, + } + + testcases := []struct { + name string + sourceKeyspace, targetKeyspace *testKeyspace + preFunc func(env *testEnv) + want *vtctldatapb.WorkflowStatusResponse + wantErr string + }{ + { + name: "basic", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + want: &vtctldatapb.WorkflowStatusResponse{ + ShardStreams: map[string]*vtctldatapb.WorkflowStatusResponse_ShardStreams{ + "targetks/-80": { + Streams: []*vtctldatapb.WorkflowStatusResponse_ShardStreamState{ + { + Id: 1, + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID}, + SourceShard: "targetks/0", Position: position, Status: "Running", Info: "VStream Lag: 0s", + }, + }, + }, + "targetks/80-": { + Streams: []*vtctldatapb.WorkflowStatusResponse_ShardStreamState{ + { + Id: 1, + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID + tabletUIDStep}, + SourceShard: "targetks/0", Position: position, Status: "Running", Info: "VStream Lag: 0s", + }, + }, + }, + }, + TrafficState: "Reads Not Switched. Writes Not Switched", + }, + }, + { + name: "no primary", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + preFunc: func(env *testEnv) { + _, err := env.ts.UpdateShardFields(ctx, targetKeyspaceName, "-80", func(si *topo.ShardInfo) error { + si.PrimaryAlias = nil + return nil + }) + require.NoError(t, err) + }, + wantErr: "buildResharder: target shard -80 has no primary tablet", + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + require.NotNil(t, tc.sourceKeyspace) + require.NotNil(t, tc.targetKeyspace) + + env := newTestEnv(t, ctx, defaultCellName, tc.sourceKeyspace, tc.targetKeyspace) + defer env.close() + env.tmc.schema = schema + + req := &vtctldatapb.ReshardCreateRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + TabletTypes: tabletTypes, + SourceShards: tc.sourceKeyspace.ShardNames, + TargetShards: tc.targetKeyspace.ShardNames, + Cells: []string{env.cell}, + } + + for i := range tc.sourceKeyspace.ShardNames { + tabletUID := startingSourceTabletUID + (tabletUIDStep * i) + env.tmc.expectVRQuery( + tabletUID, + "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1", + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery( + tabletUID, + "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", + &sqltypes.Result{}, + ) + } + + for i, target := range tc.targetKeyspace.ShardNames { + tabletUID := startingTargetTabletUID + (tabletUIDStep * i) + env.tmc.expectVRQuery( + tabletUID, + insertPrefix+ + `\('`+workflowName+`', 'keyspace:"`+targetKeyspaceName+`" shard:"0" filter:{rules:{match:"/.*" filter:"`+target+`"}}', '', [0-9]*, [0-9]*, '`+ + env.cell+`', '`+tabletTypesStr+`', [0-9]*, 0, 'Stopped', 'vt_`+targetKeyspaceName+`', 4, 0, false, '{}'\)`+eol, + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery( + tabletUID, + "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1", + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery( + tabletUID, + "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", + &sqltypes.Result{}, + ) + } + + if tc.preFunc != nil { + tc.preFunc(env) + } + + res, err := env.ws.ReshardCreate(ctx, req) + if tc.wantErr != "" { + require.EqualError(t, err, tc.wantErr) + return + } + require.NoError(t, err) + if tc.want != nil { + require.Equal(t, tc.want, res) + } + }) + } +} diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 31c27601f6b..19268866253 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1725,7 +1725,7 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea if err := s.ts.ValidateSrvKeyspace(ctx, keyspace, strings.Join(cells, ",")); err != nil { err2 := vterrors.Wrapf(err, "SrvKeyspace for keyspace %s is corrupt for cell(s) %s", keyspace, cells) - log.Errorf("%w", err2) + log.Errorf("%v", err2) return nil, err } tabletTypesStr := discovery.BuildTabletTypesString(req.TabletTypes, req.TabletSelectionPreference) @@ -1755,6 +1755,7 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea return s.WorkflowStatus(ctx, &vtctldatapb.WorkflowStatusRequest{ Keyspace: req.Keyspace, Workflow: req.Workflow, + Shards: req.TargetShards, }) }