Skip to content

Commit

Permalink
CBG-2905 remove cached connections when a bucket disappears (#6251)
Browse files Browse the repository at this point in the history
* remove cached buckets
* Add refcounting for parallel calls to get buckets
  • Loading branch information
torcolvin authored May 18, 2023
1 parent 5c7673e commit c3146d8
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 38 deletions.
128 changes: 90 additions & 38 deletions base/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ type CouchbaseCluster struct {
clusterOptions gocb.ClusterOptions
forcePerBucketAuth bool // Forces perBucketAuth authenticators to be used to connect to the bucket
perBucketAuth map[string]*gocb.Authenticator
bucketConnectionMode BucketConnectionMode // Whether to cache cluster connections
cachedClusterConnection *gocb.Cluster // Cached cluster connection, should only be used by GetConfigBuckets
cachedBucketConnections map[string]*cachedBucket // Per-bucket cached connections
cachedConnectionLock sync.Mutex // mutex for access to cachedBucketConnections
configPersistence ConfigPersistence // ConfigPersistence mode
bucketConnectionMode BucketConnectionMode // Whether to cache cluster connections
cachedClusterConnection *gocb.Cluster // Cached cluster connection, should only be used by GetConfigBuckets
cachedBucketConnections cachedBucketConnections // Per-bucket cached connections
cachedConnectionLock sync.Mutex // mutex for access to cachedBucketConnections
configPersistence ConfigPersistence // ConfigPersistence mode
}

type BucketConnectionMode int
Expand All @@ -70,13 +70,73 @@ const (
)

type cachedBucket struct {
bucket *gocb.Bucket
teardownFn func()
bucket *gocb.Bucket // underlying bucket
bucketCloseFn func() // teardown function which will close the gocb connection
refcount int // count of how many functions are using this cachedBucket
shouldClose bool // mark this cachedBucket as needing to be closed with ref
}

// noopTeardown is returned by getBucket when using a cached bucket - these buckets are torn down
// when CouchbaseCluster.Close is called.
func noopTeardown() {}
// cachedBucketConnections is a lockable map of cached bucket connections, keyed by bucket name.
// Each cachedBucket entry carries its own reference count. Methods whose names start with an
// underscore do not take the lock themselves and expect the caller to already hold it.
type cachedBucketConnections struct {
// buckets maps a bucket name to its cached connection entry.
buckets map[string]*cachedBucket
// lock guards buckets and the mutable fields of the entries stored in it.
lock sync.Mutex
}

// removeOutdatedBuckets flags every cached bucket whose name is NOT in activeBuckets for
// closure. Flagged entries that have no outstanding references are closed and dropped from
// the cache immediately; entries that are still referenced stay in the map until _teardown
// is invoked for them again with a reference count of zero or less.
func (c *cachedBucketConnections) removeOutdatedBuckets(activeBuckets Set) {
	c.lock.Lock()
	defer c.lock.Unlock()
	// Deleting entries from a map while ranging over it is safe in Go.
	for name, cached := range c.buckets {
		if _, stillActive := activeBuckets[name]; !stillActive {
			cached.shouldClose = true
			c._teardown(name)
		}
	}
}

// closeAll closes every cached bucket connection and evicts it from the cache where possible.
//
// Every entry is flagged with shouldClose and has its bucketCloseFn invoked exactly once.
// Entries with no outstanding references are deleted from the map straight away, so a closed
// connection is not handed out again. Entries that are still referenced are kept in the map,
// because the teardown callback of the outstanding holder looks the entry up by name; their
// bucketCloseFn is replaced with a no-op so that this later teardown only evicts the entry
// instead of closing the connection a second time.
func (c *cachedBucketConnections) closeAll() {
	c.lock.Lock()
	defer c.lock.Unlock()
	// Deleting entries from a map while ranging over it is safe in Go.
	for bucketName, bucket := range c.buckets {
		bucket.shouldClose = true
		bucket.bucketCloseFn()
		if bucket.refcount <= 0 {
			delete(c.buckets, bucketName)
			continue
		}
		// Still in use: leave the entry for the holder's teardown, but never close twice.
		bucket.bucketCloseFn = func() {}
	}
}

// teardown releases one reference to the cached bucket named bucketName while holding the
// lock, and closes and evicts the connection if it has been flagged for closure and no
// references remain. It is the function handed out by CouchbaseCluster.getBucket() as the
// caller's teardown callback.
//
// If no entry exists for bucketName the call is a no-op rather than a nil-pointer panic.
func (c *cachedBucketConnections) teardown(bucketName string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	bucket := c.buckets[bucketName]
	if bucket == nil {
		// Nothing cached under this name, so there is no reference to release.
		return
	}
	bucket.refcount--
	c._teardown(bucketName)
}

// _teardown closes the connection cached under bucketName and removes it from the map, but
// only when the entry is flagged with shouldClose and its reference count is zero or less.
// The caller must already hold the lock and must have brought the reference count up to date.
func (c *cachedBucketConnections) _teardown(bucketName string) {
	cached := c.buckets[bucketName]
	if cached.shouldClose && cached.refcount <= 0 {
		cached.bucketCloseFn()
		delete(c.buckets, bucketName)
	}
}

// _get looks up the cachedBucket stored under bucketName. When an entry exists its reference
// count is incremented and the entry is returned; otherwise nil is returned. The method does
// not take the lock itself, so the caller must already hold it.
func (c *cachedBucketConnections) _get(bucketName string) *cachedBucket {
	if cached, found := c.buckets[bucketName]; found {
		cached.refcount++
		return cached
	}
	return nil
}

// _set stores bucket in the cache under bucketName, replacing any entry already stored under
// that name. The method does not take the lock itself, so the caller must already hold it.
// The underlying map is allocated on first use so that a zero-value cachedBucketConnections
// can be written to without panicking.
func (c *cachedBucketConnections) _set(bucketName string, bucket *cachedBucket) {
	if c.buckets == nil {
		c.buckets = make(map[string]*cachedBucket)
	}
	c.buckets[bucketName] = bucket
}

var _ BootstrapConnection = &CouchbaseCluster{}

Expand Down Expand Up @@ -128,7 +188,7 @@ func NewCouchbaseCluster(server, username, password,
}

if bucketMode == CachedClusterConnections {
cbCluster.cachedBucketConnections = make(map[string]*cachedBucket)
cbCluster.cachedBucketConnections = cachedBucketConnections{buckets: make(map[string]*cachedBucket)}
}

cbCluster.configPersistence = &DocumentBootstrapPersistence{}
Expand Down Expand Up @@ -243,6 +303,8 @@ func (cc *CouchbaseCluster) GetConfigBuckets() ([]string, error) {
bucketList = append(bucketList, bucketName)
}

cc.cachedBucketConnections.removeOutdatedBuckets(SetOf(bucketList...))

return bucketList, nil
}

Expand Down Expand Up @@ -399,13 +461,11 @@ func (cc *CouchbaseCluster) KeyExists(location, docID string) (exists bool, err
// Close calls teardown for any cached buckets and removes from cachedBucketConnections
func (cc *CouchbaseCluster) Close() {

cc.cachedBucketConnections.closeAll()

cc.cachedConnectionLock.Lock()
defer cc.cachedConnectionLock.Unlock()

for bucketName, cachedBucket := range cc.cachedBucketConnections {
cachedBucket.teardownFn()
delete(cc.cachedBucketConnections, bucketName)
}
if cc.cachedClusterConnection != nil {
_ = cc.cachedClusterConnection.Close(nil)
cc.cachedClusterConnection = nil
Expand All @@ -418,36 +478,28 @@ func (cc *CouchbaseCluster) getBucket(bucketName string) (b *gocb.Bucket, teardo
return cc.connectToBucket(bucketName)
}

cc.cachedConnectionLock.Lock()
defer cc.cachedConnectionLock.Unlock()

cacheBucket, ok := cc.cachedBucketConnections[bucketName]
if ok {
return cacheBucket.bucket, noopTeardown, nil
teardownFn = func() {
cc.cachedBucketConnections.teardown(bucketName)
}
cc.cachedBucketConnections.lock.Lock()
defer cc.cachedBucketConnections.lock.Unlock()
bucket := cc.cachedBucketConnections._get(bucketName)
if bucket != nil {
return bucket.bucket, teardownFn, nil
}

// cached bucket not found, connect and add
newBucket, newTeardownFn, err := cc.connectToBucket(bucketName)
newBucket, bucketCloseFn, err := cc.connectToBucket(bucketName)
if err != nil {
return nil, nil, err
}
cc.cachedBucketConnections[bucketName] = &cachedBucket{
bucket: newBucket,
teardownFn: newTeardownFn,
}
return newBucket, noopTeardown, nil
}

// For unrecoverable errors when using cached buckets, remove the bucket from the cache to trigger a new connection on next usage
func (cc *CouchbaseCluster) onCachedBucketError(bucketName string) {
cc.cachedBucketConnections._set(bucketName, &cachedBucket{
bucket: newBucket,
bucketCloseFn: bucketCloseFn,
refcount: 1,
})

cc.cachedConnectionLock.Lock()
defer cc.cachedConnectionLock.Unlock()
cacheBucket, ok := cc.cachedBucketConnections[bucketName]
if ok {
cacheBucket.teardownFn()
delete(cc.cachedBucketConnections, bucketName)
}
return newBucket, teardownFn, nil
}

// connectToBucket establishes a new connection to a bucket, and returns the bucket after waiting for it to be ready.
Expand Down
82 changes: 82 additions & 0 deletions base/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package base

import (
"sync"
"testing"

"github.com/imdario/mergo"
Expand All @@ -33,3 +34,84 @@ func TestMergeStructPointer(t *testing.T) {
assert.Equal(t, "changed", source.Ptr.S)
assert.Equal(t, IntPtr(5), source.Ptr.I)
}

// TestBootstrapRefCounting exercises the reference-counted bucket connection cache of a
// CouchbaseCluster created with CachedClusterConnections:
//   - GetConfigBuckets alone does not populate the cache, bucket operations do
//   - removeOutdatedBuckets is a no-op for active buckets and idempotent for removed ones
//   - a bucket that is still held by a caller survives removal until the caller's teardown runs
func TestBootstrapRefCounting(t *testing.T) {
	if UnitTestUrlIsWalrus() {
		t.Skip("Test requires making a connection to CBS")
	}
	// Integration tests are configured to run in these parameters, they are used in main_test_bucket_pool.go
	// Future enhancement would be to allow all integration tests to run with TLS
	x509CertPath := ""
	x509KeyPath := ""
	caCertPath := ""
	forcePerBucketAuth := false
	tlsSkipVerify := BoolPtr(false)
	var perBucketCredentialsConfig map[string]*CredentialsConfig

	cluster, err := NewCouchbaseCluster(UnitTestUrl(), TestClusterUsername(), TestClusterPassword(), x509CertPath, x509KeyPath, caCertPath, forcePerBucketAuth, perBucketCredentialsConfig, tlsSkipVerify, BoolPtr(TestUseXattrs()), CachedClusterConnections)
	require.NoError(t, err)
	// Check for nil before registering Close, which dereferences the cluster.
	require.NotNil(t, cluster)
	defer cluster.Close()

	clusterConnection, err := cluster.getClusterConnection()
	require.NoError(t, err)
	require.NotNil(t, clusterConnection)

	buckets, err := cluster.GetConfigBuckets()
	require.NoError(t, err)
	require.Len(t, buckets, tbpNumBuckets())
	// buckets[0] is used below
	require.NotEmpty(t, buckets)
	// GetConfigBuckets doesn't cache connections, it uses cluster connection to determine number of buckets
	require.Len(t, cluster.cachedBucketConnections.buckets, 0)

	primeBucketConnectionCache := func(bucketNames []string) {
		// Bucket CRUD ops do cache connections
		for _, bucketName := range bucketNames {
			exists, err := cluster.KeyExists(bucketName, "keyThatDoesNotExist")
			require.NoError(t, err)
			require.False(t, exists)
		}
	}

	primeBucketConnectionCache(buckets)
	require.Len(t, cluster.cachedBucketConnections.buckets, tbpNumBuckets())

	// call removeOutdatedBuckets as no-op
	cluster.cachedBucketConnections.removeOutdatedBuckets(SetOf(buckets...))
	require.Len(t, cluster.cachedBucketConnections.buckets, tbpNumBuckets())

	// call removeOutdatedBuckets to remove all cached buckets, call multiple times to make sure idempotent
	for i := 0; i < 3; i++ {
		cluster.cachedBucketConnections.removeOutdatedBuckets(Set{})
		require.Len(t, cluster.cachedBucketConnections.buckets, 0)
	}

	primeBucketConnectionCache(buckets)
	require.Len(t, cluster.cachedBucketConnections.buckets, tbpNumBuckets())

	// make sure that you can still use an active connection while the bucket has been removed
	var wg sync.WaitGroup
	wg.Add(1)
	bucketAcquired := make(chan struct{}) // closed once the goroutine has attempted getBucket
	bucketRemoved := make(chan struct{})  // closed once the main goroutine has run removeOutdatedBuckets
	go func() {
		defer wg.Done()
		b, teardown, err := cluster.getBucket(buckets[0])
		// Signal unconditionally so the test goroutine never blocks on a failed connection.
		close(bucketAcquired)
		// Use assert instead of require here: FailNow may only be called from the goroutine
		// running the test function.
		if !assert.NoError(t, err) {
			return
		}
		// Only defer teardown once we know getBucket succeeded; it is nil on error.
		defer teardown()
		if !assert.NotNil(t, b) {
			return
		}
		<-bucketRemoved
		// make sure that we can still use bucket after it is no longer cached
		exists, err := cluster.configPersistence.keyExists(b.DefaultCollection(), "keyThatDoesNotExist")
		assert.NoError(t, err)
		assert.False(t, exists)
	}()

	// Wait until the goroutine holds a reference so the removal below happens while the
	// bucket is in use, instead of racing with the goroutine's getBucket call.
	<-bucketAcquired
	cluster.cachedBucketConnections.removeOutdatedBuckets(Set{})
	// Only the bucket still referenced by the goroutine stays cached, marked for closure.
	// assert (not require) so the goroutine is always released below.
	assert.Len(t, cluster.cachedBucketConnections.buckets, 1)
	close(bucketRemoved)

	wg.Wait()
	// The goroutine's teardown dropped the last reference, which evicts the bucket.
	require.Len(t, cluster.cachedBucketConnections.buckets, 0)

	// make sure you can "remove" a non existent bucket in the case that bucket removal is called multiple times
	cluster.cachedBucketConnections.removeOutdatedBuckets(SetOf("not-a-bucket"))

}

0 comments on commit c3146d8

Please sign in to comment.