Merge pull request #78 from hyperledger/master
spivachuk authored Jan 19, 2018
2 parents 8ee6cb2 + 59cf160 commit d922f5f
Showing 21 changed files with 555 additions and 45 deletions.
37 changes: 37 additions & 0 deletions docs/catchup.md
@@ -0,0 +1,37 @@
# Ledger catchup

A node uses a process called `catchup` to sync its ledgers with other nodes. It performs this process:
- after starting up: a node communicates the state of its ledgers to any other node it connects to and learns whether it is ahead of, behind, or at the same state as the others.
- after a view change: once a view change starts, nodes again communicate the state of their ledgers to other nodes.
- when it realises that it has missed some transactions: nodes periodically send checkpoints indicating how many transactions they have processed
recently; if a node finds out that it missed some txns, it performs catchup.

The catchup is managed by an object called `LedgerManager`. The `LedgerManager` maintains a `LedgerInfo` object for each ledger and references each ledger by its unique id called `ledger_id`.
When a `ledger` is initialised, the `addLedger` method of `LedgerManager` is called; this method registers the ledger with the `LedgerManager`.
`addLedger` also accepts callbacks which are called on the occurrence of different events, like before/after starting catchup for a ledger,
before/after marking catchup complete for a ledger, and after adding any transaction received during catchup to the ledger.
The `LedgerManager` performs catchup of each ledger sequentially, which means catchup for one ledger will not start until catchup for the previous one is complete.
This is done mostly for simplicity and can be optimised, but the pool ledger needs to be caught up first since the pool ledger records how many nodes are
in the network. Catchup for any ledger involves these steps:
- Learn the correct state (how many txns, merkle roots) of the ledger by using `LedgerStatus` messages from other nodes.
- Once sufficient (`Quorums.ledger_status`) and consistent `LedgerStatus` messages are received so that the node knows the latest state of the ledger, if it finds its ledger to be
up to date, it marks the ledger catchup complete; otherwise it waits for `ConsistencyProof` messages from other nodes until a timeout.
- When a node receives a `LedgerStatus` and it finds the sending node's ledger behind, it sends a `ConsistencyProof` message. This message is like a
merkle subset proof of the sending node's ledger and the receiving node's ledger, e.g. if the sending node A's ledger has size 20 and merkle root `x` and
the receiving node B's size is 30 with merkle root `y`, B sends a proof to A that A's ledger with size 20 and root `x` is a subset of B's ledger with size
30 and root `y`.
- After receiving a `ConsistencyProof`, the node verifies it.
- Once the node that is catching up has sufficient (`Quorums.consistency_proof`) and consistent `ConsistencyProof` messages, it knows how many transactions it needs to catch up and
then requests those transactions from other nodes, distributing the load equally (see the sketch after this list), e.g. if a node has to catch up 1000 txns and there are 5 other nodes in the
network, then the node will request 200 txns from each. The node catching up sends a `CatchupReq` message to each of the other nodes and expects a corresponding `CatchupRep`.
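
The even split of requested transactions can be illustrated with a small sketch. This is not the actual `LedgerManager` code; the helper name and return shape are made up for illustration:

```python
def build_catchup_ranges(start_seq_no, end_seq_no, node_names):
    """Split the missing txn range [start_seq_no, end_seq_no] evenly across
    the other nodes, one (start, end) pair per prospective CatchupReq."""
    total = end_seq_no - start_seq_no + 1
    per_node, extra = divmod(total, len(node_names))
    ranges = {}
    current = start_seq_no
    for i, name in enumerate(node_names):
        count = per_node + (1 if i < extra else 0)
        if count == 0:
            continue
        ranges[name] = (current, current + count - 1)
        current += count
    return ranges


# 1000 missing txns split across 5 other nodes -> 200 txns requested from each
print(build_catchup_ranges(21, 1020, ['B', 'C', 'D', 'E', 'F']))
```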

Apart from this, if the node does not receive sufficient or consistent `ConsistencyProof`s within a timeout, it requests them using `request_CPs_if_needed`.
Similarly, if the node does not receive sufficient or consistent `CatchupRep`s within a timeout, it requests them using `request_txns_if_needed`.
These timeouts can be configured through `ConsistencyProofsTimeout` and `CatchupTransactionsTimeout` respectively in the config file.
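
For reference, the corresponding defaults in `plenum/config.py` (as set in this change) look like this; `CatchupTransactionsTimeout` is a per-request factor, so the effective timeout grows with the number of `CatchupReq`s sent:

```python
# plenum/config.py (defaults as of this change)
ConsistencyProofsTimeout = 5      # seconds to wait for ConsistencyProof messages
CatchupTransactionsTimeout = 6    # multiplied by the number of CatchupReqs sent
```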


Relevant code:
- LedgerManager: `plenum/common/ledger_manager.py`
- LedgerInfo: `plenum/common/ledger_info.py`
- Catchup message types: `plenum/common/messages/node_messages.py`
- Catchup quorum values: `plenum/server/quorums.py`
2 changes: 2 additions & 0 deletions docs/main.md
@@ -0,0 +1,2 @@
Outline of the system
Node
57 changes: 57 additions & 0 deletions docs/request_handling.md
@@ -0,0 +1,57 @@
# Request handling

To handle requests sent by a client, the nodes require a ledger and/or a state as a data store. Request handling logic is written in `RequestHandler`.
`RequestHandler` is extended by new classes to support new request types. A node provides the methods `register_new_ledger` to register a new `Ledger` and `register_req_handler` to register new `RequestHandler`s.
A callback to execute after a write request has been successfully executed can be registered using `register_executer`.
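
As a hypothetical sketch of how a new request type might be wired in (the handler class, transaction type and ledger id are made up, and the hook names follow this document's description of `RequestHandler` rather than exact signatures):

```python
from plenum.server.req_handler import RequestHandler


class VoteReqHandler(RequestHandler):
    """Hypothetical handler for a made-up VOTE transaction type."""

    def doStaticValidation(self, request):
        # checks that need no state, e.g. mandatory fields are present
        if 'candidate' not in request.operation:
            raise ValueError('candidate is required')

    def validate(self, request):
        # dynamic, state-dependent checks (uniqueness, authorisation, ...)
        pass

    def apply(self, request, cons_time):
        # optimistically apply changes to ledger/state; made durable later by commit()
        pass


# wiring it up on a node, per the methods named above (signatures illustrative):
# node.register_new_ledger(VOTE_LEDGER_ID, vote_ledger)
# node.register_req_handler(VOTE_LEDGER_ID, VoteReqHandler(vote_ledger, state))
```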


There are 2 types of requests a client can send:
- Query:
Here the client is requesting transactions or some state variables from the node(s). The client can either send a `GET_TXN` to get any transaction by its sequence number from any ledger,
or send specific `GET_*` requests for particular queries.
- Write:
Here the client is asking the nodes to write a new transaction to the ledger and change some state variable. This requires the nodes to run a consensus protocol (currently RBFT).
If the protocol run is successful, then the client's proposed changes are written.


#### Request handling
Below is a description of how a request is processed.
On receiving a client request, a node does the following (starting in `validateClientMsg`):
- The node performs static validation checks (validation that does not require any state, e.g. that mandatory fields are present); it uses `ClientMessageValidator` and
`RequestHandler`'s `doStaticValidation` for this.
- If the static validation passes, it checks whether a signature check is required (not required for queries) and, if needed, does it in `verifySignature`. More on this later.
- Checks if it's a generic transaction query (`GET_TXN`). If it is, the node queries the ledger for that particular sequence number and returns the result. A `REQNACK` might be sent if the query is not correctly constructed.
- Checks if it's a specific query (one needing a specific `RequestHandler`). If it is, the node uses `process_query`, which uses the specific `RequestHandler` to respond to the query. A `REQNACK` might be sent if the query is not correctly constructed.
- If it is a write, the node checks whether it has already processed the request before, by checking the uniqueness of the `identifier` and `reqId` fields of the Request.
- If it has already processed the request, then it sends the corresponding `Reply` to the client.
- If the `Request` is already in process, then it sends an acknowledgement to the client as a `REQACK`.
- If the node has not seen the `Request` before, it broadcasts the `Request` to all nodes in a `PROPAGATE`.
- Once a node receives sufficient (`Quorums.propagate`) `PROPAGATE`s for a request, it forwards the request to its replicas (see the quorum sketch after this list).
- A primary replica, on receiving a forwarded request, does dynamic validation (validation that requires state, e.g. whether the request violates a uniqueness constraint or performs an unauthorised action)
on the request using `doDynamicValidation`, which in turn calls the corresponding `RequestHandler`'s `validate` method. If the validation succeeds, the
`apply` method of the `RequestHandler` is called, which optimistically applies the changes to the ledger and state.
If the validation fails, the primary sends a `REJECT` to the client. The primary then sends a `PRE-PREPARE` to other nodes which contains the merkle roots and some other information for all such requests.
- The non-primary replicas, on receiving the `PRE-PREPARE`, perform the same dynamic validation that the primary performed on each request. They also check whether the merkle roots match.
If all checks pass, the replicas send a `PREPARE`; otherwise they reject the `PRE-PREPARE`.
- Once the consensus protocol is successfully executed on the request, each replica sends an `ORDERED` message to its node and the node updates the monitor.
- If the `ORDERED` message above was sent by the master replica, then the node executes the request, meaning it commits any changes made to the ledger and state by that
request by calling the corresponding `RequestHandler`'s `commit` method, and sends a `Reply` back to the client.
- The node also tracks the request's `identifier` and `reqId` in a key value database in `updateSeqNoMap`.
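
The quorum thresholds referenced in these steps come from `plenum/server/quorums.py`. Below is a minimal sketch of the idea, with a stand-in `Quorum` helper and a few of the values visible elsewhere in this change (the exact value of `Quorums.propagate` lives in that file):

```python
class Quorum:
    """Minimal stand-in for plenum.server.quorums.Quorum."""

    def __init__(self, value):
        self.value = value

    def is_reached(self, count):
        return count >= self.value


def example_quorums(n):
    f = (n - 1) // 3  # max faulty nodes a BFT pool of n nodes tolerates
    return {
        'ledger_status': Quorum(n - f - 1),
        'checkpoint': Quorum(n - f - 1),
        'consistency_proof': Quorum(f + 1),
    }


# For a 4-node pool (f = 1) each of these quorums needs 2 matching messages
for name, quorum in example_quorums(4).items():
    print(name, quorum.value, quorum.is_reached(2))
```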


#### Signature verification
Each node has a `ReqAuthenticator` object which allows registering `ClientAuthNr` objects using the `register_authenticator` method. During signature verification,
a node runs each registered authenticator over the request, and if any authenticator raises an exception then signature verification is considered failed.
A node has at least 1 authenticator, called `CoreAuthNr`, whose `authenticate` method is called over the serialised request data to verify the signature.
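
A simplified model of that behaviour (this mirrors the description above, not the actual class in `plenum/server/req_authenticator.py`):

```python
class ReqAuthenticatorSketch:
    """Runs every registered authenticator; any exception fails verification."""

    def __init__(self):
        self._authenticators = []

    def register_authenticator(self, authnr):
        self._authenticators.append(authnr)

    def authenticate(self, req_data):
        # every registered authenticator must accept the request;
        # an exception raised by any of them means signature verification failed
        for authnr in self._authenticators:
            authnr.authenticate(req_data)
```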


Relevant code:
- Node: `plenum/server/node.py`
- Replica: `plenum/server/replica.py`
- Propagator: `plenum/server/propagator.py`
- Request: `plenum/common/request.py`
- Request structure validation: `plenum/common/messages/client_request.py`
- RequestHandler: `plenum/server/req_handler.py`
- Request Authenticator: `plenum/server/req_authenticator.py`
- Core Authenticator: `plenum/server/client_authn.py`
- Quorums: `plenum/server/quorums.py`
3 changes: 1 addition & 2 deletions docs/storage.md
@@ -17,8 +17,7 @@ Each new transaction is added to the ledger (log) and is also hashed (sha256) an
results in a new merkle root. Thus for each transaction a merkle proof of presence, called an `inclusion proof` or `audit_path`, can be created by
using the root hash and a few (`O(log n)`, `n` being the total number of leaves/transactions in the tree/ledger) intermediate hashes.

Hashes of all the
leaves and intermediate nodes of the tree are stored in a `HashStore`, enabling the creating of inclusion proofs and subset proofs. A subset proof
Hashes of all the leaves and intermediate nodes of the tree are stored in a `HashStore`, enabling the creation of inclusion proofs and subset proofs. A subset proof
proves that a particular merkle tree is a subset of another (usually larger) merkle tree; more on this in the Catchup doc. The `HashStore` has 2 separate storages for storing leaves
and intermediate nodes, each leaf or node of the tree is 32 bytes. Each of these storages can be a binary file or a key value store.
The leaf or node hashes are queried by their number. When a client write request completes or it requests a transaction with a particular sequence number from a ledger,
5 changes: 5 additions & 0 deletions plenum/common/ledger_info.py
@@ -45,6 +45,10 @@ def set_defaults(self):
# process as completed
self.ledgerStatusOk = set()

# Key of the 3PC-batch ordered by the master instance that contained
# the last transaction of this node's ledger
self.last_txn_3PC_key = None

# Dictionary of consistency proofs received for the ledger
# in process of catching up
# Key is the node name and value is a consistency proof
@@ -79,6 +83,7 @@ def done_syncing(self):
self.canSync = False
self.state = LedgerState.synced
self.ledgerStatusOk = set()
self.last_txn_3PC_key = None
self.recvdConsistencyProofs = {}
self.receivedCatchUpReplies = []
self.recvdCatchupRepliesFrm = {}
25 changes: 14 additions & 11 deletions plenum/common/ledger_manager.py
@@ -305,6 +305,13 @@ def processLedgerStatus(self, status: LedgerStatus, frm: str):
# post sending this ledger status
ledgerInfo.recvdConsistencyProofs[frm] = None
ledgerInfo.ledgerStatusOk.add(frm)

if self.isLedgerSame(ledgerStatus) \
and ledgerStatus.viewNo is not None \
and ledgerStatus.ppSeqNo is not None:
ledgerInfo.last_txn_3PC_key = \
(ledgerStatus.viewNo, ledgerStatus.ppSeqNo)

if self.has_ledger_status_quorum(
len(ledgerInfo.ledgerStatusOk), self.owner.totalNodes):
logger.debug("{} found out from {} that its "
@@ -316,14 +323,10 @@ def processLedgerStatus(self, status: LedgerStatus, frm: str):
# If this node's ledger is same as the ledger status (which is
# also the majority of the pool), then set the last ordered
# 3PC key
key = (ledgerStatus.viewNo, ledgerStatus.ppSeqNo)
self.do_pre_catchup(ledgerId)
if self.isLedgerSame(ledgerStatus) and key != (None, None):
# Any state cleaup that is part of pre-catchup should be
# done
self.catchupCompleted(ledgerId, key)
else:
self.catchupCompleted(ledgerId)
# Any state cleanup that is part of pre-catchup should be
# done
self.catchupCompleted(ledgerId, ledgerInfo.last_txn_3PC_key)
else:
# Ledger was already synced
self.mark_ledger_synced(ledgerId)
@@ -844,10 +847,9 @@ def startCatchUpProcess(self, ledgerId: int, proof: ConsistencyProof):
.format(CATCH_UP_PREFIX, self, ledgerId))

def _getCatchupTimeout(self, numRequest, batchSize):
return numRequest * (self.config.CatchupTransactionsTimeout +
0.1 * batchSize)
return numRequest * self.config.CatchupTransactionsTimeout

def catchupCompleted(self, ledgerId: int, last_3PC: Tuple=(0, 0)):
def catchupCompleted(self, ledgerId: int, last_3PC: Optional[Tuple]=None):
if ledgerId not in self.ledgerRegistry:
logger.error("{}{} called catchup completed for ledger {}".
format(CATCH_UP_PREFIX, self, ledgerId))
@@ -856,7 +858,8 @@ def catchupCompleted(self, ledgerId: int, last_3PC: Tuple=(0, 0)):
# Since multiple ledger will be caught up and catchups might happen
# multiple times for a single ledger, the largest seen
# ppSeqNo needs to be known.
if compare_3PC_keys(self.last_caught_up_3PC, last_3PC) > 0:
if last_3PC is not None \
and compare_3PC_keys(self.last_caught_up_3PC, last_3PC) > 0:
self.last_caught_up_3PC = last_3PC

self.mark_ledger_synced(ledgerId)
6 changes: 5 additions & 1 deletion plenum/common/test_network_setup.py
@@ -20,6 +20,9 @@
from stp_core.common.util import adict


CLIENT_CONNECTIONS_LIMIT = 15360


class TestNetworkSetup:
@staticmethod
def getNumberFromName(name: str) -> int:
@@ -45,7 +48,8 @@ def writeNodeParamsFile(filePath, name, nPort, cPort):
contents = [
'NODE_NAME={}'.format(name),
'NODE_PORT={}'.format(nPort),
'NODE_CLIENT_PORT={}'.format(cPort)
'NODE_CLIENT_PORT={}'.format(cPort),
'CLIENT_CONNECTIONS_LIMIT={}'.format(CLIENT_CONNECTIONS_LIMIT)
]
with open(filePath, 'w') as f:
f.writelines(os.linesep.join(contents))
32 changes: 32 additions & 0 deletions plenum/common/txn_util.py
@@ -1,4 +1,5 @@
from collections import OrderedDict
import json

from ledger.genesis_txn.genesis_txn_file_util import create_genesis_txn_init_ledger
from plenum.common.constants import TXN_TIME, TXN_TYPE, TARGET_NYM, ROLE, \
@@ -107,3 +108,34 @@ def idr_from_req_data(data):
return data[f.IDENTIFIER.nm]
else:
return Request.gen_idr_from_sigs(data.get(f.SIGS.nm, {}))


def sdk_reqToTxn(sdk_req, cons_time=None):
"""
Transform a client request so that it can be stored in the ledger.
This is also what will be returned to the client in the reply.
:param sdk_req: sdk request in str or dict type
:param cons_time: UTC epoch at which consensus was reached
:return: the transformed transaction as a dict
"""
# TODO: we should not reformat the transaction this way
# When refactoring, keep backward compatibility in mind

if isinstance(sdk_req, dict):
data = sdk_req
elif isinstance(sdk_req, str):
data = json.loads(sdk_req)
else:
raise TypeError(
"Expected dict or str as input, but got: {}".format(type(sdk_req)))

res = {
f.IDENTIFIER.nm: data[f.IDENTIFIER.nm],
f.REQ_ID.nm: data[f.REQ_ID.nm],
f.SIG.nm: data.get(f.SIG.nm, None),
f.SIGS.nm: data.get(f.SIGS.nm, None),
TXN_TIME: cons_time or data.get(TXN_TIME)
}
res.update(data[OPERATION])
return res
7 changes: 6 additions & 1 deletion plenum/config.py
@@ -140,7 +140,12 @@
ConsistencyProofsTimeout = 5

# Timeout factor after which a node starts requesting transactions
CatchupTransactionsTimeout = 5
# We assume that building a consistency proof and iterating over all transactions (getAllTxn)
# take little time (~0.003 sec to build a consistency proof for 10 000 txns +
# ~0.2 sec for getAllTxn over 10 000 txns),
# therefore node communication is the most costly operation.
# The timeout for pool catchup would be nodeCount * CatchupTransactionsTimeout
CatchupTransactionsTimeout = 6


# Log configuration
14 changes: 10 additions & 4 deletions plenum/server/node.py
@@ -1957,17 +1957,18 @@ def processRequest(self, request: Request, frm: str):
ledgerId = self.ledger_id_for_request(request)
ledger = self.getLedger(ledgerId)

if self.is_query(request.operation[TXN_TYPE]):
self.process_query(request, frm)
return

reply = self.getReplyFromLedger(ledger, request)
if reply:
logger.debug("{} returning REPLY from already processed "
"REQUEST: {}".format(self, request))
self.transmitToClient(reply, frm)
return

if self.is_query(request.operation[TXN_TYPE]):
self.process_query(request, frm)
return

# If the node is not already processing the request
if not self.isProcessingReq(*request.key):
self.startedProcessingReq(*request.key, frm)
# If not already got the propagate request(PROPAGATE) for the
@@ -2252,6 +2253,11 @@ def propose_view_change(self):
'so view change will not be proposed'.format(self))
return

if not self.isReady():
logger.trace('{} The node is not ready yet '
'so view change will not be proposed'.format(self))
return

disconnected_time = time.perf_counter() - self.lost_primary_at
if disconnected_time >= self.config.ToleratePrimaryDisconnection:
logger.info("{} primary has been disconnected for too long"
2 changes: 1 addition & 1 deletion plenum/server/quorums.py
@@ -27,7 +27,7 @@ def __init__(self, n):
self.same_consistency_proof = Quorum(f + 1)
self.consistency_proof = Quorum(f + 1)
self.ledger_status = Quorum(n - f - 1)
self.checkpoint = Quorum(2 * f)
self.checkpoint = Quorum(n - f - 1)
self.timestamp = Quorum(f + 1)
self.bls_signatures = Quorum(n - f)
self.observer_data = Quorum(f + 1)
3 changes: 1 addition & 2 deletions plenum/server/replica.py
@@ -1770,8 +1770,7 @@ def markCheckPointStable(self, seqNo):

def checkIfCheckpointStable(self, key: Tuple[int, int]):
ckState = self.checkpoints[key]
# TODO: what if len(ckState.receivedDigests) > 2 * f?
if len(ckState.receivedDigests) == self.quorums.checkpoint.value:
if self.quorums.checkpoint.is_reached(len(ckState.receivedDigests)):
self.markCheckPointStable(ckState.seqNo)
return True
else:
2 changes: 1 addition & 1 deletion plenum/test/helper.py
@@ -654,7 +654,7 @@ def checkViewNoForNodes(nodes: Iterable[TestNode], expectedViewNo: int = None):
assert len(viewNos) == 1
vNo, = viewNos
if expectedViewNo is not None:
assert vNo == expectedViewNo, ','.join(['{} -> Ratio: {}'.format(
assert vNo >= expectedViewNo, ','.join(['{} -> Ratio: {}'.format(
node.name, node.monitor.masterThroughputRatio()) for node in nodes])
return vNo
