From 56181ff566ef5a54331951aec9f7369698652a3a Mon Sep 17 00:00:00 2001 From: andkononykhin Date: Fri, 2 Jun 2017 13:20:47 +0300 Subject: [PATCH] Use original batch parameters in all tests except the ones that use batches and use alternate of eventuallyAll to check a collection of functions under a timeout (#190) * updates to test helpers and change in forwarding requests logic * overriding batch params in tests * use correct timeouts in tests * accounting for nomination delay --- plenum/client/client.py | 2 +- plenum/config.py | 22 +++--- plenum/server/node.py | 3 - plenum/test/batching_3pc/conftest.py | 5 +- .../test/batching_3pc/test_basic_batching.py | 20 +++--- .../batching_3pc/test_batching_scenarios.py | 3 +- plenum/test/checkpoints/conftest.py | 17 +++-- .../checkpoints/test_basic_checkpointing.py | 15 ++-- .../test_discard_old_checkpoint_messages.py | 6 +- .../test_message_outside_watermark.py | 7 +- .../test_message_outside_watermark1.py | 20 ++++-- .../checkpoints/test_stable_checkpoint.py | 25 +++---- .../checkpoints/test_stable_checkpoint1.py | 6 +- plenum/test/conftest.py | 20 +++--- plenum/test/helper.py | 68 ++++++++++++++----- .../test/node_request/node_request_helper.py | 25 ++++--- ...st_num_of_commit_with_f_plus_one_faults.py | 3 +- .../node_request/test_request_forwarding.py | 47 +++++++------ .../test_primary_election_case1.py | 6 +- setup.py | 2 +- 20 files changed, 188 insertions(+), 134 deletions(-) diff --git a/plenum/client/client.py b/plenum/client/client.py index dc5040d6ae..ac5a7a3c58 100644 --- a/plenum/client/client.py +++ b/plenum/client/client.py @@ -572,7 +572,7 @@ def retryForExpected(self): # even if pool is just busy and cannot answer quickly, # that's why using maintainConnections instead # self.nodestack.connect(name=remote.name) - self.nodestack.maintainConnections() + self.nodestack.maintainConnections(force=True) if aliveRequests: # Need a delay in case connection has to be established with some diff --git a/plenum/config.py b/plenum/config.py index cb816e44eb..33f40f21cd 100644 --- a/plenum/config.py +++ b/plenum/config.py @@ -138,14 +138,6 @@ # Expected time for one stack to get connected to another ExpectedConnectTime = 3.3 if sys.platform == 'win32' else 2 - -# After ordering every `CHK_FREQ` requests, replica sends a CHECKPOINT -CHK_FREQ = 100000 - -# Difference between low water mark and high water mark -LOG_SIZE = 3*CHK_FREQ - - # Since the ledger is stored in a flat file, this makes the ledger do # an fsync on every write. Making it True can significantly slow # down writes as shown in a test `test_file_store_perf.py` in the ledger @@ -167,11 +159,8 @@ # Max batch size for 3 phase commit Max3PCBatchSize = 100 # Max time to wait before creating a batch for 3 phase commit -Max3PCBatchWait = 1 +Max3PCBatchWait = .001 -# Maximum lifespan for a batch, this needs to be changed if -# `Max3PCBatchSize` is changed -ThreePCBatchTimeout = 25 # Each node keeps a map of PrePrepare sequence numbers and the corresponding # txn seqnos that came out of it. Helps in servicing Consistency Proof Requests @@ -186,7 +175,14 @@ MaxStateProofTime = 3 +# After ordering every `CHK_FREQ` batches, replica sends a CHECKPOINT +CHK_FREQ = 10000 + +# Difference between low water mark and high water mark +LOG_SIZE = 3*CHK_FREQ + + CLIENT_REQACK_TIMEOUT = 5 -CLIENT_REPLY_TIMEOUT = Max3PCBatchWait + 10 +CLIENT_REPLY_TIMEOUT = 15 CLIENT_MAX_RETRY_ACK = 5 CLIENT_MAX_RETRY_REPLY = 5 diff --git a/plenum/server/node.py b/plenum/server/node.py index c596391f99..b0c0edd9b0 100644 --- a/plenum/server/node.py +++ b/plenum/server/node.py @@ -328,9 +328,6 @@ def __init__(self, # help in voting for/against a view change. self.lost_primary_at = None - # First view change message received for a view no - self.view_change_started_at = {} - tp = loadPlugins(self.basedirpath) logger.debug("total plugins loaded in node: {}".format(tp)) # TODO: this is already happening in `start`, why here then? diff --git a/plenum/test/batching_3pc/conftest.py b/plenum/test/batching_3pc/conftest.py index 58258de369..89a0d1c661 100644 --- a/plenum/test/batching_3pc/conftest.py +++ b/plenum/test/batching_3pc/conftest.py @@ -6,13 +6,10 @@ @pytest.fixture(scope="module") def tconf(tconf, request): oldSize = tconf.Max3PCBatchSize - oldTIme = tconf.Max3PCBatchWait - tconf.Max3PCBatchSize = 3 - tconf.Max3PCBatchWait = 5 + tconf.Max3PCBatchSize = 10 def reset(): tconf.Max3PCBatchSize = oldSize - tconf.Max3PCBatchWait = oldTIme request.addfinalizer(reset) return tconf diff --git a/plenum/test/batching_3pc/test_basic_batching.py b/plenum/test/batching_3pc/test_basic_batching.py index 4e2a83a680..d31272fa2a 100644 --- a/plenum/test/batching_3pc/test_basic_batching.py +++ b/plenum/test/batching_3pc/test_basic_batching.py @@ -31,8 +31,7 @@ def test3PCOverBatchWithThresholdReqs(tconf, looper, txnPoolNodeSet, client, :return: """ reqs = sendRandomRequests(wallet1, client, tconf.Max3PCBatchSize) - waitForSufficientRepliesForRequests(looper, client, requests=reqs, - customTimeoutPerReq=tconf.Max3PCBatchWait-1) + waitForSufficientRepliesForRequests(looper, client, requests=reqs) def test3PCOverBatchWithLessThanThresholdReqs(tconf, looper, txnPoolNodeSet, @@ -43,8 +42,7 @@ def test3PCOverBatchWithLessThanThresholdReqs(tconf, looper, txnPoolNodeSet, :return: """ reqs = sendRandomRequests(wallet1, client, tconf.Max3PCBatchSize - 1) - waitForSufficientRepliesForRequests(looper, client, requests=reqs, - customTimeoutPerReq=tconf.Max3PCBatchWait + 1) + waitForSufficientRepliesForRequests(looper, client, requests=reqs) def testTreeRootsCorrectAfterEachBatch(tconf, looper, txnPoolNodeSet, @@ -56,14 +54,12 @@ def testTreeRootsCorrectAfterEachBatch(tconf, looper, txnPoolNodeSet, """ # Send 1 batch reqs = sendRandomRequests(wallet1, client, tconf.Max3PCBatchSize) - waitForSufficientRepliesForRequests(looper, client, requests=reqs, - customTimeoutPerReq=tconf.Max3PCBatchWait) + waitForSufficientRepliesForRequests(looper, client, requests=reqs) checkNodesHaveSameRoots(txnPoolNodeSet) # Send 2 batches reqs = sendRandomRequests(wallet1, client, 2 * tconf.Max3PCBatchSize) - waitForSufficientRepliesForRequests(looper, client, requests=reqs, - customTimeoutPerReq=2*tconf.Max3PCBatchWait) + waitForSufficientRepliesForRequests(looper, client, requests=reqs) checkNodesHaveSameRoots(txnPoolNodeSet) @@ -90,11 +86,11 @@ def rejectingMethod(self, req): node.doDynamicValidation = types.MethodType(rejectingMethod, node) reqs = sendRandomRequests(wallet1, client, tconf.Max3PCBatchSize) - waitForSufficientRepliesForRequests(looper, client, requests=reqs[:-1], - customTimeoutPerReq=tconf.Max3PCBatchWait) + waitForSufficientRepliesForRequests(looper, client, requests=reqs[:-1]) + with pytest.raises(AssertionError): - waitForSufficientRepliesForRequests(looper, client, requests=reqs[-1:], - customTimeoutPerReq=tconf.Max3PCBatchWait) + waitForSufficientRepliesForRequests(looper, client, requests=reqs[-1:]) + for node in txnPoolNodeSet: looper.run(eventually(checkRejectWithReason, client, 'Simulated rejection', node.clientstack.name, diff --git a/plenum/test/batching_3pc/test_batching_scenarios.py b/plenum/test/batching_3pc/test_batching_scenarios.py index 99795a8c0c..b318986b97 100644 --- a/plenum/test/batching_3pc/test_batching_scenarios.py +++ b/plenum/test/batching_3pc/test_batching_scenarios.py @@ -44,8 +44,7 @@ def specificPrePrepares(wrappedMsg): reqs = sendRandomRequests(wallet1, client, (ppsToDelay+1)*tconf.Max3PCBatchSize) - waitForSufficientRepliesForRequests(looper, client, requests=reqs, - customTimeoutPerReq=(ppsToDelay + 1) * tconf.Max3PCBatchWait) + waitForSufficientRepliesForRequests(looper, client, requests=reqs) checkNodesHaveSameRoots(txnPoolNodeSet) for r in otherR: diff --git a/plenum/test/checkpoints/conftest.py b/plenum/test/checkpoints/conftest.py index ecbeff66ec..749d1409f8 100644 --- a/plenum/test/checkpoints/conftest.py +++ b/plenum/test/checkpoints/conftest.py @@ -2,8 +2,7 @@ from plenum.test.pool_transactions.conftest import looper, clientAndWallet1, \ client1, wallet1, client1Connected - -CHK_FREQ = 5 +from plenum.test.batching_3pc.conftest import tconf @pytest.fixture(scope="module") @@ -11,8 +10,8 @@ def chkFreqPatched(tconf, request): oldChkFreq = tconf.CHK_FREQ oldLogSize = tconf.LOG_SIZE - tconf.CHK_FREQ = CHK_FREQ - tconf.LOG_SIZE = 3*tconf.CHK_FREQ + tconf.CHK_FREQ = 2 + tconf.LOG_SIZE = 2*tconf.CHK_FREQ def reset(): tconf.CHK_FREQ = oldChkFreq @@ -21,3 +20,13 @@ def reset(): request.addfinalizer(reset) return tconf + + +@pytest.fixture(scope="module") +def reqs_for_checkpoint(chkFreqPatched): + return chkFreqPatched.CHK_FREQ * chkFreqPatched.Max3PCBatchSize + + +@pytest.fixture(scope="module") +def reqs_for_logsize(chkFreqPatched): + return chkFreqPatched.LOG_SIZE * chkFreqPatched.Max3PCBatchSize diff --git a/plenum/test/checkpoints/test_basic_checkpointing.py b/plenum/test/checkpoints/test_basic_checkpointing.py index 6d752f3b8e..fad4eb4b8f 100644 --- a/plenum/test/checkpoints/test_basic_checkpointing.py +++ b/plenum/test/checkpoints/test_basic_checkpointing.py @@ -1,35 +1,38 @@ +import pytest + from stp_core.loop.eventually import eventually from plenum.test import waits -from plenum.test.checkpoints.conftest import CHK_FREQ from plenum.test.checkpoints.helper import chkChkpoints from plenum.test.helper import sendReqsToNodesAndVerifySuffReplies def testCheckpointCreated(chkFreqPatched, looper, txnPoolNodeSet, client1, - wallet1, client1Connected): + wallet1, client1Connected, reqs_for_checkpoint): """ After requests less than `CHK_FREQ`, there should be one checkpoint on each replica. After `CHK_FREQ`, one checkpoint should become stable """ - sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, CHK_FREQ-1, 1) + # Send one batch less so checkpoint is not created + sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, + reqs_for_checkpoint-(chkFreqPatched.Max3PCBatchSize), 1) # Deliberately waiting so as to verify that not more than 1 checkpoint is # created looper.runFor(2) chkChkpoints(txnPoolNodeSet, 1) - sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, 1, 1) + sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, chkFreqPatched.Max3PCBatchSize, 1) timeout = waits.expectedTransactionExecutionTime(len(txnPoolNodeSet)) looper.run(eventually(chkChkpoints, txnPoolNodeSet, 1, 0, retryWait=1, timeout=timeout)) def testOldCheckpointDeleted(chkFreqPatched, looper, txnPoolNodeSet, client1, - wallet1, client1Connected): + wallet1, client1Connected, reqs_for_checkpoint): """ Send requests more than twice of `CHK_FREQ`, there should be one new stable checkpoint on each replica. The old stable checkpoint should be removed """ - sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, 2*CHK_FREQ, + sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, 2*reqs_for_checkpoint, 1) sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, 1, 1) diff --git a/plenum/test/checkpoints/test_discard_old_checkpoint_messages.py b/plenum/test/checkpoints/test_discard_old_checkpoint_messages.py index b954a02540..6bfd9d0441 100644 --- a/plenum/test/checkpoints/test_discard_old_checkpoint_messages.py +++ b/plenum/test/checkpoints/test_discard_old_checkpoint_messages.py @@ -1,6 +1,5 @@ from stp_core.loop.eventually import eventually from plenum.common.types import Checkpoint -from plenum.test.checkpoints.conftest import CHK_FREQ from plenum.test.checkpoints.helper import chkChkpoints from plenum.test.helper import sendReqsToNodesAndVerifySuffReplies, \ checkDiscardMsg @@ -8,8 +7,9 @@ def testDiscardCheckpointMsgForStableCheckpoint(chkFreqPatched, looper, txnPoolNodeSet, client1, - wallet1, client1Connected): - sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, CHK_FREQ, 1) + wallet1, client1Connected, + reqs_for_checkpoint): + sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, reqs_for_checkpoint, 1) looper.run(eventually(chkChkpoints, txnPoolNodeSet, 1, 0, retryWait=1)) node1 = txnPoolNodeSet[0] rep1 = node1.replicas[0] diff --git a/plenum/test/checkpoints/test_message_outside_watermark.py b/plenum/test/checkpoints/test_message_outside_watermark.py index 2bda8ea407..c2a0b87802 100644 --- a/plenum/test/checkpoints/test_message_outside_watermark.py +++ b/plenum/test/checkpoints/test_message_outside_watermark.py @@ -1,6 +1,4 @@ from plenum.test import waits -from plenum.test.checkpoints.conftest import CHK_FREQ -from plenum.test.checkpoints.helper import chkChkpoints from plenum.test.delayers import ppDelay from plenum.test.helper import sendReqsToNodesAndVerifySuffReplies, \ countDiscarded @@ -11,7 +9,8 @@ def testNonPrimaryRecvs3PhaseMessageOutsideWatermarks(chkFreqPatched, looper, txnPoolNodeSet, client1, - wallet1, client1Connected): + wallet1, client1Connected, + reqs_for_logsize): """ A node is slow in processing PRE-PREPAREs such that lot of requests happen and the slow node has started getting 3 phase messages outside of it @@ -22,7 +21,7 @@ def testNonPrimaryRecvs3PhaseMessageOutsideWatermarks(chkFreqPatched, looper, """ delay = 15 instId = 1 - reqsToSend = chkFreqPatched.LOG_SIZE + 2 + reqsToSend = reqs_for_logsize + 2 npr = getNonPrimaryReplicas(txnPoolNodeSet, instId) slowReplica = npr[0] slowNode = slowReplica.node diff --git a/plenum/test/checkpoints/test_message_outside_watermark1.py b/plenum/test/checkpoints/test_message_outside_watermark1.py index fb288b07a5..ea1b2f695d 100644 --- a/plenum/test/checkpoints/test_message_outside_watermark1.py +++ b/plenum/test/checkpoints/test_message_outside_watermark1.py @@ -1,3 +1,5 @@ +import math + from stp_core.loop.eventually import eventually from plenum.test import waits @@ -11,7 +13,8 @@ def testPrimaryRecvs3PhaseMessageOutsideWatermarks(tconf, chkFreqPatched, looper, txnPoolNodeSet, client1, - wallet1, client1Connected): + wallet1, client1Connected, + reqs_for_logsize): """ One of the primary starts getting lot of requests, more than his log size and queues up requests since they will go beyond its watermarks. This @@ -19,9 +22,9 @@ def testPrimaryRecvs3PhaseMessageOutsideWatermarks(tconf, chkFreqPatched, looper Eventually this primary will send PRE-PREPARE for all requests and those requests will complete """ - delay = 5 + delay = 3 instId = 1 - reqsToSend = 2*chkFreqPatched.LOG_SIZE + 1 + reqsToSend = 2*reqs_for_logsize + 1 npr = getNonPrimaryReplicas(txnPoolNodeSet, instId) pr = getPrimaryReplica(txnPoolNodeSet, instId) from plenum.server.replica import TPCStat @@ -30,9 +33,14 @@ def testPrimaryRecvs3PhaseMessageOutsideWatermarks(tconf, chkFreqPatched, looper for r in npr: r.node.nodeIbStasher.delay(ppDelay(delay, instId)) + tm_exec_1_batch = waits.expectedTransactionExecutionTime(len(txnPoolNodeSet)) + batch_count = math.ceil(reqsToSend / tconf.Max3PCBatchSize) + total_timeout = (tm_exec_1_batch + delay) * batch_count + def chk(): - assert orderedCount + reqsToSend == pr.stats.get(TPCStat.OrderSent) + assert orderedCount + batch_count == pr.stats.get(TPCStat.OrderSent) - print('Sending {} requests'.format(reqsToSend)) - sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, reqsToSend, 1) + sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, reqsToSend, + 1, override_timeout_limit=True, + total_timeout=total_timeout) looper.run(eventually(chk, retryWait=1, timeout=3)) diff --git a/plenum/test/checkpoints/test_stable_checkpoint.py b/plenum/test/checkpoints/test_stable_checkpoint.py index ad5db58ca9..0b98ad76aa 100644 --- a/plenum/test/checkpoints/test_stable_checkpoint.py +++ b/plenum/test/checkpoints/test_stable_checkpoint.py @@ -1,36 +1,37 @@ from stp_core.loop.eventually import eventually from plenum.test import waits -from plenum.test.checkpoints.conftest import CHK_FREQ from plenum.test.checkpoints.helper import chkChkpoints from plenum.test.helper import sendReqsToNodesAndVerifySuffReplies -def checkRequestCounts(nodes, count): +def checkRequestCounts(nodes, req_count, cons_count): for node in nodes: - assert len(node.requests) == count + assert len(node.requests) == req_count for r in node.replicas: - assert len(r.commits) == count - assert len(r.prepares) == count + assert len(r.commits) == cons_count + assert len(r.prepares) == cons_count def testRequestOlderThanStableCheckpointRemoved(chkFreqPatched, looper, txnPoolNodeSet, client1, - wallet1, client1Connected): + wallet1, client1Connected, + reqs_for_checkpoint): reqs = sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, - CHK_FREQ-1, 1) + reqs_for_checkpoint - (chkFreqPatched.Max3PCBatchSize), 1) timeout = waits.expectedTransactionExecutionTime(len(txnPoolNodeSet)) looper.run(eventually(chkChkpoints, txnPoolNodeSet, 1, retryWait=1, timeout=timeout)) - checkRequestCounts(txnPoolNodeSet, len(reqs)) - sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, 1, 1) + checkRequestCounts(txnPoolNodeSet, len(reqs), chkFreqPatched.CHK_FREQ-1) + sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, + chkFreqPatched.Max3PCBatchSize, 1) looper.run(eventually(chkChkpoints, txnPoolNodeSet, 1, 0, retryWait=1, timeout=timeout)) - checkRequestCounts(txnPoolNodeSet, 0) + checkRequestCounts(txnPoolNodeSet, 0, 0) sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, - 3*CHK_FREQ + 1, 1) + reqs_for_checkpoint + 1, 1) looper.run(eventually(chkChkpoints, txnPoolNodeSet, 2, 0, retryWait=1, timeout=timeout)) - checkRequestCounts(txnPoolNodeSet, 1) + checkRequestCounts(txnPoolNodeSet, 1, 1) diff --git a/plenum/test/checkpoints/test_stable_checkpoint1.py b/plenum/test/checkpoints/test_stable_checkpoint1.py index 6492f47295..7d5f6ff26f 100644 --- a/plenum/test/checkpoints/test_stable_checkpoint1.py +++ b/plenum/test/checkpoints/test_stable_checkpoint1.py @@ -1,7 +1,6 @@ from stp_core.loop.eventually import eventually from plenum.test import waits -from plenum.test.checkpoints.conftest import CHK_FREQ from plenum.test.checkpoints.helper import chkChkpoints from plenum.test.delayers import ppDelay from plenum.test.helper import sendReqsToNodesAndVerifySuffReplies @@ -10,7 +9,8 @@ def testStableCheckpointWhenOneInstanceSlow(chkFreqPatched, looper, txnPoolNodeSet, client1, - wallet1, client1Connected): + wallet1, client1Connected, + reqs_for_checkpoint): delay = 5 pr = getPrimaryReplica(txnPoolNodeSet, 1) slowNode = pr.node @@ -18,7 +18,7 @@ def testStableCheckpointWhenOneInstanceSlow(chkFreqPatched, looper, for n in otherNodes: n.nodeIbStasher.delay(ppDelay(delay, 1)) - sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, CHK_FREQ, 1) + sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, reqs_for_checkpoint, 1) timeout = waits.expectedTransactionExecutionTime(len(txnPoolNodeSet)) + delay looper.run(eventually(chkChkpoints, txnPoolNodeSet, 1, 0, retryWait=1, timeout=timeout)) diff --git a/plenum/test/conftest.py b/plenum/test/conftest.py index 0b405dc016..e0a1ad2daa 100644 --- a/plenum/test/conftest.py +++ b/plenum/test/conftest.py @@ -41,7 +41,7 @@ from plenum.test.helper import randomOperation, \ checkReqAck, checkLastClientReqForNode, waitForSufficientRepliesForRequests, \ waitForViewChange, requestReturnedToNode, randomText, \ - mockGetInstalledDistributions, mockImportModule + mockGetInstalledDistributions, mockImportModule, chk_all_funcs from plenum.test.node_request.node_request_helper import checkPrePrepared, \ checkPropagated, checkPrepared, checkCommitted from plenum.test.plugin.helper import getPluginPath @@ -49,7 +49,7 @@ from plenum.test.test_node import TestNode, TestNodeSet, Pool, \ checkNodesConnected, ensureElectionsDone, genNodeReg -Logger.setLogLevel(logging.DEBUG) +Logger.setLogLevel(logging.NOTSET) logger = getlogger() config = getConfig() @@ -162,8 +162,6 @@ def getValueFromModule(request, name: str, default: Any = None): PLUGIN_BASE_DIR_PATH: testPluginBaseDirPath, PLUGIN_TYPE_STATS_CONSUMER: "stats_consumer" }, - 'Max3PCBatchSize': 1, - 'DELTA': .8 } @@ -378,17 +376,19 @@ def reqAcked1(looper, nodeSet, client1, sent1, faultyNodes): propTimeout = waits.expectedClientToPoolRequestDeliveryTime(numerOfNodes) coros = [partial(checkLastClientReqForNode, node, sent1) for node in nodeSet] - looper.run(eventuallyAll(*coros, - totalTimeout=propTimeout, - acceptableFails=faultyNodes)) + # looper.run(eventuallyAll(*coros, + # totalTimeout=propTimeout, + # acceptableFails=faultyNodes)) + chk_all_funcs(looper, coros, acceptable_fails=faultyNodes, timeout=propTimeout) # Wait until sufficient number of acks received coros2 = [partial(checkReqAck, client1, node, sent1.identifier, sent1.reqId) for node in nodeSet] ackTimeout = waits.expectedReqAckQuorumTime() - looper.run(eventuallyAll(*coros2, - totalTimeout=ackTimeout, - acceptableFails=faultyNodes)) + # looper.run(eventuallyAll(*coros2, + # totalTimeout=ackTimeout, + # acceptableFails=faultyNodes)) + chk_all_funcs(looper, coros2, acceptable_fails=faultyNodes, timeout=ackTimeout) return sent1 diff --git a/plenum/test/helper.py b/plenum/test/helper.py index b95f7a4954..b249cc991d 100644 --- a/plenum/test/helper.py +++ b/plenum/test/helper.py @@ -87,7 +87,9 @@ def waitForSufficientRepliesForRequests(looper, requestIds = None, fVal=None, customTimeoutPerReq=None, - add_delay_to_timeout: float = 0): + add_delay_to_timeout: float = 0, + override_timeout_limit=False, + total_timeout=None): """ Checks number of replies for given requests of specific client and raises exception if quorum not reached at least for one @@ -105,16 +107,17 @@ def waitForSufficientRepliesForRequests(looper, nodeCount = len(client.nodeReg) fVal = fVal or getMaxFailures(nodeCount) - timeoutPerRequest = customTimeoutPerReq or \ - waits.expectedTransactionExecutionTime(nodeCount) - timeoutPerRequest += add_delay_to_timeout - # here we try to take into account what timeout for execution - # N request - totalTimeout should be in - # timeoutPerRequest < totalTimeout < timeoutPerRequest * N - # we cannot just take (timeoutPerRequest * N) because it is so huge. - # (for timeoutPerRequest=5 and N=10, totalTimeout=50sec) - # lets start with some simple formula: - totalTimeout = (1 + len(requestIds) / 10) * timeoutPerRequest + if not total_timeout: + timeoutPerRequest = customTimeoutPerReq or \ + waits.expectedTransactionExecutionTime(nodeCount) + timeoutPerRequest += add_delay_to_timeout + # here we try to take into account what timeout for execution + # N request - total_timeout should be in + # timeoutPerRequest < total_timeout < timeoutPerRequest * N + # we cannot just take (timeoutPerRequest * N) because it is so huge. + # (for timeoutPerRequest=5 and N=10, total_timeout=50sec) + # lets start with some simple formula: + total_timeout = (1 + len(requestIds) / 10) * timeoutPerRequest coros = [] for requestId in requestIds: @@ -123,9 +126,13 @@ def waitForSufficientRepliesForRequests(looper, requestId, fVal)) - looper.run(eventuallyAll(*coros, - retryWait=1, - totalTimeout=totalTimeout)) + chk_all_funcs(looper, coros, retry_wait=1, timeout=total_timeout, + override_eventually_timeout=override_timeout_limit) + + # looper.run(eventuallyAll(*coros, + # retryWait=1, + # totalTimeout=total_timeout, + # override_timeout_limit=override_timeout_limit)) def sendReqsToNodesAndVerifySuffReplies(looper: Looper, @@ -134,7 +141,9 @@ def sendReqsToNodesAndVerifySuffReplies(looper: Looper, numReqs: int, fVal: int=None, customTimeoutPerReq: float=None, - add_delay_to_timeout: float=0): + add_delay_to_timeout: float=0, + override_timeout_limit=False, + total_timeout=None): nodeCount = len(client.nodeReg) fVal = fVal or getMaxFailures(nodeCount) requests = sendRandomRequests(wallet, client, numReqs) @@ -142,7 +151,9 @@ def sendReqsToNodesAndVerifySuffReplies(looper: Looper, requests=requests, fVal=fVal, customTimeoutPerReq=customTimeoutPerReq, - add_delay_to_timeout=add_delay_to_timeout) + add_delay_to_timeout=add_delay_to_timeout, + override_timeout_limit=override_timeout_limit, + total_timeout=total_timeout) return requests @@ -683,4 +694,27 @@ def nodeByName(nodes, name): for node in nodes: if node.name == name: return node - raise Exception("Node with the name '{}' has not been found.".format(name)) \ No newline at end of file + raise Exception("Node with the name '{}' has not been found.".format(name)) + + +def chk_all_funcs(looper, funcs, acceptable_fails=0, retry_wait=None, + timeout=None, override_eventually_timeout=False): + # TODO: Move this logic to eventuallyAll + def chk(): + fails = 0 + for func in funcs: + try: + func() + except Exception: + fails += 1 + assert fails <= acceptable_fails + + kwargs = {} + if retry_wait: + kwargs['retryWait'] = retry_wait + if timeout: + kwargs['timeout'] = timeout + if override_eventually_timeout: + kwargs['override_timeout_limit'] = override_eventually_timeout + + looper.run(eventually(chk, **kwargs)) diff --git a/plenum/test/node_request/node_request_helper.py b/plenum/test/node_request/node_request_helper.py index 5919d9da74..bffdd3b369 100644 --- a/plenum/test/node_request/node_request_helper.py +++ b/plenum/test/node_request/node_request_helper.py @@ -4,10 +4,12 @@ from stp_core.loop.eventually import eventuallyAll from plenum.common.types import PrePrepare, OPERATION, f from plenum.common.constants import DOMAIN_LEDGER_ID +from plenum.common.types import OPERATION, f from plenum.common.util import getMaxFailures from plenum.server.node import Node from plenum.server.replica import Replica from plenum.test import waits +from plenum.test.helper import chk_all_funcs from plenum.test.spy_helpers import getAllArgs from plenum.test.test_node import TestNode, getNonPrimaryReplicas, \ getAllReplicas, getPrimaryReplica @@ -40,10 +42,8 @@ def g(node: TestNode): numOfMsgsWithFaults) timeout = waits.expectedPropagateTime(len(nodeSet)) - coros = [partial(g, node) for node in nodeSet] - looper.run(eventuallyAll(*coros, - totalTimeout=timeout, - acceptableFails=faultyNodes)) + funcs = [partial(g, node) for node in nodeSet] + chk_all_funcs(looper, funcs, faultyNodes, timeout) def checkPrePrepared(looper, @@ -167,9 +167,10 @@ def nonPrimaryReceivesCorrectNumberOfPREPREPAREs(): primarySentsCorrectNumberOfPREPREPAREs() nonPrimaryReceivesCorrectNumberOfPREPREPAREs() - coros = [partial(g, instId) for instId in instIds] + funcs = [partial(g, instId) for instId in instIds] # TODO Select or create the timeout from 'waits'. Don't use constant. - looper.run(eventuallyAll(*coros, retryWait=1, totalTimeout=timeout)) + # looper.run(eventuallyAll(*coros, retryWait=1, totalTimeout=timeout)) + chk_all_funcs(looper, funcs, faultyNodes, timeout) def checkPrepared(looper, nodeSet, preprepared1, instIds, faultyNodes=0, @@ -275,9 +276,10 @@ def nonPrimaryReplicasReceiveCorrectNumberOfPREPAREs(): primaryReceivesCorrectNumberOfPREPAREs() nonPrimaryReplicasReceiveCorrectNumberOfPREPAREs() - coros = [partial(g, instId) for instId in instIds] + funcs = [partial(g, instId) for instId in instIds] # TODO Select or create the timeout from 'waits'. Don't use constant. - looper.run(eventuallyAll(*coros, retryWait=1, totalTimeout=timeout)) + # looper.run(eventuallyAll(*coros, retryWait=1, totalTimeout=timeout)) + chk_all_funcs(looper, funcs, faultyNodes, timeout) def checkCommitted(looper, nodeSet, prepared1, instIds, faultyNodes=0): @@ -289,6 +291,8 @@ def g(instId): allReplicas = getAllReplicas(nodeSet, instId) primaryReplica = getPrimaryReplica(nodeSet, instId) + # Question: Why 2 checks are being made, one with the data structure + # and then the spylog def replicasSeesCorrectNumOfCOMMITs(): """ num of commit messages must be = n when zero fault; @@ -343,9 +347,10 @@ def replicasReceivesCorrectNumberOfCOMMITs(): replicasReceivesCorrectNumberOfCOMMITs() replicasSeesCorrectNumOfCOMMITs() - coros = [partial(g, instId) for instId in instIds] + funcs = [partial(g, instId) for instId in instIds] # TODO Select or create the timeout from 'waits'. Don't use constant. - looper.run(eventuallyAll(*coros, retryWait=1, totalTimeout=timeout)) + # looper.run(eventuallyAll(*coros, retryWait=1, totalTimeout=timeout)) + chk_all_funcs(looper, funcs, faultyNodes, timeout) def msgCountOK(nodesSize, diff --git a/plenum/test/node_request/test_commit/test_num_of_commit_with_f_plus_one_faults.py b/plenum/test/node_request/test_commit/test_num_of_commit_with_f_plus_one_faults.py index d2f6059a62..e0eba05d8a 100644 --- a/plenum/test/node_request/test_commit/test_num_of_commit_with_f_plus_one_faults.py +++ b/plenum/test/node_request/test_commit/test_num_of_commit_with_f_plus_one_faults.py @@ -38,8 +38,9 @@ def afterElection(setup, up): def testNumOfCommitMsgsWithFPlusOneFaults(afterElection, looper, nodeSet, prepared1, noRetryReq): with pytest.raises(AssertionError): + # To raise an error pass less than the actual number of faults checkCommitted(looper, nodeSet, prepared1, range(getNoInstances(len(nodeSet))), - faultyNodes) + faultyNodes-1) diff --git a/plenum/test/node_request/test_request_forwarding.py b/plenum/test/node_request/test_request_forwarding.py index 284f8440c2..37dd663cda 100644 --- a/plenum/test/node_request/test_request_forwarding.py +++ b/plenum/test/node_request/test_request_forwarding.py @@ -1,42 +1,45 @@ +import pytest + from plenum.common.constants import DOMAIN_LEDGER_ID from plenum.test import waits -from plenum.test.delayers import nom_delay +from plenum.test.delayers import nom_delay, delay_3pc_messages from plenum.test.helper import sendRandomRequests, \ - waitForSufficientRepliesForRequests, sendReqsToNodesAndVerifySuffReplies + waitForSufficientRepliesForRequests +from plenum.test.batching_3pc.conftest import tconf from plenum.test.pool_transactions.conftest import looper, clientAndWallet1, \ client1, wallet1, client1Connected -from plenum.test.batching_3pc.conftest import tconf from plenum.test.test_node import ensureElectionsDone from plenum.test.view_change.helper import ensure_view_change from stp_core.loop.eventually import eventually def test_all_replicas_hold_request_keys(looper, txnPoolNodeSet, client1, - wallet1, client1Connected, tconf): + wallet1, client1Connected, tconf): """ All replicas whether primary or non primary hold request keys of forwarded requests. Once requests are ordered, they request keys are removed from replica. """ - def chk(count, all_same=True): + delay_3pc_messages(txnPoolNodeSet, 0, 2) + delay_3pc_messages(txnPoolNodeSet, 1, 2) + + def chk(count): # All replicas have same amount of forwarded request keys and all keys # are finalised. for node in txnPoolNodeSet: for r in node.replicas: - if all_same or r.isPrimary is False: + if r.isPrimary is False: assert len(r.requestQueues[DOMAIN_LEDGER_ID]) == count for i in range(count): k = r.requestQueues[DOMAIN_LEDGER_ID][i] assert r.requests[k].finalised + elif r.isPrimary is True: + assert len(r.requestQueues[DOMAIN_LEDGER_ID]) == 0 - # Send less that batch number of request so batch is not immediately sent - # and primary can be checked - reqs = sendRandomRequests(wallet1, client1, tconf.Max3PCBatchSize-1) - # All replicas should have all request keys with them - looper.run(eventually(chk, tconf.Max3PCBatchSize-1)) + reqs = sendRandomRequests(wallet1, client1, tconf.Max3PCBatchSize - 1) # Only non primary replicas should have all request keys with them - looper.run(eventually(chk, tconf.Max3PCBatchSize - 1, False, - timeout=tconf.Max3PCBatchWait)) - waitForSufficientRepliesForRequests(looper, client1, requests=reqs) + looper.run(eventually(chk, tconf.Max3PCBatchSize - 1)) + waitForSufficientRepliesForRequests(looper, client1, requests=reqs, + add_delay_to_timeout=2) # Replicas should have no request keys with them since they are ordered looper.run(eventually(chk, 0)) # Need to wait since one node might not # have processed it. @@ -46,8 +49,14 @@ def chk(count, all_same=True): node.nodeIbStasher.delay(nom_delay(delay)) ensure_view_change(looper, txnPoolNodeSet, client1, wallet1) - reqs = sendRandomRequests(wallet1, client1, 2*tconf.Max3PCBatchSize) - looper.run(eventually(chk, 2*tconf.Max3PCBatchSize)) - ensureElectionsDone(looper, txnPoolNodeSet) - waitForSufficientRepliesForRequests(looper, client1, requests=reqs) - looper.run(eventually(chk, 0)) \ No newline at end of file + reqs = sendRandomRequests(wallet1, client1, 2 * tconf.Max3PCBatchSize) + looper.run(eventually(chk, 2 * tconf.Max3PCBatchSize)) + + # Since each nomination is delayed and there will be multiple nominations + # so adding some extra time + timeout = waits.expectedPoolElectionTimeout(len(txnPoolNodeSet)) + \ + len(txnPoolNodeSet)*delay + ensureElectionsDone(looper, txnPoolNodeSet, customTimeout=timeout) + waitForSufficientRepliesForRequests(looper, client1, requests=reqs, + add_delay_to_timeout=2) + looper.run(eventually(chk, 0)) diff --git a/plenum/test/primary_election/test_primary_election_case1.py b/plenum/test/primary_election/test_primary_election_case1.py index 7fd4118963..6e9322f90f 100644 --- a/plenum/test/primary_election/test_primary_election_case1.py +++ b/plenum/test/primary_election/test_primary_election_case1.py @@ -22,6 +22,7 @@ delayOfNomination = 5 + @pytest.fixture() def case1Setup(startedNodes: TestNodeSet): nodes = startedNodes @@ -71,10 +72,9 @@ def testPrimaryElectionCase1(case1Setup, looper, keySharedNodes): # Node B sends multiple NOMINATE messages for Node D but only after A has # nominated itself - timeout = waits.expectedPoolNominationTimeout(nodeCount=1) + timeout = waits.expectedPoolNominationTimeout(nodeCount=len(keySharedNodes)) looper.run(eventually(checkNomination, nodeA, nodeA.name, - retryWait=.25, - timeout=timeout)) + retryWait=.25, timeout=timeout)) instId = getSelfNominationByNode(nodeA) diff --git a/setup.py b/setup.py index ac79d9001a..a074e0c4e2 100644 --- a/setup.py +++ b/setup.py @@ -58,7 +58,7 @@ data_files=[( (BASE_DIR, ['data/pool_transactions_sandbox', ]) )], - install_requires=['ledger-dev==0.2.28', 'stp-dev==0.1.51', + install_requires=['ledger-dev==0.2.28', 'stp-dev==0.1.53', 'state-trie-dev==0.1.15', 'jsonpickle', 'prompt_toolkit==0.57', 'pygments', 'ioflo==1.5.4', 'semver', 'base58', 'orderedset',