diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index d03032f2bda..2d8fcfa2f6b 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -25,6 +25,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" farm "github.com/dgryski/go-farm" @@ -48,6 +49,8 @@ const ( raftDefaults = "idx=1; learner=false;" ) +var proposalKey uint64 + type node struct { *conn.Node server *Server @@ -81,8 +84,19 @@ func (n *node) AmLeader() bool { return time.Since(n.lastQuorum) <= 5*time.Second } +// {2 bytes Node ID} {4 bytes for random} {2 bytes zero} +func (n *node) initProposalKey(id uint64) error { + x.AssertTrue(id != 0) + var err error + proposalKey, err = x.ProposalKey(n.Id) + if err != nil { + return err + } + return nil +} + func (n *node) uniqueKey() uint64 { - return uint64(n.Id)<<32 | uint64(n.Rand.Uint32()) + return atomic.AddUint64(&proposalKey, 1) } var errInternalRetry = errors.New("Retry Raft proposal internally") @@ -626,6 +640,7 @@ func (n *node) checkForCIDInEntries() (bool, error) { } func (n *node) initAndStartNode() error { + x.Check(n.initProposalKey(n.Id)) _, restart, err := n.PastLife() x.Check(err) diff --git a/dgraph/cmd/zero/zero_test.go b/dgraph/cmd/zero/zero_test.go index 9f4e18f113f..e7a3636f16d 100644 --- a/dgraph/cmd/zero/zero_test.go +++ b/dgraph/cmd/zero/zero_test.go @@ -24,8 +24,10 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc" + "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/testutil" + "github.com/dgraph-io/ristretto/z" ) func TestRemoveNode(t *testing.T) { @@ -84,3 +86,27 @@ func TestIdBump(t *testing.T) { _, err = zc.AssignIds(ctx, &pb.Num{Val: 10, Type: pb.Num_UID, Bump: true}) require.Contains(t, err.Error(), "Nothing to be leased") } + +func TestProposalKey(t *testing.T) { + + id := uint64(2) + node := &node{Node: &conn.Node{Id: id}, ctx: context.Background(), closer: z.NewCloser(1)} + node.initProposalKey(node.Id) + + pkey := proposalKey + nodeIdFromKey := proposalKey >> 48 + require.Equal(t, id, nodeIdFromKey, "id extracted from proposal key is not equal to initial value") + + valueOf48thBit := int(pkey & (1 << 48)) + require.Equal(t, 0, valueOf48thBit, "48th bit is not set to zero on initialisation") + + node.uniqueKey() + require.Equal(t, pkey+1, proposalKey, "proposal key should increment by 1 at each call of unique key") + + uniqueKeys := make(map[uint64]struct{}) + for i := 0; i < 10; i++ { + node.uniqueKey() + uniqueKeys[proposalKey] = struct{}{} + } + require.Equal(t, len(uniqueKeys), 10, "each iteration should create unique key") +} diff --git a/worker/draft.go b/worker/draft.go index e2425d5be5d..505917aa6bc 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -1756,7 +1756,7 @@ func (n *node) retryUntilSuccess(fn func() error, pause time.Duration) { // InitAndStartNode gets called after having at least one membership sync with the cluster. func (n *node) InitAndStartNode() { - initProposalKey(n.Id) + x.Check(initProposalKey(n.Id)) _, restart, err := n.PastLife() x.Check(err) diff --git a/worker/proposal.go b/worker/proposal.go index 96dfedf5738..b2648f5ab1c 100644 --- a/worker/proposal.go +++ b/worker/proposal.go @@ -32,7 +32,6 @@ import ( "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/x" - "github.com/dgraph-io/ristretto/z" ) const baseTimeout time.Duration = 4 * time.Second @@ -111,9 +110,14 @@ func (rl *rateLimiter) decr(retry int) { var proposalKey uint64 // {2 bytes Node ID} {4 bytes for random} {2 bytes zero} -func initProposalKey(id uint64) { +func initProposalKey(id uint64) error { x.AssertTrue(id != 0) - proposalKey = uint64(groups().Node.Id)<<48 | uint64(z.FastRand())<<16 + var err error + proposalKey, err = x.ProposalKey(groups().Node.Id) + if err != nil { + return err + } + return nil } // uniqueKey is meant to be unique across all the replicas. diff --git a/x/x.go b/x/x.go index 1646b600182..d8538bd88ab 100644 --- a/x/x.go +++ b/x/x.go @@ -21,7 +21,9 @@ import ( "bytes" builtinGzip "compress/gzip" "context" + cr "crypto/rand" "crypto/tls" + "encoding/binary" "encoding/json" "fmt" "io" @@ -644,6 +646,21 @@ func RetryUntilSuccess(maxRetries int, waitAfterFailure time.Duration, return err } +// {2 bytes Node ID} {4 bytes for random} {2 bytes zero} +func ProposalKey(id uint64) (uint64, error) { + random4Bytes := make([]byte, 4) + if _, err := cr.Read(random4Bytes); err != nil { + return 0, err + } + proposalKey := id<<48 | uint64(binary.BigEndian.Uint32(random4Bytes))<<16 + // We want to avoid spillage to node id in case of overflow. For instance, if the + // random bytes end up being [xx,xx, 255, 255, 255, 255, 0 , 0] (xx, xx being the node id) + // we would spill to node id after 65535 calls to unique key. + // So by setting 48th bit to 0 we ensure that we never spill out to node ids. + proposalKey &= ^(uint64(1) << 47) + return proposalKey, nil +} + // HasString returns whether the slice contains the given string. func HasString(a []string, b string) bool { for _, k := range a {