Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-2938: Ignore Cbgt EOF feed errors when intentionally stopped #6235

Merged
merged 2 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions base/dcp_sharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type CbgtContext struct {
Cfg cbgt.Cfg // Cfg manages storage of the current pindex set and node assignment
heartbeater Heartbeater // Heartbeater used for failed node detection
heartbeatListener *importHeartbeatListener // Listener subscribed to failed node alerts from heartbeater
eventHandlers *sgMgrEventHandlers // Event handler callbacks
}

// StartShardedDCPFeed initializes and starts a CBGT Manager targeting the provided bucket.
Expand Down Expand Up @@ -316,7 +317,8 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG
// avoids file system usage, in conjunction with managerLoadDataDir=false in options.
dataDir := ""

eventHandlers := &sgMgrEventHandlers{ctx: ctx}
eventHandlersCtx, eventHandlersCancel := context.WithCancel(ctx)
eventHandlers := &sgMgrEventHandlers{ctx: eventHandlersCtx, ctxCancel: eventHandlersCancel}

// Specify one feed per pindex
options := make(map[string]string)
Expand Down Expand Up @@ -347,8 +349,9 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG
options)

cbgtContext := &CbgtContext{
Manager: mgr,
Cfg: cfgSG,
Manager: mgr,
Cfg: cfgSG,
eventHandlers: eventHandlers,
}

if spec.Auth != nil || (spec.Certpath != "" && spec.Keypath != "") {
Expand Down Expand Up @@ -439,8 +442,11 @@ func getMinNodeVersion(cfg cbgt.Cfg) (*ComparableVersion, error) {
return minVersion, nil
}

// StopHeartbeatListener unregisters the listener from the heartbeater, and stops it.
func (c *CbgtContext) StopHeartbeatListener() {
// Stop unregisters the listener from the heartbeater, and stops it and associated handlers.
func (c *CbgtContext) Stop() {
bbrks marked this conversation as resolved.
Show resolved Hide resolved
if c.eventHandlers != nil {
c.eventHandlers.ctxCancel()
}

if c.heartbeatListener != nil {
c.heartbeater.UnregisterListener(c.heartbeatListener.Name())
Expand Down Expand Up @@ -696,7 +702,8 @@ func GetDefaultImportPartitions(serverless bool) uint16 {
}

type sgMgrEventHandlers struct {
ctx context.Context
ctx context.Context
ctxCancel context.CancelFunc
}

func (meh *sgMgrEventHandlers) OnRefreshManagerOptions(options map[string]string) {
Expand All @@ -715,20 +722,20 @@ func (meh *sgMgrEventHandlers) OnUnregisterPIndex(pindex *cbgt.PIndex) {
// Handling below based on cbft implementation - checks whether the underlying source (bucket)
// still exists with VerifySourceNotExists, and if it exists, calls NotifyMgrOnClose.
// This will trigger cbgt closing and then attempting to reconnect to the feed.
func (meh *sgMgrEventHandlers) OnFeedError(srcType string, r cbgt.Feed, err error) {
func (meh *sgMgrEventHandlers) OnFeedError(srcType string, r cbgt.Feed, feedErr error) {

DebugfCtx(meh.ctx, KeyDCP, "cbgt Mgr OnFeedError, srcType: %s, feed name: %s, err: %v",
srcType, r.Name(), err)
srcType, r.Name(), feedErr)

dcpFeed, ok := r.(cbgt.FeedEx)
if !ok {
return
}

gone, indexUUID, er := dcpFeed.VerifySourceNotExists()
gone, indexUUID, err := dcpFeed.VerifySourceNotExists()
DebugfCtx(meh.ctx, KeyDCP, "cbgt Mgr OnFeedError, VerifySourceNotExists,"+
" srcType: %s, gone: %t, indexUUID: %s, err: %v",
srcType, gone, indexUUID, er)
srcType, gone, indexUUID, err)
if !gone {
// If we get an EOF error from the feeds and the bucket is still alive,
// then there could at the least two potential error scenarios.
Expand All @@ -741,8 +748,11 @@ func (meh *sgMgrEventHandlers) OnFeedError(srcType string, r cbgt.Feed, err erro
// the connectivity problems either during the next rebalance
// (new kv node after failover-recovery rebalance) or
// on the next janitor work cycle(ephemeral network issue to the same node).
if strings.Contains(err.Error(), "EOF") {
InfofCtx(meh.ctx, KeyDCP, "Handling EOF on cbgt feed - notifying manager to trigger reconnection to feed. indexUUID: %v, err: %v", indexUUID, err)
if strings.Contains(feedErr.Error(), "EOF") {
// If this wasn't an intentional close, log about the EOF
if meh.ctx.Err() != context.Canceled {
bbrks marked this conversation as resolved.
Show resolved Hide resolved
InfofCtx(meh.ctx, KeyDCP, "Handling EOF on cbgt feed - notifying manager to trigger reconnection to feed. indexUUID: %v, err: %v", indexUUID, feedErr)
}
dcpFeed.NotifyMgrOnClose()
}
}
Expand Down
2 changes: 1 addition & 1 deletion db/import_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (il *importListener) ImportFeedEvent(event sgbucket.FeedEvent) {
func (il *importListener) Stop() {
if il != nil {
if il.cbgtContext != nil {
il.cbgtContext.StopHeartbeatListener()
il.cbgtContext.Stop()
bbrks marked this conversation as resolved.
Show resolved Hide resolved

// Close open PIndexes before stopping the manager.
_, pindexes := il.cbgtContext.Manager.CurrentMaps()
Expand Down