Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Errant GTID Counts metric in VTOrc #16829

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/21.0/21.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- **[VTGate Tablet Balancer](#tablet-balancer)**
- **[Query Timeout Override](#query-timeout)**
- **[Dynamic VReplication Configuration](#dynamic-vreplication-configuration)**
- **[Errant GTIDs Count Metric](#errant-gtid-metric)**

## <a id="major-changes"/>Major Changes

Expand Down Expand Up @@ -137,3 +138,6 @@ Currently many of the configuration options for VReplication Workflows are vttab
requires restarts of vttablets. We now allow these to be overridden while creating a workflow or dynamically once
the workflow is in progress. See https://github.com/vitessio/vitess/pull/16583 for details.

### <a id="errant-gtid-metric"/>Errant GTIDs Count Metric
A new metric called `ErrantGTIDCounts` has been added to the `VTOrc` component.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A count[er] stat is an ever increasing metric that indicates the number of times X has occurred. So I would assume this is the number of times it's had an errant GTID. I would suggest calling this ~ CurrentErrantGTIDCount (which is a gauge vs a counter).

This metric reports, for each tablet (labeled by its tablet alias), the number of errant GTIDs the tablet currently has.
15 changes: 15 additions & 0 deletions go/mysql/replication/mysql56_gtid_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,3 +688,18 @@ func Subtract(lhs, rhs string) (string, error) {
diffSet := lhsSet.Difference(rhsSet)
return diffSet.String(), nil
}

// GTIDCount parses gtidStr as a MySQL 5.6 GTID set and returns the total
// number of individual GTIDs it contains, summed across every interval in
// the set. If the string cannot be parsed, the parse error is returned
// together with a count of 0.
func GTIDCount(gtidStr string) (int64, error) {
	parsed, parseErr := ParseMysql56GTIDSet(gtidStr)
	if parseErr != nil {
		return 0, parseErr
	}

	var total int64
	for _, ranges := range parsed {
		for _, r := range ranges {
			// Both ends of an interval are counted, hence the +1.
			total += r.end - r.start + 1
		}
	}
	return total, nil
}
46 changes: 46 additions & 0 deletions go/mysql/replication/mysql56_gtid_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,3 +704,49 @@ func BenchmarkMySQL56GTIDParsing(b *testing.B) {
}
}
}

// TestGTIDCount exercises GTIDCount with an empty string, a single GTID,
// single and multiple intervals, multiple UUIDs, and an unparsable input.
func TestGTIDCount(t *testing.T) {
	type testCase struct {
		name      string
		gtidStr   string
		wantCount int64
		wantErr   string
	}

	testCases := []testCase{
		{name: "Empty GTID String", gtidStr: "", wantCount: 0},
		{name: "Single GTID", gtidStr: "00010203-0405-0607-0809-0a0b0c0d0e0f:12", wantCount: 1},
		{name: "Single GTID Interval", gtidStr: "00010203-0405-0607-0809-0a0b0c0d0e0f:1-5", wantCount: 5},
		{name: "Single UUID", gtidStr: "00010203-0405-0607-0809-0a0b0c0d0e0f:1-5:11-20", wantCount: 15},
		{
			name:      "Multiple UUIDs",
			gtidStr:   "00010203-0405-0607-0809-0a0b0c0d0e0f:1-5:10-20,00010203-0405-0607-0809-0a0b0c0d0eff:1-5:50",
			wantCount: 22,
		},
		// On a parse failure the count is expected to stay at its zero value.
		{name: "Parsing error", gtidStr: "incorrect set", wantErr: "invalid MySQL 5.6 GTID set"},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			got, err := GTIDCount(tc.gtidStr)
			require.EqualValues(t, tc.wantCount, got)
			if tc.wantErr == "" {
				require.NoError(t, err)
				return
			}
			require.ErrorContains(t, err, tc.wantErr)
		})
	}
}
15 changes: 13 additions & 2 deletions go/test/endtoend/vtorc/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,13 @@ func TestAPIEndpoints(t *testing.T) {
assert.Equal(t, "Filtering by shard without keyspace isn't supported\n", resp)

// Also verify that the metric for errant GTIDs is reporting the correct count.
waitForErrantGTIDCount(t, vtorc, 1)
waitForErrantGTIDTabletCount(t, vtorc, 1)
// Now we check the errant GTID count for the tablet
verifyErrantGTIDCount(t, vtorc, replica.Alias, 1)
})
}

func waitForErrantGTIDCount(t *testing.T, vtorc *cluster.VTOrcProcess, errantGTIDCountWanted int) {
func waitForErrantGTIDTabletCount(t *testing.T, vtorc *cluster.VTOrcProcess, errantGTIDCountWanted int) {
timeout := time.After(15 * time.Second)
for {
select {
Expand All @@ -293,3 +295,12 @@ func waitForErrantGTIDCount(t *testing.T, vtorc *cluster.VTOrcProcess, errantGTI
}
}
}

// verifyErrantGTIDCount asserts that the ErrantGTIDCounts metric exposed by the
// given VTOrc process reports exactly countWanted errant GTIDs for tabletAlias.
// The test fails immediately if the metric is missing, is not a map, or has no
// entry for the tablet.
func verifyErrantGTIDCount(t *testing.T, vtorc *cluster.VTOrcProcess, tabletAlias string, countWanted int) {
	t.Helper()
	vars := vtorc.GetVars()
	// Use a checked type assertion so that a missing or malformed metric is
	// reported as a regular test failure instead of a panic.
	errantGTIDCounts, isMap := vars["ErrantGTIDCounts"].(map[string]interface{})
	require.True(t, isMap, "ErrantGTIDCounts metric is missing or is not a map: %v", vars["ErrantGTIDCounts"])
	gtidCountVal, isPresent := errantGTIDCounts[tabletAlias]
	require.True(t, isPresent, "Tablet %s not found in errant GTID counts", tabletAlias)
	gtidCount := utils.GetIntFromValue(gtidCountVal)
	require.EqualValues(t, countWanted, gtidCount, "Tablet %s has %d errant GTIDs, wanted %d", tabletAlias, gtidCount, countWanted)
}
20 changes: 10 additions & 10 deletions go/test/endtoend/vtorc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,15 +998,15 @@ func WaitForSuccessfulRecoveryCount(t *testing.T, vtorcInstance *cluster.VTOrcPr
for time.Since(startTime) < timeout {
vars := vtorcInstance.GetVars()
successfulRecoveriesMap := vars["SuccessfulRecoveries"].(map[string]interface{})
successCount := getIntFromValue(successfulRecoveriesMap[recoveryName])
successCount := GetIntFromValue(successfulRecoveriesMap[recoveryName])
if successCount == countExpected {
return
}
time.Sleep(time.Second)
}
vars := vtorcInstance.GetVars()
successfulRecoveriesMap := vars["SuccessfulRecoveries"].(map[string]interface{})
successCount := getIntFromValue(successfulRecoveriesMap[recoveryName])
successCount := GetIntFromValue(successfulRecoveriesMap[recoveryName])
assert.EqualValues(t, countExpected, successCount)
}

Expand All @@ -1019,15 +1019,15 @@ func WaitForSuccessfulPRSCount(t *testing.T, vtorcInstance *cluster.VTOrcProcess
for time.Since(startTime) < timeout {
vars := vtorcInstance.GetVars()
prsCountsMap := vars["PlannedReparentCounts"].(map[string]interface{})
successCount := getIntFromValue(prsCountsMap[mapKey])
successCount := GetIntFromValue(prsCountsMap[mapKey])
if successCount == countExpected {
return
}
time.Sleep(time.Second)
}
vars := vtorcInstance.GetVars()
prsCountsMap := vars["PlannedReparentCounts"].(map[string]interface{})
successCount := getIntFromValue(prsCountsMap[mapKey])
successCount := GetIntFromValue(prsCountsMap[mapKey])
assert.EqualValues(t, countExpected, successCount)
}

Expand All @@ -1040,15 +1040,15 @@ func WaitForSuccessfulERSCount(t *testing.T, vtorcInstance *cluster.VTOrcProcess
for time.Since(startTime) < timeout {
vars := vtorcInstance.GetVars()
ersCountsMap := vars["EmergencyReparentCounts"].(map[string]interface{})
successCount := getIntFromValue(ersCountsMap[mapKey])
successCount := GetIntFromValue(ersCountsMap[mapKey])
if successCount == countExpected {
return
}
time.Sleep(time.Second)
}
vars := vtorcInstance.GetVars()
ersCountsMap := vars["EmergencyReparentCounts"].(map[string]interface{})
successCount := getIntFromValue(ersCountsMap[mapKey])
successCount := GetIntFromValue(ersCountsMap[mapKey])
assert.EqualValues(t, countExpected, successCount)
}

Expand All @@ -1067,10 +1067,10 @@ func CheckMetricExists(t *testing.T, vtorcInstance *cluster.VTOrcProcess, metric
assert.Contains(t, metrics, metricName)
}

// getIntFromValue is a helper function to get an integer from the given value.
// GetIntFromValue is a helper function to get an integer from the given value.
// If it is convertible to a float, then we round the number to the nearest integer.
// If the value is not numeric at all, we return 0.
func getIntFromValue(val any) int {
func GetIntFromValue(val any) int {
value := reflect.ValueOf(val)
if value.CanFloat() {
return int(math.Round(value.Float()))
Expand All @@ -1091,7 +1091,7 @@ func WaitForDetectedProblems(t *testing.T, vtorcInstance *cluster.VTOrcProcess,
for time.Since(startTime) < timeout {
vars := vtorcInstance.GetVars()
problems := vars["DetectedProblems"].(map[string]interface{})
actual := getIntFromValue(problems[key])
actual := GetIntFromValue(problems[key])
if actual == expect {
return
}
Expand All @@ -1101,7 +1101,7 @@ func WaitForDetectedProblems(t *testing.T, vtorcInstance *cluster.VTOrcProcess,
vars := vtorcInstance.GetVars()
problems := vars["DetectedProblems"].(map[string]interface{})
actual, ok := problems[key]
actual = getIntFromValue(actual)
actual = GetIntFromValue(actual)

assert.True(t, ok,
"The metric DetectedProblems[%s] should exist but does not (all problems: %+v)",
Expand Down
9 changes: 9 additions & 0 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var forgetAliases *cache.Cache
var (
readTopologyInstanceCounter = stats.NewCounter("InstanceReadTopology", "Number of times an instance was read from the topology")
readInstanceCounter = stats.NewCounter("InstanceRead", "Number of times an instance was read")
errantGTIDCounts = stats.NewGaugesWithSingleLabel("ErrantGTIDCounts", "Number of errant GTIDs in a vttablet", "TabletAlias")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
errantGTIDCounts = stats.NewGaugesWithSingleLabel("ErrantGTIDCounts", "Number of errant GTIDs in a vttablet", "TabletAlias")
errantGTIDCounts = stats.NewGaugesWithSingleLabel("ErrantGTIDCounts", "Number of errant GTIDs a vttablet currently has", "TabletAlias")

backendWrites = collection.CreateOrReturnCollection("BACKEND_WRITES")
writeBufferLatency = stopwatch.NewNamedStopwatch()
)
Expand Down Expand Up @@ -378,6 +379,11 @@ Cleanup:
redactedPrimaryExecutedGtidSet.RemoveUUID(instance.SourceUUID)

instance.GtidErrant, err = replication.Subtract(redactedExecutedGtidSet.String(), redactedPrimaryExecutedGtidSet.String())
if err == nil {
var gtidCount int64
gtidCount, err = replication.GTIDCount(instance.GtidErrant)
errantGTIDCounts.Set(tabletAlias, gtidCount)
}
}
}
}
Expand Down Expand Up @@ -1036,6 +1042,9 @@ func ForgetInstance(tabletAlias string) error {
forgetAliases.Set(tabletAlias, true, cache.DefaultExpiration)
log.Infof("Forgetting: %v", tabletAlias)

// Remove this tablet from errant GTID count metric.
errantGTIDCounts.Reset(tabletAlias)

// Delete from the 'vitess_tablet' table.
_, err := db.ExecVTOrc(`
delete
Expand Down
Loading