diff --git a/big_tests/tests/mongoose_cassandra_SUITE.erl b/big_tests/tests/mongoose_cassandra_SUITE.erl index 8a51ce1e2b..eddc800edc 100644 --- a/big_tests/tests/mongoose_cassandra_SUITE.erl +++ b/big_tests/tests/mongoose_cassandra_SUITE.erl @@ -60,12 +60,14 @@ all() -> [ {group, happy_cases}, {group, not_so_happy_cases} + %{group, stress_cases} % not intended for CI, uncomment for manual stress test ]. groups() -> G = [ {happy_cases, [parallel], happy_cases()}, - {not_so_happy_cases, [sequence, {repeat_until_all_ok, 5}], not_so_happy_cases()} + {not_so_happy_cases, [sequence], not_so_happy_cases()}, + {stress_cases, [sequence], stress_cases()} ], ct_helper:repeat_all_until_all_ok(G). @@ -81,7 +83,13 @@ happy_cases() -> not_so_happy_cases() -> [ - big_batch_read_write_unstable_connection_should_succeed + should_work_after_restart_of_cqerl_cluster, + should_work_after_connection_reset + ]. + +stress_cases() -> + [ + big_batch_read_write_unstable_connection_should_succeed ]. init_per_suite(Config) -> @@ -119,7 +127,8 @@ prepared_queries() -> [ {insert, "INSERT INTO " ?TEST_TABLE_NAME " (f1, f2) VALUES (?, ?)"}, {select_by_f1, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f1 = ?"}, - {select_by_f2, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f2 = ? ALLOW FILTERING"} + {select_by_f2, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f2 = ? ALLOW FILTERING"}, + {delete, "DELETE FROM " ?TEST_TABLE_NAME " WHERE f1 = ?"} ]. %%. @@ -189,27 +198,38 @@ big_objects_read_write_should_succeed(Config) -> ok. +should_work_after_connection_reset(Config) -> + reset_all_cassandra_connections(), + mongoose_helper:wait_until(fun() -> + call(test_query, [?TEST_DB_POOL_NAME]) + end, ok). + +should_work_after_restart_of_cqerl_cluster(Config) -> + Pid1 = call(erlang, whereis, [cqerl_cluster]), + call(erlang, exit, [Pid1, kill]), + mongoose_helper:wait_until(fun() -> + Pid2 = call(erlang, whereis, [cqerl_cluster]), + Pid1 =/= Pid2 andalso Pid2 =/= undefined + end, true), + mongoose_helper:wait_until(fun() -> + call(test_query, [?TEST_DB_POOL_NAME]) + end, ok). + big_batch_read_write_unstable_connection_should_succeed(Config) -> Pid = spawn_link(fun reset_all_cassandra_connections_loop/0), - batch_main(Config, 30, timer:seconds(60)), - Pid ! exit, - - ok. - + batch_main(Config, 30, timer:seconds(20)), + Pid ! exit. %%. %%' Helpers %% +%% Changing the Interval to 1 second makes some requests fail - cqerl might be improved there reset_all_cassandra_connections_loop() -> reset_all_cassandra_connections_loop(timer:seconds(5)). reset_all_cassandra_connections_loop(Interval) -> - {ok, ProxyPid} = gun:open(?TCP_PROXY_API_HOST, ?TCP_PROXY_API_PORT), - %% More on this proxy's API: https://github.com/emicklei/zazkia - Ref = gun:post(ProxyPid, <<"/routes/" ?TCP_PROXY_SERVICE_NAME "/links/close">>, []), - gun:await(ProxyPid, Ref), - gun:close(ProxyPid), + reset_all_cassandra_connections(), receive exit -> ok @@ -217,23 +237,50 @@ reset_all_cassandra_connections_loop(Interval) -> reset_all_cassandra_connections_loop(Interval) end. +reset_all_cassandra_connections() -> + {ok, ProxyPid} = gun:open(?TCP_PROXY_API_HOST, ?TCP_PROXY_API_PORT), + %% More on this proxy's API: https://github.com/emicklei/zazkia + Ref = gun:post(ProxyPid, <<"/routes/" ?TCP_PROXY_SERVICE_NAME "/links/close">>, []), + gun:await(ProxyPid, Ref), + gun:close(ProxyPid). + batch_main(Config, RowCount, TimeLeft) when TimeLeft > 0 -> StartTime = os:system_time(millisecond), F2 = str(), Rows = lists:sort([#{f1 => int(), f2 => F2} || _ <- lists:seq(1, RowCount)]), - ?assertMatch(ok, cql_write(Config, insert, Rows)), + ?assertMatch(ok, call_until_no_error(cql_write, Config, insert, Rows, 5)), - {Result, MaybeRows} = cql_read(Config, select_by_f2, #{f2 => F2}), + {Result, MaybeRows} = call_until_no_error(cql_read, Config, select_by_f2, #{f2 => F2}, 5), ?assertMatch({ok, _}, {Result, MaybeRows}), ?assertMatch(RowCount, length(MaybeRows)), ?assertMatch(Rows, lists:sort(MaybeRows)), + RowsToDelete = [maps:with([f1], Row) || Row <- Rows], + ?assertMatch(ok, call_until_no_error(cql_write, Config, delete, RowsToDelete, 5)), + EndTime = os:system_time(millisecond), batch_main(Config, RowCount, TimeLeft - (EndTime - StartTime)); batch_main(_Config, _RowCount, _TimeLeft) -> ok. +%% The connection is unstable, so the query can fail sometimes. +%% In this case log the error and just repeat the failing query. +call_until_no_error(F, Config, QueryName, Arg, Retries) -> + try call(F, cql_args(Config, [QueryName, Arg])) of + {error, Error} when Retries > 0 -> + ct:pal("Got error ~p from ~p(~p, ~p)", [Error, F, QueryName, Arg]), + timer:sleep(timer:seconds(1)), + call_until_no_error(F, Config, QueryName, Arg, Retries - 1); + Result -> + Result + catch + error:{badrpc, _} = Error when Retries > 0 -> + ct:pal("Got exception error:~p from ~p(~p, ~p)", [Error, F, QueryName, Arg]), + timer:sleep(timer:seconds(1)), + call_until_no_error(F, Config, QueryName, Arg, Retries - 1) + end. + getenv_or_fail(EnvVar) -> case os:getenv(EnvVar) of false -> error("environment variable " ++ EnvVar ++ "not defined"); diff --git a/src/cassandra/mongoose_cassandra_worker.erl b/src/cassandra/mongoose_cassandra_worker.erl index b523d4bf0f..b8bb7e9602 100644 --- a/src/cassandra/mongoose_cassandra_worker.erl +++ b/src/cassandra/mongoose_cassandra_worker.erl @@ -509,7 +509,7 @@ is_finished(_Req = #request{action = #read_action{}}, Result) -> {abort, AbortReason :: term(), worker_state()} | {retry, Timeout :: non_neg_integer(), worker_state()}. retry_info(_, _, #request{retry_left = 0} = _Req, State) -> - {abort, retry_limit_exeeded, State}; + {abort, retry_limit_exceeded, State}; retry_info({down, cqerl_client}, _Reason, _Req, State) -> {retry, 5 + rand:uniform(20), State}; retry_info(cqerl_error, {16#1100 = _WriteTimout, _, _}, _Req, State) -> diff --git a/src/wpool/mongoose_wpool_cassandra.erl b/src/wpool/mongoose_wpool_cassandra.erl index e781e1d4a1..ba30afd797 100644 --- a/src/wpool/mongoose_wpool_cassandra.erl +++ b/src/wpool/mongoose_wpool_cassandra.erl @@ -13,7 +13,9 @@ start(Host, Tag, WpoolOptsIn, CqerlOpts) -> PoolSize = proplists:get_value(workers, WpoolOptsIn, 20), application:set_env(cqerl, num_clients, PoolSize), ExtConfig = extend_config(CqerlOpts), - Res = cqerl_cluster:add_nodes(Tag, proplists:get_value(servers, ExtConfig), ExtConfig), + Servers = proplists:get_value(servers, ExtConfig), + set_cluster_config(Tag, Servers, ExtConfig), + Res = cqerl_cluster:add_nodes(Tag, Servers, ExtConfig), case lists:keyfind(error, 1, Res) of false -> ok; @@ -38,3 +40,9 @@ extend_config(PoolConfig) -> ConfigMap = maps:merge(Defaults, maps:from_list(PoolConfig)), maps:to_list(ConfigMap). +%% make the config survive the restart of 'cqerl_cluster' in case of a network failure +set_cluster_config(Tag, Servers, ExtConfig) -> + Clusters = application:get_env(cqerl, cassandra_clusters, []), + ClusterConfig = {Tag, {Servers, ExtConfig}}, + NewClusters = lists:keystore(Tag, 1, Clusters, ClusterConfig), + application:set_env(cqerl, cassandra_clusters, NewClusters).