Skip to content

Commit

Permalink
Use original batch parameters in all tests except the ones that use batches and use alternate of eventuallyAll to check a collection of functions under a timeout (#190)
Browse files Browse the repository at this point in the history

* updates to test helpers and change in forwarding requests logic

* overriding batch params in tests

* use correct timeouts in tests

* accounting for nomination delay
  • Loading branch information
andkononykhin authored and ashcherbakov committed Jun 2, 2017
1 parent c163dd7 commit 56181ff
Show file tree
Hide file tree
Showing 20 changed files with 188 additions and 134 deletions.
2 changes: 1 addition & 1 deletion plenum/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ def retryForExpected(self):
# even if pool is just busy and cannot answer quickly,
# that's why using maintainConnections instead
# self.nodestack.connect(name=remote.name)
self.nodestack.maintainConnections()
self.nodestack.maintainConnections(force=True)

if aliveRequests:
# Need a delay in case connection has to be established with some
Expand Down
22 changes: 9 additions & 13 deletions plenum/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,6 @@
# Expected time for one stack to get connected to another
ExpectedConnectTime = 3.3 if sys.platform == 'win32' else 2


# After ordering every `CHK_FREQ` requests, replica sends a CHECKPOINT
CHK_FREQ = 100000

# Difference between low water mark and high water mark
LOG_SIZE = 3*CHK_FREQ


# Since the ledger is stored in a flat file, this makes the ledger do
# an fsync on every write. Making it True can significantly slow
# down writes as shown in a test `test_file_store_perf.py` in the ledger
Expand All @@ -167,11 +159,8 @@
# Max batch size for 3 phase commit
Max3PCBatchSize = 100
# Max time to wait before creating a batch for 3 phase commit
Max3PCBatchWait = 1
Max3PCBatchWait = .001

# Maximum lifespan for a batch, this needs to be changed if
# `Max3PCBatchSize` is changed
ThreePCBatchTimeout = 25

# Each node keeps a map of PrePrepare sequence numbers and the corresponding
# txn seqnos that came out of it. Helps in servicing Consistency Proof Requests
Expand All @@ -186,7 +175,14 @@
MaxStateProofTime = 3


# After ordering every `CHK_FREQ` batches, replica sends a CHECKPOINT
CHK_FREQ = 10000

# Difference between low water mark and high water mark
LOG_SIZE = 3*CHK_FREQ


CLIENT_REQACK_TIMEOUT = 5
CLIENT_REPLY_TIMEOUT = Max3PCBatchWait + 10
CLIENT_REPLY_TIMEOUT = 15
CLIENT_MAX_RETRY_ACK = 5
CLIENT_MAX_RETRY_REPLY = 5
3 changes: 0 additions & 3 deletions plenum/server/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,6 @@ def __init__(self,
# help in voting for/against a view change.
self.lost_primary_at = None

# First view change message received for a view no
self.view_change_started_at = {}

tp = loadPlugins(self.basedirpath)
logger.debug("total plugins loaded in node: {}".format(tp))
# TODO: this is already happening in `start`, why here then?
Expand Down
5 changes: 1 addition & 4 deletions plenum/test/batching_3pc/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@
@pytest.fixture(scope="module")
def tconf(tconf, request):
oldSize = tconf.Max3PCBatchSize
oldTIme = tconf.Max3PCBatchWait
tconf.Max3PCBatchSize = 3
tconf.Max3PCBatchWait = 5
tconf.Max3PCBatchSize = 10

def reset():
tconf.Max3PCBatchSize = oldSize
tconf.Max3PCBatchWait = oldTIme

request.addfinalizer(reset)
return tconf
Expand Down
20 changes: 8 additions & 12 deletions plenum/test/batching_3pc/test_basic_batching.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ def test3PCOverBatchWithThresholdReqs(tconf, looper, txnPoolNodeSet, client,
:return:
"""
reqs = sendRandomRequests(wallet1, client, tconf.Max3PCBatchSize)
waitForSufficientRepliesForRequests(looper, client, requests=reqs,
customTimeoutPerReq=tconf.Max3PCBatchWait-1)
waitForSufficientRepliesForRequests(looper, client, requests=reqs)


def test3PCOverBatchWithLessThanThresholdReqs(tconf, looper, txnPoolNodeSet,
Expand All @@ -43,8 +42,7 @@ def test3PCOverBatchWithLessThanThresholdReqs(tconf, looper, txnPoolNodeSet,
:return:
"""
reqs = sendRandomRequests(wallet1, client, tconf.Max3PCBatchSize - 1)
waitForSufficientRepliesForRequests(looper, client, requests=reqs,
customTimeoutPerReq=tconf.Max3PCBatchWait + 1)
waitForSufficientRepliesForRequests(looper, client, requests=reqs)


def testTreeRootsCorrectAfterEachBatch(tconf, looper, txnPoolNodeSet,
Expand All @@ -56,14 +54,12 @@ def testTreeRootsCorrectAfterEachBatch(tconf, looper, txnPoolNodeSet,
"""
# Send 1 batch
reqs = sendRandomRequests(wallet1, client, tconf.Max3PCBatchSize)
waitForSufficientRepliesForRequests(looper, client, requests=reqs,
customTimeoutPerReq=tconf.Max3PCBatchWait)
waitForSufficientRepliesForRequests(looper, client, requests=reqs)
checkNodesHaveSameRoots(txnPoolNodeSet)

# Send 2 batches
reqs = sendRandomRequests(wallet1, client, 2 * tconf.Max3PCBatchSize)
waitForSufficientRepliesForRequests(looper, client, requests=reqs,
customTimeoutPerReq=2*tconf.Max3PCBatchWait)
waitForSufficientRepliesForRequests(looper, client, requests=reqs)
checkNodesHaveSameRoots(txnPoolNodeSet)


Expand All @@ -90,11 +86,11 @@ def rejectingMethod(self, req):
node.doDynamicValidation = types.MethodType(rejectingMethod, node)

reqs = sendRandomRequests(wallet1, client, tconf.Max3PCBatchSize)
waitForSufficientRepliesForRequests(looper, client, requests=reqs[:-1],
customTimeoutPerReq=tconf.Max3PCBatchWait)
waitForSufficientRepliesForRequests(looper, client, requests=reqs[:-1])

with pytest.raises(AssertionError):
waitForSufficientRepliesForRequests(looper, client, requests=reqs[-1:],
customTimeoutPerReq=tconf.Max3PCBatchWait)
waitForSufficientRepliesForRequests(looper, client, requests=reqs[-1:])

for node in txnPoolNodeSet:
looper.run(eventually(checkRejectWithReason, client,
'Simulated rejection', node.clientstack.name,
Expand Down
3 changes: 1 addition & 2 deletions plenum/test/batching_3pc/test_batching_scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ def specificPrePrepares(wrappedMsg):
reqs = sendRandomRequests(wallet1, client,
(ppsToDelay+1)*tconf.Max3PCBatchSize)

waitForSufficientRepliesForRequests(looper, client, requests=reqs,
customTimeoutPerReq=(ppsToDelay + 1) * tconf.Max3PCBatchWait)
waitForSufficientRepliesForRequests(looper, client, requests=reqs)
checkNodesHaveSameRoots(txnPoolNodeSet)

for r in otherR:
Expand Down
17 changes: 13 additions & 4 deletions plenum/test/checkpoints/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@

from plenum.test.pool_transactions.conftest import looper, clientAndWallet1, \
client1, wallet1, client1Connected

CHK_FREQ = 5
from plenum.test.batching_3pc.conftest import tconf


@pytest.fixture(scope="module")
def chkFreqPatched(tconf, request):
oldChkFreq = tconf.CHK_FREQ
oldLogSize = tconf.LOG_SIZE

tconf.CHK_FREQ = CHK_FREQ
tconf.LOG_SIZE = 3*tconf.CHK_FREQ
tconf.CHK_FREQ = 2
tconf.LOG_SIZE = 2*tconf.CHK_FREQ

def reset():
tconf.CHK_FREQ = oldChkFreq
Expand All @@ -21,3 +20,13 @@ def reset():
request.addfinalizer(reset)

return tconf


@pytest.fixture(scope="module")
def reqs_for_checkpoint(chkFreqPatched):
    """Return ``CHK_FREQ * Max3PCBatchSize`` read from the patched config.

    Tests use this as the number of requests to send so that ``CHK_FREQ``
    batches of ``Max3PCBatchSize`` requests each are produced.
    """
    batches_per_checkpoint = chkFreqPatched.CHK_FREQ
    batch_size = chkFreqPatched.Max3PCBatchSize
    return batches_per_checkpoint * batch_size


@pytest.fixture(scope="module")
def reqs_for_logsize(chkFreqPatched):
    """Return ``LOG_SIZE * Max3PCBatchSize`` read from the patched config.

    Tests use this as the number of requests to send so that ``LOG_SIZE``
    batches of ``Max3PCBatchSize`` requests each are produced.
    """
    batches_in_log = chkFreqPatched.LOG_SIZE
    batch_size = chkFreqPatched.Max3PCBatchSize
    return batches_in_log * batch_size
15 changes: 9 additions & 6 deletions plenum/test/checkpoints/test_basic_checkpointing.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,38 @@
import pytest

from stp_core.loop.eventually import eventually
from plenum.test import waits
from plenum.test.checkpoints.conftest import CHK_FREQ
from plenum.test.checkpoints.helper import chkChkpoints
from plenum.test.helper import sendReqsToNodesAndVerifySuffReplies


def testCheckpointCreated(chkFreqPatched, looper, txnPoolNodeSet, client1,
wallet1, client1Connected):
wallet1, client1Connected, reqs_for_checkpoint):
"""
After fewer than `CHK_FREQ` requests, there should be one checkpoint
on each replica. After `CHK_FREQ` requests, one checkpoint should become stable.
"""
sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, CHK_FREQ-1, 1)
# Send one batch less so checkpoint is not created
sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1,
reqs_for_checkpoint-(chkFreqPatched.Max3PCBatchSize), 1)
# Deliberately waiting so as to verify that not more than 1 checkpoint is
# created
looper.runFor(2)
chkChkpoints(txnPoolNodeSet, 1)

sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, 1, 1)
sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, chkFreqPatched.Max3PCBatchSize, 1)

timeout = waits.expectedTransactionExecutionTime(len(txnPoolNodeSet))
looper.run(eventually(chkChkpoints, txnPoolNodeSet, 1, 0, retryWait=1, timeout=timeout))


def testOldCheckpointDeleted(chkFreqPatched, looper, txnPoolNodeSet, client1,
wallet1, client1Connected):
wallet1, client1Connected, reqs_for_checkpoint):
"""
Send more than twice `CHK_FREQ` requests; there should be one new stable
checkpoint on each replica. The old stable checkpoint should be removed.
"""
sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, 2*CHK_FREQ,
sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, 2*reqs_for_checkpoint,
1)

sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, 1, 1)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from stp_core.loop.eventually import eventually
from plenum.common.types import Checkpoint
from plenum.test.checkpoints.conftest import CHK_FREQ
from plenum.test.checkpoints.helper import chkChkpoints
from plenum.test.helper import sendReqsToNodesAndVerifySuffReplies, \
checkDiscardMsg


def testDiscardCheckpointMsgForStableCheckpoint(chkFreqPatched, looper,
txnPoolNodeSet, client1,
wallet1, client1Connected):
sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, CHK_FREQ, 1)
wallet1, client1Connected,
reqs_for_checkpoint):
sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, reqs_for_checkpoint, 1)
looper.run(eventually(chkChkpoints, txnPoolNodeSet, 1, 0, retryWait=1))
node1 = txnPoolNodeSet[0]
rep1 = node1.replicas[0]
Expand Down
7 changes: 3 additions & 4 deletions plenum/test/checkpoints/test_message_outside_watermark.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from plenum.test import waits
from plenum.test.checkpoints.conftest import CHK_FREQ
from plenum.test.checkpoints.helper import chkChkpoints
from plenum.test.delayers import ppDelay
from plenum.test.helper import sendReqsToNodesAndVerifySuffReplies, \
countDiscarded
Expand All @@ -11,7 +9,8 @@

def testNonPrimaryRecvs3PhaseMessageOutsideWatermarks(chkFreqPatched, looper,
txnPoolNodeSet, client1,
wallet1, client1Connected):
wallet1, client1Connected,
reqs_for_logsize):
"""
A node is slow in processing PRE-PREPAREs such that a lot of requests happen
and the slow node has started getting 3 phase messages outside of it
Expand All @@ -22,7 +21,7 @@ def testNonPrimaryRecvs3PhaseMessageOutsideWatermarks(chkFreqPatched, looper,
"""
delay = 15
instId = 1
reqsToSend = chkFreqPatched.LOG_SIZE + 2
reqsToSend = reqs_for_logsize + 2
npr = getNonPrimaryReplicas(txnPoolNodeSet, instId)
slowReplica = npr[0]
slowNode = slowReplica.node
Expand Down
20 changes: 14 additions & 6 deletions plenum/test/checkpoints/test_message_outside_watermark1.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import math

from stp_core.loop.eventually import eventually

from plenum.test import waits
Expand All @@ -11,17 +13,18 @@

def testPrimaryRecvs3PhaseMessageOutsideWatermarks(tconf, chkFreqPatched, looper,
txnPoolNodeSet, client1,
wallet1, client1Connected):
wallet1, client1Connected,
reqs_for_logsize):
"""
One of the primaries starts getting a lot of requests, more than its log size,
and queues up requests since they would go beyond its watermarks. This
happens because other nodes are slow in processing its PRE-PREPAREs.
Eventually this primary will send PRE-PREPAREs for all requests and those
requests will complete.
"""
delay = 5
delay = 3
instId = 1
reqsToSend = 2*chkFreqPatched.LOG_SIZE + 1
reqsToSend = 2*reqs_for_logsize + 1
npr = getNonPrimaryReplicas(txnPoolNodeSet, instId)
pr = getPrimaryReplica(txnPoolNodeSet, instId)
from plenum.server.replica import TPCStat
Expand All @@ -30,9 +33,14 @@ def testPrimaryRecvs3PhaseMessageOutsideWatermarks(tconf, chkFreqPatched, looper
for r in npr:
r.node.nodeIbStasher.delay(ppDelay(delay, instId))

tm_exec_1_batch = waits.expectedTransactionExecutionTime(len(txnPoolNodeSet))
batch_count = math.ceil(reqsToSend / tconf.Max3PCBatchSize)
total_timeout = (tm_exec_1_batch + delay) * batch_count

def chk():
assert orderedCount + reqsToSend == pr.stats.get(TPCStat.OrderSent)
assert orderedCount + batch_count == pr.stats.get(TPCStat.OrderSent)

print('Sending {} requests'.format(reqsToSend))
sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, reqsToSend, 1)
sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, reqsToSend,
1, override_timeout_limit=True,
total_timeout=total_timeout)
looper.run(eventually(chk, retryWait=1, timeout=3))
25 changes: 13 additions & 12 deletions plenum/test/checkpoints/test_stable_checkpoint.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,37 @@
from stp_core.loop.eventually import eventually
from plenum.test import waits
from plenum.test.checkpoints.conftest import CHK_FREQ
from plenum.test.checkpoints.helper import chkChkpoints
from plenum.test.helper import sendReqsToNodesAndVerifySuffReplies


def checkRequestCounts(nodes, count):
def checkRequestCounts(nodes, req_count, cons_count):
for node in nodes:
assert len(node.requests) == count
assert len(node.requests) == req_count
for r in node.replicas:
assert len(r.commits) == count
assert len(r.prepares) == count
assert len(r.commits) == cons_count
assert len(r.prepares) == cons_count


def testRequestOlderThanStableCheckpointRemoved(chkFreqPatched, looper,
txnPoolNodeSet, client1,
wallet1, client1Connected):
wallet1, client1Connected,
reqs_for_checkpoint):
reqs = sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1,
CHK_FREQ-1, 1)
reqs_for_checkpoint - (chkFreqPatched.Max3PCBatchSize), 1)
timeout = waits.expectedTransactionExecutionTime(len(txnPoolNodeSet))
looper.run(eventually(chkChkpoints, txnPoolNodeSet, 1, retryWait=1,
timeout=timeout))
checkRequestCounts(txnPoolNodeSet, len(reqs))
sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, 1, 1)
checkRequestCounts(txnPoolNodeSet, len(reqs), chkFreqPatched.CHK_FREQ-1)
sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1,
chkFreqPatched.Max3PCBatchSize, 1)

looper.run(eventually(chkChkpoints, txnPoolNodeSet, 1, 0, retryWait=1,
timeout=timeout))
checkRequestCounts(txnPoolNodeSet, 0)
checkRequestCounts(txnPoolNodeSet, 0, 0)

sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1,
3*CHK_FREQ + 1, 1)
reqs_for_checkpoint + 1, 1)

looper.run(eventually(chkChkpoints, txnPoolNodeSet, 2, 0, retryWait=1,
timeout=timeout))
checkRequestCounts(txnPoolNodeSet, 1)
checkRequestCounts(txnPoolNodeSet, 1, 1)
6 changes: 3 additions & 3 deletions plenum/test/checkpoints/test_stable_checkpoint1.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from stp_core.loop.eventually import eventually

from plenum.test import waits
from plenum.test.checkpoints.conftest import CHK_FREQ
from plenum.test.checkpoints.helper import chkChkpoints
from plenum.test.delayers import ppDelay
from plenum.test.helper import sendReqsToNodesAndVerifySuffReplies
Expand All @@ -10,15 +9,16 @@

def testStableCheckpointWhenOneInstanceSlow(chkFreqPatched, looper,
txnPoolNodeSet, client1,
wallet1, client1Connected):
wallet1, client1Connected,
reqs_for_checkpoint):
delay = 5
pr = getPrimaryReplica(txnPoolNodeSet, 1)
slowNode = pr.node
otherNodes = [n for n in txnPoolNodeSet if n != slowNode]
for n in otherNodes:
n.nodeIbStasher.delay(ppDelay(delay, 1))

sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, CHK_FREQ, 1)
sendReqsToNodesAndVerifySuffReplies(looper, wallet1, client1, reqs_for_checkpoint, 1)
timeout = waits.expectedTransactionExecutionTime(len(txnPoolNodeSet)) + delay
looper.run(eventually(chkChkpoints, txnPoolNodeSet, 1, 0, retryWait=1,
timeout=timeout))
Loading

0 comments on commit 56181ff

Please sign in to comment.