Skip to content

Commit

Permalink
Minor changes from self review
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Sep 22, 2024
1 parent 32c29e1 commit 5bd4b7e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
11 changes: 11 additions & 0 deletions go/vt/vtctl/workflow/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSe
}

func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil {
if expect.req != nil && !proto.Equal(expect.req, request) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", request, expect)
Expand All @@ -257,6 +259,9 @@ func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Cont
}

func (tmc *testMaterializerTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

if tmc.readVReplicationWorkflow != nil {
return tmc.readVReplicationWorkflow(ctx, tablet, request)
}
Expand Down Expand Up @@ -310,6 +315,9 @@ func (tmc *testMaterializerTMClient) DeleteVReplicationWorkflow(ctx context.Cont
}

func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

schemaDefn := &tabletmanagerdatapb.SchemaDefinition{}
for _, table := range request.Tables {
if table == "/.*/" {
Expand Down Expand Up @@ -430,6 +438,9 @@ func (tmc *testMaterializerTMClient) HasVReplicationWorkflows(ctx context.Contex
}

func (tmc *testMaterializerTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

workflowType := binlogdatapb.VReplicationWorkflowType_MoveTables
if len(req.IncludeWorkflows) > 0 {
for _, wf := range req.IncludeWorkflows {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1667,7 +1667,7 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) {
}
require.NoError(t, err)
// withTable is a vschema that already contains the table and thus
// we make not vschema changes and there's nothing to cancel.
// we don't make any vschema changes and there's nothing to cancel.
require.True(t, (cancelFunc != nil) == (tcase.targetVSchema != withTable))
utils.MustMatch(t, tcase.out, got, tcase.description)
}
Expand Down Expand Up @@ -2463,7 +2463,7 @@ func TestCreateLookupVindexFailures(t *testing.T) {
},
vrepExecQueries: []string{"CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)"},
createRequest: &createVReplicationWorkflowRequestResponse{
req: nil,
req: nil, // We don't care about defining it in this case
res: &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{},
err: errors.New("we gots us an error"),
},
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,8 @@ func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.Lookup
}

if err := s.ts.SaveVSchema(ctx, ms.TargetKeyspace, targetVSchema); err != nil {
return nil, err
return nil, vterrors.Wrapf(err, "failed to save updated vschema '%v' in the %s keyspace",
targetVSchema, ms.TargetKeyspace)
}
ms.TabletTypes = topoproto.MakeStringTypeCSV(req.TabletTypes)
ms.TabletSelectionPreference = req.TabletSelectionPreference
Expand Down Expand Up @@ -4146,6 +4147,8 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str
}
materializeQuery = buf.String()

// Save a copy of the original vschema if we modify it and need to provide
// a cancelFunc.
ogTargetVSchema := targetVSchema.CloneVT()
targetChanged := false

Expand Down

0 comments on commit 5bd4b7e

Please sign in to comment.