Skip to content

Commit

Permalink
squash commits and resolve conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Fullstop000 <[email protected]>
  • Loading branch information
Fullstop000 committed Feb 25, 2020
1 parent 5189128 commit f89086c
Show file tree
Hide file tree
Showing 14 changed files with 204 additions and 68 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ rust:
- stable
- nightly
# Officially the oldest compiler we support.
- 1.36.0
- 1.39.0
matrix:
include:
- os: windows
Expand Down
28 changes: 14 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ version = "0.6.0-alpha"
authors = ["The TiKV Project Developers"]
license = "Apache-2.0"
keywords = ["raft", "distributed-systems", "ha"]
repository = "https://github.com/pingcap/raft-rs"
repository = "https://github.com/tikv/raft-rs"
readme = "README.md"
homepage = "https://github.com/pingcap/raft-rs"
homepage = "https://github.com/tikv/raft-rs"
documentation = "https://docs.rs/raft"
description = "The rust language implementation of Raft algorithm."
categories = ["algorithms", "database-implementations"]
Expand All @@ -25,32 +25,32 @@ default-logger = ["slog-stdlog", "slog-envlogger", "slog-term"]

# Make sure to synchronize updates with Harness.
[dependencies]
fxhash = "0.2.1"
fail = { version = "0.3", optional = true }
getset = "0.0.9"
protobuf = "2"
slog = "2.2"
slog-stdlog = { version = "4", optional = true }
slog-envlogger = { version = "2.1.0", optional = true }
slog-term = { version = "2.4.0", optional = true }
quick-error = "1.2.2"
raft-proto = { path = "proto", version = "0.6.0-alpha", default-features = false }
rand = "0.7"
fxhash = "0.2.1"
fail = { version = "0.3", optional = true }
getset = "0.0.7"
slog = "2.2"
slog-envlogger = { version = "2.1.0", optional = true }
slog-stdlog = { version = "4", optional = true }
slog-term = { version = "2.4.0", optional = true }

[dev-dependencies]
criterion = "0.3.0"
regex = "1.1"
slog-stdlog = "4"
slog-envlogger = "2.1.0"
criterion = "0.3"
regex = "1"
slog-async = "2.3.0"
slog-envlogger = "2.1.0"
slog-stdlog = "4"
slog-term = "2.4.0"

[[bench]]
name = "benches"
harness = false

[badges]
travis-ci = { repository = "pingcap/raft-rs" }
travis-ci = { repository = "tikv/raft-rs" }

[[example]]
name = "single_mem_node"
Expand Down
6 changes: 3 additions & 3 deletions harness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ rand = "0.7"
slog = "2.2"

[dev-dependencies]
criterion = ">0.2.4"
criterion = "0.3"
fxhash = "0.2.1"
lazy_static = "1.0"
lazy_static = "1"
protobuf = "2"
regex = "1.1"
regex = "1"
83 changes: 83 additions & 0 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4591,3 +4591,86 @@ fn test_request_snapshot_on_role_change() {
nt.peers[&2].pending_request_snapshot
);
}

#[test]
fn test_custom_quorum() {
fn unsafe_quorum_fn_1(_: usize) -> usize {
1
}

fn unsafe_quorum_fn_2(_: usize) -> usize {
100
}

fn safe_quorum_fn_3(voters_len: usize) -> usize {
(voters_len + 1) / 2 + 1
}

#[allow(clippy::type_complexity)]
let cases: Vec<(fn(usize) -> usize, usize)> = vec![
(unsafe_quorum_fn_1, 3),
(unsafe_quorum_fn_2, 5),
(safe_quorum_fn_3, 4),
];

for (f, expect_quorum) in cases {
let mut peers = Vec::new();
for i in 1..=5 {
let l = default_logger();
let storage = new_storage();
let raft = new_test_raft_with_quorum_fn(i, vec![1, 2, 3, 4, 5], 10, 1, storage, f, &l);
peers.push(Some(raft));
}
let mut network = Network::new(peers, &default_logger());

for isolated in ((5 - expect_quorum)..=expect_quorum).rev() {
network.recover();
for id in 2..(2 + isolated) {
network.isolate(id as u64);
}
network.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

if 5 - isolated == expect_quorum {
assert_eq!(network.peers[&1].state, StateRole::Leader);
break;
}
assert_eq!(network.peers[&1].state, StateRole::Candidate);
}

let old_committed = network.peers[&1].raft_log.committed;
for isolated in ((5 - expect_quorum)..=expect_quorum).rev() {
network.recover();
for id in 2..(2 + isolated) {
network.isolate(id as u64);
}
network.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]);

if 5 - isolated == expect_quorum {
assert!(network.peers[&1].raft_log.committed > old_committed);
break;
}
assert!(network.peers[&1].raft_log.committed == old_committed);
}

for isolated in ((5 - expect_quorum)..=expect_quorum).rev() {
network.recover();
network.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
for id in 2..(2 + isolated) {
network.isolate(id as u64);
}

// Peer 1 can keep its leadership in the first check quorum.
network.send(vec![new_message(1, 1, MessageType::MsgCheckQuorum, 0)]);
assert_eq!(network.peers[&1].state, StateRole::Leader);

network.send(vec![new_message(1, 1, MessageType::MsgBeat, 0)]);
network.send(vec![new_message(1, 1, MessageType::MsgCheckQuorum, 0)]);

if 5 - isolated == expect_quorum {
assert_eq!(network.peers[&1].state, StateRole::Leader);
break;
}
assert_eq!(network.peers[&1].state, StateRole::Follower);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl Sandbox {

fn new_storage(peers: Vec<u64>, snapshot_index: u64, last_index: u64) -> MemStorage {
let s = MemStorage::new_with_conf_state((peers.clone(), vec![]));
let snapshot = new_snapshot(snapshot_index, 1, peers.clone());
let snapshot = new_snapshot(snapshot_index, 1, peers);
s.wl().apply_snapshot(snapshot).unwrap();
if snapshot_index < last_index {
let mut ents = vec![];
Expand All @@ -214,7 +214,7 @@ fn new_storage_by_scenario(
let s = MemStorage::new_with_conf_state((peers.clone(), vec![]));
match scenario {
FollowerScenario::UpToDate => {
let snapshot = new_snapshot(snapshot_index, 1, peers.clone());
let snapshot = new_snapshot(snapshot_index, 1, peers);
s.wl().apply_snapshot(snapshot).unwrap();
let mut ents = vec![];
for index in snapshot_index + 1..last_index {
Expand All @@ -225,7 +225,7 @@ fn new_storage_by_scenario(
}
FollowerScenario::NeedEntries(index) => {
assert!(index > snapshot_index);
let snapshot = new_snapshot(snapshot_index, 1, peers.clone());
let snapshot = new_snapshot(snapshot_index, 1, peers);
s.wl().apply_snapshot(snapshot).unwrap();
let mut ents = vec![];
for i in snapshot_index + 1..index {
Expand Down Expand Up @@ -330,7 +330,7 @@ fn test_delegate_in_group_containing_leader() {
(3, FollowerScenario::Snapshot),
(4, FollowerScenario::UpToDate),
];
let mut sandbox = Sandbox::new(&l, 1, followers.clone(), group_config.clone(), 5, 10);
let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, 10);

sandbox.propose(true);
let msgs = sandbox.leader_mut().read_messages();
Expand Down Expand Up @@ -636,7 +636,7 @@ fn test_follower_replication_with_inconsistent_progress_set() {
// only bcast to 4 even if the peer2 is in group system
assert_eq!(m.bcast_targets, vec![4]);

let mut sandbox = Sandbox::new(&l, 1, followers.clone(), group_config.clone(), 5, 20);
let mut sandbox = Sandbox::new(&l, 1, followers, group_config, 5, 20);
// Remove peer 2 from group system
sandbox.leader_mut().groups.remove_node(2);
sandbox.propose(false);
Expand Down
2 changes: 1 addition & 1 deletion harness/tests/integration_cases/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ fn test_raw_node_read_index_to_old_leader() {
);
assert_eq!(
nt.peers[&1].msgs[1],
new_message_with_entries(3, 3, MessageType::MsgReadIndex, vec![test_entries.clone()])
new_message_with_entries(3, 3, MessageType::MsgReadIndex, vec![test_entries])
);
}

Expand Down
20 changes: 20 additions & 0 deletions harness/tests/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,26 @@ pub fn new_test_raft_with_config(config: &Config, storage: MemStorage, l: &Logge
Interface::new(Raft::new(config, storage, l).unwrap())
}

pub fn new_test_raft_with_quorum_fn(
id: u64,
peers: Vec<u64>,
election: usize,
heartbeat: usize,
storage: MemStorage,
quorum_fn: fn(usize) -> usize,
l: &Logger,
) -> Interface {
let mut config = new_test_config(id, election, heartbeat);
config.quorum_fn = quorum_fn;
if storage.initial_state().unwrap().initialized() && peers.is_empty() {
panic!("new_test_raft with empty peers on initialized store");
}
if !peers.is_empty() && !storage.initial_state().unwrap().initialized() {
storage.initialize_with_conf_state((peers, vec![]));
}
new_test_raft_with_config(&config, storage, l)
}

pub fn hard_state(t: u64, c: u64, v: u64) -> HardState {
let mut hs = HardState::default();
hs.term = t;
Expand Down
7 changes: 3 additions & 4 deletions proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ build = "build.rs"
[features]
default = ["protobuf-codec"]
protobuf-codec = ["protobuf-build/protobuf-codec"]
prost-codec = ["prost", "prost-derive", "bytes", "lazy_static", "protobuf-build/prost-codec"]
prost-codec = ["protobuf-build/prost-codec", "prost", "bytes", "lazy_static"]

[build-dependencies]
protobuf-build = { version = "0.10", default-features = false }

[dependencies]
bytes = { version = "0.4.11", optional = true }
lazy_static = { version = "1.3.0", optional = true }
lazy_static = { version = "1", optional = true }
bytes = { version = "0.4", optional = true }
prost = { version = "0.5", optional = true }
prost-derive = { version = "0.5", optional = true }
protobuf = "2"
5 changes: 5 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ pub struct Config {
/// Batches every append msg if any append msg already exists
pub batch_append: bool,

/// Function to custom `quorum` for Raft. The return value will be normalized into range
/// [majority, voters_len].
pub quorum_fn: fn(usize) -> usize,

/// The Group ID of this node in the feature Follower Replication.
pub group_id: u64,
}
Expand All @@ -109,6 +113,7 @@ impl Default for Config {
read_only_option: ReadOnlyOption::Safe,
skip_bcast_commit: false,
batch_append: false,
quorum_fn: crate::majority,
group_id: INVALID_ID,
}
}
Expand Down
16 changes: 6 additions & 10 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@
// TODO: std::error::Error::description is deprecated now, resolve it later.
#![allow(deprecated)]

use protobuf::ProtobufError;
use std::error;
use std::{cmp, io, result};

quick_error! {
/// The base error type for raft
#[derive(Debug)]
pub enum Error {
/// An IO error occurred
Io(err: io::Error) {
Io(err: std::io::Error) {
from()
cause(err)
description(err.description())
Expand Down Expand Up @@ -40,7 +36,7 @@ quick_error! {
description(desc)
}
/// A protobuf message codec failed in some manner.
CodecError(err: ProtobufError) {
CodecError(err: protobuf::ProtobufError) {
from()
cause(err)
description(err.description())
Expand All @@ -61,7 +57,7 @@ quick_error! {
}
}

impl cmp::PartialEq for Error {
impl PartialEq for Error {
#[cfg_attr(feature = "cargo-clippy", allow(clippy::match_same_arms))]
fn eq(&self, other: &Error) -> bool {
match (self, other) {
Expand Down Expand Up @@ -98,7 +94,7 @@ quick_error! {
description("snapshot is temporarily unavailable")
}
/// Some other error occurred.
Other(err: Box<dyn error::Error + Sync + Send>) {
Other(err: Box<dyn std::error::Error + Sync + Send>) {
from()
cause(err.as_ref())
description(err.description())
Expand All @@ -107,7 +103,7 @@ quick_error! {
}
}

impl cmp::PartialEq for StorageError {
impl PartialEq for StorageError {
#[cfg_attr(feature = "cargo-clippy", allow(clippy::match_same_arms))]
fn eq(&self, other: &StorageError) -> bool {
match (self, other) {
Expand All @@ -124,7 +120,7 @@ impl cmp::PartialEq for StorageError {
}

/// A result type that wraps up the raft errors.
pub type Result<T> = result::Result<T, Error>;
pub type Result<T> = std::result::Result<T, Error>;

#[cfg(test)]
mod tests {
Expand Down
7 changes: 4 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,11 +449,11 @@ before taking old, removed peers offline.
extern crate fail;

#[macro_use]
extern crate slog;
extern crate getset;
#[macro_use]
extern crate quick_error;
#[macro_use]
extern crate getset;
extern crate slog;

macro_rules! fatal {
($logger:expr, $msg:expr) => {{
Expand Down Expand Up @@ -490,14 +490,15 @@ pub use self::config::Config;
pub use self::errors::{Error, Result, StorageError};
pub use self::log_unstable::Unstable;
pub use self::progress::inflights::Inflights;
pub use self::progress::progress_set::{majority, Configuration, ProgressSet};
pub use self::progress::progress_set::{Configuration, ProgressSet};
pub use self::progress::{Progress, ProgressState};
pub use self::raft::{vote_resp_msg_type, Raft, SoftState, StateRole, INVALID_ID, INVALID_INDEX};
pub use self::raft_log::{RaftLog, NO_LIMIT};
pub use self::raw_node::{is_empty_snap, Peer, RawNode, Ready, SnapshotStatus};
pub use self::read_only::{ReadOnlyOption, ReadState};
pub use self::status::{Status, StatusRef};
pub use self::storage::{RaftState, Storage};
pub use self::util::majority;
pub use raft_proto::eraftpb;

pub mod prelude {
Expand Down
Loading

0 comments on commit f89086c

Please sign in to comment.