From 569d6e98c9ec43cf07793251265aa6b18b3f2d4b Mon Sep 17 00:00:00 2001 From: adamcfraser Date: Sun, 14 May 2023 22:29:21 -0700 Subject: [PATCH] CBG-2853 Add requestPlus option for changes feeds Adds requestPlus option for changes feeds. When set, changes feeds will loop until the cached sequence (via DCP) is greater than the database sequence at the time the changes request was issued. requestPlus can be enabled for non-continuous changes requests in one of three ways: - by setting request_plus=true on a REST API changes call - by setting the requestPlus property to "true" on a subChanges message - by setting "changes_request_plus":true in the database config (default=false) The request setting is given priority - if not set on a request, the value will fall back to the database config value. Required minor refactoring of how options.Wait was used in changes.go, to support use of requestPlus and longpoll together. No functional changes to longpoll if requestPlus is not set. --- db/blip_handler.go | 32 ++- db/blip_sync_messages.go | 12 +- db/changes.go | 55 ++--- db/database.go | 11 +- db/util_testing.go | 21 ++ rest/blip_api_crud_test.go | 119 +++++++++++ rest/blip_client_test.go | 65 +++++- rest/changes_api.go | 43 +++- rest/changestest/changes_api_test.go | 291 +++++++++++++++++++++++++++ rest/config.go | 1 + rest/server_context.go | 1 + 11 files changed, 605 insertions(+), 46 deletions(-) diff --git a/db/blip_handler.go b/db/blip_handler.go index 109bbeb871..90c35d9840 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -295,6 +295,19 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error { continuous := subChangesParams.continuous() + requestPlusSeq := uint64(0) + // If non-continuous, check whether requestPlus handling is set for request or via database config + if continuous == false { + useRequestPlus := subChangesParams.requestPlus(bh.db.Options.ChangesRequestPlus) + if useRequestPlus { + seq, requestPlusErr := bh.db.GetRequestPlusSequence() + if requestPlusErr != nil { + return base.HTTPErrorf(http.StatusServiceUnavailable, "Unable to retrieve current sequence for requestPlus=true: %v", requestPlusErr) + } + requestPlusSeq = seq + } + } + // Start asynchronous changes goroutine go func() { // Pull replication stats by type @@ -325,6 +338,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error { clientType: clientType, ignoreNoConflicts: clientType == clientTypeSGR2, // force this side to accept a "changes" message, even in no conflicts mode for SGR2. changesCtx: collectionCtx.changesCtx, + requestPlusSeq: requestPlusSeq, }) base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s --> Time:%v", bh.serialNumber, rq.Profile(), time.Since(startTime)) }() @@ -358,6 +372,7 @@ type sendChangesOptions struct { revocations bool ignoreNoConflicts bool changesCtx context.Context + requestPlusSeq uint64 } type changesDeletedFlag uint @@ -385,14 +400,15 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions base.InfofCtx(bh.loggingCtx, base.KeySync, "Sending changes since %v", opts.since) options := ChangesOptions{ - Since: opts.since, - Conflicts: false, // CBL 2.0/BLIP don't support branched rev trees (LiteCore #437) - Continuous: opts.continuous, - ActiveOnly: opts.activeOnly, - Revocations: opts.revocations, - LoggingCtx: bh.loggingCtx, - clientType: opts.clientType, - ChangesCtx: opts.changesCtx, + Since: opts.since, + Conflicts: false, // CBL 2.0/BLIP don't support branched rev trees (LiteCore #437) + Continuous: opts.continuous, + ActiveOnly: opts.activeOnly, + Revocations: opts.revocations, + LoggingCtx: bh.loggingCtx, + clientType: opts.clientType, + ChangesCtx: opts.changesCtx, + RequestPlusSeq: opts.requestPlusSeq, } channelSet := opts.channels diff --git a/db/blip_sync_messages.go b/db/blip_sync_messages.go index 381591be0b..1e2a161e73 100644 --- a/db/blip_sync_messages.go +++ b/db/blip_sync_messages.go @@ -68,6 +68,8 @@ const ( SubChangesContinuous = "continuous" SubChangesBatch = "batch" SubChangesRevocations = "revocations" + SubChangesRequestPlus = "requestPlus" + SubChangesFuture = "future" // rev message properties RevMessageID = "id" @@ -163,7 +165,7 @@ func NewSubChangesParams(logCtx context.Context, rq *blip.Message, zeroSeq Seque // Determine incoming since and docIDs once, since there is some overhead associated with their calculation sinceSequenceId := zeroSeq var err error - if rq.Properties["future"] == trueProperty { + if rq.Properties[SubChangesFuture] == trueProperty { sinceSequenceId, err = latestSeq() } else if sinceStr, found := rq.Properties[SubChangesSince]; found { if sinceSequenceId, err = sequenceIDParser(sinceStr); err != nil { @@ -234,6 +236,14 @@ func (s *SubChangesParams) activeOnly() bool { return (s.rq.Properties[SubChangesActiveOnly] == trueProperty) } +func (s *SubChangesParams) requestPlus(defaultValue bool) (value bool) { + propertyValue, isDefined := s.rq.Properties[SubChangesRequestPlus] + if !isDefined { + return defaultValue + } + return propertyValue == trueProperty +} + func (s *SubChangesParams) filter() string { return s.rq.Properties[SubChangesFilter] } diff --git a/db/changes.go b/db/changes.go index d430d51cda..53b1df1cc3 100644 --- a/db/changes.go +++ b/db/changes.go @@ -26,19 +26,20 @@ import ( // Options for changes-feeds. ChangesOptions must not contain any mutable pointer references, as // changes processing currently assumes a deep copy when doing chanOpts := changesOptions. type ChangesOptions struct { - Since SequenceID // sequence # to start _after_ - Limit int // Max number of changes to return, if nonzero - Conflicts bool // Show all conflicting revision IDs, not just winning one? - IncludeDocs bool // Include doc body of each change? - Wait bool // Wait for results, instead of immediately returning empty result? - Continuous bool // Run continuously until terminated? - HeartbeatMs uint64 // How often to send a heartbeat to the client - TimeoutMs uint64 // After this amount of time, close the longpoll connection - ActiveOnly bool // If true, only return information on non-deleted, non-removed revisions - Revocations bool // Specifies whether revocation messages should be sent on the changes feed - clientType clientType // Can be used to determine if the replication is being started from a CBL 2.x or SGR2 client - LoggingCtx context.Context // Used for adding context to logs - ChangesCtx context.Context // Used for cancelling checking the changes feed should stop + Since SequenceID // sequence # to start _after_ + Limit int // Max number of changes to return, if nonzero + Conflicts bool // Show all conflicting revision IDs, not just winning one? + IncludeDocs bool // Include doc body of each change? + Wait bool // Wait for results, instead of immediately returning empty result? + Continuous bool // Run continuously until terminated? + RequestPlusSeq uint64 // Do not stop changes before cached sequence catches up with requestPlusSeq + HeartbeatMs uint64 // How often to send a heartbeat to the client + TimeoutMs uint64 // After this amount of time, close the longpoll connection + ActiveOnly bool // If true, only return information on non-deleted, non-removed revisions + Revocations bool // Specifies whether revocation messages should be sent on the changes feed + clientType clientType // Can be used to determine if the replication is being started from a CBL 2.x or SGR2 client + LoggingCtx context.Context // Used for adding context to logs + ChangesCtx context.Context // Used for cancelling checking the changes feed should stop } // A changes entry; Database.GetChanges returns an array of these. @@ -629,8 +630,9 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex var changeWaiter *ChangeWaiter var lowSequence uint64 - var currentCachedSequence uint64 + var currentCachedSequence uint64 // The highest contiguous sequence buffered over the caching feed var lateSequenceFeeds map[channels.ID]*lateSequenceFeed + var useLateSequenceFeeds bool // LateSequence feeds are only used for continuous, or one-shot where options.RequestPlusSeq > currentCachedSequence var userCounter uint64 // Wait counter used to identify changes to the user document var changedChannels channels.ChangedKeys // Tracks channels added/removed to the user during changes processing. var userChanged bool // Whether the user document has changed in a given iteration loop @@ -638,9 +640,9 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex // Retrieve the current max cached sequence - ensures there isn't a race between the subsequent channel cache queries currentCachedSequence = col.changeCache().getChannelCache().GetHighCacheSequence() - if options.Wait { - options.Wait = false + // If changes feed requires more than one ChangesLoop iteration, initialize changeWaiter + if options.Wait || options.RequestPlusSeq > currentCachedSequence { changeWaiter = col.startChangeWaiter() // Waiter is updated with the actual channel set (post-user reload) at the start of the outer changes loop userCounter = changeWaiter.CurrentUserCount() // Reload user to pick up user changes that happened between auth and the change waiter @@ -676,7 +678,8 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex // For a continuous feed, initialise the lateSequenceFeeds that track late-arriving sequences // to the channel caches. - if options.Continuous { + if options.Continuous || options.RequestPlusSeq > currentCachedSequence { + useLateSequenceFeeds = true lateSequenceFeeds = make(map[channels.ID]*lateSequenceFeed) defer col.closeLateFeeds(lateSequenceFeeds) } @@ -741,7 +744,7 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex // Handles previously skipped sequences prior to options.Since that // have arrived in the channel cache since this changes request started. Only needed for // continuous feeds - one-off changes requests only require the standard channel cache. - if options.Continuous { + if useLateSequenceFeeds { lateSequenceFeedHandler := lateSequenceFeeds[chanID] if lateSequenceFeedHandler != nil { latefeed, err := col.getLateFeed(lateSequenceFeedHandler, singleChannelCache) @@ -957,14 +960,19 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex } } } - if !options.Continuous && (sentSomething || changeWaiter == nil) { - break + + // Check whether non-continuous changes feeds that aren't waiting to reach requestPlus sequence can exit + if !options.Continuous && currentCachedSequence >= options.RequestPlusSeq { + // If non-longpoll, or longpoll has sent something, can exit + if !options.Wait || sentSomething { + break + } } // For longpoll requests that didn't send any results, reset low sequence to the original since value, // as the system low sequence may change before the longpoll request wakes up, and longpoll feeds don't // use lateSequenceFeeds. - if !options.Continuous { + if !useLateSequenceFeeds { options.Since.LowSeq = requestLowSeq } @@ -981,6 +989,7 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex waitForChanges: for { + col.dbStats().CBLReplicationPull().NumPullReplTotalCaughtUp.Add(1) // If we're in a deferred Backfill, the user may not get notification when the cache catches up to the backfill (e.g. when the granting doc isn't // visible to the user), and so ChangeWaiter.Wait() would block until the next user-visible doc arrives. Use a hardcoded wait instead // Similar handling for when we see sequences later than the stable sequence. @@ -992,7 +1001,6 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex break waitForChanges } - col.dbStats().CBLReplicationPull().NumPullReplTotalCaughtUp.Add(1) col.dbStats().CBLReplicationPull().NumPullReplCaughtUp.Add(1) waitResponse := changeWaiter.Wait() col.dbStats().CBLReplicationPull().NumPullReplCaughtUp.Add(-1) @@ -1310,7 +1318,7 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio func (options ChangesOptions) String() string { return fmt.Sprintf( - `{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t}`, + `{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t, RequestPlusSeq: %d}`, options.Since, options.Limit, options.Conflicts, @@ -1320,6 +1328,7 @@ func (options ChangesOptions) String() string { options.HeartbeatMs, options.TimeoutMs, options.ActiveOnly, + options.RequestPlusSeq, ) } diff --git a/db/database.go b/db/database.go index f7ded714b4..fcd4f21908 100644 --- a/db/database.go +++ b/db/database.go @@ -174,8 +174,8 @@ type DatabaseContextOptions struct { skipRegisterImportPIndex bool // if set, skips the global gocb PIndex registration MetadataStore base.DataStore // If set, use this location/connection for SG metadata storage - if not set, metadata is stored using the same location/connection as the bucket used for data storage. MetadataID string // MetadataID used for metadata storage - - BlipStatsReportingInterval int64 // interval to report blip stats in milliseconds + BlipStatsReportingInterval int64 // interval to report blip stats in milliseconds + ChangesRequestPlus bool // Sets the default value for request_plus, for non-continuous changes feeds } type ScopesOptions map[string]ScopeOptions @@ -2326,3 +2326,10 @@ func (dbc *DatabaseContext) AuthenticatorOptions() auth.AuthenticatorOptions { defaultOptions.MetaKeys = dbc.MetadataKeys return defaultOptions } + +// GetRequestPlusSequence fetches the current value of the sequence counter for the database. +// Uses getSequence (instead of lastSequence) as it's intended to be up to date with allocations +// across all nodes, while lastSequence is just the latest allocation from this node +func (dbc *DatabaseContext) GetRequestPlusSequence() (uint64, error) { + return dbc.sequences.getSequence() +} diff --git a/db/util_testing.go b/db/util_testing.go index 67433123c8..63f6814d08 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -95,6 +95,17 @@ func (db *DatabaseContext) WaitForCaughtUp(targetCount int64) error { return errors.New("WaitForCaughtUp didn't catch up") } +func (db *DatabaseContext) WaitForTotalCaughtUp(targetCount int64) error { + for i := 0; i < 100; i++ { + caughtUpCount := db.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value() + if caughtUpCount >= targetCount { + return nil + } + time.Sleep(100 * time.Millisecond) + } + return errors.New("WaitForCaughtUp didn't catch up") +} + type StatWaiter struct { initCount int64 // Document cached count when NewStatWaiter is called targetCount int64 // Target count used when Wait is called @@ -598,3 +609,13 @@ func GetSingleDatabaseCollection(tb testing.TB, database *DatabaseContext) *Data tb.Fatalf("Could not find a collection") return nil } + +// AllocateTestSequence allocates a sequence via the sequenceAllocator. For use by non-db tests +func AllocateTestSequence(database *DatabaseContext) (uint64, error) { + return database.sequences.incrementSequence(1) +} + +// ReleaseTestSequence releases a sequence via the sequenceAllocator. For use by non-db tests +func ReleaseTestSequence(database *DatabaseContext, sequence uint64) error { + return database.sequences.releaseSequence(sequence) +} diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 356b51074a..bae3c89a18 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -2652,3 +2652,122 @@ func TestUnsubChanges(t *testing.T) { _, found = btc.WaitForRev("doc2", resp.Rev) assert.True(t, found) } + +// TestRequestPlusPull tests that a one-shot pull replication waits for pending changes when request plus is set on the replication. +func TestRequestPlusPull(t *testing.T) { + + base.SetUpTestLogging(t, base.LevelInfo, base.KeyDCP, base.KeyChanges, base.KeyHTTP) + defer db.SuspendSequenceBatching()() // Required for slow sequence simulation + + rtConfig := RestTesterConfig{ + SyncFn: `function(doc) { + channel(doc.channel); + if (doc.accessUser != "") { + access(doc.accessUser, doc.accessChannel) + } + }`, + } + rt := NewRestTester(t, &rtConfig) + defer rt.Close() + database := rt.GetDatabase() + + // Initialize blip tester client (will create user) + client, err := NewBlipTesterClientOptsWithRT(t, rt, &BlipTesterClientOpts{ + Username: "bernard", + }) + require.NoError(t, err) + defer client.Close() + + // Put a doc in channel PBS + response := rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-1", `{"channel":["PBS"]}`) + RequireStatus(t, response, 201) + + // Allocate a sequence but do not write a doc for it - will block DCP buffering until sequence is skipped + slowSequence, seqErr := db.AllocateTestSequence(database) + require.NoError(t, seqErr) + + // Write a document granting user 'bernard' access to PBS + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/grantDoc", `{"accessUser":"bernard", "accessChannel":"PBS"}`) + RequireStatus(t, response, 201) + + caughtUpStart := database.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value() + + // Start a regular one-shot pull + err = client.StartOneshotPullRequestPlus() + assert.NoError(t, err) + + // Wait for the one-shot changes feed to go into wait mode before releasing the slow sequence + require.NoError(t, database.WaitForTotalCaughtUp(caughtUpStart+1)) + + // Release the slow sequence + releaseErr := db.ReleaseTestSequence(database, slowSequence) + require.NoError(t, releaseErr) + + // The one-shot pull should unblock and replicate the document in the granted channel + data, ok := client.WaitForDoc("pbs-1") + assert.True(t, ok) + assert.Equal(t, `{"channel":["PBS"]}`, string(data)) + +} + +// TestRequestPlusPull tests that a one-shot pull replication waits for pending changes when request plus is set on the db config. +func TestRequestPlusPullDbConfig(t *testing.T) { + + base.SetUpTestLogging(t, base.LevelInfo, base.KeyDCP, base.KeyChanges, base.KeyHTTP) + defer db.SuspendSequenceBatching()() // Required for slow sequence simulation + + rtConfig := RestTesterConfig{ + SyncFn: `function(doc) { + channel(doc.channel); + if (doc.accessUser != "") { + access(doc.accessUser, doc.accessChannel) + } + }`, + DatabaseConfig: &DatabaseConfig{ + DbConfig: DbConfig{ + ChangesRequestPlus: base.BoolPtr(true), + }, + }, + } + rt := NewRestTester(t, &rtConfig) + defer rt.Close() + database := rt.GetDatabase() + + // Initialize blip tester client (will create user) + client, err := NewBlipTesterClientOptsWithRT(t, rt, &BlipTesterClientOpts{ + Username: "bernard", + }) + require.NoError(t, err) + defer client.Close() + + // Put a doc in channel PBS + response := rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-1", `{"channel":["PBS"]}`) + RequireStatus(t, response, 201) + + // Allocate a sequence but do not write a doc for it - will block DCP buffering until sequence is skipped + slowSequence, seqErr := db.AllocateTestSequence(database) + require.NoError(t, seqErr) + + // Write a document granting user 'bernard' access to PBS + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/grantDoc", `{"accessUser":"bernard", "accessChannel":"PBS"}`) + RequireStatus(t, response, 201) + + caughtUpStart := database.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value() + + // Start a regular one-shot pull + err = client.StartOneshotPull() + assert.NoError(t, err) + + // Wait for the one-shot changes feed to go into wait mode before releasing the slow sequence + require.NoError(t, database.WaitForTotalCaughtUp(caughtUpStart+1)) + + // Release the slow sequence + releaseErr := db.ReleaseTestSequence(database, slowSequence) + require.NoError(t, releaseErr) + + // The one-shot pull should unblock and replicate the document in the granted channel + data, ok := client.WaitForDoc("pbs-1") + assert.True(t, ok) + assert.Equal(t, `{"channel":["PBS"]}`, string(data)) + +} diff --git a/rest/blip_client_test.go b/rest/blip_client_test.go index 13558aee7e..0b09af007b 100644 --- a/rest/blip_client_test.go +++ b/rest/blip_client_test.go @@ -632,19 +632,23 @@ func (btc *BlipTesterClient) Collection(collectionName string) *BlipTesterCollec // StartPull will begin a continuous pull replication since 0 between the client and server func (btcc *BlipTesterCollectionClient) StartPull() (err error) { - return btcc.StartPullSince("true", "0", "false", "") + return btcc.StartPullSince("true", "0", "false", "", "") } func (btcc *BlipTesterCollectionClient) StartOneshotPull() (err error) { - return btcc.StartPullSince("false", "0", "false", "") + return btcc.StartPullSince("false", "0", "false", "", "") } func (btcc *BlipTesterCollectionClient) StartOneshotPullFiltered(channels string) (err error) { - return btcc.StartPullSince("false", "0", "false", channels) + return btcc.StartPullSince("false", "0", "false", channels, "") +} + +func (btcc *BlipTesterCollectionClient) StartOneshotPullRequestPlus() (err error) { + return btcc.StartPullSince("false", "0", "false", "", "true") } // StartPullSince will begin a pull replication between the client and server with the given params. -func (btc *BlipTesterCollectionClient) StartPullSince(continuous, since, activeOnly string, channels string) (err error) { +func (btc *BlipTesterCollectionClient) StartPullSince(continuous, since, activeOnly, channels, requestPlus string) (err error) { subChangesRequest := blip.NewRequest() subChangesRequest.SetProfile(db.MessageSubChanges) subChangesRequest.Properties[db.SubChangesContinuous] = continuous @@ -654,6 +658,9 @@ func (btc *BlipTesterCollectionClient) StartPullSince(continuous, since, activeO subChangesRequest.Properties[db.SubChangesFilter] = base.ByChannelFilter subChangesRequest.Properties[db.SubChangesChannels] = channels } + if requestPlus != "" { + subChangesRequest.Properties[db.SubChangesRequestPlus] = requestPlus + } subChangesRequest.SetNoReply(true) if btc.parent.BlipTesterClientOpts.SendRevocations { @@ -923,6 +930,9 @@ func (btc *BlipTesterCollectionClient) GetRev(docID, revID string) (data []byte, // WaitForRev blocks until the given doc ID and rev ID have been stored by the client, and returns the data when found. func (btc *BlipTesterCollectionClient) WaitForRev(docID, revID string) (data []byte, found bool) { + if data, found := btc.GetRev(docID, revID); found { + return data, found + } ticker := time.NewTicker(50 * time.Millisecond) timeout := time.After(10 * time.Second) for { @@ -938,6 +948,41 @@ func (btc *BlipTesterCollectionClient) WaitForRev(docID, revID string) (data []b } } +// GetDoc returns a rev stored in the Client under the given docID. (if multiple revs are present, rev body returned is non-deterministic) +func (btc *BlipTesterCollectionClient) GetDoc(docID string) (data []byte, found bool) { + btc.docsLock.RLock() + defer btc.docsLock.RUnlock() + + if rev, ok := btc.docs[docID]; ok { + for _, data := range rev { + return data.body, true + } + } + + return nil, false +} + +// WaitForDoc blocks until the given doc ID has been stored by the client, and returns the data when found. +func (btc *BlipTesterCollectionClient) WaitForDoc(docID string) (data []byte, found bool) { + + if data, found := btc.GetDoc(docID); found { + return data, found + } + ticker := time.NewTicker(50 * time.Millisecond) + timeout := time.After(10 * time.Second) + for { + select { + case <-timeout: + btc.parent.rt.TB.Fatalf("BlipTesterClient timed out waiting for doc ID: %v", docID) + return nil, false + case <-ticker.C: + if data, found := btc.GetDoc(docID); found { + return data, found + } + } + } +} + // GetMessage returns the message stored in the Client under the given serial number func (btr *BlipTesterReplicator) GetMessage(serialNumber blip.MessageNumber) (msg *blip.Message, found bool) { btr.messagesLock.RLock() @@ -1026,6 +1071,10 @@ func (btc *BlipTesterClient) WaitForRev(docID string, revID string) ([]byte, boo return btc.SingleCollection().WaitForRev(docID, revID) } +func (btc *BlipTesterClient) WaitForDoc(docID string) ([]byte, bool) { + return btc.SingleCollection().WaitForDoc(docID) +} + func (btc *BlipTesterClient) WaitForBlipRevMessage(docID string, revID string) (*blip.Message, bool) { return btc.SingleCollection().WaitForBlipRevMessage(docID, revID) } @@ -1038,16 +1087,20 @@ func (btc *BlipTesterClient) StartOneshotPullFiltered(channels string) error { return btc.SingleCollection().StartOneshotPullFiltered(channels) } +func (btc *BlipTesterClient) StartOneshotPullRequestPlus() error { + return btc.SingleCollection().StartOneshotPullRequestPlus() +} + func (btc *BlipTesterClient) PushRev(docID string, revID string, body []byte) (string, error) { return btc.SingleCollection().PushRev(docID, revID, body) } func (btc *BlipTesterClient) StartPullSince(continuous, since, activeOnly string) error { - return btc.SingleCollection().StartPullSince(continuous, since, activeOnly, "") + return btc.SingleCollection().StartPullSince(continuous, since, activeOnly, "", "") } func (btc *BlipTesterClient) StartFilteredPullSince(continuous, since, activeOnly string, channels string) error { - return btc.SingleCollection().StartPullSince(continuous, since, activeOnly, channels) + return btc.SingleCollection().StartPullSince(continuous, since, activeOnly, channels, "") } func (btc *BlipTesterClient) GetRev(docID, revID string) ([]byte, bool) { diff --git a/rest/changes_api.go b/rest/changes_api.go index 408ca401b9..b1f39173f3 100644 --- a/rest/changes_api.go +++ b/rest/changes_api.go @@ -37,6 +37,12 @@ const kDefaultTimeoutMS = 5 * 60 * 1000 // Maximum value of _changes?timeout property const kMaxTimeoutMS = 15 * 60 * 1000 +// Values for feed parameter on changes request +const feedTypeContinuous = "continuous" +const feedTypeLongpoll = "longpoll" +const feedTypeNormal = "normal" +const feedTypeWebsocket = "websocket" + func (h *handler) handleRevsDiff() error { var input map[string][]string err := h.readJSONInto(&input) @@ -180,6 +186,16 @@ func (h *handler) handleChanges() error { options.ActiveOnly = h.getBoolQuery("active_only") options.IncludeDocs = h.getBoolQuery("include_docs") options.Revocations = h.getBoolQuery("revocations") + + useRequestPlus, _ := h.getOptBoolQuery("request_plus", h.db.Options.ChangesRequestPlus) + if useRequestPlus && feed != feedTypeContinuous { + var seqErr error + options.RequestPlusSeq, seqErr = h.db.GetRequestPlusSequence() + if seqErr != nil { + return base.HTTPErrorf(http.StatusServiceUnavailable, "Unable to retrieve requestPlus sequence") + } + + } filter = h.getQuery("filter") channelsParam := h.getQuery("channels") if channelsParam != "" { @@ -312,18 +328,18 @@ func (h *handler) handleChanges() error { var err error switch feed { - case "normal": + case feedTypeNormal: if filter == "_doc_ids" { err, forceClose = h.sendSimpleChanges(userChannels, options, docIdsArray) } else { err, forceClose = h.sendSimpleChanges(userChannels, options, nil) } - case "longpoll": + case feedTypeLongpoll: options.Wait = true err, forceClose = h.sendSimpleChanges(userChannels, options, nil) - case "continuous": + case feedTypeContinuous: err, forceClose = h.sendContinuousChangesByHTTP(userChannels, options) - case "websocket": + case feedTypeWebsocket: err, forceClose = h.sendContinuousChangesByWebSocket(userChannels, options) default: err = base.HTTPErrorf(http.StatusBadRequest, "Unknown feed type") @@ -454,7 +470,7 @@ func (h *handler) generateContinuousChanges(inChannels base.Set, options db.Chan options.Continuous = true err, forceClose := db.GenerateChanges(h.ctx(), h.rq.Context(), h.collection, inChannels, options, nil, send) if sendErr, ok := err.(*db.ChangesSendErr); ok { - h.logStatus(http.StatusOK, fmt.Sprintf("0Write error: %v", sendErr)) + h.logStatus(http.StatusOK, fmt.Sprintf("Write error: %v", sendErr)) return nil, forceClose // error is probably because the client closed the connection } else { h.logStatus(http.StatusOK, "OK (continuous feed closed)") @@ -580,7 +596,8 @@ func (h *handler) readChangesOptionsFromJSON(jsonData []byte) (feed string, opti HeartbeatMs *uint64 `json:"heartbeat"` TimeoutMs *uint64 `json:"timeout"` AcceptEncoding string `json:"accept_encoding"` - ActiveOnly bool `json:"active_only"` // Return active revisions only + ActiveOnly bool `json:"active_only"` // Return active revisions only + RequestPlus *bool `json:"request_plus"` // Wait for sequence buffering to catch up to database seq value at time request was issued } // Initialize since clock and hasher ahead of unmarshalling sequence @@ -624,6 +641,20 @@ func (h *handler) readChangesOptionsFromJSON(jsonData []byte) (feed string, opti compress = (input.AcceptEncoding == "gzip") + if h.db != nil && feed != feedTypeContinuous { + useRequestPlus := h.db.Options.ChangesRequestPlus + if input.RequestPlus != nil { + useRequestPlus = *input.RequestPlus + } + if useRequestPlus { + var seqErr error + options.RequestPlusSeq, seqErr = h.db.GetRequestPlusSequence() + if seqErr != nil { + err = base.HTTPErrorf(http.StatusServiceUnavailable, "Unable to retrieve requestPlus sequence: %v", seqErr) + return + } + } + } return } diff --git a/rest/changestest/changes_api_test.go b/rest/changestest/changes_api_test.go index 0c6472e7af..3d46f8400d 100644 --- a/rest/changestest/changes_api_test.go +++ b/rest/changestest/changes_api_test.go @@ -3935,6 +3935,297 @@ func TestTombstoneCompaction(t *testing.T) { TestCompact(db.QueryTombstoneBatch + 20) } +// TestOneShotGrantTiming simulates a one-shot changes feed returning before a previously issued grant has been +// buffered over DCP. +func TestOneShotGrantTiming(t *testing.T) { + + base.SetUpTestLogging(t, base.LevelDebug, base.KeyChanges, base.KeyHTTP) + + defer db.SuspendSequenceBatching()() + + rt := rest.NewRestTester(t, + &rest.RestTesterConfig{ + SyncFn: `function(doc) { + channel(doc.channel); + if (doc.accessUser != "") { + access(doc.accessUser, doc.accessChannel) + } + }`, + }) + defer rt.Close() + + // Create user with access to no channels + ctx := rt.Context() + database := rt.GetDatabase() + a := database.Authenticator(ctx) + bernard, err := a.NewUser("bernard", "letmein", nil) + assert.NoError(t, err) + assert.NoError(t, a.Save(bernard)) + + // Put several documents in channel PBS + response := rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-1", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-2", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-3", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-4", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + + var changes struct { + Results []db.ChangeEntry + Last_Seq interface{} + } + + // Allocate a sequence but do not write a doc for it - will block DCP buffering until sequence is skipped + slowSequence, seqErr := db.AllocateTestSequence(database) + require.NoError(t, seqErr) + log.Printf("Allocated slowSequence: %v", slowSequence) + + // Write a document granting user access to PBS + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/grantDoc", `{"accessUser":"bernard", "accessChannel":"PBS"}`) + rest.RequireStatus(t, response, 201) + + // Issue normal one-shot changes request. Expect no results as granting document hasn't been buffered (blocked by + // slowSequence) + changesResponse := rt.SendUserRequest("GET", "/{{.keyspace}}/_changes", "", "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 0) + + // Release the slow sequence and wait for it to be processed over DCP + releaseErr := db.ReleaseTestSequence(database, slowSequence) + require.NoError(t, releaseErr) + rt.WaitForPendingChanges() + + // Issue normal one-shot changes request. Expect results as granting document buffering is unblocked + changesResponse = rt.SendUserRequest("GET", "/{{.keyspace}}/_changes", "", "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 4) + +} + +// TestOneShotGrantRequestPlus simulates a one-shot changes feed being made before a previously issued grant has been +// buffered over DCP. When requestPlus is set, changes feed should block until grant is processed. +func TestOneShotGrantRequestPlus(t *testing.T) { + + base.SetUpTestLogging(t, base.LevelDebug, base.KeyChanges, base.KeyHTTP) + + defer db.SuspendSequenceBatching()() // Required for slow sequence simulation + + rt := rest.NewRestTester(t, + &rest.RestTesterConfig{ + SyncFn: `function(doc) { + channel(doc.channel); + if (doc.accessUser != "") { + access(doc.accessUser, doc.accessChannel) + } + }`, + }) + defer rt.Close() + + // Create user with access to no channels + ctx := rt.Context() + database := rt.GetDatabase() + a := database.Authenticator(ctx) + bernard, err := a.NewUser("bernard", "letmein", nil) + assert.NoError(t, err) + assert.NoError(t, a.Save(bernard)) + + // Put several documents in channel PBS + response := rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-1", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-2", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-3", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-4", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + + var changes struct { + Results []db.ChangeEntry + Last_Seq interface{} + } + + // Allocate a sequence but do not write a doc for it - will block DCP buffering until sequence is skipped + slowSequence, seqErr := db.AllocateTestSequence(database) + require.NoError(t, seqErr) + + // Write a document granting user access to PBS + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/grantDoc", `{"accessUser":"bernard", "accessChannel":"PBS"}`) + rest.RequireStatus(t, response, 201) + + caughtUpStart := database.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value() + + var oneShotComplete sync.WaitGroup + // Issue a GET requestPlus one-shot changes request in a separate goroutine. + oneShotComplete.Add(1) + go func() { + defer oneShotComplete.Done() + changesResponse := rt.SendUserRequest("GET", "/{{.keyspace}}/_changes?request_plus=true", "", "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 4) + }() + + // Issue a POST requestPlus one-shot changes request in a separate goroutine. + oneShotComplete.Add(1) + go func() { + defer oneShotComplete.Done() + changesResponse := rt.SendUserRequest("POST", "/{{.keyspace}}/_changes", `{"request_plus":true}`, "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 4) + }() + + // Wait for the one-shot changes feed to go into wait mode before releasing the slow sequence + require.NoError(t, database.WaitForTotalCaughtUp(caughtUpStart+2)) + + // Release the slow sequence and wait for it to be processed over DCP + releaseErr := db.ReleaseTestSequence(database, slowSequence) + require.NoError(t, releaseErr) + rt.WaitForPendingChanges() + + oneShotComplete.Wait() +} + +// TestOneShotGrantRequestPlusDbConfig simulates a one-shot changes feed being made before a previously issued grant has been +// buffered over DCP. When requestPlus is set via config, changes feed should block until grant is processed. +func TestOneShotGrantRequestPlusDbConfig(t *testing.T) { + + base.SetUpTestLogging(t, base.LevelDebug, base.KeyChanges, base.KeyHTTP) + + defer db.SuspendSequenceBatching()() + + rt := rest.NewRestTester(t, + &rest.RestTesterConfig{ + SyncFn: `function(doc) { + channel(doc.channel); + if (doc.accessUser != "") { + access(doc.accessUser, doc.accessChannel) + } + }`, + DatabaseConfig: &rest.DatabaseConfig{ + DbConfig: rest.DbConfig{ + ChangesRequestPlus: base.BoolPtr(true), + }, + }, + }) + defer rt.Close() + + // Create user with access to no channels + ctx := rt.Context() + database := rt.GetDatabase() + a := database.Authenticator(ctx) + bernard, err := a.NewUser("bernard", "letmein", nil) + assert.NoError(t, err) + assert.NoError(t, a.Save(bernard)) + + // Put several documents in channel PBS + response := rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-1", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-2", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-3", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-4", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + + var changes struct { + Results []db.ChangeEntry + Last_Seq interface{} + } + + // Allocate a sequence but do not write a doc for it - will block DCP buffering until sequence is skipped + slowSequence, seqErr := db.AllocateTestSequence(database) + require.NoError(t, seqErr) + log.Printf("Allocated slowSequence: %v", slowSequence) + + // Write a document granting user access to PBS + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/grantDoc", `{"accessUser":"bernard", "accessChannel":"PBS"}`) + rest.RequireStatus(t, response, 201) + + // Issue one-shot GET changes request explicitly setting request_plus=false (should override config value). + // Expect no results as granting document hasn't been buffered (blocked by slowSequence) + changesResponse := rt.SendUserRequest("GET", "/{{.keyspace}}/_changes?request_plus=false", "", "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 0) + + // Issue one-shot POST changes request explicitly setting request_plus=false (should override config value). + // Expect no results as granting document hasn't been buffered (blocked by slowSequence) + changesResponse = rt.SendUserRequest("POST", "/{{.keyspace}}/_changes", `{"request_plus":false}`, "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 0) + + caughtUpStart := database.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value() + + var oneShotComplete sync.WaitGroup + // Issue a GET one-shot changes request in a separate goroutine. Should run as request plus based on config + oneShotComplete.Add(1) + go func() { + defer oneShotComplete.Done() + changesResponse := rt.SendUserRequest("GET", "/{{.keyspace}}/_changes", "", "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 4) + }() + + // Issue a POST one-shot changes request in a separate goroutine. Should run as request plus based on config + oneShotComplete.Add(1) + go func() { + defer oneShotComplete.Done() + changesResponse := rt.SendUserRequest("POST", "/{{.keyspace}}/_changes", `{}`, "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 4) + }() + + // Wait for the one-shot changes feed to go into wait mode before releasing the slow sequence + require.NoError(t, database.WaitForTotalCaughtUp(caughtUpStart+2)) + + // Release the slow sequence and wait for it to be processed over DCP + releaseErr := db.ReleaseTestSequence(database, slowSequence) + require.NoError(t, releaseErr) + rt.WaitForPendingChanges() + + oneShotComplete.Wait() +} + func waitForCompactStopped(dbc *db.DatabaseContext) error { for i := 0; i < 100; i++ { compactRunning := dbc.CacheCompactActive() diff --git a/rest/config.go b/rest/config.go index 43b6d6a9f7..848bee10aa 100644 --- a/rest/config.go +++ b/rest/config.go @@ -165,6 +165,7 @@ type DbConfig struct { GraphQL *functions.GraphQLConfig `json:"graphql,omitempty"` // GraphQL configuration & resolver fns UserFunctions *functions.FunctionsConfig `json:"functions,omitempty"` // Named JS fns for clients to call Suspendable *bool `json:"suspendable,omitempty"` // Allow the database to be suspended + ChangesRequestPlus *bool `json:"changes_request_plus,omitempty"` // If set, is used as the default value of request_plus for non-continuous replications CORS *auth.CORSConfig `json:"cors,omitempty"` } diff --git a/rest/server_context.go b/rest/server_context.go index 2b10ccd4f5..2cb270627e 100644 --- a/rest/server_context.go +++ b/rest/server_context.go @@ -1050,6 +1050,7 @@ func dbcOptionsFromConfig(ctx context.Context, sc *ServerContext, config *DbConf GroupID: groupID, JavascriptTimeout: javascriptTimeout, Serverless: sc.Config.IsServerless(), + ChangesRequestPlus: base.BoolDefault(config.ChangesRequestPlus, false), // UserQueries: config.UserQueries, // behind feature flag (see below) // UserFunctions: config.UserFunctions, // behind feature flag (see below) // GraphQL: config.GraphQL, // behind feature flag (see below)