diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 8faa0d43fd4..6a372b3fd09 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -1701,15 +1701,18 @@ func releaseInnoDBRowHistory(t *testing.T, dbConn *mysql.Conn) { // name provided should be the source keyspace as the target tablet stats note the stream's // source keyspace and shard. func confirmVReplicationThrottling(t *testing.T, tab *cluster.VttabletProcess, keyspace, workflow string, appname throttlerapp.Name) { - time.Sleep(5 * time.Second) // To be sure that we accrue some lag - const zero = int64(0) + const ( + sleepTime = 5 * time.Second + zv = int64(0) + ) + time.Sleep(sleepTime) // To be sure that we accrue some lag jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"}) require.NoError(t, err) require.NotEqual(t, "{}", jsVal) // The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4} throttledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.%s`, workflow, appname)).Int() - require.Greater(t, throttledCount, zero, "JSON value: %s", jsVal) + require.Greater(t, throttledCount, zv, "JSON value: %s", jsVal) val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"}) require.NoError(t, err) @@ -1727,7 +1730,9 @@ func confirmVReplicationThrottling(t *testing.T, tab *cluster.VttabletProcess, k // The JSON value looks like this: {"product.0.cproduct.4": 6} vreplLagSeconds := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.%s\.*`, keyspace, workflow)).Int() require.NoError(t, err) - require.Greater(t, vreplLagSeconds, zero, "JSON value: %s", jsVal) + // Take off 1 second to deal with timing issues in the test. + minLagSecs := int64(int64(sleepTime.Seconds()) - 1) + require.GreaterOrEqual(t, vreplLagSeconds, minLagSecs, "JSON value: %s", jsVal) val, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSecondsMax"}) require.NoError(t, err) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 9e0d24bbc78..6bfee82b682 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -524,14 +524,12 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { sbm = -1 for i, events := range items { for j, event := range events { - // If the event has no timestamp OR is an Other/GTID event (which is only meant - // to move the position forward as non-data changing events occur) then do not - // update the lag. - // If the batch consists only of Other/GTID and Heartbeat events then we cannot - // calculate the lag -- as the vstreamer may be fully throttled -- and we will + // If the event has no timestamp OR is a heartbeat event then do not update + // the lag. + // If the batch consists only of heartbeat events then we cannot calculate + // the lag -- as the vstreamer may be fully throttled -- and we will // estimate it after processing the batch. - if event.Timestamp != 0 && - !(event.Type == binlogdatapb.VEventType_GTID || event.Type == binlogdatapb.VEventType_HEARTBEAT) { + if event.Timestamp != 0 && event.Type != binlogdatapb.VEventType_HEARTBEAT { vp.lastTimestampNs = event.Timestamp * 1e9 vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime sbm = event.CurrentTime/1e9 - event.Timestamp diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index fb9d0d27e68..9d69418cf5a 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -359,7 +359,6 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog go throttleEvents(throttledEvents) for { - // Drain event if timer fired before reset. resetHBTimer() select {