diff --git a/bucket_test.go b/bucket_test.go
index f7844be..dbf9401 100644
--- a/bucket_test.go
+++ b/bucket_test.go
@@ -20,6 +20,7 @@ import (
     "time"

     sgbucket "github.com/couchbase/sg-bucket"
+    "github.com/google/uuid"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 )
@@ -30,21 +31,21 @@ func init() {
     }
 }

-func testCtx(t *testing.T) context.Context {
+// testCtx returns a context for testing.
+func testCtx(_ *testing.T) context.Context {
     return context.Background() // match sync gateway interfaces for logging
 }

 const testBucketDirName = "RosmarTest"

+// testBucketPath returns a path for a test bucket in a unique directory.
 func testBucketPath(t *testing.T) string {
     return fmt.Sprintf("%s%c%s", t.TempDir(), os.PathSeparator, testBucketDirName)
 }

-func makeTestBucket(t *testing.T) *Bucket {
-    LoggingCallback = func(level LogLevel, fmt string, args ...any) {
-        t.Logf(logLevelNamesPrint[level]+fmt, args...)
-    }
-    bucket, err := OpenBucket(uriFromPath(testBucketPath(t)), strings.ToLower(t.Name()), CreateNew)
+// makeTestBucketWithName creates a new persistent test bucket with a given name. If a bucket with this name already exists, this function will fail. This uses testing.T.Cleanup to remove the bucket.
+func makeTestBucketWithName(t *testing.T, name string) *Bucket {
+    bucket, err := OpenBucket(uriFromPath(testBucketPath(t)), name, CreateNew)
     require.NoError(t, err)
     t.Cleanup(func() {
         assert.NoError(t, bucket.CloseAndDelete(testCtx(t)))
@@ -53,6 +54,11 @@ func makeTestBucket(t *testing.T) *Bucket {
     return bucket
 }

+// makeTestBucket creates a new test bucket with a unique name.
+func makeTestBucket(t *testing.T) *Bucket {
+    return makeTestBucketWithName(t, t.Name()+"_"+uuid.New().String())
+}
+
 func dsName(scope string, coll string) sgbucket.DataStoreName {
     return sgbucket.DataStoreNameImpl{Scope: scope, Collection: coll}
 }
@@ -71,8 +77,8 @@ func bucketCount(name string) uint {

 func TestNewBucket(t *testing.T) {
     ensureNoLeaks(t)
-    bucket := makeTestBucket(t)
     bucketName := strings.ToLower(t.Name())
+    bucket := makeTestBucketWithName(t, bucketName)
     assert.Equal(t, bucketName, bucket.GetName())
     assert.Contains(t, bucket.GetURL(), testBucketDirName)

@@ -146,10 +152,10 @@ var defaultCollection = dsName("_default", "_default")

 func TestTwoBucketsOneURL(t *testing.T) {
     ensureNoLeaks(t)
-    bucket1 := makeTestBucket(t)
+    bucketName := strings.ToLower(t.Name())
+    bucket1 := makeTestBucketWithName(t, bucketName)
     url := bucket1.url

-    bucketName := strings.ToLower(t.Name())
     bucket2, err := OpenBucket(url, bucketName, CreateNew)
     require.ErrorContains(t, err, "already exists")
     require.Nil(t, bucket2)
@@ -177,7 +183,8 @@ func TestTwoBucketsOneURL(t *testing.T) {

 func TestDefaultCollection(t *testing.T) {
     ensureNoLeaks(t)
-    bucket := makeTestBucket(t)
+    bucketName := strings.ToLower(t.Name())
+    bucket := makeTestBucketWithName(t, bucketName)

     // Initially one collection:
     colls, err := bucket.ListDataStores()
@@ -186,12 +193,15 @@ func TestDefaultCollection(t *testing.T) {

     coll := bucket.DefaultDataStore()
     assert.NotNil(t, coll)
-    assert.Equal(t, strings.ToLower(t.Name())+"._default._default", coll.GetName())
+    assert.Equal(t, bucketName+"._default._default", coll.GetName())
+    assert.Equal(t, "_default", coll.ScopeName())
+    assert.Equal(t, "_default", coll.CollectionName())
 }

 func TestCreateCollection(t *testing.T) {
     ensureNoLeaks(t)
-    bucket := makeTestBucket(t)
+    bucketName := strings.ToLower(t.Name())
+    bucket := makeTestBucketWithName(t, bucketName)

     collName := dsName("_default", "foo")
     err := bucket.CreateDataStore(testCtx(t), collName)
@@ -200,8 +210,9 @@ func TestCreateCollection(t *testing.T) {
     coll, err := bucket.NamedDataStore(collName)
     assert.NoError(t, err)
     assert.NotNil(t, coll)
-    assert.Equal(t, strings.ToLower(t.Name())+"._default.foo", coll.GetName())
-
+    assert.Equal(t, bucketName+"._default.foo", coll.GetName())
+    assert.Equal(t, "_default", coll.ScopeName())
+    assert.Equal(t, "foo", coll.CollectionName())
     colls, err := bucket.ListDataStores()
     assert.NoError(t, err)
     assert.Equal(t, colls, []sgbucket.DataStoreName{defaultCollection, collName})
diff --git a/collection+xattrs.go b/collection+xattrs.go
index ad2566b..4326c08 100644
--- a/collection+xattrs.go
+++ b/collection+xattrs.go
@@ -39,7 +39,7 @@ func (c *Collection) GetXattrs(
 }

-// SetWithMeta updates a document fully with xattrs and body and allows specification of a specific CAS (newCas). This update will always happen as long as oldCas matches the value of existing document. This simulates the kv op setWithMeta.
-func (c *Collection) SetWithMeta(_ context.Context, key string, oldCas CAS, newCas CAS, exp uint32, xattrs []byte, body []byte, datatype sgbucket.FeedDataType) error {
+// setWithMeta updates a document fully with xattrs and body and allows specification of a specific CAS (newCas). The update will always happen as long as oldCas matches the CAS of the existing document. This simulates the kv op setWithMeta.
+func (c *Collection) setWithMeta(key string, oldCas CAS, newCas CAS, exp uint32, xattrs []byte, body []byte, datatype sgbucket.FeedDataType) error {
     isJSON := datatype&sgbucket.FeedDataTypeJSON != 0
     isDeletion := false
     return c.writeWithMeta(key, body, xattrs, oldCas, newCas, exp, isJSON, isDeletion)
@@ -80,8 +80,8 @@ func (c *Collection) writeWithMeta(key string, body []byte, xattrs []byte, oldCa
     return nil
 }

-// DeleteWithMeta tombstones a document and sets a specific cas. This update will always happen as long as oldCas matches the value of existing document. This simulates the kv op deleteWithMeta.
-func (c *Collection) DeleteWithMeta(_ context.Context, key string, oldCas CAS, newCas CAS, exp uint32, xattrs []byte) error {
+// deleteWithMeta tombstones a document and sets a specific cas. The update will always happen as long as oldCas matches the CAS of the existing document. This simulates the kv op deleteWithMeta.
+func (c *Collection) deleteWithMeta(key string, oldCas CAS, newCas CAS, exp uint32, xattrs []byte) error {
     var body []byte
     isJSON := false
     isDeletion := true
diff --git a/collection.go b/collection.go
index b6c6231..20d10e8 100644
--- a/collection.go
+++ b/collection.go
@@ -63,10 +63,22 @@ func (c *Collection) db() queryable {

 //////// Interface DataStore

+// GetName returns the bucket.scope.collection name.
 func (c *Collection) GetName() string {
     return c.bucket.GetName() + "." + c.DataStoreNameImpl.String()
 }

+// ScopeName returns the scope name, _default for the default scope.
+func (c *Collection) ScopeName() string {
+    return c.DataStoreNameImpl.ScopeName()
+}
+
+// CollectionName returns the collection name, _default for the default collection.
+func (c *Collection) CollectionName() string {
+    return c.DataStoreNameImpl.CollectionName()
+}
+
+// GetCollectionID returns a unique ID for a given collection, used to identify the collection in DCP feeds and other places.
+func (c *Collection) GetCollectionID() uint32 {
+    return uint32(c.id) - 1 // SG expects that the default collection has id 0, so subtract 1
+}
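As an aside (an illustration, not part of the patch): for a bucket named "mybucket" with only the default collection, the new accessors behave roughly like the sketch below; the bucket variable is a hypothetical open *Bucket.

    coll := bucket.DefaultDataStore().(*Collection)
    coll.GetName()         // "mybucket._default._default" (bucket.scope.collection)
    coll.ScopeName()       // "_default"
    coll.CollectionName()  // "_default"
    coll.GetCollectionID() // 0 — rosmar subtracts 1 so the default collection gets the ID Sync Gateway expects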
diff --git a/collection_test.go b/collection_test.go
index 1450ebd..6fb377d 100644
--- a/collection_test.go
+++ b/collection_test.go
@@ -633,10 +633,9 @@ func verifyEmptyBodyAndSyncXattr(t *testing.T, store sgbucket.DataStore, key str
 func TestSetWithMetaNoDocument(t *testing.T) {
     col := makeTestBucket(t).DefaultDataStore()
     const docID = "TestSetWithMeta"
-    ctx := testCtx(t)
     cas2 := CAS(1)
     body := []byte(`{"foo":"bar"}`)
-    err := col.(*Collection).SetWithMeta(ctx, docID, 0, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
+    err := col.(*Collection).setWithMeta(docID, 0, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
     require.NoError(t, err)

     val, cas, err := col.GetRaw(docID)
@@ -652,10 +651,9 @@ func TestSetWithMetaOverwriteJSON(t *testing.T) {
     require.NoError(t, err)
     require.Greater(t, cas1, CAS(0))

-    ctx := testCtx(t)
     cas2 := CAS(1)
     body := []byte(`{"foo":"bar"}`)
-    err = col.(*Collection).SetWithMeta(ctx, docID, cas1, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
+    err = col.(*Collection).setWithMeta(docID, cas1, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
     require.NoError(t, err)

     val, cas, err := col.GetRaw(docID)
@@ -679,10 +677,9 @@ func TestSetWithMetaOverwriteNotJSON(t *testing.T) {
     require.Equal(t, sgbucket.FeedOpMutation, event1.Opcode)
     require.Equal(t, sgbucket.FeedDataTypeJSON, event1.DataType)

-    ctx := testCtx(t)
     cas2 := CAS(1)
     body := []byte(`ABC`)
-    err = col.(*Collection).SetWithMeta(ctx, docID, cas1, cas2, 0, nil, body, sgbucket.FeedDataTypeRaw)
+    err = col.(*Collection).setWithMeta(docID, cas1, cas2, 0, nil, body, sgbucket.FeedDataTypeRaw)
     require.NoError(t, err)

     val, cas, err := col.GetRaw(docID)
@@ -706,18 +703,17 @@ func TestSetWithMetaOverwriteTombstone(t *testing.T) {
     deletedCas, err := col.Remove(docID, cas1)
     require.NoError(t, err)

-    ctx := testCtx(t)
     cas2 := CAS(1)
     body := []byte(`ABC`)

     // make sure there is a cas check even for tombstone
-    err = col.(*Collection).SetWithMeta(ctx, docID, CAS(0), cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
+    err = col.(*Collection).setWithMeta(docID, CAS(0), cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
     require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})

     events, _ := startFeed(t, bucket)

     // cas check even on tombstone
-    err = col.(*Collection).SetWithMeta(ctx, docID, deletedCas, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
+    err = col.(*Collection).setWithMeta(docID, deletedCas, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
     require.NoError(t, err)

     event := <-events
@@ -735,17 +731,16 @@ func TestSetWithMetaCas(t *testing.T) {
     docID := t.Name()
     body := []byte(`{"foo":"bar"}`)

-    ctx := testCtx(t)
     badStartingCas := CAS(1234)
     specifiedCas := CAS(1)

     // document doesn't exist, so a cas mismatch will occur if CAS != 0
-    err := col.(*Collection).SetWithMeta(ctx, docID, badStartingCas, specifiedCas, 0, nil, body, sgbucket.FeedDataTypeJSON)
+    err := col.(*Collection).setWithMeta(docID, badStartingCas, specifiedCas, 0, nil, body, sgbucket.FeedDataTypeJSON)
     require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})

     // document doesn't exist, but CAS 0 will allow writing
-    err = col.(*Collection).SetWithMeta(ctx, docID, CAS(0), specifiedCas, 0, nil, body, sgbucket.FeedDataTypeJSON)
+    err = col.(*Collection).setWithMeta(docID, CAS(0), specifiedCas, 0, nil, body, sgbucket.FeedDataTypeJSON)
     require.NoError(t, err)

     val, cas, err := col.GetRaw(docID)
@@ -779,16 +774,15 @@ func TestDeleteWithMeta(t *testing.T) {
     specifiedCas := CAS(1)
     events, _ := startFeed(t, bucket)

-    ctx := testCtx(t)
     // pass a bad CAS and the document will not be deleted
     badStartingCas := CAS(1234)
-    err = col.(*Collection).DeleteWithMeta(ctx, docID, badStartingCas, specifiedCas, 0, nil)
+    err = col.(*Collection).deleteWithMeta(docID, badStartingCas, specifiedCas, 0, nil)
     require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})

     // tombstone with a good cas
-    err = col.(*Collection).DeleteWithMeta(ctx, docID, startingCas, specifiedCas, 0, nil)
+    err = col.(*Collection).deleteWithMeta(docID, startingCas, specifiedCas, 0, nil)
     require.NoError(t, err)

     event := <-events
@@ -826,11 +820,11 @@ func TestDeleteWithMetaXattr(t *testing.T) {
     // pass a bad CAS and the document will not be deleted
     badStartingCas := CAS(1234)
-    err = col.DeleteWithMeta(ctx, docID, badStartingCas, specifiedCas, 0, nil)
+    err = col.deleteWithMeta(docID, badStartingCas, specifiedCas, 0, nil)
     require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})

     // tombstone with a good cas
-    err = col.DeleteWithMeta(ctx, docID, startingCas, specifiedCas, 0, []byte(fmt.Sprintf(fmt.Sprintf(`{"%s": "%s"}`, systemXattr, systemXattrVal))))
+    err = col.deleteWithMeta(docID, startingCas, specifiedCas, 0, []byte(fmt.Sprintf(`{"%s": "%s"}`, systemXattr, systemXattrVal)))
     require.NoError(t, err)

     _, err = col.Get(docID, nil)
diff --git a/feeds_test.go b/feeds_test.go
index 2816df3..6d81dcb 100644
--- a/feeds_test.go
+++ b/feeds_test.go
@@ -190,7 +190,8 @@ func readExpectedEventsDEF(t *testing.T, events chan sgbucket.FeedEvent) {

 func TestCrossBucketEvents(t *testing.T) {
     ensureNoLeakedFeeds(t)
-    bucket := makeTestBucket(t)
+    bucketName := strings.ToLower(t.Name())
+    bucket := makeTestBucketWithName(t, bucketName)

     c := bucket.DefaultDataStore()
     addToCollection(t, c, "able", 0, "A")
@@ -198,7 +199,7 @@ func TestCrossBucketEvents(t *testing.T) {
     addToCollection(t, c, "charlie", 0, "C")

     // Open a 2nd bucket on the same file, to receive events:
-    bucket2, err := OpenBucket(bucket.url, strings.ToLower(t.Name()), ReOpenExisting)
+    bucket2, err := OpenBucket(bucket.url, bucketName, ReOpenExisting)
     require.NoError(t, err)
     t.Cleanup(func() {
         bucket2.Close(testCtx(t))
diff --git a/go.mod b/go.mod
index 8fbafe6..4b7d435 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/couchbaselabs/rosmar
 go 1.19

 require (
-    github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8
+    github.com/couchbase/sg-bucket v0.0.0-20240405013818-2750c47d6936
     github.com/google/uuid v1.6.0
     github.com/mattn/go-sqlite3 v1.14.22
     github.com/stretchr/testify v1.9.0
diff --git a/go.sum b/go.sum
index 2066b3e..2db16e6 100644
--- a/go.sum
+++ b/go.sum
@@ -1,7 +1,7 @@
 github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
 github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
-github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8 h1:kfWMYvUgSg2yIZJx+t63Ucl+zorvFqlYayXPkiXFtSE=
-github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE=
+github.com/couchbase/sg-bucket v0.0.0-20240405013818-2750c47d6936 h1:RQX9vRQ1orRgr9IyTcJemj0yjY0heUJfmAY70dPC+YQ=
+github.com/couchbase/sg-bucket v0.0.0-20240405013818-2750c47d6936/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
diff --git a/xdcr.go b/xdcr.go
new file mode 100644
index 0000000..2d57fce
--- /dev/null
+++ b/xdcr.go
@@ -0,0 +1,241 @@
+// Copyright 2024-Present Couchbase, Inc.
+//
+// Use of this software is governed by the Business Source License included
+// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
+// in that file, in accordance with the Business Source License, use of this
+// software will be governed by the Apache License, Version 2.0, included in
+// the file licenses/APL2.txt.
+
+package rosmar
+
+import (
+    "context"
+    "encoding/json"
+    "errors"
+    "fmt"
+    "strings"
+    "sync/atomic"
+
+    sgbucket "github.com/couchbase/sg-bucket"
+)
+
+// XDCR implements bucket-to-bucket (XDCR) replication within rosmar.
+type XDCR struct {
+    filterFunc          xdcrFilterFunc
+    terminator          chan bool
+    toBucketCollections map[uint32]*Collection
+    fromBucket          *Bucket
+    toBucket            *Bucket
+    replicationID       string
+    docsFiltered        atomic.Uint64
+    docsWritten         atomic.Uint64
+    errorCount          atomic.Uint64
+    targetNewerDocs     atomic.Uint64
+}
+
+// NewXDCR creates an instance of XDCR backed by rosmar. Replication does not begin until Start is called.
+func NewXDCR(_ context.Context, fromBucket, toBucket *Bucket, opts sgbucket.XDCROptions) (*XDCR, error) {
+    if opts.Mobile != sgbucket.XDCRMobileOn {
+        return nil, errors.New("only sgbucket.XDCRMobileOn is supported in rosmar")
+    }
+    return &XDCR{
+        fromBucket:          fromBucket,
+        toBucket:            toBucket,
+        replicationID:       fmt.Sprintf("%s-%s", fromBucket.GetName(), toBucket.GetName()),
+        toBucketCollections: make(map[uint32]*Collection),
+        terminator:          make(chan bool),
+        filterFunc:          mobileXDCRFilter,
+    }, nil
+}
+
+// processEvent processes a DCP event coming from the fromBucket and replicates it to the matching collection in the toBucket.
+func (r *XDCR) processEvent(event sgbucket.FeedEvent) bool {
+    docID := string(event.Key)
+    trace("Got event %s, opcode: %s", docID, event.Opcode)
+    col, ok := r.toBucketCollections[event.CollectionID]
+    if !ok {
+        logError("This violates the assumption that all collections are mapped to a target collection. This should not happen. Found event=%+v", event)
+        r.errorCount.Add(1)
+        return false
+    }
+
+    switch event.Opcode {
+    case sgbucket.FeedOpDeletion, sgbucket.FeedOpMutation:
+        // Skip any events rejected by the XDCR filter
+        if r.filterFunc != nil && !r.filterFunc(&event) {
+            trace("Filtering doc %s", docID)
+            r.docsFiltered.Add(1)
+            return true
+        }
+
+        toCas, err := col.Get(docID, nil)
+        if err != nil && !col.IsError(err, sgbucket.KeyNotFoundError) {
+            warn("Skipping replicating doc %s, could not get the doc in toBucket: %s", event.Key, err)
+            r.errorCount.Add(1)
+            return false
+        }
+
+        /* Full LWW conflict resolution is not implemented in rosmar yet.
+
+        The CBS algorithm is:
+
+        if (command.CAS > document.CAS)
+            command succeeds
+        else if (command.CAS == document.CAS)
+            // Check the RevSeqno
+            if (command.RevSeqno > document.RevSeqno)
+                command succeeds
+            else if (command.RevSeqno == document.RevSeqno)
+                // Check the expiry time
+                if (command.Expiry > document.Expiry)
+                    command succeeds
+                else if (command.Expiry == document.Expiry)
+                    // Finally check flags
+                    if (command.Flags < document.Flags)
+                        command succeeds
+
+        command fails
+
+        In the current state of rosmar:
+
+        1. All CAS values are unique.
+        2. RevSeqno is not implemented.
+        3. Expiry is implemented and could be compared, except all CAS values are unique.
+        4. Flags are not implemented.
+        */
+
+        if event.Cas <= toCas {
+            r.targetNewerDocs.Add(1)
+            trace("Skipping replicating doc %s, cas %d <= %d", docID, event.Cas, toCas)
+            return true
+        }
+
+        err = opWithMeta(col, toCas, event)
+        if err != nil {
+            warn("Skipping replicating doc %s, could not write doc: %s", event.Key, err)
+            r.errorCount.Add(1)
+            return false
+        }
+        r.docsWritten.Add(1)
+    }
+
+    return true
+}
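// A sketch (an illustration, not part of this change) of the CBS
// conflict-resolution algorithm quoted in the comment above, transcribed to
// Go. The revSeqno and flags fields are hypothetical: rosmar implements
// neither, which is why processEvent only compares CAS values today.

type conflictMeta struct {
    cas      uint64
    revSeqno uint64 // not implemented in rosmar
    expiry   uint32
    flags    uint32 // not implemented in rosmar
}

// commandWins reports whether an incoming command beats the existing document
// under last-write-wins resolution; if every field ties, the command fails.
func commandWins(command, document conflictMeta) bool {
    if command.cas != document.cas {
        return command.cas > document.cas
    }
    if command.revSeqno != document.revSeqno {
        return command.revSeqno > document.revSeqno
    }
    if command.expiry != document.expiry {
        return command.expiry > document.expiry
    }
    return command.flags < document.flags
}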
+// Start starts the replication, replicating any collection in fromBucket that has a matching scope and collection name in toBucket.
+func (r *XDCR) Start(ctx context.Context) error {
+    // set up replication to target all existing collections, and map to matching target collections
+    scopes := make(map[string][]string)
+    fromDataStores, err := r.fromBucket.ListDataStores()
+    if err != nil {
+        return fmt.Errorf("could not list fromBucket data stores: %w", err)
+    }
+    toDataStores, err := r.toBucket.ListDataStores()
+    if err != nil {
+        return fmt.Errorf("could not list toBucket data stores: %w", err)
+    }
+    for _, fromName := range fromDataStores {
+        fromDataStore, err := r.fromBucket.NamedDataStore(fromName)
+        if err != nil {
+            return fmt.Errorf("could not get data store %s when starting XDCR: %w", fromName, err)
+        }
+        collectionID := fromDataStore.GetCollectionID()
+        for _, toName := range toDataStores {
+            if fromName.ScopeName() != toName.ScopeName() || fromName.CollectionName() != toName.CollectionName() {
+                continue
+            }
+            toDataStore, err := r.toBucket.NamedDataStore(toName)
+            if err != nil {
+                return fmt.Errorf("could not get the matching data store %s in the toBucket: %w", toName, err)
+            }
+            col, ok := toDataStore.(*Collection)
+            if !ok {
+                return fmt.Errorf("DataStore %s is not a rosmar.Collection: %T", toDataStore, toDataStore)
+            }
+            r.toBucketCollections[collectionID] = col
+            scopes[fromName.ScopeName()] = append(scopes[fromName.ScopeName()], fromName.CollectionName())
+            break
+        }
+    }
+
+    args := sgbucket.FeedArguments{
+        ID:         "xdcr-" + r.replicationID,
+        Backfill:   sgbucket.FeedNoBackfill,
+        Terminator: r.terminator,
+        Scopes:     scopes,
+    }
+    return r.fromBucket.StartDCPFeed(ctx, args, r.processEvent, nil)
+}
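// Illustration (not part of this change): for the buckets in TestXDCR below,
// which have _default._default plus customScope.customCollection on both
// sides, Start builds the following scopes map for the DCP feed:
//
//    map[string][]string{
//        "_default":    {"_default"},
//        "customScope": {"customCollection"},
//    }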
+// Stop terminates the replication.
+func (r *XDCR) Stop(_ context.Context) error {
+    close(r.terminator)
+    r.terminator = nil
+    return nil
+}
+
+// opWithMeta writes a document to the target collection for a Deletion or Mutation event with a specific cas.
+func opWithMeta(collection *Collection, originalCas uint64, event sgbucket.FeedEvent) error {
+    var xattrs []byte
+    var body []byte
+    if event.DataType&sgbucket.FeedDataTypeXattr != 0 {
+        var err error
+        var dcpXattrs []sgbucket.Xattr
+        body, dcpXattrs, err = sgbucket.DecodeValueWithXattrs(event.Value)
+        if err != nil {
+            return err
+        }
+        xattrs, err = xattrToBytes(dcpXattrs)
+        if err != nil {
+            return err
+        }
+    } else {
+        body = event.Value
+    }
+
+    if event.Opcode == sgbucket.FeedOpDeletion {
+        return collection.deleteWithMeta(string(event.Key), originalCas, event.Cas, event.Expiry, xattrs)
+    }
+    return collection.setWithMeta(string(event.Key), originalCas, event.Cas, event.Expiry, xattrs, body, event.DataType)
+}
+
+// Stats returns the stats of the XDCR replication.
+func (r *XDCR) Stats(context.Context) (*sgbucket.XDCRStats, error) {
+    return &sgbucket.XDCRStats{
+        DocsWritten:     r.docsWritten.Load(),
+        DocsFiltered:    r.docsFiltered.Load(),
+        ErrorCount:      r.errorCount.Load(),
+        TargetNewerDocs: r.targetNewerDocs.Load(),
+    }, nil
+}
+
+// xattrToBytes converts a slice of Xattrs to a byte slice of marshalled json.
+func xattrToBytes(xattrs []sgbucket.Xattr) ([]byte, error) {
+    xattrMap := make(map[string]json.RawMessage)
+    for _, xattr := range xattrs {
+        xattrMap[xattr.Name] = xattr.Value
+    }
+    return json.Marshal(xattrMap)
+}
+
+// xdcrFilterFunc is a function that filters out events from the replication.
+type xdcrFilterFunc func(event *sgbucket.FeedEvent) bool
+
+// mobileXDCRFilter is the implicit key filtering that Couchbase Server mobile XDCR applies: Sync Gateway metadata documents are not replicated, except for mobile attachment documents.
+func mobileXDCRFilter(event *sgbucket.FeedEvent) bool {
+    return !(strings.HasPrefix(string(event.Key), sgbucket.SyncDocPrefix) && !strings.HasPrefix(string(event.Key), sgbucket.Att2Prefix))
+}
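To make the key filtering concrete, a short sketch (assuming sgbucket.SyncDocPrefix is "_sync:" and sgbucket.Att2Prefix is "_sync:att2:", the prefixes the test below relies on):

    mobileXDCRFilter(&sgbucket.FeedEvent{Key: []byte("doc2")})            // true:  regular docs replicate
    mobileXDCRFilter(&sgbucket.FeedEvent{Key: []byte("_sync:doc1")})      // false: sync metadata is filtered out
    mobileXDCRFilter(&sgbucket.FeedEvent{Key: []byte("_sync:att2:foo")})  // true:  mobile attachment docs replicate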
diff --git a/xdcr_test.go b/xdcr_test.go
new file mode 100644
index 0000000..d713b02
--- /dev/null
+++ b/xdcr_test.go
@@ -0,0 +1,153 @@
+package rosmar
+
+// Copyright 2024-Present Couchbase, Inc.
+//
+// Use of this software is governed by the Business Source License included
+// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
+// in that file, in accordance with the Business Source License, use of this
+// software will be governed by the Apache License, Version 2.0, included in
+// the file licenses/APL2.txt.
+
+import (
+    "testing"
+    "time"
+
+    sgbucket "github.com/couchbase/sg-bucket"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+)
+
+func TestXDCR(t *testing.T) {
+    ctx := testCtx(t)
+    fromBucket := makeTestBucket(t)
+    toBucket := makeTestBucket(t)
+    defer fromBucket.Close(ctx)
+    defer toBucket.Close(ctx)
+
+    xdcr, err := NewXDCR(ctx, fromBucket, toBucket, sgbucket.XDCROptions{Mobile: sgbucket.XDCRMobileOn})
+    require.NoError(t, err)
+
+    const (
+        scopeName      = "customScope"
+        collectionName = "customCollection"
+    )
+    fromDs, err := fromBucket.NamedDataStore(sgbucket.DataStoreNameImpl{Scope: scopeName, Collection: collectionName})
+    require.NoError(t, err)
+    toDs, err := toBucket.NamedDataStore(sgbucket.DataStoreNameImpl{Scope: scopeName, Collection: collectionName})
+    require.NoError(t, err)
+    // pair each source collection with its target collection, for the default and a named collection created on both sides
+    collections := map[sgbucket.DataStore]sgbucket.DataStore{
+        fromBucket.DefaultDataStore(): toBucket.DefaultDataStore(),
+        fromDs:                        toDs,
+    }
+
+    err = xdcr.Start(ctx)
+    require.NoError(t, err)
+    defer func() {
+        assert.NoError(t, xdcr.Stop(ctx))
+    }()
+    const (
+        syncDoc           = "_sync:doc1doc2"
+        attachmentDoc     = "_sync:att2:foo"
+        attachmentDocBody = `1ABINARYBLOB`
+        normalDoc         = "doc2"
+        normalDocBody     = `{"key":"value"}`
+        exp               = 0
+    )
+    var totalDocsFiltered uint64
+    var totalDocsWritten uint64
+    // run test on named and default collections
+    for fromDs, toDs := range collections {
+        _, err = fromDs.AddRaw(syncDoc, exp, []byte(`{"foo", "bar"}`))
+        require.NoError(t, err)
+
+        attachmentDocCas, err := fromDs.WriteCas(attachmentDoc, exp, 0, []byte(attachmentDocBody), sgbucket.Raw)
+        require.NoError(t, err)
+
+        normalDocCas, err := fromDs.WriteCas(normalDoc, exp, 0, []byte(normalDocBody), 0)
+        require.NoError(t, err)
+
+        require.EventuallyWithT(t, func(c *assert.CollectT) {
+            val, cas, err := toDs.GetRaw(normalDoc)
+            assert.NoError(c, err)
+            assert.Equal(c, normalDocCas, cas)
+            assert.JSONEq(c, normalDocBody, string(val))
+        }, time.Second*5, time.Millisecond*100)
+
+        require.EventuallyWithT(t, func(c *assert.CollectT) {
+            val, cas, err := toDs.GetRaw(attachmentDoc)
+            assert.NoError(c, err)
+            assert.Equal(c, attachmentDocCas, cas)
+            assert.Equal(c, []byte(attachmentDocBody), val)
+        }, time.Second*5, time.Millisecond*100)
+
+        _, err = toDs.Get(syncDoc, nil)
+        assert.True(t, toBucket.IsError(err, sgbucket.KeyNotFoundError))
+
+        require.NoError(t, fromDs.Delete(normalDoc))
+        require.EventuallyWithT(t, func(c *assert.CollectT) {
+            var value string
+            _, err = toDs.Get(normalDoc, &value)
+            assert.Error(c, err)
+            assert.True(c, toBucket.IsError(err, sgbucket.KeyNotFoundError))
+        }, time.Second*5, time.Millisecond*100)
+
+        // stats are not updated in real time, so we need to wait a bit
+        require.EventuallyWithT(t, func(c *assert.CollectT) {
+            stats, err := xdcr.Stats(ctx)
+            assert.NoError(c, err)
+            assert.Equal(c, totalDocsFiltered+1, stats.DocsFiltered)
+            assert.Equal(c, totalDocsWritten+3, stats.DocsWritten)
+        }, time.Second*5, time.Millisecond*100)
+        totalDocsFiltered += 1
+        totalDocsWritten += 3
+    }
+    stats, err := xdcr.Stats(ctx)
+    require.NoError(t, err)
+    require.Equal(t, uint64(0), stats.ErrorCount)
+}
+
+func TestXattrMigration(t *testing.T) {
+    ctx := testCtx(t)
+    fromBucket := makeTestBucket(t)
+    toBucket := makeTestBucket(t)
+    defer fromBucket.Close(ctx)
+    defer toBucket.Close(ctx)
+
+    xdcr, err := NewXDCR(ctx, fromBucket, toBucket, sgbucket.XDCROptions{Mobile: sgbucket.XDCRMobileOn})
+    require.NoError(t, err)
+    err = xdcr.Start(ctx)
+    require.NoError(t, err)
+    defer func() {
+        assert.NoError(t, xdcr.Stop(ctx))
+    }()
+
+    const (
+        docID           = "doc1"
+        systemXattrName = "_system"
+        userXattrName   = "user"
+        body            = `{"foo": "bar"}`
+        systemXattrVal  = `{"bar": "baz"}`
+        userXattrVal    = `{"baz": "baz"}`
+    )
+
+    startingCas, err := fromBucket.DefaultDataStore().WriteWithXattrs(ctx, docID, 0, 0, []byte(body), map[string][]byte{systemXattrName: []byte(systemXattrVal)}, nil)
+    require.NoError(t, err)
+    require.Greater(t, startingCas, uint64(0))
+
+    startingCas, err = fromBucket.DefaultDataStore().SetXattrs(ctx, docID, map[string][]byte{userXattrName: []byte(userXattrVal)})
+    require.NoError(t, err)
+    require.EventuallyWithT(t, func(c *assert.CollectT) {
+        toVal, xattrs, cas, err := toBucket.DefaultDataStore().GetWithXattrs(ctx, docID, []string{systemXattrName, userXattrName})
+        assert.NoError(c, err)
+        assert.Equal(c, startingCas, cas)
+        assert.JSONEq(c, body, string(toVal))
+        assert.JSONEq(c, systemXattrVal, string(xattrs[systemXattrName]))
+        assert.JSONEq(c, userXattrVal, string(xattrs[userXattrName]))
+    }, time.Second*5, time.Millisecond*100)
+    stats, err := xdcr.Stats(ctx)
+    require.NoError(t, err)
+    require.Equal(t, uint64(0), stats.ErrorCount)
+}
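For reference, a minimal sketch of driving the new replication API end to end, mirroring the tests above; fromBucket and toBucket are assumed to be open *rosmar.Bucket instances and error handling is abbreviated:

    xdcr, err := NewXDCR(ctx, fromBucket, toBucket, sgbucket.XDCROptions{Mobile: sgbucket.XDCRMobileOn})
    if err != nil {
        return err
    }
    if err := xdcr.Start(ctx); err != nil {
        return err
    }
    // ... mutations on fromBucket now replicate asynchronously to toBucket ...
    stats, err := xdcr.Stats(ctx)
    if err != nil {
        return err
    }
    fmt.Printf("written=%d filtered=%d errors=%d\n", stats.DocsWritten, stats.DocsFiltered, stats.ErrorCount)
    return xdcr.Stop(ctx)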