Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-4300 improve rosmar XDCR handling #7162

Open
wants to merge 1 commit into
base: release/anemone
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2323,7 +2323,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
updatedDoc.IsTombstone = currentRevFromHistory.Deleted
if doc.metadataOnlyUpdate != nil {
if doc.metadataOnlyUpdate.CAS != "" {
updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(xattrMouCasPath(), sgbucket.MacroCas))
updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas))
}
} else {
if currentXattrs[base.MouXattrName] != nil && !isNewDocCreation {
Expand Down Expand Up @@ -3146,7 +3146,8 @@ func xattrCrc32cPath(xattrKey string) string {
return xattrKey + "." + xattrMacroValueCrc32c
}

func xattrMouCasPath() string {
// XattrMouCasPath returns the xattr path for the CAS value for expansion, _mou.cas
func XattrMouCasPath() string {
return base.MouXattrName + "." + xattrMacroCas
}

Expand Down
4 changes: 2 additions & 2 deletions db/hybrid_logical_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func TestHLVImport(t *testing.T) {
}
opts := &sgbucket.MutateInOptions{
MacroExpansion: []sgbucket.MacroExpansionSpec{
sgbucket.NewMacroExpansionSpec(xattrMouCasPath(), sgbucket.MacroCas),
sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas),
},
}
_, err = collection.dataStore.UpdateXattrs(ctx, docID, 0, cas, map[string][]byte{base.MouXattrName: base.MustJSONMarshal(t, mou)}, opts)
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestHLVImport(t *testing.T) {
}
opts := &sgbucket.MutateInOptions{
MacroExpansion: []sgbucket.MacroExpansionSpec{
sgbucket.NewMacroExpansionSpec(xattrMouCasPath(), sgbucket.MacroCas),
sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas),
},
}
_, err = collection.dataStore.UpdateXattrs(ctx, docID, 0, cas, map[string][]byte{base.MouXattrName: base.MustJSONMarshal(t, mou)}, opts)
Expand Down
10 changes: 10 additions & 0 deletions db/utilities_hlv_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ func EncodeTestVersion(versionString string) (encodedString string) {
return hexTimestamp + "@" + base64Source
}

// GetHelperBody returns the body contents of a document written by HLVAgent.
func (h *HLVAgent) GetHelperBody() string {
return string(base.MustJSONMarshal(h.t, defaultHelperBody))
}

// SourceID returns the encoded source ID for the HLVAgent
func (h *HLVAgent) SourceID() string {
return h.Source
}

// encodeTestHistory converts a simplified version history of the form "1@abc,2@def;3@ghi" to use hex-encoded versions and
// base64 encoded sources
func EncodeTestHistory(historyString string) (encodedString string) {
Expand Down
1 change: 1 addition & 0 deletions topologytest/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func createPeers(t *testing.T, peersOptions map[string]PeerOptions) map[string]P
peers := make(map[string]Peer, len(peersOptions))
for id, peerOptions := range peersOptions {
peer := NewPeer(t, id, buckets, peerOptions)
t.Logf("TopologyTest: created peer %s, SourceID=%+v", id, peer.SourceID())
t.Cleanup(func() {
peer.Close()
})
Expand Down
21 changes: 0 additions & 21 deletions topologytest/topologies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ var Topologies = []Topology{
},
},
skipIf: func(t *testing.T, activePeer string, peers map[string]Peer) {
switch activePeer {
case "cbs1":
t.Skip("CBG-4289 imported documents get CV updated")
}
if base.UnitTestUrlIsWalrus() {
switch activePeer {
case "cbl1":
Expand Down Expand Up @@ -139,8 +135,6 @@ var Topologies = []Topology{
skipIf: func(t *testing.T, activePeer string, peers map[string]Peer) {
if base.UnitTestUrlIsWalrus() {
switch activePeer {
case "cbs1", "cbs2":
t.Skip("CBG-4289 imported documents get CV updated")
case "cbl1":
t.Skip("CBG-4257, docs don't get CV when set from CBL")
}
Expand Down Expand Up @@ -346,11 +340,6 @@ var simpleTopologies = []Topology{
},
},
},
skipIf: func(t *testing.T, activePeer string, peers map[string]Peer) {
if base.UnitTestUrlIsWalrus() {
t.Skip("CBG-4300, need to construct a _vv on source if none is present, to then call setWithMeta")
}
},
},
{
/*
Expand Down Expand Up @@ -386,15 +375,5 @@ var simpleTopologies = []Topology{
},
},
},
skipIf: func(t *testing.T, activePeer string, peers map[string]Peer) {
if base.UnitTestUrlIsWalrus() {
switch activePeer {
case "cbs1":
t.Skip("CBG-4289 imported documents get CV updated")
case "cbs2":
t.Skip("CBG-4300, need to construct a _vv on source if none is present, to then call setWithMeta")
}
}
},
},
}
127 changes: 89 additions & 38 deletions xdcr/rosmar_xdcr.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,46 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve
return true
}

err = opWithMeta(ctx, col, r.fromBucketSourceID, toCas, toXattrs, event)
hlv, mou, body, err := getBodyHLVAndMou(event)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When mou.cas == cas on the target (i.e. there was an import), I believe we should be using toCas = hlv.cvCAS to decide whether to replicate the source mutation. So I think we require something above line 120 like:

 if toXattrs.mou.cas == toCas {
   toCas = toXattrs.hlv.cvCAS
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if naming everything 'sourceHLV, sourceMou, sourceBody' and 'targetHLV, targetMou etc' will make it easier to follow the logic here.

if err != nil {
base.WarnfCtx(ctx, "Replicating doc %s, could not get body, hlv, and mou: %s", event.Key, err)
r.errorCount.Add(1)
return false
}
if hlv != nil && mou != nil {
pCAS := base.HexCasToUint64(mou.PreviousCAS)
if pCAS <= toCas {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't be considering pCAS when deciding whether to replicate or not - I believe only mou.cas, cvCAS and cas should be considered.

r.targetNewerDocs.Add(1)
base.TracefCtx(ctx, base.KeyWalrus, "Skipping replicating doc %s, cas %d <= %d", docID, event.Cas, toCas)
return true
}
}
if hlv == nil {
newHlv := db.NewHybridLogicalVector()
hlv = &newHlv
err := hlv.AddVersion(db.Version{
SourceID: r.fromBucketSourceID,
Value: event.Cas,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's where we want to use pCAS. If the source mutation (event) was an import (event.Cas == mou.cas), we should set the value to event.mou.pCas.

})
if err != nil {
base.WarnfCtx(ctx, "Replicating doc %s, could not set hlv.AddVersion: %s", event.Key, err)
r.errorCount.Add(1)
return false
}
hlv.CurrentVersionCAS = event.Cas
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the comment above, if the source mutation was an import, should set cvCAS to pCAS.

} // TODO: read existing originalXattrs[base.VvXattrName] and update the pv CBG-4250
// TODO: clear _mou when appropriate CBG-4251

if toXattrs == nil {
toXattrs = make(map[string][]byte, 1) // size 1 for _vv
}
err = updateXattrs(toXattrs, hlv, mou)
if err != nil {
base.WarnfCtx(ctx, "Replicating doc %s, could not update xattrs: %s", event.Key, err)
r.errorCount.Add(1)
return false
}
err = opWithMeta(ctx, col, toCas, toXattrs, body, &event)
if err != nil {
base.WarnfCtx(ctx, "Replicating doc %s, could not write doc: %s", event.Key, err)
r.errorCount.Add(1)
Expand Down Expand Up @@ -194,43 +233,7 @@ func (r *rosmarManager) Stop(_ context.Context) error {
}

// opWithMeta writes a document to the target datastore given a type of Deletion or Mutation event with a specific cas. The originalXattrs will contain only the _vv and _mou xattr.
func opWithMeta(ctx context.Context, collection *rosmar.Collection, sourceID string, originalCas uint64, originalXattrs map[string][]byte, event sgbucket.FeedEvent) error {
var xattrs map[string][]byte
var body []byte
if event.DataType&sgbucket.FeedDataTypeXattr != 0 {
var err error
body, xattrs, err = sgbucket.DecodeValueWithAllXattrs(event.Value)
if err != nil {
return err
}
} else {
xattrs = make(map[string][]byte, 1) // size one for _vv
body = event.Value
}
var vv *db.HybridLogicalVector
if bytes, ok := originalXattrs[base.VvXattrName]; ok {
err := json.Unmarshal(bytes, &vv)
if err != nil {
return fmt.Errorf("Could not unmarshal the existing vv xattr %s: %w", string(bytes), err)
}
} else {
newVv := db.NewHybridLogicalVector()
vv = &newVv
}
// TODO: read existing originalXattrs[base.VvXattrName] and update the pv CBG-4250

// TODO: clear _mou when appropriate CBG-4251

// update new cv with new source/cas
vv.SourceID = sourceID
vv.CurrentVersionCAS = event.Cas
vv.Version = event.Cas

var err error
xattrs[base.VvXattrName], err = json.Marshal(vv)
if err != nil {
return err
}
func opWithMeta(ctx context.Context, collection *rosmar.Collection, originalCas uint64, xattrs map[string][]byte, body []byte, event *sgbucket.FeedEvent) error {
xattrBytes, err := xattrToBytes(xattrs)
if err != nil {
return err
Expand Down Expand Up @@ -272,3 +275,51 @@ type xdcrFilterFunc func(event *sgbucket.FeedEvent) bool
func mobileXDCRFilter(event *sgbucket.FeedEvent) bool {
return !(strings.HasPrefix(string(event.Key), base.SyncDocPrefix) && !strings.HasPrefix(string(event.Key), base.Att2Prefix))
}

// getBodyHLVAndMou gets the body, vv, and mou from the event.
func getBodyHLVAndMou(event sgbucket.FeedEvent) (*db.HybridLogicalVector, *db.MetadataOnlyUpdate, []byte, error) {
if event.DataType&sgbucket.FeedDataTypeXattr == 0 {
return nil, nil, event.Value, nil
}
body, xattrs, err := sgbucket.DecodeValueWithAllXattrs(event.Value)
if err != nil {
return nil, nil, nil, err
}
var hlv *db.HybridLogicalVector
if bytes, ok := xattrs[base.VvXattrName]; ok {
err := json.Unmarshal(bytes, &hlv)
if err != nil {
return nil, nil, nil, fmt.Errorf("Could not unmarshal the vv xattr %s: %w", string(bytes), err)
}
}
var mou *db.MetadataOnlyUpdate
if bytes, ok := xattrs[base.MouXattrName]; ok {
err := json.Unmarshal(bytes, &mou)
if err != nil {
return nil, nil, nil, fmt.Errorf("Could not unmarshal the mou xattr %s: %w", string(bytes), err)
}
}
return hlv, mou, body, nil
}

// updateXattrs updates the xattrs with the hlv and mou.
func updateXattrs(xattrs map[string][]byte, hlv *db.HybridLogicalVector, mou *db.MetadataOnlyUpdate) error {
if xattrs == nil {
xattrs = make(map[string][]byte, 1)
}
if hlv != nil {
var err error
xattrs[base.VvXattrName], err = json.Marshal(hlv)
if err != nil {
return err
}
}
if mou != nil {
var err error
xattrs[base.MouXattrName], err = json.Marshal(mou)
if err != nil {
return err
}
}
return nil
}
Loading
Loading