Skip to content

Commit

Permalink
CBG-2853 Allow one-shot replications to wait for DCP to catch up on c…
Browse files Browse the repository at this point in the history
…hanges feed (#6243)

* CBG-2853 Add requestPlus option for changes feeds

Adds requestPlus option for changes feeds.  When set, changes feeds will loop until the cached sequence (via DCP) is greater than the database sequence at the time the changes request was issued.

requestPlus can be enabled for non-continuous changes requests in one of three ways:
- by setting request_plus=true on a REST API changes call
- by setting the requestPlus property to "true" on a subChanges message
- by setting "changes_request_plus":true in the database config (default=false)

The request setting is given priority - if not set on a request, the value will fall back to the database config value.

Required minor refactoring of how options.Wait was used in changes.go, to support use of requestPlus and longpoll together.  No functional changes to longpoll if requestPlus is not set.

* Update docs for request_plus changes parameter.

* lint fixes
  • Loading branch information
adamcfraser authored and torcolvin committed Jun 1, 2023
1 parent c39824f commit c4e0dd3
Show file tree
Hide file tree
Showing 13 changed files with 617 additions and 43 deletions.
34 changes: 26 additions & 8 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,23 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
}

continuous := subChangesParams.continuous()

// used for stats tracking
bh.continuous = continuous

requestPlusSeq := uint64(0)
// If non-continuous, check whether requestPlus handling is set for request or via database config
if continuous == false {
useRequestPlus := subChangesParams.requestPlus(bh.db.Options.ChangesRequestPlus)
if useRequestPlus {
seq, requestPlusErr := bh.db.GetRequestPlusSequence()
if requestPlusErr != nil {
return base.HTTPErrorf(http.StatusServiceUnavailable, "Unable to retrieve current sequence for requestPlus=true: %v", requestPlusErr)
}
requestPlusSeq = seq
}
}

// Start asynchronous changes goroutine
go func() {
// Pull replication stats by type
Expand Down Expand Up @@ -249,6 +264,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
revocations: subChangesParams.revocations(),
clientType: clientType,
ignoreNoConflicts: clientType == clientTypeSGR2, // force this side to accept a "changes" message, even in no conflicts mode for SGR2.
requestPlusSeq: requestPlusSeq,
})
base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s --> Time:%v", bh.serialNumber, rq.Profile(), time.Since(startTime))
}()
Expand All @@ -273,6 +289,7 @@ type sendChangesOptions struct {
clientType clientType
revocations bool
ignoreNoConflicts bool
requestPlusSeq uint64
}

type changesDeletedFlag uint
Expand All @@ -299,14 +316,15 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions
base.InfofCtx(bh.loggingCtx, base.KeySync, "Sending changes since %v", opts.since)

options := ChangesOptions{
Since: opts.since,
Conflicts: false, // CBL 2.0/BLIP don't support branched rev trees (LiteCore #437)
Continuous: opts.continuous,
ActiveOnly: opts.activeOnly,
Revocations: opts.revocations,
Terminator: bh.BlipSyncContext.terminator,
Ctx: bh.loggingCtx,
clientType: opts.clientType,
Since: opts.since,
Conflicts: false, // CBL 2.0/BLIP don't support branched rev trees (LiteCore #437)
Continuous: opts.continuous,
ActiveOnly: opts.activeOnly,
Revocations: opts.revocations,
Terminator: bh.BlipSyncContext.terminator,
Ctx: bh.loggingCtx,
clientType: opts.clientType,
RequestPlusSeq: opts.requestPlusSeq,
}

channelSet := opts.channels
Expand Down
9 changes: 9 additions & 0 deletions db/blip_sync_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const (
SubChangesContinuous = "continuous"
SubChangesBatch = "batch"
SubChangesRevocations = "revocations"
SubChangesRequestPlus = "requestPlus"

// rev message properties
RevMessageId = "id"
Expand Down Expand Up @@ -199,6 +200,14 @@ func (s *SubChangesParams) activeOnly() bool {
return (s.rq.Properties[SubChangesActiveOnly] == "true")
}

func (s *SubChangesParams) requestPlus(defaultValue bool) (value bool) {
propertyValue, isDefined := s.rq.Properties[SubChangesRequestPlus]
if !isDefined {
return defaultValue
}
return propertyValue == "true"
}

func (s *SubChangesParams) filter() string {
return s.rq.Properties[SubChangesFilter]
}
Expand Down
56 changes: 34 additions & 22 deletions db/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@ import (
// Options for changes-feeds. ChangesOptions must not contain any mutable pointer references, as
// changes processing currently assumes a deep copy when doing chanOpts := changesOptions.
type ChangesOptions struct {
Since SequenceID // sequence # to start _after_
Limit int // Max number of changes to return, if nonzero
Conflicts bool // Show all conflicting revision IDs, not just winning one?
IncludeDocs bool // Include doc body of each change?
Wait bool // Wait for results, instead of immediately returning empty result?
Continuous bool // Run continuously until terminated?
Terminator chan bool // Caller can close this channel to terminate the feed
HeartbeatMs uint64 // How often to send a heartbeat to the client
TimeoutMs uint64 // After this amount of time, close the longpoll connection
ActiveOnly bool // If true, only return information on non-deleted, non-removed revisions
Revocations bool // Specifies whether revocation messages should be sent on the changes feed
clientType clientType // Can be used to determine if the replication is being started from a CBL 2.x or SGR2 client
Ctx context.Context // Used for adding context to logs
Since SequenceID // sequence # to start _after_
Limit int // Max number of changes to return, if nonzero
Conflicts bool // Show all conflicting revision IDs, not just winning one?
IncludeDocs bool // Include doc body of each change?
Wait bool // Wait for results, instead of immediately returning empty result?
Continuous bool // Run continuously until terminated?
Terminator chan bool // Caller can close this channel to terminate the feed
HeartbeatMs uint64 // How often to send a heartbeat to the client
TimeoutMs uint64 // After this amount of time, close the longpoll connection
ActiveOnly bool // If true, only return information on non-deleted, non-removed revisions
Revocations bool // Specifies whether revocation messages should be sent on the changes feed
clientType clientType // Can be used to determine if the replication is being started from a CBL 2.x or SGR2 client
Ctx context.Context // Used for adding context to logs
RequestPlusSeq uint64 // Do not stop changes before cached sequence catches up with requestPlusSeq
}

// A changes entry; Database.GetChanges returns an array of these.
Expand Down Expand Up @@ -628,9 +629,13 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption

// Retrieve the current max cached sequence - ensures there isn't a race between the subsequent channel cache queries
currentCachedSequence = db.changeCache.getChannelCache().GetHighCacheSequence()
if options.Wait {
options.Wait = false
changeWaiter = db.startChangeWaiter(base.Set{}) // Waiter is updated with the actual channel set (post-user reload) at the start of the outer changes loop
var useLateSequenceFeeds bool // LateSequence feeds are only used for continuous, or one-shot where options.RequestPlusSeq > currentCachedSequence
// Retrieve the current max cached sequence - ensures there isn't a race between the subsequent channel cache queries
currentCachedSequence = db.changeCache.getChannelCache().GetHighCacheSequence()

// If changes feed requires more than one ChangesLoop iteration, initialize changeWaiter
if options.Wait || options.RequestPlusSeq > currentCachedSequence {
changeWaiter = db.startChangeWaiter(nil) // Waiter is updated with the actual channel set (post-user reload) at the start of the outer changes loop
userCounter = changeWaiter.CurrentUserCount()
// Reload user to pick up user changes that happened between auth and the change waiter
// initialization. Without this, notification for user doc changes in that window (a) won't be
Expand Down Expand Up @@ -665,7 +670,8 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption

// For a continuous feed, initialise the lateSequenceFeeds that track late-arriving sequences
// to the channel caches.
if options.Continuous {
if options.Continuous || options.RequestPlusSeq > currentCachedSequence {
useLateSequenceFeeds = true
lateSequenceFeeds = make(map[string]*lateSequenceFeed)
defer db.closeLateFeeds(lateSequenceFeeds)
}
Expand Down Expand Up @@ -723,7 +729,7 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption
// Handles previously skipped sequences prior to options.Since that
// have arrived in the channel cache since this changes request started. Only needed for
// continuous feeds - one-off changes requests only require the standard channel cache.
if options.Continuous {
if useLateSequenceFeeds {
lateSequenceFeedHandler := lateSequenceFeeds[name]
if lateSequenceFeedHandler != nil {
latefeed, err := db.getLateFeed(lateSequenceFeedHandler, singleChannelCache)
Expand Down Expand Up @@ -940,14 +946,18 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption
}
}

if !options.Continuous && (sentSomething || changeWaiter == nil) {
break
// Check whether non-continuous changes feeds that aren't waiting to reach requestPlus sequence can exit
if !options.Continuous && currentCachedSequence >= options.RequestPlusSeq {
// If non-longpoll, or longpoll has sent something, can exit
if !options.Wait || sentSomething {
break
}
}

// For longpoll requests that didn't send any results, reset low sequence to the original since value,
// as the system low sequence may change before the longpoll request wakes up, and longpoll feeds don't
// use lateSequenceFeeds.
if !options.Continuous {
if !useLateSequenceFeeds {
options.Since.LowSeq = requestLowSeq
}

Expand All @@ -964,6 +974,7 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption

waitForChanges:
for {
db.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Add(1)
// If we're in a deferred Backfill, the user may not get notification when the cache catches up to the backfill (e.g. when the granting doc isn't
// visible to the user), and so ChangeWaiter.Wait() would block until the next user-visible doc arrives. Use a hardcoded wait instead
// Similar handling for when we see sequences later than the stable sequence.
Expand Down Expand Up @@ -1288,7 +1299,7 @@ func createChangesEntry(docid string, db *Database, options ChangesOptions) *Cha

func (options ChangesOptions) String() string {
return fmt.Sprintf(
`{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t}`,
`{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t, RequestPlusSeq: %d}`,
options.Since,
options.Limit,
options.Conflicts,
Expand All @@ -1298,6 +1309,7 @@ func (options ChangesOptions) String() string {
options.HeartbeatMs,
options.TimeoutMs,
options.ActiveOnly,
options.RequestPlusSeq,
)
}

Expand Down
8 changes: 8 additions & 0 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ type DatabaseContextOptions struct {
ClientPartitionWindow time.Duration
BcryptCost int
GroupID string
ChangesRequestPlus bool // Sets the default value for request_plus, for non-continuous changes feeds
}

type SGReplicateOptions struct {
Expand Down Expand Up @@ -1711,3 +1712,10 @@ func (context *DatabaseContext) IsGuestReadOnly() bool {
return context.Options.UnsupportedOptions != nil && context.Options.UnsupportedOptions.GuestReadOnly

}

// GetRequestPlusSequence fetches the current value of the sequence counter for the database.
// Uses getSequence (instead of lastSequence) as it's intended to be up to date with allocations
// across all nodes, while lastSequence is just the latest allocation from this node
func (dbc *DatabaseContext) GetRequestPlusSequence() (uint64, error) {
return dbc.sequences.getSequence()
}
21 changes: 21 additions & 0 deletions db/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ func (db *DatabaseContext) WaitForCaughtUp(targetCount int64) error {
return errors.New("WaitForCaughtUp didn't catch up")
}

func (db *DatabaseContext) WaitForTotalCaughtUp(targetCount int64) error {
for i := 0; i < 100; i++ {
caughtUpCount := db.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value()
if caughtUpCount >= targetCount {
return nil
}
time.Sleep(100 * time.Millisecond)
}
return errors.New("WaitForCaughtUp didn't catch up")
}

type StatWaiter struct {
initCount int64 // Document cached count when NewStatWaiter is called
targetCount int64 // Target count used when Wait is called
Expand Down Expand Up @@ -368,3 +379,13 @@ func SuspendSequenceBatching() func() {
MaxSequenceIncrFrequency = 0 * time.Millisecond
return func() { MaxSequenceIncrFrequency = oldFrequency }
}

// AllocateTestSequence allocates a sequence via the sequenceAllocator. For use by non-db tests
func AllocateTestSequence(database *DatabaseContext) (uint64, error) {
return database.sequences.incrementSequence(1)
}

// ReleaseTestSequence releases a sequence via the sequenceAllocator. For use by non-db tests
func ReleaseTestSequence(database *DatabaseContext, sequence uint64) error {
return database.sequences.releaseSequence(sequence)
}
139 changes: 139 additions & 0 deletions rest/blip_api_crud_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
Copyright 2018-Present Couchbase, Inc.
Use of this software is governed by the Business Source License included in
the file licenses/BSL-Couchbase.txt. As of the Change Date specified in that
file, in accordance with the Business Source License, use of this software will
be governed by the Apache License, Version 2.0, included in the file
licenses/APL2.txt.
*/

package rest

import (
"testing"

"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/db"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestRequestPlusPull tests that a one-shot pull replication waits for pending changes when request plus is set on the replication.
func TestRequestPlusPull(t *testing.T) {

defer base.SetUpTestLogging(base.LevelInfo, base.KeyDCP, base.KeyChanges, base.KeyHTTP)()
defer db.SuspendSequenceBatching()() // Required for slow sequence simulation

rtConfig := RestTesterConfig{
SyncFn: `function(doc) {
channel(doc.channel);
if (doc.accessUser != "") {
access(doc.accessUser, doc.accessChannel)
}
}`,
}
rt := NewRestTester(t, &rtConfig)
defer rt.Close()
database := rt.GetDatabase()

// Initialize blip tester client (will create user)
client, err := NewBlipTesterClientOptsWithRT(t, rt, &BlipTesterClientOpts{
Username: "bernard",
})
require.NoError(t, err)
defer client.Close()

// Put a doc in channel PBS
response := rt.SendAdminRequest("PUT", "/db/pbs-1", `{"channel":["PBS"]}`)
RequireStatus(t, response, 201)

// Allocate a sequence but do not write a doc for it - will block DCP buffering until sequence is skipped
slowSequence, seqErr := db.AllocateTestSequence(database)
require.NoError(t, seqErr)

// Write a document granting user 'bernard' access to PBS
response = rt.SendAdminRequest("PUT", "/db/grantDoc", `{"accessUser":"bernard", "accessChannel":"PBS"}`)
RequireStatus(t, response, 201)

caughtUpStart := database.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value()

// Start a regular one-shot pull
err = client.StartOneshotPullRequestPlus()
assert.NoError(t, err)

// Wait for the one-shot changes feed to go into wait mode before releasing the slow sequence
require.NoError(t, database.WaitForTotalCaughtUp(caughtUpStart+1))

// Release the slow sequence
releaseErr := db.ReleaseTestSequence(database, slowSequence)
require.NoError(t, releaseErr)

// The one-shot pull should unblock and replicate the document in the granted channel
data, ok := client.WaitForDoc("pbs-1")
assert.True(t, ok)
assert.Equal(t, `{"channel":["PBS"]}`, string(data))

}

// TestRequestPlusPull tests that a one-shot pull replication waits for pending changes when request plus is set on the db config.
func TestRequestPlusPullDbConfig(t *testing.T) {

defer base.SetUpTestLogging(base.LevelInfo, base.KeyDCP, base.KeyChanges, base.KeyHTTP)()
defer db.SuspendSequenceBatching()() // Required for slow sequence simulation

rtConfig := RestTesterConfig{
SyncFn: `function(doc) {
channel(doc.channel);
if (doc.accessUser != "") {
access(doc.accessUser, doc.accessChannel)
}
}`,
DatabaseConfig: &DatabaseConfig{
DbConfig: DbConfig{
ChangesRequestPlus: base.BoolPtr(true),
},
},
}
rt := NewRestTester(t, &rtConfig)
defer rt.Close()
database := rt.GetDatabase()

// Initialize blip tester client (will create user)
client, err := NewBlipTesterClientOptsWithRT(t, rt, &BlipTesterClientOpts{
Username: "bernard",
})
require.NoError(t, err)
defer client.Close()

// Put a doc in channel PBS
response := rt.SendAdminRequest("PUT", "/db/pbs-1", `{"channel":["PBS"]}`)
RequireStatus(t, response, 201)

// Allocate a sequence but do not write a doc for it - will block DCP buffering until sequence is skipped
slowSequence, seqErr := db.AllocateTestSequence(database)
require.NoError(t, seqErr)

// Write a document granting user 'bernard' access to PBS
response = rt.SendAdminRequest("PUT", "/db/grantDoc", `{"accessUser":"bernard", "accessChannel":"PBS"}`)
RequireStatus(t, response, 201)

caughtUpStart := database.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value()

// Start a regular one-shot pull
err = client.StartOneshotPull()
assert.NoError(t, err)

// Wait for the one-shot changes feed to go into wait mode before releasing the slow sequence
require.NoError(t, database.WaitForTotalCaughtUp(caughtUpStart+1))

// Release the slow sequence
releaseErr := db.ReleaseTestSequence(database, slowSequence)
require.NoError(t, releaseErr)

// The one-shot pull should unblock and replicate the document in the granted channel
data, ok := client.WaitForDoc("pbs-1")
assert.True(t, ok)
assert.Equal(t, `{"channel":["PBS"]}`, string(data))

}
Loading

0 comments on commit c4e0dd3

Please sign in to comment.