Master #28

Merged: 9 commits, May 12, 2017
8 changes: 4 additions & 4 deletions plenum/client/client.py
@@ -249,7 +249,7 @@ def submitReqs(self, *reqs: Request) -> List[Request]:
         for request in reqs:
             if self.mode == Mode.discovered and self.hasSufficientConnections:
                 logger.debug('Client {} sending request {}'.format(self, request))
-                self.nodestack.send(request)
+                self.send(request)
                 self.expectingFor(request)
             else:
                 logger.debug("{} pending request since in mode {} and "
@@ -470,7 +470,7 @@ def flushMsgsPendingConnection(self):
                     .format(queueSize))
         while self.reqsPendingConnection:
             req, signer = self.reqsPendingConnection.popleft()
-            self.nodestack.send(req, signer=signer)
+            self.send(req, signer=signer)

     def expectingFor(self, request: Request, nodes: Optional[Set[str]]=None):
         nodes = nodes or {r.name for r in self.nodestack.remotes.values()
@@ -587,14 +587,14 @@ def resendRequests(self, keys):
     def sendLedgerStatus(self, nodeName: str):
         ledgerStatus = LedgerStatus(0, self.ledger.size, self.ledger.root_hash)
         rid = self.nodestack.getRemote(nodeName).uid
-        self.nodestack.send(ledgerStatus, rid)
+        self.send(ledgerStatus, rid)

     def send(self, msg: Any, *rids: Iterable[int], signer: Signer = None):
         self.nodestack.send(msg, *rids, signer=signer)

     def sendToNodes(self, msg: Any, names: Iterable[str]):
         rids = [rid for rid, r in self.nodestack.remotes.items() if r.name in names]
-        self.nodestack.send(msg, *rids)
+        self.send(msg, *rids)

     @staticmethod
     def verifyMerkleProof(*replies: Tuple[Reply]) -> bool:
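
A note on the client.py change: every outgoing client message now funnels through the single send wrapper instead of calling self.nodestack.send directly, so signing and fan-out policy live in one place. A minimal sketch of the pattern, with NodeStack as a simplified stand-in rather than plenum's real stack class:

# Minimal sketch: route all outgoing traffic through one wrapper.
# NodeStack is a simplified stand-in, not plenum's real stack class.
class NodeStack:
    def send(self, msg, *rids, signer=None):
        targets = rids if rids else 'all remotes'
        print('sending {} to {} (signer={})'.format(msg, targets, signer))

class Client:
    def __init__(self):
        self.nodestack = NodeStack()

    def send(self, msg, *rids, signer=None):
        # Single choke point: logging, signing or throttling policy can
        # now change here instead of at every call site.
        self.nodestack.send(msg, *rids, signer=signer)

    def sendToNodes(self, msg, *rids):
        self.send(msg, *rids)

Client().send({'op': 'LEDGER_STATUS'}, 1, 2)
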
1 change: 0 additions & 1 deletion plenum/common/ledger_manager.py
@@ -475,7 +475,6 @@ def hasValidCatchupReplies(self, ledgerType, ledger, seqNo, catchUpReplies):
         # proof, by inserting transactions. Duplicating a merkle tree is not
         # expensive since we are using a compact merkle tree.
         tempTree = copy(ledger.tree)
-
         # Get the batch of transactions in the catchup reply which has sequence
         # number `seqNo`
         nodeName, catchupReply = self._getCatchupReplyForSeqNo(ledgerType,
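
The comment kept above is the interesting part: a compact merkle tree stores only the root hashes of its full subtrees, one per set bit of the leaf count, so at most floor(log2(n)) + 1 hashes. Copying it before trial-inserting catchup transactions is therefore O(log n), not O(n). A toy illustration of that size bound, with CompactTree as a stand-in for plenum's implementation:

# Toy stand-in: a compact merkle tree keeps one stored hash per full
# subtree (one per set bit of the leaf count), so duplicating it stays
# cheap no matter how large the ledger grows.
from copy import deepcopy

class CompactTree:
    def __init__(self, leaf_count):
        # at most floor(log2(n)) + 1 hashes of 32 bytes each
        self.hashes = [b'\x00' * 32 for _ in range(bin(leaf_count).count('1'))]

for n in (10, 1000000):
    tree = CompactTree(n)
    tempTree = deepcopy(tree)  # copies a handful of hashes, not n leaves
    print('{} leaves -> {} stored hashes'.format(n, len(tree.hashes)))
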
7 changes: 4 additions & 3 deletions plenum/common/script_helper.py
@@ -14,7 +14,7 @@
 from plenum.common.roles import Roles
 from plenum.common.signer_simple import SimpleSigner
 from plenum.common.constants import TXN_TYPE, TARGET_NYM, DATA, NODE_IP, \
-    NODE_PORT, CLIENT_IP, CLIENT_PORT, ALIAS, NODE, CLIENT_STACK_SUFFIX
+    NODE_PORT, CLIENT_IP, CLIENT_PORT, ALIAS, NODE, CLIENT_STACK_SUFFIX, SERVICES, VALIDATOR
 from plenum.test import waits
 from plenum.test.test_node import getAllReplicas
@@ -228,7 +228,8 @@ def submitNodeIpChange(client, stewardWallet, name: str, nym: str,
             NODE_PORT: int(nodePort),
             CLIENT_IP: clientIp,
             CLIENT_PORT: int(clientPort),
-            ALIAS: name
+            ALIAS: name,
+            SERVICES: [VALIDATOR],
         }
     }
     signedOp = stewardWallet.signOp(txn, stewardWallet.defaultId)
@@ -255,7 +256,7 @@ def changeHA(looper, config, nodeName, nodeSeed, newNodeHA,
     client = Client(stewardName,
                     ha=('0.0.0.0', randomClientPort), config=config)
     looper.add(client)
-    timeout = waits.expectedClientConnectionTimeout(3)
+    timeout = waits.expectedClientToPoolConnectionTimeout(4)
     looper.run(eventually(__checkClientConnected, client,
                           retryWait=1, timeout=timeout))
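
With SERVICES: [VALIDATOR] added to the data section, the NODE txn built by submitNodeIpChange keeps the node enlisted as an active validator while its HA changes. A sketch of the resulting operation; the literal key strings and the target nym below are illustrative assumptions, not plenum's actual constant values:

# Sketch of the NODE txn operation after the change. The literal key
# strings and the target nym are illustrative assumptions only.
TXN_TYPE, TARGET_NYM, DATA = 'type', 'dest', 'data'
NODE_IP, NODE_PORT = 'node_ip', 'node_port'
CLIENT_IP, CLIENT_PORT = 'client_ip', 'client_port'
ALIAS, SERVICES, VALIDATOR, NODE = 'alias', 'services', 'VALIDATOR', 'NODE'

txn = {
    TXN_TYPE: NODE,
    TARGET_NYM: '<node nym placeholder>',
    DATA: {
        NODE_IP: '10.0.0.2',
        NODE_PORT: 9701,
        CLIENT_IP: '10.0.0.2',
        CLIENT_PORT: 9702,
        ALIAS: 'Node1',
        SERVICES: [VALIDATOR],  # new: keep the node listed as a validator
    },
}
print(txn)
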
73 changes: 47 additions & 26 deletions plenum/common/test_network_setup.py
@@ -1,4 +1,5 @@
 import argparse
+import ipaddress
 import os
 from collections import namedtuple

@@ -159,9 +160,10 @@ def bootstrapTestNodes(cls, config, startingPort, nodeParamsFileName, domainTxnF
         parser = argparse.ArgumentParser(
             description="Generate pool transactions for testing")

-        parser.add_argument('--nodes', required=True, type=int,
-                            help='node count, '
-                                 'should be less than 100')
+        parser.add_argument('--nodes', required=True,
+                            help='node count should be less than 100',
+                            type=cls._bootstrapArgsTypeNodeCount,
+                            )
         parser.add_argument('--clients', required=True, type=int,
                             help='client count')
         parser.add_argument('--nodeNum', type=int,
@@ -173,11 +175,12 @@ def bootstrapTestNodes(cls, config, startingPort, nodeParamsFileName, domainTxnF
                                  'number of nodes then the '
                                  'remaining nodes are assigned the loopback '
                                  'IP, i.e 127.0.0.1',
-                            type=str)
+                            type=cls._bootstrapArgsTypeIps)

         parser.add_argument('--envName',
                             help='Environment name (test or live)',
                             type=str,
+                            choices=('test', 'live'),
                             default="test",
                             required=False)

@@ -187,30 +190,49 @@ def bootstrapTestNodes(cls, config, startingPort, nodeParamsFileName, domainTxnF
                             action='store_true')

         args = parser.parse_args()
-        if args.nodes > 100:
-            print("Cannot run {} nodes for testing purposes as of now. "
-                  "This is not a problem with the protocol but some placeholder"
-                  " rules we put in place which will be replaced by our "
-                  "Governance model. Going to run only 100".format(args.nodes))
-            nodeCount = 100
-        else:
-            nodeCount = args.nodes
-        clientCount = args.clients
-        nodeNum = args.nodeNum
-        ips = args.ips
-        envName = args.envName
-        appendToLedgers = args.appendToLedgers
-        if nodeNum:
-            assert nodeNum <= nodeCount, "nodeNum should be less than equal " \
-                                         "to nodeCount"
-
-        steward_defs, node_defs = cls.gen_defs(ips, nodeCount, startingPort)
-        client_defs = cls.gen_client_defs(clientCount)
+
+        if args.nodeNum:
+            assert 0 <= args.nodeNum <= args.nodes, \
+                "nodeNum should be less or equal to nodeCount"
+
+        steward_defs, node_defs = cls.gen_defs(args.ips, args.nodes, startingPort)
+        client_defs = cls.gen_client_defs(args.clients)
         trustee_def = cls.gen_trustee_def(1)
-        cls.bootstrapTestNodesCore(config, envName, appendToLedgers,
+        cls.bootstrapTestNodesCore(config, args.envName, args.appendToLedgers,
                                    domainTxnFieldOrder, trustee_def,
                                    steward_defs, node_defs, client_defs,
-                                   nodeNum, nodeParamsFileName)
+                                   args.nodeNum, nodeParamsFileName)

+    @staticmethod
+    def _bootstrapArgsTypeNodeCount(nodesStrArg):
+        if not nodesStrArg.isdigit():
+            raise argparse.ArgumentTypeError('should be a number')
+        n = int(nodesStrArg)
+        if n > 100:
+            raise argparse.ArgumentTypeError(
+                "Cannot run {} nodes for testing purposes as of now. "
+                "This is not a problem with the protocol but some placeholder "
+                "rules we put in place which will be replaced by our "
+                "Governance model. Going to run only 100".format(n)
+            )
+        if n <= 0:
+            raise argparse.ArgumentTypeError('should be > 0')
+        return n
+
+    @staticmethod
+    def _bootstrapArgsTypeIps(ipsStrArg):
+        ips = []
+        for ip in ipsStrArg.split(','):
+            ip = ip.strip()
+            try:
+                ipaddress.ip_address(ip)
+            except ValueError:
+                raise argparse.ArgumentTypeError(
+                    "'{}' is an invalid IP address".format(ip)
+                )
+            else:
+                ips.append(ip)
+        return ips

     @classmethod
     def gen_defs(cls, ips, nodeCount, starting_port):
@@ -224,7 +246,6 @@ def gen_defs(cls, ips, nodeCount, starting_port):
         if not ips:
             ips = ['127.0.0.1'] * nodeCount
         else:
-            ips = ips.split(",")
             if len(ips) != nodeCount:
                 if len(ips) > nodeCount:
                     ips = ips[:nodeCount]
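
The refactor moves input validation out of bootstrapTestNodes and into argparse type= callables, so a bad --nodes or --ips value now fails at parse time with a proper usage error instead of being silently clamped. A self-contained sketch of the same pattern under simplified bounds:

# Self-contained sketch of argparse type= validators, mirroring
# _bootstrapArgsTypeNodeCount and _bootstrapArgsTypeIps.
import argparse
import ipaddress

def node_count(arg):
    if not arg.isdigit():
        raise argparse.ArgumentTypeError('should be a number')
    n = int(arg)
    if not 0 < n <= 100:
        raise argparse.ArgumentTypeError('should be in 1..100')
    return n

def ip_list(arg):
    ips = []
    for ip in (part.strip() for part in arg.split(',')):
        try:
            ipaddress.ip_address(ip)
        except ValueError:
            raise argparse.ArgumentTypeError(
                "'{}' is an invalid IP address".format(ip))
        ips.append(ip)
    return ips

parser = argparse.ArgumentParser()
parser.add_argument('--nodes', required=True, type=node_count)
parser.add_argument('--ips', type=ip_list)
args = parser.parse_args(['--nodes', '4', '--ips', '10.0.0.1, 10.0.0.2'])
print(args.nodes, args.ips)  # 4 ['10.0.0.1', '10.0.0.2']
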
20 changes: 15 additions & 5 deletions plenum/server/node.py
@@ -1299,11 +1299,14 @@ def postTxnFromCatchupAddedToLedger(self, ledgerType: int, txn: Any):
         if ledgerType == 0:
             self.poolManager.onPoolMembershipChange(txn)
         if ledgerType == 1:
-            if txn.get(TXN_TYPE) == NYM:
-                self.addNewRole(txn)
+            self.post_txn_from_catchup_added_to_domain_ledger(txn)
         self.reqsFromCatchupReplies.add((txn.get(f.IDENTIFIER.nm),
                                          txn.get(f.REQ_ID.nm)))

+    def post_txn_from_catchup_added_to_domain_ledger(self, txn):
+        if txn.get(TXN_TYPE) == NYM:
+            self.addNewRole(txn)
+
     def sendPoolLedgerStatus(self, nodeName):
         self.sendLedgerStatus(nodeName, 0)

@@ -1373,7 +1376,7 @@ def processRequest(self, request: Request, frm: str):
         self.transmitToClient(RequestAck(*request.key), frm)

     # noinspection PyUnusedLocal
-    async def processPropagate(self, msg: Propagate, frm):
+    def processPropagate(self, msg: Propagate, frm):
         """
         Process one propagateRequest sent to this node asynchronously

@@ -1603,6 +1606,12 @@ def startViewChangeIfPrimaryWentOffline(self, nodesGoingDown):
         :param nodesGoingDown: the nodes which have gone down
         :return: whether view change started
         """
+
+        if self.name in nodesGoingDown:
+            # A node that is itself going down should not
+            # participate in a view change
+            return
+
         for node in nodesGoingDown:
             for instId, replica in enumerate(self.replicas):
                 leftOne = '{}:{}'.format(node, instId)
@@ -1667,8 +1676,9 @@ def isSignatureVerificationNeeded(self, msg: Any):

     def checkValidOperation(self, clientId, reqId, operation):
         if operation.get(TXN_TYPE) in POOL_TXN_TYPES:
-            if not self.poolManager.checkValidOperation(operation):
-                raise InvalidClientRequest(clientId, reqId)
+            error = self.poolManager.checkValidOperation(operation)
+            if error is not None:
+                raise InvalidClientRequest(clientId, reqId, error)

         if self.opVerifiers:
             try:
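
The pool manager's validator now returns a descriptive error string, or None when the operation is fine, and checkValidOperation forwards that reason into InvalidClientRequest so the client learns why it was rejected. A minimal sketch of the reason-or-None convention; this InvalidClientRequest is a simplified stand-in for plenum's exception:

# Sketch of the "return an error string or None" validation convention.
# InvalidClientRequest is a simplified stand-in for plenum's exception.
class InvalidClientRequest(Exception):
    def __init__(self, client_id, req_id, reason=None):
        super().__init__('{}/{}: {}'.format(
            client_id, req_id, reason or 'invalid request'))

def check_valid_operation(operation):
    """Return a human-readable reason, or None if the operation is valid."""
    if 'data' not in operation:
        return "'data' is missing"
    if not isinstance(operation['data'], dict):
        return "'data' is not a dict"
    return None

def process(client_id, req_id, operation):
    error = check_valid_operation(operation)
    if error is not None:
        raise InvalidClientRequest(client_id, req_id, error)

try:
    process('client1', 42, {'data': 'oops'})
except InvalidClientRequest as e:
    print(e)  # client1/42: 'data' is not a dict
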
64 changes: 52 additions & 12 deletions plenum/server/pool_manager.py
@@ -1,3 +1,6 @@
+import ipaddress
+
+import base58
 from typing import Dict, Tuple
 from functools import lru_cache

@@ -6,7 +9,7 @@
 from plenum.common.txn_util import updateGenesisPoolTxnFile

 from plenum.common.exceptions import UnsupportedOperation, \
-    UnauthorizedClientRequest
+    UnauthorizedClientRequest, InvalidClientRequest

 from plenum.common.stack_manager import TxnStackManager
 from stp_core.network.auth_mode import AuthMode
@@ -68,6 +71,13 @@ def __init__(self, node, ha=None, cliname=None, cliha=None):
         self.nstack, self.cstack, self.nodeReg, self.cliNodeReg = \
             self.getStackParamsAndNodeReg(self.name, self.basedirpath, ha=ha,
                                           cliname=cliname, cliha=cliha)
+        self._dataFieldsValidators = (
+            (NODE_IP, self._isIpAddressValid),
+            (CLIENT_IP, self._isIpAddressValid),
+            (NODE_PORT, self._isPortValid),
+            (CLIENT_PORT, self._isPortValid),
+        )

     @property
     def hasLedger(self):
@@ -252,10 +262,16 @@ def getNodeName(self, nym):
             return nodeTxn[DATA][ALIAS]

     def checkValidOperation(self, operation):
-        checks = []
         # data exists and is dict
         if operation[TXN_TYPE] == NODE:
-            checks.append(DATA in operation and isinstance(operation[DATA], dict))
-        return all(checks)
+            if DATA not in operation:
+                return "'{}' is missing".format(DATA)
+            if not isinstance(operation[DATA], dict):
+                return "'{}' is not a dict".format(DATA)
+
+        # verkey must be base58
+        if len(set(operation[TARGET_NYM]) - set(base58.alphabet)) != 0:
+            return "'{}' is not a base58 string".format(TARGET_NYM)

     def checkRequestAuthorized(self, request):
         typ = request.operation.get(TXN_TYPE)
@@ -301,12 +317,33 @@ def isStewardOfNode(self, stewardNym, nodeNym):
                 return True
         return False

-    @staticmethod
-    def _validateNodeData(data):
-        if data.get(NODE_IP, "nodeip") == data.get(CLIENT_IP, "clientip") and \
-                data.get(NODE_PORT, "nodeport") == data.get(CLIENT_PORT, "clientport"):
+    def _validateNodeData(self, data):
+        # 'data' must contain all required fields
+        for fn in (NODE_IP, CLIENT_IP, NODE_PORT, CLIENT_PORT, SERVICES):
+            if fn not in data:
+                return "field '{}' is missing".format(fn)
+
+        if data[NODE_IP] == data[CLIENT_IP] and data[NODE_PORT] == data[CLIENT_PORT]:
             return "node and client ha can't be same"
+
+        # check the content of the fields
+        for fn, validator in self._dataFieldsValidators:
+            if not validator(data[fn]):
+                return "'{}' ('{}') is invalid".format(fn, data[fn])
+
+    @staticmethod
+    def _isIpAddressValid(ipAddress):
+        try:
+            ipaddress.ip_address(ipAddress)
+        except ValueError:
+            return False
+        else:
+            return ipAddress != '0.0.0.0'
+
+    @staticmethod
+    def _isPortValid(port):
+        return isinstance(port, int) and 0 < port <= 65535

     def authErrorWhileUpdatingNode(self, request):
         origin = request.identifier
         operation = request.operation
@@ -320,10 +357,13 @@ def authErrorWhileUpdatingNode(self, request):
         nodeNym = operation.get(TARGET_NYM)
         if not self.isStewardOfNode(origin, nodeNym):
             return "{} is not a steward of node {}".format(origin, nodeNym)
-        for txn in self.ledger.getAllTxn().values():
-            if txn[TXN_TYPE] == NODE and nodeNym == txn[TARGET_NYM]:
-                if txn[DATA] == operation.get(DATA, {}):
-                    return "node already has the same data as requested"
+
+        previousNodeTxns = [txn for txn in self.ledger.getAllTxn().values()
+                            if txn[TXN_TYPE] == NODE and nodeNym == txn[TARGET_NYM]]
+        # check only the last node txn
+        lastNodeTxnData = previousNodeTxns[-1].get(DATA, {}) if previousNodeTxns else None
+        if lastNodeTxnData is not None and lastNodeTxnData == operation.get(DATA, {}):
+            return "node already has the same data as requested"
         if self.isNodeDataConflicting(data, nodeNym):
             return "existing data has conflicts with " \
                    "request data {}".format(operation.get(DATA))
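
The (field, validator) table plus the two small predicates keeps _validateNodeData declarative: missing fields, identical HAs, and malformed values all come back as plain error strings. A standalone sketch of the table-driven check, with assumed key names:

# Standalone sketch of the field-validator table behind _validateNodeData.
# The key names are assumptions for illustration.
import ipaddress

def is_ip_valid(ip):
    try:
        ipaddress.ip_address(ip)
    except ValueError:
        return False
    return ip != '0.0.0.0'  # a bind-all address is not a reachable HA

def is_port_valid(port):
    return isinstance(port, int) and 0 < port <= 65535

VALIDATORS = (
    ('node_ip', is_ip_valid),
    ('client_ip', is_ip_valid),
    ('node_port', is_port_valid),
    ('client_port', is_port_valid),
)

def validate(data):
    """Return the first validation error, or None if data is well-formed."""
    for field, validator in VALIDATORS:
        if field not in data:
            return "field '{}' is missing".format(field)
        if not validator(data[field]):
            return "'{}' ('{}') is invalid".format(field, data[field])

print(validate({'node_ip': '10.0.0.2', 'client_ip': '0.0.0.0',
                'node_port': 9701, 'client_port': 9702}))
# prints: 'client_ip' ('0.0.0.0') is invalid
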
39 changes: 36 additions & 3 deletions plenum/test/client/test_client_sends_to_f_plus_one_nodes.py
@@ -1,14 +1,47 @@
 from plenum.test.client.conftest import passThroughReqAcked1

-from plenum.client.client import Client
+from plenum.test.helper import sendReqsToNodesAndVerifySuffReplies
 from plenum.test.malicious_behaviors_client import \
     genDoesntSendRequestToSomeNodes
+from plenum.test.node_catchup.helper import waitNodeLedgersEquality


 nodeCount = 4
 clientFault = genDoesntSendRequestToSomeNodes("AlphaC")
 reqAcked1 = passThroughReqAcked1


 def testReplyWhenRequestSentToMoreThanFPlusOneNodes(looper, nodeSet,
-                                                    fClient: Client, replied1):
-    pass
+                                                    fClient, replied1,
+                                                    wallet1):
+    """
+    Alpha is not sent the request but the other nodes are, so Alpha has to
+    rely on propagates from the other nodes
+    """
+    alpha = nodeSet.Alpha
+    other_nodes = [n for n in nodeSet if n != alpha]
+
+    def chk(req_count=1):
+        for node in nodeSet:
+            prc_req = node.processRequest.__name__
+            prc_ppg = node.processPropagate.__name__
+            if node != alpha:
+                # All nodes except Alpha receive requests from the client
+                assert node.spylog.count(prc_req) == req_count
+            else:
+                # Alpha does not receive requests from the client
+                assert node.spylog.count(prc_req) == 0
+
+            # All nodes get propagates from the others
+            assert node.spylog.count(prc_ppg) == req_count * (nodeCount - 1)
+
+    # Ledger is the same for all nodes
+    waitNodeLedgersEquality(looper, alpha, *other_nodes)
+    chk(1)
+
+    more_reqs_count = 5
+    sendReqsToNodesAndVerifySuffReplies(looper, wallet1, fClient,
+                                        more_reqs_count, 1)
+    # Ledger is the same for all nodes
+    waitNodeLedgersEquality(looper, alpha, *other_nodes)
+    chk(6)  # since one request was already sent as part of `replied1`
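
The chk assertions follow from PROPAGATE fan-out: every node that learns of a request, including Alpha once the propagates reach it, sends one PROPAGATE to each peer, so each node processes req_count * (nodeCount - 1) propagates. A quick check of that arithmetic:

# Quick check of the propagate arithmetic asserted by chk(): each node
# processes one PROPAGATE per request from every other node, whether or
# not it received the original client request.
nodeCount = 4
for req_count in (1, 6):
    print('requests={} -> propagates per node={}'.format(
        req_count, req_count * (nodeCount - 1)))
# requests=1 -> propagates per node=3
# requests=6 -> propagates per node=18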