diff --git a/docs/catchup.md b/docs/catchup.md
new file mode 100644
index 0000000000..bdac470944
--- /dev/null
+++ b/docs/catchup.md
@@ -0,0 +1,37 @@
+# Ledger catchup
+
+A node uses a process called `catchup` to sync its ledgers with other nodes. It performs this process:
+- after starting up: a node communicates the state of its ledgers to any other node it connects to and learns whether it is ahead of, behind, or at the same state as the others.
+- after a view change: once a view change starts, nodes again communicate the state of their ledgers to other nodes.
+- when it realises that it has missed some transactions: nodes periodically send checkpoints indicating how many transactions they have processed
+recently; if a node finds out that it has missed some txns, it performs catchup.
+
+Catchup is managed by an object called `LedgerManager`. The `LedgerManager` maintains a `LedgerInfo` object for each ledger and references each ledger by its unique id called `ledger_id`.
+When a `ledger` is initialised, the `addLedger` method of `LedgerManager` is called; this method registers the ledger with the `LedgerManager`.
+`addLedger` also accepts callbacks which are called on the occurrence of different events, such as before/after starting catchup for a ledger,
+before/after marking catchup complete for a ledger, and after adding any transaction received during catchup to the ledger.
+The `LedgerManager` performs catchup of each ledger sequentially, meaning catchup for one ledger must complete before catchup for another can start.
+This is done mostly for simplicity and can be optimised, but the pool ledger has to be caught up first since the pool ledger defines which nodes are
+in the network. Catchup for any ledger involves these steps:
+- Learn the correct state (how many txns, merkle roots) of the ledger by using `LedgerStatus` messages from other nodes.
+- Once sufficient (`Quorums.ledger_status`) and consistent `LedgerStatus` messages are received so that the node knows the latest state of the ledger, if it finds its ledger to be the
+latest, it marks the ledger catchup complete; otherwise it waits for `ConsistencyProof` messages from other nodes until a timeout.
+- When a node receives a `LedgerStatus` and finds the sending node's ledger behind, it sends a `ConsistencyProof` message. This message is a
+merkle subset proof between the sending node's ledger and the receiving node's ledger, e.g. if the sending node A's ledger has size 20 and merkle root `x` and
+the receiving node B's size is 30 with merkle root `y`, B sends a proof to A that A's ledger with size 20 and root `x` is a subset of B's ledger with size
+30 and root `y`.
+- After receiving a `ConsistencyProof`, the node verifies it.
+- Once the node that is catching up has sufficient (`Quorums.consistency_proof`) and consistent `ConsistencyProof` messages, it knows how many transactions it needs to catch up and
+then requests those transactions from other nodes, distributing the load equally: e.g. if a node has to catch up 1000 txns and there are 5 other nodes in the
+network, the node will request 200 txns from each. The node catching up sends a `CatchupReq` message to other nodes and expects a corresponding `CatchupRep` (see the sketch below).
+
+Apart from this, if the node does not receive sufficient or consistent `ConsistencyProof`s within a timeout, it requests them using `request_CPs_if_needed`.
+Similarly, if the node does not receive sufficient or consistent `CatchupRep`s within a timeout, it requests them using `request_txns_if_needed`.
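+
+To make the load-distribution step above concrete, here is a small illustrative sketch. It is not the actual `LedgerManager` code; the helper name `split_catchup_requests` and its return format are made up for this example. It only shows how a catching-up node might split the missing sequence numbers evenly across the other nodes before sending `CatchupReq`s:
+
+```python
+# Hypothetical helper, for illustration only; the real logic lives in
+# plenum/common/ledger_manager.py where CatchupReq messages are built.
+def split_catchup_requests(own_size, target_size, node_names):
+    """Return {node_name: (frm, to)} covering txns own_size+1 .. target_size."""
+    total = target_size - own_size
+    per_node, extra = divmod(total, len(node_names))
+    requests, frm = {}, own_size + 1
+    for i, node in enumerate(node_names):
+        count = per_node + (1 if i < extra else 0)
+        if count == 0:
+            break
+        requests[node] = (frm, frm + count - 1)  # one CatchupReq range per node
+        frm += count
+    return requests
+
+# A node with 0 txns catching up 1000 txns from 5 other nodes:
+# split_catchup_requests(0, 1000, ['A', 'B', 'C', 'D', 'E'])
+# -> {'A': (1, 200), 'B': (201, 400), 'C': (401, 600), 'D': (601, 800), 'E': (801, 1000)}
+```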
+The `ConsistencyProof` and `CatchupRep` timeouts above can be configured using `ConsistencyProofsTimeout` and `CatchupTransactionsTimeout` respectively in the config file.
+
+
+Relevant code:
+- LedgerManager: `plenum/common/ledger_manager.py`
+- LedgerInfo: `plenum/common/ledger_info.py`
+- Catchup message types: `plenum/common/messages/node_messages.py`
+- Catchup quorum values: `plenum/server/quorums.py`
diff --git a/docs/main.md b/docs/main.md
new file mode 100644
index 0000000000..dc80b78511
--- /dev/null
+++ b/docs/main.md
@@ -0,0 +1,2 @@
+Outline of the system
+Node
\ No newline at end of file
diff --git a/docs/request_handling.md b/docs/request_handling.md
new file mode 100644
index 0000000000..657449b582
--- /dev/null
+++ b/docs/request_handling.md
@@ -0,0 +1,57 @@
+# Request handling
+
+To handle requests sent by clients, the nodes require a ledger and/or a state as a data store. Request handling logic is written in `RequestHandler`.
+`RequestHandler` is extended by new classes to support new requests. A node provides the methods `register_new_ledger` to register a new `Ledger` and `register_req_handler` to register new `RequestHandler`s.
+A callback to execute after a write request has been successfully executed can be registered using `register_executer`.
+
+
+There are 2 types of requests a client can send:
+- Query:
+  Here the client requests transactions or some state variables from the node(s). The client can either send a `GET_TXN` to get any transaction by its sequence number from any ledger,
+  or it can send specific `GET_*` transactions for queries.
+- Write:
+  Here the client asks the nodes to write a new transaction to the ledger and change some state variable. This requires the nodes to run a consensus protocol (currently RBFT).
+  If the protocol run is successful, then the client's proposed changes are written.
+
+
+#### Request handling
+Below is a description of how a request is processed, starting when a node receives a client request in `validateClientMsg`:
+- The node performs static validation checks (validation that does not require any state, e.g. that mandatory fields are present); it uses `ClientMessageValidator` and the
+  `RequestHandler`'s `doStaticValidation` for this.
+- If the static validation passes, it checks whether a signature check is required (not required for queries) and, if needed, does it in `verifySignature`. More on this later.
+- It checks if the request is a generic transaction query (`GET_TXN`). If it is, the node queries the ledger for that particular sequence number and returns the result. A `REQNACK` might be sent if the query is not correctly constructed.
+- It checks if the request is a specific query (one needing a specific `RequestHandler`). If it is, the node uses `process_query`, which uses the specific `RequestHandler` to respond to the query. A `REQNACK` might be sent if the query is not correctly constructed.
+- If it is a write, then the node checks whether it has already processed the request before by checking the uniqueness of the `identifier` and `reqId` fields of the Request.
+  - If it has already processed the request, it sends the corresponding `Reply` to the client.
+  - If the `Request` is already in process, it sends an acknowledgement to the client as a `REQACK`.
+  - If the node has not seen the `Request` before, it broadcasts the `Request` to all nodes in a `PROPAGATE`.
+  - Once a node receives sufficient (`Quorums.propagate`) `PROPAGATE`s for a request, it forwards the request to its replicas.
+  - A primary replica, on receiving a forwarded request, performs dynamic validation (validation that requires state, e.g. whether the request violates a unique constraint or attempts an unauthorised action)
+    on the request using `doDynamicValidation`, which in turn calls the corresponding `RequestHandler`'s `validate` method. If the validation succeeds, the
+    `apply` method of the `RequestHandler` is called, which optimistically applies the changes to the ledger and state.
+    If the validation fails, the primary sends a `REJECT` to the client. The primary then sends a `PRE-PREPARE` to the other nodes, containing the merkle roots and some other information for all such requests.
+  - The non-primary replicas, on receiving the `PRE-PREPARE`, perform the same dynamic validation that the primary performed on each request. They also check whether the merkle roots match.
+    If all checks pass, the replicas send a `PREPARE`; otherwise they reject the `PRE-PREPARE`.
+  - Once the consensus protocol is successfully executed on the request, each replica sends an `ORDERED` message to its node and the node updates the monitor.
+  - If the `ORDERED` message above was sent by the master replica, then the node executes the request, meaning it commits any changes made to the ledger and state by that
+    request by calling the corresponding `RequestHandler`'s `commit` method, and sends a `Reply` back to the client.
+  - The node also tracks the request's `identifier` and `reqId` in a key-value database in `updateSeqNoMap`.
+
+
+#### Signature verification
+Each node has a `ReqAuthenticator` object which allows registering `ClientAuthNr` objects using the `register_authenticator` method. During signature verification,
+a node runs each registered authenticator over the request, and if any authenticator raises an exception then signature verification is considered to have failed.
+A node has at least one authenticator, called `CoreAuthNr`, whose `authenticate` method is called over the serialised request data to verify the signature.
+
+
+Relevant code:
+- Node: `plenum/server/node.py`
+- Replica: `plenum/server/replica.py`
+- Propagator: `plenum/server/propagator.py`
+- Request: `plenum/common/request.py`
+- Request structure validation: `plenum/common/messages/client_request.py`
+- RequestHandler: `plenum/server/req_handler.py`
+- Request Authenticator: `plenum/server/req_authenticator.py`
+- Core Authenticator: `plenum/server/client_authn.py`
+- Quorums: `plenum/server/quorums.py`
diff --git a/docs/storage.md b/docs/storage.md
index 51b02c2f23..ce00222deb 100644
--- a/docs/storage.md
+++ b/docs/storage.md
@@ -17,8 +17,7 @@ Each new transaction is added to the ledger (log) and is also hashed (sha256) an
 results in a new merkle root. Thus for each transaction a merkle proof of presence, called `inclusion proof` or `audit_path` can be created by using the root hash
 a few (`O(lgn)`, `n` being the total number of leaves/transactions in the tree/ledger) intermediate hashes.
-Hashes of all the
-leaves and intermediate nodes of the tree are stored in a `HashStore`, enabling the creating of inclusion proofs and subset proofs. A subset proof
+Hashes of all the leaves and intermediate nodes of the tree are stored in a `HashStore`, enabling the creation of inclusion proofs and subset proofs. A subset proof
 proves that a particular merkle tree is a subset of another (usually larger) merkle tree; more on this in the Catchup doc.
 The `HashStore` has 2 separate storages for storing leaves and intermediate nodes, each leaf or node of the tree is 32 bytes.
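+
+As an illustration of how such an `audit_path` is consumed (a hedged sketch only: the function names are hypothetical, and the exact hash prefixes and path encoding used by the ledger's merkle tree implementation may differ), verifying an inclusion proof just rehashes from the leaf up to the expected root using the `O(lg n)` sibling hashes:
+
+```python
+import hashlib
+
+def sha256(data: bytes) -> bytes:
+    return hashlib.sha256(data).digest()
+
+def verify_inclusion(raw_txn: bytes, audit_path, expected_root: bytes) -> bool:
+    """audit_path is a list of (sibling_hash, sibling_is_left) pairs from the
+    leaf up to the root; roughly log2(n) entries for a ledger of n txns."""
+    node = sha256(b'\x00' + raw_txn)                  # leaf hash
+    for sibling, sibling_is_left in audit_path:
+        if sibling_is_left:
+            node = sha256(b'\x01' + sibling + node)   # interior node hash
+        else:
+            node = sha256(b'\x01' + node + sibling)
+    return node == expected_root
+```
+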
Each of these storages can be a binary file or a key value store. The leaf or node hashes are queried by their number. When a client write request completes or it requests a transaction with a particular sequence number from a ledger, diff --git a/plenum/common/ledger_info.py b/plenum/common/ledger_info.py index 6a8536c4b0..f38f3c1cb0 100644 --- a/plenum/common/ledger_info.py +++ b/plenum/common/ledger_info.py @@ -45,6 +45,10 @@ def set_defaults(self): # process as completed self.ledgerStatusOk = set() + # Key of the 3PC-batch ordered by the master instance that contained + # the last transaction of this node's ledger + self.last_txn_3PC_key = None + # Dictionary of consistency proofs received for the ledger # in process of catching up # Key is the node name and value is a consistency proof @@ -79,6 +83,7 @@ def done_syncing(self): self.canSync = False self.state = LedgerState.synced self.ledgerStatusOk = set() + self.last_txn_3PC_key = None self.recvdConsistencyProofs = {} self.receivedCatchUpReplies = [] self.recvdCatchupRepliesFrm = {} diff --git a/plenum/common/ledger_manager.py b/plenum/common/ledger_manager.py index 5bc08677e0..0575a4bd5f 100644 --- a/plenum/common/ledger_manager.py +++ b/plenum/common/ledger_manager.py @@ -305,6 +305,13 @@ def processLedgerStatus(self, status: LedgerStatus, frm: str): # post sending this ledger status ledgerInfo.recvdConsistencyProofs[frm] = None ledgerInfo.ledgerStatusOk.add(frm) + + if self.isLedgerSame(ledgerStatus) \ + and ledgerStatus.viewNo is not None \ + and ledgerStatus.ppSeqNo is not None: + ledgerInfo.last_txn_3PC_key = \ + (ledgerStatus.viewNo, ledgerStatus.ppSeqNo) + if self.has_ledger_status_quorum( len(ledgerInfo.ledgerStatusOk), self.owner.totalNodes): logger.debug("{} found out from {} that its " @@ -316,14 +323,10 @@ def processLedgerStatus(self, status: LedgerStatus, frm: str): # If this node's ledger is same as the ledger status (which is # also the majority of the pool), then set the last ordered # 3PC key - key = (ledgerStatus.viewNo, ledgerStatus.ppSeqNo) self.do_pre_catchup(ledgerId) - if self.isLedgerSame(ledgerStatus) and key != (None, None): - # Any state cleaup that is part of pre-catchup should be - # done - self.catchupCompleted(ledgerId, key) - else: - self.catchupCompleted(ledgerId) + # Any state cleanup that is part of pre-catchup should be + # done + self.catchupCompleted(ledgerId, ledgerInfo.last_txn_3PC_key) else: # Ledger was already synced self.mark_ledger_synced(ledgerId) @@ -844,10 +847,9 @@ def startCatchUpProcess(self, ledgerId: int, proof: ConsistencyProof): .format(CATCH_UP_PREFIX, self, ledgerId)) def _getCatchupTimeout(self, numRequest, batchSize): - return numRequest * (self.config.CatchupTransactionsTimeout + - 0.1 * batchSize) + return numRequest * self.config.CatchupTransactionsTimeout - def catchupCompleted(self, ledgerId: int, last_3PC: Tuple=(0, 0)): + def catchupCompleted(self, ledgerId: int, last_3PC: Optional[Tuple]=None): if ledgerId not in self.ledgerRegistry: logger.error("{}{} called catchup completed for ledger {}". format(CATCH_UP_PREFIX, self, ledgerId)) @@ -856,7 +858,8 @@ def catchupCompleted(self, ledgerId: int, last_3PC: Tuple=(0, 0)): # Since multiple ledger will be caught up and catchups might happen # multiple times for a single ledger, the largest seen # ppSeqNo needs to be known. 
- if compare_3PC_keys(self.last_caught_up_3PC, last_3PC) > 0: + if last_3PC is not None \ + and compare_3PC_keys(self.last_caught_up_3PC, last_3PC) > 0: self.last_caught_up_3PC = last_3PC self.mark_ledger_synced(ledgerId) diff --git a/plenum/common/test_network_setup.py b/plenum/common/test_network_setup.py index 414e3dc8d2..df84355839 100644 --- a/plenum/common/test_network_setup.py +++ b/plenum/common/test_network_setup.py @@ -20,6 +20,9 @@ from stp_core.common.util import adict +CLIENT_CONNECTIONS_LIMIT = 15360 + + class TestNetworkSetup: @staticmethod def getNumberFromName(name: str) -> int: @@ -45,7 +48,8 @@ def writeNodeParamsFile(filePath, name, nPort, cPort): contents = [ 'NODE_NAME={}'.format(name), 'NODE_PORT={}'.format(nPort), - 'NODE_CLIENT_PORT={}'.format(cPort) + 'NODE_CLIENT_PORT={}'.format(cPort), + 'CLIENT_CONNECTIONS_LIMIT={}'.format(CLIENT_CONNECTIONS_LIMIT) ] with open(filePath, 'w') as f: f.writelines(os.linesep.join(contents)) diff --git a/plenum/common/txn_util.py b/plenum/common/txn_util.py index b7126538fe..286d3e5b61 100644 --- a/plenum/common/txn_util.py +++ b/plenum/common/txn_util.py @@ -1,4 +1,5 @@ from collections import OrderedDict +import json from ledger.genesis_txn.genesis_txn_file_util import create_genesis_txn_init_ledger from plenum.common.constants import TXN_TIME, TXN_TYPE, TARGET_NYM, ROLE, \ @@ -107,3 +108,34 @@ def idr_from_req_data(data): return data[f.IDENTIFIER.nm] else: return Request.gen_idr_from_sigs(data.get(f.SIGS.nm, {})) + + +def sdk_reqToTxn(sdk_req, cons_time=None): + """ + Transform a client request such that it can be stored in the ledger. + Also this is what will be returned to the client in the reply + + :param sdk_req: sdk request in str or dict type + :param cons_time: UTC epoch at which consensus was reached + :return: + """ + # TODO: we should not reformat transaction this way + # When refactor keep in mind thought about back compatibility + + if isinstance(sdk_req, dict): + data = sdk_req + elif isinstance(sdk_req, str): + data = json.loads(sdk_req) + else: + raise TypeError( + "Expected dict or str as input, but got: {}".format(type(sdk_req))) + + res = { + f.IDENTIFIER.nm: data[f.IDENTIFIER.nm], + f.REQ_ID.nm: data[f.REQ_ID.nm], + f.SIG.nm: data.get(f.SIG.nm, None), + f.SIGS.nm: data.get(f.SIGS.nm, None), + TXN_TIME: cons_time or data.get(TXN_TIME) + } + res.update(data[OPERATION]) + return res diff --git a/plenum/config.py b/plenum/config.py index 7bbbf028e4..8856de8b99 100644 --- a/plenum/config.py +++ b/plenum/config.py @@ -140,7 +140,12 @@ ConsistencyProofsTimeout = 5 # Timeout factor after which a node starts requesting transactions -CatchupTransactionsTimeout = 5 +# We assume, that making consistency proof + iterate over all transactions (getAllTxn) +# will take a little time (0.003 sec for making cp for 10 000 txns + +# 0.2 sec for getAllTxn for 10 000 txn) +# Therefore, node communication is the most cost operation +# Timeout for pool catchuping would be nodeCount * CatchupTransactionsTimeout +CatchupTransactionsTimeout = 6 # Log configuration diff --git a/plenum/server/node.py b/plenum/server/node.py index 02f2217c46..9a47f1fd31 100644 --- a/plenum/server/node.py +++ b/plenum/server/node.py @@ -1957,6 +1957,10 @@ def processRequest(self, request: Request, frm: str): ledgerId = self.ledger_id_for_request(request) ledger = self.getLedger(ledgerId) + if self.is_query(request.operation[TXN_TYPE]): + self.process_query(request, frm) + return + reply = self.getReplyFromLedger(ledger, request) if reply: logger.debug("{} 
returning REPLY from already processed " @@ -1964,10 +1968,7 @@ def processRequest(self, request: Request, frm: str): self.transmitToClient(reply, frm) return - if self.is_query(request.operation[TXN_TYPE]): - self.process_query(request, frm) - return - + # If the node is not already processing the request if not self.isProcessingReq(*request.key): self.startedProcessingReq(*request.key, frm) # If not already got the propagate request(PROPAGATE) for the @@ -2252,6 +2253,11 @@ def propose_view_change(self): 'so view change will not be proposed'.format(self)) return + if not self.isReady(): + logger.trace('{} The node is not ready yet ' + 'so view change will not be proposed'.format(self)) + return + disconnected_time = time.perf_counter() - self.lost_primary_at if disconnected_time >= self.config.ToleratePrimaryDisconnection: logger.info("{} primary has been disconnected for too long" diff --git a/plenum/server/quorums.py b/plenum/server/quorums.py index 5857bc645f..a034410386 100644 --- a/plenum/server/quorums.py +++ b/plenum/server/quorums.py @@ -27,7 +27,7 @@ def __init__(self, n): self.same_consistency_proof = Quorum(f + 1) self.consistency_proof = Quorum(f + 1) self.ledger_status = Quorum(n - f - 1) - self.checkpoint = Quorum(2 * f) + self.checkpoint = Quorum(n - f - 1) self.timestamp = Quorum(f + 1) self.bls_signatures = Quorum(n - f) self.observer_data = Quorum(f + 1) diff --git a/plenum/server/replica.py b/plenum/server/replica.py index 4a173a09de..c114e12dc4 100644 --- a/plenum/server/replica.py +++ b/plenum/server/replica.py @@ -1770,8 +1770,7 @@ def markCheckPointStable(self, seqNo): def checkIfCheckpointStable(self, key: Tuple[int, int]): ckState = self.checkpoints[key] - # TODO: what if len(ckState.receivedDigests) > 2 * f? - if len(ckState.receivedDigests) == self.quorums.checkpoint.value: + if self.quorums.checkpoint.is_reached(len(ckState.receivedDigests)): self.markCheckPointStable(ckState.seqNo) return True else: diff --git a/plenum/test/helper.py b/plenum/test/helper.py index f2081b3641..11eece3d4e 100644 --- a/plenum/test/helper.py +++ b/plenum/test/helper.py @@ -654,7 +654,7 @@ def checkViewNoForNodes(nodes: Iterable[TestNode], expectedViewNo: int = None): assert len(viewNos) == 1 vNo, = viewNos if expectedViewNo is not None: - assert vNo == expectedViewNo, ','.join(['{} -> Ratio: {}'.format( + assert vNo >= expectedViewNo, ','.join(['{} -> Ratio: {}'.format( node.name, node.monitor.masterThroughputRatio()) for node in nodes]) return vNo diff --git a/plenum/test/node_request/test_quorum_f_plus_2_nodes_off_and_on.py b/plenum/test/node_request/test_quorum_f_plus_2_nodes_off_and_on.py new file mode 100644 index 0000000000..34a3cc52a9 --- /dev/null +++ b/plenum/test/node_request/test_quorum_f_plus_2_nodes_off_and_on.py @@ -0,0 +1,80 @@ +import pytest + +from plenum.test import waits +from plenum.test.helper import waitForViewChange, \ + check_request_is_not_returned_to_nodes, checkViewNoForNodes, \ + sendRandomRequest, waitForSufficientRepliesForRequests +from plenum.test.pool_transactions.conftest import looper, clientAndWallet1, \ + client1, wallet1, client1Connected +from plenum.test.pool_transactions.helper import \ + disconnect_node_and_ensure_disconnected +from plenum.test.test_node import ensureElectionsDone, getRequiredInstances +from plenum.test.view_change.helper import start_stopped_node + +nodeCount = 5 + + +def stop_node(node_to_stop, looper, pool_nodes): + disconnect_node_and_ensure_disconnected(looper, pool_nodes, node_to_stop) + 
looper.removeProdable(node_to_stop) + + +def verify_request_not_replied_and_not_ordered(request, looper, client, nodes): + with pytest.raises(AssertionError): + waitForSufficientRepliesForRequests(looper, client, requests=[request]) + check_request_is_not_returned_to_nodes(looper, nodes, request) + + +def test_pool_reaches_quorum_after_f_plus_2_nodes_turned_off_and_later_on( + looper, allPluginsPath, tdir, tconf, + txnPoolNodeSet, wallet1, client1, client1Connected): + + nodes = txnPoolNodeSet + initial_view_no = nodes[0].viewNo + + request = sendRandomRequest(wallet1, client1) + waitForSufficientRepliesForRequests(looper, client1, requests=[request]) + + stop_node(nodes[0], looper, nodes) + waitForViewChange(looper, nodes[1:], expectedViewNo=initial_view_no + 1) + ensureElectionsDone(looper, nodes[1:], + numInstances=getRequiredInstances(nodeCount)) + + request = sendRandomRequest(wallet1, client1) + waitForSufficientRepliesForRequests(looper, client1, requests=[request]) + + stop_node(nodes[1], looper, nodes) + looper.runFor(tconf.ToleratePrimaryDisconnection + 2) + checkViewNoForNodes(nodes[2:], initial_view_no + 1) + + request = sendRandomRequest(wallet1, client1) + verify_request_not_replied_and_not_ordered(request, looper, client1, nodes) + + stop_node(nodes[2], looper, nodes) + looper.runFor(tconf.ToleratePrimaryDisconnection + 2) + checkViewNoForNodes(nodes[3:], initial_view_no + 1) + + request = sendRandomRequest(wallet1, client1) + verify_request_not_replied_and_not_ordered(request, looper, client1, nodes) + + nodes[2] = start_stopped_node(nodes[2], looper, tconf, tdir, allPluginsPath) + looper.runFor(waits.expectedPoolElectionTimeout(len(nodes))) + + request = sendRandomRequest(wallet1, client1) + verify_request_not_replied_and_not_ordered(request, looper, client1, nodes) + + nodes[1] = start_stopped_node(nodes[1], looper, tconf, tdir, allPluginsPath) + ensureElectionsDone(looper, nodes[1:], + numInstances=getRequiredInstances(nodeCount)) + waitForViewChange(looper, nodes[1:], expectedViewNo=initial_view_no + 1) + + request = sendRandomRequest(wallet1, client1) + waitForSufficientRepliesForRequests(looper, client1, requests=[request]) + + nodes[0] = start_stopped_node(nodes[0], looper, tconf, tdir, allPluginsPath) + ensureElectionsDone(looper, nodes, + numInstances=getRequiredInstances(nodeCount)) + waitForViewChange(looper, nodes, expectedViewNo=initial_view_no + 1) + + request = sendRandomRequest(wallet1, client1) + waitForSufficientRepliesForRequests(looper, client1, requests=[request]) diff --git a/plenum/test/pool_transactions/helper.py b/plenum/test/pool_transactions/helper.py index f3e00adf31..bb7233e46c 100644 --- a/plenum/test/pool_transactions/helper.py +++ b/plenum/test/pool_transactions/helper.py @@ -11,7 +11,7 @@ from plenum.common.signer_simple import SimpleSigner from plenum.common.signer_did import DidSigner from plenum.common.util import randomString, hexToFriendly -from plenum.test.helper import waitForSufficientRepliesForRequests, sdk_gen_request, sdk_sign_and_submit_req_obj,\ +from plenum.test.helper import waitForSufficientRepliesForRequests, sdk_gen_request, sdk_sign_and_submit_req_obj, \ sdk_get_reply from plenum.test.test_client import TestClient, genTestClient from plenum.test.test_node import TestNode, check_node_disconnected_from, \ @@ -56,12 +56,31 @@ def addNewClient(role, looper, creatorClient: Client, creatorWallet: Wallet, def sendAddNewNode(tdir, tconf, newNodeName, stewardClient, stewardWallet, transformOpFunc=None): + sigseed, verkey, bls_key, 
nodeIp, nodePort, clientIp, clientPort = \ + prepare_new_node_data(tconf, tdir, newNodeName) + return send_new_node_txn(sigseed, + nodeIp, nodePort, clientIp, clientPort, + bls_key, + newNodeName, stewardClient, stewardWallet, + transformOpFunc) + + +def prepare_new_node_data(tconf, tdir, + newNodeName): sigseed = randomString(32).encode() - nodeSigner = SimpleSigner(seed=sigseed) (nodeIp, nodePort), (clientIp, clientPort) = genHa(2) config_helper = PNodeConfigHelper(newNodeName, tconf, chroot=tdir) _, verkey, bls_key = initNodeKeysForBothStacks(newNodeName, config_helper.keys_dir, sigseed, override=True) + return sigseed, verkey, bls_key, nodeIp, nodePort, clientIp, clientPort + + +def send_new_node_txn(sigseed, + nodeIp, nodePort, clientIp, clientPort, + bls_key, + newNodeName, stewardClient, stewardWallet, + transformOpFunc=None): + nodeSigner = SimpleSigner(seed=sigseed) op = { TXN_TYPE: NODE, TARGET_NYM: nodeSigner.identifier, @@ -81,9 +100,9 @@ def sendAddNewNode(tdir, tconf, newNodeName, stewardClient, stewardWallet, req = stewardWallet.signOp(op) stewardClient.submitReqs(req) return req, \ - op[DATA].get(NODE_IP), op[DATA].get(NODE_PORT), \ - op[DATA].get(CLIENT_IP), op[DATA].get(CLIENT_PORT), \ - sigseed + op[DATA].get(NODE_IP), op[DATA].get(NODE_PORT), \ + op[DATA].get(CLIENT_IP), op[DATA].get(CLIENT_PORT), \ + sigseed def addNewNode(looper, stewardClient, stewardWallet, newNodeName, tdir, tconf, @@ -96,19 +115,62 @@ def addNewNode(looper, stewardClient, stewardWallet, newNodeName, tdir, tconf, waitForSufficientRepliesForRequests(looper, stewardClient, requests=[req]) - # initNodeKeysForBothStacks(newNodeName, tdir, sigseed, override=True) - # node = nodeClass(newNodeName, basedirpath=tdir, config=tconf, - # ha=(nodeIp, nodePort), cliha=(clientIp, clientPort), - # pluginPaths=allPluginsPath) - # if autoStart: - # looper.add(node) - # return node - return start_newly_added_node(looper, newNodeName, tdir, sigseed, - (nodeIp, nodePort), (clientIp, clientPort), - tconf, autoStart, allPluginsPath, nodeClass) + return create_and_start_new_node(looper, newNodeName, tdir, sigseed, + (nodeIp, nodePort), (clientIp, clientPort), + tconf, autoStart, allPluginsPath, nodeClass) + + +def start_not_added_node(looper, + tdir, tconf, allPluginsPath, + newNodeName): + ''' + Creates and starts a new node, but doesn't add it to the Pool + (so, NODE txn is not sent). + ''' + sigseed, verkey, bls_key, nodeIp, nodePort, clientIp, clientPort = \ + prepare_new_node_data(tconf, tdir, newNodeName) + + new_node = create_and_start_new_node(looper, newNodeName, + tdir, randomString(32).encode(), + (nodeIp, nodePort), (clientIp, clientPort), + tconf, True, allPluginsPath, TestNode) + return sigseed, bls_key, new_node + + +def add_started_node(looper, + new_node, + txnPoolNodeSet, + client_tdir, + stewardClient, stewardWallet, + sigseed, + bls_key): + ''' + Adds already created node to the pool, + that is sends NODE txn. + Makes sure that node is actually added and connected to all otehr nodes. 
+ ''' + newSteward, newStewardWallet = addNewSteward(looper, client_tdir, + stewardClient, stewardWallet, + "Steward" + new_node.name, + clientClass=TestClient) + node_name = new_node.name + send_new_node_txn(sigseed, + new_node.poolManager.nodeReg[node_name][0], + new_node.poolManager.nodeReg[node_name][1], + new_node.poolManager.cliNodeReg[node_name + "C"][0], + new_node.poolManager.cliNodeReg[node_name + "C"][1], + bls_key, + node_name, + newSteward, newStewardWallet) + + txnPoolNodeSet.append(new_node) + looper.run(checkNodesConnected(txnPoolNodeSet)) + ensureClientConnectedToNodesAndPoolLedgerSame(looper, newSteward, *txnPoolNodeSet) + + waitNodeDataEquality(looper, new_node, *txnPoolNodeSet[:-1]) -def start_newly_added_node( +def create_and_start_new_node( looper, node_name, tdir, @@ -130,6 +192,7 @@ def start_newly_added_node( looper.add(node) return node + def new_node( node_name, tdir, @@ -146,6 +209,7 @@ def new_node( pluginPaths=plugin_path) return node + def addNewSteward(looper, client_tdir, creatorClient, creatorWallet, stewardName, clientClass=TestClient): @@ -165,7 +229,6 @@ def addNewStewardAndNode(looper, creatorClient, creatorWallet, stewardName, newNodeName, tdir, client_tdir, tconf, allPluginsPath=None, autoStart=True, nodeClass=TestNode, clientClass=TestClient, transformNodeOpFunc=None): - newSteward, newStewardWallet = addNewSteward(looper, client_tdir, creatorClient, creatorWallet, stewardName, clientClass=clientClass) diff --git a/plenum/test/view_change/test_6th_node_join_after_view_change_by_primary_restart.py b/plenum/test/view_change/test_6th_node_join_after_view_change_by_primary_restart.py index 86515eb6ad..b21143670b 100644 --- a/plenum/test/view_change/test_6th_node_join_after_view_change_by_primary_restart.py +++ b/plenum/test/view_change/test_6th_node_join_after_view_change_by_primary_restart.py @@ -67,7 +67,6 @@ def add_new_node(looper, nodes, steward, steward_wallet, return new_node -@pytest.mark.skip(reason='INDY-1084') def test_6th_node_join_after_view_change_by_master_restart( looper, txnPoolNodeSet, tdir, tconf, allPluginsPath, steward1, stewardWallet, diff --git a/plenum/test/view_change/test_no_instance_change_before_node_is_ready.py b/plenum/test/view_change/test_no_instance_change_before_node_is_ready.py new file mode 100644 index 0000000000..a6a6d06185 --- /dev/null +++ b/plenum/test/view_change/test_no_instance_change_before_node_is_ready.py @@ -0,0 +1,60 @@ +import pytest + +from plenum.server.view_change.view_changer import ViewChanger + +from plenum.test.pool_transactions.conftest import clientAndWallet1, \ + client1, wallet1, client1Connected, looper, stewardAndWallet1, steward1, \ + stewardWallet + +from stp_core.common.log import getlogger +from plenum.test.pool_transactions.helper import start_not_added_node, add_started_node + +logger = getlogger() + + +@pytest.fixture(scope="module", autouse=True) +def tconf(tconf): + old_vc_timeout = tconf.VIEW_CHANGE_TIMEOUT + tconf.VIEW_CHANGE_TIMEOUT = 5 + yield tconf + tconf.VIEW_CHANGE_TIMEOUT = old_vc_timeout + + +def test_no_instance_change_on_primary_disconnection_for_not_ready_node( + looper, txnPoolNodeSet, tdir, tconf, + allPluginsPath, steward1, stewardWallet, + client_tdir): + """ + Test steps: + 1. create a new node, but don't add it to the pool (so not send NODE txn), so that the node is not ready. + 2. wait for more than VIEW_CHANGE_TIMEOUT (a timeout for initial check for disconnected primary) + 3. make sure no InstanceChange sent by the new node + 4. 
add the node to the pool (send NODE txn) and make sure that the node is ready now. + 5. wait for more than VIEW_CHANGE_TIMEOUT (a timeout for initial check for disconnected primary) + 6. make sure no InstanceChange sent by the new node + """ + + # 1. create a new node, but don't add it to the pool (so not send NODE txn), so that the node is not ready. + sigseed, bls_key, new_node = start_not_added_node(looper, + tdir, tconf, allPluginsPath, + "TestTheta") + + # 2. wait for more than VIEW_CHANGE_TIMEOUT (a timeout for initial check for disconnected primary) + looper.runFor(tconf.VIEW_CHANGE_TIMEOUT + 2) + + # 3. make sure no InstanceChange sent by the new node + assert 0 == new_node.view_changer.spylog.count(ViewChanger.sendInstanceChange.__name__) + + # 4. add the node to the pool (send NODE txn) and make sure that the node is ready now. + add_started_node(looper, + new_node, + txnPoolNodeSet, + client_tdir, + steward1, stewardWallet, + sigseed, bls_key) + + # 5. wait for more than VIEW_CHANGE_TIMEOUT (a timeout for initial check for disconnected primary) + looper.runFor(tconf.VIEW_CHANGE_TIMEOUT + 2) + + # 6. make sure no InstanceChange sent by the new node + assert 0 == new_node.view_changer.spylog.count(ViewChanger.sendInstanceChange.__name__) diff --git a/plenum/test/view_change/test_view_changes_if_master_primary_disconnected.py b/plenum/test/view_change/test_view_changes_if_master_primary_disconnected.py index c0cf85e258..27ac2bef3d 100644 --- a/plenum/test/view_change/test_view_changes_if_master_primary_disconnected.py +++ b/plenum/test/view_change/test_view_changes_if_master_primary_disconnected.py @@ -48,7 +48,7 @@ def test_view_changes_if_master_primary_disconnected(txnPoolNodeSet, looper, sdk txnPoolNodeSet = remaining_nodes + [old_pr_node] looper.run(eventually(checkViewNoForNodes, - txnPoolNodeSet, old_view_no + 1, timeout=10)) + txnPoolNodeSet, old_view_no + 1, timeout=tconf.VIEW_CHANGE_TIMEOUT)) assert len(getAllReturnVals(old_pr_node.view_changer, old_pr_node.view_changer._start_view_change_if_possible, compare_val_to=True)) > 0 diff --git a/plenum/test/waits.py b/plenum/test/waits.py index 65666d0a4f..ac272bcb6c 100644 --- a/plenum/test/waits.py +++ b/plenum/test/waits.py @@ -104,9 +104,7 @@ def expectedPoolCatchupTime(nodeCount): To: each of the Nodes finished the the catchup procedure """ config = getConfig() - nodeCatchupTimeout = __Peer2PeerRequestExchangeTime + \ - config.CatchupTransactionsTimeout - return nodeCount * nodeCatchupTimeout + return nodeCount * config.CatchupTransactionsTimeout def expectedPoolGetReadyTimeout(nodeCount): diff --git a/scripts/add_json_txns_to_ledger.py b/scripts/add_json_txns_to_ledger.py new file mode 100644 index 0000000000..37f5caae0f --- /dev/null +++ b/scripts/add_json_txns_to_ledger.py @@ -0,0 +1,67 @@ +#! 
/usr/bin/env python3 + +import os +import sys +import json +import argparse + +from stp_core.types import HA +from indy_common.config_util import getConfig +from plenum.server.node import Node +from indy_common.config_helper import NodeConfigHelper + +config = getConfig() + + +def get_ha_cliha_node_name(path_to_env): + node_name_key = 'NODE_NAME' + node_port_key = 'NODE_PORT' + node_clien_port_key = 'NODE_CLIENT_PORT' + node_name = '' + node_port = '' + node_clieint_port = '' + with open(path_to_env) as fenv: + for line in fenv.readlines(): + print(line) + if line.find(node_name_key) != -1: + node_name = line.split('=')[1].strip() + elif line.find(node_port_key) != -1: + node_port = int(line.split('=')[1].strip()) + elif line.find(node_clien_port_key) != -1: + node_clieint_port = int(line.split('=')[1].strip()) + return node_name, node_port, node_clieint_port + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('infpath', help="Path to previous generated txns", type=str, default='/tmp/generated_txns') + parser.add_argument('--env_file', help='Path to environment file with node name and ports', default='/etc/indy/indy.env') + args = parser.parse_args() + path_to_txns = os.path.realpath(args.infpath) + path_to_env = os.path.realpath(args.env_file) + + if not os.path.exists(path_to_txns): + print("Path to txns file does not exist") + sys.exit(1) + + if not os.path.exists(path_to_env): + print("Path to env file does not exist") + sys.exit(1) + + nname, nport, ncliport = get_ha_cliha_node_name(path_to_env) + ha = HA("0.0.0.0", nport) + cliha = HA("0.0.0.0", ncliport) + config_helper = NodeConfigHelper(nname, config) + + node = Node(nname, nodeRegistry=None, + ha=ha, + cliha=cliha, + config_helper=config_helper, + config=config) + i = 0 + with open(path_to_txns) as txns: + for txn in txns: + node.domainLedger.add(json.loads(txn)) + i += 1 + if not i % 1000: + print("added {} txns".format(i)) diff --git a/scripts/generate_txns.py b/scripts/generate_txns.py new file mode 100644 index 0000000000..b0900b10c6 --- /dev/null +++ b/scripts/generate_txns.py @@ -0,0 +1,94 @@ +#! 
/usr/bin/env python3 + +import os +import json +import time +from contextlib import ExitStack +import argparse +import random +from typing import Sequence +from plenum.common.request import Request +from plenum.common.constants import CURRENT_PROTOCOL_VERSION +from plenum.common.util import randomString +from plenum.common.config_util import getConfig +from plenum.common.txn_util import sdk_reqToTxn +from indy.ledger import sign_request +from indy import signus, wallet +from stp_core.loop.looper import Looper + +config = getConfig() + + +async def get_wallet_and_pool(): + pool_name = 'pool' + randomString(3) + wallet_name = 'wallet' + randomString(10) + their_wallet_name = 'their_wallet' + randomString(10) + seed_trustee1 = "000000000000000000000000Trustee1" + + await wallet.create_wallet(pool_name, wallet_name, None, None, None) + my_wallet_handle = await wallet.open_wallet(wallet_name, None, None) + + await wallet.create_wallet(pool_name, their_wallet_name, None, None, None) + their_wallet_handle = await wallet.open_wallet(their_wallet_name, None, None) + + await signus.create_and_store_my_did(my_wallet_handle, "{}") + + (their_did, their_verkey) = await signus.create_and_store_my_did(their_wallet_handle, + json.dumps({"seed": seed_trustee1})) + + await signus.store_their_did(my_wallet_handle, json.dumps({'did': their_did, 'verkey': their_verkey})) + + return their_wallet_handle, their_did + + +def randomOperation(): + return { + "type": "buy", + "amount": random.randint(10, 100000) + } + + +def random_requests(count): + return [randomOperation() for _ in range(count)] + + +def sdk_gen_request(operation, protocol_version=CURRENT_PROTOCOL_VERSION, identifier=None): + return Request(operation=operation, reqId=random.randint(10, 100000), + protocolVersion=protocol_version, identifier=identifier) + + +def sdk_random_request_objects(count, protocol_version, identifier=None): + ops = random_requests(count) + return [sdk_gen_request(op, protocol_version=protocol_version, identifier=identifier) for op in ops] + + +def sdk_sign_request_objects(looper, sdk_wallet, reqs: Sequence): + wallet_h, did = sdk_wallet + reqs_str = [json.dumps(req.as_dict) for req in reqs] + resp = [looper.loop.run_until_complete(sign_request(wallet_h, did, req)) for req in reqs_str] + return resp + + +def sdk_signed_random_requests(looper, sdk_wallet, count): + _, did = sdk_wallet + reqs_obj = sdk_random_request_objects(count, identifier=did, protocol_version=CURRENT_PROTOCOL_VERSION) + return sdk_sign_request_objects(looper, sdk_wallet, reqs_obj) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('count', help="Count of generated txns", type=int) + parser.add_argument('outfpath', help="Path to save generated txns", type=str, default='/tmp/generated_txns') + args = parser.parse_args() + path_to_save = os.path.realpath(args.outfpath) + + with ExitStack() as exit_stack: + with Looper() as looper: + sdk_wallet, did = looper.loop.run_until_complete(get_wallet_and_pool()) + with open(path_to_save, 'w') as outpath: + for _ in range(args.count): + req = sdk_signed_random_requests(looper, (sdk_wallet, did), 1)[0] + txn = sdk_reqToTxn(req, int(time.time())) + outpath.write(json.dumps(txn)) + outpath.write(os.linesep) + looper.stopall()