Skip to content

Commit

Permalink
VReplication: Undo vschema changes on LookupVindex workflow creation …
Browse files Browse the repository at this point in the history
…fail (#16810)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Sep 26, 2024
1 parent 2e2b223 commit 7517840
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 88 deletions.
28 changes: 23 additions & 5 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ type testTMClient struct {

mu sync.Mutex
vrQueries map[int][]*queryResult
createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest
createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse
readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest
primaryPositions map[uint32]string
vdiffRequests map[uint32]*vdiffRequestResponse
Expand All @@ -289,7 +289,7 @@ func newTestTMClient(env *testEnv) *testTMClient {
return &testTMClient{
schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition),
vrQueries: make(map[int][]*queryResult),
createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest),
createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse),
readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest),
readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse),
primaryPositions: make(map[uint32]string),
Expand All @@ -304,9 +304,12 @@ func (tmc *testTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet
defer tmc.mu.Unlock()

if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil {
if !proto.Equal(expect, req) {
if expect.req != nil && !proto.Equal(expect.req, req) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", req, expect)
}
if expect.res != nil {
return expect.res, expect.err
}
}
res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1")
return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil
Expand Down Expand Up @@ -418,20 +421,29 @@ func (tmc *testTMClient) expectVRQueryResultOnKeyspaceTablets(keyspace string, q
}
}

func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) {
func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

tmc.createVReplicationWorkflowRequests[tabletID] = req
}

func (tmc *testTMClient) expectCreateVReplicationWorkflowRequestOnTargetTablets(req *createVReplicationWorkflowRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

for _, tablet := range tmc.env.tablets[tmc.env.targetKeyspace.KeyspaceName] {
tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid] = req
}
}

func (tmc *testTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

qrs := tmc.vrQueries[int(tablet.Alias.Uid)]
if len(qrs) == 0 {
return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query)
return nil, fmt.Errorf("tablet %v does not expect any more queries: %q", tablet, query)
}
matched := false
if qrs[0].query[0] == '/' {
Expand Down Expand Up @@ -479,6 +491,12 @@ type vdiffRequestResponse struct {
err error
}

type createVReplicationWorkflowRequestResponse struct {
req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest
res *tabletmanagerdatapb.CreateVReplicationWorkflowResponse
err error
}

func (tmc *testTMClient) expectVDiffRequest(tablet *topodatapb.Tablet, vrr *vdiffRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
Expand Down
10 changes: 6 additions & 4 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,11 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe
if err := validateNewWorkflow(mz.ctx, mz.ts, mz.tmc, mz.ms.TargetKeyspace, mz.ms.Workflow); err != nil {
return err
}

err := mz.buildMaterializer()
if err != nil {
return err
}
if err := mz.deploySchema(); err != nil {
return err
}

var workflowSubType binlogdatapb.VReplicationWorkflowSubType
workflowSubType, err = mz.getWorkflowSubType()
Expand All @@ -133,6 +131,10 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe
}
req.Options = optionsJSON

if err := mz.deploySchema(); err != nil {
return err
}

return mz.forAllTargets(func(target *topo.ShardInfo) error {
targetPrimary, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias)
if err != nil {
Expand Down Expand Up @@ -304,7 +306,7 @@ func (mz *materializer) deploySchema() error {
continue
}
if ts.CreateDdl == "" {
return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable)
return fmt.Errorf("target table %s does not exist and there is no create ddl defined", ts.TargetTable)
}

var err error
Expand Down
56 changes: 47 additions & 9 deletions go/vt/vtctl/workflow/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topotools"
Expand Down Expand Up @@ -120,16 +121,39 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M
if err == nil {
tableName = table.Name.String()
}
var (
cols []string
fields []*querypb.Field
)
if ts.CreateDdl != "" {
stmt, err := env.venv.Parser().ParseStrictDDL(ts.CreateDdl)
require.NoError(t, err)
ddl, ok := stmt.(*sqlparser.CreateTable)
require.True(t, ok)
cols = make([]string, len(ddl.TableSpec.Columns))
fields = make([]*querypb.Field, len(ddl.TableSpec.Columns))
for i, col := range ddl.TableSpec.Columns {
cols[i] = col.Name.String()
fields[i] = &querypb.Field{
Name: col.Name.String(),
Type: col.Type.SQLType(),
}
}
}
env.tmc.schema[ms.SourceKeyspace+"."+tableName] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: tableName,
Schema: fmt.Sprintf("%s_schema", tableName),
Name: tableName,
Schema: ts.CreateDdl,
Columns: cols,
Fields: fields,
}},
}
env.tmc.schema[ms.TargetKeyspace+"."+ts.TargetTable] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: ts.TargetTable,
Schema: fmt.Sprintf("%s_schema", ts.TargetTable),
Name: ts.TargetTable,
Schema: ts.CreateDdl,
Columns: cols,
Fields: fields,
}},
}
}
Expand Down Expand Up @@ -199,7 +223,7 @@ type testMaterializerTMClient struct {

mu sync.Mutex
vrQueries map[int][]*queryResult
createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest
createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse

// Used to confirm the number of times WorkflowDelete was called.
workflowDeleteCalls int
Expand All @@ -215,21 +239,29 @@ func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSe
sourceShards: sourceShards,
tableSettings: tableSettings,
vrQueries: make(map[int][]*queryResult),
createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest),
createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse),
}
}

func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil {
if !proto.Equal(expect, request) {
if expect.req != nil && !proto.Equal(expect.req, request) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", request, expect)
}
if expect.res != nil {
return expect.res, expect.err
}
}
res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1")
return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil
}

func (tmc *testMaterializerTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

if tmc.readVReplicationWorkflow != nil {
return tmc.readVReplicationWorkflow(ctx, tablet, request)
}
Expand Down Expand Up @@ -283,6 +315,9 @@ func (tmc *testMaterializerTMClient) DeleteVReplicationWorkflow(ctx context.Cont
}

func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

schemaDefn := &tabletmanagerdatapb.SchemaDefinition{}
for _, table := range request.Tables {
if table == "/.*/" {
Expand Down Expand Up @@ -315,7 +350,7 @@ func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, r
})
}

func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) {
func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

Expand Down Expand Up @@ -344,7 +379,7 @@ func (tmc *testMaterializerTMClient) VReplicationExec(ctx context.Context, table

qrs := tmc.vrQueries[int(tablet.Alias.Uid)]
if len(qrs) == 0 {
return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query)
return nil, fmt.Errorf("tablet %v does not expect any more queries: %q", tablet, query)
}
matched := false
if qrs[0].query[0] == '/' {
Expand Down Expand Up @@ -403,6 +438,9 @@ func (tmc *testMaterializerTMClient) HasVReplicationWorkflows(ctx context.Contex
}

func (tmc *testMaterializerTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

workflowType := binlogdatapb.VReplicationWorkflowType_MoveTables
if len(req.IncludeWorkflows) > 0 {
for _, wf := range req.IncludeWorkflows {
Expand Down
Loading

0 comments on commit 7517840

Please sign in to comment.