VReplication: Estimate lag when workflow fully throttled #16577

Merged 12 commits on Aug 15, 2024
71 changes: 52 additions & 19 deletions go/test/endtoend/vreplication/vreplication_test.go
@@ -58,7 +58,7 @@ var (
targetKsOpts = make(map[string]string)
httpClient = throttlebase.SetupHTTPClient(time.Second)
sourceThrottlerAppName = throttlerapp.VStreamerName
targetThrottlerAppName = throttlerapp.VReplicationName
targetThrottlerAppName = throttlerapp.VPlayerName
)

const (
@@ -1228,18 +1228,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
for _, tab := range customerTablets {
waitForRowCountInTablet(t, tab, keyspace, workflow, 5)
// Confirm that we updated the stats on the target tablets as expected.
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2}
vstreamerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int()
require.Greater(t, vstreamerThrottledCount, int64(0))
// We only need to do this stat check once.
val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
require.NoError(t, err)
throttledCount, err := strconv.ParseInt(val, 10, 64)
require.NoError(t, err)
require.GreaterOrEqual(t, throttledCount, vstreamerThrottledCount)
confirmVReplicationThrottling(t, tab, sourceKs, workflow, sourceThrottlerAppName)
}
})
t.Run("unthrottle-app-product", func(t *testing.T) {
@@ -1274,12 +1263,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
for _, tab := range customerTablets {
waitForRowCountInTablet(t, tab, keyspace, workflow, 8)
// Confirm that we updated the stats on the target tablets as expected.
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value now looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
vplayerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int()
require.Greater(t, vplayerThrottledCount, int64(0))
confirmVReplicationThrottling(t, tab, sourceKs, workflow, targetThrottlerAppName)
}
})
t.Run("unthrottle-app-customer", func(t *testing.T) {
@@ -1709,3 +1693,52 @@ func waitForInnoDBHistoryLength(t *testing.T, tablet *cluster.VttabletProcess, e
func releaseInnoDBRowHistory(t *testing.T, dbConn *mysql.Conn) {
execQuery(t, dbConn, "rollback")
}

// confirmVReplicationThrottling confirms that the throttling related metrics reflect that
// the workflow is being throttled as expected, via the expected app name, and that this
// is impacting the lag as expected.
// The tablet passed should be a target tablet for the given workflow while the keyspace
// name provided should be the source keyspace as the target tablet stats note the stream's
// source keyspace and shard.
func confirmVReplicationThrottling(t *testing.T, tab *cluster.VttabletProcess, keyspace, workflow string, appname throttlerapp.Name) {
const (
sleepTime = 5 * time.Second
zv = int64(0)
)
time.Sleep(sleepTime) // To be sure that we accrue some lag

jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
throttledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.%s`, workflow, appname)).Int()
require.Greater(t, throttledCount, zv, "JSON value: %s", jsVal)

val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
require.NoError(t, err)
require.NotEqual(t, "", val)
throttledCountTotal, err := strconv.ParseInt(val, 10, 64)
require.NoError(t, err)
require.GreaterOrEqual(t, throttledCountTotal, throttledCount, "Value: %s", val)

// We do not calculate replication lag for the vcopier as it's not replicating
// events.
if appname != throttlerapp.VCopierName {
jsVal, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSeconds"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value looks like this: {"product.0.cproduct.4": 6}
vreplLagSeconds := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.%s\.*`, keyspace, workflow)).Int()
require.NoError(t, err)
// Take off 1 second to deal with timing issues in the test.
minLagSecs := int64(int64(sleepTime.Seconds()) - 1)
require.GreaterOrEqual(t, vreplLagSeconds, minLagSecs, "JSON value: %s", jsVal)

val, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSecondsMax"})
require.NoError(t, err)
require.NotEqual(t, "", val)
vreplLagSecondsMax, err := strconv.ParseInt(val, 10, 64)
require.NoError(t, err)
require.GreaterOrEqual(t, vreplLagSecondsMax, vreplLagSeconds, "Value: %s", val)
}
}
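
For reference, the gjson lookups in confirmVReplicationThrottling above escape the literal dots inside the metric key (the keys look like "cproduct.4.tablet.vplayer") and use a * wildcard for the stream id. A minimal standalone sketch of that lookup against a hypothetical /debug/vars payload, not part of the PR itself, looks like this:

package main

import (
	"fmt"

	"github.com/tidwall/gjson"
)

func main() {
	// Hypothetical VReplicationThrottledCounts payload; real keys have the
	// form <workflow>.<stream id>.tablet.<app>.
	jsVal := `{"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}`
	workflow, appname := "cproduct", "vplayer"
	// Escape the dots so gjson treats them as part of the key rather than as
	// path separators, and let * match the unknown stream id.
	path := fmt.Sprintf(`%s\.*\.tablet\.%s`, workflow, appname)
	fmt.Println(gjson.Get(jsVal, path).Int()) // 4
}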
34 changes: 22 additions & 12 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
@@ -476,33 +476,34 @@ func (vp *vplayer) recordHeartbeat() error {
func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
defer vp.vr.dbClient.Rollback()

estimateLag := func() {
behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9)
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second)
}

// If we're not running, set ReplicationLagSeconds to be very high.
// TODO(sougou): if we also stored the time of the last event, we
// can estimate this value more accurately.
defer vp.vr.stats.ReplicationLagSeconds.Store(math.MaxInt64)
defer vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), math.MaxInt64)
var sbm int64 = -1
var sbm int64
for {
if ctx.Err() != nil {
return ctx.Err()
}
// Check throttler.
if checkResult, ok := vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)); !ok {
_ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName, checkResult.Summary())
estimateLag()
continue
}

items, err := relay.Fetch()
if err != nil {
return err
}
// No events were received. This likely means that there's a network partition.
// So, we should assume we're falling behind.
if len(items) == 0 {
behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9)
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second)
}

// Empty transactions are saved at most once every idleTimeout.
// This covers two situations:
// 1. Fetch was idle for idleTimeout.
@@ -520,12 +521,20 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
}
}

sbm = -1
for i, events := range items {
for j, event := range events {
if event.Timestamp != 0 {
vp.lastTimestampNs = event.Timestamp * 1e9
vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
sbm = event.CurrentTime/1e9 - event.Timestamp
// If the event is a heartbeat sent while throttled then do not update
// the lag based on it.
// If the batch consists only of throttled heartbeat events then we cannot
// determine the actual lag, as the vstreamer is fully throttled, and we
// will estimate it after processing the batch.
if !(event.Type == binlogdatapb.VEventType_HEARTBEAT && event.Throttled) {
vp.lastTimestampNs = event.Timestamp * 1e9
vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
sbm = event.CurrentTime/1e9 - event.Timestamp
}
}
mustSave := false
switch event.Type {
@@ -569,8 +578,9 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
if sbm >= 0 {
vp.vr.stats.ReplicationLagSeconds.Store(sbm)
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(sbm)*time.Second)
} else { // We couldn't determine the lag, so we need to estimate it
estimateLag()
}

}
}
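
The behavioral core of the vplayer change above is the fallback path: if a relay batch yields no usable timestamps (for example, it contains only heartbeats that were sent while throttled), sbm stays at -1 and the lag is estimated from the wall clock instead. A simplified, self-contained sketch of that decision, not the actual vplayer code (which also records per-stream stats and handles many more event types), might look like this:

package main

import (
	"fmt"
	"time"
)

// event is a stripped-down stand-in for binlogdata.VEvent with just the
// fields the lag calculation cares about.
type event struct {
	timestamp   int64 // commit time of the event, in seconds
	currentTime int64 // source's wall clock when the event was sent, in nanoseconds
	heartbeat   bool
	throttled   bool
}

func applyBatch(events []event, lastTimestampNs, timeOffsetNs *int64) int64 {
	sbm := int64(-1) // seconds behind source; -1 means "unknown"
	for _, ev := range events {
		if ev.timestamp == 0 {
			continue
		}
		// Heartbeats sent while throttled carry a fresh timestamp even though
		// no real events are flowing, so they must not reset the lag.
		if ev.heartbeat && ev.throttled {
			continue
		}
		*lastTimestampNs = ev.timestamp * 1e9
		*timeOffsetNs = time.Now().UnixNano() - ev.currentTime
		sbm = ev.currentTime/1e9 - ev.timestamp
	}
	if sbm >= 0 {
		return sbm // actual lag derived from the last applied event
	}
	// Fully throttled (or empty) batch: estimate lag from how long ago the
	// last usable event was seen, as estimateLag() does in the PR.
	behind := time.Now().UnixNano() - *lastTimestampNs - *timeOffsetNs
	return behind / 1e9
}

func main() {
	lastTs := time.Now().Add(-30 * time.Second).UnixNano()
	var offset int64
	// A batch containing only throttled heartbeats cannot tell us the real lag.
	batch := []event{{timestamp: time.Now().Unix(), currentTime: time.Now().UnixNano(), heartbeat: true, throttled: true}}
	fmt.Printf("estimated lag: ~%ds\n", applyBatch(batch, &lastTs, &offset)) // roughly 30
}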

24 changes: 11 additions & 13 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
@@ -31,7 +31,6 @@ import (
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/binlog"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
@@ -288,11 +287,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
defer hbTimer.Stop()

injectHeartbeat := func(throttled bool, throttledReason string) error {
now := time.Now().UnixNano()
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
default:
now := time.Now().UnixNano()
err := bufferAndTransmit(&binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_HEARTBEAT,
Timestamp: now / 1e9,
@@ -305,24 +304,22 @@
}

logger := logutil.NewThrottledLogger(vs.vse.GetTabletInfo(), throttledLoggerInterval)
wfNameLog := ""
if vs.filter != nil && vs.filter.WorkflowName != "" {
wfNameLog = fmt.Sprintf(" in workflow %s", vs.filter.WorkflowName)
}
throttleEvents := func(throttledEvents chan mysql.BinlogEvent) {
throttledHeartbeatsRateLimiter := timer.NewRateLimiter(HeartbeatTime)
defer throttledHeartbeatsRateLimiter.Stop()
for {
// check throttler.
// Check throttler.
if checkResult, ok := vs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vs.throttlerApp); !ok {
// make sure to leave if context is cancelled
// Make sure to leave if context is cancelled.
select {
case <-ctx.Done():
return
default:
// do nothing special
// Do nothing special.
}
throttledHeartbeatsRateLimiter.Do(func() error {
return injectHeartbeat(true, checkResult.Summary())
})
Comment on lines -321 to -323

mattlord (Contributor, Author), Aug 13, 2024:
I realized that we were getting double the heartbeats as we injected them here and via the timer. I consolidated them into the main loop with the timer. Otherwise we needed to do twice the work as the other heartbeats would get interspersed with these and those would indicate that we weren't throttled when in fact we were — so we needed to check the throttler there as well and reset the heartbeat timer in injectHeartbeat() to limit the extra heartbeats and even then some extras still went through as it was a race.

// we won't process events, until we're no longer throttling
logger.Infof("throttled.")
logger.Infof("vstreamer throttled%s: %s.", wfNameLog, checkResult.Summary())
Contributor:
Would this spam the logs when throttled?

mattlord (Contributor, Author), Aug 13, 2024:
It's a throttled logger that logs the message at most once every 5 minutes. It was added in #14936. I thought about removing these altogether as the value was not clear to me, but instead I at least made this one much more useful IMO as it would state the component and the workflow.

Contributor:
OK cool!
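
For context on the logging question, the mechanism behind a throttled logger is simply "log at most once per interval, drop the rest". The following generic sketch illustrates the pattern; it is not Vitess's logutil.ThrottledLogger, whose interval in this code path is the 5 minutes mentioned above:

package main

import (
	"log"
	"sync"
	"time"
)

// throttledLogger emits at most one message per maxInterval and silently
// drops the rest, so a hot loop cannot flood the logs.
type throttledLogger struct {
	mu          sync.Mutex
	maxInterval time.Duration
	lastLogged  time.Time
}

func (tl *throttledLogger) Infof(format string, args ...any) {
	tl.mu.Lock()
	defer tl.mu.Unlock()
	if time.Since(tl.lastLogged) < tl.maxInterval {
		return // still inside the quiet period; drop the message
	}
	tl.lastLogged = time.Now()
	log.Printf(format, args...)
}

func main() {
	logger := &throttledLogger{maxInterval: 5 * time.Minute}
	for i := 0; i < 1000; i++ {
		// Only the first call in any 5-minute window actually logs.
		logger.Infof("vstreamer throttled in workflow %s", "cproduct")
	}
}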

continue
}
select {
@@ -394,7 +391,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
case <-ctx.Done():
return nil
case <-hbTimer.C:
if err := injectHeartbeat(false, ""); err != nil {
checkResult, ok := vs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vs.throttlerApp)
Contributor:
Could you please explain what this throttler check is for?

mattlord (Contributor, Author):
It was added as part of consolidating the two heartbeat injectors: #16577 (comment)

We had two goroutines doing heartbeat injections. One that was sent only when we were throttled (removed here), and this one that would be sent both when we were throttled AND when we were not. This one would always say that we weren't throttled — even when in fact we were — so it prevented us from being able to estimate the lag in the vplayer when the vstreamer was fully throttled.

Contributor:
In that case I encourage you to use vs.vse.throttlerClient.ThrottleCheckOK as opposed to vs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName. You're not interested in the extra sleep.

This makes sense now, thank you for clarifying.

mattlord (Contributor, Author):
Ah, I see. Thank you! 🙇 That makes total sense — no need for the sleep here as it's not done in a tight loop. I should have at least checked the function comments. 🙂 I've made this change.

if err := injectHeartbeat(!ok, checkResult.Summary()); err != nil {
if err == io.EOF {
return nil
}
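
Putting the thread above together: after this PR there is a single timer-driven loop that asks the throttler whether the app is currently throttled and stamps that state onto every heartbeat it injects, so the vplayer can distinguish throttled heartbeats from real progress. The rough shape is sketched below with a simplified checker interface standing in for the real throttle client (ThrottleCheckOK is used per the review suggestion above, but its exact Vitess signature and return type may differ):

package main

import (
	"context"
	"fmt"
	"time"
)

// checker is a stand-in for the tablet throttler client; the real client's
// check methods return a richer check result plus an ok flag.
type checker interface {
	ThrottleCheckOK(ctx context.Context, app string) (summary string, ok bool)
}

type heartbeat struct {
	timestamp int64
	throttled bool
	reason    string
}

// runHeartbeats injects one heartbeat per tick, marking it as throttled when
// the throttler says so, instead of running a second throttled-only injector.
func runHeartbeats(ctx context.Context, c checker, app string, interval time.Duration, send func(heartbeat) error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			reason, ok := c.ThrottleCheckOK(ctx, app)
			hb := heartbeat{
				timestamp: time.Now().Unix(),
				throttled: !ok,
				reason:    reason,
			}
			if err := send(hb); err != nil {
				return err
			}
		}
	}
}

// alwaysThrottled simulates a throttler that never lets the app proceed.
type alwaysThrottled struct{}

func (alwaysThrottled) ThrottleCheckOK(ctx context.Context, app string) (string, bool) {
	return "max lag exceeded", false
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	_ = runHeartbeats(ctx, alwaysThrottled{}, "vstreamer", time.Second, func(hb heartbeat) error {
		fmt.Printf("heartbeat throttled=%v reason=%q\n", hb.throttled, hb.reason)
		return nil
	})
}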