Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Master #30

Merged
merged 5 commits into from
May 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions plenum/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from typing import List, Union, Dict, Optional, Tuple, Set, Any, \
Iterable

from plenum.common.ledger import Ledger
from plenum.common.stacks import nodeStackClass
from stp_core.crypto.nacl_wrappers import Signer
from stp_core.network.auth_mode import AuthMode
Expand All @@ -31,10 +32,10 @@
from plenum.common.motor import Motor
from plenum.common.plugin_helper import loadPlugins
from plenum.common.request import Request
from plenum.common.startable import Status, LedgerState, Mode
from plenum.common.startable import Status, Mode
from plenum.common.constants import REPLY, POOL_LEDGER_TXNS, \
LEDGER_STATUS, CONSISTENCY_PROOF, CATCHUP_REP, REQACK, REQNACK, REJECT, OP_FIELD_NAME, \
POOL_LEDGER_ID, TXN_TIME
POOL_LEDGER_ID, TXN_TIME, LedgerState
from plenum.common.txn_util import getTxnOrderedFields
from plenum.common.types import Reply, f, LedgerStatus, TaggedTuples
from plenum.common.util import getMaxFailures, checkIfMoreThanFSameItems, rawToFriendly
Expand Down Expand Up @@ -106,7 +107,7 @@ def __init__(self,
self.mode = None
HasPoolManager.__init__(self)
self.ledgerManager = LedgerManager(self, ownedByNode=False)
self.ledgerManager.addLedger(0, self.ledger,
self.ledgerManager.addLedger(POOL_LEDGER_ID, self.ledger,
postCatchupCompleteClbk=self.postPoolLedgerCaughtUp,
postTxnAddedToLedgerClbk=self.postTxnFromCatchupAddedToLedger)
else:
Expand Down Expand Up @@ -624,13 +625,12 @@ def verifyMerkleProof(*replies: Tuple[Reply]) -> bool:
ignored = {F.auditPath.name, F.seqNo.name, F.rootHash.name, TXN_TIME}
for r in replies:
seqNo = r[f.RESULT.nm][F.seqNo.name]
rootHash = base64.b64decode(
r[f.RESULT.nm][F.rootHash.name].encode())
auditPath = [base64.b64decode(
a.encode()) for a in r[f.RESULT.nm][F.auditPath.name]]
rootHash = Ledger.strToHash(
r[f.RESULT.nm][F.rootHash.name])
auditPath = [Ledger.strToHash(a) for a in
r[f.RESULT.nm][F.auditPath.name]]
filtered = dict((k, v) for (k, v) in r[f.RESULT.nm].items()
if k not in ignored
)
if k not in ignored)
result = serializer.serialize(filtered)
verifier.verify_leaf_inclusion(result, seqNo - 1,
auditPath,
Expand Down
13 changes: 11 additions & 2 deletions plenum/common/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# inter-node communication
from enum import IntEnum
from enum import IntEnum, unique

from plenum.common.roles import Roles
from plenum.common.transactions import PlenumTransactions
Expand Down Expand Up @@ -101,10 +101,19 @@ class StorageType(IntEnum):
File = 1
Ledger = 2


class KeyValueStorageType(IntEnum):
Leveldb = 1
Memory = 2


@unique
class LedgerState(IntEnum):
not_synced = 1
syncing = 2
synced = 3


OP_FIELD_NAME = "op"

CLIENT_STACK_SUFFIX = "C"
Expand All @@ -121,4 +130,4 @@ class KeyValueStorageType(IntEnum):

PLUGIN_BASE_DIR_PATH = "PluginBaseDirPath"
POOL_LEDGER_ID = 0
DOMAIN_LEDGER_ID = 1
DOMAIN_LEDGER_ID = 1
1 change: 1 addition & 0 deletions plenum/common/keygen_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def initNodeKeysForBothStacks(name, baseDir, sigseed, override=False, config=Non
return nodeStackClass.initLocalKeys(name, baseDir, sigseed,
override=override)


def areKeysSetup(name, baseDir, config=None):
return nodeStackClass.areKeysSetup(name, baseDir)

Expand Down
8 changes: 4 additions & 4 deletions plenum/common/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ def __init__(self, *args, **kwargs):
self.uncommittedRootHash = None
self.uncommittedTree = None

@property
def uncommittedSize(self):
return len(self.uncommittedTxns)

def appendTxns(self, txns: List):
# These transactions are not yet committed so they do not go to
# the ledger
Expand Down Expand Up @@ -102,3 +98,7 @@ def treeWithAppliedTxns(self, txns: List, currentTree=None):
@staticmethod
def hashToStr(h):
return base58.b58encode(h)

@staticmethod
def strToHash(s):
return base58.b58decode(s)
47 changes: 30 additions & 17 deletions plenum/common/ledger_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import heapq
import operator
from base64 import b64encode, b64decode
from collections import Callable
from functools import partial
from random import shuffle
Expand All @@ -9,14 +8,13 @@
from typing import Optional

import time
from ledger.ledger import Ledger
from plenum.common.ledger import Ledger
from ledger.merkle_verifier import MerkleVerifier
from ledger.util import F

from plenum.common.startable import LedgerState
from plenum.common.types import LedgerStatus, CatchupRep, \
ConsistencyProof, f, CatchupReq, ConsProofRequest
from plenum.common.constants import POOL_LEDGER_ID
from plenum.common.constants import POOL_LEDGER_ID, LedgerState
from plenum.common.util import getMaxFailures
from plenum.common.config_util import getConfig
from stp_core.common.log import getlogger
Expand Down Expand Up @@ -323,6 +321,10 @@ def canProcessConsistencyProof(self, proof: ConsistencyProof) -> bool:
def processCatchupReq(self, req: CatchupReq, frm: str):
logger.debug("{} received catchup request: {} from {}".
format(self, req, frm))
if not self.ownedByNode:
self.discard(req, reason="Only node can serve catchup requests",
logMethod=logger.warn)
return

start = getattr(req, f.SEQ_NO_START.nm)
end = getattr(req, f.SEQ_NO_END.nm)
Expand Down Expand Up @@ -352,8 +354,11 @@ def processCatchupReq(self, req: CatchupReq, frm: str):

logger.debug("{} generating consistency proof: {} from {}".
format(self, end, req.catchupTill))
consProof = [b64encode(p).decode() for p in
consProof = [Ledger.hashToStr(p) for p in
ledger.tree.consistency_proof(end, req.catchupTill)]

for seq_no in txns:
txns[seq_no] = self.owner.update_txn_with_extra_data(txns[seq_no])
self.sendTo(msg=CatchupRep(getattr(req, f.LEDGER_ID.nm), txns,
consProof), to=frm)

Expand Down Expand Up @@ -422,7 +427,7 @@ def _processCatchupReplies(self, ledgerId, ledger: Ledger,
if result:
ledgerInfo = self.getLedgerInfoByType(ledgerId)
for _, txn in catchUpReplies[:toBeProcessed]:
merkleInfo = ledger.add(txn)
merkleInfo = ledger.add(self._transform(txn))
txn[F.seqNo.name] = merkleInfo[F.seqNo.name]
ledgerInfo.postTxnAddedToLedgerClbk(ledgerId, txn)
self._removePrcdCatchupReply(ledgerId, nodeName, seqNo)
Expand Down Expand Up @@ -450,6 +455,14 @@ def _removePrcdCatchupReply(self, ledgerId, node, seqNo):
break
ledgerInfo.recvdCatchupRepliesFrm[node].pop(i)

def _transform(self, txn):
# Certain transactions other than pool ledger might need to be
# transformed to certain format before applying to the ledger
if not self.ownedByNode:
return txn
else:
return self.owner.transform_txn_for_ledger(txn)

def hasValidCatchupReplies(self, ledgerId, ledger, seqNo, catchUpReplies):
# Here seqNo has to be the seqNo of first transaction of
# `catchupReplies`
Expand All @@ -460,19 +473,20 @@ def hasValidCatchupReplies(self, ledgerId, ledger, seqNo, catchUpReplies):
seqNo)

txns = getattr(catchupReply, f.TXNS.nm)

# Add only those transaction in the temporary tree from the above
# batch

# Transfers of odcits in RAET converts integer keys to string
txns = [txn for s, txn in catchUpReplies[:len(txns)] if str(s) in txns]
txns = [self._transform(txn) for s, txn in catchUpReplies[:len(txns)]
if str(s) in txns]

# Creating a temporary tree which will be used to verify consistency
# proof, by inserting transactions. Duplicating a merkle tree is not
# expensive since we are using a compact merkle tree.
tempTree = ledger.treeWithAppliedTxns(txns)

proof = getattr(catchupReply, f.CONS_PROOF.nm)


ledgerInfo = self.getLedgerInfoByType(ledgerId)
verifier = ledgerInfo.verifier
cp = ledgerInfo.catchUpTill
Expand All @@ -481,13 +495,13 @@ def hasValidCatchupReplies(self, ledgerId, ledger, seqNo, catchUpReplies):
try:
logger.debug("{} verifying proof for {}, {}, {}, {}, {}".
format(self, tempTree.tree_size, finalSize,
tempTree.root_hash, b64decode(finalMTH),
[b64decode(p) for p in proof]))
tempTree.root_hash, Ledger.strToHash(finalMTH),
[Ledger.strToHash(p) for p in proof]))
verified = verifier.verify_tree_consistency(tempTree.tree_size,
finalSize,
tempTree.root_hash,
b64decode(finalMTH),
[b64decode(p) for p in
Ledger.strToHash(finalMTH),
[Ledger.strToHash(p) for p in
proof])
except Exception as ex:
logger.info("{} could not verify catchup reply {} since {}".
Expand Down Expand Up @@ -778,10 +792,9 @@ def _buildConsistencyProof(self, ledgerId, seqNoStart, seqNoEnd):
seqNoStart,
seqNoEnd,
ppSeqNo,
b64encode(oldRoot).decode(),
b64encode(newRoot).decode(),
[b64encode(p).decode() for p in
proof]
Ledger.hashToStr(oldRoot),
Ledger.hashToStr(newRoot),
[Ledger.hashToStr(p) for p in proof]
)

def _compareLedger(self, status: LedgerStatus):
Expand Down
11 changes: 7 additions & 4 deletions plenum/common/stack_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
import shutil
from abc import abstractproperty
from abc import abstractmethod
from collections import OrderedDict

from plenum.common.keygen_utils import initRemoteKeys
Expand All @@ -25,15 +25,18 @@ def __init__(self, name, basedirpath, isNode=True):
self.isNode = isNode
self.hashStore = None

@abstractproperty
@property
@abstractmethod
def hasLedger(self) -> bool:
raise NotImplementedError

@abstractproperty
@property
@abstractmethod
def ledgerLocation(self) -> str:
raise NotImplementedError

@abstractproperty
@property
@abstractmethod
def ledgerFile(self) -> str:
raise NotImplementedError

Expand Down
7 changes: 0 additions & 7 deletions plenum/common/startable.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,3 @@ class Mode(IntEnum):
discovered = 3 # caught up with pool txn ledger
syncing = 4 # catching up on domain txn ledger
participating = 5 # caught up with domain txn ledger


@unique
class LedgerState(IntEnum):
not_synced = 1
syncing = 2
synced = 3
9 changes: 6 additions & 3 deletions plenum/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@

# Monitoring configuration
PerfCheckFreq = 10
DELTA = 0.8

# Temporarily reducing DELTA till the calculations for extra work are not
# incorporated
DELTA = 0.4
LAMBDA = 60
OMEGA = 5
SendMonitorStats = False
Expand Down Expand Up @@ -164,7 +167,7 @@
# Max batch size for 3 phase commit
Max3PCBatchSize = 100
# Max time to wait before creating a batch for 3 phase commit
Max3PCBatchWait = 5
Max3PCBatchWait = 1

# Maximum lifespan for a batch, this needs to be changed if
# `Max3PCBatchSize` is changed
Expand All @@ -186,4 +189,4 @@
CLIENT_REQACK_TIMEOUT = 5
CLIENT_REPLY_TIMEOUT = Max3PCBatchWait + 10
CLIENT_MAX_RETRY_ACK = 5
CLIENT_MAX_RETRY_REPLY = 5
CLIENT_MAX_RETRY_REPLY = 5
4 changes: 4 additions & 0 deletions plenum/persistence/req_id_to_txn.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,9 @@ def get(self, identifier, reqId) -> Optional[int]:
except (KeyError, ValueError):
return None

@property
def size(self):
return self._keyValueStorage.size

def close(self):
self._keyValueStorage.close()
6 changes: 6 additions & 0 deletions plenum/persistence/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@


def txnsWithSeqNo(seqNoStart, seqNoEnd, txns):
"""
Update each transaction with a sequence number field
"""
txns = deepcopy(txns)
for txn, seqNo in zip(txns, range(seqNoStart, seqNoEnd + 1)):
txn[F.seqNo.name] = seqNo
return txns


def txnsWithMerkleInfo(ledger, committedTxns):
"""
Update each transaction with the merkle root hash and audit path
"""
committedTxns = deepcopy(committedTxns)
for txn in committedTxns:
mi = ledger.merkleInfo(txn.get(F.seqNo.name))
Expand Down
11 changes: 10 additions & 1 deletion plenum/server/domain_req_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,19 @@ def _reqToTxn(self, req: Request):

def apply(self, req: Request):
txn = self._reqToTxn(req)
(start, end), _ = self.ledger.appendTxns([txn])
(start, end), _ = self.ledger.appendTxns([self.transform_txn_for_ledger(txn)])
self.updateState(txnsWithSeqNo(start, end, [txn]))
return txn

@staticmethod
def transform_txn_for_ledger(txn):
"""
Some transactions need to be updated before they can be stored in the
ledger, eg. storing certain payload in another data store and only its
hash in the ledger
"""
return txn

def updateState(self, txns, isCommitted=False):
for txn in txns:
self._updateStateWithSingleTxn(txn, isCommitted=isCommitted)
Expand Down
Loading