Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Make Gossip DNS reject gossip messages if clockskew > tombstoneTimeou…
Browse files Browse the repository at this point in the history
…t / 2
  • Loading branch information
Tom Wilkie committed Jul 20, 2015
1 parent 5baddb8 commit 9e0702d
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 27 deletions.
17 changes: 13 additions & 4 deletions nameserver/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,22 @@ func (es *Entries) first(f func(*Entry) bool) (*Entry, error) {
return nil, fmt.Errorf("Not found")
}

func (es *Entries) Merge(other router.GossipData) {
es.merge(*other.(*Entries))
type GossipData struct {
Timestamp int64
Entries
}

func (es *Entries) Encode() [][]byte {
func (g *GossipData) Merge(o router.GossipData) {
other := o.(*GossipData)
g.Entries.merge(other.Entries)
if g.Timestamp < other.Timestamp {
g.Timestamp = other.Timestamp
}
}

func (g *GossipData) Encode() [][]byte {
buf := &bytes.Buffer{}
if err := gob.NewEncoder(buf).Encode(es); err != nil {
if err := gob.NewEncoder(buf).Encode(g); err != nil {
panic(err)
}
return [][]byte{buf.Bytes()}
Expand Down
58 changes: 35 additions & 23 deletions nameserver/nameserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ const (
// we delete entries for disconnected peers. Therefore they just need to hang
// around to account for propagation delay through gossip. 10 minutes sounds
// long enough.
tombstoneTimeout = time.Minute * 10
tombstoneTimeout = time.Minute * 30

// Used by prog/weaver/main.go and proxy/create_container_interceptor.go
DefaultDomain = "weave.local."

// Maximum age of acceptable gossip messages (to account for clock skew)
gossipWindow = int64(tombstoneTimeout/time.Second) / 2
)

// Nameserver: gossip-based, in memory nameserver.
Expand Down Expand Up @@ -78,16 +81,22 @@ func (n *Nameserver) Stop() {
n.quit <- struct{}{}
}

func (n *Nameserver) broadcastEntries(es ...Entry) error {
if n.gossip != nil {
return n.gossip.GossipBroadcast(&GossipData{
Entries: Entries(es),
Timestamp: now(),
})
}
return nil
}

func (n *Nameserver) AddEntry(hostname, containerid string, origin router.PeerName, addr address.Address) error {
n.infof("adding entry %s -> %s", hostname, addr.String())
n.Lock()
entry := n.entries.add(hostname, containerid, origin, addr)
n.Unlock()

if n.gossip != nil {
return n.gossip.GossipBroadcast(&Entries{entry})
}
return nil
return n.broadcastEntries(entry)
}

func (n *Nameserver) Lookup(hostname string) []address.Address {
Expand Down Expand Up @@ -130,9 +139,8 @@ func (n *Nameserver) ContainerDied(ident string) {
return false
})
n.Unlock()
if n.gossip != nil && len(*entries) > 0 {
err := n.gossip.GossipBroadcast(entries)
if err != nil {
if len(*entries) > 0 {
if err := n.broadcastEntries(*entries...); err != nil {
n.errorf("Failed to broadcast container '%s' death: %v", ident, err)
}
}
Expand Down Expand Up @@ -165,10 +173,7 @@ func (n *Nameserver) Delete(hostname, containerid, ipStr string, ip address.Addr
return true
})
n.Unlock()
if n.gossip != nil {
return n.gossip.GossipBroadcast(entries)
}
return nil
return n.broadcastEntries(*entries...)
}

func (n *Nameserver) deleteTombstones() {
Expand Down Expand Up @@ -200,39 +205,46 @@ func (n *Nameserver) String() string {
func (n *Nameserver) Gossip() router.GossipData {
n.RLock()
defer n.RUnlock()
result := make(Entries, len(n.entries))
copy(result, n.entries)
return &result
gossip := &GossipData{
Entries: make(Entries, len(n.entries)),
Timestamp: now(),
}
copy(gossip.Entries, n.entries)
return gossip
}

func (n *Nameserver) OnGossipUnicast(sender router.PeerName, msg []byte) error {
return nil
}

func (n *Nameserver) receiveGossip(msg []byte) (router.GossipData, router.GossipData, error) {
var entries Entries
if err := gob.NewDecoder(bytes.NewReader(msg)).Decode(&entries); err != nil {
var gossip GossipData
if err := gob.NewDecoder(bytes.NewReader(msg)).Decode(&gossip); err != nil {
return nil, nil, err
}

if err := entries.check(); err != nil {
if delta := gossip.Timestamp - now(); delta > gossipWindow || delta < -gossipWindow {
return nil, nil, fmt.Errorf("clock skew of %d detected", delta)
}

if err := gossip.Entries.check(); err != nil {
return nil, nil, err
}

n.Lock()
defer n.Unlock()

if n.peers != nil {
entries.filter(func(e *Entry) bool {
gossip.Entries.filter(func(e *Entry) bool {
return n.peers.Fetch(e.Origin) != nil
})
}

newEntries := n.entries.merge(entries)
newEntries := n.entries.merge(gossip.Entries)
if len(newEntries) > 0 {
return &newEntries, &entries, nil
return &GossipData{Entries: newEntries, Timestamp: now()}, &gossip, nil
}
return nil, &entries, nil
return nil, &gossip, nil
}

// merge received data into state and return "everything new I've
Expand Down

0 comments on commit 9e0702d

Please sign in to comment.