Skip to content

Commit

Permalink
raft: leader response to learner MsgReadIndex
Browse files Browse the repository at this point in the history
Leader should check message sender after receiving MsgReadIndex, even
when raft quorum is 1. The message could be sent from learner node, and
leader should respond.
  • Loading branch information
jingyih committed Mar 28, 2019
1 parent 9c2b88d commit 5088d70
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 2 deletions.
8 changes: 6 additions & 2 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,8 +1044,12 @@ func stepLeader(r *raft, m pb.Message) error {
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
}
}
} else {
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
} else { // there is only one voting member (the leader) in the cluster
if m.From == None || m.From == r.id { // from leader itself
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
} else { // from learner member
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: r.raftLog.committed, Entries: m.Entries})
}
}

return nil
Expand Down
51 changes: 51 additions & 0 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2416,6 +2416,57 @@ func TestReadOnlyOptionSafe(t *testing.T) {
}
}

func TestReadOnlyWithLearner(t *testing.T) {
a := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
b := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())

nt := newNetwork(a, b)
setRandomizedElectionTimeout(b, b.electionTimeout+1)

for i := 0; i < b.electionTimeout; i++ {
b.tick()
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})

if a.state != StateLeader {
t.Fatalf("state = %s, want %s", a.state, StateLeader)
}

tests := []struct {
sm *raft
proposals int
wri uint64
wctx []byte
}{
{a, 10, 11, []byte("ctx1")},
{b, 10, 21, []byte("ctx2")},
{a, 10, 31, []byte("ctx3")},
{b, 10, 41, []byte("ctx4")},
}

for i, tt := range tests {
for j := 0; j < tt.proposals; j++ {
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
}

nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})

r := tt.sm
if len(r.readStates) == 0 {
t.Fatalf("#%d: len(readStates) = 0, want non-zero", i)
}
rs := r.readStates[0]
if rs.Index != tt.wri {
t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri)
}

if !bytes.Equal(rs.RequestCtx, tt.wctx) {
t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx)
}
r.readStates = nil
}
}

func TestReadOnlyOptionLease(t *testing.T) {
a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
Expand Down

0 comments on commit 5088d70

Please sign in to comment.