Skip to content

Commit

Permalink
Changes after self review and further testing
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Aug 13, 2024
1 parent f6d1557 commit 287876d
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 12 deletions.
13 changes: 9 additions & 4 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1701,15 +1701,18 @@ func releaseInnoDBRowHistory(t *testing.T, dbConn *mysql.Conn) {
// name provided should be the source keyspace as the target tablet stats note the stream's
// source keyspace and shard.
func confirmVReplicationThrottling(t *testing.T, tab *cluster.VttabletProcess, keyspace, workflow string, appname throttlerapp.Name) {
time.Sleep(5 * time.Second) // To be sure that we accrue some lag
const zero = int64(0)
const (
sleepTime = 5 * time.Second
zv = int64(0)
)
time.Sleep(sleepTime) // To be sure that we accrue some lag

jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
throttledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.%s`, workflow, appname)).Int()
require.Greater(t, throttledCount, zero, "JSON value: %s", jsVal)
require.Greater(t, throttledCount, zv, "JSON value: %s", jsVal)

val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
require.NoError(t, err)
Expand All @@ -1727,7 +1730,9 @@ func confirmVReplicationThrottling(t *testing.T, tab *cluster.VttabletProcess, k
// The JSON value looks like this: {"product.0.cproduct.4": 6}
vreplLagSeconds := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.%s\.*`, keyspace, workflow)).Int()
require.NoError(t, err)
require.Greater(t, vreplLagSeconds, zero, "JSON value: %s", jsVal)
// Take off 1 second to deal with timing issues in the test.
minLagSecs := int64(int64(sleepTime.Seconds()) - 1)
require.GreaterOrEqual(t, vreplLagSeconds, minLagSecs, "JSON value: %s", jsVal)

val, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSecondsMax"})
require.NoError(t, err)
Expand Down
12 changes: 5 additions & 7 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,14 +524,12 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
sbm = -1
for i, events := range items {
for j, event := range events {
// If the event has no timestamp OR is an Other/GTID event (which is only meant
// to move the position forward as non-data changing events occur) then do not
// update the lag.
// If the batch consists only of Other/GTID and Heartbeat events then we cannot
// calculate the lag -- as the vstreamer may be fully throttled -- and we will
// If the event has no timestamp OR is a heartbeat event then do not update
// the lag.
// If the batch consists only of heartbeat events then we cannot calculate
// the lag -- as the vstreamer may be fully throttled -- and we will
// estimate it after processing the batch.
if event.Timestamp != 0 &&
!(event.Type == binlogdatapb.VEventType_GTID || event.Type == binlogdatapb.VEventType_HEARTBEAT) {
if event.Timestamp != 0 && event.Type != binlogdatapb.VEventType_HEARTBEAT {
vp.lastTimestampNs = event.Timestamp * 1e9
vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
sbm = event.CurrentTime/1e9 - event.Timestamp
Expand Down
1 change: 0 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
go throttleEvents(throttledEvents)

for {
// Drain event if timer fired before reset.
resetHBTimer()

select {
Expand Down

0 comments on commit 287876d

Please sign in to comment.