Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync optimizations #6234

Open
wants to merge 43 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
f9dd5e9
CBG-2894: Reject user auth when channel threshold is over 500 (#6214)
gregns1 May 9, 2023
ae024a2
Report the correct error (#6232)
torcolvin May 9, 2023
c776492
CBG-2916 Add database examples with scopes (#6233)
torcolvin May 10, 2023
ddb447f
Update scopes documentation (#6231)
torcolvin May 10, 2023
370a2bd
CBG-2895: Add static replication connection limit (#6226)
gregns1 May 11, 2023
80d5eae
API Spec cleanup (scopes/collections) (#6236)
bbrks May 11, 2023
a31be80
CBG-2928 add blip stats for database (#6229)
torcolvin May 11, 2023
f56fba3
add x-additionalPropertiesName (#6238)
torcolvin May 11, 2023
b9a08ed
CBG-2938: Ignore Cbgt EOF feed errors when intentionally stopped (#6235)
bbrks May 12, 2023
1bdd12c
Rename TestStatusAfterReplicationRebalanceFail to avoid 'Fail:' searc…
bbrks May 15, 2023
36a3204
CBG-2853 Allow one-shot replications to wait for DCP to catch up on c…
adamcfraser May 15, 2023
2eb41b1
Fix race in TestOneShotGrant tests (#6247)
adamcfraser May 15, 2023
c24e303
Use less or equal as a speculative fix (#6246)
torcolvin May 15, 2023
4fe200c
CBG-2944: Ensure proveAttachments works for v2 attachments with a v2 …
bbrks May 16, 2023
3655a09
Make turnOffNoDelay log more info in error case (#6250)
bbrks May 17, 2023
044bf5b
CBG-2973: Fix panic for assigning to nil map inside Mutable1xBody (#6…
gregns1 May 17, 2023
c1cf34e
Fix RedactableError not satisfying the Redactor interface (#6253)
bbrks May 17, 2023
5c7673e
CBG-2998 always set no TLS bootstrap parameter to false for cbgt (#6254)
torcolvin May 18, 2023
c3146d8
CBG-2905 remove cached connections when bucket disappear (#6251)
torcolvin May 18, 2023
cb6aef7
Log around config.FromConnStr to diagnose slow DNS SRV resolution (#6…
bbrks May 19, 2023
0f6660f
Update waiting error message from generic message (#6259)
torcolvin May 23, 2023
75def59
Tweak TestIncrCounter to cover non-equal def and amt values (#6263)
bbrks May 25, 2023
b49a752
Setup manifest for 3.0.8 (#6266)
adamcfraser May 26, 2023
7093dda
CBG-2983 Close cbgt agents on database close (#6265)
adamcfraser May 26, 2023
c17d66c
CBG-3024 Make sure CE import uses checkpoints (#6261)
torcolvin May 26, 2023
b1b017e
CBG-3001 Avoid bucket retrieval error during OnFeedClose (#6269)
adamcfraser May 30, 2023
36d453a
Sync: Sequence-allocation optimization for push
snej Apr 4, 2023
60288e1
Sync: Small change-list optimizations
snej Apr 4, 2023
8e1d959
Sync: Limit concurrency of blip handlers
snej Apr 4, 2023
a80a9da
Flow-control of outgoing 'rev' messages
snej May 9, 2023
2d5b448
Fixed tests that copy blip.Message structs
snej May 10, 2023
c471697
CBG-2977 allow DELETE on a broken DB config (#6260)
torcolvin May 30, 2023
c7b78bf
Make tests pass with default collections/views (#6267)
torcolvin May 31, 2023
2940cb1
CBG-3043: pick up cbgt fix for panic in import feed (#6270)
gregns1 May 31, 2023
7dff1d1
CBG-3028: fixes for failing CE tests (#6279)
gregns1 Jun 1, 2023
76f92e0
CBG-2857 Remove unambiguous timeouts from triggering cbcollections (#…
torcolvin Jun 1, 2023
b01ed47
CBG-2793: attachment compaction code erroneously sets failOnRollback …
gregns1 Jun 1, 2023
a62cd63
Make test pass if there are buckets that are non test buckets (#6285)
torcolvin Jun 5, 2023
3b3201b
Tiny whitespace lint fix
snej Jun 5, 2023
d0be0f8
CBG-3022: Replicator will not reconnect when max_back_off != 0 (#6287)
gregns1 Jun 6, 2023
6cb633a
Add the current state of the database in error message (#6290)
torcolvin Jun 6, 2023
e4f58d2
Updated go-blip with a fix for a race condition
snej Jun 6, 2023
9fcefbc
Merge branch 'master' into CBG-2952-sync-opt
snej Jun 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 86 additions & 5 deletions auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ type Authenticator struct {
}

type AuthenticatorOptions struct {
ClientPartitionWindow time.Duration
ChannelsWarningThreshold *uint32
SessionCookieName string
BcryptCost int
LogCtx context.Context
ClientPartitionWindow time.Duration
ChannelsWarningThreshold *uint32
ServerlessChannelThreshold uint32
SessionCookieName string
BcryptCost int
LogCtx context.Context

// Collections defines the set of collections used by the authenticator when rebuilding channels.
// Channels are only recomputed for collections included in this set.
Expand Down Expand Up @@ -196,6 +197,17 @@ func (auth *Authenticator) getPrincipal(docID string, factory func() Principal)
}
changed = true
}
// If the channel threshold has been set we need to check the inherited channels across all scopes and collections against the limit
if auth.ServerlessChannelThreshold != 0 {
channelsLength, err := auth.getInheritedChannelsLength(user)
if err != nil {
return nil, nil, false, err
}
err = auth.checkChannelLimits(channelsLength, user)
if err != nil {
return nil, nil, false, err
}
}
}

if changed {
Expand Down Expand Up @@ -223,13 +235,81 @@ func (auth *Authenticator) getPrincipal(docID string, factory func() Principal)
return princ, nil
}

// inheritedCollectionChannels returns channels for a given scope + collection
func (auth *Authenticator) inheritedCollectionChannels(user User, scope, collection string) (ch.TimedSet, error) {
roles, err := auth.getUserRoles(user)
if err != nil {
return nil, err
}

channels := user.CollectionChannels(scope, collection)
for _, role := range roles {
roleSince := user.RoleNames()[role.Name()]
channels.AddAtSequence(role.CollectionChannels(scope, collection), roleSince.Sequence)
}
return channels, nil
}

// getInheritedChannelsLength returns number of channels a user has access to across all collections
func (auth *Authenticator) getInheritedChannelsLength(user User) (int, error) {
var cumulativeChannels int
for scope, collections := range auth.Collections {
for collection := range collections {
channels, err := auth.inheritedCollectionChannels(user, scope, collection)
if err != nil {
return 0, err
}
cumulativeChannels += len(channels)
}
}
return cumulativeChannels, nil
}

// checkChannelLimits logs a warning when the warning threshold is met and will return an error when the channel limit is met
func (auth *Authenticator) checkChannelLimits(channels int, user User) error {
// Error if ServerlessChannelThreshold is set and is >= than the threshold
if uint32(channels) >= auth.ServerlessChannelThreshold {
base.ErrorfCtx(auth.LogCtx, "User ID: %v channel count: %d exceeds %d for channels per user threshold. Auth will be rejected until rectified",
base.UD(user.Name()), channels, auth.ServerlessChannelThreshold)
return base.ErrMaximumChannelsForUserExceeded
}

// This function is likely to be called once per session when a channel limit is applied, the sync once
// applied here ensures we don't fill logs with warnings about being over warning threshold. We may want
// to revisit this implementation around the warning threshold in future
user.GetWarnChanSync().Do(func() {
if channelsPerUserThreshold := auth.ChannelsWarningThreshold; channelsPerUserThreshold != nil {
if uint32(channels) >= *channelsPerUserThreshold {
base.WarnfCtx(auth.LogCtx, "User ID: %v channel count: %d exceeds %d for channels per user warning threshold",
base.UD(user.Name()), channels, *channelsPerUserThreshold)
}
}
})
return nil
}

// getUserRoles gets all roles a user has been granted
func (auth *Authenticator) getUserRoles(user User) ([]Role, error) {
roles := make([]Role, 0, len(user.RoleNames()))
for name := range user.RoleNames() {
role, err := auth.GetRole(name)
if err != nil {
return nil, err
} else if role != nil {
roles = append(roles, role)
}
}
return roles, nil
}

// Rebuild channels computes the full set of channels for all collections defined for the authenticator.
// For each collection in Authenticator.collections:
// - if there is no CollectionAccess on the principal for the collection, rebuilds channels for that collection
// - If CollectionAccess on the principal has been invalidated, rebuilds channels for that collection
func (auth *Authenticator) rebuildChannels(princ Principal) (changed bool, err error) {

changed = false

for scope, collections := range auth.Collections {
for collection, _ := range collections {
// If collection channels are nil, they have been invalidated and must be rebuilt
Expand All @@ -242,6 +322,7 @@ func (auth *Authenticator) rebuildChannels(princ Principal) (changed bool, err e
}
}
}

return changed, nil
}

Expand Down
111 changes: 111 additions & 0 deletions auth/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2752,6 +2752,117 @@ func TestObtainChannelsForDeletedRole(t *testing.T) {
}
}

func TestServerlessChannelLimitsRoles(t *testing.T) {
testCases := []struct {
Name string
Collection bool
}{
{
Name: "Single role",
},
{
Name: "Muliple roles",
},
}
for _, testCase := range testCases {
t.Run(testCase.Name, func(t *testing.T) {
testBucket := base.GetTestBucket(t)
defer testBucket.Close()
dataStore := testBucket.GetSingleDataStore()
var role2 Role

opts := DefaultAuthenticatorOptions()
opts.ServerlessChannelThreshold = 5
opts.Collections = map[string]map[string]struct{}{
"scope1": {"collection1": struct{}{}, "collection2": struct{}{}},
}
auth := NewAuthenticator(dataStore, nil, opts)
user1, err := auth.NewUser("user1", "pass", ch.BaseSetOf(t, "ABC"))
require.NoError(t, err)
err = auth.Save(user1)
require.NoError(t, err)
_, err = auth.AuthenticateUser("user1", "pass")
require.NoError(t, err)

role1, err := auth.NewRole("role1", nil)
require.NoError(t, err)
if testCase.Name == "Single role" {
user1.SetExplicitRoles(ch.TimedSet{"role1": ch.NewVbSimpleSequence(1)}, 1)
require.NoError(t, auth.Save(user1))
_, err = auth.AuthenticateUser("user1", "pass")
require.NoError(t, err)

role1.SetCollectionExplicitChannels("scope1", "collection1", ch.AtSequence(ch.BaseSetOf(t, "ABC", "DEF", "GHI", "JKL"), 1), 1)
require.NoError(t, auth.Save(role1))
} else {
role2, err = auth.NewRole("role2", nil)
require.NoError(t, err)
user1.SetExplicitRoles(ch.TimedSet{"role1": ch.NewVbSimpleSequence(1), "role2": ch.NewVbSimpleSequence(1)}, 1)
require.NoError(t, auth.Save(user1))
role1.SetCollectionExplicitChannels("scope1", "collection1", ch.AtSequence(ch.BaseSetOf(t, "ABC", "DEF", "GHI", "JKL"), 1), 1)
role2.SetCollectionExplicitChannels("scope1", "collection2", ch.AtSequence(ch.BaseSetOf(t, "MNO", "PQR"), 1), 1)
require.NoError(t, auth.Save(role1))
require.NoError(t, auth.Save(role2))
}
_, err = auth.AuthenticateUser("user1", "pass")
require.Error(t, err)
})
}
}

func TestServerlessChannelLimits(t *testing.T) {

testCases := []struct {
Name string
Collection bool
}{
{
Name: "Collection not enabled",
Collection: false,
},
{
Name: "Collection is enabled",
Collection: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.Name, func(t *testing.T) {
testBucket := base.GetTestBucket(t)
defer testBucket.Close()
dataStore := testBucket.GetSingleDataStore()

opts := DefaultAuthenticatorOptions()
opts.ServerlessChannelThreshold = 5
if testCase.Collection {
opts.Collections = map[string]map[string]struct{}{
"scope1": {"collection1": struct{}{}, "collection2": struct{}{}},
}
}
auth := NewAuthenticator(dataStore, nil, opts)
user1, err := auth.NewUser("user1", "pass", ch.BaseSetOf(t, "ABC"))
require.NoError(t, err)
err = auth.Save(user1)
require.NoError(t, err)
_, err = auth.AuthenticateUser("user1", "pass")
require.NoError(t, err)

if !testCase.Collection {
user1.SetCollectionExplicitChannels("_default", "_default", ch.AtSequence(ch.BaseSetOf(t, "ABC", "DEF", "GHI", "JKL", "MNO", "PQR"), 1), 1)
err = auth.Save(user1)
require.NoError(t, err)
} else {
user1.SetCollectionExplicitChannels("scope1", "collection1", ch.AtSequence(ch.BaseSetOf(t, "ABC", "DEF", "GHI", "JKL"), 1), 1)
user1.SetCollectionExplicitChannels("scope1", "collection2", ch.AtSequence(ch.BaseSetOf(t, "MNO", "PQR"), 1), 1)
err = auth.Save(user1)
require.NoError(t, err)
}
_, err = auth.AuthenticateUser("user1", "pass")
require.Error(t, err)
assert.Contains(t, err.Error(), base.ErrMaximumChannelsForUserExceeded.Error())
})
}
}

func TestInvalidateRoles(t *testing.T) {
testBucket := base.GetTestBucket(t)
defer testBucket.Close()
Expand Down
3 changes: 3 additions & 0 deletions auth/principal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package auth

import (
"sync"
"time"

"github.com/couchbase/sync_gateway/base"
Expand Down Expand Up @@ -125,6 +126,8 @@ type User interface {

InitializeRoles()

GetWarnChanSync() *sync.Once

revokedChannels(since uint64, lowSeq uint64, triggeredBy uint64) RevokedChannels

// Obtains the period over which the user had access to the given channel. Either directly or via a role.
Expand Down
4 changes: 4 additions & 0 deletions auth/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ func (user *userImpl) SetEmail(email string) error {
return nil
}

func (user *userImpl) GetWarnChanSync() *sync.Once {
return &user.warnChanThresholdOnce
}

func (user *userImpl) RoleNames() ch.TimedSet {
if user.RoleInvalSeq != 0 {
return nil
Expand Down
Loading