diff --git a/harness/src/interface.rs b/harness/src/interface.rs index c9cabbf2d..b963ea0ce 100644 --- a/harness/src/interface.rs +++ b/harness/src/interface.rs @@ -48,11 +48,7 @@ impl Interface { /// Read messages out of the raft. pub fn read_messages(&mut self) -> Vec { match self.raft { - Some(_) => { - let mut msgs = self.msgs.drain(..).collect::>(); - msgs.extend(self.groups.take_messages()); - msgs - } + Some(_) => self.msgs.drain(..).collect(), None => vec![], } } diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index 1f2b08774..d566374c4 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -127,7 +127,10 @@ fn next_ents(r: &mut Raft, s: &MemStorage) -> Vec { fn do_send_append(raft: &mut Raft, to: u64) { let mut prs = raft.take_prs(); - raft.send_append(to, &mut prs); + { + let pr = prs.get_mut(to).unwrap(); + raft.send_append(to, pr); + } raft.set_prs(prs); } @@ -1193,7 +1196,7 @@ fn test_handle_msg_append() { ); sm.become_follower(2, INVALID_ID); - sm.handle_replication(m); + sm.handle_append_message(m); if sm.raft_log.last_index() != w_index { panic!( "#{}: last_index = {}, want {}", @@ -1360,7 +1363,7 @@ fn test_msg_append_response_wait_reset() { // The new leader has just emitted a new Term 4 entry; consume those messages // from the outgoing queue. - sm.bcast_append(None); + sm.bcast_append(); sm.read_messages(); // Node 2 acks the first entry, making it committed. diff --git a/harness/tests/integration_cases/test_raft_follower_replication.rs b/harness/tests/integration_cases/test_raft_follower_replication.rs index 07a948e74..329067df6 100644 --- a/harness/tests/integration_cases/test_raft_follower_replication.rs +++ b/harness/tests/integration_cases/test_raft_follower_replication.rs @@ -1,4 +1,4 @@ -// Copyright 2019 PingCAP, Inc. +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 
// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,10 +14,8 @@ use crate::test_util::*; use harness::{Interface, Network}; use raft::eraftpb::*; -use raft::group::{Groups, GroupsConfig}; use raft::storage::MemStorage; use raft::*; -use rand::Rng; use slog::Logger; use std::collections::HashSet; use std::iter::FromIterator; @@ -94,10 +92,11 @@ impl Sandbox { let c = new_test_config(leader, 10, 1); let storage = new_storage(peers.clone(), snapshot_index, last_index - 1); let mut leader_node = Interface::new(Raft::new(&c, storage, l).unwrap()); - leader_node.set_groups(Groups::new(GroupsConfig::new(group_config))); + leader_node.set_groups(group_config); leader_node.become_candidate(); leader_node.become_leader(); let entries = leader_node.raft_log.all_entries(); + let mut prs = leader_node.take_prs(); let mut interfaces = followers .clone() .drain(..) @@ -111,7 +110,7 @@ impl Sandbox { if scenario != FollowerScenario::Snapshot { Self::assert_entries_consistent(entries.clone(), node_entries); } - let mut pr = leader_node.mut_prs().get_mut(id).unwrap(); + let mut pr = prs.get_mut(id).unwrap(); pr.state = match scenario { FollowerScenario::NeedEntries(_) | FollowerScenario::UpToDate => { ProgressState::Replicate @@ -125,6 +124,7 @@ impl Sandbox { Some(node) }) .collect::>>(); + leader_node.set_prs(prs); interfaces.insert(0, Some(leader_node)); let network = Network::new(interfaces, l); Self { @@ -136,7 +136,6 @@ impl Sandbox { } // Only for `UpToDate` and `NeedEntries` - #[inline] fn assert_entries_consistent(leader: Vec, target: Vec) { for (e1, e2) in leader.iter().zip(target) { assert_eq!(e1.index, e2.index); @@ -144,49 +143,64 @@ impl Sandbox { } } - #[inline] fn assert_final_state(&self) { self.network.peers.iter().for_each(|(id, n)| { assert_eq!( n.raft_log.last_index(), - self.last_index + 1, + self.last_index, "The peer {} last index should be up-to-date", id ) }); + // 
The ProgressSet should be updated + self.network + .peers + .get(&self.leader) + .unwrap() + .prs() + .iter() + .for_each(|(_, pr)| assert!(pr.matched == self.last_index)) } // Get mutable Interface of the leader - #[inline] - pub fn leader_mut(&mut self) -> &mut Interface { + fn leader_mut(&mut self) -> &mut Interface { let leader = self.leader; self.network.peers.get_mut(&leader).unwrap() } // Get immutable Interface of the leader - #[inline] - pub fn leader(&self) -> &Interface { + fn leader(&self) -> &Interface { let leader = self.leader; self.network.peers.get(&leader).unwrap() } // Get a mutable Interface by given id - #[inline] - pub fn get_mut(&mut self, id: u64) -> &mut Interface { + fn get_mut(&mut self, id: u64) -> &mut Interface { self.network.peers.get_mut(&id).unwrap() } + + // Send a MsgPropose to the leader + fn propose(&mut self, only_dispatch: bool) { + let proposal = new_message(self.leader, self.leader, MessageType::MsgPropose, 1); + if only_dispatch { + self.network.dispatch(vec![proposal]).unwrap(); + } else { + self.network.send(vec![proposal]); + } + self.last_index += 1; + } } fn new_storage(peers: Vec, snapshot_index: u64, last_index: u64) -> MemStorage { let s = MemStorage::new_with_conf_state((peers.clone(), vec![])); let snapshot = new_snapshot(snapshot_index, 1, peers.clone()); - s.wl().apply_snapshot(snapshot).expect(""); + s.wl().apply_snapshot(snapshot).unwrap(); if snapshot_index < last_index { let mut ents = vec![]; for index in snapshot_index + 1..=last_index { ents.push(empty_entry(1, index)); } - s.wl().append(&ents).expect(""); + s.wl().append(&ents).unwrap(); } s } @@ -201,38 +215,33 @@ fn new_storage_by_scenario( match scenario { FollowerScenario::UpToDate => { let snapshot = new_snapshot(snapshot_index, 1, peers.clone()); - s.wl().apply_snapshot(snapshot).expect(""); + s.wl().apply_snapshot(snapshot).unwrap(); let mut ents = vec![]; for index in snapshot_index + 1..last_index { ents.push(empty_entry(1, index)); } 
ents.push(empty_entry(2, last_index)); - s.wl().append(&ents).expect(""); + s.wl().append(&ents).unwrap(); } FollowerScenario::NeedEntries(index) => { + assert!(index > snapshot_index); let snapshot = new_snapshot(snapshot_index, 1, peers.clone()); - s.wl().apply_snapshot(snapshot).expect(""); - let li = if index <= last_index && index > snapshot_index { - index - } else { - rand::thread_rng().gen_range(snapshot_index + 1, last_index) - }; + s.wl().apply_snapshot(snapshot).unwrap(); let mut ents = vec![]; - for index in snapshot_index + 1..li { - ents.push(empty_entry(1, index)); + for i in snapshot_index + 1..index { + ents.push(empty_entry(1, i)); } - if li == last_index { + if index == last_index { ents.push(empty_entry(2, index)); } - s.wl().append(&ents).expect(""); + s.wl().append(&ents).unwrap(); } FollowerScenario::Snapshot => { - let li = rand::thread_rng().gen_range(1, snapshot_index); let mut ents = vec![]; - for index in 2..li { + for index in 2..snapshot_index { ents.push(empty_entry(1, index)) } - s.wl().append(&ents).expect(""); + s.wl().append(&ents).unwrap(); } }; s @@ -273,32 +282,40 @@ fn test_pick_group_delegate() { ], ), ]; - for (expected_delegate, expected_msg_type, input) in tests { - let mut sandbox = Sandbox::new(&l, 1, input, group_config.clone(), 5, 10); - sandbox - .network - .dispatch(vec![new_message(1, 1, MessageType::MsgPropose, 1)]) - .expect(""); + for (i, (expected_delegate, expected_msg_type, input)) in tests.into_iter().enumerate() { + let mut sandbox = Sandbox::new(&l, 1, input.clone(), group_config.clone(), 5, 10); + + sandbox.propose(true); let mut msgs = sandbox.leader_mut().read_messages(); - assert_eq!(1, msgs.len(), "Should only send one msg"); + assert_eq!( + 1, + msgs.len(), + "#{} Should only send one msg: {:?}", + i, + input + ); + let m = msgs.pop().unwrap(); assert_eq!( m.msg_type, expected_msg_type, - "The sent msg type should be {:?} but got {:?}", - expected_delegate, m.msg_type, + "#{} The sent msg type should 
be {:?} but got {:?}", + i, expected_delegate, m.msg_type, ); + let delegate = m.to; let delegate_set: HashSet = HashSet::from_iter(expected_delegate); assert!( delegate_set.contains(&delegate), - "set {:?}, delegate {}", + "#{} set {:?}, delegate {}", + i, &delegate_set, delegate ); assert_eq!( - sandbox.leader().groups().get_delegate(1), - Some(delegate), - "The picked delegate should be cached" + sandbox.leader().groups.get_delegate(2), + delegate, + "#{} The picked delegate should be cached", + i ); } } @@ -314,10 +331,8 @@ fn test_delegate_in_group_containing_leader() { (4, FollowerScenario::UpToDate), ]; let mut sandbox = Sandbox::new(&l, 1, followers.clone(), group_config.clone(), 5, 10); - sandbox - .network - .dispatch(vec![new_message(1, 1, MessageType::MsgPropose, 1)]) - .expect(""); + + sandbox.propose(true); let msgs = sandbox.leader_mut().read_messages(); assert_eq!(msgs.len(), 3); msgs.iter() @@ -339,24 +354,25 @@ fn test_broadcast_append_use_delegate() { 5, 10, ); - dbg!(sandbox.leader().groups()); - sandbox - .network - .dispatch(vec![new_message(1, 1, MessageType::MsgPropose, 1)]) - .expect(""); + + sandbox.propose(true); let mut msgs = sandbox.leader_mut().read_messages(); assert_eq!(1, msgs.len()); + let m = msgs.pop().unwrap(); assert_eq!(m.msg_type, MessageType::MsgAppend); assert!(m.bcast_targets.contains(&3)); assert!(m.bcast_targets.contains(&4)); + let delegate = m.to; assert_eq!(delegate, 2); - sandbox.network.dispatch(vec![m]).expect(""); - assert_eq!(Some(2), sandbox.leader().groups().get_delegate(1)); + + sandbox.network.dispatch(vec![m]).unwrap(); + assert_eq!(2, sandbox.leader().groups.get_delegate(2)); let mut msgs = sandbox.get_mut(delegate).read_messages(); assert_eq!(3, msgs.len()); - let bcast_resp = msgs.pop().unwrap(); + + let bcast_resp = msgs.remove(0); // Send to leader first assert_eq!(bcast_resp.msg_type, MessageType::MsgAppendResponse); let to_send_ids = sandbox .followers @@ -370,7 +386,7 @@ fn 
test_broadcast_append_use_delegate() { m.from, 1, "the delegated message must looks like coming from leader" ); - assert_eq!(m.from_delegate, 2, "'from_delegate' must be set"); + assert_eq!(m.delegate, 2, "'delegate' must be set"); assert_eq!(m.msg_type, MessageType::MsgAppend); assert!(set.contains(&m.to)); }); @@ -391,36 +407,27 @@ fn test_delegate_reject_broadcast() { (4, FollowerScenario::NeedEntries(12)), ]; let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, 20); - sandbox.leader_mut().mut_prs().get_mut(2).unwrap().next_idx = 10; // make a conflict next_idx - sandbox - .network - .dispatch(vec![new_message(1, 1, MessageType::MsgPropose, 1)]) - .expect(""); + + sandbox.leader_mut().mut_prs().get_mut(4).unwrap().next_idx = 15; // make a conflict next_idx + sandbox.propose(true); let mut msgs = sandbox.leader_mut().read_messages(); let m = msgs.pop().unwrap(); - assert_eq!(2, m.to); - let pr_peer4 = sandbox.leader().prs().get(4).unwrap(); - assert_eq!(ProgressState::Delegated, pr_peer4.state); - assert_eq!( - sandbox.last_index + 2, - pr_peer4.next_idx, - "The progress of members should be updated when using delegate" - ); - sandbox.network.dispatch(vec![m]).expect(""); - let mut msgs = sandbox.get_mut(2).read_messages(); + assert_eq!(4, m.to); + sandbox.network.dispatch(vec![m]).unwrap(); + + let mut msgs = sandbox.get_mut(4).read_messages(); assert_eq!(1, msgs.len()); let m = msgs.pop().unwrap(); assert_eq!(MessageType::MsgAppendResponse, m.msg_type); assert!(m.reject); assert_eq!(1, m.to); + sandbox.network.dispatch(vec![m]).unwrap(); assert_eq!( - 2, - m.get_bcast_targets().len(), - "If a delegate rejects broadcasting, it should send back all the commissions to the leader" + 4, + sandbox.leader().groups.get_delegate(2), + "The delegate won't be dismissed when rejecting MsgAppend" ); - sandbox.network.dispatch(vec![m]).expect(""); - // Delegate(peer 2) of group 1 should be repicked - assert_eq!(Some(4), 
sandbox.leader().groups().get_delegate(1)); + let mut msgs = sandbox.leader_mut().read_messages(); assert_eq!(1, msgs.len()); let m = msgs.pop().unwrap(); @@ -430,117 +437,179 @@ fn test_delegate_reject_broadcast() { sandbox.assert_final_state(); } -// test_send_append_use_delegate ensures that the leader picks a delegate to send entries if it receives a rejection from the follower. #[test] -fn test_send_append_use_delegate() { +fn test_follower_only_send_reject_to_delegate() { let l = default_logger(); - let group_config = vec![(2, vec![1]), (1, vec![2, 3, 4])]; + let group_config = vec![(2, vec![1]), (1, vec![2, 3])]; let followers = vec![ - (2, FollowerScenario::NeedEntries(7)), - (3, FollowerScenario::Snapshot), - (4, FollowerScenario::NeedEntries(10)), + (2, FollowerScenario::NeedEntries(10)), + (3, FollowerScenario::NeedEntries(7)), ]; let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, 20); - // Make a conflict next_idx in peer 2 so that `maybe_decr_to` can work. - sandbox.leader_mut().mut_prs().get_mut(2).unwrap().next_idx = 21; - let mut m = new_message(2, 1, MessageType::MsgAppendResponse, 0); - m.index = 21; - m.reject = true; - m.reject_hint = 9; // make node2's match_idx to 9 - sandbox.network.dispatch(vec![m]).expect(""); + + sandbox.propose(true); let msgs = sandbox.leader_mut().read_messages(); - // Pick peer 4 as the delegate - assert_eq!(Some(4), sandbox.leader().groups().get_delegate(1)); - assert_eq!(1, msgs.len()); - let m = &msgs[0]; - assert_eq!(m.msg_type, MessageType::MsgAppend); - assert_eq!(4, m.to); - assert_eq!(1, m.get_bcast_targets().len()); - let targets = m.get_bcast_targets(); - assert_eq!(vec![2], targets); - sandbox.network.send(msgs); - assert_eq!( - sandbox.network.peers.get(&2).unwrap().raft_log.last_index(), - sandbox.last_index, - ); + + // Pick peer 2 as the delegate + assert_eq!(2, sandbox.leader().groups.get_delegate(3)); + sandbox.network.dispatch(msgs).unwrap(); + let mut msgs = 
sandbox.get_mut(2).read_messages(); + // MsgAppendResponse to 1 and MsgAppend to 3 + // We only care about the latter + assert_eq!(msgs.len(), 2); + let m = msgs.remove(1); + assert_eq!(m.get_msg_type(), MessageType::MsgAppend); + assert_eq!(m.to, 3); + assert_eq!(m.from, 1); + assert_eq!(m.delegate, 2); + sandbox.network.dispatch(vec![m]).unwrap(); + let mut msgs = sandbox.get_mut(3).read_messages(); + assert_eq!(msgs.len(), 1); + let m = msgs.pop().unwrap(); + assert_eq!(m.to, 2); + assert!(m.reject); } -// test_delegate_paused_due_to_full_inflight ensures that if the old delegate is paused . #[test] -fn test_dismiss_delegate_due_to_full_inflight() { +fn test_paused_delegate() { let l = default_logger(); let group_config = vec![(2, vec![1]), (1, vec![2, 3, 4])]; let followers = vec![ - (2, FollowerScenario::UpToDate), + (2, FollowerScenario::NeedEntries(10)), (3, FollowerScenario::NeedEntries(7)), - (4, FollowerScenario::NeedEntries(10)), + (4, FollowerScenario::Snapshot), ]; - let last_index = 20; - let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, last_index); - let max_inflight = sandbox.leader().max_inflight; - sandbox - .network - .dispatch( - (0..max_inflight) - .map(|_| new_message(1, 1, MessageType::MsgPropose, 1)) - .collect::>(), - ) - .expect(""); - sandbox - .leader() - .prs() - .iter() - .filter(|(id, _)| **id != sandbox.leader) - .for_each(|(_, pr)| assert!(pr.is_paused())); - sandbox.leader_mut().read_messages().iter().for_each(|m| { - assert_eq!(m.msg_type, MessageType::MsgAppend); - assert_eq!(m.to, 2); - }); - assert_eq!( - 2, - sandbox - .leader() - .groups() - .get_delegate(1) - .unwrap() - .to_owned() - ); - // Assume node 3 and 4 sending response to the leader - let mut resp_from3 = new_message(3, 1, MessageType::MsgAppendResponse, 0); - resp_from3.index = last_index + max_inflight as u64; - let mut resp_from4 = resp_from3.clone(); - resp_from4.from = 4; - resp_from4.index = last_index + max_inflight as u64 - 1; - 
sandbox.network.dispatch(vec![resp_from3]).expect(""); - // This is a kind of special situation: - // After the leader receives the AppendResponse from 3 and handle it, it detects that node 3 need an extra `send_append` because node 3 was - // paused before. So the leader tries to pick a delegate for this extra `send_append`. However, as the pre-delegate node 2 and node4 are - // both paused, the leader has to send this msg on its own. - assert_eq!( - 2, - sandbox - .leader() - .groups() - .get_delegate(1) - .unwrap() - .to_owned() - ); - let _ = sandbox.leader_mut().read_messages(); - sandbox.network.dispatch(vec![resp_from4]).expect(""); - // Now the node 4 has the smallest `match_index`. + let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, 20); + for id in 1..=4 { + // Reset inflights capacity to 1. + let r = sandbox.network.peers.get_mut(&id).unwrap(); + r.max_inflight = 1; + for (_, pr) in r.mut_prs().iter_mut() { + pr.ins = Inflights::new(1); + } + } + + // Leader will send append only to peer 2. + sandbox.propose(true); + let msgs = sandbox.get_mut(1).read_messages(); + assert_eq!(msgs.len(), 1); + sandbox.network.dispatch(msgs).unwrap(); + + // More proposals wont' cause more messages sent out. + sandbox.propose(true); + let msgs = sandbox.get_mut(1).read_messages(); + assert_eq!(msgs.len(), 0); + + // Step the append response from peer 2, then the leader can send more. + // And all append messages should contain `bcast_targets`. 
+ let append_resp = sandbox.get_mut(2).read_messages()[0].clone(); + sandbox.network.dispatch(vec![append_resp]).unwrap(); + let msgs = sandbox.get_mut(1).read_messages(); + assert!(!msgs[0].get_bcast_targets().is_empty()); +} + +#[test] +fn test_dismiss_delegate_when_not_active() { + let l = default_logger(); + let group_config = vec![(2, vec![1]), (1, vec![2, 3, 4])]; + let followers = vec![ + (2, FollowerScenario::NeedEntries(10)), + (3, FollowerScenario::NeedEntries(7)), + (4, FollowerScenario::NeedEntries(6)), + ]; + let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, 20); + + for id in 1..=4 { + // Reset check_quorum to true. + let r = sandbox.network.peers.get_mut(&id).unwrap(); + r.check_quorum = true; + } + + // Leader will send append only to peer 2. + sandbox.propose(true); + let msgs = sandbox.get_mut(1).read_messages(); + assert_eq!(msgs.len(), 1); + sandbox.network.dispatch(msgs).unwrap(); + + // Let the leader check quorum twice. Then delegates should be dismissed. + for _ in 0..2 { + { + let r = sandbox.network.peers.get_mut(&1).unwrap(); + for (id, pr) in r.mut_prs().iter_mut() { + if *id == 3 || *id == 4 { + pr.recent_active = true; + } + } + } + let s = sandbox.network.peers[&1].election_elapsed; + let e = sandbox.network.peers[&1].election_timeout(); + for _ in s..=e { + sandbox.leader_mut().tick(); + } + } + + // After the delegate is dismissed, leader will send append to it and an another new delegate. 
+ sandbox.propose(true); + let mut msgs = sandbox.get_mut(1).read_messages(); + msgs = msgs + .into_iter() + .filter(|m| m.get_msg_type() == MessageType::MsgAppend) + .collect(); + assert_eq!(msgs.len(), 2); + sandbox.network.send(msgs); + sandbox.assert_final_state(); +} + +#[test] +fn test_update_group_by_group_id_in_message() { + let l = default_logger(); + let group_config = vec![(1, vec![1]), (2, vec![2, 3, 4]), (3, vec![5])]; + let followers = vec![ + (2, FollowerScenario::NeedEntries(10)), + (3, FollowerScenario::NeedEntries(9)), + (4, FollowerScenario::NeedEntries(8)), + (5, FollowerScenario::NeedEntries(7)), + ]; + let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, 20); + + // Change peer 4 group id from 2 to 3. + sandbox.network.peers.get_mut(&4).unwrap().group_id = 3; + + sandbox.propose(false); + sandbox.assert_final_state(); + + sandbox.propose(false); + sandbox.assert_final_state(); assert_eq!( - 4, - sandbox - .leader() - .groups() - .get_delegate(1) - .unwrap() - .to_owned() + sandbox.leader().groups.dump(), + vec![(1, vec![1]), (2, vec![2, 3]), (3, vec![4, 5])], ); - let msgs = sandbox.leader_mut().read_messages(); - assert_eq!(1, msgs.len()); - msgs.iter().for_each(|m| { - assert_eq!(MessageType::MsgAppend, m.msg_type); - assert_eq!(4, m.to); - }) +} + +#[test] +fn test_delegate_must_be_able_to_send_logs_to_targets() { + let l = default_logger(); + let group_config = vec![(1, vec![1]), (2, vec![2, 3, 4])]; + let followers = vec![ + (2, FollowerScenario::UpToDate), + (3, FollowerScenario::NeedEntries(9)), + (4, FollowerScenario::Snapshot), + ]; + let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, 20); + let max_inflight = sandbox.network.peers.get(&2).unwrap().max_inflight; + // Make Inflights 3 full + let node2 = sandbox.get_mut(2); + let pr3 = node2.mut_prs().get_mut(3).unwrap(); + pr3.become_replicate(); + for i in 1..=max_inflight { + pr3.ins.add(i as u64); + } + assert!(pr3.is_paused()); + // Make Progress 4 
paused + let pr4 = node2.mut_prs().get_mut(4).unwrap(); + pr4.become_probe(); + pr4.pause(); + assert!(pr4.is_paused()); + sandbox.propose(false); + sandbox.assert_final_state(); } diff --git a/harness/tests/integration_cases/test_raft_paper.rs b/harness/tests/integration_cases/test_raft_paper.rs index f0b4baf69..62063a35a 100644 --- a/harness/tests/integration_cases/test_raft_paper.rs +++ b/harness/tests/integration_cases/test_raft_paper.rs @@ -23,7 +23,7 @@ use slog::Logger; pub fn commit_noop_entry(r: &mut Interface, s: &MemStorage) { assert_eq!(r.state, StateRole::Leader); - r.bcast_append(None); + r.bcast_append(); // simulate the response of MsgAppend let msgs = r.read_messages(); for m in msgs { diff --git a/proto/proto/eraftpb.proto b/proto/proto/eraftpb.proto index 08a408fe2..32be9fd0c 100644 --- a/proto/proto/eraftpb.proto +++ b/proto/proto/eraftpb.proto @@ -85,7 +85,7 @@ message Message { // The targets to send raft logs through the delegate repeated uint64 bcast_targets = 16; // Whether the message comes from a delegate actually - uint64 from_delegate = 17; + uint64 delegate = 17; } message HardState { diff --git a/src/group/mod.rs b/src/group/mod.rs index 9b83fecee..fb1852edb 100644 --- a/src/group/mod.rs +++ b/src/group/mod.rs @@ -18,54 +18,10 @@ //! # Follower Replication //! See https://github.com/tikv/rfcs/pull/33 -use crate::eraftpb::Message; -use std::collections::hash_map::Iter; -use std::collections::{HashMap, HashSet}; -use std::iter::FromIterator; - -/// Configuration for distribution of raft nodes in groups. -/// For the inner hashmap, the key is group ID and value is the group members. 
-#[derive(Clone, Debug)] -pub struct GroupsConfig { - inner: HashMap>, -} - -impl GroupsConfig { - /// Create a new GroupsConfig - pub fn new(config: Vec<(u64, Vec)>) -> Self { - let inner = HashMap::from_iter(config.into_iter()); - Self { inner } - } - - /// Return a iterator with inner group ID - group members pairs - #[inline] - pub fn iter(&self) -> Iter<'_, u64, Vec> { - self.inner.iter() - } -} +use std::collections::HashMap; -impl Default for GroupsConfig { - fn default() -> Self { - Self { - inner: HashMap::new(), - } - } -} - -#[derive(Clone, Debug)] -pub(crate) struct DelegatedMessage { - pub delegated_peers: HashSet, - pub inner: Message, -} - -impl DelegatedMessage { - fn into_message(mut self) -> Message { - let targets = self.delegated_peers.drain().collect::>(); - let mut m = self.inner; - m.bcast_targets = targets; - m - } -} +use crate::progress::progress_set::ProgressSet; +use crate::raft::INVALID_ID; /// Maintain all the groups info in Follower Replication /// @@ -73,151 +29,290 @@ impl DelegatedMessage { /// /// A node only belongs to one group /// -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct Groups { - /// Group config metadata - pub meta: GroupsConfig, - // The messages to be sent to any delegates - // The key is a group ID - delegated_msgs: HashMap>, - // node id => group id - indexes: HashMap, - // group id => delegate id - delegate_cache: HashMap, + // node id => (group id, delegate id). + indexes: HashMap, + + // Use to construct `bcast_targets` for delegates quickly. + bcast_targets: HashMap>, + + // Peers without chosen delegates. + unresolved: Vec, + + leader_group_id: u64, } impl Groups { - /// Create new Groups with given configuration - pub fn new(meta: GroupsConfig) -> Self { + /// Create new Groups with given configuration. 
+ pub(crate) fn new(config: Vec<(u64, Vec)>) -> Self { let mut indexes = HashMap::new(); - for (group_id, members) in meta.inner.iter() { - for member in members.iter() { - indexes.insert(*member, *group_id); + let mut unresolved = Vec::new(); + for (group_id, members) in config { + for id in members { + indexes.insert(id, (group_id, INVALID_ID)); + unresolved.push(id); } } + Self { - meta, indexes, + unresolved, ..Default::default() } } - /// Take all the delegated messages into a vec - #[inline] - pub fn take_messages(&mut self) -> Vec { - self.delegated_msgs - .drain() - .map(|(_, mut msgs)| msgs.drain(..).map(|m| m.into_message()).collect::>()) - .flatten() - .collect::>() + pub(crate) fn set_leader_group_id(&mut self, leader_group: u64) { + self.leader_group_id = leader_group; } - #[inline] - pub(crate) fn has_message(&self) -> bool { - self.delegated_msgs.is_empty() + /// Get group id by member id. + pub(crate) fn get_group_id(&self, member: u64) -> Option { + self.indexes.get(&member).map(|(gid, _)| *gid) } - /// Create a DelegatedMessage and push it to the corresponding `delegated_msgs` - pub(crate) fn insert_delegated_msg(&mut self, group_id: u64, m: Message, peer: Option) { - let mut set = HashSet::default(); - if let Some(p) = peer { - set.insert(p); + /// Get a delegate for `to`. The return value could be `to` itself. + pub fn get_delegate(&self, to: u64) -> u64 { + match self.indexes.get(&to) { + Some((_, delegate)) => *delegate, + None => INVALID_ID, } - let msg = DelegatedMessage { - delegated_peers: set, - inner: m, + } + + // Pick a delegate for the given peer. + // + // The delegate must satisfy conditions below: + // 1. The progress state should be `ProgressState::Replicate`; + // 2. The progress has biggest `match`; + // If all the members are requiring snapshots, use given `to`. 
+ fn pick_delegate(&mut self, to: u64, prs: &ProgressSet) { + let group_id = match self.indexes.get(&to) { + Some((_, delegate)) if *delegate != INVALID_ID => return, + Some((gid, _)) if *gid == self.leader_group_id => return, + Some((gid, _)) => *gid, + None => return, }; - match self.delegated_msgs.get_mut(&group_id) { - Some(msgs) => { - msgs.push(msg); + + let (mut chosen, mut matched, mut bcast_targets) = (INVALID_ID, 0, vec![]); + for id in self.candidate_delegates(group_id) { + let pr = prs.get(id).unwrap(); + if matched < pr.matched { + if chosen != INVALID_ID { + bcast_targets.push(chosen); + } + chosen = id; + matched = pr.matched; + } else { + bcast_targets.push(id); } - None => { - self.delegated_msgs.insert(group_id, vec![msg]); + } + + // If there is only one member in the group, it remains unresolved. + if chosen != INVALID_ID && !bcast_targets.is_empty() { + let (_, d) = self.indexes.get_mut(&chosen).unwrap(); + *d = chosen; + for id in &bcast_targets { + let (_, d) = self.indexes.get_mut(id).unwrap(); + *d = chosen; } + self.bcast_targets.insert(chosen, bcast_targets); } } - /// Get the last delegated message - #[inline] - pub(crate) fn get_latest_delegated_msg( - &mut self, - group_id: u64, - ) -> Option<&mut DelegatedMessage> { - self.delegated_msgs - .get_mut(&group_id) - .and_then(|msgs| msgs.last_mut()) + fn candidate_delegates(&self, group_id: u64) -> impl Iterator + '_ { + self.indexes.iter().filter_map(move |(peer, (gid, _))| { + if group_id == *gid { + return Some(*peer); + } + None + }) } - /// Get group members by the member id - #[inline] - pub fn get_members(&self, member: u64) -> Option> { - self.indexes - .get(&member) - .and_then(|group_id| self.get_members_by_group(*group_id)) + /// Unset the delegate by delegate id. If the peer is not delegate, do nothing. 
+ pub(crate) fn remove_delegate(&mut self, delegate: u64) { + if self.bcast_targets.remove(&delegate).is_some() { + // Remove the delegate from the group system since it's temorary unreachable. + // And the peer will be re-added after the leader receives a message from it. + self.indexes.remove(&delegate); + for (peer, (_, d)) in self.indexes.iter_mut() { + if *d == delegate { + *d = INVALID_ID; + self.unresolved.push(*peer); + } + } + } } - /// Get group members by group id - #[inline] - pub fn get_members_by_group(&self, group_id: u64) -> Option> { - self.meta.inner.get(&group_id).cloned() + pub(crate) fn is_delegated(&self, to: u64) -> bool { + self.indexes + .get(&to) + .map_or(false, |x| x.1 != INVALID_ID && x.1 != to) } - /// Get group id by member id - #[inline] - pub fn get_group_id(&self, member: u64) -> Option { - self.indexes.get(&member).cloned() + pub(crate) fn get_bcast_targets(&self, delegate: u64) -> Option<&Vec> { + debug_assert!(self.unresolved.is_empty()); + self.bcast_targets.get(&delegate) } - /// Set delegate for a group if the given delegate is a group member. - #[inline] - pub fn set_delegate(&mut self, delegate: u64) { - if let Some(group) = self.get_group_id(delegate) { - self.delegate_cache.insert(group, delegate); + /// Update given `peer`'s group ID. Return `true` if any peers are unresolved. + pub(crate) fn update_group_id(&mut self, peer: u64, group_id: u64) -> bool { + if group_id == INVALID_ID { + self.unmark_peer(peer); + } else if let Some((gid, _)) = self.indexes.get(&peer) { + if *gid == group_id { + return false; + } + self.unmark_peer(peer); + self.mark_peer(peer, group_id); + } else { + self.mark_peer(peer, group_id); } + !self.unresolved.is_empty() } - /// Unset the delegate by delegate id. 
- #[inline] - pub fn remove_delegate(&mut self, delegate: u64) { - if let Some(group_id) = self.get_group_id(delegate) { - match self.delegate_cache.get(&group_id) { - Some(d) if *d == delegate => { - self.delegate_cache.remove(&group_id); - } - _ => {} - }; + fn unmark_peer(&mut self, peer: u64) { + if let Some((_, del)) = self.indexes.remove(&peer) { + if peer == del { + self.remove_delegate(del); + return; + } + let mut targets = self.bcast_targets.remove(&del).unwrap(); + let pos = targets.iter().position(|id| *id == peer).unwrap(); + targets.swap_remove(pos); + if !targets.is_empty() { + self.bcast_targets.insert(del, targets); + } } } - /// Return the delegate for a group by group id - #[inline] - pub fn get_delegate(&self, group: u64) -> Option { - self.delegate_cache.get(&group).cloned() + fn mark_peer(&mut self, peer: u64, group_id: u64) { + let (found, delegate) = self + .indexes + .iter() + .find(|(_, (gid, _))| *gid == group_id) + .map_or((false, INVALID_ID), |(_, (_, d))| (true, *d)); + + let _x = self.indexes.insert(peer, (group_id, delegate)); + debug_assert!(_x.is_none(), "peer can't exist before mark"); + + if delegate != INVALID_ID { + self.bcast_targets.get_mut(&delegate).unwrap().push(peer); + } else if found { + // We have found a peer in the same group but haven't been delegated, add it to + // `unresolved`. + self.unresolved.push(peer); + } } - /// Return the delegate for a group by node id - #[inline] - pub fn get_delegate_by_member(&self, member: u64) -> Option { - self.get_group_id(member) - .and_then(|group| self.get_delegate(group)) + // Pick delegates for all peers if need. + // TODO: change to `pub(crate)` after we find a simple way to test. 
+ pub fn resolve_delegates(&mut self, prs: &ProgressSet) { + if !self.unresolved.is_empty() { + for peer in std::mem::replace(&mut self.unresolved, vec![]) { + self.pick_delegate(peer, prs); + } + } } - /// Whether the two nodes are in the same group - #[inline] - pub fn in_same_group(&self, a: u64, b: u64) -> bool { - let ga = self.get_group_id(a); - let gb = self.get_group_id(b); - ga.is_some() && ga == gb + // Return the collection of mapping: group id => members + pub fn dump(&self) -> Vec<(u64, Vec)> { + let mut m: HashMap> = HashMap::new(); + for (peer, (group, _)) in &self.indexes { + let v = m.entry(*group).or_default(); + v.push(*peer); + } + for v in m.values_mut() { + v.sort(); + } + let mut v: Vec<_> = m.into_iter().collect(); + v.sort_by(|a1, a2| a1.0.cmp(&a2.0)); + v } } -impl Default for Groups { - fn default() -> Self { - Self { - meta: Default::default(), - indexes: HashMap::new(), - delegate_cache: HashMap::new(), - delegated_msgs: HashMap::new(), +#[cfg(test)] +mod tests { + use super::*; + use crate::progress::Progress; + + fn next_delegate_and_bcast_targets(group: &Groups) -> (u64, Vec) { + group + .bcast_targets + .iter() + .next() + .map(|(k, v)| (*k, v.clone())) + .unwrap() + } + + #[test] + fn test_group() { + let mut group = Groups::new(vec![(1, vec![1, 2]), (2, vec![3, 4, 5]), (3, vec![6])]); + assert_eq!(group.unresolved.len(), 6); + group.set_leader_group_id(1); + + let mut prs = ProgressSet::new(crate::default_logger()); + for id in 1..=6 { + let mut pr = Progress::new(100, 100); + pr.matched = 99; + prs.insert_voter(id, pr).unwrap(); } + + // After the resolving, only group 2 should be delegated. + group.resolve_delegates(&prs); + assert_eq!(group.bcast_targets.len(), 1); + let (delegate, mut targets) = next_delegate_and_bcast_targets(&group); + targets.push(delegate); + targets.sort(); + assert_eq!(targets, [3, 4, 5]); + + // Remove a delegate which doesn't exists. 
+ group.remove_delegate(6); + assert!(group.unresolved.is_empty()); + + // Remove a peer which is not delegate. + let remove = match delegate { + 3 => 4, + 4 => 5, + 5 => 3, + _ => unreachable!(), + }; + group.remove_delegate(remove); + assert!(group.unresolved.is_empty()); + let (_, targets) = next_delegate_and_bcast_targets(&group); + assert_eq!(targets.len(), 2); + + // Remove a delegate. + group.remove_delegate(delegate); + assert_eq!(group.unresolved.len(), 2); + group.resolve_delegates(&prs); + let (_, targets) = next_delegate_and_bcast_targets(&group); + assert_eq!(targets.len(), 1); + + // Add the removed peer back, without group id. + let peer = delegate; + group.update_group_id(peer, INVALID_ID); + assert!(group.unresolved.is_empty()); + + // Add the removed peer back, with group id. + assert!(!group.update_group_id(peer, 2)); + let (_, targets) = next_delegate_and_bcast_targets(&group); + assert_eq!(targets.len(), 2); + + // Get the new delegate. + let (delegate, _) = next_delegate_and_bcast_targets(&group); + assert_ne!(peer, delegate); + + // The peer reports to the group again, without group id. + group.update_group_id(peer, INVALID_ID); + let (_, targets) = next_delegate_and_bcast_targets(&group); + assert_eq!(targets.len(), 1); + + // The delegate changes group to 3. + assert!(group.update_group_id(delegate, 3)); + assert!(group.bcast_targets.is_empty()); + group.resolve_delegates(&prs); + let (_, targets) = next_delegate_and_bcast_targets(&group); + assert!(targets.contains(&delegate) || targets.contains(&6)); + assert!(group.get_delegate(6) == 6 || group.get_delegate(6) == delegate); } } diff --git a/src/lib.rs b/src/lib.rs index 4310c207c..146cd9a2c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -472,7 +472,7 @@ macro_rules! 
fatal { mod config; mod errors; -pub mod group; +mod group; mod log_unstable; mod progress; #[cfg(test)] @@ -488,7 +488,6 @@ pub mod util; pub use self::config::Config; pub use self::errors::{Error, Result, StorageError}; -pub use self::group::GroupsConfig; pub use self::log_unstable::Unstable; pub use self::progress::inflights::Inflights; pub use self::progress::progress_set::{majority, Configuration, ProgressSet}; @@ -528,8 +527,6 @@ pub mod prelude { pub use crate::status::{Status, StatusRef}; pub use crate::read_only::{ReadOnlyOption, ReadState}; - - pub use crate::group::GroupsConfig; } /// The default logger we fall back to when passed `None` in external facing constructors. diff --git a/src/progress/mod.rs b/src/progress/mod.rs index d4b4125b5..3c80ddc65 100644 --- a/src/progress/mod.rs +++ b/src/progress/mod.rs @@ -29,9 +29,6 @@ pub enum ProgressState { Replicate, /// Whether it's a snapshot. Snapshot, - /// Whether the peer is delegated. - /// A `Delegated` peer's Inflights is always empty and the Progress is never paused. - Delegated, } impl Default for ProgressState { @@ -149,14 +146,6 @@ impl Progress { self.pending_snapshot = snapshot_idx; } - /// Changes the progress to a Delegated. - #[inline] - pub fn become_delegated(&mut self) { - if self.state != ProgressState::Delegated { - self.reset_state(ProgressState::Delegated); - } - } - /// Sets the snapshot to failure. #[inline] pub fn snapshot_failure(&mut self) { @@ -196,7 +185,7 @@ impl Progress { /// Otherwise it decreases the progress next index to min(rejected, last) /// and returns true. pub fn maybe_decr_to(&mut self, rejected: u64, last: u64, request_snapshot: u64) -> bool { - if self.state == ProgressState::Replicate || self.state == ProgressState::Delegated { + if self.state == ProgressState::Replicate { // the rejection must be stale if the progress has matched and "rejected" // is smaller than "match". // Or rejected equals to matched and request_snapshot is the INVALID_INDEX. 
@@ -215,7 +204,6 @@ impl Progress { // The rejection must be stale if "rejected" does not match next - 1. // Do not consider it stale if it is a request snapshot message. - // This should only happens when the state is ProgressState::Probe if (self.next_idx == 0 || self.next_idx - 1 != rejected) && request_snapshot == INVALID_INDEX { @@ -243,7 +231,6 @@ impl Progress { ProgressState::Probe => self.paused, ProgressState::Replicate => self.ins.full(), ProgressState::Snapshot => true, - ProgressState::Delegated => false, } } @@ -271,14 +258,6 @@ impl Progress { "updating progress state in unhandled state {:?}", self.state ), - // TODO: For now just treated as Replicate - // NOTICE: When the leader resume origin Log Replication of a `Delegated` peer, there might - // be still some MsgAppendResp on the fly. But it's safe to just `add(last)` as `free_to` will ignore the idx out of the window. - ProgressState::Delegated => { - self.become_replicate(); - self.optimistic_update(last); - self.ins.add(last); - } } } } diff --git a/src/progress/progress_set.rs b/src/progress/progress_set.rs index bff202643..d42e0c0e2 100644 --- a/src/progress/progress_set.rs +++ b/src/progress/progress_set.rs @@ -32,13 +32,14 @@ pub fn majority(total: usize) -> usize { /// A Raft internal representation of a Configuration. /// /// This is corollary to a ConfState, but optimized for `contains` calls. -#[derive(Clone, Debug, Default, PartialEq, Getters)] +#[derive(Clone, Debug, Default, PartialEq, Getters, Setters)] pub struct Configuration { /// The voter set. #[get = "pub"] voters: HashSet, /// The learner set. #[get = "pub"] + #[set = "pub"] learners: HashSet, } @@ -239,12 +240,6 @@ impl ProgressSet { .filter(move |(k, _)| ids.contains(k)) } - /// Returns the ids of all the peers (voters + learners) - #[inline] - pub fn peer_ids(&self) -> Vec { - self.progress.keys().copied().collect::>() - } - /// Returns the ids of all known voters. 
/// /// **Note:** Do not use this for majority/quorum calculation. The Raft node may be @@ -427,18 +422,29 @@ impl ProgressSet { /// Doing this will set the `recent_active` of each peer to false. /// /// This should only be called by the leader. - pub fn quorum_recently_active(&mut self, perspective_of: u64) -> bool { + pub fn quorum_recently_active(&mut self, perspective_of: u64, mut f: F) -> bool + where + F: FnMut(u64, bool), + { let mut active = HashSet::default(); - for (&id, pr) in self.voters_mut() { - if id == perspective_of { - active.insert(id); + for id in &self.configuration.voters { + if *id == perspective_of { + active.insert(*id); continue; } + let pr = self.progress.get_mut(id).unwrap(); + f(*id, pr.recent_active); if pr.recent_active { - active.insert(id); + active.insert(*id); } + pr.recent_active = false; } - for pr in self.progress.values_mut() { + for id in &self.configuration.learners { + if *id == perspective_of { + continue; + } + let pr = self.progress.get_mut(id).unwrap(); + f(*id, pr.recent_active); pr.recent_active = false; } self.configuration.has_quorum(&active) diff --git a/src/raft.rs b/src/raft.rs index 34dec468d..cf8b95a73 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -14,10 +14,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp; + use crate::eraftpb::{Entry, EntryType, HardState, Message, MessageType, Snapshot}; use rand::{self, Rng}; use slog::{self, Logger}; -use std::{cmp, mem}; use super::errors::{Error, Result, StorageError}; use super::group::Groups; @@ -177,7 +178,12 @@ pub struct Raft { /// Raft groups inner state pub groups: Groups, - group_id: u64, + /// The group ID of this node + pub group_id: u64, + // Indicates that self is a delegate or not. 
+ // Become true when receive a `MsgAppend` contains non-empty `bcast_targets` from leader + // Become false when receive a normal `MsgAppend` without `bcast_targets` + is_delegate: bool, /// The logger for the raft structure. pub(crate) logger: slog::Logger, @@ -216,6 +222,7 @@ impl Raft { let conf_state = &raft_state.conf_state; let voters = &conf_state.voters; let learners = &conf_state.learners; + let mut r = Raft { id: c.id, read_states: Default::default(), @@ -251,6 +258,7 @@ impl Raft { batch_append: c.batch_append, groups: Default::default(), group_id: c.group_id, + is_delegate: false, logger, }; for p in voters { @@ -385,12 +393,6 @@ impl Raft { self.skip_bcast_commit = skip; } - /// Returns the inner raft group configuration - #[inline] - pub fn groups(&self) -> &Groups { - &self.groups - } - /// Set whether batch append msg at runtime. #[inline] pub fn set_batch_append(&mut self, batch_append: bool) { @@ -409,6 +411,7 @@ impl Raft { if m.from == INVALID_ID { m.from = self.id; } + m.group_id = self.group_id; if m.get_msg_type() == MessageType::MsgRequestVote || m.get_msg_type() == MessageType::MsgRequestPreVote || m.get_msg_type() == MessageType::MsgRequestVoteResponse @@ -485,22 +488,16 @@ impl Raft { } let (sindex, sterm) = (snapshot.get_metadata().index, snapshot.get_metadata().term); m.set_snapshot(snapshot); - self.pr_become_snapshot_and_log(to, pr, sindex, sterm); + self.pr_become_snapshot(to, pr, sindex, sterm); true } - fn prepare_send_entries( - &mut self, - m: &mut Message, - pr: &mut Progress, - term: u64, - ents: Vec, - ) { + fn prepare_send_entries(&mut self, m: &mut Message, pr: &mut Progress, ents: Vec) { m.set_msg_type(MessageType::MsgAppend); m.index = pr.next_idx - 1; - m.log_term = term; - m.set_entries(ents.into()); + m.log_term = self.raft_log.term(m.index).unwrap(); m.commit = self.raft_log.committed; + m.set_entries(ents.into()); if !m.entries.is_empty() { let last = m.entries.last().unwrap().index; pr.update_state(last); @@ 
-531,161 +528,19 @@ impl Raft { is_batched } - // Whether the leader can send raft logs through Follower Replication - // and returns the group id of the given peer. - #[inline] - fn use_delegate(&self, to: u64) -> Option { - if self.is_follower_replication_enabled() - && self.is_leader() - && self - .groups - .get_members(to) - .map_or(0, |members| members.len()) - > 1 - && !self.groups.in_same_group(self.id, to) - { - self.groups.get_group_id(to) - } else { - None + // Attach group info to `m` before it's sent out. It can be called on the leader or delegates. + fn attach_group_info(&self, m: &mut Message) { + debug_assert!(m.to != INVALID_ID); + if self.is_delegate { + m.from = self.leader_id; + m.delegate = self.id; + } else if let Some(ids) = self.groups.get_bcast_targets(m.to) { + m.set_bcast_targets(ids.clone()); } } - #[inline] - fn is_follower_replication_enabled(&self) -> bool { - self.group_id == INVALID_ID - } - /// Sends RPC, with entries to the given peer. - /// This can be called by the leader or a delegate. If the delegate call this, it ignores the Follower Replication and the field `to` - /// of produced message will be set to the leader id. 
- pub fn send_append(&mut self, to: u64, prs: &mut ProgressSet) { - if let Some(gid) = self.use_delegate(to) { - let mut groups = self.take_groups(); - match groups.get_latest_delegated_msg(gid) { - Some(m) => { - // `to` is a delegate - if to == m.inner.to { - let mut pr = prs.get_mut(to).unwrap(); - assert_eq!( - pr.pending_request_snapshot, INVALID_INDEX, - "The delegate must have been dismissed if it is requesting a snapshot" - ); - // TODO: If we can batch this message, the Message::default() is unnecessary - match self.prepare_replicate_message_for_peer(self.id, to, &mut pr, false) { - None => { - // Delegate paused - groups.remove_delegate(to); - } - Some(mut msg) => { - // Try batch - if msg.msg_type == MessageType::MsgAppend - && m.inner.msg_type == MessageType::MsgAppend - && util::is_continuous_ents(&m.inner, &msg.entries.as_slice()) - { - // TODO: Can we positively assume the delegate not requiring a snapshot here? - let mut batched_ents: Vec = - m.inner.take_entries().into(); - let ents: Vec = msg.take_entries().into(); - batched_ents.extend(ents); - util::limit_size(&mut batched_ents, Some(self.max_msg_size)); - m.inner.set_entries(batched_ents.into()); - } else { - // Unable to batch, create a new DelegatedMessage - groups.insert_delegated_msg(gid, msg, None); - } - } - } - self.set_groups(groups); - return; - } else if let Some(pr) = prs.get(m.inner.to) { - // TODO: Is None possible? If not, we can just use unwrap - if m.inner.msg_type == MessageType::MsgAppend - && m.inner.index < self.raft_log.last_index() - { - let size = util::compute_ents_size(m.inner.entries.as_slice()); - if self.max_msg_size - size > 0 { - if let Ok(ents) = self - .raft_log - .entries(pr.next_idx - 1, self.max_msg_size - size) - { - for e in ents { - m.inner.mut_entries().push(e); - } - } - } - } - m.delegated_peers.insert(to); - // This just clean all the Inflights though there could be some MsgAppendResp on the way. 
And from - // now the delegate will take over the flow control of the peer `to`. - let pr = prs.get_mut(to).unwrap(); - pr.become_delegated(); - self.set_groups(groups); - return; - } - // Leader send raft logs itself - } - // Pick a delegate and create a new DelegatedMessage - None => { - let members = groups.get_members(to).unwrap(); - let delegate = match groups.get_delegate(gid) { - Some(cached) => cached, - None => self.pick_delegate(&members, prs), - }; - if delegate != INVALID_ID { - let pr = prs.get_mut(delegate).unwrap(); - if let Some(m) = - self.prepare_replicate_message_for_peer(self.id, delegate, pr, false) - { - let peer = if to != delegate { Some(to) } else { None }; - groups.insert_delegated_msg(gid, m, peer); - groups.set_delegate(delegate); - self.set_groups(groups); - return; - } else { - groups.remove_delegate(delegate); - } - } - } - } - self.set_groups(groups); - } - self.send_append_without_delegate(to, prs); - } - - // Send Append RPC using no delegate to the given peer. - // This could be called by the leader or a delegate. - fn send_append_without_delegate(&mut self, to: u64, prs: &mut ProgressSet) { - if let Some(pr) = prs.get_mut(to) { - let from = if self.is_leader() { - INVALID_ID - } else { - // Is this safe? 
- self.leader_id - }; - if let Some(mut m) = self.prepare_replicate_message_for_peer(from, to, pr, true) { - if self.is_leader() { - self.send(m); - } else { - // Self is a delegate, set `from_delegate` - m.from_delegate = self.id; - self.send(m); - } - } - } - } - - // Prepare a message to send entries or snapshot for given peer - // Return `None` only in conditions below : - // - given `pr` is paused - // - Snapshot is temporarily unavailable - // - Not `recent_active` - fn prepare_replicate_message_for_peer( - &mut self, - from: u64, - to: u64, - pr: &mut Progress, - batch_append: bool, - ) -> Option { + pub fn send_append(&mut self, to: u64, pr: &mut Progress) { if pr.is_paused() { trace!( self.logger, @@ -693,41 +548,33 @@ impl Raft { to = to; "progress" => ?pr, ); - return None; + return; } let mut m = Message::default(); m.to = to; - m.from = from; + self.attach_group_info(&mut m); if pr.pending_request_snapshot != INVALID_INDEX { // Check pending request snapshot first to avoid unnecessary loading entries. if !self.prepare_send_snapshot(&mut m, pr, to) { - return None; + return; } } else { let term = self.raft_log.term(pr.next_idx - 1); let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size); if term.is_err() || ents.is_err() { - // send snapshot if we failed to get term or entries - trace!( - self.logger, - "Skipping sending to {to}", - to = to; - "index" => pr.next_idx, - "term" => ?term, - "ents" => ?ents, - ); + // send snapshot if we failed to get term or entries. 
if !self.prepare_send_snapshot(&mut m, pr, to) { - return None; + return; } } else { let mut ents = ents.unwrap(); - if self.batch_append && batch_append && self.try_batching(to, pr, &mut ents) { - return None; + if self.batch_append && self.try_batching(to, pr, &mut ents) { + return; } - self.prepare_send_entries(&mut m, pr, term.unwrap(), ents); + self.prepare_send_entries(&mut m, pr, ents); } } - Some(m) + self.send(m); } // send_heartbeat sends an empty MsgAppend @@ -749,23 +596,17 @@ impl Raft { self.send(m); } - /// Sends RPC, with entries to all or given peers that are not up-to-date + /// Sends RPC, with entries to all peers that are not up-to-date /// according to the progress recorded in r.prs(). - pub fn bcast_append(&mut self, targets: Option>) { - let self_id = self.id; - let mut prs = self.take_prs(); - match targets { - None => { - let ids = prs.peer_ids(); - ids.iter() - .filter(|id| **id != self_id) - .for_each(|id| self.send_append(*id, &mut prs)); + pub fn bcast_append(&mut self) { + let (self_id, mut prs) = (self.id, self.take_prs()); + self.groups.resolve_delegates(&prs); + for (id, pr) in prs.iter_mut().filter(|(id, _)| **id != self_id) { + let delegate = self.groups.get_delegate(*id); + if delegate == INVALID_ID || delegate == *id { + self.send_append(*id, pr); } - Some(set) => set - .iter() - .filter(|id| **id != self_id) - .for_each(|id| self.send_append(*id, &mut prs)), - }; + } self.set_prs(prs); } @@ -997,7 +838,8 @@ impl Raft { self.reset(term); self.leader_id = self.id; self.state = StateRole::Leader; - + self.groups.set_leader_group_id(self.group_id); + self.is_delegate = false; // Followers enter replicate mode when they've been successfully probed // (perhaps after having received a snapshot as a result). The leader is // trivially in this state. Note that r.reset() has initialized this @@ -1096,7 +938,16 @@ impl Raft { /// Steps the raft along via a message. 
This should be called everytime your raft receives a /// message from a peer. + #[allow(clippy::collapsible_if)] pub fn step(&mut self, m: Message) -> Result<()> { + if m.term != 0 && m.group_id != INVALID_ID && self.is_leader() { + if self.groups.update_group_id(m.from, m.group_id) { + let prs = self.take_prs(); + self.groups.resolve_delegates(&prs); + self.set_prs(prs); + } + } + // Handle the message term, which may result in our stepping down to a follower. if m.term == 0 { // local message @@ -1400,13 +1251,11 @@ impl Raft { if pr.state == ProgressState::Replicate { pr.become_probe(); } - if m.request_snapshot != INVALID_INDEX { - self.groups.remove_delegate(m.from); - } *send_append = true; } return; } + *old_paused = pr.is_paused(); if !pr.maybe_update(m.index) { return; @@ -1429,60 +1278,20 @@ impl Raft { pr.become_probe(); } ProgressState::Replicate => pr.ins.free_to(m.index), - // In the leader's ProgressSet, the flow control of a `Delegated` progress is turned off - ProgressState::Delegated => {} } *maybe_commit = true; } - fn handle_append_response_in_delegate(&mut self, m: &Message, prs: &mut ProgressSet) { - let pr = prs.get_mut(m.from).unwrap(); - pr.recent_active = true; - if m.reject { - debug!( - self.logger, - "received msgAppend rejection"; - "last index" => m.reject_hint, - "from" => m.from, - "index" => m.index, - ); - - if pr.maybe_decr_to(m.index, m.reject_hint, m.request_snapshot) { - debug!( - self.logger, - "decreased progress of {}", - m.from; - "progress" => ?pr, - ); - if pr.state == ProgressState::Replicate { - pr.become_probe(); - } - self.send_append_without_delegate(m.from, prs); - } - } else { - if !pr.maybe_update(m.index) { - // No update found, skip the Progress part - return; - } - match pr.state { - ProgressState::Probe => pr.become_replicate(), - ProgressState::Snapshot => { - if !pr.maybe_snapshot_abort() { - return; - } - debug!( - self.logger, - "snapshot aborted, resumed sending replication messages to {from}", - from = 
m.from; - "progress" => ?pr, - ); - pr.become_probe(); - } - ProgressState::Replicate => pr.ins.free_to(m.index), - // In a delegate's ProgressSet, the `Delegated` state must be stale (from the former Leader role) - ProgressState::Delegated => pr.become_probe(), - } + fn handle_append_response_on_delegate(&mut self, m: &Message) { + let mut prs = self.take_prs(); + let mut send_append = false; + let (mut _h1, mut _h2) = (false, false); + self.handle_append_response(&m, &mut prs, &mut _h1, &mut send_append, &mut _h2); + if send_append { + let from = m.from; + self.send_append(from, prs.get_mut(from).unwrap()); } + self.set_prs(prs); } fn process_leader_transfer(&mut self, from: u64, match_idx: u64) { @@ -1610,7 +1419,7 @@ impl Raft { lead_transferee = lead_transferee; ); } else { - self.send_append(lead_transferee, prs); + self.send_append(lead_transferee, pr); } } @@ -1638,14 +1447,12 @@ impl Raft { // If snapshot failure, wait for a heartbeat interval before next try pr.pause(); pr.pending_request_snapshot = INVALID_INDEX; - // Remove the delegate as it's paused now - self.groups.remove_delegate(m.from); } /// Check message's progress to decide which action should be taken. fn check_message_with_progress( &mut self, - m: &mut Message, + m: &Message, send_append: &mut bool, old_paused: &mut bool, maybe_commit: &mut bool, @@ -1659,6 +1466,7 @@ impl Raft { ); return; } + let mut prs = self.take_prs(); match m.get_msg_type() { MessageType::MsgAppendResponse => { @@ -1674,13 +1482,13 @@ impl Raft { } } MessageType::MsgUnreachable => { + self.groups.remove_delegate(m.from); let pr = prs.get_mut(m.from).unwrap(); // During optimistic replication, if the remote becomes unreachable, // there is huge probability that a MsgAppend is lost. 
if pr.state == ProgressState::Replicate { pr.become_probe(); } - self.groups.remove_delegate(m.from); debug!( self.logger, "failed to send message to {from} because it is unreachable", @@ -1753,7 +1561,7 @@ impl Raft { } } self.append_entry(&mut m.mut_entries()); - self.bcast_append(None); + self.bcast_append(); return Ok(()); } MessageType::MsgReadIndex => { @@ -1824,7 +1632,7 @@ impl Raft { let mut old_paused = false; let mut more_to_send = vec![]; self.check_message_with_progress( - &mut m, + &m, &mut send_append, &mut old_paused, &mut maybe_commit, @@ -1833,7 +1641,7 @@ impl Raft { if maybe_commit { if self.maybe_commit() { if self.should_bcast_commit() { - self.bcast_append(None); + self.bcast_append(); } } else if old_paused { // update() reset the wait state on this node. If we had delayed sending @@ -1841,16 +1649,12 @@ impl Raft { send_append = true; } } - if send_append { + + if send_append && !self.groups.is_delegated(m.from) { let from = m.from; - if m.from_delegate == INVALID_ID { - let mut prs = self.take_prs(); - self.send_append(from, &mut prs); - self.set_prs(prs); - } else { - let members = self.groups.get_members(m.from_delegate); - self.bcast_append(members); - } + let mut prs = self.take_prs(); + self.send_append(from, prs.get_mut(from).unwrap()); + self.set_prs(prs); } if !more_to_send.is_empty() { for to_send in more_to_send.drain(..) 
{ @@ -1873,16 +1677,21 @@ impl Raft { ); return Err(Error::ProposalDropped); } - MessageType::MsgAppend | MessageType::MsgSnapshot => { + MessageType::MsgAppend => { debug_assert_eq!(self.term, m.term); self.become_follower(m.term, m.from); - self.handle_replication(m); + self.handle_append_message(m); } MessageType::MsgHeartbeat => { debug_assert_eq!(self.term, m.term); self.become_follower(m.term, m.from); self.handle_heartbeat(m); } + MessageType::MsgSnapshot => { + debug_assert_eq!(self.term, m.term); + self.become_follower(m.term, m.from); + self.handle_snapshot(m); + } MessageType::MsgRequestPreVoteResponse | MessageType::MsgRequestVoteResponse => { // Only handle vote responses corresponding to our candidacy (while in // state Candidate, we may get stale MsgPreVoteResp messages in this term from @@ -1912,7 +1721,7 @@ impl Raft { self.campaign(CAMPAIGN_ELECTION); } else { self.become_leader(); - self.bcast_append(None); + self.bcast_append(); } } CandidacyStatus::Ineligible => { @@ -1950,16 +1759,21 @@ impl Raft { m.to = self.leader_id; self.send(m); } - MessageType::MsgAppend | MessageType::MsgSnapshot => { + MessageType::MsgAppend => { self.election_elapsed = 0; self.leader_id = m.from; - self.handle_replication(m); + self.handle_append_message(m); } MessageType::MsgHeartbeat => { self.election_elapsed = 0; self.leader_id = m.from; self.handle_heartbeat(m); } + MessageType::MsgSnapshot => { + self.election_elapsed = 0; + self.leader_id = m.from; + self.handle_snapshot(m); + } MessageType::MsgTransferLeader => { if self.leader_id == INVALID_ID { info!( @@ -2022,52 +1836,13 @@ impl Raft { self.read_states.push(rs); } MessageType::MsgAppendResponse => { - let mut prs = self.take_prs(); - self.handle_append_response_in_delegate(&m, &mut prs); - self.set_prs(prs); + self.handle_append_response_on_delegate(&m); } _ => {} } Ok(()) } - /// Receive a replication message (MsgAppend or MsgSnapshot) and handle it - pub fn handle_replication(&mut self, mut m: Message) { 
- // `from` of receiving message must be the leader so that `to` of the response is also the leader - let mut response = match m.msg_type { - MessageType::MsgAppend => self.handle_append_entries(&m), - MessageType::MsgSnapshot => { - let snapshot = m.take_snapshot(); - self.handle_snapshot(&m, snapshot) - } - _ => fatal!( - self.logger, - "expect a message for doing replication (MsgAppend or MsgSnapshot), but got {:?}", - m.msg_type - ), - }; - // Self is a delegate, try to bcast_append all the members. - if !m.bcast_targets.is_empty() { - if response.reject { - response.from_delegate = self.id; - } else { - let targets = m.take_bcast_targets(); - self.bcast_append(Some(targets)); - } - } else if m.from_delegate != INVALID_ID { - // Self is a normal follower receiving msg from the delegate - if m.index >= self.raft_log.committed && !response.reject { - // The delegate could send a stale appending request when it becomes a delegate first time. - // And the follower dont need to send this response to leader since the commit index is up-to-date. - let to_leader = response.clone(); - self.send(to_leader); - } - // Send back to delegate for flow control and updating the Progress - response.to = m.from_delegate; - } - self.send(response); - } - /// Request a snapshot from a leader. pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> { if self.is_leader() { @@ -2100,27 +1875,63 @@ impl Raft { } /// For a given message, append the entries to the log. 
- fn handle_append_entries(&mut self, m: &Message) -> Message { + pub fn handle_append_message(&mut self, mut m: Message) { if self.pending_request_snapshot != INVALID_INDEX { - return self.prepare_request_snapshot(self.leader_id); + self.send_request_snapshot(); + return; } + let mut to_send = Message::default(); to_send.set_msg_type(MessageType::MsgAppendResponse); to_send.to = m.from; + if m.index < self.raft_log.committed { debug!( self.logger, "got message with lower index than committed."; ); to_send.index = self.raft_log.committed; - return to_send; + if msg_from_delegate(&m) { + to_send.to = m.delegate; + } + self.send(to_send); + return; } + let old_is_delegate = self.is_delegate; + self.is_delegate = !m.get_bcast_targets().is_empty(); + self.handle_append_entries(&m, &mut to_send); + let accepted = !to_send.reject; + self.send(to_send); + if accepted { + let mut prs = self.take_prs(); + for target in m.take_bcast_targets() { + // Self is delegate, sync raft logs to other members. + if let Some(pr) = prs.get_mut(target) { + if !old_is_delegate && self.is_delegate { + // Make sure the delegate can send a message to the target + pr.become_probe(); + pr.optimistic_update(self.raft_log.last_index()); + } + self.send_append(target, pr); + } + } + self.set_prs(prs); + } + } + + fn handle_append_entries(&mut self, m: &Message, to_send: &mut Message) { debug_assert!(m.log_term != 0, "{:?} log term can't be 0", m); if let Some((_, last_idx)) = self .raft_log .maybe_append(m.index, m.log_term, m.commit, &m.entries) { to_send.set_index(last_idx); + if msg_from_delegate(m) { + // Follower receives the message from a delegate, send response back to the delegate + let mut to_delegate = to_send.clone(); + to_delegate.to = m.delegate; + self.send(to_delegate); + } } else { debug!( self.logger, @@ -2132,11 +1943,14 @@ impl Raft { "index" => m.index, "logterm" => ?self.raft_log.term(m.index), ); + if msg_from_delegate(m) { + // Follower only sends rejection to the delegate. 
+ to_send.to = m.delegate; + } to_send.index = m.index; to_send.reject = true; to_send.reject_hint = self.raft_log.last_index(); } - to_send } // TODO: revoke pub when there is a better way to test. @@ -2154,15 +1968,14 @@ impl Raft { self.send(to_send); } - // A follower never rejects a MsgSnapshot - fn handle_snapshot(&mut self, m: &Message, snapshot: Snapshot) -> Message { + fn handle_snapshot(&mut self, mut m: Message) { debug_assert!(m.term != 0, "{:?} term can't be 0", m); let metadata = m.get_snapshot().get_metadata(); let (sindex, sterm) = (metadata.index, metadata.term); let mut to_send = Message::default(); to_send.set_msg_type(MessageType::MsgAppendResponse); to_send.to = m.from; - if self.restore(snapshot) { + if self.restore(m.take_snapshot()) { info!( self.logger, "[commit: {commit}, term: {term}] restored snapshot [index: {snapshot_index}, term: {snapshot_term}]", @@ -2172,6 +1985,26 @@ impl Raft { snapshot_term = sterm; ); to_send.index = self.raft_log.last_index(); + if msg_from_delegate(&m) { + // The snapshot comes from the peer's delegate. + let mut to_delegate = to_send.clone(); + to_delegate.to = m.delegate; + self.send(to_delegate); + } else if !m.get_bcast_targets().is_empty() { + let mut prs = self.take_prs(); + for member in m.take_bcast_targets() { + if let Some(pr) = prs.get_mut(member) { + // TODO: send snapshot directly. + let mut to_member = Message::default(); + to_member.from = m.from; + to_member.to = member; + if self.prepare_send_snapshot(&mut to_member, pr, member) { + self.send(to_member); + } + } + } + self.set_prs(prs) + } } else { info!( self.logger, @@ -2180,9 +2013,12 @@ impl Raft { snapshot_index = sindex, snapshot_term = sterm; ); + if msg_from_delegate(&m) { + to_send.to = m.delegate + } to_send.index = self.raft_log.committed; } - to_send + self.send(to_send); } fn restore_raft(&mut self, snap: &Snapshot) -> Option { @@ -2270,7 +2106,6 @@ impl Raft { } /// Check if self is the leader now. 
- #[inline] pub fn is_leader(&self) -> bool { self.state == StateRole::Leader } @@ -2358,7 +2193,7 @@ impl Raft { // The quorum size is now smaller, so see if any pending entries can // be committed. if self.maybe_commit() { - self.bcast_append(None); + self.bcast_append(); } // If the removed node is the lead_transferee, then abort the leadership transferring. if self.is_leader() && self.lead_transferee == Some(id) { @@ -2381,14 +2216,13 @@ impl Raft { } } - /// Takes the `Groups`. - pub fn take_groups(&mut self) -> Groups { - mem::replace(&mut self.groups, Groups::default()) - } - - /// Sets the `Groups`. - pub fn set_groups(&mut self, groups: Groups) { - self.groups = groups + /// Sets the `Groups`. Only should be used for tests. + pub fn set_groups(&mut self, groups: Vec<(u64, Vec)>) { + let groups = Groups::new(groups); + if let Some(gid) = groups.get_group_id(self.id) { + self.group_id = gid; + } + self.groups = groups; } /// Takes the progress set (destructively turns to `None`). @@ -2457,7 +2291,15 @@ impl Raft { // check_quorum_active can only called by leader. fn check_quorum_active(&mut self) -> bool { let self_id = self.id; - self.mut_prs().quorum_recently_active(self_id) + let mut prs = self.take_prs(); + let res = prs.quorum_recently_active(self_id, |id, active| { + if !active { + // Remove the non-active delegate peer + self.groups.remove_delegate(id); + } + }); + self.set_prs(prs); + res } /// Issues a message to timeout immediately. 
@@ -2471,26 +2313,18 @@ impl Raft { self.lead_transferee = None; } - // Send snapshot request message to the leader fn send_request_snapshot(&mut self) { - let m = self.prepare_request_snapshot(self.leader_id); - self.send(m); - } - - // Prepare a Message for requesting snapshot from given `to` - fn prepare_request_snapshot(&mut self, to: u64) -> Message { let mut m = Message::default(); m.set_msg_type(MessageType::MsgAppendResponse); m.index = self.raft_log.committed; m.reject = true; m.reject_hint = self.raft_log.last_index(); - m.to = to; + m.to = self.leader_id; m.request_snapshot = self.pending_request_snapshot; - m + self.send(m); } - #[inline] - fn pr_become_snapshot_and_log( + fn pr_become_snapshot( &self, id: u64, progress: &mut Progress, @@ -2515,49 +2349,8 @@ impl Raft { "progress" => ?progress, ); } +} - // Pick a delegate of the given group - // - // The delegate must satisfy conditions below: - // 1. Must be 'recent_active' - // 2. The progress state should be 'Replicate' but not 'paused' - // 3. The progress has biggest 'match' - // 4. Not requiring a snapshot - // - // Especially if all the members in a group needs snapshot, choose the delegate randomly. 
- // - fn pick_delegate(&self, group: &[u64], prs: &ProgressSet) -> u64 { - let mut to_send_snapshot = vec![]; - let first_index = self.raft_log.first_index(); - let (_, delegate_id) = group.iter().filter(|member| **member != self.id).fold( - (INVALID_INDEX, INVALID_ID), - |(mut max_matched, mut id), member| { - if let Some(pr) = prs.get(*member) { - // `pr.pending_request_snapshot` can be passed as the delegate can send snapshots - // to other members now - if !pr.is_paused() { - let term = self.raft_log.term(pr.next_idx - 1); - if term.is_err() || pr.next_idx < first_index { - to_send_snapshot.push(*member); - } else if pr.matched > max_matched - && pr.recent_active - && pr.state == ProgressState::Replicate - { - max_matched = pr.matched; - id = *member; - } - } - } - (max_matched, id) - }, - ); - if delegate_id == INVALID_ID && !to_send_snapshot.is_empty() { - // All the members need snapshot, choose the delegate randomly. It's safe since they're all unpaused. - to_send_snapshot[0] - } else { - // In the situation where every member is paused(or not recent_active) but not requring a snapshot, we get a invalid delegate. - // Otherwise, the `delegate_id` must be valid. 
-            delegate_id
-        }
-    }
+fn msg_from_delegate(m: &Message) -> bool {
+    m.delegate != INVALID_ID
 }
diff --git a/src/raw_node.rs b/src/raw_node.rs
index 67b0e4b9e..8de4dcc9a 100644
--- a/src/raw_node.rs
+++ b/src/raw_node.rs
@@ -30,7 +30,6 @@ use crate::eraftpb::{
     Snapshot,
 };
 use crate::errors::{Error, Result};
-use crate::group::{Groups, GroupsConfig};
 use crate::read_only::ReadState;
 use crate::{Raft, SoftState, Status, StatusRef, Storage, INVALID_ID};
 use slog::Logger;
@@ -124,10 +123,6 @@ impl Ready {
         if !raft.msgs.is_empty() {
             mem::swap(&mut raft.msgs, &mut rd.messages);
         }
-        if !raft.groups.has_message() {
-            let msgs = raft.groups.take_messages();
-            rd.messages.extend(msgs);
-        }
         rd.committed_entries = Some(
             (match since_idx {
                 None => raft.raft_log.next_entries(),
@@ -520,10 +515,9 @@ impl RawNode {
 
     /// Update raft groups config for Follower Replication in flight
     #[inline]
-    pub fn update_groups_config(&mut self, config: GroupsConfig) {
-        let groups = Groups::new(config);
+    pub fn update_groups_config(&mut self, config: Vec<(u64, Vec<u64>)>) {
         // The delegate cache will be removed
-        self.raft.set_groups(groups);
+        self.raft.set_groups(config);
     }
 }
diff --git a/src/util.rs b/src/util.rs
index 4248643ee..6b1a57f36 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -71,14 +71,6 @@ pub fn limit_size<T: Message + Clone>(entries: &mut Vec<T>, max: Option<u64>)
     entries.truncate(limit);
 }
 
-/// Compute the total size of given entries
-#[inline]
-pub fn compute_ents_size<T: Message>(entries: &[T]) -> u64 {
-    entries
-        .iter()
-        .fold(0, |acc, e| acc + u64::from(e.compute_size()))
-}
-
 /// Check whether the entry is continuous to the message.
 /// i.e msg's next entry index should be equal to the first entries's index
 pub fn is_continuous_ents(msg: &Message, ents: &[Entry]) -> bool {