Skip to content

Commit

Permalink
CBG-2853 Add requestPlus option for changes feeds
Browse files Browse the repository at this point in the history
Adds requestPlus option for changes feeds.  When set, changes feeds will loop until the cached sequence (via DCP) is greater than the database sequence at the time the changes request was issued.

requestPlus can be enabled for non-continuous changes requests in one of three ways:
- by setting request_plus=true on a REST API changes call
- by setting the requestPlus property to "true" on a subChanges message
- by setting "changes_request_plus":true in the database config (default=false)

The request setting is given priority - if not set on a request, the value will fall back to the database config value.

Required minor refactoring of how options.Wait was used in changes.go, to support use of requestPlus and longpoll together.  No functional changes to longpoll if requestPlus is not set.
  • Loading branch information
adamcfraser committed May 15, 2023
1 parent b9a08ed commit 569d6e9
Show file tree
Hide file tree
Showing 11 changed files with 605 additions and 46 deletions.
32 changes: 24 additions & 8 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,19 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {

continuous := subChangesParams.continuous()

requestPlusSeq := uint64(0)
// If non-continuous, check whether requestPlus handling is set for request or via database config
if continuous == false {
useRequestPlus := subChangesParams.requestPlus(bh.db.Options.ChangesRequestPlus)
if useRequestPlus {
seq, requestPlusErr := bh.db.GetRequestPlusSequence()
if requestPlusErr != nil {
return base.HTTPErrorf(http.StatusServiceUnavailable, "Unable to retrieve current sequence for requestPlus=true: %v", requestPlusErr)
}
requestPlusSeq = seq
}
}

// Start asynchronous changes goroutine
go func() {
// Pull replication stats by type
Expand Down Expand Up @@ -325,6 +338,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
clientType: clientType,
ignoreNoConflicts: clientType == clientTypeSGR2, // force this side to accept a "changes" message, even in no conflicts mode for SGR2.
changesCtx: collectionCtx.changesCtx,
requestPlusSeq: requestPlusSeq,
})
base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s --> Time:%v", bh.serialNumber, rq.Profile(), time.Since(startTime))
}()
Expand Down Expand Up @@ -358,6 +372,7 @@ type sendChangesOptions struct {
revocations bool
ignoreNoConflicts bool
changesCtx context.Context
requestPlusSeq uint64
}

type changesDeletedFlag uint
Expand Down Expand Up @@ -385,14 +400,15 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions
base.InfofCtx(bh.loggingCtx, base.KeySync, "Sending changes since %v", opts.since)

options := ChangesOptions{
Since: opts.since,
Conflicts: false, // CBL 2.0/BLIP don't support branched rev trees (LiteCore #437)
Continuous: opts.continuous,
ActiveOnly: opts.activeOnly,
Revocations: opts.revocations,
LoggingCtx: bh.loggingCtx,
clientType: opts.clientType,
ChangesCtx: opts.changesCtx,
Since: opts.since,
Conflicts: false, // CBL 2.0/BLIP don't support branched rev trees (LiteCore #437)
Continuous: opts.continuous,
ActiveOnly: opts.activeOnly,
Revocations: opts.revocations,
LoggingCtx: bh.loggingCtx,
clientType: opts.clientType,
ChangesCtx: opts.changesCtx,
RequestPlusSeq: opts.requestPlusSeq,
}

channelSet := opts.channels
Expand Down
12 changes: 11 additions & 1 deletion db/blip_sync_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ const (
SubChangesContinuous = "continuous"
SubChangesBatch = "batch"
SubChangesRevocations = "revocations"
SubChangesRequestPlus = "requestPlus"
SubChangesFuture = "future"

// rev message properties
RevMessageID = "id"
Expand Down Expand Up @@ -163,7 +165,7 @@ func NewSubChangesParams(logCtx context.Context, rq *blip.Message, zeroSeq Seque
// Determine incoming since and docIDs once, since there is some overhead associated with their calculation
sinceSequenceId := zeroSeq
var err error
if rq.Properties["future"] == trueProperty {
if rq.Properties[SubChangesFuture] == trueProperty {
sinceSequenceId, err = latestSeq()
} else if sinceStr, found := rq.Properties[SubChangesSince]; found {
if sinceSequenceId, err = sequenceIDParser(sinceStr); err != nil {
Expand Down Expand Up @@ -234,6 +236,14 @@ func (s *SubChangesParams) activeOnly() bool {
return (s.rq.Properties[SubChangesActiveOnly] == trueProperty)
}

func (s *SubChangesParams) requestPlus(defaultValue bool) (value bool) {
propertyValue, isDefined := s.rq.Properties[SubChangesRequestPlus]
if !isDefined {
return defaultValue
}
return propertyValue == trueProperty
}

func (s *SubChangesParams) filter() string {
return s.rq.Properties[SubChangesFilter]
}
Expand Down
55 changes: 32 additions & 23 deletions db/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@ import (
// Options for changes-feeds. ChangesOptions must not contain any mutable pointer references, as
// changes processing currently assumes a deep copy when doing chanOpts := changesOptions.
type ChangesOptions struct {
Since SequenceID // sequence # to start _after_
Limit int // Max number of changes to return, if nonzero
Conflicts bool // Show all conflicting revision IDs, not just winning one?
IncludeDocs bool // Include doc body of each change?
Wait bool // Wait for results, instead of immediately returning empty result?
Continuous bool // Run continuously until terminated?
HeartbeatMs uint64 // How often to send a heartbeat to the client
TimeoutMs uint64 // After this amount of time, close the longpoll connection
ActiveOnly bool // If true, only return information on non-deleted, non-removed revisions
Revocations bool // Specifies whether revocation messages should be sent on the changes feed
clientType clientType // Can be used to determine if the replication is being started from a CBL 2.x or SGR2 client
LoggingCtx context.Context // Used for adding context to logs
ChangesCtx context.Context // Used for cancelling checking the changes feed should stop
Since SequenceID // sequence # to start _after_
Limit int // Max number of changes to return, if nonzero
Conflicts bool // Show all conflicting revision IDs, not just winning one?
IncludeDocs bool // Include doc body of each change?
Wait bool // Wait for results, instead of immediately returning empty result?
Continuous bool // Run continuously until terminated?
RequestPlusSeq uint64 // Do not stop changes before cached sequence catches up with requestPlusSeq
HeartbeatMs uint64 // How often to send a heartbeat to the client
TimeoutMs uint64 // After this amount of time, close the longpoll connection
ActiveOnly bool // If true, only return information on non-deleted, non-removed revisions
Revocations bool // Specifies whether revocation messages should be sent on the changes feed
clientType clientType // Can be used to determine if the replication is being started from a CBL 2.x or SGR2 client
LoggingCtx context.Context // Used for adding context to logs
ChangesCtx context.Context // Used for cancelling checking the changes feed should stop
}

// A changes entry; Database.GetChanges returns an array of these.
Expand Down Expand Up @@ -629,18 +630,19 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex

var changeWaiter *ChangeWaiter
var lowSequence uint64
var currentCachedSequence uint64
var currentCachedSequence uint64 // The highest contiguous sequence buffered over the caching feed
var lateSequenceFeeds map[channels.ID]*lateSequenceFeed
var useLateSequenceFeeds bool // LateSequence feeds are only used for continuous, or one-shot where options.RequestPlusSeq > currentCachedSequence
var userCounter uint64 // Wait counter used to identify changes to the user document
var changedChannels channels.ChangedKeys // Tracks channels added/removed to the user during changes processing.
var userChanged bool // Whether the user document has changed in a given iteration loop
var deferredBackfill bool // Whether there's a backfill identified in the user doc that's deferred while the SG cache catches up

// Retrieve the current max cached sequence - ensures there isn't a race between the subsequent channel cache queries
currentCachedSequence = col.changeCache().getChannelCache().GetHighCacheSequence()
if options.Wait {
options.Wait = false

// If changes feed requires more than one ChangesLoop iteration, initialize changeWaiter
if options.Wait || options.RequestPlusSeq > currentCachedSequence {
changeWaiter = col.startChangeWaiter() // Waiter is updated with the actual channel set (post-user reload) at the start of the outer changes loop
userCounter = changeWaiter.CurrentUserCount()
// Reload user to pick up user changes that happened between auth and the change waiter
Expand Down Expand Up @@ -676,7 +678,8 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex

// For a continuous feed, initialise the lateSequenceFeeds that track late-arriving sequences
// to the channel caches.
if options.Continuous {
if options.Continuous || options.RequestPlusSeq > currentCachedSequence {
useLateSequenceFeeds = true
lateSequenceFeeds = make(map[channels.ID]*lateSequenceFeed)
defer col.closeLateFeeds(lateSequenceFeeds)
}
Expand Down Expand Up @@ -741,7 +744,7 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex
// Handles previously skipped sequences prior to options.Since that
// have arrived in the channel cache since this changes request started. Only needed for
// continuous feeds - one-off changes requests only require the standard channel cache.
if options.Continuous {
if useLateSequenceFeeds {
lateSequenceFeedHandler := lateSequenceFeeds[chanID]
if lateSequenceFeedHandler != nil {
latefeed, err := col.getLateFeed(lateSequenceFeedHandler, singleChannelCache)
Expand Down Expand Up @@ -957,14 +960,19 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex
}
}
}
if !options.Continuous && (sentSomething || changeWaiter == nil) {
break

// Check whether non-continuous changes feeds that aren't waiting to reach requestPlus sequence can exit
if !options.Continuous && currentCachedSequence >= options.RequestPlusSeq {
// If non-longpoll, or longpoll has sent something, can exit
if !options.Wait || sentSomething {
break
}
}

// For longpoll requests that didn't send any results, reset low sequence to the original since value,
// as the system low sequence may change before the longpoll request wakes up, and longpoll feeds don't
// use lateSequenceFeeds.
if !options.Continuous {
if !useLateSequenceFeeds {
options.Since.LowSeq = requestLowSeq
}

Expand All @@ -981,6 +989,7 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex

waitForChanges:
for {
col.dbStats().CBLReplicationPull().NumPullReplTotalCaughtUp.Add(1)
// If we're in a deferred Backfill, the user may not get notification when the cache catches up to the backfill (e.g. when the granting doc isn't
// visible to the user), and so ChangeWaiter.Wait() would block until the next user-visible doc arrives. Use a hardcoded wait instead
// Similar handling for when we see sequences later than the stable sequence.
Expand All @@ -992,7 +1001,6 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex
break waitForChanges
}

col.dbStats().CBLReplicationPull().NumPullReplTotalCaughtUp.Add(1)
col.dbStats().CBLReplicationPull().NumPullReplCaughtUp.Add(1)
waitResponse := changeWaiter.Wait()
col.dbStats().CBLReplicationPull().NumPullReplCaughtUp.Add(-1)
Expand Down Expand Up @@ -1310,7 +1318,7 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio

func (options ChangesOptions) String() string {
return fmt.Sprintf(
`{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t}`,
`{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t, RequestPlusSeq: %d}`,
options.Since,
options.Limit,
options.Conflicts,
Expand All @@ -1320,6 +1328,7 @@ func (options ChangesOptions) String() string {
options.HeartbeatMs,
options.TimeoutMs,
options.ActiveOnly,
options.RequestPlusSeq,
)
}

Expand Down
11 changes: 9 additions & 2 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ type DatabaseContextOptions struct {
skipRegisterImportPIndex bool // if set, skips the global gocb PIndex registration
MetadataStore base.DataStore // If set, use this location/connection for SG metadata storage - if not set, metadata is stored using the same location/connection as the bucket used for data storage.
MetadataID string // MetadataID used for metadata storage

BlipStatsReportingInterval int64 // interval to report blip stats in milliseconds
BlipStatsReportingInterval int64 // interval to report blip stats in milliseconds
ChangesRequestPlus bool // Sets the default value for request_plus, for non-continuous changes feeds
}

type ScopesOptions map[string]ScopeOptions
Expand Down Expand Up @@ -2326,3 +2326,10 @@ func (dbc *DatabaseContext) AuthenticatorOptions() auth.AuthenticatorOptions {
defaultOptions.MetaKeys = dbc.MetadataKeys
return defaultOptions
}

// GetRequestPlusSequence fetches the current value of the sequence counter for the database.
// Uses getSequence (instead of lastSequence) as it's intended to be up to date with allocations
// across all nodes, while lastSequence is just the latest allocation from this node
func (dbc *DatabaseContext) GetRequestPlusSequence() (uint64, error) {
return dbc.sequences.getSequence()
}
21 changes: 21 additions & 0 deletions db/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ func (db *DatabaseContext) WaitForCaughtUp(targetCount int64) error {
return errors.New("WaitForCaughtUp didn't catch up")
}

func (db *DatabaseContext) WaitForTotalCaughtUp(targetCount int64) error {
for i := 0; i < 100; i++ {
caughtUpCount := db.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value()
if caughtUpCount >= targetCount {
return nil
}
time.Sleep(100 * time.Millisecond)
}
return errors.New("WaitForCaughtUp didn't catch up")
}

type StatWaiter struct {
initCount int64 // Document cached count when NewStatWaiter is called
targetCount int64 // Target count used when Wait is called
Expand Down Expand Up @@ -598,3 +609,13 @@ func GetSingleDatabaseCollection(tb testing.TB, database *DatabaseContext) *Data
tb.Fatalf("Could not find a collection")
return nil
}

// AllocateTestSequence allocates a sequence via the sequenceAllocator. For use by non-db tests
func AllocateTestSequence(database *DatabaseContext) (uint64, error) {
return database.sequences.incrementSequence(1)
}

// ReleaseTestSequence releases a sequence via the sequenceAllocator. For use by non-db tests
func ReleaseTestSequence(database *DatabaseContext, sequence uint64) error {
return database.sequences.releaseSequence(sequence)
}
Loading

0 comments on commit 569d6e9

Please sign in to comment.