Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

resolve inconsistencies detected during ring merge #3637

Merged
merged 7 commits into from
Jun 14, 2019
6 changes: 5 additions & 1 deletion ipam/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,10 @@ func (alloc *Allocator) sendRingUpdate(dest mesh.PeerName) {
alloc.gossip.GossipUnicast(dest, msg)
}

func (alloc *Allocator) checkRangeHasAllocations(r address.Range) bool {
return alloc.space.NumFreeAddressesInRange(r) != r.Size()
}

func (alloc *Allocator) update(sender mesh.PeerName, msg []byte) error {
reader := bytes.NewReader(msg)
decoder := gob.NewDecoder(reader)
Expand All @@ -832,7 +836,7 @@ func (alloc *Allocator) update(sender mesh.PeerName, msg []byte) error {
// If someone sent us a ring, merge it into ours. Note this will move us
// out of the awaiting-consensus state if we didn't have a ring already.
case data.Ring != nil:
updated, err := alloc.ring.Merge(*data.Ring)
updated, err := alloc.ring.Merge(*data.Ring, alloc.checkRangeHasAllocations)
switch err {
case nil:
if updated {
Expand Down
69 changes: 54 additions & 15 deletions ipam/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (r *Ring) GrantRangeToHost(start, end address.Address, peer mesh.PeerName)

// Merge the given ring into this ring and indicate whether this ring
// got updated as a result.
func (r *Ring) Merge(gossip Ring) (bool, error) {
func (r *Ring) Merge(gossip Ring, checkRangeHasAllocations func(r address.Range) bool) (bool, error) {
r.assertInvariants()
defer r.trackUpdates()()

Expand All @@ -238,12 +238,22 @@ func (r *Ring) Merge(gossip Ring) (bool, error) {
return false, ErrDifferentRange
}

result, updated, err := r.Entries.merge(gossip.Entries, r.Peer)
result, updated, err := r.Entries.merge(gossip.Entries, r.Peer, r, checkRangeHasAllocations)

if err != nil {
return false, err
}

// reset the free space for entries if there is invalid free space
// due to accepting an unexpected update from the peers
for i := 0; i < len(result); i++ {
distance := r.distance(result.entry(i).Token, result.entry(i+1).Token)
if result.entry(i).Peer == r.Peer && result.entry(i).Free > distance {
// case that can arise when a range that we own that got split and had no allocations
result.entry(i).Free = distance
}
}

if err := r.checkEntries(result); err != nil {
return false, fmt.Errorf("Merge of incoming data causes: %s", err)
}
Expand All @@ -260,7 +270,7 @@ func (r *Ring) Merge(gossip Ring) (bool, error) {
// entries belonging to ourPeer. Returns the merged entries and an
// indication whether the merge resulted in any changes, i.e. the
// result differs from the original.
func (es entries) merge(other entries, ourPeer mesh.PeerName) (result entries, updated bool, err error) {
func (es entries) merge(other entries, ourPeer mesh.PeerName, r *Ring, checkRangeHasAllocations func(r address.Range) bool) (result entries, updated bool, err error) {
var mine, theirs *entry
var previousOwner *mesh.PeerName
addToResult := func(e entry) { result = append(result, &e) }
Expand All @@ -274,6 +284,7 @@ func (es entries) merge(other entries, ourPeer mesh.PeerName) (result entries, u
var i, j int
for i < len(es) && j < len(other) {
mine, theirs = es[i], other[j]
common.Log.Debugln(fmt.Sprintf("[ring %s]: Merge mine.Token=%s theirs.Token=%s mine.Peer=%s theirs.Peer=%s mine.Version=%s theirs.Version=%s", ourPeer, mine.Token, theirs.Token, mine.Peer, theirs.Peer, fmt.Sprint(mine.Version), fmt.Sprint(theirs.Version)))
switch {
case mine.Token < theirs.Token:
addToResult(*mine)
Expand All @@ -282,29 +293,57 @@ func (es entries) merge(other entries, ourPeer mesh.PeerName) (result entries, u
case mine.Token > theirs.Token:
// insert, checking that a range owned by us hasn't been split
if previousOwner != nil && *previousOwner == ourPeer && theirs.Peer != ourPeer {
err = errEntryInMyRange(theirs)
return
// check we have no allocations in the range that got split
if checkRangeHasAllocations(address.Range{Start: theirs.Token, End: mine.Token}) {
err = errEntryInMyRange(theirs)
return
}
}
addTheirs(*theirs)
j++
case mine.Token == theirs.Token:
common.Log.Debugln(fmt.Sprintf("[ring %s]: Merge token=%s mine.Peer=%s theirs.Peer=%s mine.Version=%s theirs.Version=%s", ourPeer, mine.Token, mine.Peer, theirs.Peer, fmt.Sprint(mine.Version), fmt.Sprint(theirs.Version)))
// merge
switch {
case mine.Version >= theirs.Version:
if mine.Version == theirs.Version && !mine.Equal(theirs) {
err = errInconsistentEntry(mine, theirs)
return
if mine.Peer == ourPeer {
// if we own the entry and has allocations
if checkRangeHasAllocations(address.Range{Start: mine.Token, End: es.entry(i + 1).Token}) {
err = errInconsistentEntry(mine, theirs)
return
}
}
// tie-break here, pick the entry with the highest free count
if mine.Free >= theirs.Free {
addToResult(*mine)
previousOwner = &mine.Peer
} else {
addTheirs(*theirs)
}
} else {
addToResult(*mine)
previousOwner = &mine.Peer
}
addToResult(*mine)
previousOwner = &mine.Peer
common.Log.Debugln(fmt.Sprintf("[ring %s]: Merge token=%s mine.Peer=%s theirs.Peer=%s mine.Version=%s theirs.Version=%s", ourPeer, mine.Token, mine.Peer, theirs.Peer, fmt.Sprint(mine.Version), fmt.Sprint(theirs.Version)))
case mine.Version < theirs.Version:
if mine.Peer == ourPeer { // We shouldn't receive updates to our own tokens
err = errNewerVersion(mine, theirs)
return
if mine.Peer == ourPeer {
// We received update to our own tokens accept the received entry
// if either it belongs to a different peer and we do not have allocations
// in the range effectively given away, or it belongs to our own peer, in
// which case we should set our version to the one received plus one,
// effectively imposing our existing entry.
if theirs.Peer != ourPeer && !checkRangeHasAllocations(address.Range{Start: mine.Token, End: es.entry(i + 1).Token}) {
addTheirs(*theirs)
murali-reddy marked this conversation as resolved.
Show resolved Hide resolved
} else if theirs.Peer == ourPeer {
mine.Version = theirs.Version + 1
addToResult(*mine)
} else {
err = errNewerVersion(mine, theirs)
return
}
} else {
addTheirs(*theirs)
}
addTheirs(*theirs)
common.Log.Debugln(fmt.Sprintf("[ring %s]: Merge token=%s mine.Peer=%s theirs.Peer=%s mine.Version=%s theirs.Version=%s", ourPeer, mine.Token, mine.Peer, theirs.Peer, fmt.Sprint(mine.Version), fmt.Sprint(theirs.Version)))
}
i++
j++
Expand Down
82 changes: 74 additions & 8 deletions ipam/ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,25 @@ func ParseIP(s string) address.Address {
}

func merge(r1, r2 *Ring) error {
_, err := r1.Merge(*r2)
distance := func(start, end address.Address) address.Count {
if end > start {
return address.Count(end - start)
}
return address.Count((r1.End - start) + (end - r1.Start))
}
checkEntryHasAllocations := func(r address.Range) bool {
size := distance(r.Start, r.End)
entry, found := r1.Entries.get(r.Start)
if !found {
return false
}
if entry.Free < size {
return true
}
return false
}

_, err := r1.Merge(*r2, checkEntryHasAllocations)
return err
}

Expand Down Expand Up @@ -219,6 +237,61 @@ func TestMergeSimple(t *testing.T) {
require.Equal(t, ring2.Entries, ring1.Entries)
}

func TestMergeWithConflicts(t *testing.T) {

// received update to entry with a token and version identical to an entry we have
// but but holding different content, if there are no allocations, resolve the
// conflict by picking entry one with high free count
ring1 := NewRing(start, end, peer1name)
ring2 := NewRing(start, end, peer2name)
ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 256, Version: 1}}
ring2.Entries = []*entry{{Token: start, Peer: peer2name, Free: 256, Version: 1}}
require.NoError(t, merge(ring1, ring2))

// received update to entry with a token and version identical to an entry we have
// but but holding different content, if there are allocations, then reject the received
// entry
ring1 = NewRing(start, end, peer1name)
ring2 = NewRing(start, end, peer2name)
ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 128, Version: 1}}
ring2.Entries = []*entry{{Token: start, Peer: peer2name, Free: 256, Version: 1}}
require.Error(t, merge(ring1, ring2), "Expected error")

// received an entry with update to one of our own tokens and with new version,
// accept the received entry if its still going to belong to us
ring1 = NewRing(start, end, peer1name)
ring2 = NewRing(start, end, peer2name)
ring1.Entries = []*entry{{Token: start, Peer: peer1name}}
ring2.Entries = []*entry{{Token: start, Peer: peer1name, Version: 1}}
require.NoError(t, merge(ring1, ring2))

// received an entry with update to one of our own tokens and with new version,
// but belongs to a different peer, accept the received entry if we do not
// have allocations in the range
ring1 = NewRing(start, end, peer1name)
ring2 = NewRing(start, end, peer2name)
ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 256}}
ring2.Entries = []*entry{{Token: start, Peer: peer2name, Free: 256, Version: 1}}
require.NoError(t, merge(ring1, ring2))

// received an entry with update to one of our own tokens and with new version,
// reject received entry if we have allocations in the range
ring1 = NewRing(start, end, peer1name)
ring2 = NewRing(start, end, peer2name)
ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 128}}
ring2.Entries = []*entry{{Token: start, Peer: peer2name, Version: 1}}
require.Error(t, merge(ring1, ring2), "Expected error")

// we receive an entry that splits one of our ranges, giving some of it
// away to another peer. accept the entry provided that we have no allocations
// in the range that got given away.
ring1 = NewRing(start, end, peer1name)
ring2 = NewRing(start, end, peer2name)
ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 256, Version: 1}}
ring2.Entries = []*entry{{Token: start, Peer: peer1name, Free: 128, Version: 2}, {Token: middle, Peer: peer2name, Free: 128}}
require.NoError(t, merge(ring1, ring2))
}

func TestMergeErrors(t *testing.T) {
// Cannot Merge in an invalid ring
ring1 := NewRing(start, end, peer1name)
Expand All @@ -231,13 +304,6 @@ func TestMergeErrors(t *testing.T) {
ring2.Entries = []*entry{}
require.True(t, merge(ring1, ring2) == ErrDifferentRange, "Expected ErrDifferentRange")

// Cannot Merge newer version of entry I own
ring2 = NewRing(start, end, peer2name)
ring1.Entries = []*entry{{Token: start, Peer: peer1name}}
ring2.Entries = []*entry{{Token: start, Peer: peer1name, Version: 1}}
fmt.Println(merge(ring1, ring2))
require.Error(t, merge(ring1, ring2), "Expected error")

// Cannot Merge two entries with same version but different hosts
ring1.Entries = []*entry{{Token: start, Peer: peer1name}}
ring2.Entries = []*entry{{Token: start, Peer: peer2name}}
Expand Down