-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CBG-4300 improve rosmar XDCR handling #7162
base: release/anemone
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -123,7 +123,46 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve | |
return true | ||
} | ||
|
||
err = opWithMeta(ctx, col, r.fromBucketSourceID, toCas, toXattrs, event) | ||
hlv, mou, body, err := getBodyHLVAndMou(event) | ||
if err != nil { | ||
base.WarnfCtx(ctx, "Replicating doc %s, could not get body, hlv, and mou: %s", event.Key, err) | ||
r.errorCount.Add(1) | ||
return false | ||
} | ||
if hlv != nil && mou != nil { | ||
pCAS := base.HexCasToUint64(mou.PreviousCAS) | ||
if pCAS <= toCas { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We shouldn't be considering pCAS when deciding whether to replicate or not - I believe only mou.cas, cvCAS and cas should be considered. |
||
r.targetNewerDocs.Add(1) | ||
base.TracefCtx(ctx, base.KeyWalrus, "Skipping replicating doc %s, cas %d <= %d", docID, event.Cas, toCas) | ||
return true | ||
} | ||
} | ||
if hlv == nil { | ||
newHlv := db.NewHybridLogicalVector() | ||
hlv = &newHlv | ||
err := hlv.AddVersion(db.Version{ | ||
SourceID: r.fromBucketSourceID, | ||
Value: event.Cas, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here's where we want to use pCAS. If the source mutation (event) was an import (event.Cas == mou.cas), we should set the value to event.mou.pCas. |
||
}) | ||
if err != nil { | ||
base.WarnfCtx(ctx, "Replicating doc %s, could not set hlv.AddVersion: %s", event.Key, err) | ||
r.errorCount.Add(1) | ||
return false | ||
} | ||
hlv.CurrentVersionCAS = event.Cas | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the comment above, if the source mutation was an import, should set cvCAS to pCAS. |
||
} // TODO: read existing originalXattrs[base.VvXattrName] and update the pv CBG-4250 | ||
// TODO: clear _mou when appropriate CBG-4251 | ||
|
||
if toXattrs == nil { | ||
toXattrs = make(map[string][]byte, 1) // size 1 for _vv | ||
} | ||
err = updateXattrs(toXattrs, hlv, mou) | ||
if err != nil { | ||
base.WarnfCtx(ctx, "Replicating doc %s, could not update xattrs: %s", event.Key, err) | ||
r.errorCount.Add(1) | ||
return false | ||
} | ||
err = opWithMeta(ctx, col, toCas, toXattrs, body, &event) | ||
if err != nil { | ||
base.WarnfCtx(ctx, "Replicating doc %s, could not write doc: %s", event.Key, err) | ||
r.errorCount.Add(1) | ||
|
@@ -194,43 +233,7 @@ func (r *rosmarManager) Stop(_ context.Context) error { | |
} | ||
|
||
// opWithMeta writes a document to the target datastore given a type of Deletion or Mutation event with a specific cas. The originalXattrs will contain only the _vv and _mou xattr. | ||
func opWithMeta(ctx context.Context, collection *rosmar.Collection, sourceID string, originalCas uint64, originalXattrs map[string][]byte, event sgbucket.FeedEvent) error { | ||
var xattrs map[string][]byte | ||
var body []byte | ||
if event.DataType&sgbucket.FeedDataTypeXattr != 0 { | ||
var err error | ||
body, xattrs, err = sgbucket.DecodeValueWithAllXattrs(event.Value) | ||
if err != nil { | ||
return err | ||
} | ||
} else { | ||
xattrs = make(map[string][]byte, 1) // size one for _vv | ||
body = event.Value | ||
} | ||
var vv *db.HybridLogicalVector | ||
if bytes, ok := originalXattrs[base.VvXattrName]; ok { | ||
err := json.Unmarshal(bytes, &vv) | ||
if err != nil { | ||
return fmt.Errorf("Could not unmarshal the existing vv xattr %s: %w", string(bytes), err) | ||
} | ||
} else { | ||
newVv := db.NewHybridLogicalVector() | ||
vv = &newVv | ||
} | ||
// TODO: read existing originalXattrs[base.VvXattrName] and update the pv CBG-4250 | ||
|
||
// TODO: clear _mou when appropriate CBG-4251 | ||
|
||
// update new cv with new source/cas | ||
vv.SourceID = sourceID | ||
vv.CurrentVersionCAS = event.Cas | ||
vv.Version = event.Cas | ||
|
||
var err error | ||
xattrs[base.VvXattrName], err = json.Marshal(vv) | ||
if err != nil { | ||
return err | ||
} | ||
func opWithMeta(ctx context.Context, collection *rosmar.Collection, originalCas uint64, xattrs map[string][]byte, body []byte, event *sgbucket.FeedEvent) error { | ||
xattrBytes, err := xattrToBytes(xattrs) | ||
if err != nil { | ||
return err | ||
|
@@ -272,3 +275,51 @@ type xdcrFilterFunc func(event *sgbucket.FeedEvent) bool | |
func mobileXDCRFilter(event *sgbucket.FeedEvent) bool { | ||
return !(strings.HasPrefix(string(event.Key), base.SyncDocPrefix) && !strings.HasPrefix(string(event.Key), base.Att2Prefix)) | ||
} | ||
|
||
// getBodyHLVAndMou gets the body, vv, and mou from the event. | ||
func getBodyHLVAndMou(event sgbucket.FeedEvent) (*db.HybridLogicalVector, *db.MetadataOnlyUpdate, []byte, error) { | ||
if event.DataType&sgbucket.FeedDataTypeXattr == 0 { | ||
return nil, nil, event.Value, nil | ||
} | ||
body, xattrs, err := sgbucket.DecodeValueWithAllXattrs(event.Value) | ||
if err != nil { | ||
return nil, nil, nil, err | ||
} | ||
var hlv *db.HybridLogicalVector | ||
if bytes, ok := xattrs[base.VvXattrName]; ok { | ||
err := json.Unmarshal(bytes, &hlv) | ||
if err != nil { | ||
return nil, nil, nil, fmt.Errorf("Could not unmarshal the vv xattr %s: %w", string(bytes), err) | ||
} | ||
} | ||
var mou *db.MetadataOnlyUpdate | ||
if bytes, ok := xattrs[base.MouXattrName]; ok { | ||
err := json.Unmarshal(bytes, &mou) | ||
if err != nil { | ||
return nil, nil, nil, fmt.Errorf("Could not unmarshal the mou xattr %s: %w", string(bytes), err) | ||
} | ||
} | ||
return hlv, mou, body, nil | ||
} | ||
|
||
// updateXattrs updates the xattrs with the hlv and mou. | ||
func updateXattrs(xattrs map[string][]byte, hlv *db.HybridLogicalVector, mou *db.MetadataOnlyUpdate) error { | ||
if xattrs == nil { | ||
xattrs = make(map[string][]byte, 1) | ||
} | ||
if hlv != nil { | ||
var err error | ||
xattrs[base.VvXattrName], err = json.Marshal(hlv) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
if mou != nil { | ||
var err error | ||
xattrs[base.MouXattrName], err = json.Marshal(mou) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When mou.cas == cas on the target (i.e. there was an import), I believe we should be using toCas = hlv.cvCAS to decide whether to replicate the source mutation. So I think we require something above line 120 like:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if naming everything 'sourceHLV, sourceMou, sourceBody' and 'targetHLV, targetMou etc' will make it easier to follow the logic here.