Skip to content

Commit

Permalink
Leave vtctl/wrangler code unchanged
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jul 13, 2023
1 parent 8343c0d commit 6a3b978
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 6 deletions.
122 changes: 122 additions & 0 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"strings"
"sync"

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/sets"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/concurrency"
Expand Down Expand Up @@ -680,3 +682,123 @@ func areTabletsAvailableToStreamFrom(ctx context.Context, req *vtctldatapb.Workf
}
return nil
}

// LegacyBuildTargets collects MigrationTargets and other metadata (see TargetInfo)
// from a workflow in the target keyspace. It uses VReplicationExec to get the workflow
// details rather than the new TabletManager ReadVReplicationWorkflow RPC. This is
// being used to slowly transition all of the older code, including unit tests, over to
// the new RPC and limit the impact of the new implementation to vtctldclient. You can see
// how the unit tests were being migrated here: https://gist.github.com/mattlord/738c12befe951f8d09304ff7fdc47c46
//
// New callers should instead use the new BuildTargets function.
//
// It returns ErrNoStreams if there are no targets found for the workflow.
func LegacyBuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string) (*TargetInfo, error) {
targetShards, err := ts.GetShardNames(ctx, targetKeyspace)
if err != nil {
return nil, err
}

var (
frozen bool
optCells string
optTabletTypes string
targets = make(map[string]*MigrationTarget, len(targetShards))
workflowType binlogdatapb.VReplicationWorkflowType
workflowSubType binlogdatapb.VReplicationWorkflowSubType
)

getVReplicationWorkflowType := func(row sqltypes.RowNamedValues) binlogdatapb.VReplicationWorkflowType {
i, _ := row["workflow_type"].ToInt32()
return binlogdatapb.VReplicationWorkflowType(i)
}

getVReplicationWorkflowSubType := func(row sqltypes.RowNamedValues) binlogdatapb.VReplicationWorkflowSubType {
i, _ := row["workflow_sub_type"].ToInt32()
return binlogdatapb.VReplicationWorkflowSubType(i)
}

// We check all shards in the target keyspace. Not all of them may have a
// stream. For example, if we're splitting -80 to [-40,40-80], only those
// two target shards will have vreplication streams, and the other shards in
// the target keyspace will not.
for _, targetShard := range targetShards {
si, err := ts.GetShard(ctx, targetKeyspace, targetShard)
if err != nil {
return nil, err
}

if si.PrimaryAlias == nil {
// This can happen if bad inputs are given.
return nil, fmt.Errorf("shard %v/%v doesn't have a primary set", targetKeyspace, targetShard)
}

primary, err := ts.GetTablet(ctx, si.PrimaryAlias)
if err != nil {
return nil, err
}

// NB: changing the whitespace of this query breaks tests for now.
// (TODO:@ajm188) extend FakeDBClient to be less whitespace-sensitive on
// expected queries.
query := fmt.Sprintf("select id, source, message, cell, tablet_types, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where workflow=%s and db_name=%s", encodeString(workflow), encodeString(primary.DbName()))
p3qr, err := tmc.VReplicationExec(ctx, primary.Tablet, query)
if err != nil {
return nil, err
}

if len(p3qr.Rows) < 1 {
continue
}

target := &MigrationTarget{
si: si,
primary: primary,
Sources: make(map[int32]*binlogdatapb.BinlogSource),
}

qr := sqltypes.Proto3ToResult(p3qr)
for _, row := range qr.Named().Rows {
id, err := row["id"].ToInt32()
if err != nil {
return nil, err
}

var bls binlogdatapb.BinlogSource
rowBytes, err := row["source"].ToBytes()
if err != nil {
return nil, err
}
if err := prototext.Unmarshal(rowBytes, &bls); err != nil {
return nil, err
}

if row["message"].ToString() == Frozen {
frozen = true
}

target.Sources[id] = &bls
optCells = row["cell"].ToString()
optTabletTypes = row["tablet_types"].ToString()

workflowType = getVReplicationWorkflowType(row)
workflowSubType = getVReplicationWorkflowSubType(row)

}

targets[targetShard] = target
}

if len(targets) == 0 {
return nil, fmt.Errorf("%w in keyspace %s for %s", ErrNoStreams, targetKeyspace, workflow)
}

return &TargetInfo{
Targets: targets,
Frozen: frozen,
OptCells: optCells,
OptTabletTypes: optTabletTypes,
WorkflowType: workflowType,
WorkflowSubType: workflowSubType,
}, nil
}
2 changes: 1 addition & 1 deletion go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflowNam
}

func (wr *Wrangler) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, error) {
tgtInfo, err := workflow.BuildTargets(ctx, wr.ts, wr.tmc, targetKeyspace, workflowName)
tgtInfo, err := workflow.LegacyBuildTargets(ctx, wr.ts, wr.tmc, targetKeyspace, workflowName)
if err != nil {
log.Infof("Error building targets: %s", err)
return nil, err
Expand Down
10 changes: 5 additions & 5 deletions go/vt/wrangler/vexec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,11 @@ func TestVExec(t *testing.T) {
}

func TestWorkflowStatusUpdate(t *testing.T) {
require.Equal(t, binlogdatapb.VReplicationWorkflowState_Running, updateState("for vdiff", binlogdatapb.VReplicationWorkflowState_Running, nil, int64(time.Now().Second())))
require.Equal(t, binlogdatapb.VReplicationWorkflowState_Running, updateState("", binlogdatapb.VReplicationWorkflowState_Running, nil, int64(time.Now().Second())))
require.Equal(t, binlogdatapb.VReplicationWorkflowState_Lagging, updateState("", binlogdatapb.VReplicationWorkflowState_Running, nil, int64(time.Now().Second())-100))
require.Equal(t, binlogdatapb.VReplicationWorkflowState_Copying, updateState("", binlogdatapb.VReplicationWorkflowState_Running, []copyState{{Table: "t1", LastPK: "[[INT64(10)]]"}}, int64(time.Now().Second())))
require.Equal(t, binlogdatapb.VReplicationWorkflowState_Error, updateState("error: primary tablet not contactable", binlogdatapb.VReplicationWorkflowState_Running, nil, 0))
require.Equal(t, binlogdatapb.VReplicationWorkflowState_Running.String(), updateState("for vdiff", binlogdatapb.VReplicationWorkflowState_Running, nil, int64(time.Now().Second())))
require.Equal(t, binlogdatapb.VReplicationWorkflowState_Running.String(), updateState("", binlogdatapb.VReplicationWorkflowState_Running, nil, int64(time.Now().Second())))
require.Equal(t, binlogdatapb.VReplicationWorkflowState_Lagging.String(), updateState("", binlogdatapb.VReplicationWorkflowState_Running, nil, int64(time.Now().Second())-100))
require.Equal(t, binlogdatapb.VReplicationWorkflowState_Copying.String(), updateState("", binlogdatapb.VReplicationWorkflowState_Running, []copyState{{Table: "t1", LastPK: "[[INT64(10)]]"}}, int64(time.Now().Second())))
require.Equal(t, binlogdatapb.VReplicationWorkflowState_Error.String(), updateState("error: primary tablet not contactable", binlogdatapb.VReplicationWorkflowState_Running, nil, 0))
}

func TestWorkflowListStreams(t *testing.T) {
Expand Down

0 comments on commit 6a3b978

Please sign in to comment.