diff --git a/db/crud.go b/db/crud.go
index 6156ce7709..172c3c564e 100644
--- a/db/crud.go
+++ b/db/crud.go
@@ -2323,7 +2323,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
 		updatedDoc.IsTombstone = currentRevFromHistory.Deleted
 		if doc.metadataOnlyUpdate != nil {
 			if doc.metadataOnlyUpdate.CAS != "" {
-				updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(xattrMouCasPath(), sgbucket.MacroCas))
+				updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas))
 			}
 		} else {
 			if currentXattrs[base.MouXattrName] != nil && !isNewDocCreation {
@@ -3146,7 +3146,8 @@ func xattrCrc32cPath(xattrKey string) string {
 	return xattrKey + "." + xattrMacroValueCrc32c
 }
 
-func xattrMouCasPath() string {
+// XattrMouCasPath returns the xattr path for the CAS value for expansion, _mou.cas
+func XattrMouCasPath() string {
 	return base.MouXattrName + "." + xattrMacroCas
 }
 
diff --git a/db/hybrid_logical_vector_test.go b/db/hybrid_logical_vector_test.go
index 2fec9a2de3..58510fc9fd 100644
--- a/db/hybrid_logical_vector_test.go
+++ b/db/hybrid_logical_vector_test.go
@@ -329,7 +329,7 @@ func TestHLVImport(t *testing.T) {
 				}
 				opts := &sgbucket.MutateInOptions{
 					MacroExpansion: []sgbucket.MacroExpansionSpec{
-						sgbucket.NewMacroExpansionSpec(xattrMouCasPath(), sgbucket.MacroCas),
+						sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas),
 					},
 				}
 				_, err = collection.dataStore.UpdateXattrs(ctx, docID, 0, cas, map[string][]byte{base.MouXattrName: base.MustJSONMarshal(t, mou)}, opts)
@@ -387,7 +387,7 @@ func TestHLVImport(t *testing.T) {
 				}
 				opts := &sgbucket.MutateInOptions{
 					MacroExpansion: []sgbucket.MacroExpansionSpec{
-						sgbucket.NewMacroExpansionSpec(xattrMouCasPath(), sgbucket.MacroCas),
+						sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas),
 					},
 				}
 				_, err = collection.dataStore.UpdateXattrs(ctx, docID, 0, cas, map[string][]byte{base.MouXattrName: base.MustJSONMarshal(t, mou)}, opts)
diff --git a/db/utilities_hlv_testing.go b/db/utilities_hlv_testing.go
index ae4c2631ed..4a2a08ec9b 100644
--- a/db/utilities_hlv_testing.go
+++ b/db/utilities_hlv_testing.go
@@ -103,6 +103,16 @@ func EncodeTestVersion(versionString string) (encodedString string) {
 	return hexTimestamp + "@" + base64Source
 }
 
+// GetHelperBody returns the body contents of a document written by HLVAgent.
+func (h *HLVAgent) GetHelperBody() string {
+	return string(base.MustJSONMarshal(h.t, defaultHelperBody))
+}
+
+// SourceID returns the encoded source ID for the HLVAgent
+func (h *HLVAgent) SourceID() string {
+	return h.Source
+}
+
 // encodeTestHistory converts a simplified version history of the form "1@abc,2@def;3@ghi" to use hex-encoded versions and
 // base64 encoded sources
 func EncodeTestHistory(historyString string) (encodedString string) {
diff --git a/topologytest/peer_test.go b/topologytest/peer_test.go
index 3cebfe0124..1037708baf 100644
--- a/topologytest/peer_test.go
+++ b/topologytest/peer_test.go
@@ -220,6 +220,7 @@ func createPeers(t *testing.T, peersOptions map[string]PeerOptions) map[string]P
 	peers := make(map[string]Peer, len(peersOptions))
 	for id, peerOptions := range peersOptions {
 		peer := NewPeer(t, id, buckets, peerOptions)
+		t.Logf("TopologyTest: created peer %s, SourceID=%+v", id, peer.SourceID())
 		t.Cleanup(func() {
 			peer.Close()
 		})
diff --git a/topologytest/topologies_test.go b/topologytest/topologies_test.go
index 1f91ff5fa5..43daab5019 100644
--- a/topologytest/topologies_test.go
+++ b/topologytest/topologies_test.go
@@ -65,10 +65,6 @@ var Topologies = []Topology{
 			},
 		},
 		skipIf: func(t *testing.T, activePeer string, peers map[string]Peer) {
-			switch activePeer {
-			case "cbs1":
-				t.Skip("CBG-4289 imported documents get CV updated")
-			}
 			if base.UnitTestUrlIsWalrus() {
 				switch activePeer {
 				case "cbl1":
@@ -139,8 +135,6 @@ var Topologies = []Topology{
 		skipIf: func(t *testing.T, activePeer string, peers map[string]Peer) {
 			if base.UnitTestUrlIsWalrus() {
 				switch activePeer {
-				case "cbs1", "cbs2":
-					t.Skip("CBG-4289 imported documents get CV updated")
 				case "cbl1":
 					t.Skip("CBG-4257, docs don't get CV when set from CBL")
 				}
@@ -346,11 +340,6 @@ var simpleTopologies = []Topology{
 				},
 			},
 		},
-		skipIf: func(t *testing.T, activePeer string, peers map[string]Peer) {
-			if base.UnitTestUrlIsWalrus() {
-				t.Skip("CBG-4300, need to construct a _vv on source if none is present, to then call setWithMeta")
-			}
-		},
 	},
 	{
 		/*
@@ -386,15 +375,5 @@ var simpleTopologies = []Topology{
 				},
 			},
 		},
-		skipIf: func(t *testing.T, activePeer string, peers map[string]Peer) {
-			if base.UnitTestUrlIsWalrus() {
-				switch activePeer {
-				case "cbs1":
-					t.Skip("CBG-4289 imported documents get CV updated")
-				case "cbs2":
-					t.Skip("CBG-4300, need to construct a _vv on source if none is present, to then call setWithMeta")
-				}
-			}
-		},
 	},
 }
diff --git a/xdcr/rosmar_xdcr.go b/xdcr/rosmar_xdcr.go
index 86a54a2c5b..af29cf63d3 100644
--- a/xdcr/rosmar_xdcr.go
+++ b/xdcr/rosmar_xdcr.go
@@ -123,7 +123,46 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve
 		return true
 	}
 
-	err = opWithMeta(ctx, col, r.fromBucketSourceID, toCas, toXattrs, event)
+	hlv, mou, body, err := getBodyHLVAndMou(event)
+	if err != nil {
+		base.WarnfCtx(ctx, "Replicating doc %s, could not get body, hlv, and mou: %s", event.Key, err)
+		r.errorCount.Add(1)
+		return false
+	}
+	if hlv != nil && mou != nil {
+		pCAS := base.HexCasToUint64(mou.PreviousCAS)
+		if pCAS <= toCas {
+			r.targetNewerDocs.Add(1)
+			base.TracefCtx(ctx, base.KeyWalrus, "Skipping replicating doc %s, mou previous cas %d <= %d", docID, pCAS, toCas)
+			return true
+		}
+	}
+	if hlv == nil {
+		newHlv := db.NewHybridLogicalVector()
+		hlv = &newHlv
+		err = hlv.AddVersion(db.Version{
+			SourceID: r.fromBucketSourceID,
+			Value:    event.Cas,
+		})
+		if err != nil {
+			base.WarnfCtx(ctx, "Replicating doc %s, could not set hlv.AddVersion: %s", event.Key, err)
+			r.errorCount.Add(1)
+			return false
+		}
+		hlv.CurrentVersionCAS = event.Cas
+	} // TODO: read existing originalXattrs[base.VvXattrName] and update the pv CBG-4250
+	// TODO: clear _mou when appropriate CBG-4251
+
+	if toXattrs == nil {
+		toXattrs = make(map[string][]byte, 1) // size 1 for _vv
+	}
+	err = updateXattrs(toXattrs, hlv, mou)
+	if err != nil {
+		base.WarnfCtx(ctx, "Replicating doc %s, could not update xattrs: %s", event.Key, err)
+		r.errorCount.Add(1)
+		return false
+	}
+	err = opWithMeta(ctx, col, toCas, toXattrs, body, &event)
 	if err != nil {
 		base.WarnfCtx(ctx, "Replicating doc %s, could not write doc: %s", event.Key, err)
 		r.errorCount.Add(1)
@@ -194,43 +233,7 @@ func (r *rosmarManager) Stop(_ context.Context) error {
 }
 
-// opWithMeta writes a document to the target datastore given a type of Deletion or Mutation event with a specific cas. The originalXattrs will contain only the _vv and _mou xattr.
-func opWithMeta(ctx context.Context, collection *rosmar.Collection, sourceID string, originalCas uint64, originalXattrs map[string][]byte, event sgbucket.FeedEvent) error {
-	var xattrs map[string][]byte
-	var body []byte
-	if event.DataType&sgbucket.FeedDataTypeXattr != 0 {
-		var err error
-		body, xattrs, err = sgbucket.DecodeValueWithAllXattrs(event.Value)
-		if err != nil {
-			return err
-		}
-	} else {
-		xattrs = make(map[string][]byte, 1) // size one for _vv
-		body = event.Value
-	}
-	var vv *db.HybridLogicalVector
-	if bytes, ok := originalXattrs[base.VvXattrName]; ok {
-		err := json.Unmarshal(bytes, &vv)
-		if err != nil {
-			return fmt.Errorf("Could not unmarshal the existing vv xattr %s: %w", string(bytes), err)
-		}
-	} else {
-		newVv := db.NewHybridLogicalVector()
-		vv = &newVv
-	}
-	// TODO: read existing originalXattrs[base.VvXattrName] and update the pv CBG-4250
-
-	// TODO: clear _mou when appropriate CBG-4251
-
-	// update new cv with new source/cas
-	vv.SourceID = sourceID
-	vv.CurrentVersionCAS = event.Cas
-	vv.Version = event.Cas
-
-	var err error
-	xattrs[base.VvXattrName], err = json.Marshal(vv)
-	if err != nil {
-		return err
-	}
+// opWithMeta writes a document to the target datastore given a type of Deletion or Mutation event with a specific cas. xattrs holds the xattrs to store with the document (the caller is responsible for populating _vv and _mou) and body is the document body decoded from the event.
+func opWithMeta(ctx context.Context, collection *rosmar.Collection, originalCas uint64, xattrs map[string][]byte, body []byte, event *sgbucket.FeedEvent) error {
 	xattrBytes, err := xattrToBytes(xattrs)
 	if err != nil {
 		return err
@@ -272,3 +275,54 @@ type xdcrFilterFunc func(event *sgbucket.FeedEvent) bool
 func mobileXDCRFilter(event *sgbucket.FeedEvent) bool {
 	return !(strings.HasPrefix(string(event.Key), base.SyncDocPrefix) && !strings.HasPrefix(string(event.Key), base.Att2Prefix))
 }
+
+// getBodyHLVAndMou decodes the event value and returns, in order, the hlv, the mou, and the document body. The hlv and
+// mou are nil when the event carries no xattrs or when the corresponding xattr is not present.
+func getBodyHLVAndMou(event sgbucket.FeedEvent) (*db.HybridLogicalVector, *db.MetadataOnlyUpdate, []byte, error) {
+	if event.DataType&sgbucket.FeedDataTypeXattr == 0 {
+		return nil, nil, event.Value, nil
+	}
+	body, xattrs, err := sgbucket.DecodeValueWithAllXattrs(event.Value)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	var hlv *db.HybridLogicalVector
+	if bytes, ok := xattrs[base.VvXattrName]; ok {
+		err := json.Unmarshal(bytes, &hlv)
+		if err != nil {
+			return nil, nil, nil, fmt.Errorf("Could not unmarshal the vv xattr %s: %w", string(bytes), err)
+		}
+	}
+	var mou *db.MetadataOnlyUpdate
+	if bytes, ok := xattrs[base.MouXattrName]; ok {
+		err := json.Unmarshal(bytes, &mou)
+		if err != nil {
+			return nil, nil, nil, fmt.Errorf("Could not unmarshal the mou xattr %s: %w", string(bytes), err)
+		}
+	}
+	return hlv, mou, body, nil
+}
+
+// updateXattrs stores the JSON encoding of hlv under base.VvXattrName and of mou under base.MouXattrName in xattrs. A
+// nil hlv or mou leaves the corresponding entry untouched. xattrs must be non-nil: a map allocated inside this
+// function would not be visible to the caller, so a nil map is reported as an error instead of being silently dropped.
+func updateXattrs(xattrs map[string][]byte, hlv *db.HybridLogicalVector, mou *db.MetadataOnlyUpdate) error {
+	if xattrs == nil {
+		return fmt.Errorf("could not update xattrs: xattrs map must not be nil")
+	}
+	if hlv != nil {
+		var err error
+		xattrs[base.VvXattrName], err = json.Marshal(hlv)
+		if err != nil {
+			return err
+		}
+	}
+	if mou != nil {
+		var err error
+		xattrs[base.MouXattrName], err = json.Marshal(mou)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/xdcr/xdcr_test.go b/xdcr/xdcr_test.go
index fef5f81c99..240e2f7c92 100644
--- a/xdcr/xdcr_test.go
+++ b/xdcr/xdcr_test.go
@@ -152,17 +152,27 @@ func TestReplicateVV(t *testing.T) {
 	fromBucketSourceID, err := GetSourceID(ctx, fromBucket)
 	require.NoError(t, err)
 
+	hlvAgent := db.NewHLVAgent(t, fromDs, "fakeHLVSourceID", base.VvXattrName)
+
 	testCases := []struct {
 		name        string
 		docID       string
 		body        string
+		HLV         func(fromCas uint64) *db.HybridLogicalVector
 		hasHLV      bool
 		preXDCRFunc func(t *testing.T, docID string) uint64
 	}{
 		{
-			name:   "normal doc",
-			docID:  "doc1",
-			body:   `{"key":"value"}`,
+			name:  "normal doc",
+			docID: "doc1",
+			body:  `{"key":"value"}`,
+			HLV: func(fromCas uint64) *db.HybridLogicalVector {
+				return &db.HybridLogicalVector{
+					CurrentVersionCAS: fromCas,
+					SourceID:          fromBucketSourceID,
+					Version:           fromCas,
+				}
+			},
 			hasHLV: true,
 			preXDCRFunc: func(t *testing.T, docID string) uint64 {
 				cas, err := fromDs.WriteCas(docID, 0, 0, []byte(`{"key":"value"}`), 0)
@@ -171,9 +181,16 @@ func TestReplicateVV(t *testing.T) {
 			},
 		},
 		{
-			name:   "dest doc older, expect overwrite",
-			docID:  "doc2",
-			body:   `{"datastore":"fromDs"}`,
+			name:  "dest doc older, expect overwrite",
+			docID: "doc2",
+			body:  `{"datastore":"fromDs"}`,
+			HLV: func(fromCas uint64) *db.HybridLogicalVector {
+				return &db.HybridLogicalVector{
+					CurrentVersionCAS: fromCas,
+					SourceID:          fromBucketSourceID,
+					Version:           fromCas,
+				}
+			},
 			hasHLV: true,
 			preXDCRFunc: func(t *testing.T, docID string) uint64 {
 				_, err := toDs.WriteCas(docID, 0, 0, []byte(`{"datastore":"toDs"}`), 0)
@@ -196,6 +213,23 @@ func TestReplicateVV(t *testing.T) {
 				return cas
 			},
 		},
+		{
+			name:  "src doc has hlv",
+			docID: "doc4",
+			body:  hlvAgent.GetHelperBody(),
+			HLV: func(fromCas uint64) *db.HybridLogicalVector {
+				return &db.HybridLogicalVector{
+					CurrentVersionCAS: fromCas,
+					SourceID:          hlvAgent.SourceID(),
+					Version:           fromCas,
+				}
+			},
+			hasHLV: true,
+			preXDCRFunc: func(t *testing.T, docID string) uint64 {
+				ctx := base.TestCtx(t)
+				return hlvAgent.InsertWithHLV(ctx, docID)
+			},
+		},
 	}
 	// tests write a document
 	// start xdcr
@@ -225,7 +259,10 @@ func TestReplicateVV(t *testing.T) {
 				return
 			}
 			require.Contains(t, xattrs, base.VvXattrName)
-			requireCV(t, xattrs[base.VvXattrName], fromBucketSourceID, fromCAS)
+
+			var hlv db.HybridLogicalVector
+			require.NoError(t, base.JSONUnmarshal(xattrs[base.VvXattrName], &hlv))
+			require.Equal(t, *testCase.HLV(fromCAS), hlv)
 		})
 	}
 }
@@ -264,6 +301,65 @@ func TestVVWriteTwice(t *testing.T) {
 	requireCV(t, xattrs[base.VvXattrName], fromBucketSourceID, fromCAS2)
 }
 
+func TestVVObeyMou(t *testing.T) {
+	fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t)
+	ctx := base.TestCtx(t)
+	fromBucketSourceID, err := GetSourceID(ctx, fromBucket)
+	require.NoError(t, err)
+
+	docID := "doc1"
+	hlvAgent := db.NewHLVAgent(t, fromDs, fromBucketSourceID, base.VvXattrName)
+	fromCas1 := hlvAgent.InsertWithHLV(ctx, docID)
+
+	xdcr := startXDCR(t, fromBucket, toBucket, XDCROptions{Mobile: MobileOn})
+	defer func() {
+		assert.NoError(t, xdcr.Stop(ctx))
+	}()
+	requireWaitForXDCRDocsProcessed(t, xdcr, 1)
+
+	body, xattrs, destCas, err := toDs.GetWithXattrs(ctx, docID, []string{base.VvXattrName, base.MouXattrName, base.VirtualXattrRevSeqNo})
+	require.NoError(t, err)
+	require.Equal(t, fromCas1, destCas)
+	require.JSONEq(t, hlvAgent.GetHelperBody(), string(body))
+	require.NotContains(t, xattrs, base.MouXattrName)
+	require.Contains(t, xattrs, base.VvXattrName)
+	var vv db.HybridLogicalVector
+	require.NoError(t, base.JSONUnmarshal(xattrs[base.VvXattrName], &vv))
+	expectedVV := db.HybridLogicalVector{
+		CurrentVersionCAS: fromCas1,
+		SourceID:          hlvAgent.SourceID(),
+		Version:           fromCas1,
+	}
+
+	require.Equal(t, expectedVV, vv)
+
+	mou := &db.MetadataOnlyUpdate{
+		PreviousCAS:      base.CasToString(fromCas1),
+		PreviousRevSeqNo: db.RetrieveDocRevSeqNo(t, xattrs[base.VirtualXattrRevSeqNo]),
+	}
+
+	opts := &sgbucket.MutateInOptions{
+		MacroExpansion: []sgbucket.MacroExpansionSpec{
+			sgbucket.NewMacroExpansionSpec(db.XattrMouCasPath(), sgbucket.MacroCas),
+		},
+	}
+	fromCas2, err := fromDs.UpdateXattrs(ctx, docID, 0, fromCas1, map[string][]byte{base.MouXattrName: base.MustJSONMarshal(t, mou)}, opts)
+	require.NoError(t, err)
+	require.NotEqual(t, fromCas1, fromCas2)
+
+	requireWaitForXDCRDocsProcessed(t, xdcr, 2)
+
+	body, xattrs, destCas, err = toDs.GetWithXattrs(ctx, docID, []string{base.VvXattrName, base.MouXattrName})
+	require.NoError(t, err)
+	require.Equal(t, fromCas1, destCas)
+	require.JSONEq(t, hlvAgent.GetHelperBody(), string(body))
+	require.NotContains(t, xattrs, base.MouXattrName)
+	require.Contains(t, xattrs, base.VvXattrName)
+	vv = db.HybridLogicalVector{}
+	require.NoError(t, base.JSONUnmarshal(xattrs[base.VvXattrName], &vv))
+	require.Equal(t, expectedVV, vv)
+}
+
 func TestLWWAfterInitialReplication(t *testing.T) {
 	fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t)
 	ctx := base.TestCtx(t)