Skip to content

Commit

Permalink
CBG-2938: Ignore Cbgt EOF feed errors when intentionally stopped (#6235)
Browse files Browse the repository at this point in the history
* Use sgMgrEventHandlers context to add cancellation handling for EOF feed errors

* Move all importListener.Stop cbgtContext code into CbgtContext.Stop()
  • Loading branch information
bbrks authored May 12, 2023
1 parent f56fba3 commit b9a08ed
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 25 deletions.
50 changes: 38 additions & 12 deletions base/dcp_sharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type CbgtContext struct {
Cfg cbgt.Cfg // Cfg manages storage of the current pindex set and node assignment
heartbeater Heartbeater // Heartbeater used for failed node detection
heartbeatListener *importHeartbeatListener // Listener subscribed to failed node alerts from heartbeater
eventHandlers *sgMgrEventHandlers // Event handler callbacks
ctx context.Context // Log context
dbName string // Database name
}

// StartShardedDCPFeed initializes and starts a CBGT Manager targeting the provided bucket.
Expand Down Expand Up @@ -316,7 +319,8 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG
// avoids file system usage, in conjunction with managerLoadDataDir=false in options.
dataDir := ""

eventHandlers := &sgMgrEventHandlers{ctx: ctx}
eventHandlersCtx, eventHandlersCancel := context.WithCancel(ctx)
eventHandlers := &sgMgrEventHandlers{ctx: eventHandlersCtx, ctxCancel: eventHandlersCancel}

// Specify one feed per pindex
options := make(map[string]string)
Expand Down Expand Up @@ -347,8 +351,11 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG
options)

cbgtContext := &CbgtContext{
Manager: mgr,
Cfg: cfgSG,
Manager: mgr,
Cfg: cfgSG,
eventHandlers: eventHandlers,
ctx: ctx,
dbName: dbName,
}

if spec.Auth != nil || (spec.Certpath != "" && spec.Keypath != "") {
Expand Down Expand Up @@ -439,13 +446,28 @@ func getMinNodeVersion(cfg cbgt.Cfg) (*ComparableVersion, error) {
return minVersion, nil
}

// StopHeartbeatListener unregisters the listener from the heartbeater, and stops it.
func (c *CbgtContext) StopHeartbeatListener() {
// Stop unregisters the listener from the heartbeater, and stops it and associated handlers.
func (c *CbgtContext) Stop() {
if c.eventHandlers != nil {
c.eventHandlers.ctxCancel()
}

if c.heartbeatListener != nil {
c.heartbeater.UnregisterListener(c.heartbeatListener.Name())
c.heartbeatListener.Stop()
}

// Close open PIndexes before stopping the manager.
_, pindexes := c.Manager.CurrentMaps()
for _, pIndex := range pindexes {
err := c.Manager.ClosePIndex(pIndex)
if err != nil {
DebugfCtx(c.ctx, KeyImport, "Error closing pindex: %v", err)
}
}
// ClosePIndex calls are synchronous, so can stop manager once they've completed
c.Manager.Stop()
c.RemoveFeedCredentials(c.dbName)
}

func (c *CbgtContext) RemoveFeedCredentials(dbName string) {
Expand Down Expand Up @@ -696,7 +718,8 @@ func GetDefaultImportPartitions(serverless bool) uint16 {
}

type sgMgrEventHandlers struct {
ctx context.Context
ctx context.Context
ctxCancel context.CancelFunc
}

func (meh *sgMgrEventHandlers) OnRefreshManagerOptions(options map[string]string) {
Expand All @@ -715,20 +738,20 @@ func (meh *sgMgrEventHandlers) OnUnregisterPIndex(pindex *cbgt.PIndex) {
// Handling below based on cbft implementation - checks whether the underlying source (bucket)
// still exists with VerifySourceNotExists, and if it exists, calls NotifyMgrOnClose.
// This will trigger cbgt closing and then attempting to reconnect to the feed.
func (meh *sgMgrEventHandlers) OnFeedError(srcType string, r cbgt.Feed, err error) {
func (meh *sgMgrEventHandlers) OnFeedError(srcType string, r cbgt.Feed, feedErr error) {

DebugfCtx(meh.ctx, KeyDCP, "cbgt Mgr OnFeedError, srcType: %s, feed name: %s, err: %v",
srcType, r.Name(), err)
srcType, r.Name(), feedErr)

dcpFeed, ok := r.(cbgt.FeedEx)
if !ok {
return
}

gone, indexUUID, er := dcpFeed.VerifySourceNotExists()
gone, indexUUID, err := dcpFeed.VerifySourceNotExists()
DebugfCtx(meh.ctx, KeyDCP, "cbgt Mgr OnFeedError, VerifySourceNotExists,"+
" srcType: %s, gone: %t, indexUUID: %s, err: %v",
srcType, gone, indexUUID, er)
srcType, gone, indexUUID, err)
if !gone {
// If we get an EOF error from the feeds and the bucket is still alive,
// then there could at the least two potential error scenarios.
Expand All @@ -741,8 +764,11 @@ func (meh *sgMgrEventHandlers) OnFeedError(srcType string, r cbgt.Feed, err erro
// the connectivity problems either during the next rebalance
// (new kv node after failover-recovery rebalance) or
// on the next janitor work cycle(ephemeral network issue to the same node).
if strings.Contains(err.Error(), "EOF") {
InfofCtx(meh.ctx, KeyDCP, "Handling EOF on cbgt feed - notifying manager to trigger reconnection to feed. indexUUID: %v, err: %v", indexUUID, err)
if strings.Contains(feedErr.Error(), "EOF") {
// If this wasn't an intentional close, log about the EOF
if meh.ctx.Err() != context.Canceled {
InfofCtx(meh.ctx, KeyDCP, "Handling EOF on cbgt feed - notifying manager to trigger reconnection to feed. indexUUID: %v, err: %v", indexUUID, feedErr)
}
dcpFeed.NotifyMgrOnClose()
}
}
Expand Down
14 changes: 1 addition & 13 deletions db/import_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,19 +214,7 @@ func (il *importListener) ImportFeedEvent(event sgbucket.FeedEvent) {
func (il *importListener) Stop() {
if il != nil {
if il.cbgtContext != nil {
il.cbgtContext.StopHeartbeatListener()

// Close open PIndexes before stopping the manager.
_, pindexes := il.cbgtContext.Manager.CurrentMaps()
for _, pIndex := range pindexes {
err := il.cbgtContext.Manager.ClosePIndex(pIndex)
if err != nil {
base.DebugfCtx(il.loggingCtx, base.KeyImport, "Error closing pindex: %v", err)
}
}
// ClosePIndex calls are synchronous, so can stop manager once they've completed
il.cbgtContext.Manager.Stop()
il.cbgtContext.RemoveFeedCredentials(il.dbName)
il.cbgtContext.Stop()

// Remove entry from global listener directory
base.RemoveDestFactory(il.importDestKey)
Expand Down

0 comments on commit b9a08ed

Please sign in to comment.