Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

copy over existing vreplication rows copied to local counter if resuming from another tablet #13949

Merged
merged 11 commits into from
Sep 13, 2023
Merged
21 changes: 21 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (
initSequenceTable = "insert into %a.%a (id, next_id, cache) values (0, %d, 1000) on duplicate key update next_id = if(next_id < %d, %d, next_id)"
deleteWorkflow = "delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'"
updatePickedSourceTablet = `update _vt.vreplication set message='Picked source tablet: cell:\"%s\" uid:%d' where id=1`
getRowsCopied = "SELECT rows_copied FROM _vt.vreplication WHERE id=1"
)

var (
Expand Down Expand Up @@ -322,6 +323,16 @@ func TestMoveTables(t *testing.T) {
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(updatePickedSourceTablet, tenv.cells[0], sourceTabletUID), &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(getRowsCopied,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"rows_copied",
"int64",
),
"0",
),
nil,
)
ftc.vrdbClient.ExpectRequest(getWorkflowState, sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys",
Expand Down Expand Up @@ -657,6 +668,16 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) {
&sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(getRowsCopied,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"rows_copied",
"int64",
),
"0",
),
nil,
)
targetTablet.vrdbClient.ExpectRequest(getWorkflowState,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func getExpectedVreplicationQueries(t *testing.T, pos string) []string {
if pos == "" {
return []string{
"/insert into _vt.vreplication",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"begin",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
Expand All @@ -190,6 +191,7 @@ func getExpectedVreplicationQueries(t *testing.T, pos string) []string {
}
return []string{
"/insert into _vt.vreplication",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running'",
}
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/journal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func TestJournalOneToOne(t *testing.T) {
fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID),
"commit",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running', message='' where id.*",
))

Expand Down Expand Up @@ -145,6 +146,8 @@ func TestJournalOneToMany(t *testing.T) {
"commit",
"/update _vt.vreplication set message='Picked source tablet.*",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running', message='' where id.*",
"/update _vt.vreplication set state='Running', message='' where id.*",
))
Expand Down Expand Up @@ -207,6 +210,7 @@ func TestJournalTablePresent(t *testing.T) {
fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID),
"commit",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running', message='' where id.*",
))

Expand Down
17 changes: 17 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func testPlayerCopyCharPK(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"insert into dst(idc,val) values ('a\\0',1)",
Expand Down Expand Up @@ -280,6 +281,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// Copy mode.
Expand Down Expand Up @@ -403,6 +405,7 @@ func testPlayerCopyVarcharCompositePKCaseSensitiveCollation(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// Copy mode.
Expand Down Expand Up @@ -477,6 +480,7 @@ func testPlayerCopyTablesWithFK(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"select @@foreign_key_checks",
// Create the list of tables to copy and transition to Copying state.
"begin",
Expand Down Expand Up @@ -609,6 +613,7 @@ func testPlayerCopyTables(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -751,6 +756,7 @@ func testPlayerCopyBigTable(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set state='Copying'",
Expand Down Expand Up @@ -881,6 +887,7 @@ func testPlayerCopyWildcardRule(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// The first fast-forward has no starting point. So, it just saves the current position.
Expand Down Expand Up @@ -1042,6 +1049,7 @@ func testPlayerCopyTableContinuation(t *testing.T) {
expectNontxQueries(t, qh.Expect(
// Catchup
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"insert into dst1(id,val) select 1, 'insert in' from dual where (1,1) <= (6,6)",
"insert into dst1(id,val) select 7, 'insert out' from dual where (7,7) <= (6,6)",
"update dst1 set val='updated' where id=3 and (3,3) <= (6,6)",
Expand Down Expand Up @@ -1172,6 +1180,7 @@ func testPlayerCopyWildcardTableContinuation(t *testing.T) {
"/insert into _vt.vreplication",
"/update _vt.vreplication set state = 'Copying'",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
).Then(func(expect qh.ExpectationSequencer) qh.ExpectationSequencer {
if !optimizeInsertsEnabled {
expect = expect.Then(qh.Immediately("insert into dst(id,val) select 4, 'new' from dual where (4) <= (2)"))
Expand Down Expand Up @@ -1268,6 +1277,7 @@ func TestPlayerCopyWildcardTableContinuationWithOptimizeInserts(t *testing.T) {
"/insert into _vt.vreplication",
"/update _vt.vreplication set state = 'Copying'",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Copy
"insert into dst(id,val) values (3,'uncopied'), (4,'new')",
`/insert into _vt.copy_state .*`,
Expand Down Expand Up @@ -1321,6 +1331,7 @@ func testPlayerCopyTablesNone(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"begin",
"/update _vt.vreplication set state='Stopped'",
"commit",
Expand Down Expand Up @@ -1375,6 +1386,7 @@ func testPlayerCopyTablesStopAfterCopy(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -1464,6 +1476,7 @@ func testPlayerCopyTablesGIPK(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -1563,6 +1576,7 @@ func testPlayerCopyTableCancel(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -1645,6 +1659,7 @@ func testPlayerCopyTablesWithGeneratedColumn(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message=",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state",
// The first fast-forward has no starting point. So, it just saves the current position.
Expand Down Expand Up @@ -1717,6 +1732,7 @@ func testCopyTablesWithInvalidDates(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -1813,6 +1829,7 @@ func testCopyInvisibleColumns(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message=",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// The first fast-forward has no starting point. So, it just saves the current position.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,7 @@ func TestPlayerDDL(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"begin",
fmt.Sprintf("/update.*'%s'", pos2),
Expand Down Expand Up @@ -1919,6 +1920,7 @@ func TestPlayerStopPos(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"begin",
"insert into yes(id,val) values (1,'aaa')",
Expand All @@ -1944,6 +1946,7 @@ func TestPlayerStopPos(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"begin",
// Since 'no' generates empty transactions that are skipped by
Expand All @@ -1962,6 +1965,7 @@ func TestPlayerStopPos(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"/update.*'Stopped'.*already reached",
))
Expand Down Expand Up @@ -2583,6 +2587,7 @@ func TestRestartOnVStreamEnd(t *testing.T) {
})
expectDBClientQueries(t, qh.Expect(
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running'",
"begin",
"insert into t1(id,val) values (2,'aaa')",
Expand Down Expand Up @@ -3155,6 +3160,7 @@ func startVReplication(t *testing.T, bls *binlogdatapb.BinlogSource, pos string)
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running'",
))
var once sync.Once
Expand Down
40 changes: 39 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func newVReplicator(id int32, source *binlogdatapb.BinlogSource, sourceVStreamer
log.Warningf("The supplied value for vreplication_heartbeat_update_interval:%d seconds is larger than the maximum allowed:%d seconds, vreplication will fallback to %d",
vreplicationHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval)
}
return &vreplicator{
vr := &vreplicator{
vre: vre,
id: id,
source: source,
Expand All @@ -151,6 +151,8 @@ func newVReplicator(id int32, source *binlogdatapb.BinlogSource, sourceVStreamer
dbClient: newVDBClient(dbClient, stats),
mysqld: mysqld,
}
vr.setExistingRowsCopied()
return vr
}

// Replicate starts a vreplication stream. It can be in one of three phases:
Expand Down Expand Up @@ -1030,3 +1032,39 @@ func (vr *vreplicator) newClientConnection(ctx context.Context) (*vdbClient, err
}
return dbClient, nil
}

// setExistingRowsCopied deals with the case where another tablet started
// the workflow and a reparent occurred, and now that we manage the
// workflow, we need to read the rows_copied that already exists and add
// them to our counter, otherwise it will look like the reparent wiped all the
// rows_copied. So in the event that our CopyRowCount counter is zero, and
// the existing rows_copied in the vreplication table is not, copy the value of
// vreplication.rows_copied into our CopyRowCount.
func (vr *vreplicator) setExistingRowsCopied() {
if vr.stats.CopyRowCount.Get() == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A large table can take 5 days (as an example). Let's say the shard has 1 primary and 1 replica tablet at any time. The copy starts on the original replica, gets resumed on the original primary, then resumed again on the original replica. Would this value be non-zero but still incorrect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thankfully no, it seems that CopyRowCount gets reset to zero on each reparent. I tested by reparenting several times back and forth and would always see it set it to a higher number each time

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, great!

rowsCopiedExisting, err := vr.readExistingRowsCopied(vr.id)
if err != nil {
log.Warningf("Failed to read existing rows copied value for %s worfklow: %v", vr.WorkflowName, err)
} else if rowsCopiedExisting != 0 {
log.Infof("Resuming the %s vreplication workflow started on another tablet, setting rows copied counter to %v", vr.WorkflowName, rowsCopiedExisting)
vr.stats.CopyRowCount.Set(rowsCopiedExisting)
}
}
}

func (vr *vreplicator) readExistingRowsCopied(id int32) (int64, error) {
query, err := sqlparser.ParseAndBind(`SELECT rows_copied FROM _vt.vreplication WHERE id=%a`,
sqltypes.Int32BindVariable(id),
)
if err != nil {
return 0, err
}
r, err := vr.dbClient.Execute(query)
if err != nil {
return 0, err
}
if len(r.Rows) != 1 {
return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "did not get expected single row value when getting rows_copied for workflow id: %d", id)
}
return r.Rows[0][0].ToInt64()
}
52 changes: 52 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,58 @@ func TestCancelledDeferSecondaryKeys(t *testing.T) {
require.Equal(t, 1, len(res.Rows))
}

// TestResumingFromPreviousWorkflowKeepingRowsCopied tests that when you
// resume a workflow started by another tablet (eg. a reparent occurred),
// the rows_copied does not reset to zero but continues along from where
// it left off.
func TestResumingFromPreviousWorkflowKeepingRowsCopied(t *testing.T) {
_, cancel := context.WithCancel(context.Background())
defer cancel()
tablet := addTablet(100)
defer deleteTablet(tablet)
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
}},
}
bls := &binlogdatapb.BinlogSource{
Keyspace: env.KeyspaceName,
Shard: env.ShardName,
Filter: filter,
}
// The test env uses the same factory for both dba and
// filtered connections.
dbconfigs.GlobalDBConfigs.Filtered.User = "vt_dba"
id := int32(1)

vsclient := newTabletConnector(tablet)
olyazavr marked this conversation as resolved.
Show resolved Hide resolved
stats := binlogplayer.NewStats()
defer stats.Stop()

dbaconn := playerEngine.dbClientFactoryDba()
olyazavr marked this conversation as resolved.
Show resolved Hide resolved
err := dbaconn.Connect()
require.NoError(t, err)
defer dbaconn.Close()

dbClient := playerEngine.dbClientFactoryFiltered()
olyazavr marked this conversation as resolved.
Show resolved Hide resolved
err = dbClient.Connect()
require.NoError(t, err)
defer dbClient.Close()

dbName := dbClient.DBName()
olyazavr marked this conversation as resolved.
Show resolved Hide resolved
rowsCopied := int64(500000)
// Ensure there's an existing vreplication workflow
_, err = dbClient.ExecuteFetch(fmt.Sprintf("insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, rows_copied) values (%d, 'test', '', '', 99999, 99999, 0, 0, 'Running', '%s', %v) on duplicate key update workflow='test', source='', pos='', max_tps=99999, max_replication_lag=99999, time_updated=0, transaction_timestamp=0, state='Running', db_name='%s', rows_copied=%v",
id, dbName, rowsCopied, dbName, rowsCopied), 1)
require.NoError(t, err)
defer func() {
_, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1)
require.NoError(t, err)
}()
vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine)
assert.Equal(t, rowsCopied, vr.stats.CopyRowCount.Get())
}

// stripCruft removes all whitespace unicode chars and backticks.
func stripCruft(in string) string {
out := strings.Builder{}
Expand Down
Loading