From b281e77c61d2ad28ed14ceeb32cb7a8e9c939ebc Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Wed, 27 Mar 2024 19:52:24 -0400 Subject: [PATCH 1/9] CBG-3764 create a basic XDCR implementation - uses new common elements from sg-bucket - absorbs DataStoreName and GetCollectionID into DataStore to avoid duplication - The implementation only copies documents with xattrs, and excludes specially named documents. It does not implement _vv, _mou, or _sync handling. --- bucket_test.go | 34 +++++-- collection.go | 7 ++ feeds_test.go | 9 +- go.mod | 2 +- go.sum | 4 +- xdcr.go | 243 +++++++++++++++++++++++++++++++++++++++++++++++++ xdcr_test.go | 141 ++++++++++++++++++++++++++++ 7 files changed, 423 insertions(+), 17 deletions(-) create mode 100644 xdcr.go create mode 100644 xdcr_test.go diff --git a/bucket_test.go b/bucket_test.go index f7844be..3aa5a53 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -20,6 +20,7 @@ import ( "time" sgbucket "github.com/couchbase/sg-bucket" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -30,21 +31,24 @@ func init() { } } +// testCtx returns a context for testing func testCtx(t *testing.T) context.Context { return context.Background() // match sync gateway interfaces for logging } const testBucketDirName = "RosmarTest" +// testBucketPath returns a path for a test bucket in a unique directory. func testBucketPath(t *testing.T) string { return fmt.Sprintf("%s%c%s", t.TempDir(), os.PathSeparator, testBucketDirName) } -func makeTestBucket(t *testing.T) *Bucket { +// makeTestBucketWithName creates a new persistent test bucket with a given name. If a bucket already exists, this function will fail. This uses testing.T.Cleanup to remove the bucket. +func makeTestBucketWithName(t *testing.T, name string) *Bucket { LoggingCallback = func(level LogLevel, fmt string, args ...any) { t.Logf(logLevelNamesPrint[level]+fmt, args...) 
 	}
-	bucket, err := OpenBucket(uriFromPath(testBucketPath(t)), strings.ToLower(t.Name()), CreateNew)
+	bucket, err := OpenBucket(uriFromPath(testBucketPath(t)), name, CreateNew)
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		assert.NoError(t, bucket.CloseAndDelete(testCtx(t)))
@@ -53,6 +57,11 @@ func makeTestBucket(t *testing.T) *Bucket {
 	return bucket
 }
 
+// makeTestBucket creates a new test bucket with a unique name
+func makeTestBucket(t *testing.T) *Bucket {
+	return makeTestBucketWithName(t, t.Name()+"_"+uuid.New().String())
+}
+
 func dsName(scope string, coll string) sgbucket.DataStoreName {
 	return sgbucket.DataStoreNameImpl{Scope: scope, Collection: coll}
 }
@@ -71,8 +80,8 @@ func bucketCount(name string) uint {
 
 func TestNewBucket(t *testing.T) {
 	ensureNoLeaks(t)
-	bucket := makeTestBucket(t)
 	bucketName := strings.ToLower(t.Name())
+	bucket := makeTestBucketWithName(t, bucketName)
 	assert.Equal(t, bucketName, bucket.GetName())
 	assert.Contains(t, bucket.GetURL(), testBucketDirName)
@@ -146,10 +155,10 @@ var defaultCollection = dsName("_default", "_default")
 
 func TestTwoBucketsOneURL(t *testing.T) {
 	ensureNoLeaks(t)
-	bucket1 := makeTestBucket(t)
+	bucketName := strings.ToLower(t.Name())
+	bucket1 := makeTestBucketWithName(t, bucketName)
 	url := bucket1.url
 
-	bucketName := strings.ToLower(t.Name())
 	bucket2, err := OpenBucket(url, bucketName, CreateNew)
 	require.ErrorContains(t, err, "already exists")
 	require.Nil(t, bucket2)
@@ -177,7 +186,8 @@ func TestTwoBucketsOneURL(t *testing.T) {
 
 func TestDefaultCollection(t *testing.T) {
 	ensureNoLeaks(t)
-	bucket := makeTestBucket(t)
+	bucketName := strings.ToLower(t.Name())
+	bucket := makeTestBucketWithName(t, bucketName)
 
 	// Initially one collection:
 	colls, err := bucket.ListDataStores()
@@ -186,12 +196,15 @@ func TestDefaultCollection(t *testing.T) {
 	coll := bucket.DefaultDataStore()
 	assert.NotNil(t, coll)
-	assert.Equal(t, strings.ToLower(t.Name())+"._default._default", coll.GetName())
+	assert.Equal(t, bucketName+"._default._default", coll.GetName())
+	assert.Equal(t, "_default", coll.DataStoreName().ScopeName())
+	assert.Equal(t, "_default", coll.DataStoreName().CollectionName())
 }
 
 func TestCreateCollection(t *testing.T) {
 	ensureNoLeaks(t)
-	bucket := makeTestBucket(t)
+	bucketName := strings.ToLower(t.Name())
+	bucket := makeTestBucketWithName(t, bucketName)
 
 	collName := dsName("_default", "foo")
 	err := bucket.CreateDataStore(testCtx(t), collName)
@@ -200,8 +213,9 @@ func TestCreateCollection(t *testing.T) {
 	coll, err := bucket.NamedDataStore(collName)
 	assert.NoError(t, err)
 	assert.NotNil(t, coll)
-	assert.Equal(t, strings.ToLower(t.Name())+"._default.foo", coll.GetName())
-
+	assert.Equal(t, bucketName+"._default.foo", coll.GetName())
+	assert.Equal(t, "_default", coll.DataStoreName().ScopeName())
+	assert.Equal(t, "foo", coll.DataStoreName().CollectionName())
 	colls, err := bucket.ListDataStores()
 	assert.NoError(t, err)
 	assert.Equal(t, colls, []sgbucket.DataStoreName{defaultCollection, collName})
diff --git a/collection.go b/collection.go
index b6c6231..4b89e97 100644
--- a/collection.go
+++ b/collection.go
@@ -63,10 +63,17 @@ func (c *Collection) db() queryable {
 
 //////// Interface DataStore
 
+// GetName returns bucket.scope.collection name.
 func (c *Collection) GetName() string {
 	return c.bucket.GetName() + "." + c.DataStoreNameImpl.String()
 }
 
+// DataStoreName returns the scope and collection name.
+func (c *Collection) DataStoreName() sgbucket.DataStoreName { + return c.DataStoreNameImpl +} + +// GetCollectionID returns a unique ID for a given collection, used to identify the collection in DCP feeds and other places. func (c *Collection) GetCollectionID() uint32 { return uint32(c.id) - 1 // SG expects that the default collection has id 0, so subtract 1 } diff --git a/feeds_test.go b/feeds_test.go index a4219ed..6d81dcb 100644 --- a/feeds_test.go +++ b/feeds_test.go @@ -190,7 +190,8 @@ func readExpectedEventsDEF(t *testing.T, events chan sgbucket.FeedEvent) { func TestCrossBucketEvents(t *testing.T) { ensureNoLeakedFeeds(t) - bucket := makeTestBucket(t) + bucketName := strings.ToLower(t.Name()) + bucket := makeTestBucketWithName(t, bucketName) c := bucket.DefaultDataStore() addToCollection(t, c, "able", 0, "A") @@ -198,7 +199,7 @@ func TestCrossBucketEvents(t *testing.T) { addToCollection(t, c, "charlie", 0, "C") // Open a 2nd bucket on the same file, to receive events: - bucket2, err := OpenBucket(bucket.url, strings.ToLower(t.Name()), ReOpenExisting) + bucket2, err := OpenBucket(bucket.url, bucketName, ReOpenExisting) require.NoError(t, err) t.Cleanup(func() { bucket2.Close(testCtx(t)) @@ -241,8 +242,8 @@ func TestCollectionMutations(t *testing.T) { require.NoError(t, err) numDocs := 50 - collectionID_1 := collection1.(sgbucket.Collection).GetCollectionID() - collectionID_2 := collection2.(sgbucket.Collection).GetCollectionID() + collectionID_1 := collection1.GetCollectionID() + collectionID_2 := collection2.GetCollectionID() // Add n docs to two collections for i := 1; i <= numDocs; i++ { diff --git a/go.mod b/go.mod index ee0d356..4073915 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/couchbaselabs/rosmar go 1.19 require ( - github.com/couchbase/sg-bucket v0.0.0-20240326230241-0b197e169b27 + github.com/couchbase/sg-bucket v0.0.0-20240327235911-eb1eb768174c github.com/google/uuid v1.6.0 github.com/mattn/go-sqlite3 v1.14.22 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 3cd7827..c4e5e20 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= -github.com/couchbase/sg-bucket v0.0.0-20240326230241-0b197e169b27 h1:FGNvJsAQk6JZzuVXvoLXcoSQzOnQxWkywzYJFQqzXEg= -github.com/couchbase/sg-bucket v0.0.0-20240326230241-0b197e169b27/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE= +github.com/couchbase/sg-bucket v0.0.0-20240327235911-eb1eb768174c h1:IjO5PI0qJLJgiziRlbHVhTm7qGXyT/rY2rXHAJ53uNc= +github.com/couchbase/sg-bucket v0.0.0-20240327235911-eb1eb768174c/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/xdcr.go b/xdcr.go new file mode 100644 index 0000000..9f4dc3c --- /dev/null +++ b/xdcr.go @@ -0,0 +1,243 @@ +// Copyright 2024-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. 
As of the Change Date specified
+// in that file, in accordance with the Business Source License, use of this
+// software will be governed by the Apache License, Version 2.0, included in
+// the file licenses/APL2.txt.
+
+package rosmar
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"strings"
+	"sync/atomic"
+
+	sgbucket "github.com/couchbase/sg-bucket"
+)
+
+// XDCR implements an XDCR bucket-to-bucket setup within rosmar.
+type XDCR struct {
+	terminator              chan bool
+	fromBucketCollectionIDs map[uint32]sgbucket.DataStoreName
+	fromBucket              *Bucket
+	toBucket                *Bucket
+	replicationID           string
+	docsWritten             atomic.Uint64
+	docsFiltered            atomic.Uint64
+	errorCount              atomic.Uint64
+}
+
+// NewXDCR creates an instance of XDCR backed by rosmar. This is not started until Start is called.
+func NewXDCR(_ context.Context, fromBucket, toBucket *Bucket, opts sgbucket.XDCROptions) (*XDCR, error) {
+
+	return &XDCR{
+		fromBucket:              fromBucket,
+		toBucket:                toBucket,
+		replicationID:           fmt.Sprintf("%s-%s", fromBucket.GetName(), toBucket.GetName()),
+		fromBucketCollectionIDs: map[uint32]sgbucket.DataStoreName{},
+		terminator:              make(chan bool),
+	}, nil
+
+}
+
+// getFromBucketCollectionName returns the collection name for a given collection ID in the from bucket.
+func (r *XDCR) getFromBucketCollectionName(collectionID uint32) (sgbucket.DataStoreName, error) {
+	dsName, ok := r.fromBucketCollectionIDs[collectionID]
+	if ok {
+		return dsName, nil
+	}
+
+	dataStores, err := r.fromBucket.ListDataStores()
+
+	if err != nil {
+		return nil, fmt.Errorf("Could not list data stores: %w", err)
+
+	}
+
+	for _, dsName := range dataStores {
+		dataStore, err := r.fromBucket.NamedDataStore(dsName)
+		if err != nil {
+			return nil, fmt.Errorf("Could not get data store %s: %w", dsName, err)
+		}
+
+		if dataStore.GetCollectionID() == collectionID {
+			name := dataStore.DataStoreName()
+			r.fromBucketCollectionIDs[collectionID] = name
+			return name, nil
+
+		}
+	}
+	return nil, fmt.Errorf("Could not find collection with ID %d", collectionID)
+}
+
+// Start starts the replication.
+func (r *XDCR) Start(ctx context.Context) error { + args := sgbucket.FeedArguments{ + ID: "xdcr-" + r.replicationID, + Backfill: sgbucket.FeedNoBackfill, + Terminator: r.terminator, + } + + callback := func(event sgbucket.FeedEvent) bool { + docID := string(event.Key) + trace("Got event %s, opcode: %s", docID, event.Opcode) + dsName, err := r.getFromBucketCollectionName(event.CollectionID) + if err != nil { + warn("Could not find collection with ID %d for docID %s: %s", event.CollectionID, docID, err) + r.errorCount.Add(1) + return false + + } + + switch event.Opcode { + case sgbucket.FeedOpDeletion, sgbucket.FeedOpMutation: + if strings.HasPrefix(docID, sgbucket.SyncDocPrefix) && !strings.HasPrefix(docID, sgbucket.Att2Prefix) { + trace("Filtering doc %s", docID) + r.docsFiltered.Add(1) + return true + + } + + toDataStore, err := r.toBucket.NamedDataStore(dsName) + if err != nil { + warn("Replicating doc %s, could not find matching datastore for %s in target bucket", event.Key, dsName) + r.errorCount.Add(1) + return false + } + + originalCas, err := toDataStore.Get(docID, nil) + if err != nil && !toDataStore.IsError(err, sgbucket.KeyNotFoundError) { + warn("Skipping replicating doc %s, could not perform a kv op get doc in toBucket: %s", event.Key, err) + r.errorCount.Add(1) + return false + } + + /* full LWW conflict resolution is not implemented in rosmar yet + + CBS algorithm is: + + if (command.CAS > document.CAS) + command succeeds + else if (command.CAS == document.CAS) + // Check the RevSeqno + if (command.RevSeqno > document.RevSeqno) + command succeeds + else if (command.RevSeqno == document.RevSeqno) + // Check the expiry time + if (command.Expiry > document.Expiry) + command succeeds + else if (command.Expiry == document.Expiry) + // Finally check flags + if (command.Flags < document.Flags) + command succeeds + + + command fails + + In the current state of rosmar: + + 1. all CAS values are unique. + 2. RevSeqno is not implemented + 3. Expiry is implemented and could be compared except all CAS values are unique. + 4. Flags are not implemented + + */ + + if event.Cas <= originalCas { + trace("Skipping replicating doc %s, cas %d <= %d", docID, event.Cas, originalCas) + return true + } + + toCollection, ok := toDataStore.(*Collection) + if !ok { + warn("Datastore is not of type Collection, is of type %T", toDataStore) + r.errorCount.Add(1) + } + + err = writeDoc(ctx, toCollection, originalCas, event) + if err != nil { + warn("Replicating doc %s, could not write doc: %s", event.Key, err) + r.errorCount.Add(1) + return false + + } + r.docsWritten.Add(1) + } + + return true + + } + return r.fromBucket.StartDCPFeed(ctx, args, callback, nil) +} + +// Stop terminates the replication. +func (r *XDCR) Stop(_ context.Context) error { + close(r.terminator) + r.terminator = nil + return nil +} + +// writeDoc writes a document to the target datastore. This will not return an error on a CAS mismatch, but will return error on other types of write. 
+func writeDoc(ctx context.Context, collection *Collection, originalCas uint64, event sgbucket.FeedEvent) error { + if event.Opcode == sgbucket.FeedOpDeletion { + _, err := collection.Remove(string(event.Key), originalCas) + if !errors.Is(err, sgbucket.CasMismatchErr{}) { + return err + } + return nil + } + + var xattrs []byte + var body []byte + if event.DataType&sgbucket.FeedDataTypeXattr != 0 { + var err error + var dcpXattrs []sgbucket.Xattr + body, dcpXattrs, err = sgbucket.DecodeValueWithXattrs(event.Value) + if err != nil { + return err + } + + xattrs, err = xattrToBytes(dcpXattrs) + if err != nil { + return err + } + + } else { + + body = event.Value + + } + + err := collection.SetWithMeta(ctx, string(event.Key), originalCas, event.Cas, event.Expiry, xattrs, body, event.DataType) + + if !collection.IsError(err, sgbucket.KeyNotFoundError) { + return err + } + + return nil + +} + +// Stats returns the stats of the XDCR replication. + +func (r *XDCR) Stats(context.Context) (*sgbucket.XDCRStats, error) { + + return &sgbucket.XDCRStats{ + DocsWritten: r.docsWritten.Load(), + DocsFiltered: r.docsFiltered.Load(), + ErrorCount: r.errorCount.Load(), + }, nil +} + +// xattrToBytes converts a slice of Xattrs to a byte slice of marshalled json. +func xattrToBytes(xattrs []sgbucket.Xattr) ([]byte, error) { + xattrMap := make(map[string]json.RawMessage) + for _, xattr := range xattrs { + xattrMap[xattr.Name] = xattr.Value + } + return json.Marshal(xattrMap) +} diff --git a/xdcr_test.go b/xdcr_test.go new file mode 100644 index 0000000..a3d7022 --- /dev/null +++ b/xdcr_test.go @@ -0,0 +1,141 @@ +package rosmar + +// Copyright 2024-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. 
+
+import (
+	"encoding/json"
+	"testing"
+	"time"
+
+	sgbucket "github.com/couchbase/sg-bucket"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestXDCR(t *testing.T) {
+	ctx := testCtx(t)
+	fromBucket := makeTestBucket(t)
+	toBucket := makeTestBucket(t)
+	defer fromBucket.Close(ctx)
+	defer toBucket.Close(ctx)
+
+	xdcr, err := NewXDCR(ctx, fromBucket, toBucket, sgbucket.XDCROptions{})
+	require.NoError(t, err)
+	err = xdcr.Start(ctx)
+	require.NoError(t, err)
+	defer func() {
+		assert.NoError(t, xdcr.Stop(ctx))
+	}()
+	const (
+		syncDoc           = "_sync:doc1doc2"
+		attachmentDoc     = "_sync:att2:foo"
+		attachmentDocBody = `1ABINARYBLOB`
+		normalDoc         = "doc2"
+		normalDocBody     = `{"key":"value"}`
+		exp               = 0
+	)
+	_, err = fromBucket.DefaultDataStore().AddRaw(syncDoc, exp, []byte(`{"foo", "bar"}`))
+	require.NoError(t, err)
+
+	attachmentDocCas, err := fromBucket.DefaultDataStore().WriteCas(attachmentDoc, exp, 0, []byte(attachmentDocBody), sgbucket.Raw)
+	require.NoError(t, err)
+
+	normalDocCas, err := fromBucket.DefaultDataStore().WriteCas(normalDoc, exp, 0, []byte(normalDocBody), 0)
+	require.NoError(t, err)
+
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		val, cas, err := toBucket.DefaultDataStore().GetRaw(normalDoc)
+		assert.NoError(c, err)
+		assert.Equal(c, normalDocCas, cas)
+		assert.JSONEq(c, normalDocBody, string(val))
+	}, time.Second*5, time.Millisecond*100)
+
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		val, cas, err := toBucket.DefaultDataStore().GetRaw(attachmentDoc)
+		assert.NoError(c, err)
+		assert.Equal(c, attachmentDocCas, cas)
+		assert.Equal(c, []byte(attachmentDocBody), val)
+	}, time.Second*5, time.Millisecond*100)
+
+	_, err = toBucket.DefaultDataStore().Get(syncDoc, nil)
+	assert.True(t, toBucket.IsError(err, sgbucket.KeyNotFoundError))
+
+	require.NoError(t, fromBucket.DefaultDataStore().Delete(normalDoc))
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		var value string
+		_, err = toBucket.DefaultDataStore().Get(normalDoc, &value)
+		assert.Error(c, err)
+		assert.True(c, toBucket.IsError(err, sgbucket.KeyNotFoundError))
+	}, time.Second*5, time.Millisecond*100)
+
+	// stats are not updated in real time, so we need to wait a bit
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		stats, err := xdcr.Stats(ctx)
+		assert.NoError(c, err)
+		assert.Equal(c, uint64(1), stats.DocsFiltered)
+		assert.Equal(c, uint64(3), stats.DocsWritten)
+	}, time.Second*5, time.Millisecond*100)
+
+	stats, err := xdcr.Stats(ctx)
+	require.NoError(t, err)
+	require.Equal(t, uint64(0), stats.ErrorCount)
+}
+
+func TestXattrMigration(t *testing.T) {
+	ctx := testCtx(t)
+	fromBucket := makeTestBucket(t)
+	toBucket := makeTestBucket(t)
+	defer fromBucket.Close(ctx)
+	defer toBucket.Close(ctx)
+
+	xdcr, err := NewXDCR(ctx, fromBucket, toBucket, sgbucket.XDCROptions{})
+	require.NoError(t, err)
+	err = xdcr.Start(ctx)
+	require.NoError(t, err)
+	defer func() {
+		assert.NoError(t, xdcr.Stop(ctx))
+	}()
+
+	const (
+		docID           = "doc1"
+		systemXattrName = "_system"
+		userXattrName   = "user"
+		body            = `{"foo": "bar"}`
+		systemXattrVal  = `{"bar": "baz"}`
+		userXattrVal    = `{"baz": "baz"}`
+		isDelete        = false
+		deleteBody      = false
+	)
+
+	startingCas, err := fromBucket.DefaultDataStore().WriteWithXattrs(ctx, docID, 0, 0, []byte(body), map[string][]byte{systemXattrName: []byte(systemXattrVal)}, nil)
+	require.NoError(t, err)
+	require.Greater(t, startingCas, uint64(0))
+
+	startingCas, err = fromBucket.DefaultDataStore().SetXattrs(ctx, docID, map[string][]byte{userXattrName: []byte(userXattrVal)})
+	require.NoError(t, err)
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		toVal, xattrs, cas, err := toBucket.DefaultDataStore().GetWithXattrs(ctx, docID, []string{systemXattrName, userXattrName})
+		assert.NoError(c, err)
+		assert.Equal(c, startingCas, cas)
+		assert.JSONEq(c, body, string(toVal))
+		assert.JSONEq(c, systemXattrVal, string(xattrs[systemXattrName]))
+		assert.JSONEq(c, userXattrVal, string(xattrs[userXattrName]))
+
+	}, time.Second*5, time.Millisecond*100)
+	stats, err := xdcr.Stats(ctx)
+	require.NoError(t, err)
+	require.Equal(t, uint64(0), stats.ErrorCount)
+}
+
+func mustUnmarshalJSON(t *testing.T, val string) map[string]string {
+	var v map[string]string
+	err := json.Unmarshal([]byte(val), &v)
+	require.NoError(t, err)
+	return v
+}

From 9f9e245749d93df4fb542998544e9ab103641e0d Mon Sep 17 00:00:00 2001
From: Tor Colvin
Date: Thu, 28 Mar 2024 11:16:46 -0400
Subject: [PATCH 2/9] Remove unused code

---
 xdcr_test.go | 10 ----------
 1 file changed, 10 deletions(-)

diff --git a/xdcr_test.go b/xdcr_test.go
index a3d7022..374b0fb 100644
--- a/xdcr_test.go
+++ b/xdcr_test.go
@@ -9,7 +9,6 @@ package rosmar
 // the file licenses/APL2.txt.
 
 import (
-	"encoding/json"
 	"testing"
 	"time"
 
@@ -109,8 +108,6 @@ func TestXattrMigration(t *testing.T) {
 		body            = `{"foo": "bar"}`
 		systemXattrVal  = `{"bar": "baz"}`
 		userXattrVal    = `{"baz": "baz"}`
-		isDelete        = false
-		deleteBody      = false
 	)
 
 	startingCas, err := fromBucket.DefaultDataStore().WriteWithXattrs(ctx, docID, 0, 0, []byte(body), map[string][]byte{systemXattrName: []byte(systemXattrVal)}, nil)
@@ -132,10 +129,3 @@ func TestXattrMigration(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, uint64(0), stats.ErrorCount)
 }
-
-func mustUnmarshalJSON(t *testing.T, val string) map[string]string {
-	var v map[string]string
-	err := json.Unmarshal([]byte(val), &v)
-	require.NoError(t, err)
-	return v
-}
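As an editorial aside between these patches, it is worth sketching how this new API is meant to be wired up by a caller. The sketch below is an illustration only, not code from any commit in this series: `rosmar.InMemoryURL` is assumed as the simplest backing store, and the option value shown anticipates the `XDCRMobileOn` requirement added by PATCH 3 below.

```go
package main

import (
	"context"
	"log"

	sgbucket "github.com/couchbase/sg-bucket"
	"github.com/couchbaselabs/rosmar"
)

func main() {
	ctx := context.Background()

	// Two throwaway buckets; rosmar.InMemoryURL is assumed here for a demo.
	fromBucket, err := rosmar.OpenBucket(rosmar.InMemoryURL, "from", rosmar.CreateNew)
	if err != nil {
		log.Fatal(err)
	}
	defer fromBucket.Close(ctx)

	toBucket, err := rosmar.OpenBucket(rosmar.InMemoryURL, "to", rosmar.CreateNew)
	if err != nil {
		log.Fatal(err)
	}
	defer toBucket.Close(ctx)

	// NewXDCR only constructs the replicator; Start attaches it to
	// fromBucket's DCP feed. PATCH 3 below makes Mobile: XDCRMobileOn mandatory.
	xdcr, err := rosmar.NewXDCR(ctx, fromBucket, toBucket, sgbucket.XDCROptions{Mobile: sgbucket.XDCRMobileOn})
	if err != nil {
		log.Fatal(err)
	}
	if err := xdcr.Start(ctx); err != nil {
		log.Fatal(err)
	}
	defer func() { _ = xdcr.Stop(ctx) }() // closes the terminator channel
}
```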
From b885e35575a515d2aece270dfe5553043fba44d1 Mon Sep 17 00:00:00 2001
From: Tor Colvin
Date: Thu, 28 Mar 2024 11:19:40 -0400
Subject: [PATCH 3/9] Make sure that only XDCR mobile-on mode is allowed since
 that is all that is implemented

---
 xdcr.go      | 4 +++-
 xdcr_test.go | 4 ++--
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/xdcr.go b/xdcr.go
index 9f4dc3c..4baaf08 100644
--- a/xdcr.go
+++ b/xdcr.go
@@ -33,7 +33,9 @@ type XDCR struct {
 
 // NewXDCR creates an instance of XDCR backed by rosmar. This is not started until Start is called.
 func NewXDCR(_ context.Context, fromBucket, toBucket *Bucket, opts sgbucket.XDCROptions) (*XDCR, error) {
-
+	if opts.Mobile != sgbucket.XDCRMobileOn {
+		return nil, errors.New("Only sgbucket.XDCRMobileOn is supported in rosmar")
+	}
 	return &XDCR{
 		fromBucket:              fromBucket,
 		toBucket:                toBucket,
diff --git a/xdcr_test.go b/xdcr_test.go
index 374b0fb..16615f3 100644
--- a/xdcr_test.go
+++ b/xdcr_test.go
@@ -24,7 +24,7 @@ func TestXDCR(t *testing.T) {
 	defer fromBucket.Close(ctx)
 	defer toBucket.Close(ctx)
 
-	xdcr, err := NewXDCR(ctx, fromBucket, toBucket, sgbucket.XDCROptions{})
+	xdcr, err := NewXDCR(ctx, fromBucket, toBucket, sgbucket.XDCROptions{Mobile: sgbucket.XDCRMobileOn})
 	require.NoError(t, err)
 	err = xdcr.Start(ctx)
 	require.NoError(t, err)
@@ -93,7 +93,7 @@ func TestXattrMigration(t *testing.T) {
 	defer fromBucket.Close(ctx)
 	defer toBucket.Close(ctx)
 
-	xdcr, err := NewXDCR(ctx, fromBucket, toBucket, sgbucket.XDCROptions{})
+	xdcr, err := NewXDCR(ctx, fromBucket, toBucket, sgbucket.XDCROptions{Mobile: sgbucket.XDCRMobileOn})
 	require.NoError(t, err)
 	err = xdcr.Start(ctx)
 	require.NoError(t, err)
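Another aside: the key-based filtering that patch 1 hard-coded inline (and that PATCH 4 below extracts into `mobileXDCRFilter`) reduces to a small predicate. The sketch below restates the check from xdcr.go for illustration; the prefix values in the comments match the test fixtures used above (`_sync:doc1doc2` is filtered, `_sync:att2:foo` replicates).

```go
// isReplicated restates the filter in xdcr.go: documents under the
// sync-metadata prefix are skipped, except attachment documents,
// which must still replicate.
func isReplicated(key string) bool {
	isSyncDoc := strings.HasPrefix(key, sgbucket.SyncDocPrefix)    // "_sync:"
	isAttachment := strings.HasPrefix(key, sgbucket.Att2Prefix)    // "_sync:att2:"
	return !isSyncDoc || isAttachment
}

// isReplicated("doc2")           == true  (normal document)
// isReplicated("_sync:doc1doc2") == false (filtered, counted in DocsFiltered)
// isReplicated("_sync:att2:foo") == true  (attachment, still replicated)
```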
From 4798974c21b4214c9c386e857ca02aa72e5ce41f Mon Sep 17 00:00:00 2001
From: Tor Colvin
Date: Sun, 31 Mar 2024 20:04:54 -0400
Subject: [PATCH 4/9] remove comments

---
 bucket_test.go       |   5 +--
 collection+xattrs.go |   6 +-
 collection_test.go   |  28 ++---
 go.mod               |   2 +
 xdcr.go              | 264 +++++++++++++++++++++----------------
 xdcr_test.go         | 102 ++++++++++-------
 6 files changed, 210 insertions(+), 197 deletions(-)

diff --git a/bucket_test.go b/bucket_test.go
index 3aa5a53..232f92f 100644
--- a/bucket_test.go
+++ b/bucket_test.go
@@ -32,7 +32,7 @@ func init() {
 }
 
 // testCtx returns a context for testing
-func testCtx(t *testing.T) context.Context {
+func testCtx(_ *testing.T) context.Context {
 	return context.Background() // match sync gateway interfaces for logging
 }
 
@@ -45,9 +45,6 @@ func testBucketPath(t *testing.T) string {
 
 // makeTestBucketWithName creates a new persistent test bucket with a given name. If a bucket already exists, this function will fail. This uses testing.T.Cleanup to remove the bucket.
 func makeTestBucketWithName(t *testing.T, name string) *Bucket {
-	LoggingCallback = func(level LogLevel, fmt string, args ...any) {
-		t.Logf(logLevelNamesPrint[level]+fmt, args...)
-	}
 	bucket, err := OpenBucket(uriFromPath(testBucketPath(t)), name, CreateNew)
 	require.NoError(t, err)
 	t.Cleanup(func() {
diff --git a/collection+xattrs.go b/collection+xattrs.go
index ad2566b..4326c08 100644
--- a/collection+xattrs.go
+++ b/collection+xattrs.go
@@ -39,7 +39,7 @@ func (c *Collection) GetXattrs(
 }
 
 // SetWithMeta updates a document fully with xattrs and body and allows specification of a specific CAS (newCas). This update will always happen as long as oldCas matches the value of the existing document. This simulates the kv op setWithMeta.
-func (c *Collection) SetWithMeta(_ context.Context, key string, oldCas CAS, newCas CAS, exp uint32, xattrs []byte, body []byte, datatype sgbucket.FeedDataType) error {
+func (c *Collection) setWithMeta(key string, oldCas CAS, newCas CAS, exp uint32, xattrs []byte, body []byte, datatype sgbucket.FeedDataType) error {
 	isJSON := datatype&sgbucket.FeedDataTypeJSON != 0
 	isDeletion := false
 	return c.writeWithMeta(key, body, xattrs, oldCas, newCas, exp, isJSON, isDeletion)
@@ -80,8 +80,8 @@ func (c *Collection) writeWithMeta(key string, body []byte, xattrs []byte, oldCa
 	return nil
 }
 
-// DeleteWithMeta tombstones a document and sets a specific cas. This update will always happen as long as oldCas matches the value of the existing document. This simulates the kv op deleteWithMeta.
-func (c *Collection) DeleteWithMeta(_ context.Context, key string, oldCas CAS, newCas CAS, exp uint32, xattrs []byte) error {
+// deleteWithMeta tombstones a document and sets a specific cas. This update will always happen as long as oldCas matches the value of the existing document. This simulates the kv op deleteWithMeta.
+func (c *Collection) deleteWithMeta(key string, oldCas CAS, newCas CAS, exp uint32, xattrs []byte) error {
 	var body []byte
 	isJSON := false
 	isDeletion := true
diff --git a/collection_test.go b/collection_test.go
index 1450ebd..6fb377d 100644
--- a/collection_test.go
+++ b/collection_test.go
@@ -633,10 +633,9 @@ func verifyEmptyBodyAndSyncXattr(t *testing.T, store sgbucket.DataStore, key str
 func TestSetWithMetaNoDocument(t *testing.T) {
 	col := makeTestBucket(t).DefaultDataStore()
 	const docID = "TestSetWithMeta"
-	ctx := testCtx(t)
 	cas2 := CAS(1)
 	body := []byte(`{"foo":"bar"}`)
-	err := col.(*Collection).SetWithMeta(ctx, docID, 0, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
+	err := col.(*Collection).setWithMeta(docID, 0, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
 	require.NoError(t, err)
 
 	val, cas, err := col.GetRaw(docID)
@@ -652,10 +651,9 @@ func TestSetWithMetaOverwriteJSON(t *testing.T) {
 	require.NoError(t, err)
 	require.Greater(t, cas1, CAS(0))
 
-	ctx := testCtx(t)
 	cas2 := CAS(1)
 	body := []byte(`{"foo":"bar"}`)
-	err = col.(*Collection).SetWithMeta(ctx, docID, cas1, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
+	err = col.(*Collection).setWithMeta(docID, cas1, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
 	require.NoError(t, err)
 
 	val, cas, err := col.GetRaw(docID)
@@ -679,10 +677,9 @@ func TestSetWithMetaOverwriteNotJSON(t *testing.T) {
 	require.Equal(t, sgbucket.FeedOpMutation, event1.Opcode)
 	require.Equal(t, sgbucket.FeedDataTypeJSON, event1.DataType)
 
-	ctx := testCtx(t)
 	cas2 := CAS(1)
 	body := []byte(`ABC`)
-	err = col.(*Collection).SetWithMeta(ctx, docID, cas1, cas2, 0, nil, body, sgbucket.FeedDataTypeRaw)
+	err = col.(*Collection).setWithMeta(docID, cas1, cas2, 0, nil, body, sgbucket.FeedDataTypeRaw)
 	require.NoError(t, err)
 
 	val, cas, err := col.GetRaw(docID)
@@ -706,18 +703,17 @@ func TestSetWithMetaOverwriteTombstone(t *testing.T) {
 	deletedCas, err := col.Remove(docID, cas1)
 	require.NoError(t, err)
 
-	ctx := testCtx(t)
 	cas2 := CAS(1)
 	body := []byte(`ABC`)
 
 	// make sure there is a cas check even for tombstone
-	err = col.(*Collection).SetWithMeta(ctx, docID, CAS(0), cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
+	err = col.(*Collection).setWithMeta(docID, CAS(0), cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
 	require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})
 
 	events, _ := startFeed(t, bucket)
 
 	// cas check even on tombstone
-	err = col.(*Collection).SetWithMeta(ctx, docID, deletedCas, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
+	err = col.(*Collection).setWithMeta(docID, deletedCas, cas2, 0, nil, body, sgbucket.FeedDataTypeJSON)
 	require.NoError(t, err)
 
 	event := <-events
@@ -735,17 +731,16 @@ func TestSetWithMetaCas(t *testing.T) {
 	docID := t.Name()
 	body := []byte(`{"foo":"bar"}`)
 
-	ctx := testCtx(t)
 	badStartingCas := CAS(1234)
 	specifiedCas := CAS(1)
 
 	// document doesn't exist, so cas mismatch will occur if CAS != 0
-	err := col.(*Collection).SetWithMeta(ctx, docID, badStartingCas, specifiedCas, 0, nil, body, sgbucket.FeedDataTypeJSON)
+	err := col.(*Collection).setWithMeta(docID, badStartingCas, specifiedCas, 0, nil, body, sgbucket.FeedDataTypeJSON)
 	require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})
 
 	// document doesn't exist, but CAS 0 will allow writing
-	err = col.(*Collection).SetWithMeta(ctx, docID, CAS(0), specifiedCas, 0, nil, body, sgbucket.FeedDataTypeJSON)
+	err = col.(*Collection).setWithMeta(docID, CAS(0), specifiedCas, 0, nil, body, sgbucket.FeedDataTypeJSON)
 	require.NoError(t, err)
 
 	val, cas, err := col.GetRaw(docID)
@@ -779,16 +774,15 @@ func TestDeleteWithMeta(t *testing.T) {
 	specifiedCas := CAS(1)
 	events, _ := startFeed(t, bucket)
 
-	ctx := testCtx(t)
 	// pass a bad CAS and document will not delete
 	badStartingCas := CAS(1234)
 	// the document exists, so a bad CAS will cause a mismatch, not a delete
-	err = col.(*Collection).DeleteWithMeta(ctx, docID, badStartingCas, specifiedCas, 0, nil)
+	err = col.(*Collection).deleteWithMeta(docID, badStartingCas, specifiedCas, 0, nil)
 	require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})
 
 	// tombstone with a good cas
-	err = col.(*Collection).DeleteWithMeta(ctx, docID, startingCas, specifiedCas, 0, nil)
+	err = col.(*Collection).deleteWithMeta(docID, startingCas, specifiedCas, 0, nil)
 	require.NoError(t, err)
 
 	event := <-events
@@ -826,11 +820,11 @@ func TestDeleteWithMetaXattr(t *testing.T) {
 	// pass a bad CAS and document will not delete
 	badStartingCas := CAS(1234)
 	// the document exists, so a bad CAS will cause a mismatch, not a delete
-	err = col.DeleteWithMeta(ctx, docID, badStartingCas, specifiedCas, 0, nil)
+	err = col.deleteWithMeta(docID, badStartingCas, specifiedCas, 0, nil)
 	require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})
 
 	// tombstone with a good cas
-	err = col.DeleteWithMeta(ctx, docID, startingCas, specifiedCas, 0, []byte(fmt.Sprintf(fmt.Sprintf(`{"%s": "%s"}`, systemXattr, systemXattrVal))))
+	err = col.deleteWithMeta(docID, startingCas, specifiedCas, 0, []byte(fmt.Sprintf(`{"%s": "%s"}`, systemXattr, systemXattrVal)))
 	require.NoError(t, err)
 
 	_, err = col.Get(docID, nil)
diff --git a/go.mod b/go.mod
index 4073915..3141c1d 100644
--- a/go.mod
+++ b/go.mod
@@ -17,3 +17,5 @@ require (
 	gopkg.in/sourcemap.v1 v1.0.5 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 )
+
+replace github.com/couchbase/sg-bucket => ../sg-bucket
diff --git a/xdcr.go b/xdcr.go
index 4baaf08..15fa595 100644
--- a/xdcr.go
+++ b/xdcr.go
@@ -19,16 +19,18 @@ import (
 	sgbucket "github.com/couchbase/sg-bucket"
 )
 
-// XDCR implements an XDCR bucket-to-bucket setup within rosmar.
+// XDCR implements an XDCR bucket-to-bucket replication within rosmar.
 type XDCR struct {
-	terminator              chan bool
-	fromBucketCollectionIDs map[uint32]sgbucket.DataStoreName
-	fromBucket              *Bucket
-	toBucket                *Bucket
-	replicationID           string
-	docsWritten             atomic.Uint64
-	docsFiltered            atomic.Uint64
-	errorCount              atomic.Uint64
+	filterFunc          xdcrFilterFunc
+	terminator          chan bool
+	toBucketCollections map[uint32]*Collection
+	fromBucket          *Bucket
+	toBucket            *Bucket
+	replicationID       string
+	docsFiltered        atomic.Uint64
+	docsWritten         atomic.Uint64
+	errorCount          atomic.Uint64
+	targetNewerDocs     atomic.Uint64
 }
 
 // NewXDCR creates an instance of XDCR backed by rosmar. This is not started until Start is called.
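A short editorial note on the struct change in the hunk above: filtering is now a pluggable `xdcrFilterFunc` field rather than an inline check. `NewXDCR` wires in `mobileXDCRFilter` (next hunk), but any predicate with the same shape would fit; the filter below is a hypothetical example, not code from this series.

```go
// Hypothetical filter with the xdcrFilterFunc shape: returning false makes
// processEvent count the event as filtered and skip the write; "temp:" is
// an invented prefix for illustration.
func skipTempDocs(event *sgbucket.FeedEvent) bool {
	return !strings.HasPrefix(string(event.Key), "temp:")
}
```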
@@ -37,143 +39,140 @@ func NewXDCR(_ context.Context, fromBucket, toBucket *Bucket, opts sgbucket.XDCR
 		return nil, errors.New("Only sgbucket.XDCRMobileOn is supported in rosmar")
 	}
 	return &XDCR{
-		fromBucket:              fromBucket,
-		toBucket:                toBucket,
-		replicationID:           fmt.Sprintf("%s-%s", fromBucket.GetName(), toBucket.GetName()),
-		fromBucketCollectionIDs: map[uint32]sgbucket.DataStoreName{},
-		terminator:              make(chan bool),
+		fromBucket:          fromBucket,
+		toBucket:            toBucket,
+		replicationID:       fmt.Sprintf("%s-%s", fromBucket.GetName(), toBucket.GetName()),
+		toBucketCollections: make(map[uint32]*Collection),
+		terminator:          make(chan bool),
+		filterFunc:          mobileXDCRFilter,
 	}, nil
 
 }
 
-// getFromBucketCollectionName returns the collection name for a given collection ID in the from bucket.
-func (r *XDCR) getFromBucketCollectionName(collectionID uint32) (sgbucket.DataStoreName, error) {
-	dsName, ok := r.fromBucketCollectionIDs[collectionID]
-	if ok {
-		return dsName, nil
+// processEvent processes a DCP event coming from the fromBucket and replicates it to the target datastore.
+func (r *XDCR) processEvent(event sgbucket.FeedEvent) bool {
+	docID := string(event.Key)
+	trace("Got event %s, opcode: %s", docID, event.Opcode)
+	fmt.Printf("Got event %s, opcode: %s\n", docID, event.Opcode)
+	col, ok := r.toBucketCollections[event.CollectionID]
+	if !ok {
+		logError("This violates the assumption that all collections are mapped to a target collection. This should not happen. Found event=%+v", event)
+		r.errorCount.Add(1)
+		return false
 	}
 
-	dataStores, err := r.fromBucket.ListDataStores()
-
-	if err != nil {
-		return nil, fmt.Errorf("Could not list data stores: %w", err)
-
-	}
-
-	for _, dsName := range dataStores {
-		dataStore, err := r.fromBucket.NamedDataStore(dsName)
-		if err != nil {
-			return nil, fmt.Errorf("Could not get data store %s: %w", dsName, err)
-		}
-
-		if dataStore.GetCollectionID() == collectionID {
-			name := dataStore.DataStoreName()
-			r.fromBucketCollectionIDs[collectionID] = name
-			return name, nil
-
+	switch event.Opcode {
+	case sgbucket.FeedOpDeletion, sgbucket.FeedOpMutation:
+		// Filter out events if we have a non-nil XDCR filter
+		if r.filterFunc != nil && !r.filterFunc(&event) {
+			fmt.Printf("Filtering %s\n", docID)
+			trace("Filtering doc %s", docID)
+			r.docsFiltered.Add(1)
+			return true
 		}
-	}
-	return nil, fmt.Errorf("Could not find collection with ID %d", collectionID)
-}
-
-// Start starts the replication.
-func (r *XDCR) Start(ctx context.Context) error { - args := sgbucket.FeedArguments{ - ID: "xdcr-" + r.replicationID, - Backfill: sgbucket.FeedNoBackfill, - Terminator: r.terminator, - } - callback := func(event sgbucket.FeedEvent) bool { - docID := string(event.Key) - trace("Got event %s, opcode: %s", docID, event.Opcode) - dsName, err := r.getFromBucketCollectionName(event.CollectionID) - if err != nil { - warn("Could not find collection with ID %d for docID %s: %s", event.CollectionID, docID, err) + toCas, err := col.Get(docID, nil) + if err != nil && !col.IsError(err, sgbucket.KeyNotFoundError) { + warn("Skipping replicating doc %s, could not perform a kv op get doc in toBucket: %s", event.Key, err) r.errorCount.Add(1) return false - } - switch event.Opcode { - case sgbucket.FeedOpDeletion, sgbucket.FeedOpMutation: - if strings.HasPrefix(docID, sgbucket.SyncDocPrefix) && !strings.HasPrefix(docID, sgbucket.Att2Prefix) { - trace("Filtering doc %s", docID) - r.docsFiltered.Add(1) - return true + /* full LWW conflict resolution is not implemented in rosmar yet - } + CBS algorithm is: - toDataStore, err := r.toBucket.NamedDataStore(dsName) - if err != nil { - warn("Replicating doc %s, could not find matching datastore for %s in target bucket", event.Key, dsName) - r.errorCount.Add(1) - return false - } + if (command.CAS > document.CAS) + command succeeds + else if (command.CAS == document.CAS) + // Check the RevSeqno + if (command.RevSeqno > document.RevSeqno) + command succeeds + else if (command.RevSeqno == document.RevSeqno) + // Check the expiry time + if (command.Expiry > document.Expiry) + command succeeds + else if (command.Expiry == document.Expiry) + // Finally check flags + if (command.Flags < document.Flags) + command succeeds - originalCas, err := toDataStore.Get(docID, nil) - if err != nil && !toDataStore.IsError(err, sgbucket.KeyNotFoundError) { - warn("Skipping replicating doc %s, could not perform a kv op get doc in toBucket: %s", event.Key, err) - r.errorCount.Add(1) - return false - } - /* full LWW conflict resolution is not implemented in rosmar yet + command fails - CBS algorithm is: + In the current state of rosmar: - if (command.CAS > document.CAS) - command succeeds - else if (command.CAS == document.CAS) - // Check the RevSeqno - if (command.RevSeqno > document.RevSeqno) - command succeeds - else if (command.RevSeqno == document.RevSeqno) - // Check the expiry time - if (command.Expiry > document.Expiry) - command succeeds - else if (command.Expiry == document.Expiry) - // Finally check flags - if (command.Flags < document.Flags) - command succeeds + 1. all CAS values are unique. + 2. RevSeqno is not implemented + 3. Expiry is implemented and could be compared except all CAS values are unique. + 4. Flags are not implemented + */ - command fails + if event.Cas <= toCas { + r.targetNewerDocs.Add(1) + trace("Skipping replicating doc %s, cas %d <= %d", docID, event.Cas, toCas) + return true + } - In the current state of rosmar: + err = opWithMeta(col, toCas, event) + if err != nil { + warn("Replicating doc %s, could not write doc: %s", event.Key, err) + r.errorCount.Add(1) + return false - 1. all CAS values are unique. - 2. RevSeqno is not implemented - 3. Expiry is implemented and could be compared except all CAS values are unique. - 4. 
Flags are not implemented + } + r.docsWritten.Add(1) + } - */ + return true - if event.Cas <= originalCas { - trace("Skipping replicating doc %s, cas %d <= %d", docID, event.Cas, originalCas) - return true - } +} - toCollection, ok := toDataStore.(*Collection) - if !ok { - warn("Datastore is not of type Collection, is of type %T", toDataStore) - r.errorCount.Add(1) +// Start starts the replication. +func (r *XDCR) Start(ctx context.Context) error { + // set up replication to target all existing collections, and map to other collections + scopes := make(map[string][]string) + fromDataStores, err := r.fromBucket.ListDataStores() + if err != nil { + return fmt.Errorf("Could not list data stores: %w", err) + } + toDataStores, err := r.toBucket.ListDataStores() + if err != nil { + return fmt.Errorf("Could not list toBucket data stores: %w", err) + } + for _, fromName := range fromDataStores { + fromDataStore, err := r.fromBucket.NamedDataStore(fromName) + if err != nil { + return fmt.Errorf("Could not get data store %s: %w when starting XDCR", fromName, err) + } + collectionID := fromDataStore.GetCollectionID() + for _, toName := range toDataStores { + if fromName.ScopeName() != toName.ScopeName() || fromName.CollectionName() != toName.CollectionName() { + continue } - - err = writeDoc(ctx, toCollection, originalCas, event) + toDataStore, err := r.toBucket.NamedDataStore(toName) if err != nil { - warn("Replicating doc %s, could not write doc: %s", event.Key, err) - r.errorCount.Add(1) - return false - + return fmt.Errorf("There is not a matching datastore name in the toBucket for the fromBucket %s", toName) } - r.docsWritten.Add(1) + col, ok := toDataStore.(*Collection) + if !ok { + return fmt.Errorf("DataStore %s is not of rosmar.Collection: %T", toDataStore, toDataStore) + } + r.toBucketCollections[collectionID] = col + scopes[fromName.ScopeName()] = append(scopes[fromName.ScopeName()], fromName.CollectionName()) + break } + } - return true - + args := sgbucket.FeedArguments{ + ID: "xdcr-" + r.replicationID, + Backfill: sgbucket.FeedNoBackfill, + Terminator: r.terminator, + Scopes: scopes, } - return r.fromBucket.StartDCPFeed(ctx, args, callback, nil) + + return r.fromBucket.StartDCPFeed(ctx, args, r.processEvent, nil) } // Stop terminates the replication. @@ -183,16 +182,8 @@ func (r *XDCR) Stop(_ context.Context) error { return nil } -// writeDoc writes a document to the target datastore. This will not return an error on a CAS mismatch, but will return error on other types of write. -func writeDoc(ctx context.Context, collection *Collection, originalCas uint64, event sgbucket.FeedEvent) error { - if event.Opcode == sgbucket.FeedOpDeletion { - _, err := collection.Remove(string(event.Key), originalCas) - if !errors.Is(err, sgbucket.CasMismatchErr{}) { - return err - } - return nil - } - +// opWithMeta writes a document to the target datastore given a type of Deletion or Mutation event with a specific cas. 
+func opWithMeta(collection *Collection, originalCas uint64, event sgbucket.FeedEvent) error {
 	var xattrs []byte
 	var body []byte
 	if event.DataType&sgbucket.FeedDataTypeXattr != 0 {
@@ -214,13 +205,11 @@ func writeDoc(ctx context.Context, collection *Collection, originalCas uint64, e
 
 	}
 
-	err := collection.SetWithMeta(ctx, string(event.Key), originalCas, event.Cas, event.Expiry, xattrs, body, event.DataType)
-
-	if !collection.IsError(err, sgbucket.KeyNotFoundError) {
-		return err
+	if event.Opcode == sgbucket.FeedOpDeletion {
+		return collection.deleteWithMeta(string(event.Key), originalCas, event.Cas, event.Expiry, xattrs)
 	}
 
-	return nil
+	return collection.setWithMeta(string(event.Key), originalCas, event.Cas, event.Expiry, xattrs, body, event.DataType)
 
 }
 
@@ -229,9 +218,10 @@ func writeDoc(ctx context.Context, collection *Collection, originalCas uint64, e
 func (r *XDCR) Stats(context.Context) (*sgbucket.XDCRStats, error) {
 
 	return &sgbucket.XDCRStats{
-		DocsWritten:  r.docsWritten.Load(),
-		DocsFiltered: r.docsFiltered.Load(),
-		ErrorCount:   r.errorCount.Load(),
+		DocsWritten:     r.docsWritten.Load(),
+		DocsFiltered:    r.docsFiltered.Load(),
+		ErrorCount:      r.errorCount.Load(),
+		TargetNewerDocs: r.targetNewerDocs.Load(),
 	}, nil
 }
 
@@ -243,3 +233,11 @@ func xattrToBytes(xattrs []sgbucket.Xattr) ([]byte, error) {
 	}
 	return json.Marshal(xattrMap)
 }
+
+// xdcrFilterFunc returns true if an event should be replicated, false if it should be filtered out of the replication.
+type xdcrFilterFunc func(event *sgbucket.FeedEvent) bool
+
+// mobileXDCRFilter is the implicit key filtering function that Couchbase Server mobile XDCR applies.
+func mobileXDCRFilter(event *sgbucket.FeedEvent) bool {
+	return !(strings.HasPrefix(string(event.Key), sgbucket.SyncDocPrefix) && !strings.HasPrefix(string(event.Key), sgbucket.Att2Prefix))
+}
diff --git a/xdcr_test.go b/xdcr_test.go
index 16615f3..d713b02 100644
--- a/xdcr_test.go
+++ b/xdcr_test.go
@@ -26,6 +26,21 @@ func TestXDCR(t *testing.T) {
 
 	xdcr, err := NewXDCR(ctx, fromBucket, toBucket, sgbucket.XDCROptions{Mobile: sgbucket.XDCRMobileOn})
 	require.NoError(t, err)
+
+	const (
+		scopeName      = "customScope"
+		collectionName = "customCollection"
+	)
+	fromDs, err := fromBucket.NamedDataStore(sgbucket.DataStoreNameImpl{Scope: scopeName, Collection: collectionName})
+	require.NoError(t, err)
+	toDs, err := toBucket.NamedDataStore(sgbucket.DataStoreNameImpl{Scope: scopeName, Collection: collectionName})
+	require.NoError(t, err)
+	// create collections on both sides
+	collections := map[sgbucket.DataStore]sgbucket.DataStore{
+		fromBucket.DefaultDataStore(): toBucket.DefaultDataStore(),
+		fromDs:                        toDs,
+	}
+
 	err = xdcr.Start(ctx)
 	require.NoError(t, err)
 	defer func() {
@@ -39,48 +54,55 @@ func TestXDCR(t *testing.T) {
 		normalDocBody = `{"key":"value"}`
 		exp           = 0
 	)
-	_, err = fromBucket.DefaultDataStore().AddRaw(syncDoc, exp, []byte(`{"foo", "bar"}`))
-	require.NoError(t, err)
-
-	attachmentDocCas, err := fromBucket.DefaultDataStore().WriteCas(attachmentDoc, exp, 0, []byte(attachmentDocBody), sgbucket.Raw)
-	require.NoError(t, err)
-
-	normalDocCas, err := fromBucket.DefaultDataStore().WriteCas(normalDoc, exp, 0, []byte(normalDocBody), 0)
-	require.NoError(t, err)
-
-	require.EventuallyWithT(t, func(c *assert.CollectT) {
-		val, cas, err := toBucket.DefaultDataStore().GetRaw(normalDoc)
-		assert.NoError(c, err)
-		assert.Equal(c, normalDocCas, cas)
-		assert.JSONEq(c, normalDocBody, string(val))
-	}, time.Second*5, time.Millisecond*100)
-
-	require.EventuallyWithT(t, func(c *assert.CollectT) {
-		val, cas, err := toBucket.DefaultDataStore().GetRaw(attachmentDoc)
-		assert.NoError(c, err)
-		assert.Equal(c, attachmentDocCas, cas)
-		assert.Equal(c, []byte(attachmentDocBody), val)
-	}, time.Second*5, time.Millisecond*100)
-
-	_, err = toBucket.DefaultDataStore().Get(syncDoc, nil)
-	assert.True(t, toBucket.IsError(err, sgbucket.KeyNotFoundError))
-
-	require.NoError(t, fromBucket.DefaultDataStore().Delete(normalDoc))
-	require.EventuallyWithT(t, func(c *assert.CollectT) {
-		var value string
-		_, err = toBucket.DefaultDataStore().Get(normalDoc, &value)
-		assert.Error(c, err)
+	var totalDocsFiltered uint64
+	var totalDocsWritten uint64
+	// run test on named and default collections
+	for fromDs, toDs := range collections {
+		_, err = fromDs.AddRaw(syncDoc, exp, []byte(`{"foo", "bar"}`))
+		require.NoError(t, err)
+
+		attachmentDocCas, err := fromDs.WriteCas(attachmentDoc, exp, 0, []byte(attachmentDocBody), sgbucket.Raw)
+		require.NoError(t, err)
+
+		normalDocCas, err := fromDs.WriteCas(normalDoc, exp, 0, []byte(normalDocBody), 0)
+		require.NoError(t, err)
+
+		require.EventuallyWithT(t, func(c *assert.CollectT) {
+			val, cas, err := toDs.GetRaw(normalDoc)
+			assert.NoError(c, err)
+			assert.Equal(c, normalDocCas, cas)
+			assert.JSONEq(c, normalDocBody, string(val))
+		}, time.Second*5, time.Millisecond*100)
+
+		require.EventuallyWithT(t, func(c *assert.CollectT) {
+			val, cas, err := toDs.GetRaw(attachmentDoc)
+			assert.NoError(c, err)
+			assert.Equal(c, attachmentDocCas, cas)
+			assert.Equal(c, []byte(attachmentDocBody), val)
+		}, time.Second*5, time.Millisecond*100)
+
+		_, err = toDs.Get(syncDoc, nil)
+		assert.True(t, toBucket.IsError(err, sgbucket.KeyNotFoundError))
+
+		require.NoError(t, fromDs.Delete(normalDoc))
+		require.EventuallyWithT(t, func(c *assert.CollectT) {
+			var value string
+			_, err = toDs.Get(normalDoc, &value)
+			assert.Error(c, err)
+			assert.True(c, toBucket.IsError(err, sgbucket.KeyNotFoundError))
+		}, time.Second*5, time.Millisecond*100)
+
+		// stats are not updated in real time, so we need to wait a bit
+		require.EventuallyWithT(t, func(c *assert.CollectT) {
+			stats, err := xdcr.Stats(ctx)
+			assert.NoError(c, err)
+			assert.Equal(c, totalDocsFiltered+1, stats.DocsFiltered)
+			assert.Equal(c, totalDocsWritten+3, stats.DocsWritten)
+		}, time.Second*5, time.Millisecond*100)
+		totalDocsFiltered += 1
+		totalDocsWritten += 3
+
+	}
 	stats, err := xdcr.Stats(ctx)
 	require.NoError(t, err)
 	require.Equal(t, uint64(0), stats.ErrorCount)
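The conflict-resolution comment carried through xdcr.go (in patches 1 and 4) describes Couchbase Server's full last-write-wins ladder, of which rosmar implements only the CAS comparison. As a reference, the ladder translates to Go roughly as follows; the `revMetadata` struct is hypothetical, since only CAS exists in rosmar today.

```go
// revMetadata is a hypothetical container for the fields Couchbase Server
// compares during XDCR conflict resolution; rosmar only has CAS.
type revMetadata struct {
	CAS      uint64
	RevSeqno uint64
	Expiry   uint32
	Flags    uint32
}

// sourceWins mirrors the CBS algorithm quoted in xdcr.go: compare CAS,
// then RevSeqno, then Expiry, then Flags; a tie on everything fails.
func sourceWins(cmd, doc revMetadata) bool {
	if cmd.CAS != doc.CAS {
		return cmd.CAS > doc.CAS
	}
	if cmd.RevSeqno != doc.RevSeqno {
		return cmd.RevSeqno > doc.RevSeqno
	}
	if cmd.Expiry != doc.Expiry {
		return cmd.Expiry > doc.Expiry
	}
	return cmd.Flags < doc.Flags
}
```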
"_default", coll.ScopeName()) + assert.Equal(t, "_default", coll.CollectionName()) } func TestCreateCollection(t *testing.T) { @@ -211,8 +211,8 @@ func TestCreateCollection(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, coll) assert.Equal(t, bucketName+"._default.foo", coll.GetName()) - assert.Equal(t, "_default", coll.DataStoreName().ScopeName()) - assert.Equal(t, "foo", coll.DataStoreName().CollectionName()) + assert.Equal(t, "_default", coll.ScopeName()) + assert.Equal(t, "foo", coll.CollectionName()) colls, err := bucket.ListDataStores() assert.NoError(t, err) assert.Equal(t, colls, []sgbucket.DataStoreName{defaultCollection, collName}) diff --git a/collection.go b/collection.go index 4b89e97..20d10e8 100644 --- a/collection.go +++ b/collection.go @@ -68,9 +68,14 @@ func (c *Collection) GetName() string { return c.bucket.GetName() + "." + c.DataStoreNameImpl.String() } -// DataStoreName returns the scope and collection name. -func (c *Collection) DataStoreName() sgbucket.DataStoreName { - return c.DataStoreNameImpl +// ScopeName returns the scope name. _default for the default scope. +func (c *Collection) ScopeName() string { + return c.DataStoreNameImpl.ScopeName() +} + +// ScopeName returns the collection name. _default for the default collection +func (c *Collection) CollectionName() string { + return c.DataStoreNameImpl.CollectionName() } // GetCollectionID returns a unique ID for a given collection, used to identify the collection in DCP feeds and other places. From 0d5cc3af96170fb520dae8d1327bbd51487ea838 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 1 Apr 2024 11:29:23 -0400 Subject: [PATCH 6/9] update go.mod --- go.mod | 2 -- xdcr.go | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 3141c1d..4073915 100644 --- a/go.mod +++ b/go.mod @@ -17,5 +17,3 @@ require ( gopkg.in/sourcemap.v1 v1.0.5 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/couchbase/sg-bucket => ../sg-bucket diff --git a/xdcr.go b/xdcr.go index 15fa595..d7a9ed0 100644 --- a/xdcr.go +++ b/xdcr.go @@ -129,7 +129,7 @@ func (r *XDCR) processEvent(event sgbucket.FeedEvent) bool { } -// Start starts the replication. +// Start starts the replication for all existing replications. Errors if there aren't corresponding named collections on each bucket. 
func (r *XDCR) Start(ctx context.Context) error { // set up replication to target all existing collections, and map to other collections scopes := make(map[string][]string) From 249fc06a7bcc27de03dcd6de285e8d344a16f478 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 1 Apr 2024 09:41:38 -0400 Subject: [PATCH 7/9] Merged MutationFeedStore2, Collection into sgbucket.DataStore --- feeds.go | 16 ---------------- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 3 insertions(+), 19 deletions(-) diff --git a/feeds.go b/feeds.go index 8f703bd..f294a69 100644 --- a/feeds.go +++ b/feeds.go @@ -23,10 +23,6 @@ var activeFeedCount int32 // for tests //////// BUCKET API: (sgbucket.MutationFeedStore interface) -func (bucket *Bucket) GetFeedType() sgbucket.FeedType { - return sgbucket.DcpFeedType -} - func (bucket *Bucket) StartDCPFeed( ctx context.Context, args sgbucket.FeedArguments, @@ -84,13 +80,6 @@ func (bucket *Bucket) StartDCPFeed( return nil } -func (wh *Bucket) StartTapFeed( - args sgbucket.FeedArguments, - dbStats *expvar.Map, -) (sgbucket.MutationFeed, error) { - return nil, &ErrUnimplemented{"rosmar bucket doesn't support tap feed, use DCP"} -} - //////// COLLECTION API: func (c *Collection) StartDCPFeed( @@ -350,8 +339,3 @@ func (e *event) asFeedEvent(collectionID uint32) *sgbucket.FeedEvent { } return &feedEvent } - -var ( - // Enforce interface conformance: - _ sgbucket.MutationFeedStore2 = &Bucket{} -) diff --git a/go.mod b/go.mod index 4073915..d33512f 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/couchbaselabs/rosmar go 1.19 require ( - github.com/couchbase/sg-bucket v0.0.0-20240327235911-eb1eb768174c + github.com/couchbase/sg-bucket v0.0.0-20240401153043-4d86ea8a8a98 github.com/google/uuid v1.6.0 github.com/mattn/go-sqlite3 v1.14.22 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index c4e5e20..6726d3a 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= -github.com/couchbase/sg-bucket v0.0.0-20240327235911-eb1eb768174c h1:IjO5PI0qJLJgiziRlbHVhTm7qGXyT/rY2rXHAJ53uNc= -github.com/couchbase/sg-bucket v0.0.0-20240327235911-eb1eb768174c/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE= +github.com/couchbase/sg-bucket v0.0.0-20240401153043-4d86ea8a8a98 h1:rK0OBdmn+LeRCYKVrrub8jpzfRUkKZ3c+lAQHPItLRw= +github.com/couchbase/sg-bucket v0.0.0-20240401153043-4d86ea8a8a98/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= From ce4cab031fec1458342235acf84b1ad3f7c5e69b Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 1 Apr 2024 11:52:26 -0400 Subject: [PATCH 8/9] Remove print statements --- xdcr.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/xdcr.go b/xdcr.go index d7a9ed0..2d57fce 100644 --- a/xdcr.go +++ b/xdcr.go @@ -53,7 +53,6 @@ func NewXDCR(_ context.Context, fromBucket, toBucket *Bucket, opts sgbucket.XDCR func (r *XDCR) processEvent(event sgbucket.FeedEvent) bool { docID := string(event.Key) trace("Got event %s, opcode: %s", docID, event.Opcode) - fmt.Printf("Got event %s, opcode: %s\n", docID, event.Opcode) col, ok := r.toBucketCollections[event.CollectionID] if !ok { logError("This 
violates the assumption that all collections are mapped to a target collection. This should not happen. Found event=%+v", event)
@@ -65,7 +64,6 @@ func (r *XDCR) processEvent(event sgbucket.FeedEvent) bool {
 	case sgbucket.FeedOpDeletion, sgbucket.FeedOpMutation:
 		// Filter out events if we have a non-nil XDCR filter
 		if r.filterFunc != nil && !r.filterFunc(&event) {
-			fmt.Printf("Filtering %s\n", docID)
 			trace("Filtering doc %s", docID)
 			r.docsFiltered.Add(1)
 			return true

From 8c60c5d91ff55e08734aeef5cb9d6a02db720891 Mon Sep 17 00:00:00 2001
From: Tor Colvin
Date: Thu, 4 Apr 2024 21:41:49 -0400
Subject: [PATCH 9/9] update go.mod

---
 go.mod | 2 +-
 go.sum | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/go.mod b/go.mod
index 8fbafe6..4b7d435 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/couchbaselabs/rosmar
 go 1.19
 
 require (
-	github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8
+	github.com/couchbase/sg-bucket v0.0.0-20240405013818-2750c47d6936
 	github.com/google/uuid v1.6.0
 	github.com/mattn/go-sqlite3 v1.14.22
 	github.com/stretchr/testify v1.9.0
diff --git a/go.sum b/go.sum
index 2066b3e..2db16e6 100644
--- a/go.sum
+++ b/go.sum
@@ -1,7 +1,7 @@
 github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
 github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
-github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8 h1:kfWMYvUgSg2yIZJx+t63Ucl+zorvFqlYayXPkiXFtSE=
-github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE=
+github.com/couchbase/sg-bucket v0.0.0-20240405013818-2750c47d6936 h1:RQX9vRQ1orRgr9IyTcJemj0yjY0heUJfmAY70dPC+YQ=
+github.com/couchbase/sg-bucket v0.0.0-20240405013818-2750c47d6936/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
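A closing note for readers using this XDCR support in tests: as the test comments in this series point out, stats are not updated in real time, so assertions should poll. The testify pattern used throughout the series looks like the sketch below; the expected counts are illustrative.

```go
// Poll Stats until the replicator has processed the expected events.
// Assertions inside EventuallyWithT must use the CollectT (c), not t,
// so that a failed attempt retries instead of failing the test outright.
require.EventuallyWithT(t, func(c *assert.CollectT) {
	stats, err := xdcr.Stats(ctx)
	assert.NoError(c, err)
	assert.Equal(c, uint64(3), stats.DocsWritten)  // illustrative count
	assert.Equal(c, uint64(1), stats.DocsFiltered) // illustrative count
}, time.Second*5, time.Millisecond*100)
```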