diff --git a/base/dcp_sharded.go b/base/dcp_sharded.go index 7d5272a924..f124981d7f 100644 --- a/base/dcp_sharded.go +++ b/base/dcp_sharded.go @@ -52,6 +52,9 @@ type CbgtContext struct { Cfg cbgt.Cfg // Cfg manages storage of the current pindex set and node assignment heartbeater Heartbeater // Heartbeater used for failed node detection heartbeatListener *importHeartbeatListener // Listener subscribed to failed node alerts from heartbeater + eventHandlers *sgMgrEventHandlers // Event handler callbacks + ctx context.Context // Log context + dbName string // Database name } // StartShardedDCPFeed initializes and starts a CBGT Manager targeting the provided bucket. @@ -316,7 +319,8 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG // avoids file system usage, in conjunction with managerLoadDataDir=false in options. dataDir := "" - eventHandlers := &sgMgrEventHandlers{ctx: ctx} + eventHandlersCtx, eventHandlersCancel := context.WithCancel(ctx) + eventHandlers := &sgMgrEventHandlers{ctx: eventHandlersCtx, ctxCancel: eventHandlersCancel} // Specify one feed per pindex options := make(map[string]string) @@ -347,8 +351,11 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG options) cbgtContext := &CbgtContext{ - Manager: mgr, - Cfg: cfgSG, + Manager: mgr, + Cfg: cfgSG, + eventHandlers: eventHandlers, + ctx: ctx, + dbName: dbName, } if spec.Auth != nil || (spec.Certpath != "" && spec.Keypath != "") { @@ -439,13 +446,28 @@ func getMinNodeVersion(cfg cbgt.Cfg) (*ComparableVersion, error) { return minVersion, nil } -// StopHeartbeatListener unregisters the listener from the heartbeater, and stops it. -func (c *CbgtContext) StopHeartbeatListener() { +// Stop unregisters the listener from the heartbeater, and stops it and associated handlers. +func (c *CbgtContext) Stop() { + if c.eventHandlers != nil { + c.eventHandlers.ctxCancel() + } if c.heartbeatListener != nil { c.heartbeater.UnregisterListener(c.heartbeatListener.Name()) c.heartbeatListener.Stop() } + + // Close open PIndexes before stopping the manager. + _, pindexes := c.Manager.CurrentMaps() + for _, pIndex := range pindexes { + err := c.Manager.ClosePIndex(pIndex) + if err != nil { + DebugfCtx(c.ctx, KeyImport, "Error closing pindex: %v", err) + } + } + // ClosePIndex calls are synchronous, so can stop manager once they've completed + c.Manager.Stop() + c.RemoveFeedCredentials(c.dbName) } func (c *CbgtContext) RemoveFeedCredentials(dbName string) { @@ -696,7 +718,8 @@ func GetDefaultImportPartitions(serverless bool) uint16 { } type sgMgrEventHandlers struct { - ctx context.Context + ctx context.Context + ctxCancel context.CancelFunc } func (meh *sgMgrEventHandlers) OnRefreshManagerOptions(options map[string]string) { @@ -715,20 +738,20 @@ func (meh *sgMgrEventHandlers) OnUnregisterPIndex(pindex *cbgt.PIndex) { // Handling below based on cbft implementation - checks whether the underlying source (bucket) // still exists with VerifySourceNotExists, and if it exists, calls NotifyMgrOnClose. // This will trigger cbgt closing and then attempting to reconnect to the feed. -func (meh *sgMgrEventHandlers) OnFeedError(srcType string, r cbgt.Feed, err error) { +func (meh *sgMgrEventHandlers) OnFeedError(srcType string, r cbgt.Feed, feedErr error) { DebugfCtx(meh.ctx, KeyDCP, "cbgt Mgr OnFeedError, srcType: %s, feed name: %s, err: %v", - srcType, r.Name(), err) + srcType, r.Name(), feedErr) dcpFeed, ok := r.(cbgt.FeedEx) if !ok { return } - gone, indexUUID, er := dcpFeed.VerifySourceNotExists() + gone, indexUUID, err := dcpFeed.VerifySourceNotExists() DebugfCtx(meh.ctx, KeyDCP, "cbgt Mgr OnFeedError, VerifySourceNotExists,"+ " srcType: %s, gone: %t, indexUUID: %s, err: %v", - srcType, gone, indexUUID, er) + srcType, gone, indexUUID, err) if !gone { // If we get an EOF error from the feeds and the bucket is still alive, // then there could at the least two potential error scenarios. @@ -741,8 +764,11 @@ func (meh *sgMgrEventHandlers) OnFeedError(srcType string, r cbgt.Feed, err erro // the connectivity problems either during the next rebalance // (new kv node after failover-recovery rebalance) or // on the next janitor work cycle(ephemeral network issue to the same node). - if strings.Contains(err.Error(), "EOF") { - InfofCtx(meh.ctx, KeyDCP, "Handling EOF on cbgt feed - notifying manager to trigger reconnection to feed. indexUUID: %v, err: %v", indexUUID, err) + if strings.Contains(feedErr.Error(), "EOF") { + // If this wasn't an intentional close, log about the EOF + if meh.ctx.Err() != context.Canceled { + InfofCtx(meh.ctx, KeyDCP, "Handling EOF on cbgt feed - notifying manager to trigger reconnection to feed. indexUUID: %v, err: %v", indexUUID, feedErr) + } dcpFeed.NotifyMgrOnClose() } } diff --git a/db/import_listener.go b/db/import_listener.go index 72ef8a8734..7cb9142bcc 100644 --- a/db/import_listener.go +++ b/db/import_listener.go @@ -214,19 +214,7 @@ func (il *importListener) ImportFeedEvent(event sgbucket.FeedEvent) { func (il *importListener) Stop() { if il != nil { if il.cbgtContext != nil { - il.cbgtContext.StopHeartbeatListener() - - // Close open PIndexes before stopping the manager. - _, pindexes := il.cbgtContext.Manager.CurrentMaps() - for _, pIndex := range pindexes { - err := il.cbgtContext.Manager.ClosePIndex(pIndex) - if err != nil { - base.DebugfCtx(il.loggingCtx, base.KeyImport, "Error closing pindex: %v", err) - } - } - // ClosePIndex calls are synchronous, so can stop manager once they've completed - il.cbgtContext.Manager.Stop() - il.cbgtContext.RemoveFeedCredentials(il.dbName) + il.cbgtContext.Stop() // Remove entry from global listener directory base.RemoveDestFactory(il.importDestKey)