Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix primary selection after primary demotion (INDY-463) #354

Merged
2 changes: 1 addition & 1 deletion plenum/server/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ def schedule_node_status_dump(self):
)

@property
def rank(self) -> int:
def rank(self) -> Optional[int]:
return self.poolManager.rank

def get_name_by_rank(self, rank):
Expand Down
80 changes: 54 additions & 26 deletions plenum/server/pool_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ def _get_rank(needle_id: str, haystack_ids: List[str]):
# Return the rank of the node where rank is defined by the order in
# which node was added to the pool or on the alphabetical order of name
# if using RegistryPoolManager
return haystack_ids.index(needle_id)
try:
return haystack_ids.index(needle_id)
except ValueError:
return None

@property
@abstractmethod
Expand All @@ -63,24 +66,28 @@ def id(self):
"""

@abstractmethod
def get_rank_of(self, node_id) -> int:
"""
def get_rank_of(self, node_id) -> Optional[int]:
"""Return node rank among active pool validators by id

:param node_id: node's id
:return: rank of the node or None if not found
"""

@property
def rank(self) -> Optional[int]:
# Nodes have a total order defined in them, rank is the node's
# position in that order
if self._rank is None:
self._rank = self.get_rank_of(self.id)
return self._rank
return self.get_rank_of(self.id)

@abstractmethod
def get_name_by_rank(self, rank):
def get_name_by_rank(self, rank) -> Optional[str]:
# Needed for communicating primary name to others and also nodeReg
# uses node names (alias) and not ids
# TODO: Should move to using node ids and not node names (alias)
"""
"""Return node name (alias) by rank among active pool validators

:param rank: rank of the node
:return: name of the node or None if not found
"""


Expand All @@ -105,15 +112,17 @@ def __init__(self, node, ha=None, cliname=None, cliha=None):
self.basedirpath = node.basedirpath
self._ledger = None
self._id = None
self._rank = None

TxnStackManager.__init__(
self, self.name, self.basedirpath, isNode=True)
self.state = self.loadState()
self.reqHandler = self.getPoolReqHandler()
self.initPoolState()
self._load_nodes_order_from_ledger()
self.nstack, self.cstack, self.nodeReg, self.cliNodeReg = \
self.getStackParamsAndNodeReg(self.name, self.basedirpath, ha=ha,
cliname=cliname, cliha=cliha)

self._dataFieldsValidators = (
(NODE_IP, self._isIpAddressValid),
(CLIENT_IP, self._isIpAddressValid),
Expand Down Expand Up @@ -207,6 +216,8 @@ def onPoolMembershipChange(self, txn):
nodeName = txn[DATA][ALIAS]
nodeNym = txn[TARGET_NYM]

self._order_node(nodeNym, nodeName)

def _updateNode(txn):
if {NODE_IP, NODE_PORT, CLIENT_IP, CLIENT_PORT}. \
intersection(set(txn[DATA].keys())):
Expand Down Expand Up @@ -369,33 +380,46 @@ def id(self):
self._id = txn[TARGET_NYM]
return self._id

@property
def node_ids_in_ordered_by_rank(self) -> List:
ids = OrderedDict()
def _load_nodes_order_from_ledger(self):
self._ordered_node_ids = OrderedDict()
for _, txn in self.ledger.getAllTxn():
ids[txn[TARGET_NYM]] = True
return list(ids.keys())
if txn[TXN_TYPE] == NODE:
self._order_node(txn[TARGET_NYM], txn[DATA][ALIAS])

def _order_node(self, nodeNym, nodeName):
assert self._ordered_node_ids.get(nodeNym) in (nodeName, None), (
"{} trying to order already ordered node {} ({}) "
"with other alias {}".format(
self.name, self._ordered_node_ids.get(nodeNym), nodeNym))

self._ordered_node_ids[nodeNym] = nodeName

@property
def node_ids_ordered_by_rank(self) -> List:
return [nym for nym, name in self._ordered_node_ids.items()
if name in self.nodeReg]

def get_rank_of(self, node_id) -> Optional[int]:
if self.id is None:
# This can happen if a non-genesis node starts
return None
return self._get_rank(node_id, self.node_ids_in_ordered_by_rank)
return self._get_rank(node_id, self.node_ids_ordered_by_rank)

def get_name_by_rank(self, rank):
# This is expensive but only required while start or view change
id = self.node_ids_in_ordered_by_rank[rank]
# We don't allow changing ALIAS
for _, txn in self.ledger.getAllTxn():
if txn[TARGET_NYM] == id and DATA in txn and ALIAS in txn[DATA]:
return txn[DATA][ALIAS]
def get_name_by_rank(self, rank) -> Optional[str]:
try:
nym = self.node_ids_ordered_by_rank[rank]
except IndexError:
return None
else:
return self._ordered_node_ids[nym]


class RegistryPoolManager(PoolManager):
# This is the old way of managing the pool nodes information and
# should be deprecated.
def __init__(self, name, basedirpath, nodeRegistry, ha, cliname, cliha):
self._rank = None
self._ordered_node_names = None

self.nstack, self.cstack, self.nodeReg, self.cliNodeReg = \
self.getStackParamsAndNodeReg(name=name, basedirpath=basedirpath,
nodeRegistry=nodeRegistry, ha=ha,
Expand Down Expand Up @@ -491,8 +515,12 @@ def id(self):
def node_names_ordered_by_rank(self) -> List:
return sorted(self.nodeReg.keys())

def get_rank_of(self, node_id) -> int:
def get_rank_of(self, node_id) -> Optional[int]:
# TODO node_id here has got another meaning
return self._get_rank(node_id, self.node_names_ordered_by_rank)

def get_name_by_rank(self, rank):
return self.node_names_ordered_by_rank[rank]
def get_name_by_rank(self, rank) -> Optional[str]:
try:
return self.node_names_ordered_by_rank[rank]
except IndexError:
return None
7 changes: 5 additions & 2 deletions plenum/server/primary_decider.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Iterable
from typing import Iterable, Optional
from collections import deque

from plenum.common.constants import VIEW_CHANGE_PREFIX
Expand All @@ -23,7 +23,6 @@ def __init__(self, node):
self.f = node.f
self.replicas = node.replicas
self.viewNo = node.viewNo
self.rank = node.rank
self.nodeNames = node.allNodeNames
self.nodeCount = 0
self.inBox = deque()
Expand All @@ -40,6 +39,10 @@ def __init__(self, node):
def __repr__(self):
return "{}".format(self.name)

@property
def rank(self) -> Optional[int]:
return self.node.rank

@property
def was_master_primary_in_prev_view(self):
return self.previous_master_primary == self.name
Expand Down
15 changes: 11 additions & 4 deletions plenum/server/primary_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,9 @@ def has_view_change_from_primary(self) -> bool:
if next_primary_name not in self._view_change_done:
logger.debug(
"{} has not received ViewChangeDone from the next "
"primary {}". format(
self.name, next_primary_name))
"primary {} (viewNo: {}, totalNodes: {})". format(
self.name, next_primary_name,
self.viewNo, self.node.totalNodes))
return False
else:
self._has_view_change_from_primary = True
Expand Down Expand Up @@ -317,8 +318,14 @@ def _get_primary_id(self, view_no, instance_id):
return (view_no + instance_id) % self.node.totalNodes

def next_primary_node_name(self, instance_id):
return self.node.get_name_by_rank(self._get_primary_id(
self.viewNo, instance_id))
rank = self._get_primary_id(self.viewNo, instance_id)
name = self.node.get_name_by_rank(rank)

assert name, ("{} failed to get node name for rank {}: "
"view_no {}, instance_id {}, totalNodes {}".format(
self, rank, self.viewNo, instance_id,
self.node.totalNodes))
return name

def next_primary_replica_name(self, instance_id):
"""
Expand Down
26 changes: 25 additions & 1 deletion plenum/test/primary_selection/conftest.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import pytest

from plenum.test.node_catchup.helper import waitNodeDataEquality
from plenum.test.primary_selection.helper import check_newly_added_nodes
from plenum.test.primary_selection.helper import check_newly_added_nodes, \
getPrimaryNodesIdxs
from plenum.test.pool_transactions.conftest import clientAndWallet1, \
client1, wallet1, client1Connected, looper, nodeThetaAdded, \
stewardAndWallet1, steward1, stewardWallet
from plenum.test.pool_transactions.helper import buildPoolClientAndWallet


@pytest.fixture(scope="module")
Expand All @@ -15,3 +17,25 @@ def one_node_added(looper, txnPoolNodeSet, nodeThetaAdded):
waitNodeDataEquality(looper, new_node, *txnPoolNodeSet[:-1])
check_newly_added_nodes(looper, txnPoolNodeSet, [new_node])
return new_node


@pytest.fixture(scope="module")
def txnPoolMasterNodes(txnPoolNodeSet):
primariesIdxs = getPrimaryNodesIdxs(txnPoolNodeSet)
return txnPoolNodeSet[primariesIdxs[0]], txnPoolNodeSet[primariesIdxs[1]]


@pytest.fixture(scope="module")
def stewardAndWalletForMasterNode(looper, poolTxnData, poolTxnStewardNames,
tdirWithPoolTxns, txnPoolNodeSet, txnPoolMasterNodes):
primariesIdxs = getPrimaryNodesIdxs(txnPoolNodeSet)
master_node = txnPoolMasterNodes[0]
stewardName = poolTxnStewardNames[primariesIdxs[0]]
stewardsSeed = poolTxnData["seeds"][stewardName].encode()

stewardClient, stewardWallet = buildPoolClientAndWallet(
(stewardName, stewardsSeed), tdirWithPoolTxns)
looper.add(stewardClient)
looper.run(stewardClient.ensureConnectedToNodes())

return stewardClient, stewardWallet
18 changes: 16 additions & 2 deletions plenum/test/primary_selection/helper.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Sequence, List

from plenum.server.pool_manager import RegistryPoolManager, TxnPoolManager
from plenum.test.test_node import checkProtocolInstanceSetup
from plenum.test.test_node import TestNode, checkProtocolInstanceSetup


def check_rank_consistent_across_each_node(nodes):
Expand All @@ -23,7 +25,7 @@ def check_rank_consistent_across_each_node(nodes):
if isinstance(node.poolManager, RegistryPoolManager):
order.append(node.poolManager.node_names_ordered_by_rank)
elif isinstance(node.poolManager, TxnPoolManager):
order.append(node.poolManager.node_ids_in_ordered_by_rank)
order.append(node.poolManager.node_ids_ordered_by_rank)
else:
RuntimeError('Dont know this pool manager {}'.
format(node.poolManager))
Expand All @@ -41,3 +43,15 @@ def check_newly_added_nodes(looper, all_nodes, new_nodes):
assert all(new_node.rank > n.rank for n in old_nodes)
old_nodes.append(new_node)
checkProtocolInstanceSetup(looper, all_nodes, retryWait=1)


def getPrimaryNodesIdxs(nodes: Sequence[TestNode]) -> List[TestNode]:
primariesIdxs = []
for instId in range(len(nodes[0].replicas)):
for idx, node in enumerate(nodes):
if node.replicas[instId].isPrimary:
assert instId == len(primariesIdxs)
primariesIdxs.append(idx)

assert len(set(primariesIdxs)) == len(nodes[0].replicas)
return primariesIdxs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from stp_core.common.log import getlogger

from plenum.common.constants import ALIAS, SERVICES
from plenum.test.pool_transactions.conftest import looper
from plenum.test.pool_transactions.helper import updateNodeData

from plenum.test.test_node import TestNode, checkNodesConnected, \
ensureElectionsDone
from plenum.test.helper import checkViewNoForNodes, \
sendReqsToNodesAndVerifySuffReplies

from plenum.test.primary_selection.helper import getPrimaryNodesIdxs

logger = getlogger()

def test_primary_selection_after_primary_demotion_and_pool_restart(looper,
txnPoolNodeSet, stewardAndWalletForMasterNode, txnPoolMasterNodes,
tconf, tdirWithPoolTxns):
"""
Demote primary and restart the pool.
Pool should select new primary and have viewNo=0 after restart.
"""

logger.info("1. turn off the node which has primary replica for master instanse")
master_node = txnPoolMasterNodes[0]
client, wallet = stewardAndWalletForMasterNode

node_data = {
ALIAS: master_node.name,
SERVICES: []
}
updateNodeData(looper, client, wallet, master_node, node_data)

restNodes = [node for node in txnPoolNodeSet if node.name != master_node.name]
ensureElectionsDone(looper, restNodes)

# ensure pool is working properly
sendReqsToNodesAndVerifySuffReplies(looper, wallet, client, numReqs=3)

logger.info("2. restart pool")
# Stopping existing nodes
for node in txnPoolNodeSet:
node.stop()
looper.removeProdable(node)

# Starting nodes again by creating `Node` objects since that simulates
# what happens when starting the node with script
restartedNodes = []
for node in txnPoolNodeSet:
restartedNode = TestNode(node.name, basedirpath=tdirWithPoolTxns,
config=tconf, ha=node.nodestack.ha,
cliha=node.clientstack.ha)
looper.add(restartedNode)
restartedNodes.append(restartedNode)

restNodes = [node for node in restartedNodes if node.name != master_node.name]

looper.run(checkNodesConnected(restNodes))
ensureElectionsDone(looper, restNodes)
checkViewNoForNodes(restNodes, 0)
sendReqsToNodesAndVerifySuffReplies(looper, wallet, client, numReqs=3)

primariesIdxs = getPrimaryNodesIdxs(restNodes)
assert restNodes[primariesIdxs[0]].name != master_node.name
Loading