diff --git a/db/blip_handler.go b/db/blip_handler.go index 109bbeb871..90c35d9840 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -295,6 +295,19 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error { continuous := subChangesParams.continuous() + requestPlusSeq := uint64(0) + // If non-continuous, check whether requestPlus handling is set for request or via database config + if continuous == false { + useRequestPlus := subChangesParams.requestPlus(bh.db.Options.ChangesRequestPlus) + if useRequestPlus { + seq, requestPlusErr := bh.db.GetRequestPlusSequence() + if requestPlusErr != nil { + return base.HTTPErrorf(http.StatusServiceUnavailable, "Unable to retrieve current sequence for requestPlus=true: %v", requestPlusErr) + } + requestPlusSeq = seq + } + } + // Start asynchronous changes goroutine go func() { // Pull replication stats by type @@ -325,6 +338,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error { clientType: clientType, ignoreNoConflicts: clientType == clientTypeSGR2, // force this side to accept a "changes" message, even in no conflicts mode for SGR2. changesCtx: collectionCtx.changesCtx, + requestPlusSeq: requestPlusSeq, }) base.DebugfCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s --> Time:%v", bh.serialNumber, rq.Profile(), time.Since(startTime)) }() @@ -358,6 +372,7 @@ type sendChangesOptions struct { revocations bool ignoreNoConflicts bool changesCtx context.Context + requestPlusSeq uint64 } type changesDeletedFlag uint @@ -385,14 +400,15 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions base.InfofCtx(bh.loggingCtx, base.KeySync, "Sending changes since %v", opts.since) options := ChangesOptions{ - Since: opts.since, - Conflicts: false, // CBL 2.0/BLIP don't support branched rev trees (LiteCore #437) - Continuous: opts.continuous, - ActiveOnly: opts.activeOnly, - Revocations: opts.revocations, - LoggingCtx: bh.loggingCtx, - clientType: opts.clientType, - ChangesCtx: opts.changesCtx, + Since: opts.since, + Conflicts: false, // CBL 2.0/BLIP don't support branched rev trees (LiteCore #437) + Continuous: opts.continuous, + ActiveOnly: opts.activeOnly, + Revocations: opts.revocations, + LoggingCtx: bh.loggingCtx, + clientType: opts.clientType, + ChangesCtx: opts.changesCtx, + RequestPlusSeq: opts.requestPlusSeq, } channelSet := opts.channels diff --git a/db/blip_sync_messages.go b/db/blip_sync_messages.go index 381591be0b..1e2a161e73 100644 --- a/db/blip_sync_messages.go +++ b/db/blip_sync_messages.go @@ -68,6 +68,8 @@ const ( SubChangesContinuous = "continuous" SubChangesBatch = "batch" SubChangesRevocations = "revocations" + SubChangesRequestPlus = "requestPlus" + SubChangesFuture = "future" // rev message properties RevMessageID = "id" @@ -163,7 +165,7 @@ func NewSubChangesParams(logCtx context.Context, rq *blip.Message, zeroSeq Seque // Determine incoming since and docIDs once, since there is some overhead associated with their calculation sinceSequenceId := zeroSeq var err error - if rq.Properties["future"] == trueProperty { + if rq.Properties[SubChangesFuture] == trueProperty { sinceSequenceId, err = latestSeq() } else if sinceStr, found := rq.Properties[SubChangesSince]; found { if sinceSequenceId, err = sequenceIDParser(sinceStr); err != nil { @@ -234,6 +236,14 @@ func (s *SubChangesParams) activeOnly() bool { return (s.rq.Properties[SubChangesActiveOnly] == trueProperty) } +func (s *SubChangesParams) requestPlus(defaultValue bool) (value bool) { + propertyValue, isDefined := s.rq.Properties[SubChangesRequestPlus] + if !isDefined { + return defaultValue + } + return propertyValue == trueProperty +} + func (s *SubChangesParams) filter() string { return s.rq.Properties[SubChangesFilter] } diff --git a/db/changes.go b/db/changes.go index d430d51cda..53b1df1cc3 100644 --- a/db/changes.go +++ b/db/changes.go @@ -26,19 +26,20 @@ import ( // Options for changes-feeds. ChangesOptions must not contain any mutable pointer references, as // changes processing currently assumes a deep copy when doing chanOpts := changesOptions. type ChangesOptions struct { - Since SequenceID // sequence # to start _after_ - Limit int // Max number of changes to return, if nonzero - Conflicts bool // Show all conflicting revision IDs, not just winning one? - IncludeDocs bool // Include doc body of each change? - Wait bool // Wait for results, instead of immediately returning empty result? - Continuous bool // Run continuously until terminated? - HeartbeatMs uint64 // How often to send a heartbeat to the client - TimeoutMs uint64 // After this amount of time, close the longpoll connection - ActiveOnly bool // If true, only return information on non-deleted, non-removed revisions - Revocations bool // Specifies whether revocation messages should be sent on the changes feed - clientType clientType // Can be used to determine if the replication is being started from a CBL 2.x or SGR2 client - LoggingCtx context.Context // Used for adding context to logs - ChangesCtx context.Context // Used for cancelling checking the changes feed should stop + Since SequenceID // sequence # to start _after_ + Limit int // Max number of changes to return, if nonzero + Conflicts bool // Show all conflicting revision IDs, not just winning one? + IncludeDocs bool // Include doc body of each change? + Wait bool // Wait for results, instead of immediately returning empty result? + Continuous bool // Run continuously until terminated? + RequestPlusSeq uint64 // Do not stop changes before cached sequence catches up with requestPlusSeq + HeartbeatMs uint64 // How often to send a heartbeat to the client + TimeoutMs uint64 // After this amount of time, close the longpoll connection + ActiveOnly bool // If true, only return information on non-deleted, non-removed revisions + Revocations bool // Specifies whether revocation messages should be sent on the changes feed + clientType clientType // Can be used to determine if the replication is being started from a CBL 2.x or SGR2 client + LoggingCtx context.Context // Used for adding context to logs + ChangesCtx context.Context // Used for cancelling checking the changes feed should stop } // A changes entry; Database.GetChanges returns an array of these. @@ -629,8 +630,9 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex var changeWaiter *ChangeWaiter var lowSequence uint64 - var currentCachedSequence uint64 + var currentCachedSequence uint64 // The highest contiguous sequence buffered over the caching feed var lateSequenceFeeds map[channels.ID]*lateSequenceFeed + var useLateSequenceFeeds bool // LateSequence feeds are only used for continuous, or one-shot where options.RequestPlusSeq > currentCachedSequence var userCounter uint64 // Wait counter used to identify changes to the user document var changedChannels channels.ChangedKeys // Tracks channels added/removed to the user during changes processing. var userChanged bool // Whether the user document has changed in a given iteration loop @@ -638,9 +640,9 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex // Retrieve the current max cached sequence - ensures there isn't a race between the subsequent channel cache queries currentCachedSequence = col.changeCache().getChannelCache().GetHighCacheSequence() - if options.Wait { - options.Wait = false + // If changes feed requires more than one ChangesLoop iteration, initialize changeWaiter + if options.Wait || options.RequestPlusSeq > currentCachedSequence { changeWaiter = col.startChangeWaiter() // Waiter is updated with the actual channel set (post-user reload) at the start of the outer changes loop userCounter = changeWaiter.CurrentUserCount() // Reload user to pick up user changes that happened between auth and the change waiter @@ -676,7 +678,8 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex // For a continuous feed, initialise the lateSequenceFeeds that track late-arriving sequences // to the channel caches. - if options.Continuous { + if options.Continuous || options.RequestPlusSeq > currentCachedSequence { + useLateSequenceFeeds = true lateSequenceFeeds = make(map[channels.ID]*lateSequenceFeed) defer col.closeLateFeeds(lateSequenceFeeds) } @@ -741,7 +744,7 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex // Handles previously skipped sequences prior to options.Since that // have arrived in the channel cache since this changes request started. Only needed for // continuous feeds - one-off changes requests only require the standard channel cache. - if options.Continuous { + if useLateSequenceFeeds { lateSequenceFeedHandler := lateSequenceFeeds[chanID] if lateSequenceFeedHandler != nil { latefeed, err := col.getLateFeed(lateSequenceFeedHandler, singleChannelCache) @@ -957,14 +960,19 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex } } } - if !options.Continuous && (sentSomething || changeWaiter == nil) { - break + + // Check whether non-continuous changes feeds that aren't waiting to reach requestPlus sequence can exit + if !options.Continuous && currentCachedSequence >= options.RequestPlusSeq { + // If non-longpoll, or longpoll has sent something, can exit + if !options.Wait || sentSomething { + break + } } // For longpoll requests that didn't send any results, reset low sequence to the original since value, // as the system low sequence may change before the longpoll request wakes up, and longpoll feeds don't // use lateSequenceFeeds. - if !options.Continuous { + if !useLateSequenceFeeds { options.Since.LowSeq = requestLowSeq } @@ -981,6 +989,7 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex waitForChanges: for { + col.dbStats().CBLReplicationPull().NumPullReplTotalCaughtUp.Add(1) // If we're in a deferred Backfill, the user may not get notification when the cache catches up to the backfill (e.g. when the granting doc isn't // visible to the user), and so ChangeWaiter.Wait() would block until the next user-visible doc arrives. Use a hardcoded wait instead // Similar handling for when we see sequences later than the stable sequence. @@ -992,7 +1001,6 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex break waitForChanges } - col.dbStats().CBLReplicationPull().NumPullReplTotalCaughtUp.Add(1) col.dbStats().CBLReplicationPull().NumPullReplCaughtUp.Add(1) waitResponse := changeWaiter.Wait() col.dbStats().CBLReplicationPull().NumPullReplCaughtUp.Add(-1) @@ -1310,7 +1318,7 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio func (options ChangesOptions) String() string { return fmt.Sprintf( - `{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t}`, + `{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t, RequestPlusSeq: %d}`, options.Since, options.Limit, options.Conflicts, @@ -1320,6 +1328,7 @@ func (options ChangesOptions) String() string { options.HeartbeatMs, options.TimeoutMs, options.ActiveOnly, + options.RequestPlusSeq, ) } diff --git a/db/database.go b/db/database.go index f7ded714b4..fcd4f21908 100644 --- a/db/database.go +++ b/db/database.go @@ -174,8 +174,8 @@ type DatabaseContextOptions struct { skipRegisterImportPIndex bool // if set, skips the global gocb PIndex registration MetadataStore base.DataStore // If set, use this location/connection for SG metadata storage - if not set, metadata is stored using the same location/connection as the bucket used for data storage. MetadataID string // MetadataID used for metadata storage - - BlipStatsReportingInterval int64 // interval to report blip stats in milliseconds + BlipStatsReportingInterval int64 // interval to report blip stats in milliseconds + ChangesRequestPlus bool // Sets the default value for request_plus, for non-continuous changes feeds } type ScopesOptions map[string]ScopeOptions @@ -2326,3 +2326,10 @@ func (dbc *DatabaseContext) AuthenticatorOptions() auth.AuthenticatorOptions { defaultOptions.MetaKeys = dbc.MetadataKeys return defaultOptions } + +// GetRequestPlusSequence fetches the current value of the sequence counter for the database. +// Uses getSequence (instead of lastSequence) as it's intended to be up to date with allocations +// across all nodes, while lastSequence is just the latest allocation from this node +func (dbc *DatabaseContext) GetRequestPlusSequence() (uint64, error) { + return dbc.sequences.getSequence() +} diff --git a/db/util_testing.go b/db/util_testing.go index 67433123c8..63f6814d08 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -95,6 +95,17 @@ func (db *DatabaseContext) WaitForCaughtUp(targetCount int64) error { return errors.New("WaitForCaughtUp didn't catch up") } +func (db *DatabaseContext) WaitForTotalCaughtUp(targetCount int64) error { + for i := 0; i < 100; i++ { + caughtUpCount := db.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value() + if caughtUpCount >= targetCount { + return nil + } + time.Sleep(100 * time.Millisecond) + } + return errors.New("WaitForCaughtUp didn't catch up") +} + type StatWaiter struct { initCount int64 // Document cached count when NewStatWaiter is called targetCount int64 // Target count used when Wait is called @@ -598,3 +609,13 @@ func GetSingleDatabaseCollection(tb testing.TB, database *DatabaseContext) *Data tb.Fatalf("Could not find a collection") return nil } + +// AllocateTestSequence allocates a sequence via the sequenceAllocator. For use by non-db tests +func AllocateTestSequence(database *DatabaseContext) (uint64, error) { + return database.sequences.incrementSequence(1) +} + +// ReleaseTestSequence releases a sequence via the sequenceAllocator. For use by non-db tests +func ReleaseTestSequence(database *DatabaseContext, sequence uint64) error { + return database.sequences.releaseSequence(sequence) +} diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 356b51074a..bae3c89a18 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -2652,3 +2652,122 @@ func TestUnsubChanges(t *testing.T) { _, found = btc.WaitForRev("doc2", resp.Rev) assert.True(t, found) } + +// TestRequestPlusPull tests that a one-shot pull replication waits for pending changes when request plus is set on the replication. +func TestRequestPlusPull(t *testing.T) { + + base.SetUpTestLogging(t, base.LevelInfo, base.KeyDCP, base.KeyChanges, base.KeyHTTP) + defer db.SuspendSequenceBatching()() // Required for slow sequence simulation + + rtConfig := RestTesterConfig{ + SyncFn: `function(doc) { + channel(doc.channel); + if (doc.accessUser != "") { + access(doc.accessUser, doc.accessChannel) + } + }`, + } + rt := NewRestTester(t, &rtConfig) + defer rt.Close() + database := rt.GetDatabase() + + // Initialize blip tester client (will create user) + client, err := NewBlipTesterClientOptsWithRT(t, rt, &BlipTesterClientOpts{ + Username: "bernard", + }) + require.NoError(t, err) + defer client.Close() + + // Put a doc in channel PBS + response := rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-1", `{"channel":["PBS"]}`) + RequireStatus(t, response, 201) + + // Allocate a sequence but do not write a doc for it - will block DCP buffering until sequence is skipped + slowSequence, seqErr := db.AllocateTestSequence(database) + require.NoError(t, seqErr) + + // Write a document granting user 'bernard' access to PBS + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/grantDoc", `{"accessUser":"bernard", "accessChannel":"PBS"}`) + RequireStatus(t, response, 201) + + caughtUpStart := database.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value() + + // Start a regular one-shot pull + err = client.StartOneshotPullRequestPlus() + assert.NoError(t, err) + + // Wait for the one-shot changes feed to go into wait mode before releasing the slow sequence + require.NoError(t, database.WaitForTotalCaughtUp(caughtUpStart+1)) + + // Release the slow sequence + releaseErr := db.ReleaseTestSequence(database, slowSequence) + require.NoError(t, releaseErr) + + // The one-shot pull should unblock and replicate the document in the granted channel + data, ok := client.WaitForDoc("pbs-1") + assert.True(t, ok) + assert.Equal(t, `{"channel":["PBS"]}`, string(data)) + +} + +// TestRequestPlusPull tests that a one-shot pull replication waits for pending changes when request plus is set on the db config. +func TestRequestPlusPullDbConfig(t *testing.T) { + + base.SetUpTestLogging(t, base.LevelInfo, base.KeyDCP, base.KeyChanges, base.KeyHTTP) + defer db.SuspendSequenceBatching()() // Required for slow sequence simulation + + rtConfig := RestTesterConfig{ + SyncFn: `function(doc) { + channel(doc.channel); + if (doc.accessUser != "") { + access(doc.accessUser, doc.accessChannel) + } + }`, + DatabaseConfig: &DatabaseConfig{ + DbConfig: DbConfig{ + ChangesRequestPlus: base.BoolPtr(true), + }, + }, + } + rt := NewRestTester(t, &rtConfig) + defer rt.Close() + database := rt.GetDatabase() + + // Initialize blip tester client (will create user) + client, err := NewBlipTesterClientOptsWithRT(t, rt, &BlipTesterClientOpts{ + Username: "bernard", + }) + require.NoError(t, err) + defer client.Close() + + // Put a doc in channel PBS + response := rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-1", `{"channel":["PBS"]}`) + RequireStatus(t, response, 201) + + // Allocate a sequence but do not write a doc for it - will block DCP buffering until sequence is skipped + slowSequence, seqErr := db.AllocateTestSequence(database) + require.NoError(t, seqErr) + + // Write a document granting user 'bernard' access to PBS + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/grantDoc", `{"accessUser":"bernard", "accessChannel":"PBS"}`) + RequireStatus(t, response, 201) + + caughtUpStart := database.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value() + + // Start a regular one-shot pull + err = client.StartOneshotPull() + assert.NoError(t, err) + + // Wait for the one-shot changes feed to go into wait mode before releasing the slow sequence + require.NoError(t, database.WaitForTotalCaughtUp(caughtUpStart+1)) + + // Release the slow sequence + releaseErr := db.ReleaseTestSequence(database, slowSequence) + require.NoError(t, releaseErr) + + // The one-shot pull should unblock and replicate the document in the granted channel + data, ok := client.WaitForDoc("pbs-1") + assert.True(t, ok) + assert.Equal(t, `{"channel":["PBS"]}`, string(data)) + +} diff --git a/rest/blip_client_test.go b/rest/blip_client_test.go index 13558aee7e..0b09af007b 100644 --- a/rest/blip_client_test.go +++ b/rest/blip_client_test.go @@ -632,19 +632,23 @@ func (btc *BlipTesterClient) Collection(collectionName string) *BlipTesterCollec // StartPull will begin a continuous pull replication since 0 between the client and server func (btcc *BlipTesterCollectionClient) StartPull() (err error) { - return btcc.StartPullSince("true", "0", "false", "") + return btcc.StartPullSince("true", "0", "false", "", "") } func (btcc *BlipTesterCollectionClient) StartOneshotPull() (err error) { - return btcc.StartPullSince("false", "0", "false", "") + return btcc.StartPullSince("false", "0", "false", "", "") } func (btcc *BlipTesterCollectionClient) StartOneshotPullFiltered(channels string) (err error) { - return btcc.StartPullSince("false", "0", "false", channels) + return btcc.StartPullSince("false", "0", "false", channels, "") +} + +func (btcc *BlipTesterCollectionClient) StartOneshotPullRequestPlus() (err error) { + return btcc.StartPullSince("false", "0", "false", "", "true") } // StartPullSince will begin a pull replication between the client and server with the given params. -func (btc *BlipTesterCollectionClient) StartPullSince(continuous, since, activeOnly string, channels string) (err error) { +func (btc *BlipTesterCollectionClient) StartPullSince(continuous, since, activeOnly, channels, requestPlus string) (err error) { subChangesRequest := blip.NewRequest() subChangesRequest.SetProfile(db.MessageSubChanges) subChangesRequest.Properties[db.SubChangesContinuous] = continuous @@ -654,6 +658,9 @@ func (btc *BlipTesterCollectionClient) StartPullSince(continuous, since, activeO subChangesRequest.Properties[db.SubChangesFilter] = base.ByChannelFilter subChangesRequest.Properties[db.SubChangesChannels] = channels } + if requestPlus != "" { + subChangesRequest.Properties[db.SubChangesRequestPlus] = requestPlus + } subChangesRequest.SetNoReply(true) if btc.parent.BlipTesterClientOpts.SendRevocations { @@ -923,6 +930,9 @@ func (btc *BlipTesterCollectionClient) GetRev(docID, revID string) (data []byte, // WaitForRev blocks until the given doc ID and rev ID have been stored by the client, and returns the data when found. func (btc *BlipTesterCollectionClient) WaitForRev(docID, revID string) (data []byte, found bool) { + if data, found := btc.GetRev(docID, revID); found { + return data, found + } ticker := time.NewTicker(50 * time.Millisecond) timeout := time.After(10 * time.Second) for { @@ -938,6 +948,41 @@ func (btc *BlipTesterCollectionClient) WaitForRev(docID, revID string) (data []b } } +// GetDoc returns a rev stored in the Client under the given docID. (if multiple revs are present, rev body returned is non-deterministic) +func (btc *BlipTesterCollectionClient) GetDoc(docID string) (data []byte, found bool) { + btc.docsLock.RLock() + defer btc.docsLock.RUnlock() + + if rev, ok := btc.docs[docID]; ok { + for _, data := range rev { + return data.body, true + } + } + + return nil, false +} + +// WaitForDoc blocks until the given doc ID has been stored by the client, and returns the data when found. +func (btc *BlipTesterCollectionClient) WaitForDoc(docID string) (data []byte, found bool) { + + if data, found := btc.GetDoc(docID); found { + return data, found + } + ticker := time.NewTicker(50 * time.Millisecond) + timeout := time.After(10 * time.Second) + for { + select { + case <-timeout: + btc.parent.rt.TB.Fatalf("BlipTesterClient timed out waiting for doc ID: %v", docID) + return nil, false + case <-ticker.C: + if data, found := btc.GetDoc(docID); found { + return data, found + } + } + } +} + // GetMessage returns the message stored in the Client under the given serial number func (btr *BlipTesterReplicator) GetMessage(serialNumber blip.MessageNumber) (msg *blip.Message, found bool) { btr.messagesLock.RLock() @@ -1026,6 +1071,10 @@ func (btc *BlipTesterClient) WaitForRev(docID string, revID string) ([]byte, boo return btc.SingleCollection().WaitForRev(docID, revID) } +func (btc *BlipTesterClient) WaitForDoc(docID string) ([]byte, bool) { + return btc.SingleCollection().WaitForDoc(docID) +} + func (btc *BlipTesterClient) WaitForBlipRevMessage(docID string, revID string) (*blip.Message, bool) { return btc.SingleCollection().WaitForBlipRevMessage(docID, revID) } @@ -1038,16 +1087,20 @@ func (btc *BlipTesterClient) StartOneshotPullFiltered(channels string) error { return btc.SingleCollection().StartOneshotPullFiltered(channels) } +func (btc *BlipTesterClient) StartOneshotPullRequestPlus() error { + return btc.SingleCollection().StartOneshotPullRequestPlus() +} + func (btc *BlipTesterClient) PushRev(docID string, revID string, body []byte) (string, error) { return btc.SingleCollection().PushRev(docID, revID, body) } func (btc *BlipTesterClient) StartPullSince(continuous, since, activeOnly string) error { - return btc.SingleCollection().StartPullSince(continuous, since, activeOnly, "") + return btc.SingleCollection().StartPullSince(continuous, since, activeOnly, "", "") } func (btc *BlipTesterClient) StartFilteredPullSince(continuous, since, activeOnly string, channels string) error { - return btc.SingleCollection().StartPullSince(continuous, since, activeOnly, channels) + return btc.SingleCollection().StartPullSince(continuous, since, activeOnly, channels, "") } func (btc *BlipTesterClient) GetRev(docID, revID string) ([]byte, bool) { diff --git a/rest/changes_api.go b/rest/changes_api.go index 408ca401b9..b1f39173f3 100644 --- a/rest/changes_api.go +++ b/rest/changes_api.go @@ -37,6 +37,12 @@ const kDefaultTimeoutMS = 5 * 60 * 1000 // Maximum value of _changes?timeout property const kMaxTimeoutMS = 15 * 60 * 1000 +// Values for feed parameter on changes request +const feedTypeContinuous = "continuous" +const feedTypeLongpoll = "longpoll" +const feedTypeNormal = "normal" +const feedTypeWebsocket = "websocket" + func (h *handler) handleRevsDiff() error { var input map[string][]string err := h.readJSONInto(&input) @@ -180,6 +186,16 @@ func (h *handler) handleChanges() error { options.ActiveOnly = h.getBoolQuery("active_only") options.IncludeDocs = h.getBoolQuery("include_docs") options.Revocations = h.getBoolQuery("revocations") + + useRequestPlus, _ := h.getOptBoolQuery("request_plus", h.db.Options.ChangesRequestPlus) + if useRequestPlus && feed != feedTypeContinuous { + var seqErr error + options.RequestPlusSeq, seqErr = h.db.GetRequestPlusSequence() + if seqErr != nil { + return base.HTTPErrorf(http.StatusServiceUnavailable, "Unable to retrieve requestPlus sequence") + } + + } filter = h.getQuery("filter") channelsParam := h.getQuery("channels") if channelsParam != "" { @@ -312,18 +328,18 @@ func (h *handler) handleChanges() error { var err error switch feed { - case "normal": + case feedTypeNormal: if filter == "_doc_ids" { err, forceClose = h.sendSimpleChanges(userChannels, options, docIdsArray) } else { err, forceClose = h.sendSimpleChanges(userChannels, options, nil) } - case "longpoll": + case feedTypeLongpoll: options.Wait = true err, forceClose = h.sendSimpleChanges(userChannels, options, nil) - case "continuous": + case feedTypeContinuous: err, forceClose = h.sendContinuousChangesByHTTP(userChannels, options) - case "websocket": + case feedTypeWebsocket: err, forceClose = h.sendContinuousChangesByWebSocket(userChannels, options) default: err = base.HTTPErrorf(http.StatusBadRequest, "Unknown feed type") @@ -454,7 +470,7 @@ func (h *handler) generateContinuousChanges(inChannels base.Set, options db.Chan options.Continuous = true err, forceClose := db.GenerateChanges(h.ctx(), h.rq.Context(), h.collection, inChannels, options, nil, send) if sendErr, ok := err.(*db.ChangesSendErr); ok { - h.logStatus(http.StatusOK, fmt.Sprintf("0Write error: %v", sendErr)) + h.logStatus(http.StatusOK, fmt.Sprintf("Write error: %v", sendErr)) return nil, forceClose // error is probably because the client closed the connection } else { h.logStatus(http.StatusOK, "OK (continuous feed closed)") @@ -580,7 +596,8 @@ func (h *handler) readChangesOptionsFromJSON(jsonData []byte) (feed string, opti HeartbeatMs *uint64 `json:"heartbeat"` TimeoutMs *uint64 `json:"timeout"` AcceptEncoding string `json:"accept_encoding"` - ActiveOnly bool `json:"active_only"` // Return active revisions only + ActiveOnly bool `json:"active_only"` // Return active revisions only + RequestPlus *bool `json:"request_plus"` // Wait for sequence buffering to catch up to database seq value at time request was issued } // Initialize since clock and hasher ahead of unmarshalling sequence @@ -624,6 +641,20 @@ func (h *handler) readChangesOptionsFromJSON(jsonData []byte) (feed string, opti compress = (input.AcceptEncoding == "gzip") + if h.db != nil && feed != feedTypeContinuous { + useRequestPlus := h.db.Options.ChangesRequestPlus + if input.RequestPlus != nil { + useRequestPlus = *input.RequestPlus + } + if useRequestPlus { + var seqErr error + options.RequestPlusSeq, seqErr = h.db.GetRequestPlusSequence() + if seqErr != nil { + err = base.HTTPErrorf(http.StatusServiceUnavailable, "Unable to retrieve requestPlus sequence: %v", seqErr) + return + } + } + } return } diff --git a/rest/changestest/changes_api_test.go b/rest/changestest/changes_api_test.go index 0c6472e7af..3d46f8400d 100644 --- a/rest/changestest/changes_api_test.go +++ b/rest/changestest/changes_api_test.go @@ -3935,6 +3935,297 @@ func TestTombstoneCompaction(t *testing.T) { TestCompact(db.QueryTombstoneBatch + 20) } +// TestOneShotGrantTiming simulates a one-shot changes feed returning before a previously issued grant has been +// buffered over DCP. +func TestOneShotGrantTiming(t *testing.T) { + + base.SetUpTestLogging(t, base.LevelDebug, base.KeyChanges, base.KeyHTTP) + + defer db.SuspendSequenceBatching()() + + rt := rest.NewRestTester(t, + &rest.RestTesterConfig{ + SyncFn: `function(doc) { + channel(doc.channel); + if (doc.accessUser != "") { + access(doc.accessUser, doc.accessChannel) + } + }`, + }) + defer rt.Close() + + // Create user with access to no channels + ctx := rt.Context() + database := rt.GetDatabase() + a := database.Authenticator(ctx) + bernard, err := a.NewUser("bernard", "letmein", nil) + assert.NoError(t, err) + assert.NoError(t, a.Save(bernard)) + + // Put several documents in channel PBS + response := rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-1", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-2", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-3", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-4", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + + var changes struct { + Results []db.ChangeEntry + Last_Seq interface{} + } + + // Allocate a sequence but do not write a doc for it - will block DCP buffering until sequence is skipped + slowSequence, seqErr := db.AllocateTestSequence(database) + require.NoError(t, seqErr) + log.Printf("Allocated slowSequence: %v", slowSequence) + + // Write a document granting user access to PBS + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/grantDoc", `{"accessUser":"bernard", "accessChannel":"PBS"}`) + rest.RequireStatus(t, response, 201) + + // Issue normal one-shot changes request. Expect no results as granting document hasn't been buffered (blocked by + // slowSequence) + changesResponse := rt.SendUserRequest("GET", "/{{.keyspace}}/_changes", "", "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 0) + + // Release the slow sequence and wait for it to be processed over DCP + releaseErr := db.ReleaseTestSequence(database, slowSequence) + require.NoError(t, releaseErr) + rt.WaitForPendingChanges() + + // Issue normal one-shot changes request. Expect results as granting document buffering is unblocked + changesResponse = rt.SendUserRequest("GET", "/{{.keyspace}}/_changes", "", "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 4) + +} + +// TestOneShotGrantRequestPlus simulates a one-shot changes feed being made before a previously issued grant has been +// buffered over DCP. When requestPlus is set, changes feed should block until grant is processed. +func TestOneShotGrantRequestPlus(t *testing.T) { + + base.SetUpTestLogging(t, base.LevelDebug, base.KeyChanges, base.KeyHTTP) + + defer db.SuspendSequenceBatching()() // Required for slow sequence simulation + + rt := rest.NewRestTester(t, + &rest.RestTesterConfig{ + SyncFn: `function(doc) { + channel(doc.channel); + if (doc.accessUser != "") { + access(doc.accessUser, doc.accessChannel) + } + }`, + }) + defer rt.Close() + + // Create user with access to no channels + ctx := rt.Context() + database := rt.GetDatabase() + a := database.Authenticator(ctx) + bernard, err := a.NewUser("bernard", "letmein", nil) + assert.NoError(t, err) + assert.NoError(t, a.Save(bernard)) + + // Put several documents in channel PBS + response := rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-1", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-2", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-3", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-4", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + + var changes struct { + Results []db.ChangeEntry + Last_Seq interface{} + } + + // Allocate a sequence but do not write a doc for it - will block DCP buffering until sequence is skipped + slowSequence, seqErr := db.AllocateTestSequence(database) + require.NoError(t, seqErr) + + // Write a document granting user access to PBS + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/grantDoc", `{"accessUser":"bernard", "accessChannel":"PBS"}`) + rest.RequireStatus(t, response, 201) + + caughtUpStart := database.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value() + + var oneShotComplete sync.WaitGroup + // Issue a GET requestPlus one-shot changes request in a separate goroutine. + oneShotComplete.Add(1) + go func() { + defer oneShotComplete.Done() + changesResponse := rt.SendUserRequest("GET", "/{{.keyspace}}/_changes?request_plus=true", "", "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 4) + }() + + // Issue a POST requestPlus one-shot changes request in a separate goroutine. + oneShotComplete.Add(1) + go func() { + defer oneShotComplete.Done() + changesResponse := rt.SendUserRequest("POST", "/{{.keyspace}}/_changes", `{"request_plus":true}`, "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 4) + }() + + // Wait for the one-shot changes feed to go into wait mode before releasing the slow sequence + require.NoError(t, database.WaitForTotalCaughtUp(caughtUpStart+2)) + + // Release the slow sequence and wait for it to be processed over DCP + releaseErr := db.ReleaseTestSequence(database, slowSequence) + require.NoError(t, releaseErr) + rt.WaitForPendingChanges() + + oneShotComplete.Wait() +} + +// TestOneShotGrantRequestPlusDbConfig simulates a one-shot changes feed being made before a previously issued grant has been +// buffered over DCP. When requestPlus is set via config, changes feed should block until grant is processed. +func TestOneShotGrantRequestPlusDbConfig(t *testing.T) { + + base.SetUpTestLogging(t, base.LevelDebug, base.KeyChanges, base.KeyHTTP) + + defer db.SuspendSequenceBatching()() + + rt := rest.NewRestTester(t, + &rest.RestTesterConfig{ + SyncFn: `function(doc) { + channel(doc.channel); + if (doc.accessUser != "") { + access(doc.accessUser, doc.accessChannel) + } + }`, + DatabaseConfig: &rest.DatabaseConfig{ + DbConfig: rest.DbConfig{ + ChangesRequestPlus: base.BoolPtr(true), + }, + }, + }) + defer rt.Close() + + // Create user with access to no channels + ctx := rt.Context() + database := rt.GetDatabase() + a := database.Authenticator(ctx) + bernard, err := a.NewUser("bernard", "letmein", nil) + assert.NoError(t, err) + assert.NoError(t, a.Save(bernard)) + + // Put several documents in channel PBS + response := rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-1", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-2", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-3", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/pbs-4", `{"channel":["PBS"]}`) + rest.RequireStatus(t, response, 201) + + var changes struct { + Results []db.ChangeEntry + Last_Seq interface{} + } + + // Allocate a sequence but do not write a doc for it - will block DCP buffering until sequence is skipped + slowSequence, seqErr := db.AllocateTestSequence(database) + require.NoError(t, seqErr) + log.Printf("Allocated slowSequence: %v", slowSequence) + + // Write a document granting user access to PBS + response = rt.SendAdminRequest("PUT", "/{{.keyspace}}/grantDoc", `{"accessUser":"bernard", "accessChannel":"PBS"}`) + rest.RequireStatus(t, response, 201) + + // Issue one-shot GET changes request explicitly setting request_plus=false (should override config value). + // Expect no results as granting document hasn't been buffered (blocked by slowSequence) + changesResponse := rt.SendUserRequest("GET", "/{{.keyspace}}/_changes?request_plus=false", "", "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 0) + + // Issue one-shot POST changes request explicitly setting request_plus=false (should override config value). + // Expect no results as granting document hasn't been buffered (blocked by slowSequence) + changesResponse = rt.SendUserRequest("POST", "/{{.keyspace}}/_changes", `{"request_plus":false}`, "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 0) + + caughtUpStart := database.DbStats.CBLReplicationPull().NumPullReplTotalCaughtUp.Value() + + var oneShotComplete sync.WaitGroup + // Issue a GET one-shot changes request in a separate goroutine. Should run as request plus based on config + oneShotComplete.Add(1) + go func() { + defer oneShotComplete.Done() + changesResponse := rt.SendUserRequest("GET", "/{{.keyspace}}/_changes", "", "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 4) + }() + + // Issue a POST one-shot changes request in a separate goroutine. Should run as request plus based on config + oneShotComplete.Add(1) + go func() { + defer oneShotComplete.Done() + changesResponse := rt.SendUserRequest("POST", "/{{.keyspace}}/_changes", `{}`, "bernard") + rest.RequireStatus(t, changesResponse, 200) + err = base.JSONUnmarshal(changesResponse.Body.Bytes(), &changes) + assert.NoError(t, err, "Error unmarshalling changes response") + for _, entry := range changes.Results { + log.Printf("Entry:%+v", entry) + } + require.Len(t, changes.Results, 4) + }() + + // Wait for the one-shot changes feed to go into wait mode before releasing the slow sequence + require.NoError(t, database.WaitForTotalCaughtUp(caughtUpStart+2)) + + // Release the slow sequence and wait for it to be processed over DCP + releaseErr := db.ReleaseTestSequence(database, slowSequence) + require.NoError(t, releaseErr) + rt.WaitForPendingChanges() + + oneShotComplete.Wait() +} + func waitForCompactStopped(dbc *db.DatabaseContext) error { for i := 0; i < 100; i++ { compactRunning := dbc.CacheCompactActive() diff --git a/rest/config.go b/rest/config.go index 43b6d6a9f7..848bee10aa 100644 --- a/rest/config.go +++ b/rest/config.go @@ -165,6 +165,7 @@ type DbConfig struct { GraphQL *functions.GraphQLConfig `json:"graphql,omitempty"` // GraphQL configuration & resolver fns UserFunctions *functions.FunctionsConfig `json:"functions,omitempty"` // Named JS fns for clients to call Suspendable *bool `json:"suspendable,omitempty"` // Allow the database to be suspended + ChangesRequestPlus *bool `json:"changes_request_plus,omitempty"` // If set, is used as the default value of request_plus for non-continuous replications CORS *auth.CORSConfig `json:"cors,omitempty"` } diff --git a/rest/server_context.go b/rest/server_context.go index 2b10ccd4f5..2cb270627e 100644 --- a/rest/server_context.go +++ b/rest/server_context.go @@ -1050,6 +1050,7 @@ func dbcOptionsFromConfig(ctx context.Context, sc *ServerContext, config *DbConf GroupID: groupID, JavascriptTimeout: javascriptTimeout, Serverless: sc.Config.IsServerless(), + ChangesRequestPlus: base.BoolDefault(config.ChangesRequestPlus, false), // UserQueries: config.UserQueries, // behind feature flag (see below) // UserFunctions: config.UserFunctions, // behind feature flag (see below) // GraphQL: config.GraphQL, // behind feature flag (see below)