Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport 14693 - Use GetTabletsByCell in healthcheck #514

Merged
merged 6 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ limitations under the License.
// Alternatively, use a Watcher implementation which will constantly watch
// a source (e.g. the topology) and add and remove tablets as they are
// added or removed from the source.
// For a Watcher example have a look at NewCellTabletsWatcher().
// For a Watcher example have a look at NewTopologyWatcher().
//
// Internally, the HealthCheck module is connected to each tablet and has a
// streaming RPC (StreamHealth) open to receive periodic health infos.
Expand Down Expand Up @@ -92,7 +92,7 @@ var (
refreshKnownTablets = true

// topoReadConcurrency tells us how many topo reads are allowed in parallel.
topoReadConcurrency = 32
topoReadConcurrency int64 = 32

// healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000.
healthCheckDialConcurrency int64 = 1024
Expand Down Expand Up @@ -178,7 +178,7 @@ func registerWebUIFlags(fs *pflag.FlagSet) {
fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.")
fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.")
fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.")
fs.IntVar(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.")
fs.Int64Var(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.")
fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.")
ParseTabletURLTemplateFromFlag()
}
Expand Down Expand Up @@ -365,6 +365,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
cellAliases: make(map[string]string),
}
var topoWatchers []*TopologyWatcher
var filter TabletFilter
cells := strings.Split(cellsToWatch, ",")
if cellsToWatch == "" {
cells = append(cells, localCell)
Expand All @@ -375,7 +376,19 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if c == "" {
continue
}
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
if len(tabletFilters) > 0 {
if len(KeyspacesToWatch) > 0 {
log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time")
}
fbs, err := NewFilterByShard(tabletFilters)
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filter = fbs
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
2 changes: 1 addition & 1 deletion go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn

shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases)
tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases, nil)
if err != nil {
log.Warningf("Error fetching tablets from topo: %v", err)
// If we get a partial result we can still use it, otherwise return.
Expand Down
81 changes: 25 additions & 56 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ type TopologyWatcher struct {
cell string
refreshInterval time.Duration
refreshKnownTablets bool
getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)
sem chan int
concurrency int64
ctx context.Context
cancelFunc context.CancelFunc
// wg keeps track of all launched Go routines.
Expand All @@ -93,34 +92,28 @@ type TopologyWatcher struct {
}

// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets that it is configured to watch, and reloads them periodically if needed.
// As of now there is only one implementation: watch all tablets in a cell.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
// the tablets in a cell, and reloads them as needed.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int64) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
healthcheck: hc,
tabletFilter: filter,
tabletFilter: f,
cell: cell,
refreshInterval: refreshInterval,
refreshKnownTablets: refreshKnownTablets,
getTablets: getTablets,
sem: make(chan int, topoReadConcurrency),
concurrency: topoReadConcurrency,
tablets: make(map[string]*tabletInfo),
}
tw.firstLoadChan = make(chan struct{})

// We want the span from the context, but not the cancelation that comes with it
// We want the span from the context, but not the cancellation that comes with it
spanContext := trace.CopySpan(context.Background(), ctx)
tw.ctx, tw.cancelFunc = context.WithCancel(spanContext)
return tw
}

// NewCellTabletsWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and reloads them as needed.
func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell)
})
func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) {
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency})
}

// Start starts the topology watcher.
Expand Down Expand Up @@ -149,30 +142,33 @@ func (tw *TopologyWatcher) Stop() {
}

func (tw *TopologyWatcher) loadTablets() {
var wg sync.WaitGroup
newTablets := make(map[string]*tabletInfo)

// First get the list of relevant tabletAliases.
tabletAliases, err := tw.getTablets(tw)
// First get the list of all tablets.
tabletInfos, err := tw.getTablets()
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1)
select {
case <-tw.ctx.Done():
// If we get a partial result error, we just log it and process the tablets that we did manage to fetch.
if topo.IsErrType(err, topo.PartialResult) {
log.Errorf("received partial result from getTablets for cell %v: %v", tw.cell, err)
} else { // For all other errors, just return.
log.Errorf("error getting tablets for cell: %v: %v", tw.cell, err)
return
default:
}
log.Errorf("cannot get tablets for cell: %v: %v", tw.cell, err)
return
}

// Accumulate a list of all known alias strings to use later
// when sorting.
tabletAliasStrs := make([]string, 0, len(tabletAliases))
tabletAliasStrs := make([]string, 0, len(tabletInfos))

tw.mu.Lock()
for _, tAlias := range tabletAliases {
aliasStr := topoproto.TabletAliasString(tAlias)
defer tw.mu.Unlock()

for _, tInfo := range tabletInfos {
aliasStr := topoproto.TabletAliasString(tInfo.Alias)
tabletAliasStrs = append(tabletAliasStrs, aliasStr)

if !tw.refreshKnownTablets {
Expand All @@ -182,38 +178,13 @@ func (tw *TopologyWatcher) loadTablets() {
continue
}
}

wg.Add(1)
go func(alias *topodata.TabletAlias) {
defer wg.Done()
tw.sem <- 1 // Wait for active queue to drain.
tablet, err := tw.topoServer.GetTablet(tw.ctx, alias)
topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1)
<-tw.sem // Done; enable next request to run.
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1)
select {
case <-tw.ctx.Done():
return
default:
}
log.Errorf("cannot get tablet for alias %v: %v", alias, err)
return
}
tw.mu.Lock()
aliasStr := topoproto.TabletAliasString(alias)
newTablets[aliasStr] = &tabletInfo{
alias: aliasStr,
tablet: tablet.Tablet,
}
tw.mu.Unlock()
}(tAlias)
// There's no network call here, so we just do the tablets one at a time instead of in parallel goroutines.
newTablets[aliasStr] = &tabletInfo{
alias: aliasStr,
tablet: tInfo.Tablet,
}
}

tw.mu.Unlock()
wg.Wait()
tw.mu.Lock()

for alias, newVal := range newTablets {
if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) {
continue
Expand Down Expand Up @@ -266,8 +237,6 @@ func (tw *TopologyWatcher) loadTablets() {
tw.topoChecksum = crc32.ChecksumIEEE(buf.Bytes())
tw.lastRefresh = time.Now()

tw.mu.Unlock()

}

// RefreshLag returns the time since the last refresh.
Expand Down
Loading
Loading