Skip to content

Commit

Permalink
Split node setup and join, and use correct ordering in basicperf (#5527)
Browse files Browse the repository at this point in the history
  • Loading branch information
achamayou authored Aug 10, 2023
1 parent 9c0228c commit 9bc2165
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 70 deletions.
2 changes: 1 addition & 1 deletion .daily_canary
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
--- ___ ___
(- -) (= =) | Y & +--?
( V ) z . z O +---=---'
/--x-m- /--n-n---xXx--/--yY-----.
/--x-m- /--n-n---xXx--/--yY-----...
35 changes: 20 additions & 15 deletions tests/infra/basicperf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import json
import shutil
import datetime
import ccf.ledger


def configure_remote_client(args, client_id, client_host, common_dir):
Expand Down Expand Up @@ -179,17 +180,32 @@ def create_and_fill_key_space(size: int, primary: infra.node.Node) -> List[str]:
return space


def create_and_add_node(network, host, old_primary, snapshots_dir, statistics):
LOG.info(f"Add new node: {host}")
def replace_primary(network, host, old_primary, snapshots_dir, statistics):
LOG.info(f"Set up new node: {host}")
node = network.create_node(host)
statistics["new_node_join_start_time"] = datetime.datetime.now().isoformat()
network.join_node(
network.setup_join_node(
node,
args.package,
args,
target_node=network.nodes[1],
timeout=10,
copy_ledger=False,
snapshots_dir=snapshots_dir,
follow_redirect=False,
)
LOG.info(f"Shut down primary: {old_primary.local_node_id}")
statistics["initial_primary_shutdown_time"] = datetime.datetime.now().isoformat()
old_primary.stop()
LOG.info(f"Start new node: {node.local_node_id}")
network.run_join_node(node, wait_for_node_in_store=False)
primary, _ = network.wait_for_new_primary(old_primary)
statistics["new_primary_detected_time"] = datetime.datetime.now().isoformat()
network.wait_for_node_in_store(
primary,
node.node_id,
node_status=ccf.ledger.NodeStatus.PENDING,
timeout=5,
)
LOG.info(f"Replace node {old_primary.local_node_id} with {node.local_node_id}")
network.replace_stopped_node(old_primary, node, args, statistics=statistics)
Expand Down Expand Up @@ -345,21 +361,10 @@ def run(args):
os.path.join(committed_snapshots_dir, latest_snapshot),
latest_snapshot_dir,
)
LOG.info(
f"Stopping primary after {args.stop_primary_after_s} seconds"
)
statistics[
"initial_primary_shutdown_time"
] = datetime.datetime.now().isoformat()
primary.stop()
primary_has_stopped = True
old_primary = primary
primary, _ = network.wait_for_new_primary(primary)
statistics[
"new_primary_detected_time"
] = datetime.datetime.now().isoformat()
if args.add_new_node_after_primary_stops:
create_and_add_node(
replace_primary(
network,
args.add_new_node_after_primary_stops,
old_primary,
Expand Down
127 changes: 88 additions & 39 deletions tests/infra/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def create_node(self, host, binary_dir=None, library_dir=None, **kwargs):
self.nodes.append(node)
return node

def _add_node(
def _setup_node(
self,
node,
lib_name,
Expand Down Expand Up @@ -369,7 +369,7 @@ def _add_node(
if not node.version_after("ccf-2.0.3") and read_only_snapshots_dir is not None:
snapshots_dir = read_only_snapshots_dir

node.join(
node.prepare_join(
lib_name=lib_name,
workspace=args.workspace,
label=args.label,
Expand All @@ -382,6 +382,35 @@ def _add_node(
**kwargs,
)

def _add_node(
self,
node,
lib_name,
args,
target_node=None,
recovery=False,
ledger_dir=None,
copy_ledger=True,
read_only_ledger_dirs=None,
from_snapshot=True,
snapshots_dir=None,
**kwargs,
):
self._setup_node(
node,
lib_name,
args,
target_node,
recovery,
ledger_dir,
copy_ledger,
read_only_ledger_dirs,
from_snapshot,
snapshots_dir,
**kwargs,
)
node.complete_join()

# If the network is opening or recovering, nodes are trusted without consortium approval
if (
self.status == ServiceStatus.OPENING
Expand Down Expand Up @@ -831,55 +860,75 @@ def stop_all_nodes(
"Fatal error found during node shutdown", node_errors
)

def join_node(
def setup_join_node(
self,
node,
lib_name,
args,
target_node=None,
timeout=JOIN_TIMEOUT,
stop_on_error=False,
**kwargs,
):
forwarded_args = {
arg: getattr(args, arg, None)
for arg in infra.network.Network.node_args_to_forward
}
self._add_node(node, lib_name, args, target_node, **forwarded_args, **kwargs)
self._setup_node(node, lib_name, args, target_node, **forwarded_args, **kwargs)

primary, _ = self.find_primary()
try:
self.wait_for_node_in_store(
primary,
node.node_id,
node_status=(
ccf.ledger.NodeStatus.PENDING
if self.status == ServiceStatus.OPEN
else ccf.ledger.NodeStatus.TRUSTED
),
timeout=timeout,
)
except TimeoutError as e:
LOG.error(f"New pending node {node.node_id} failed to join the network")
if stop_on_error:
assert node.remote.check_done()
node.stop()
out_path, err_path = node.get_logs()
if out_path is not None and err_path is not None:
errors, _ = log_errors(out_path, err_path)
else:
errors = []
self.nodes.remove(node)
if errors:
# Throw accurate exceptions if known errors found in
for error in errors:
if "Quote does not contain known enclave measurement" in error:
raise CodeIdNotFound from e
if "StartupSeqnoIsOld" in error:
raise StartupSeqnoIsOld from e
if "invalid cert on handshake" in error:
raise ServiceCertificateInvalid from e
raise
def run_join_node(
self,
node,
timeout=JOIN_TIMEOUT,
stop_on_error=False,
wait_for_node_in_store=True,
):
node.complete_join()
if wait_for_node_in_store:
primary, _ = self.find_primary()
try:
self.wait_for_node_in_store(
primary,
node.node_id,
node_status=(
ccf.ledger.NodeStatus.PENDING
if self.status == ServiceStatus.OPEN
else ccf.ledger.NodeStatus.TRUSTED
),
timeout=timeout,
)
except TimeoutError as e:
LOG.error(f"New pending node {node.node_id} failed to join the network")
if stop_on_error:
assert node.remote.check_done()
node.stop()
out_path, err_path = node.get_logs()
if out_path is not None and err_path is not None:
errors, _ = log_errors(out_path, err_path)
else:
errors = []
self.nodes.remove(node)
if errors:
# Throw accurate exceptions if known errors found in
for error in errors:
if "Quote does not contain known enclave measurement" in error:
raise CodeIdNotFound from e
if "StartupSeqnoIsOld" in error:
raise StartupSeqnoIsOld from e
if "invalid cert on handshake" in error:
raise ServiceCertificateInvalid from e
raise

def join_node(
self,
node,
lib_name,
args,
target_node=None,
timeout=JOIN_TIMEOUT,
stop_on_error=False,
**kwargs,
):
self.setup_join_node(node, lib_name, args, target_node, **kwargs)
self.run_join_node(node, timeout, stop_on_error)

def trust_node(
self,
Expand Down
57 changes: 42 additions & 15 deletions tests/infra/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class NodeNetworkState(Enum):
stopped = auto()
started = auto()
joined = auto()
setup = auto()


class State(Enum):
Expand Down Expand Up @@ -140,7 +141,6 @@ def __init__(
if self.version is not None
else None
)
self.consensus = None
self.certificate_valid_from = None
self.certificate_validity_days = None
self.initial_node_data_json_file = node_data_json_file
Expand Down Expand Up @@ -208,7 +208,7 @@ def start(
members_info,
**kwargs,
):
self._start(
self._setup(
infra.remote.StartType.start,
lib_name,
enclave_type,
Expand All @@ -218,6 +218,7 @@ def start(
members_info=members_info,
**kwargs,
)
self._start()
self.network_state = NodeNetworkState.joined

def join(
Expand All @@ -229,7 +230,7 @@ def join(
common_dir,
**kwargs,
):
self._start(
self._setup(
infra.remote.StartType.join,
lib_name,
enclave_type,
Expand All @@ -238,9 +239,32 @@ def join(
common_dir,
**kwargs,
)
self._start()

def prepare_join(
self,
lib_name,
enclave_type,
workspace,
label,
common_dir,
**kwargs,
):
self._setup(
infra.remote.StartType.join,
lib_name,
enclave_type,
workspace,
label,
common_dir,
**kwargs,
)

def complete_join(self):
self._start()

def recover(self, lib_name, enclave_type, workspace, label, common_dir, **kwargs):
self._start(
self._setup(
infra.remote.StartType.recover,
lib_name,
enclave_type,
Expand All @@ -249,12 +273,10 @@ def recover(self, lib_name, enclave_type, workspace, label, common_dir, **kwargs
common_dir,
**kwargs,
)
self._start()
self.network_state = NodeNetworkState.joined

def get_consensus(self):
return self.consensus

def _start(
def _setup(
self,
start_type,
lib_name,
Expand All @@ -268,10 +290,7 @@ def _start(
):
"""
Creates a CCFRemote instance, sets it up (connects, creates the directory
and ships over the files), and (optionally) starts the node by executing
the appropriate command.
If self.debug is set, it will not actually start up the node, but will
prompt the user to do so manually.
and ships over the files)
"""
lib_path = infra.path.build_lib_path(
lib_name, enclave_type, enclave_platform, library_dir=self.library_dir
Expand All @@ -280,6 +299,7 @@ def _start(
members_info = members_info or []
self.label = label

self.certificate_validity_days = kwargs.get("initial_node_cert_validity_days")
self.remote = infra.remote.CCFRemote(
start_type,
lib_path,
Expand All @@ -304,6 +324,15 @@ def _start(
**kwargs,
)
self.remote.setup()
self.network_state = NodeNetworkState.setup

def _start(self):
"""
(optionally) starts the node by executing the appropriate command.
If self.debug is set, it will not actually start up the node, but will
prompt the user to do so manually.
"""
assert self.network_state == NodeNetworkState.setup
self.network_state = NodeNetworkState.started
if self.debug:
with open("/tmp/vscode-gdb.sh", "a", encoding="utf-8") as f:
Expand Down Expand Up @@ -340,8 +369,6 @@ def _start(
f"Error starting node {self.local_node_id}"
) from e

self.consensus = kwargs.get("consensus")

timeout = 5
start_time = time.time()
while time.time() < start_time + timeout:
Expand All @@ -362,7 +389,7 @@ def _start(
time.sleep(0.1)

self._read_ports()
self.certificate_validity_days = kwargs.get("initial_node_cert_validity_days")

start_msg = f"Node {self.local_node_id} started: {self.node_id}"
if self.version is not None:
start_msg += f" [version: {self.version}]"
Expand Down

0 comments on commit 9bc2165

Please sign in to comment.