From 0281e2babc00779f5c3299e23133b1b322a781b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chrz=C4=85szcz?= Date: Thu, 14 Jan 2021 10:19:09 +0100 Subject: [PATCH 1/3] Fix a minor typo --- src/cassandra/mongoose_cassandra_worker.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cassandra/mongoose_cassandra_worker.erl b/src/cassandra/mongoose_cassandra_worker.erl index b523d4bf0f..b8bb7e9602 100644 --- a/src/cassandra/mongoose_cassandra_worker.erl +++ b/src/cassandra/mongoose_cassandra_worker.erl @@ -509,7 +509,7 @@ is_finished(_Req = #request{action = #read_action{}}, Result) -> {abort, AbortReason :: term(), worker_state()} | {retry, Timeout :: non_neg_integer(), worker_state()}. retry_info(_, _, #request{retry_left = 0} = _Req, State) -> - {abort, retry_limit_exeeded, State}; + {abort, retry_limit_exceeded, State}; retry_info({down, cqerl_client}, _Reason, _Req, State) -> {retry, 5 + rand:uniform(20), State}; retry_info(cqerl_error, {16#1100 = _WriteTimout, _, _}, _Req, State) -> From 0193e156b6b676f705027584e7bde1039c7e44ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chrz=C4=85szcz?= Date: Thu, 14 Jan 2021 10:23:01 +0100 Subject: [PATCH 2/3] Store the cluster config in the env of the cqerl app This prevents mongooseim from losing the connections to cassandra when cqerl's supervision subtree including 'cqerl_cluster' is restarted and the config is taken from the env - which was empty before the change. On the first start the pools are started one by one (on demand), so it is not possible to use the env there as well - hence the current approach, which has to be slightly inconsistent. --- src/wpool/mongoose_wpool_cassandra.erl | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/wpool/mongoose_wpool_cassandra.erl b/src/wpool/mongoose_wpool_cassandra.erl index e781e1d4a1..ba30afd797 100644 --- a/src/wpool/mongoose_wpool_cassandra.erl +++ b/src/wpool/mongoose_wpool_cassandra.erl @@ -13,7 +13,9 @@ start(Host, Tag, WpoolOptsIn, CqerlOpts) -> PoolSize = proplists:get_value(workers, WpoolOptsIn, 20), application:set_env(cqerl, num_clients, PoolSize), ExtConfig = extend_config(CqerlOpts), - Res = cqerl_cluster:add_nodes(Tag, proplists:get_value(servers, ExtConfig), ExtConfig), + Servers = proplists:get_value(servers, ExtConfig), + set_cluster_config(Tag, Servers, ExtConfig), + Res = cqerl_cluster:add_nodes(Tag, Servers, ExtConfig), case lists:keyfind(error, 1, Res) of false -> ok; @@ -38,3 +40,9 @@ extend_config(PoolConfig) -> ConfigMap = maps:merge(Defaults, maps:from_list(PoolConfig)), maps:to_list(ConfigMap). +%% make the config survive the restart of 'cqerl_cluster' in case of a network failure +set_cluster_config(Tag, Servers, ExtConfig) -> + Clusters = application:get_env(cqerl, cassandra_clusters, []), + ClusterConfig = {Tag, {Servers, ExtConfig}}, + NewClusters = lists:keystore(Tag, 1, Clusters, ClusterConfig), + application:set_env(cqerl, cassandra_clusters, NewClusters). From 6233c37e7d563273c98b218c54e76d7afb8cf278 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chrz=C4=85szcz?= Date: Thu, 14 Jan 2021 11:08:35 +0100 Subject: [PATCH 3/3] Update the cassandra tests for broken connections - Create a test to ensure the config survives restarts of cqerl_cluster - Create a simplified version of the broken connection test that would not fail randomly. - Improve the stress test for broken connections - Delete data from the DB for better repeatability (the queries were getting slower because of more data, resulting in timeouts) - Repeat failed queries to check if the connections recover - Disable the stress test on CI as it can fail (unlikely but possible) The stress test was run 100 times locally with various intervals for the network failure. It survived the test even with the interval of 1 second, but with many failed queries (this was quite expected). For 5 seconds it was not possible to cause a failed query locally. --- big_tests/tests/mongoose_cassandra_SUITE.erl | 77 ++++++++++++++++---- 1 file changed, 62 insertions(+), 15 deletions(-) diff --git a/big_tests/tests/mongoose_cassandra_SUITE.erl b/big_tests/tests/mongoose_cassandra_SUITE.erl index 8a51ce1e2b..eddc800edc 100644 --- a/big_tests/tests/mongoose_cassandra_SUITE.erl +++ b/big_tests/tests/mongoose_cassandra_SUITE.erl @@ -60,12 +60,14 @@ all() -> [ {group, happy_cases}, {group, not_so_happy_cases} + %{group, stress_cases} % not intended for CI, uncomment for manual stress test ]. groups() -> G = [ {happy_cases, [parallel], happy_cases()}, - {not_so_happy_cases, [sequence, {repeat_until_all_ok, 5}], not_so_happy_cases()} + {not_so_happy_cases, [sequence], not_so_happy_cases()}, + {stress_cases, [sequence], stress_cases()} ], ct_helper:repeat_all_until_all_ok(G). @@ -81,7 +83,13 @@ happy_cases() -> not_so_happy_cases() -> [ - big_batch_read_write_unstable_connection_should_succeed + should_work_after_restart_of_cqerl_cluster, + should_work_after_connection_reset + ]. + +stress_cases() -> + [ + big_batch_read_write_unstable_connection_should_succeed ]. init_per_suite(Config) -> @@ -119,7 +127,8 @@ prepared_queries() -> [ {insert, "INSERT INTO " ?TEST_TABLE_NAME " (f1, f2) VALUES (?, ?)"}, {select_by_f1, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f1 = ?"}, - {select_by_f2, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f2 = ? ALLOW FILTERING"} + {select_by_f2, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f2 = ? ALLOW FILTERING"}, + {delete, "DELETE FROM " ?TEST_TABLE_NAME " WHERE f1 = ?"} ]. %%. @@ -189,27 +198,38 @@ big_objects_read_write_should_succeed(Config) -> ok. +should_work_after_connection_reset(Config) -> + reset_all_cassandra_connections(), + mongoose_helper:wait_until(fun() -> + call(test_query, [?TEST_DB_POOL_NAME]) + end, ok). + +should_work_after_restart_of_cqerl_cluster(Config) -> + Pid1 = call(erlang, whereis, [cqerl_cluster]), + call(erlang, exit, [Pid1, kill]), + mongoose_helper:wait_until(fun() -> + Pid2 = call(erlang, whereis, [cqerl_cluster]), + Pid1 =/= Pid2 andalso Pid2 =/= undefined + end, true), + mongoose_helper:wait_until(fun() -> + call(test_query, [?TEST_DB_POOL_NAME]) + end, ok). + big_batch_read_write_unstable_connection_should_succeed(Config) -> Pid = spawn_link(fun reset_all_cassandra_connections_loop/0), - batch_main(Config, 30, timer:seconds(60)), - Pid ! exit, - - ok. - + batch_main(Config, 30, timer:seconds(20)), + Pid ! exit. %%. %%' Helpers %% +%% Changing the Interval to 1 second makes some requests fail - cqerl might be improved there reset_all_cassandra_connections_loop() -> reset_all_cassandra_connections_loop(timer:seconds(5)). reset_all_cassandra_connections_loop(Interval) -> - {ok, ProxyPid} = gun:open(?TCP_PROXY_API_HOST, ?TCP_PROXY_API_PORT), - %% More on this proxy's API: https://github.com/emicklei/zazkia - Ref = gun:post(ProxyPid, <<"/routes/" ?TCP_PROXY_SERVICE_NAME "/links/close">>, []), - gun:await(ProxyPid, Ref), - gun:close(ProxyPid), + reset_all_cassandra_connections(), receive exit -> ok @@ -217,23 +237,50 @@ reset_all_cassandra_connections_loop(Interval) -> reset_all_cassandra_connections_loop(Interval) end. +reset_all_cassandra_connections() -> + {ok, ProxyPid} = gun:open(?TCP_PROXY_API_HOST, ?TCP_PROXY_API_PORT), + %% More on this proxy's API: https://github.com/emicklei/zazkia + Ref = gun:post(ProxyPid, <<"/routes/" ?TCP_PROXY_SERVICE_NAME "/links/close">>, []), + gun:await(ProxyPid, Ref), + gun:close(ProxyPid). + batch_main(Config, RowCount, TimeLeft) when TimeLeft > 0 -> StartTime = os:system_time(millisecond), F2 = str(), Rows = lists:sort([#{f1 => int(), f2 => F2} || _ <- lists:seq(1, RowCount)]), - ?assertMatch(ok, cql_write(Config, insert, Rows)), + ?assertMatch(ok, call_until_no_error(cql_write, Config, insert, Rows, 5)), - {Result, MaybeRows} = cql_read(Config, select_by_f2, #{f2 => F2}), + {Result, MaybeRows} = call_until_no_error(cql_read, Config, select_by_f2, #{f2 => F2}, 5), ?assertMatch({ok, _}, {Result, MaybeRows}), ?assertMatch(RowCount, length(MaybeRows)), ?assertMatch(Rows, lists:sort(MaybeRows)), + RowsToDelete = [maps:with([f1], Row) || Row <- Rows], + ?assertMatch(ok, call_until_no_error(cql_write, Config, delete, RowsToDelete, 5)), + EndTime = os:system_time(millisecond), batch_main(Config, RowCount, TimeLeft - (EndTime - StartTime)); batch_main(_Config, _RowCount, _TimeLeft) -> ok. +%% The connection is unstable, so the query can fail sometimes. +%% In this case log the error and just repeat the failing query. +call_until_no_error(F, Config, QueryName, Arg, Retries) -> + try call(F, cql_args(Config, [QueryName, Arg])) of + {error, Error} when Retries > 0 -> + ct:pal("Got error ~p from ~p(~p, ~p)", [Error, F, QueryName, Arg]), + timer:sleep(timer:seconds(1)), + call_until_no_error(F, Config, QueryName, Arg, Retries - 1); + Result -> + Result + catch + error:{badrpc, _} = Error when Retries > 0 -> + ct:pal("Got exception error:~p from ~p(~p, ~p)", [Error, F, QueryName, Arg]), + timer:sleep(timer:seconds(1)), + call_until_no_error(F, Config, QueryName, Arg, Retries - 1) + end. + getenv_or_fail(EnvVar) -> case os:getenv(EnvVar) of false -> error("environment variable " ++ EnvVar ++ "not defined");