Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3703 create a basic bucket to bucket XDCR implementation [HAS DEPENDENCY] #29

Closed
wants to merge 10 commits into from
39 changes: 25 additions & 14 deletions bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

sgbucket "github.com/couchbase/sg-bucket"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -30,21 +31,21 @@ func init() {
}
}

func testCtx(t *testing.T) context.Context {
// testCtx returns a context for testing
func testCtx(_ *testing.T) context.Context {
return context.Background() // match sync gateway interfaces for logging
}

const testBucketDirName = "RosmarTest"

// testBucketPath returns a path for a test bucket in a unique directory.
func testBucketPath(t *testing.T) string {
return fmt.Sprintf("%s%c%s", t.TempDir(), os.PathSeparator, testBucketDirName)
}

func makeTestBucket(t *testing.T) *Bucket {
LoggingCallback = func(level LogLevel, fmt string, args ...any) {
t.Logf(logLevelNamesPrint[level]+fmt, args...)
}
bucket, err := OpenBucket(uriFromPath(testBucketPath(t)), strings.ToLower(t.Name()), CreateNew)
// makeTestBucketWithName creates a new persistent test bucket with a given name. If a bucket already exists, this function will fail. This uses testing.T.Cleanup to remove the bucket.
func makeTestBucketWithName(t *testing.T, name string) *Bucket {
bucket, err := OpenBucket(uriFromPath(testBucketPath(t)), name, CreateNew)
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, bucket.CloseAndDelete(testCtx(t)))
Expand All @@ -53,6 +54,11 @@ func makeTestBucket(t *testing.T) *Bucket {
return bucket
}

// makeTestBucket creates a new test bucket with a unique game
func makeTestBucket(t *testing.T) *Bucket {
return makeTestBucketWithName(t, t.Name()+"_"+uuid.New().String())
}

func dsName(scope string, coll string) sgbucket.DataStoreName {
return sgbucket.DataStoreNameImpl{Scope: scope, Collection: coll}
}
Expand All @@ -71,8 +77,8 @@ func bucketCount(name string) uint {

func TestNewBucket(t *testing.T) {
ensureNoLeaks(t)
bucket := makeTestBucket(t)
bucketName := strings.ToLower(t.Name())
bucket := makeTestBucketWithName(t, bucketName)
assert.Equal(t, bucketName, bucket.GetName())
assert.Contains(t, bucket.GetURL(), testBucketDirName)

Expand Down Expand Up @@ -146,10 +152,10 @@ var defaultCollection = dsName("_default", "_default")

func TestTwoBucketsOneURL(t *testing.T) {
ensureNoLeaks(t)
bucket1 := makeTestBucket(t)
bucketName := strings.ToLower(t.Name())
bucket1 := makeTestBucketWithName(t, bucketName)
url := bucket1.url

bucketName := strings.ToLower(t.Name())
bucket2, err := OpenBucket(url, bucketName, CreateNew)
require.ErrorContains(t, err, "already exists")
require.Nil(t, bucket2)
Expand Down Expand Up @@ -177,7 +183,8 @@ func TestTwoBucketsOneURL(t *testing.T) {

func TestDefaultCollection(t *testing.T) {
ensureNoLeaks(t)
bucket := makeTestBucket(t)
bucketName := strings.ToLower(t.Name())
bucket := makeTestBucketWithName(t, bucketName)

// Initially one collection:
colls, err := bucket.ListDataStores()
Expand All @@ -186,12 +193,15 @@ func TestDefaultCollection(t *testing.T) {

coll := bucket.DefaultDataStore()
assert.NotNil(t, coll)
assert.Equal(t, strings.ToLower(t.Name())+"._default._default", coll.GetName())
assert.Equal(t, bucketName+"._default._default", coll.GetName())
assert.Equal(t, "_default", coll.ScopeName())
assert.Equal(t, "_default", coll.CollectionName())
}

func TestCreateCollection(t *testing.T) {
ensureNoLeaks(t)
bucket := makeTestBucket(t)
bucketName := strings.ToLower(t.Name())
bucket := makeTestBucketWithName(t, bucketName)

collName := dsName("_default", "foo")
err := bucket.CreateDataStore(testCtx(t), collName)
Expand All @@ -200,8 +210,9 @@ func TestCreateCollection(t *testing.T) {
coll, err := bucket.NamedDataStore(collName)
assert.NoError(t, err)
assert.NotNil(t, coll)
assert.Equal(t, strings.ToLower(t.Name())+"._default.foo", coll.GetName())

assert.Equal(t, bucketName+"._default.foo", coll.GetName())
assert.Equal(t, "_default", coll.ScopeName())
assert.Equal(t, "foo", coll.CollectionName())
colls, err := bucket.ListDataStores()
assert.NoError(t, err)
assert.Equal(t, colls, []sgbucket.DataStoreName{defaultCollection, collName})
Expand Down
6 changes: 3 additions & 3 deletions collection+xattrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (c *Collection) GetXattrs(
}

// SetWithMeta updates a document fully with xattrs and body and allows specification of a specific CAS (newCas). This update will always happen as long as oldCas matches the value of existing document. This simulates the kv op setWithMeta.
func (c *Collection) SetWithMeta(_ context.Context, key string, oldCas CAS, newCas CAS, exp uint32, xattrs []byte, body []byte, datatype sgbucket.FeedDataType) error {
func (c *Collection) setWithMeta(key string, oldCas CAS, newCas CAS, exp uint32, xattrs []byte, body []byte, datatype sgbucket.FeedDataType) error {
isJSON := datatype&sgbucket.FeedDataTypeJSON != 0
isDeletion := false
return c.writeWithMeta(key, body, xattrs, oldCas, newCas, exp, isJSON, isDeletion)
Expand Down Expand Up @@ -80,8 +80,8 @@ func (c *Collection) writeWithMeta(key string, body []byte, xattrs []byte, oldCa
return nil
}

// DeleteWithMeta tombstones a document and sets a specific cas. This update will always happen as long as oldCas matches the value of existing document. This simulates the kv op deleteWithMeta.
func (c *Collection) DeleteWithMeta(_ context.Context, key string, oldCas CAS, newCas CAS, exp uint32, xattrs []byte) error {
// deleteWithMeta tombstones a document and sets a specific cas. This update will always happen as long as oldCas matches the value of existing document. This simulates the kv op deleteWithMeta.
func (c *Collection) deleteWithMeta(key string, oldCas CAS, newCas CAS, exp uint32, xattrs []byte) error {
var body []byte
isJSON := false
isDeletion := true
Expand Down
12 changes: 12 additions & 0 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,22 @@ func (c *Collection) db() queryable {

//////// Interface DataStore

// GetName returns bucket.scope.collection name.
func (c *Collection) GetName() string {
return c.bucket.GetName() + "." + c.DataStoreNameImpl.String()
}

// ScopeName returns the scope name. _default for the default scope.
func (c *Collection) ScopeName() string {
return c.DataStoreNameImpl.ScopeName()
}

// ScopeName returns the collection name. _default for the default collection
func (c *Collection) CollectionName() string {
return c.DataStoreNameImpl.CollectionName()
}

// GetCollectionID returns a unique ID for a given collection, used to identify the collection in DCP feeds and other places.
func (c *Collection) GetCollectionID() uint32 {
return uint32(c.id) - 1 // SG expects that the default collection has id 0, so subtract 1
}
Expand Down
28 changes: 11 additions & 17 deletions collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,10 +633,9 @@ func verifyEmptyBodyAndSyncXattr(t *testing.T, store sgbucket.DataStore, key str
func TestSetWithMetaNoDocument(t *testing.T) {
col := makeTestBucket(t).DefaultDataStore()
const docID = "TestSetWithMeta"
ctx := testCtx(t)
cas2 := CAS(1)
body := []byte(`{"foo":"bar"}`)
err := col.(*Collection).SetWithMeta(ctx, docID, 0, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
err := col.(*Collection).setWithMeta(docID, 0, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
require.NoError(t, err)

val, cas, err := col.GetRaw(docID)
Expand All @@ -652,10 +651,9 @@ func TestSetWithMetaOverwriteJSON(t *testing.T) {
require.NoError(t, err)
require.Greater(t, cas1, CAS(0))

ctx := testCtx(t)
cas2 := CAS(1)
body := []byte(`{"foo":"bar"}`)
err = col.(*Collection).SetWithMeta(ctx, docID, cas1, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
err = col.(*Collection).setWithMeta(docID, cas1, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
require.NoError(t, err)

val, cas, err := col.GetRaw(docID)
Expand All @@ -679,10 +677,9 @@ func TestSetWithMetaOverwriteNotJSON(t *testing.T) {
require.Equal(t, sgbucket.FeedOpMutation, event1.Opcode)
require.Equal(t, sgbucket.FeedDataTypeJSON, event1.DataType)

ctx := testCtx(t)
cas2 := CAS(1)
body := []byte(`ABC`)
err = col.(*Collection).SetWithMeta(ctx, docID, cas1, cas2, 0, nil, body, sgbucket.FeedDataTypeRaw)
err = col.(*Collection).setWithMeta(docID, cas1, cas2, 0, nil, body, sgbucket.FeedDataTypeRaw)
require.NoError(t, err)

val, cas, err := col.GetRaw(docID)
Expand All @@ -706,18 +703,17 @@ func TestSetWithMetaOverwriteTombstone(t *testing.T) {
deletedCas, err := col.Remove(docID, cas1)
require.NoError(t, err)

ctx := testCtx(t)
cas2 := CAS(1)
body := []byte(`ABC`)

// make sure there is a cas check even for tombstone
err = col.(*Collection).SetWithMeta(ctx, docID, CAS(0), cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
err = col.(*Collection).setWithMeta(docID, CAS(0), cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})

events, _ := startFeed(t, bucket)

// cas check even on tombstone
err = col.(*Collection).SetWithMeta(ctx, docID, deletedCas, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
err = col.(*Collection).setWithMeta(docID, deletedCas, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
require.NoError(t, err)

event := <-events
Expand All @@ -735,17 +731,16 @@ func TestSetWithMetaCas(t *testing.T) {
docID := t.Name()

body := []byte(`{"foo":"bar"}`)
ctx := testCtx(t)

badStartingCas := CAS(1234)
specifiedCas := CAS(1)

// document doesn't exist, so cas mismatch will occur if CAS != 0
err := col.(*Collection).SetWithMeta(ctx, docID, badStartingCas, specifiedCas, 0, nil, body, sgbucket.FeedDataTypeJSON)
err := col.(*Collection).setWithMeta(docID, badStartingCas, specifiedCas, 0, nil, body, sgbucket.FeedDataTypeJSON)
require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})

// document doesn't exist, but CAS 0 will allow writing
err = col.(*Collection).SetWithMeta(ctx, docID, CAS(0), specifiedCas, 0, nil, body, sgbucket.FeedDataTypeJSON)
err = col.(*Collection).setWithMeta(docID, CAS(0), specifiedCas, 0, nil, body, sgbucket.FeedDataTypeJSON)
require.NoError(t, err)

val, cas, err := col.GetRaw(docID)
Expand Down Expand Up @@ -779,16 +774,15 @@ func TestDeleteWithMeta(t *testing.T) {
specifiedCas := CAS(1)

events, _ := startFeed(t, bucket)
ctx := testCtx(t)

// pass a bad CAS and document will not delete
badStartingCas := CAS(1234)
// document doesn't exist, but CAS 0 will allow writing
err = col.(*Collection).DeleteWithMeta(ctx, docID, badStartingCas, specifiedCas, 0, nil)
err = col.(*Collection).deleteWithMeta(docID, badStartingCas, specifiedCas, 0, nil)
require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})

// tombstone with a good cas
err = col.(*Collection).DeleteWithMeta(ctx, docID, startingCas, specifiedCas, 0, nil)
err = col.(*Collection).deleteWithMeta(docID, startingCas, specifiedCas, 0, nil)
require.NoError(t, err)

event := <-events
Expand Down Expand Up @@ -826,11 +820,11 @@ func TestDeleteWithMetaXattr(t *testing.T) {
// pass a bad CAS and document will not delete
badStartingCas := CAS(1234)
// document doesn't exist, but CAS 0 will allow writing
err = col.DeleteWithMeta(ctx, docID, badStartingCas, specifiedCas, 0, nil)
err = col.deleteWithMeta(docID, badStartingCas, specifiedCas, 0, nil)
require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})

// tombstone with a good cas
err = col.DeleteWithMeta(ctx, docID, startingCas, specifiedCas, 0, []byte(fmt.Sprintf(fmt.Sprintf(`{"%s": "%s"}`, systemXattr, systemXattrVal))))
err = col.deleteWithMeta(docID, startingCas, specifiedCas, 0, []byte(fmt.Sprintf(fmt.Sprintf(`{"%s": "%s"}`, systemXattr, systemXattrVal))))
require.NoError(t, err)

_, err = col.Get(docID, nil)
Expand Down
5 changes: 3 additions & 2 deletions feeds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,16 @@ func readExpectedEventsDEF(t *testing.T, events chan sgbucket.FeedEvent) {

func TestCrossBucketEvents(t *testing.T) {
ensureNoLeakedFeeds(t)
bucket := makeTestBucket(t)
bucketName := strings.ToLower(t.Name())
bucket := makeTestBucketWithName(t, bucketName)
c := bucket.DefaultDataStore()

addToCollection(t, c, "able", 0, "A")
addToCollection(t, c, "baker", 0, "B")
addToCollection(t, c, "charlie", 0, "C")

// Open a 2nd bucket on the same file, to receive events:
bucket2, err := OpenBucket(bucket.url, strings.ToLower(t.Name()), ReOpenExisting)
bucket2, err := OpenBucket(bucket.url, bucketName, ReOpenExisting)
require.NoError(t, err)
t.Cleanup(func() {
bucket2.Close(testCtx(t))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/couchbaselabs/rosmar
go 1.19

require (
github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8
github.com/couchbase/sg-bucket v0.0.0-20240405013818-2750c47d6936
github.com/google/uuid v1.6.0
github.com/mattn/go-sqlite3 v1.14.22
github.com/stretchr/testify v1.9.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8 h1:kfWMYvUgSg2yIZJx+t63Ucl+zorvFqlYayXPkiXFtSE=
github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE=
github.com/couchbase/sg-bucket v0.0.0-20240405013818-2750c47d6936 h1:RQX9vRQ1orRgr9IyTcJemj0yjY0heUJfmAY70dPC+YQ=
github.com/couchbase/sg-bucket v0.0.0-20240405013818-2750c47d6936/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
Loading
Loading