Skip to content

Commit

Permalink
Add reconfiguration to basicperf (#5507)
Browse files Browse the repository at this point in the history
  • Loading branch information
achamayou authored Aug 8, 2023
1 parent c9d784f commit abeeed6
Showing 1 changed file with 75 additions and 4 deletions.
79 changes: 75 additions & 4 deletions tests/infra/basicperf.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import random
import string
import json
import shutil
import datetime


def configure_remote_client(args, client_id, client_host, common_dir):
Expand Down Expand Up @@ -177,6 +179,35 @@ def create_and_fill_key_space(size: int, primary: infra.node.Node) -> List[str]:
return space


def create_and_add_node(
network, host, old_primary, new_primary, snapshots_dir, statistics
):
LOG.info(f"Retiring old primary {old_primary.local_node_id}")
statistics[
"initial_primary_retirement_start_time"
] = datetime.datetime.now().isoformat()
network.retire_node(new_primary, old_primary)
statistics[
"initial_primary_retirement_complete_time"
] = datetime.datetime.now().isoformat()
LOG.info("Old primary is retired")

LOG.info(f"Adding new node: {host}")
node = network.create_node(host)
statistics["new_node_join_start_time"] = datetime.datetime.now().isoformat()
network.join_node(
node,
args.package,
args,
timeout=10,
copy_ledger=False,
snapshots_dir=snapshots_dir,
)
network.trust_node(node, args)
statistics["new_node_join_complete_time"] = datetime.datetime.now().isoformat()
LOG.info(f"Done adding new node: {host}")


def run(args):
hosts = args.nodes or ["local://localhost"]

Expand Down Expand Up @@ -212,7 +243,11 @@ def run(args):
requests_file_paths = []
for client_def in args.client_def:
count, gen, iterations, target = client_def.split(",")
rr_idx = 0
# The round robin index deliberately starts at 1, so that backups/any are
# loaded uniformly but the first instance slightly less so where possible. This
# is useful when running a failover test, to avoid the new primary being targeted
# by reads.
rr_idx = 1
for _ in range(int(count)):
LOG.info(f"Generating {iterations} requests for client_{client_idx}")
msgs = generator.Messages()
Expand Down Expand Up @@ -262,7 +297,7 @@ def run(args):
# All clients talking to the primary are configured to fail over to the first backup,
# which is the only node whose election timeout has not been raised, to guarantee its
# election as the old primary becomes unavailable.
if args.stop_primary_after_s:
if args.stop_primary_after_s and target == "primary":
cmd.append(
f"--failover-server-address={backups[0].get_public_rpc_host()}:{backups[0].get_public_rpc_port()}"
)
Expand All @@ -283,7 +318,9 @@ def run(args):
format_width = len(str(args.client_timeout_s)) + 3

try:
statistics = {}
start_time = time.time()
primary_has_stopped = False
while True:
stop_waiting = True
for i, remote_client in enumerate(clients):
Expand All @@ -302,12 +339,46 @@ def run(args):
if (
args.stop_primary_after_s
and time.time() > start_time + args.stop_primary_after_s
and not primary.is_stopped()
and not primary_has_stopped
):
committed_snapshots_dir = network.get_committed_snapshots(
primary, force_txs=False
)
snapshots = os.listdir(committed_snapshots_dir)
sorted_snapshots = sorted(
snapshots, key=lambda x: int(x.split("_")[1])
)
latest_snapshot = sorted_snapshots[-1]
latest_snapshot_dir = os.path.join(
network.common_dir, "snapshots_to_copy"
)
os.mkdir(latest_snapshot_dir)
shutil.copy(
os.path.join(committed_snapshots_dir, latest_snapshot),
latest_snapshot_dir,
)
LOG.info(
f"Stopping primary after {args.stop_primary_after_s} seconds"
)
statistics[
"initial_primary_shutdown_time"
] = datetime.datetime.now().isoformat()
primary.stop()
primary_has_stopped = True
old_primary = primary
primary, _ = network.wait_for_new_primary(primary)
statistics[
"new_primary_election_time"
] = datetime.datetime.now().isoformat()
if args.add_new_node_after_primary_stops:
create_and_add_node(
network,
args.add_new_node_after_primary_stops,
old_primary,
primary,
latest_snapshot_dir,
statistics,
)

time.sleep(1)

Expand Down Expand Up @@ -335,7 +406,6 @@ def run(args):

network.stop_all_nodes()

statistics = {}
agg = []

for client_id, remote_client in enumerate(clients):
Expand Down Expand Up @@ -526,6 +596,7 @@ def cli_args():
parser.add_argument(
"--stop-primary-after-s", help="Stop primary after this many seconds", type=int
)
parser.add_argument("--add-new-node-after-primary-stops", type=str)
return infra.e2e_args.cli_args(
parser=parser, accept_unknown=False, ledger_chunk_bytes_override="5MB"
)
Expand Down

0 comments on commit abeeed6

Please sign in to comment.