diff --git a/plenum/client/client.py b/plenum/client/client.py
index 2a44b489eb..c71f2d8238 100644
--- a/plenum/client/client.py
+++ b/plenum/client/client.py
@@ -12,6 +12,7 @@ from typing import List, Union, Dict, Optional, Tuple, Set, Any, \
     Iterable
 
+from plenum.common.ledger import Ledger
 from plenum.common.stacks import nodeStackClass
 from stp_core.crypto.nacl_wrappers import Signer
 from stp_core.network.auth_mode import AuthMode
@@ -31,10 +32,10 @@ from plenum.common.motor import Motor
 from plenum.common.plugin_helper import loadPlugins
 from plenum.common.request import Request
-from plenum.common.startable import Status, LedgerState, Mode
+from plenum.common.startable import Status, Mode
 from plenum.common.constants import REPLY, POOL_LEDGER_TXNS, \
     LEDGER_STATUS, CONSISTENCY_PROOF, CATCHUP_REP, REQACK, REQNACK, REJECT, OP_FIELD_NAME, \
-    POOL_LEDGER_ID, TXN_TIME
+    POOL_LEDGER_ID, TXN_TIME, LedgerState
 from plenum.common.txn_util import getTxnOrderedFields
 from plenum.common.types import Reply, f, LedgerStatus, TaggedTuples
 from plenum.common.util import getMaxFailures, checkIfMoreThanFSameItems, rawToFriendly
@@ -106,7 +107,7 @@ def __init__(self,
             self.mode = None
             HasPoolManager.__init__(self)
             self.ledgerManager = LedgerManager(self, ownedByNode=False)
-            self.ledgerManager.addLedger(0, self.ledger,
+            self.ledgerManager.addLedger(POOL_LEDGER_ID, self.ledger,
                 postCatchupCompleteClbk=self.postPoolLedgerCaughtUp,
                 postTxnAddedToLedgerClbk=self.postTxnFromCatchupAddedToLedger)
         else:
@@ -624,13 +625,12 @@ def verifyMerkleProof(*replies: Tuple[Reply]) -> bool:
         ignored = {F.auditPath.name, F.seqNo.name, F.rootHash.name, TXN_TIME}
         for r in replies:
             seqNo = r[f.RESULT.nm][F.seqNo.name]
-            rootHash = base64.b64decode(
-                r[f.RESULT.nm][F.rootHash.name].encode())
-            auditPath = [base64.b64decode(
-                a.encode()) for a in r[f.RESULT.nm][F.auditPath.name]]
+            rootHash = Ledger.strToHash(
+                r[f.RESULT.nm][F.rootHash.name])
+            auditPath = [Ledger.strToHash(a) for a in
+                         r[f.RESULT.nm][F.auditPath.name]]
             filtered = dict((k, v) for (k, v) in r[f.RESULT.nm].items()
-                            if k not in ignored
-                            )
+                            if k not in ignored)
             result = serializer.serialize(filtered)
             verifier.verify_leaf_inclusion(result, seqNo - 1,
                                            auditPath,
diff --git a/plenum/common/constants.py b/plenum/common/constants.py
index d59c62cce5..ef0c1337b3 100644
--- a/plenum/common/constants.py
+++ b/plenum/common/constants.py
@@ -1,5 +1,5 @@
 # inter-node communication
-from enum import IntEnum
+from enum import IntEnum, unique
 
 from plenum.common.roles import Roles
 from plenum.common.transactions import PlenumTransactions
@@ -101,10 +101,19 @@ class StorageType(IntEnum):
     File = 1
     Ledger = 2
 
+
 class KeyValueStorageType(IntEnum):
     Leveldb = 1
     Memory = 2
 
+
+@unique
+class LedgerState(IntEnum):
+    not_synced = 1
+    syncing = 2
+    synced = 3
+
+
 OP_FIELD_NAME = "op"
 
 CLIENT_STACK_SUFFIX = "C"
@@ -121,4 +130,4 @@ class KeyValueStorageType(IntEnum):
 PLUGIN_BASE_DIR_PATH = "PluginBaseDirPath"
 
 POOL_LEDGER_ID = 0
-DOMAIN_LEDGER_ID = 1
\ No newline at end of file
+DOMAIN_LEDGER_ID = 1
diff --git a/plenum/common/keygen_utils.py b/plenum/common/keygen_utils.py
index 9059588ab4..be307ffda2 100644
--- a/plenum/common/keygen_utils.py
+++ b/plenum/common/keygen_utils.py
@@ -30,6 +30,7 @@ def initNodeKeysForBothStacks(name, baseDir, sigseed, override=False, config=Non
     return nodeStackClass.initLocalKeys(name, baseDir, sigseed,
                                         override=override)
 
+
 def areKeysSetup(name, baseDir, config=None):
     return nodeStackClass.areKeysSetup(name, baseDir)
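Side note: the relocated `LedgerState` enum is now declared with `@unique`. A minimal standalone sketch (illustrative, not project code) of what `@unique` guards against:

```python
# Without @unique, two enum members with the same value silently alias
# each other; with it, the duplicate raises at class-definition time.
from enum import IntEnum, unique

@unique
class LedgerState(IntEnum):
    not_synced = 1
    syncing = 2
    synced = 3

# A duplicated value would raise ValueError under @unique:
# class Bad(IntEnum):
#     a = 1
#     b = 1   # silently becomes an alias for `a` without @unique
assert LedgerState.syncing is LedgerState(2)
```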
diff --git a/plenum/common/ledger.py b/plenum/common/ledger.py
index f80a546370..2a003a3599 100644
--- a/plenum/common/ledger.py
+++ b/plenum/common/ledger.py
@@ -26,10 +26,6 @@ def __init__(self, *args, **kwargs):
         self.uncommittedRootHash = None
         self.uncommittedTree = None
 
-    @property
-    def uncommittedSize(self):
-        return len(self.uncommittedTxns)
-
     def appendTxns(self, txns: List):
         # These transactions are not yet committed so they do not go to
         # the ledger
@@ -102,3 +98,7 @@ def treeWithAppliedTxns(self, txns: List, currentTree=None):
     @staticmethod
     def hashToStr(h):
         return base58.b58encode(h)
+
+    @staticmethod
+    def strToHash(s):
+        return base58.b58decode(s)
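The new `strToHash` complements `hashToStr`; a small sketch of the base58 round-trip these helpers wrap. This assumes the `base58` package the code imports; depending on the installed version, `b58encode` may return `str` or `bytes`, while `b58decode` always yields the original bytes:

```python
import base58

root_hash = bytes(range(32))             # a 32-byte merkle root
encoded = base58.b58encode(root_hash)    # wire/JSON-friendly form
assert base58.b58decode(encoded) == root_hash
```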
diff --git a/plenum/common/ledger_manager.py b/plenum/common/ledger_manager.py
index c1153090bb..8a3c14a632 100644
--- a/plenum/common/ledger_manager.py
+++ b/plenum/common/ledger_manager.py
@@ -1,6 +1,5 @@
 import heapq
 import operator
-from base64 import b64encode, b64decode
 from collections import Callable
 from functools import partial
 from random import shuffle
@@ -9,14 +8,13 @@
 from typing import Optional
 import time
 
-from ledger.ledger import Ledger
+from plenum.common.ledger import Ledger
 from ledger.merkle_verifier import MerkleVerifier
 from ledger.util import F
-from plenum.common.startable import LedgerState
 from plenum.common.types import LedgerStatus, CatchupRep, \
     ConsistencyProof, f, CatchupReq, ConsProofRequest
-from plenum.common.constants import POOL_LEDGER_ID
+from plenum.common.constants import POOL_LEDGER_ID, LedgerState
 from plenum.common.util import getMaxFailures
 from plenum.common.config_util import getConfig
 from stp_core.common.log import getlogger
@@ -323,6 +321,10 @@ def canProcessConsistencyProof(self, proof: ConsistencyProof) -> bool:
     def processCatchupReq(self, req: CatchupReq, frm: str):
         logger.debug("{} received catchup request: {} from {}".
                      format(self, req, frm))
+        if not self.ownedByNode:
+            self.discard(req, reason="Only node can serve catchup requests",
+                         logMethod=logger.warn)
+            return
 
         start = getattr(req, f.SEQ_NO_START.nm)
         end = getattr(req, f.SEQ_NO_END.nm)
@@ -352,8 +354,11 @@ def processCatchupReq(self, req: CatchupReq, frm: str):
         logger.debug("{} generating consistency proof: {} from {}".
                      format(self, end, req.catchupTill))
-        consProof = [b64encode(p).decode() for p in
+        consProof = [Ledger.hashToStr(p) for p in
                      ledger.tree.consistency_proof(end, req.catchupTill)]
+
+        for seq_no in txns:
+            txns[seq_no] = self.owner.update_txn_with_extra_data(txns[seq_no])
         self.sendTo(msg=CatchupRep(getattr(req, f.LEDGER_ID.nm), txns,
                                    consProof), to=frm)
@@ -422,7 +427,7 @@ def _processCatchupReplies(self, ledgerId, ledger: Ledger,
             if result:
                 ledgerInfo = self.getLedgerInfoByType(ledgerId)
                 for _, txn in catchUpReplies[:toBeProcessed]:
-                    merkleInfo = ledger.add(txn)
+                    merkleInfo = ledger.add(self._transform(txn))
                     txn[F.seqNo.name] = merkleInfo[F.seqNo.name]
                     ledgerInfo.postTxnAddedToLedgerClbk(ledgerId, txn)
                 self._removePrcdCatchupReply(ledgerId, nodeName, seqNo)
@@ -450,6 +455,14 @@ def _removePrcdCatchupReply(self, ledgerId, node, seqNo):
                 break
         ledgerInfo.recvdCatchupRepliesFrm[node].pop(i)
 
+    def _transform(self, txn):
+        # Transactions for ledgers other than the pool ledger might need to
+        # be transformed into a particular format before being applied to
+        # the ledger
+        if not self.ownedByNode:
+            return txn
+        else:
+            return self.owner.transform_txn_for_ledger(txn)
+
     def hasValidCatchupReplies(self, ledgerId, ledger, seqNo, catchUpReplies):
         # Here seqNo has to be the seqNo of first transaction of
         # `catchupReplies`
@@ -460,10 +473,13 @@ def hasValidCatchupReplies(self, ledgerId, ledger, seqNo, catchUpReplies):
                                                  seqNo)
         txns = getattr(catchupReply, f.TXNS.nm)
+
         # Add only those transaction in the temporary tree from the above
         # batch
+        # Transfer of dicts over RAET converts integer keys to strings
-        txns = [txn for s, txn in catchUpReplies[:len(txns)] if str(s) in txns]
+        txns = [self._transform(txn) for s, txn in catchUpReplies[:len(txns)]
+                if str(s) in txns]
 
         # Creating a temporary tree which will be used to verify consistency
         # proof, by inserting transactions. Duplicating a merkle tree is not
@@ -471,8 +487,6 @@ def hasValidCatchupReplies(self, ledgerId, ledger, seqNo, catchUpReplies):
         tempTree = ledger.treeWithAppliedTxns(txns)
 
         proof = getattr(catchupReply, f.CONS_PROOF.nm)
-
-
         ledgerInfo = self.getLedgerInfoByType(ledgerId)
         verifier = ledgerInfo.verifier
         cp = ledgerInfo.catchUpTill
@@ -481,13 +495,13 @@ def hasValidCatchupReplies(self, ledgerId, ledger, seqNo, catchUpReplies):
         try:
             logger.debug("{} verifying proof for {}, {}, {}, {}, {}".
                          format(self, tempTree.tree_size, finalSize,
-                                tempTree.root_hash, b64decode(finalMTH),
-                                [b64decode(p) for p in proof]))
+                                tempTree.root_hash, Ledger.strToHash(finalMTH),
+                                [Ledger.strToHash(p) for p in proof]))
             verified = verifier.verify_tree_consistency(tempTree.tree_size,
                                                         finalSize,
                                                         tempTree.root_hash,
-                                                        b64decode(finalMTH),
-                                                        [b64decode(p) for p in
+                                                        Ledger.strToHash(finalMTH),
+                                                        [Ledger.strToHash(p) for p in
                                                          proof])
         except Exception as ex:
             logger.info("{} could not verify catchup reply {} since {}".
@@ -778,10 +792,9 @@ def _buildConsistencyProof(self, ledgerId, seqNoStart, seqNoEnd):
             seqNoStart,
             seqNoEnd,
             ppSeqNo,
-            b64encode(oldRoot).decode(),
-            b64encode(newRoot).decode(),
-            [b64encode(p).decode() for p in
-             proof]
+            Ledger.hashToStr(oldRoot),
+            Ledger.hashToStr(newRoot),
+            [Ledger.hashToStr(p) for p in proof]
         )
 
     def _compareLedger(self, status: LedgerStatus):
diff --git a/plenum/common/stack_manager.py b/plenum/common/stack_manager.py
index d0099d3679..3d65a35cb6 100644
--- a/plenum/common/stack_manager.py
+++ b/plenum/common/stack_manager.py
@@ -1,6 +1,6 @@
 import os
 import shutil
-from abc import abstractproperty
+from abc import abstractmethod
 from collections import OrderedDict
 
 from plenum.common.keygen_utils import initRemoteKeys
@@ -25,15 +25,18 @@ def __init__(self, name, basedirpath, isNode=True):
         self.isNode = isNode
         self.hashStore = None
 
-    @abstractproperty
+    @property
+    @abstractmethod
     def hasLedger(self) -> bool:
         raise NotImplementedError
 
-    @abstractproperty
+    @property
+    @abstractmethod
     def ledgerLocation(self) -> str:
         raise NotImplementedError
 
-    @abstractproperty
+    @property
+    @abstractmethod
     def ledgerFile(self) -> str:
         raise NotImplementedError
diff --git a/plenum/common/startable.py b/plenum/common/startable.py
index 1d0408e027..912cc7077f 100644
--- a/plenum/common/startable.py
+++ b/plenum/common/startable.py
@@ -53,10 +53,3 @@ class Mode(IntEnum):
     discovered = 3  # caught up with pool txn ledger
     syncing = 4  # catching up on domain txn ledger
     participating = 5  # caught up with domain txn ledger
-
-
-@unique
-class LedgerState(IntEnum):
-    not_synced = 1
-    syncing = 2
-    synced = 3
diff --git a/plenum/config.py b/plenum/config.py
index 56457301fe..cb816e44eb 100644
--- a/plenum/config.py
+++ b/plenum/config.py
@@ -64,7 +64,10 @@
 
 # Monitoring configuration
 PerfCheckFreq = 10
-DELTA = 0.8
+
+# Temporarily reducing DELTA until the calculations for extra work are
+# incorporated
+DELTA = 0.4
 LAMBDA = 60
 OMEGA = 5
 SendMonitorStats = False
@@ -164,7 +167,7 @@
 # Max batch size for 3 phase commit
 Max3PCBatchSize = 100
 # Max time to wait before creating a batch for 3 phase commit
-Max3PCBatchWait = 5
+Max3PCBatchWait = 1
 
 # Maximum lifespan for a batch, this needs to be changed if
 # `Max3PCBatchSize` is changed
@@ -186,4 +189,4 @@
 CLIENT_REQACK_TIMEOUT = 5
 CLIENT_REPLY_TIMEOUT = Max3PCBatchWait + 10
 CLIENT_MAX_RETRY_ACK = 5
-CLIENT_MAX_RETRY_REPLY = 5
\ No newline at end of file
+CLIENT_MAX_RETRY_REPLY = 5
diff --git a/plenum/persistence/req_id_to_txn.py b/plenum/persistence/req_id_to_txn.py
index 3bdf198315..68c06a0931 100644
--- a/plenum/persistence/req_id_to_txn.py
+++ b/plenum/persistence/req_id_to_txn.py
@@ -35,5 +35,9 @@ def get(self, identifier, reqId) -> Optional[int]:
         except (KeyError, ValueError):
             return None
 
+    @property
+    def size(self):
+        return self._keyValueStorage.size
+
     def close(self):
         self._keyValueStorage.close()
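For reference, the `stack_manager.py` hunk above swaps the deprecated `abc.abstractproperty` for the stacked `@property` plus `@abstractmethod` decorators, the idiomatic spelling since Python 3.3. A self-contained sketch of the pattern; it uses `ABC` for enforcement, which the real `TxnStackManager` does not inherit (hence its `raise NotImplementedError` bodies):

```python
from abc import ABC, abstractmethod

class StackManagerSketch(ABC):
    @property
    @abstractmethod
    def hasLedger(self) -> bool:
        raise NotImplementedError

class Concrete(StackManagerSketch):
    @property
    def hasLedger(self) -> bool:
        return True

try:
    StackManagerSketch()      # abstract property blocks instantiation
except TypeError:
    pass
assert Concrete().hasLedger is True
```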
diff --git a/plenum/persistence/util.py b/plenum/persistence/util.py
index bc2c22258a..28ff0b74f4 100644
--- a/plenum/persistence/util.py
+++ b/plenum/persistence/util.py
@@ -6,6 +6,9 @@ def txnsWithSeqNo(seqNoStart, seqNoEnd, txns):
+    """
+    Update each transaction with a sequence number field
+    """
     txns = deepcopy(txns)
     for txn, seqNo in zip(txns, range(seqNoStart, seqNoEnd + 1)):
         txn[F.seqNo.name] = seqNo
@@ -13,6 +16,9 @@ def txnsWithMerkleInfo(ledger, committedTxns):
+    """
+    Update each transaction with the merkle root hash and audit path
+    """
     committedTxns = deepcopy(committedTxns)
     for txn in committedTxns:
         mi = ledger.merkleInfo(txn.get(F.seqNo.name))
diff --git a/plenum/server/domain_req_handler.py b/plenum/server/domain_req_handler.py
index ffd5e3adec..2525f1500a 100644
--- a/plenum/server/domain_req_handler.py
+++ b/plenum/server/domain_req_handler.py
@@ -48,10 +48,19 @@ def _reqToTxn(self, req: Request):
 
     def apply(self, req: Request):
         txn = self._reqToTxn(req)
-        (start, end), _ = self.ledger.appendTxns([txn])
+        (start, end), _ = self.ledger.appendTxns([self.transform_txn_for_ledger(txn)])
         self.updateState(txnsWithSeqNo(start, end, [txn]))
         return txn
 
+    @staticmethod
+    def transform_txn_for_ledger(txn):
+        """
+        Some transactions need to be updated before they can be stored in
+        the ledger, e.g. storing a certain payload in another data store
+        and only its hash in the ledger
+        """
+        return txn
+
     def updateState(self, txns, isCommitted=False):
         for txn in txns:
             self._updateStateWithSingleTxn(txn, isCommitted=isCommitted)
diff --git a/plenum/server/node.py b/plenum/server/node.py
index 288c509fce..e19a373201 100644
--- a/plenum/server/node.py
+++ b/plenum/server/node.py
@@ -23,7 +23,7 @@ TARGET_NYM, ROLE, STEWARD, NYM, VERKEY, OP_FIELD_NAME, CLIENT_STACK_SUFFIX, \
     CLIENT_BLACKLISTER_SUFFIX, NODE_BLACKLISTER_SUFFIX, \
     NODE_PRIMARY_STORAGE_SUFFIX, NODE_HASH_STORE_SUFFIX, HS_FILE, DATA, ALIAS, \
-    NODE_IP, HS_LEVELDB, POOL_LEDGER_ID, DOMAIN_LEDGER_ID
+    NODE_IP, HS_LEVELDB, POOL_LEDGER_ID, DOMAIN_LEDGER_ID, LedgerState
 from plenum.common.exceptions import SuspiciousNode, SuspiciousClient, \
     MissingNodeOp, InvalidNodeOp, InvalidNodeMsg, InvalidClientMsgType, \
     InvalidClientOp, InvalidClientRequest, BaseExc, \
@@ -39,7 +39,7 @@ from plenum.common.roles import Roles
 from plenum.common.signer_simple import SimpleSigner
 from plenum.common.stacks import nodeStackClass, clientStackClass
-from plenum.common.startable import Status, Mode, LedgerState
+from plenum.common.startable import Status, Mode
 from plenum.common.throttler import Throttler
 from plenum.common.txn_util import getTxnOrderedFields
 from plenum.common.types import Propagate, \
@@ -1200,7 +1200,7 @@ def postToNodeInBox(self, msg, frm):
         :param msg: a node message
         :param frm: the name of the node that sent this `msg`
         """
-        logger.debug("{} appending to nodeinxbox {}".format(self, msg))
+        logger.debug("{} appending to nodeInbox {}".format(self, msg))
         self.nodeInBox.append((msg, frm))
 
     async def processNodeInBox(self):
@@ -1351,17 +1351,24 @@ async def processClientInBox(self):
 
     def postPoolLedgerCaughtUp(self, **kwargs):
         self.mode = Mode.discovered
-        self.ledgerManager.setLedgerCanSync(DOMAIN_LEDGER_ID, True)
-        # Node has discovered other nodes now sync up domain ledger
-        for nm in self.nodestack.connecteds:
-            self.sendDomainLedgerStatus(nm)
-        self.ledgerManager.processStashedLedgerStatuses(DOMAIN_LEDGER_ID)
+        # The node might have discovered more nodes, so check whether an
+        # election needs to be scheduled.
         if isinstance(self.poolManager, TxnPoolManager):
             self.checkInstances()
         # Initialising node id in case where node's information was not present
         # in pool ledger at the time of starting, happens when a non-genesis
         # node starts
         self.id
+        self.catchup_next_ledger_after_pool()
+
+    def catchup_next_ledger_after_pool(self):
+        self.start_domain_ledger_sync()
+
+    def start_domain_ledger_sync(self):
+        self.ledgerManager.setLedgerCanSync(DOMAIN_LEDGER_ID, True)
+        for nm in self.nodestack.connecteds:
+            self.sendDomainLedgerStatus(nm)
+        self.ledgerManager.processStashedLedgerStatuses(DOMAIN_LEDGER_ID)
 
     def postDomainLedgerCaughtUp(self, **kwargs):
         """
@@ -1375,13 +1382,14 @@ def postTxnFromCatchupAddedToLedger(self, ledgerId: int, txn: Any):
         self.reqsFromCatchupReplies.add((txn.get(f.IDENTIFIER.nm),
                                          txn.get(f.REQ_ID.nm)))
 
-        rh = self.postTxnFromCatchup(ledgerId, txn)
+        rh = self.postRecvTxnFromCatchup(ledgerId, txn)
         if rh:
             rh.updateState([txn], isCommitted=True)
             state = self.getState(ledgerId)
             state.commit(rootHash=state.headHash)
+        self.updateSeqNoMap([txn])
 
-    def postTxnFromCatchup(self, ledgerId: int, txn: Any):
+    def postRecvTxnFromCatchup(self, ledgerId: int, txn: Any):
         rh = None
         if ledgerId == POOL_LEDGER_ID:
             self.poolManager.onPoolMembershipChange(txn)
@@ -1394,6 +1402,7 @@ def allLedgersCaughtUp(self):
         self.mode = Mode.participating
         self.processStashedOrderedReqs()
+        # TODO: next line not needed
         self.checkInstances()
 
     def getLedger(self, ledgerId):
@@ -1737,11 +1746,16 @@ def quorum(self) -> int:
         """
         return (2 * self.f) + 1
 
-    def primaryFound(self):
+    def primary_found(self):
         # If the node has primary replica of master instance
         self.monitor.hasMasterPrimary = self.primaryReplicaNo == 0
+        self.process_reqs_stashed_for_primary()
 
-    def canViewChange(self, proposedViewNo: int) -> bool:
+    @property
+    def all_instances_have_primary(self):
+        return all(r.primaryName is not None for r in self.replicas)
+
+    def canViewChange(self, proposedViewNo: int) -> (bool, str):
         """
         Return whether there's quorum for view change for the proposed view
         number and its view is less than or equal to the proposed view
@@ -1860,15 +1874,24 @@ def updateSeqNoMap(self, committedTxns):
         self.seqNoDB.addBatch((txn[f.IDENTIFIER.nm], txn[f.REQ_ID.nm],
                                txn[F.seqNo.name]) for txn in committedTxns)
 
+    def commitAndSendReplies(self, reqHandler, ppTime, reqs: List[Request],
+                             stateRoot, txnRoot) -> List:
+        committedTxns = reqHandler.commit(len(reqs), stateRoot, txnRoot)
+        self.updateSeqNoMap(committedTxns)
+        committedTxns = txnsWithMerkleInfo(reqHandler.ledger,
+                                           committedTxns)
+        self.sendRepliesToClients(
+            map(self.update_txn_with_extra_data, committedTxns),
+            ppTime)
+        return committedTxns
+
     def executeDomainTxns(self, ppTime, reqs: List[Request], stateRoot,
                           txnRoot) -> List:
-        committedTxns = self.reqHandler.commit(len(reqs), stateRoot, txnRoot)
-        self.updateSeqNoMap(committedTxns)
+        committedTxns = self.commitAndSendReplies(self.reqHandler, ppTime, reqs,
+                                                  stateRoot, txnRoot)
         for txn in committedTxns:
             if txn[TXN_TYPE] == NYM:
                 self.addNewRole(txn)
-        committedTxns = txnsWithMerkleInfo(self.reqHandler.ledger,
-                                           committedTxns)
-        self.sendRepliesToClients(committedTxns, ppTime)
         return committedTxns
 
     def onBatchCreated(self, ledgerId, stateRoot):
@@ -2143,6 +2166,8 @@ def send(self, msg: Any, *rids: Iterable[int], signer: Signer = None):
         self.nodestack.send(msg, *rids, signer=signer)
 
     def getReplyFromLedger(self, ledger, request):
+        # DoS attack vector: a client requesting an already processed request
+        # id results in iterating over the ledger (or a subset of it)
         seqNo = self.seqNoDB.get(request.identifier, request.reqId)
         if seqNo:
             txn = ledger.getBySeqNo(int(seqNo))
@@ -2150,8 +2175,23 @@ def getReplyFromLedger(self, ledger, request):
             txn = ledger.get(identifier=request.identifier, reqId=request.reqId)
         if txn:
             txn.update(ledger.merkleInfo(txn.get(F.seqNo.name)))
+            txn = self.update_txn_with_extra_data(txn)
             return Reply(txn)
 
+    def update_txn_with_extra_data(self, txn):
+        """
+        All the data of the transaction might not be stored in the ledger,
+        so the extra data that is omitted from the ledger needs to be
+        fetched from the appropriate data store
+        :param txn:
+        :return:
+        """
+        # All the data of any transaction is stored in the ledger
+        return txn
+
+    def transform_txn_for_ledger(self, txn):
+        return self.reqHandler.transform_txn_for_ledger(txn)
+
     def __enter__(self):
         return self
diff --git a/plenum/server/primary_elector.py b/plenum/server/primary_elector.py
index e0a3fa3003..b4538134e3 100644
--- a/plenum/server/primary_elector.py
+++ b/plenum/server/primary_elector.py
@@ -394,7 +394,7 @@ def processPrimary(self, prim: Primary, sender: str) -> None:
             if self.replicaNominatedForItself == instId:
                 self.replicaNominatedForItself = None
 
-            self.node.primaryFound()
+            self.node.primary_found()
 
             self.scheduleElection()
         else:
diff --git a/plenum/server/propagator.py b/plenum/server/propagator.py
index 6612e03b29..cb614c486e 100644
--- a/plenum/server/propagator.py
+++ b/plenum/server/propagator.py
@@ -1,4 +1,5 @@
 from collections import OrderedDict
+from collections import deque
 from typing import Dict, Tuple, Union
 
 import weakref
@@ -84,13 +85,19 @@ def votes(self, req) -> int:
             votes = 0
         return votes
 
-    def canForward(self, req: Request, requiredVotes: int) -> bool:
+    def canForward(self, req: Request, requiredVotes: int) -> (bool, str):
         """
         Check whether the request specified is eligible to be forwarded to
         the protocol instances.
         """
         state = self[req.key]
-        return not state.forwarded and state.isFinalised(requiredVotes)
+        if state.forwarded:
+            msg = 'already forwarded'
+        elif not state.isFinalised(requiredVotes):
+            msg = 'not finalised'
+        else:
+            msg = None
+        return not bool(msg), msg
 
     def hasPropagated(self, req: Request, sender: str) -> bool:
         """
@@ -111,6 +118,11 @@ def digest(self, reqKey: Tuple) -> str:
 class Propagator:
     def __init__(self):
         self.requests = Requests()
+        # If the node does not have any primary and at least one protocol
+        # instance is missing a primary then add the request to
+        # `reqs_stashed_for_primary`. Note that this does not prevent the
+        # request from being processed, as it's marked as finalised
+        self.reqs_stashed_for_primary = deque()
 
     # noinspection PyUnresolvedReferences
     def propagate(self, request: Request, clientName):
@@ -124,8 +136,8 @@ def propagate(self, request: Request, clientName):
         else:
             self.requests.addPropagate(request, self.name)
             # Only propagate if the node is participating in the consensus
-            # process
-            # which happens when the node has completed the catchup process
+            # process which happens when the node has completed the
+            # catchup process. QUESTION: WHY?
             if self.isParticipating:
                 propagate = self.createPropagate(request, clientName)
                 logger.display("{} propagating {} request {} from client {}".
@@ -153,7 +165,7 @@ def createPropagate(request: Union[Request, dict], identifier) -> Propagate:
         return Propagate(request, identifier)
 
     # noinspection PyUnresolvedReferences
-    def canForward(self, request: Request) -> bool:
+    def canForward(self, request: Request) -> (bool, str):
         """
         Determine whether to forward client REQUESTs to replicas, based on
         the following logic:
@@ -179,13 +191,16 @@ def forward(self, request: Request):
         :param request: the REQUEST to propagate
         """
         key = request.key
-        for idx, repQueue in enumerate(self.msgsToReplicas):
-            if self.primaryReplicaNo == idx:
-                # req = weakref.ref(self.requests[key].finalised)
-                req = self.requests[key].finalised
-                repQueue.append(req)
-        logger.debug("{} forwarding client request {} to its replicas".
-                     format(self, key))
+        fin_req = self.requests[key].finalised
+        if self.primaryReplicaNo is not None:
+            self.msgsToReplicas[self.primaryReplicaNo].append(fin_req)
+            logger.debug("{} forwarding client request {} to replica {}".
+                         format(self, key, self.primaryReplicaNo))
+        elif not self.all_instances_have_primary:
+            logger.debug('{} stashing request {} since at least one replica '
+                         'lacks primary'.format(self, key))
+            self.reqs_stashed_for_primary.append(fin_req)
+
         self.monitor.requestUnOrdered(*key)
         self.requests.flagAsForwarded(request, len(self.msgsToReplicas))
@@ -209,11 +224,27 @@ def tryForwarding(self, request: Request):
         See the method `canForward` for the conditions to check before
         forwarding a request.
         """
-        if self.canForward(request):
+        r, msg = self.canForward(request)
+        if r:
             # If haven't got the client request(REQUEST) for the corresponding
             # propagate request(PROPAGATE) but have enough propagate requests
             # to move ahead
             self.forward(request)
         else:
-            logger.trace("{} cannot yet forward request {} to its replicas".
-                         format(self, request))
+            logger.trace("{} not forwarding request {} to its replicas "
+                         "since {}".format(self, request, msg))
+
+    def process_reqs_stashed_for_primary(self):
+        if self.reqs_stashed_for_primary:
+            if self.primaryReplicaNo is not None:
+                self.msgsToReplicas[self.primaryReplicaNo].extend(
+                    self.reqs_stashed_for_primary)
+                logger.debug("{} forwarding stashed {} client requests to "
+                             "replica {}".
+                             format(self, len(self.reqs_stashed_for_primary),
+                                    self.primaryReplicaNo))
+            elif not self.all_instances_have_primary:
+                return
+            # Either the stashed requests have been given to a primary or this
+            # node does not have a primary, so clear the queue
+            self.reqs_stashed_for_primary.clear()
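A minimal standalone model of the stash-then-drain behaviour introduced in the propagator changes above; these are toy classes, not the real `Propagator` API:

```python
from collections import deque

class ForwarderSketch:
    def __init__(self):
        self.primary_queue = None          # set once a primary is elected
        self.reqs_stashed_for_primary = deque()

    def forward(self, req):
        if self.primary_queue is not None:
            self.primary_queue.append(req)
        else:
            self.reqs_stashed_for_primary.append(req)

    def on_primary_found(self, queue):
        self.primary_queue = queue
        self.primary_queue.extend(self.reqs_stashed_for_primary)
        self.reqs_stashed_for_primary.clear()

f = ForwarderSketch()
f.forward('req1')              # no primary yet, so the request is stashed
assert len(f.reqs_stashed_for_primary) == 1
q = deque()
f.on_primary_found(q)          # election done, stash drained to the primary
assert list(q) == ['req1'] and not f.reqs_stashed_for_primary
```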
diff --git a/plenum/test/client/test_client_retry.py b/plenum/test/client/test_client_retry.py
index c9577b950a..b30eed9be6 100644
--- a/plenum/test/client/test_client_retry.py
+++ b/plenum/test/client/test_client_retry.py
@@ -2,11 +2,12 @@
 from functools import partial
 
 import pytest
+import time
 
 from stp_core.loop.eventually import eventually, eventuallyAll
 from plenum.common.request import Request
 from plenum.common.types import Reply, RequestNack
-from plenum.test.helper import sendRandomRequest, checkReqAck, waitReplyCount
+from plenum.test.helper import sendRandomRequest, checkReqAck, wait_for_replies
 from plenum.test import waits
 
 whitelist = ['AlphaC unable to send message', ]
@@ -47,13 +48,13 @@ def chkAcks():
     timeout = waits.expectedReqAckQuorumTime()
     looper.run(eventually(chkAcks, retryWait=1, timeout=timeout))
     idr, reqId = req.key
-    waitReplyCount(looper, client1, idr, reqId, 4)
+    wait_for_replies(looper, client1, idr, reqId, 4)
 
 
 def testClientRetryRequestWhenReplyNotReceived(looper, nodeSet, client1,
                                                wallet1, tconf):
     """
-    A node say Alpha sends ACK but doesn't send REPLY. The connect resends the
+    A node, say Alpha, sends an ACK but doesn't send a REPLY. The client resends the
     request and gets REPLY
     """
@@ -72,10 +73,18 @@ def skipReplyOnce(msg, remoteName):
     req = sendRandomRequest(wallet1, client1)
     coros = [partial(checkReqAck, client1, node, *req.key) for node in nodeSet]
     timeout = waits.expectedReqAckQuorumTime()
+    start = time.perf_counter()
     looper.run(eventuallyAll(*coros, retryWait=.5, totalTimeout=timeout))
     idr, reqId = req.key
-    waitReplyCount(looper, client1, idr, reqId, 3)
-    waitReplyCount(looper, client1, idr, reqId, 4)
+    # The client should get only 3 replies until the retry timeout since
+    # one node is not sending any replies
+    wait_for_replies(looper, client1, idr, reqId, 3,
+                     custom_timeout=tconf.CLIENT_REPLY_TIMEOUT-1)
+    end = time.perf_counter()
+    # The client should wait until the retry timeout but after that should
+    # get the reply from the remaining node
+    looper.runFor(tconf.CLIENT_REPLY_TIMEOUT-(end-start))
+    wait_for_replies(looper, client1, idr, reqId, 4)
 
 
 def testClientNotRetryRequestWhenReqnackReceived(looper, nodeSet, client1,
@@ -117,7 +126,7 @@ def onlyTransNack(msg, remoteName):
     assert client1.spylog.count(client1.resendRequests.__name__) == totalResends
 
     idr, reqId = req.key
-    waitReplyCount(looper, client1, idr, reqId, 3)
+    wait_for_replies(looper, client1, idr, reqId, 3)
 
     alpha.clientMsgRouter.routes[Request] = origProcReq
     alpha.transmitToClient = origTrans
@@ -172,7 +181,7 @@ def dontTransmitReply(msg, remoteName):
     looper.runFor(timeout)
 
     idr, reqId = req.key
-    waitReplyCount(looper, client1, idr, reqId, 3)
+    wait_for_replies(looper, client1, idr, reqId, 3)
     assert client1.spylog.count(client1.resendRequests.__name__) == \
         (totalResends + withFewerRetryReq.CLIENT_MAX_RETRY_REPLY)
diff --git a/plenum/test/conftest.py b/plenum/test/conftest.py
index 02ca911c22..970560f02a 100644
--- a/plenum/test/conftest.py
+++ b/plenum/test/conftest.py
@@ -161,7 +161,8 @@ def getValueFromModule(request, name: str, default: Any = None):
         PLUGIN_TYPE_STATS_CONSUMER: "stats_consumer"
     },
     'EnsureLedgerDurability': False,
-    'Max3PCBatchSize': 1
+    'Max3PCBatchSize': 1,
+    'DELTA': .8
 }
diff --git a/plenum/test/delayers.py b/plenum/test/delayers.py
index 550162be82..5c3e23f4fd 100644
--- a/plenum/test/delayers.py
+++ b/plenum/test/delayers.py
@@ -1,7 +1,8 @@
 from typing import Iterable
 
 from plenum.common.types import f, Propagate, PrePrepare, \
-    Prepare, Commit, InstanceChange, LedgerStatus, ConsistencyProof, CatchupReq
+    Prepare, Commit, InstanceChange, LedgerStatus, ConsistencyProof, CatchupReq, \
+    Nomination
 from plenum.common.constants import OP_FIELD_NAME
 from plenum.common.util import getCallableName
 from plenum.test.test_client import TestClient
@@ -62,6 +63,11 @@ def inner(action_pair):
     return inner
 
 
+def nom_delay(delay: float):
+    # Delayer of NOMINATE requests
+    return delayerMsgTuple(delay, Nomination)
+
+
 def ppgDelay(delay: float):
     # Delayer of PROPAGATE requests
     return delayerMsgTuple(delay, Propagate)
diff --git a/plenum/test/helper.py b/plenum/test/helper.py
index 7174f0f0eb..0cde3fba16 100644
--- a/plenum/test/helper.py
+++ b/plenum/test/helper.py
@@ -4,7 +4,7 @@
 import string
 from _signal import SIGINT
 from functools import partial
-from itertools import permutations
+from itertools import permutations, combinations
 from shutil import copyfile
 from sys import executable
 from time import sleep
@@ -424,9 +424,9 @@ def checkReplyCount(client, idr, reqId, count):
     assertLength(senders, count)
 
 
-def waitReplyCount(looper, client, idr, reqId, count):
-    numOfNodes = len(client.nodeReg)
-    timeout = waits.expectedTransactionExecutionTime(numOfNodes)
+def wait_for_replies(looper, client, idr, reqId, count, custom_timeout=None):
+    timeout = custom_timeout or waits.expectedTransactionExecutionTime(
+        len(client.nodeReg))
     looper.run(eventually(checkReplyCount, client, idr, reqId, count,
                           timeout=timeout))
 
@@ -588,7 +588,7 @@ def checkLedgerEquality(ledger1, ledger2):
 
 
 def checkAllLedgersEqual(*ledgers):
-    for l1, l2 in permutations(ledgers, 2):
+    for l1, l2 in combinations(ledgers, 2):
         checkLedgerEquality(l1, l2)
 
 
@@ -598,9 +598,10 @@ def checkStateEquality(state1, state2):
     assertEquality(state1.committedHead, state2.committedHead)
 
 
-def checkAllStatesEqual(*states):
-    for s1, s2 in permutations(states, 2):
-        checkStateEquality(s1, s2)
+def check_seqno_db_equality(db1, db2):
+    assert db1.size == db2.size
+    assert {bytes(k): bytes(v) for k, v in db1._keyValueStorage.iter()} == \
+           {bytes(k): bytes(v) for k, v in db2._keyValueStorage.iter()}
 
 
 def randomText(size):
diff --git a/plenum/test/node_catchup/helper.py b/plenum/test/node_catchup/helper.py
index 12c3754097..a106e6c7fd 100644
--- a/plenum/test/node_catchup/helper.py
+++ b/plenum/test/node_catchup/helper.py
@@ -4,7 +4,8 @@
 from plenum.common.constants import POOL_LEDGER_ID, DOMAIN_LEDGER_ID
 from stp_core.loop.eventually import eventually
 from stp_core.types import HA
-from plenum.test.helper import checkLedgerEquality, checkStateEquality
+from plenum.test.helper import checkLedgerEquality, checkStateEquality, \
+    check_seqno_db_equality
 from plenum.test.test_client import TestClient
 from plenum.test.test_node import TestNode
 from plenum.test import waits
@@ -19,6 +20,7 @@ def checkNodeDataForEquality(node: TestNode, *otherNodes: Iterable[TestNode]):
     # Checks for node's ledgers and state's to be equal
     for n in otherNodes:
+        check_seqno_db_equality(node.seqNoDB, n.seqNoDB)
         checkLedgerEquality(node.domainLedger, n.domainLedger)
         checkStateEquality(node.getState(DOMAIN_LEDGER_ID),
                            n.getState(DOMAIN_LEDGER_ID))
         if n.poolLedger:
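Why `checkAllLedgersEqual` switched from `permutations` to `combinations`: equality is symmetric, so each unordered pair needs checking only once, halving the comparisons:

```python
from itertools import combinations, permutations

items = ['A', 'B', 'C']
assert len(list(permutations(items, 2))) == 6   # (A,B) and (B,A) both checked
assert len(list(combinations(items, 2))) == 3   # each pair checked once
```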
diff --git a/plenum/test/node_catchup/test_node_reject_invalid_txn_during_catchup.py b/plenum/test/node_catchup/test_node_reject_invalid_txn_during_catchup.py
index 4f89b19691..1a007980c5 100644
--- a/plenum/test/node_catchup/test_node_reject_invalid_txn_during_catchup.py
+++ b/plenum/test/node_catchup/test_node_reject_invalid_txn_during_catchup.py
@@ -1,8 +1,6 @@
 import types
 
-from base64 import b64encode
-
-import pytest
+from plenum.common.ledger import Ledger
 from stp_core.common.log import getlogger
 from plenum.common.constants import TXN_TYPE, DOMAIN_LEDGER_ID
 from plenum.common.types import CatchupReq, f, CatchupRep
@@ -49,7 +47,7 @@ def sendIncorrectTxns(self, req, frm):
             # Since the type of random request is `buy`
             if txns[seqNo].get(TXN_TYPE) == "buy":
                 txns[seqNo][TXN_TYPE] = "randomtype"
-        consProof = [b64encode(p).decode() for p in
+        consProof = [Ledger.hashToStr(p) for p in
                      ledger.tree.consistency_proof(end, ledger.size)]
         self.sendTo(msg=CatchupRep(getattr(req, f.LEDGER_ID.nm), txns,
                                    consProof), to=frm)
diff --git a/plenum/test/node_catchup/test_node_request_consistency_proof.py b/plenum/test/node_catchup/test_node_request_consistency_proof.py
index 1d38d62def..746f4cc56c 100644
--- a/plenum/test/node_catchup/test_node_request_consistency_proof.py
+++ b/plenum/test/node_catchup/test_node_request_consistency_proof.py
@@ -1,9 +1,9 @@
-import base64
 import types
 from random import randint
 
 import pytest
 
+from plenum.common.ledger import Ledger
 from stp_core.loop.eventually import eventually
 from stp_core.common.log import getlogger
 from plenum.common.types import LedgerStatus
@@ -46,8 +46,8 @@ def sendDLStatus(self, name):
         while newSize in sentSizes:
             newSize = randint(1, size)
         print("new size {}".format(newSize))
-        newRootHash = base64.b64encode(
-            self.domainLedger.tree.merkle_tree_hash(0, newSize)).decode()
+        newRootHash = Ledger.hashToStr(
+            self.domainLedger.tree.merkle_tree_hash(0, newSize))
         ledgerStatus = LedgerStatus(1, newSize,
                                     newRootHash)
diff --git a/plenum/test/node_catchup/test_node_request_missing_transactions.py b/plenum/test/node_catchup/test_node_request_missing_transactions.py
index 1d853a19ab..11cc90f440 100644
--- a/plenum/test/node_catchup/test_node_request_missing_transactions.py
+++ b/plenum/test/node_catchup/test_node_request_missing_transactions.py
@@ -30,7 +30,7 @@ def reset():
     request.addfinalizer(reset)
     return conf
 
-@pytest.mark.skip(reason='SOV-1020')
+
 def testNodeRequestingTxns(txnPoolNodeSet, nodeCreatedAfterSomeTxns):
     """
     A newly joined node is catching up and sends catchup requests to other
diff --git a/plenum/test/node_request/test_no_forwarding_without_election.py b/plenum/test/node_request/test_no_forwarding_without_election.py
new file mode 100644
index 0000000000..9ebd603dea
--- /dev/null
+++ b/plenum/test/node_request/test_no_forwarding_without_election.py
@@ -0,0 +1,50 @@
+from plenum.test import waits
+from plenum.test.delayers import nom_delay
+from plenum.test.helper import sendRandomRequests, \
+    waitForSufficientRepliesForRequests, sendReqsToNodesAndVerifySuffReplies
+from plenum.test.pool_transactions.conftest import looper, clientAndWallet1, \
+    client1, wallet1, client1Connected
+from plenum.test.test_node import ensureElectionsDone
+from plenum.test.view_change.helper import ensure_view_change
+from stp_core.loop.eventually import eventually
+
+
+def test_node_stashes_requests_if_no_primary(looper, txnPoolNodeSet, client1,
+                                             wallet1, client1Connected):
+    """
+    Node stashes requests while no primary is present, but once primary is
+    determined, the stashed requests are processed
+    """
+    def chk_stashed(stashed):
+        for node in txnPoolNodeSet:
+            assert (len(node.reqs_stashed_for_primary) == 0) != stashed
+
+    # No requests are stashed before and after sending any requests
+    chk_stashed(False)
+    sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, 2)
+    chk_stashed(False)
+
+    delay = 3
+    for node in txnPoolNodeSet:
+        node.nodeIbStasher.delay(nom_delay(delay))
+
+    # Ensure a view change and, as soon as the new view starts, send requests
+    ensure_view_change(looper, txnPoolNodeSet, client1, wallet1)
+
+    reqs = sendRandomRequests(wallet1, client1, 2)
+
+    # The above requests must be stashed
+    looper.run(eventually(chk_stashed, True, retryWait=.1,
+                          timeout=3))
+
+    # The elections must complete for the new view, though the election would
+    # take longer since nominates are delayed. The calculation below is approx.
+    timeout = waits.expectedPoolElectionTimeout(len(txnPoolNodeSet)) + \
+              delay*(len(txnPoolNodeSet))
+    ensureElectionsDone(looper, txnPoolNodeSet, customTimeout=timeout)
+
+    # The requests should be successful
+    waitForSufficientRepliesForRequests(looper, client1, requests=reqs)
+
+    # No requests should be stashed in the propagator
+    chk_stashed(False)
diff --git a/plenum/test/script/test_change_non_primary_node_ha.py b/plenum/test/script/test_change_non_primary_node_ha.py
index d0e6933fad..98c95b54e1 100644
--- a/plenum/test/script/test_change_non_primary_node_ha.py
+++ b/plenum/test/script/test_change_non_primary_node_ha.py
@@ -13,7 +13,7 @@
              'got error while verifying message']
 
 
-@pytest.mark.skipif('sys.platform == "win32"', reason='SOV-330')
+@pytest.mark.skip(reason='SOV-330')
 def testChangeNodeHaForNonPrimary(looper, txnPoolNodeSet, tdirWithPoolTxns,
                                   poolTxnData, poolTxnStewardNames, tconf):
diff --git a/plenum/test/script/test_change_primary_node_ha.py b/plenum/test/script/test_change_primary_node_ha.py
index a2bcea0df1..bb9407f847 100644
--- a/plenum/test/script/test_change_primary_node_ha.py
+++ b/plenum/test/script/test_change_primary_node_ha.py
@@ -12,7 +12,8 @@
              'conflicting address', 'unable to send message',
              'got error while verifying message']
 
-@pytest.mark.skipif('sys.platform == "win32"', reason='SOV-330')
+
+@pytest.mark.skip(reason='SOV-330')
 def testChangeNodeHaForPrimary(looper, txnPoolNodeSet, tdirWithPoolTxns,
                                poolTxnData, poolTxnStewardNames, tconf):
     changeNodeHa(looper,
diff --git a/plenum/test/test_verif_merkle_proof.py b/plenum/test/test_verif_merkle_proof.py
index 69ef0f84c4..cd200b1d83 100644
--- a/plenum/test/test_verif_merkle_proof.py
+++ b/plenum/test/test_verif_merkle_proof.py
@@ -6,13 +6,11 @@
 from plenum.test.test_client import TestClient
 
 
-@pytest.mark.skip("merkle proofs not being sent")
 def testMerkleProofForFirstLeaf(client1: TestClient, replied1):
     replies = client1.getRepliesFromAllNodes(*replied1.key).values()
     assert Client.verifyMerkleProof(*replies)
 
 
-@pytest.mark.skip("merkle proofs not being sent")
 def testMerkleProofForNonFirstLeaf(looper, nodeSet, wallet1, client1, replied1):
     req2 = sendRandomRequest(wallet1, client1)
     waitForSufficientRepliesForRequests(looper, client1, requests=[req2])
diff --git a/plenum/test/view_change/helper.py b/plenum/test/view_change/helper.py
index be1438d730..d62a9a5839 100644
--- a/plenum/test/view_change/helper.py
+++ b/plenum/test/view_change/helper.py
@@ -1,4 +1,7 @@
-from plenum.test.helper import checkViewNoForNodes, sendRandomRequests
+import types
+
+from plenum.test.helper import checkViewNoForNodes, sendRandomRequests, \
+    sendReqsToNodesAndVerifySuffReplies
 from stp_core.common.log import getlogger
 from stp_core.loop.eventually import eventually
 from plenum.test import waits
@@ -39,3 +42,27 @@ def provoke_and_wait_for_view_change(looper,
                                      client,
                                      timeout=timeout))
+
+
+def ensure_view_change(looper, nodes, client, wallet):
+    sendReqsToNodesAndVerifySuffReplies(looper, wallet, client, 2)
+    old_view_no = checkViewNoForNodes(nodes)
+
+    old_meths = {}
+    view_changes = {}
+    for node in nodes:
+        old_meths[node.name] = node.monitor.isMasterDegraded
+        view_changes[node.name] = node.monitor.totalViewChanges
+
+        def slow_master(self):
+            # Only allow one view change
+            return self.totalViewChanges == view_changes[self.name]
+
+        node.monitor.isMasterDegraded = types.MethodType(slow_master, node.monitor)
+
+    timeout = waits.expectedPoolViewChangeStartedTimeout(len(nodes)) + \
+              client.config.PerfCheckFreq
+    looper.run(eventually(checkViewNoForNodes, nodes, old_view_no+1,
+                          retryWait=1, timeout=timeout))
+    for node in nodes:
+        node.monitor.isMasterDegraded = old_meths[node.name]
+    return old_view_no + 1
diff --git a/setup.py b/setup.py
index 632a5bcf5c..6bf81e1c9f 100644
--- a/setup.py
+++ b/setup.py
@@ -58,7 +58,7 @@
     data_files=[(
         (BASE_DIR, ['data/pool_transactions_sandbox', ])
    )],
-    install_requires=['ledger-dev==0.2.12', 'stp-dev==0.1.48',
+    install_requires=['ledger-dev==0.2.14', 'stp-dev==0.1.48',
                       'state-trie==0.1.14', 'jsonpickle',
                       'prompt_toolkit==0.57', 'pygments',
                       'ioflo==1.5.4', 'semver', 'base58', 'orderedset',
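The `ensure_view_change` helper added above swaps `monitor.isMasterDegraded` at runtime to force exactly one view change. A standalone sketch of that `types.MethodType` monkey-patching pattern, using a toy monitor class invented for illustration:

```python
import types

class Monitor:
    def __init__(self):
        self.totalViewChanges = 0

    def isMasterDegraded(self):
        return False

monitor = Monitor()
baseline = monitor.totalViewChanges

def slow_master(self):
    # Report degradation only until one view change has happened
    return self.totalViewChanges == baseline

old = monitor.isMasterDegraded
monitor.isMasterDegraded = types.MethodType(slow_master, monitor)
assert monitor.isMasterDegraded() is True
monitor.totalViewChanges += 1
assert monitor.isMasterDegraded() is False
monitor.isMasterDegraded = old   # restore, as the helper does
```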