Skip to content

Commit

Permalink
remove HTTP probing
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Oct 4, 2023
1 parent 23a1872 commit 2d7b093
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 54 deletions.
4 changes: 1 addition & 3 deletions go/vt/vttablet/tabletserver/throttle/mysql/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ type Probe struct {
Alias string
MetricQuery string
Tablet *topodatapb.Tablet
TabletHost string
TabletPort int
CacheMillis int
QueryInProgress int64
}
Expand Down Expand Up @@ -46,5 +44,5 @@ func NewProbe() *Probe {

// String returns a human readable string of this struct
func (p *Probe) String() string {
return fmt.Sprintf("%s, tablet=%s:%d", p.Alias, p.TabletHost, p.TabletPort)
return fmt.Sprintf("probe alias=%s", p.Alias)
}
67 changes: 16 additions & 51 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ package throttle

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"math/rand"
"net/http"
Expand Down Expand Up @@ -713,7 +711,7 @@ func (throttler *Throttler) Operate(ctx context.Context) {
}()
}

func (throttler *Throttler) generateTabletHTTPProbeFunction(ctx context.Context, tmClient tmclient.TabletManagerClient, clusterName string, probe *mysql.Probe) (probeFunc func() *mysql.MySQLThrottleMetric) {
func (throttler *Throttler) generateTabletProbeFunction(ctx context.Context, tmClient tmclient.TabletManagerClient, clusterName string, probe *mysql.Probe) (probeFunc func() *mysql.MySQLThrottleMetric) {
return func() *mysql.MySQLThrottleMetric {
// Some reasonable timeout, to ensure we release connections even if they're hanging (otherwise grpc-go keeps polling those connections forever)
ctx, cancel := context.WithTimeout(ctx, 4*mysqlCollectInterval)
Expand All @@ -724,49 +722,22 @@ func (throttler *Throttler) generateTabletHTTPProbeFunction(ctx context.Context,
mySQLThrottleMetric.ClusterName = clusterName
mySQLThrottleMetric.Alias = probe.Alias

{
if probe.Tablet == nil {
mySQLThrottleMetric.Err = fmt.Errorf("found nil tablet reference for alias %v, hostname %v", probe.Alias, probe.Tablet.Hostname)
return mySQLThrottleMetric
}
req := &tabletmanagerdatapb.CheckThrottlerRequest{} // We leave AppName empty; it will default to VitessName anyway, and we can save some proto space
if resp, gRPCErr := tmClient.CheckThrottler(ctx, probe.Tablet, req); gRPCErr == nil {
mySQLThrottleMetric.Value = resp.Value
if resp.StatusCode == http.StatusInternalServerError {
mySQLThrottleMetric.Err = fmt.Errorf("Status code: %d", resp.StatusCode)
}
if resp.RecentlyChecked {
// We have just probed a tablet, and it reported back that someone just recently "check"ed it.
// We therefore renew the heartbeats lease.
go throttler.heartbeatWriter.RequestHeartbeats()
}
return mySQLThrottleMetric
}
}
// Backwards compatibility to v17: if the underlying tablets do not support CheckThrottler gRPC, attempt a HTTP cehck:
tabletCheckSelfURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", probe.TabletHost, probe.TabletPort, throttlerapp.VitessName)
resp, err := throttler.httpClient.Get(tabletCheckSelfURL)
if err != nil {
mySQLThrottleMetric.Err = err
if probe.Tablet == nil {
mySQLThrottleMetric.Err = fmt.Errorf("found nil tablet reference for alias %v, hostname %v", probe.Alias, probe.Tablet.Hostname)
return mySQLThrottleMetric
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
if err != nil {
mySQLThrottleMetric.Err = err
req := &tabletmanagerdatapb.CheckThrottlerRequest{} // We leave AppName empty; it will default to VitessName anyway, and we can save some proto space
resp, gRPCErr := tmClient.CheckThrottler(ctx, probe.Tablet, req)
if gRPCErr != nil {
resp.StatusCode = http.StatusInternalServerError
mySQLThrottleMetric.Err = fmt.Errorf("gRPC error accessing tablet %v. Err=%v", probe.Alias, gRPCErr)
return mySQLThrottleMetric
}
checkResult := &CheckResult{}
if err := json.Unmarshal(b, checkResult); err != nil {
mySQLThrottleMetric.Err = err
return mySQLThrottleMetric
}
mySQLThrottleMetric.Value = checkResult.Value

if checkResult.StatusCode == http.StatusInternalServerError {
mySQLThrottleMetric.Err = fmt.Errorf("Status code: %d", checkResult.StatusCode)
mySQLThrottleMetric.Value = resp.Value
if resp.StatusCode == http.StatusInternalServerError {
mySQLThrottleMetric.Err = fmt.Errorf("Status code: %d", resp.StatusCode)
}
if checkResult.RecentlyChecked {
if resp.RecentlyChecked {
// We have just probed a tablet, and it reported back that someone just recently "check"ed it.
// We therefore renew the heartbeats lease.
go throttler.heartbeatWriter.RequestHeartbeats()
Expand Down Expand Up @@ -797,7 +768,7 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context, tmClient tm
throttleMetricFunc = throttler.generateSelfMySQLThrottleMetricFunc(ctx, probe)
} else {
// Throttler probing other tablets:
throttleMetricFunc = throttler.generateTabletHTTPProbeFunction(ctx, tmClient, clusterName, probe)
throttleMetricFunc = throttler.generateTabletProbeFunction(ctx, tmClient, clusterName, probe)
}
throttleMetrics := mysql.ReadThrottleMetric(probe, clusterName, throttleMetricFunc)
throttler.mysqlThrottleMetricChan <- throttleMetrics
Expand All @@ -812,7 +783,7 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {
// distribute the query/threshold from the throttler down to the cluster settings and from there to the probes
metricsQuery := throttler.GetMetricsQuery()
metricsThreshold := throttler.MetricsThreshold.Load()
addProbe := func(alias string, tablet *topodatapb.Tablet, tabletHost string, tabletPort int, clusterName string, clusterSettings *config.MySQLClusterConfigurationSettings, probes mysql.Probes) bool {
addProbe := func(alias string, tablet *topodatapb.Tablet, clusterName string, clusterSettings *config.MySQLClusterConfigurationSettings, probes mysql.Probes) bool {
for _, ignore := range clusterSettings.IgnoreHosts {
if strings.Contains(alias, ignore) {
log.Infof("Throttler: tablet ignored: %+v", alias)
Expand All @@ -824,10 +795,6 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {
log.Errorf("Throttler: got empty alias for cluster: %+v", clusterName)
return false
}
if tabletHost == "" {
log.Errorf("Throttler: got empty host for tablet: %v in cluster: %+v", alias, clusterName)
return false
}
if tablet == nil {
log.Errorf("Throttler: got nil tablet for alias: %v in cluster: %+v", alias, clusterName)
return false
Expand All @@ -837,8 +804,6 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {
probe := &mysql.Probe{
Alias: alias,
Tablet: tablet,
TabletHost: tabletHost,
TabletPort: tabletPort,
MetricQuery: clusterSettings.MetricQuery,
CacheMillis: clusterSettings.CacheMillis,
}
Expand All @@ -864,7 +829,7 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {
if clusterName == selfStoreName {
// special case: just looking at this tablet's MySQL server.
// We will probe this "cluster" (of one server) is a special way.
addProbe("", nil, "", 0, clusterName, clusterSettings, clusterProbes.TabletProbes)
addProbe("", nil, clusterName, clusterSettings, clusterProbes.TabletProbes)
throttler.mysqlClusterProbesChan <- clusterProbes
return
}
Expand Down Expand Up @@ -894,7 +859,7 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {
return err
}
if throttler.throttleTabletTypesMap[tablet.Type] {
addProbe(topoproto.TabletAliasString(tabletAlias), tablet.Tablet, tablet.Hostname, int(tablet.PortMap["vt"]), clusterName, clusterSettings, clusterProbes.TabletProbes)
addProbe(topoproto.TabletAliasString(tabletAlias), tablet.Tablet, clusterName, clusterSettings, clusterProbes.TabletProbes)
}
}
throttler.mysqlClusterProbesChan <- clusterProbes
Expand Down

0 comments on commit 2d7b093

Please sign in to comment.