From 6f0ff8368a08c45a8dffbc729f57b53ec7cbb0af Mon Sep 17 00:00:00 2001 From: Max Tropets Date: Thu, 11 Jul 2024 20:19:43 +0000 Subject: [PATCH] Test LRU in e2e test --- samples/apps/logging/logging.cpp | 10 --- tests/historical_query_cache.py | 131 +++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+), 10 deletions(-) create mode 100644 tests/historical_query_cache.py diff --git a/samples/apps/logging/logging.cpp b/samples/apps/logging/logging.cpp index 719227179846..8b58ce9fbe57 100644 --- a/samples/apps/logging/logging.cpp +++ b/samples/apps/logging/logging.cpp @@ -1657,11 +1657,6 @@ namespace loggingapp ccf::http::headers::CONTENT_TYPE, ccf::http::headervalues::contenttype::JSON); ctx.rpc_ctx->set_response_body(j_response.dump()); - - // ALSO: Assume this response makes it all the way to the client, and - // they're finished with it, so we can drop the retrieved state. In a - // real app this may be driven by a separate client request or an LRU - historical_cache.drop_cached_states(handle); }; make_endpoint( get_historical_range_path, @@ -1828,11 +1823,6 @@ namespace loggingapp ccf::http::headers::CONTENT_TYPE, ccf::http::headervalues::contenttype::JSON); ctx.rpc_ctx->set_response_body(j_response.dump()); - - // ALSO: Assume this response makes it all the way to the client, and - // they're finished with it, so we can drop the retrieved state. In a - // real app this may be driven by a separate client request or an LRU - historical_cache.drop_cached_states(handle); }; make_endpoint( get_historical_sparse_path, diff --git a/tests/historical_query_cache.py b/tests/historical_query_cache.py new file mode 100644 index 000000000000..3c12f0f1adb2 --- /dev/null +++ b/tests/historical_query_cache.py @@ -0,0 +1,131 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the Apache 2.0 License. +import infra.e2e_args +import infra.network +import infra.proc +import infra.commit +import http +from infra.snp import IS_SNP +import infra.jwt_issuer +import time +import infra.bencher + +from loguru import logger as LOG + +DEFAULT_TIMEOUT_S = 10 if IS_SNP else 5 + + +def format_message(idx): + return """ + Nodes whisper secrets, + Across vast digital realms, + Harmony in bits. + """ + str( + idx + ) + + +def submit_log_entry(primary, idx): + with primary.client("user0") as c: + msg = format_message(idx) + r = c.post( + "/app/log/public", + { + "id": idx, + "msg": msg, + }, + log_capture=None, + ) + assert r.status_code == http.HTTPStatus.OK + return (r.view, r.seqno) + + +def get_and_verify_entry(client, idx): + start_time = time.time() + end_time = start_time + 10 + entries = [] + path = f"/app/log/public/historical/range?id={idx}" + while time.time() < end_time: + r = client.get(path, headers={}) + if r.status_code == http.HTTPStatus.OK: + j_body = r.body.json() + entries += j_body["entries"] + if "@nextLink" in j_body: + path = j_body["@nextLink"] + continue + else: + # No @nextLink means we've reached end of range + assert entries[0]["msg"] == format_message(idx) + return + elif r.status_code == http.HTTPStatus.ACCEPTED: + # Ignore retry-after header, retry soon + time.sleep(0.1) + continue + else: + raise ValueError( + f""" + Unexpected status code from historical range query: {r.status_code} + + {r.body} + """ + ) + + raise TimeoutError("Historical range not available") + + +def test_historical_query_stress_cache(network, args): + """This test loads the historical cache good enough so it's force to + lru_shrink. We go over the range twice and make sure we're able to load new + entries after they get evicted from the cache.""" + + jwt_issuer = infra.jwt_issuer.JwtIssuer() + jwt_issuer.register(network) + jwt = jwt_issuer.issue_jwt() + + primary, _ = network.find_primary() + + start = 1 + end = 100 + last_seqno = None + last_view = None + for i in range(start, end + 1): + last_view, last_seqno = submit_log_entry(primary, i) + + with primary.client("user0") as c: + infra.commit.wait_for_commit(c, seqno=last_seqno, view=last_view, timeout=10) + + network.wait_for_all_nodes_to_commit(primary=primary) + node = network.find_node_by_role(role=infra.network.NodeRole.BACKUP, log_capture=[]) + + with node.client(common_headers={"authorization": f"Bearer {jwt}"}) as c: + for cycle in range(0, 2): + LOG.info(f"Polling [{start}:{end + 1}] range. Attempt=[{cycle}]") + for idx in range(start, end + 1): + get_and_verify_entry(c, idx) + + return network + + +def run(args): + with infra.network.network( + args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb + ) as network: + network.start_and_open(args) + + network = test_historical_query_stress_cache(network, args) + + +if __name__ == "__main__": + + def add(parser): + pass + + args = infra.e2e_args.cli_args(add=add) + args.package = "samples/apps/logging/liblogging" + args.nodes = infra.e2e_args.max_nodes(args, f=0) + args.initial_member_count = 1 + args.sig_ms_interval = 1000 # Set to cchost default value + + args.historical_cache_soft_limit = "10KB" + + run(args)