Skip to content

Commit

Permalink
[release-19.0] VReplication Workflow: set state correctly when restar…
Browse files Browse the repository at this point in the history
…ting workflow streams in the copy phase (#16217) (#16222)

Signed-off-by: Rohit Nayak <[email protected]>
Co-authored-by: Rohit Nayak <[email protected]>
  • Loading branch information
vitess-bot[bot] and rohit-nayak-ps authored Jun 21, 2024
1 parent 124486c commit 4f6f80f
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 4 deletions.
26 changes: 26 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_vreplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package tabletmanager

import (
"context"
"fmt"
"strings"

"google.golang.org/protobuf/encoding/prototext"
Expand Down Expand Up @@ -49,6 +50,8 @@ const (
sqlSelectVReplicationWorkflowConfig = "select id, source, cell, tablet_types, state, message from %s.vreplication where workflow = %a"
// Update the configuration values for a workflow's vreplication stream.
sqlUpdateVReplicationWorkflowStreamConfig = "update %s.vreplication set state = %a, source = %a, cell = %a, tablet_types = %a where id = %a"
// Check if workflow is still copying.
sqlGetVReplicationCopyStatus = "select distinct vrepl_id from %s.copy_state where vrepl_id = %d"
)

func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) {
Expand Down Expand Up @@ -227,6 +230,18 @@ func (tm *TabletManager) ReadVReplicationWorkflow(ctx context.Context, req *tabl
return resp, nil
}

func isStreamCopying(tm *TabletManager, id int64) (bool, error) {
query := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), id)
res, err := tm.VREngine.Exec(query)
if err != nil {
return false, err
}
if res != nil && len(res.Rows) > 0 {
return true, nil
}
return false, nil
}

// UpdateVReplicationWorkflow updates the sidecar databases's vreplication
// record(s) for this tablet's vreplication workflow stream(s). If there
// are no streams for the given workflow on the tablet then a nil result
Expand Down Expand Up @@ -302,6 +317,17 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta
if !textutil.ValueIsSimulatedNull(req.State) {
state = binlogdatapb.VReplicationWorkflowState_name[int32(req.State)]
}
if state == binlogdatapb.VReplicationWorkflowState_Running.String() {
// `Workflow Start` sets the new state to Running. However, if stream is still copying tables, we should set
// the state as Copying.
isCopying, err := isStreamCopying(tm, id)
if err != nil {
return nil, err
}
if isCopying {
state = binlogdatapb.VReplicationWorkflowState_Copying.String()
}
}
bindVars = map[string]*querypb.BindVariable{
"st": sqltypes.StringBindVariable(state),
"sc": sqltypes.StringBindVariable(string(source)),
Expand Down
38 changes: 34 additions & 4 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,11 +495,18 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
),
fmt.Sprintf("%d", vreplID),
)

getCopyStateQuery := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), int64(vreplID))
copyStatusFields := sqltypes.MakeTestFields(
"id",
"int64",
)
notCopying := sqltypes.MakeTestResult(copyStatusFields)
copying := sqltypes.MakeTestResult(copyStatusFields, "1")
tests := []struct {
name string
request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest
query string
name string
request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest
query string
isCopying bool
}{
{
name: "update cells",
Expand Down Expand Up @@ -579,6 +586,19 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
binlogdatapb.VReplicationWorkflowState_Stopped.String(), keyspace, shard, cells[0], tabletTypes[0], vreplID),
},
{
name: "update to running while copying",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState_Running,
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt),
},
isCopying: true,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Copying', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, cells[0], tabletTypes[0], vreplID),
},
}

for _, tt := range tests {
Expand All @@ -597,6 +617,16 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
// These are the same for each RPC call.
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil)
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(selectQuery, selectRes, nil)
if tt.request.State == binlogdatapb.VReplicationWorkflowState_Running ||
tt.request.State == binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt) {
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil)
if tt.isCopying {
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(getCopyStateQuery, copying, nil)
} else {
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(getCopyStateQuery, notCopying, nil)

}
}
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil)
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(idQuery, idRes, nil)

Expand Down

0 comments on commit 4f6f80f

Please sign in to comment.