diff --git a/plenum/common/ledger_manager.py b/plenum/common/ledger_manager.py index a77a1487c7..eb9e6d54cd 100644 --- a/plenum/common/ledger_manager.py +++ b/plenum/common/ledger_manager.py @@ -14,7 +14,7 @@ from plenum.common.types import LedgerStatus, CatchupRep, \ ConsistencyProof, f, CatchupReq, ConsProofRequest -from plenum.common.constants import POOL_LEDGER_ID, LedgerState +from plenum.common.constants import POOL_LEDGER_ID, LedgerState, DOMAIN_LEDGER_ID from plenum.common.util import getMaxFailures from plenum.common.config_util import getConfig from stp_core.common.log import getlogger @@ -333,6 +333,8 @@ def canProcessConsistencyProof(self, proof: ConsistencyProof) -> bool: self.owner.totalNodes, ledgerInfo.state, LedgerState.not_synced)) self.setLedgerState(ledgerId, LedgerState.not_synced) + if ledgerId == DOMAIN_LEDGER_ID: + ledgerInfo.preCatchupStartClbk() return self.canProcessConsistencyProof(proof) start = getattr(proof, f.SEQ_NO_START.nm) @@ -770,9 +772,9 @@ def catchupCompleted(self, ledgerId: int, lastPpSeqNo: int=-1): ledgerInfo.canSync = False ledgerInfo.state = LedgerState.synced - ledgerInfo.postCatchupCompleteClbk() ledgerInfo.ledgerStatusOk = set() ledgerInfo.recvdConsistencyProofs = {} + ledgerInfo.postCatchupCompleteClbk() if self.postAllLedgersCaughtUp: if all(l.state == LedgerState.synced diff --git a/plenum/server/node.py b/plenum/server/node.py index 3288ac7686..709ec467b2 100644 --- a/plenum/server/node.py +++ b/plenum/server/node.py @@ -149,6 +149,7 @@ def __init__(self, self.ledgerManager.addLedger(DOMAIN_LEDGER_ID, self.domainLedger, + preCatchupStartClbk=self.preDomainLedgerCatchUp, postCatchupCompleteClbk=self.postDomainLedgerCaughtUp, postTxnAddedToLedgerClbk=self.postTxnFromCatchupAddedToLedger) self.states[DOMAIN_LEDGER_ID] = self.loadDomainState() @@ -1386,13 +1387,20 @@ def start_domain_ledger_sync(self): self.sendDomainLedgerStatus(nm) self.ledgerManager.processStashedLedgerStatuses(DOMAIN_LEDGER_ID) + def preDomainLedgerCatchUp(self): + """ + Ledger got out of sync. Setting node's state accordingly + :return: + """ + self.mode = Mode.syncing + def postDomainLedgerCaughtUp(self, **kwargs): """ Process any stashed ordered requests and set the mode to `participating` :return: """ - pass + self.mode = Mode.participating def postTxnFromCatchupAddedToLedger(self, ledgerId: int, txn: Any): self.reqsFromCatchupReplies.add((txn.get(f.IDENTIFIER.nm), diff --git a/plenum/test/node_catchup/test_node_catchup_after_disconnect.py b/plenum/test/node_catchup/test_node_catchup_after_disconnect.py index dacbe8e3a5..b5e37d2d69 100644 --- a/plenum/test/node_catchup/test_node_catchup_after_disconnect.py +++ b/plenum/test/node_catchup/test_node_catchup_after_disconnect.py @@ -14,7 +14,7 @@ def testNodeCatchupAfterDisconnect(newNodeCaughtUp, txnPoolNodeSet, nodeSetWithNodeAddedAfterSomeTxns): """ - A node that restarts after some transactions should eventually get the + A node that disconnects after some transactions should eventually get the transactions which happened while it was disconnected :return: """ diff --git a/plenum/test/node_catchup/test_node_catchup_after_lost_connection.py b/plenum/test/node_catchup/test_node_catchup_after_lost_connection.py new file mode 100644 index 0000000000..f74e733c66 --- /dev/null +++ b/plenum/test/node_catchup/test_node_catchup_after_lost_connection.py @@ -0,0 +1,35 @@ +from stp_core.common.log import getlogger +from plenum.test.test_node import ensure_node_disconnected +from plenum.test.helper import sendReqsToNodesAndVerifySuffReplies +from plenum.test.node_catchup.helper import waitNodeDataEquality, waitNodeDataUnequality + +# Do not remove the next import +from plenum.test.node_catchup.conftest import whitelist + +logger = getlogger() +txnCount = 5 + + +# TODO: Refactor tests to minimize module-scoped fixtures.They make tests depend on each other +def testNodeCatchupAfterLostConnection(newNodeCaughtUp, txnPoolNodeSet, + nodeSetWithNodeAddedAfterSomeTxns): + """ + A node that has poor internet connection and got unsynced after some transactions should eventually get the + transactions which happened while it was not accessible + :return: + """ + looper, newNode, client, wallet, _, _ = nodeSetWithNodeAddedAfterSomeTxns + logger.debug("Stopping node {} with pool ledger size {}". + format(newNode, newNode.poolManager.txnSeqNo)) + looper.removeProdable(newNode) + # TODO: Check if the node has really stopped processing requests? + logger.debug("Sending requests") + sendReqsToNodesAndVerifySuffReplies(looper, wallet, client, 5) + # Make sure new node got out of sync + waitNodeDataUnequality(looper, newNode, *txnPoolNodeSet[:-1]) + logger.debug("Ensure node {} gets disconnected".format(newNode)) + ensure_node_disconnected(looper, newNode, txnPoolNodeSet[:-1]) + logger.debug("Starting the stopped node, {}".format(newNode)) + looper.add(newNode) + logger.debug("Waiting for the node to catch up, {}".format(newNode)) + waitNodeDataEquality(looper, newNode, *txnPoolNodeSet[:-1])