Skip to content

Commit

Permalink
Merge pull request #3006 from esl/cassandra-fix
Browse files Browse the repository at this point in the history
Fix the Cassandra connections
  • Loading branch information
arcusfelis committed Jan 14, 2021
2 parents 7f60757 + 6233c37 commit bd13278
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 17 deletions.
77 changes: 62 additions & 15 deletions big_tests/tests/mongoose_cassandra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ all() ->
[
{group, happy_cases},
{group, not_so_happy_cases}
%{group, stress_cases} % not intended for CI, uncomment for manual stress test
].

groups() ->
G = [
{happy_cases, [parallel], happy_cases()},
{not_so_happy_cases, [sequence, {repeat_until_all_ok, 5}], not_so_happy_cases()}
{not_so_happy_cases, [sequence], not_so_happy_cases()},
{stress_cases, [sequence], stress_cases()}
],
ct_helper:repeat_all_until_all_ok(G).

Expand All @@ -81,7 +83,13 @@ happy_cases() ->

not_so_happy_cases() ->
[
big_batch_read_write_unstable_connection_should_succeed
should_work_after_restart_of_cqerl_cluster,
should_work_after_connection_reset
].

%% Test cases that make up the `stress_cases` group.
stress_cases() ->
    [big_batch_read_write_unstable_connection_should_succeed].

init_per_suite(Config) ->
Expand Down Expand Up @@ -119,7 +127,8 @@ prepared_queries() ->
[
{insert, "INSERT INTO " ?TEST_TABLE_NAME " (f1, f2) VALUES (?, ?)"},
{select_by_f1, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f1 = ?"},
{select_by_f2, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f2 = ? ALLOW FILTERING"}
{select_by_f2, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f2 = ? ALLOW FILTERING"},
{delete, "DELETE FROM " ?TEST_TABLE_NAME " WHERE f1 = ?"}
].

%%.
Expand Down Expand Up @@ -189,51 +198,89 @@ big_objects_read_write_should_succeed(Config) ->

ok.

%% Closes the proxied Cassandra links with reset_all_cassandra_connections/0
%% and then waits, through mongoose_helper:wait_until/2, for `test_query`
%% on the test pool to yield `ok`.
%% The Common Test config is not used by this case, hence `_Config`
%% (a plain `Config` triggers an "unused variable" compiler warning).
should_work_after_connection_reset(_Config) ->
    reset_all_cassandra_connections(),
    mongoose_helper:wait_until(fun() ->
                                       call(test_query, [?TEST_DB_POOL_NAME])
                               end, ok).

%% Kills the process registered as `cqerl_cluster`, waits until a different
%% process is registered under that name, and then waits for `test_query`
%% on the test pool to yield `ok`.
%% The Common Test config is not used by this case, hence `_Config`
%% (a plain `Config` triggers an "unused variable" compiler warning).
should_work_after_restart_of_cqerl_cluster(_Config) ->
    Pid1 = call(erlang, whereis, [cqerl_cluster]),
    call(erlang, exit, [Pid1, kill]),
    %% The name must be registered again, and to a new pid, before querying.
    mongoose_helper:wait_until(fun() ->
                                       Pid2 = call(erlang, whereis, [cqerl_cluster]),
                                       Pid1 =/= Pid2 andalso Pid2 =/= undefined
                               end, true),
    mongoose_helper:wait_until(fun() ->
                                       call(test_query, [?TEST_DB_POOL_NAME])
                               end, ok).

big_batch_read_write_unstable_connection_should_succeed(Config) ->
Pid = spawn_link(fun reset_all_cassandra_connections_loop/0),
batch_main(Config, 30, timer:seconds(60)),
Pid ! exit,

ok.

batch_main(Config, 30, timer:seconds(20)),
Pid ! exit.

%%.
%%' Helpers
%%

%% Runs the connection-reset loop with a 5 second pause between resets.
%% Changing the Interval to 1 second makes some requests fail - cqerl might be improved there
reset_all_cassandra_connections_loop() ->
    DefaultInterval = timer:seconds(5),
    reset_all_cassandra_connections_loop(DefaultInterval).

reset_all_cassandra_connections_loop(Interval) ->
{ok, ProxyPid} = gun:open(?TCP_PROXY_API_HOST, ?TCP_PROXY_API_PORT),
%% More on this proxy's API: https://github.com/emicklei/zazkia
Ref = gun:post(ProxyPid, <<"/routes/" ?TCP_PROXY_SERVICE_NAME "/links/close">>, []),
gun:await(ProxyPid, Ref),
gun:close(ProxyPid),
reset_all_cassandra_connections(),
receive
exit ->
ok
after Interval ->
reset_all_cassandra_connections_loop(Interval)
end.

%% Posts a "close links" request for the ?TCP_PROXY_SERVICE_NAME route to the
%% TCP proxy's HTTP API. The reply is awaited but its content is not inspected.
%% More on this proxy's API: https://github.com/emicklei/zazkia
reset_all_cassandra_connections() ->
    ClosePath = <<"/routes/" ?TCP_PROXY_SERVICE_NAME "/links/close">>,
    {ok, Conn} = gun:open(?TCP_PROXY_API_HOST, ?TCP_PROXY_API_PORT),
    StreamRef = gun:post(Conn, ClosePath, []),
    gun:await(Conn, StreamRef),
    gun:close(Conn).

batch_main(Config, RowCount, TimeLeft) when TimeLeft > 0 ->
StartTime = os:system_time(millisecond),
F2 = str(),
Rows = lists:sort([#{f1 => int(), f2 => F2} || _ <- lists:seq(1, RowCount)]),
?assertMatch(ok, cql_write(Config, insert, Rows)),
?assertMatch(ok, call_until_no_error(cql_write, Config, insert, Rows, 5)),

{Result, MaybeRows} = cql_read(Config, select_by_f2, #{f2 => F2}),
{Result, MaybeRows} = call_until_no_error(cql_read, Config, select_by_f2, #{f2 => F2}, 5),
?assertMatch({ok, _}, {Result, MaybeRows}),
?assertMatch(RowCount, length(MaybeRows)),
?assertMatch(Rows, lists:sort(MaybeRows)),

RowsToDelete = [maps:with([f1], Row) || Row <- Rows],
?assertMatch(ok, call_until_no_error(cql_write, Config, delete, RowsToDelete, 5)),

EndTime = os:system_time(millisecond),

batch_main(Config, RowCount, TimeLeft - (EndTime - StartTime));
batch_main(_Config, _RowCount, _TimeLeft) ->
ok.

%% Runs `call(F, cql_args(Config, [QueryName, Arg]))` and returns its result.
%% Because the connection is unstable, an `{error, _}` result or an
%% `error:{badrpc, _}` exception is logged and the same query is repeated
%% after a one second pause, for as long as Retries is above zero.
%% With no retries left, an `{error, _}` result is returned as is and the
%% exception is not caught.
call_until_no_error(F, Config, QueryName, Arg, Retries) ->
    Outcome =
        try call(F, cql_args(Config, [QueryName, Arg])) of
            {error, Reason} when Retries > 0 ->
                {retry, "Got error ~p from ~p(~p, ~p)", Reason};
            Other ->
                {done, Other}
        catch
            error:{badrpc, _} = Exception when Retries > 0 ->
                {retry, "Got exception error:~p from ~p(~p, ~p)", Exception}
        end,
    case Outcome of
        {done, Value} ->
            Value;
        {retry, Format, Why} ->
            ct:pal(Format, [Why, F, QueryName, Arg]),
            timer:sleep(timer:seconds(1)),
            call_until_no_error(F, Config, QueryName, Arg, Retries - 1)
    end.

getenv_or_fail(EnvVar) ->
case os:getenv(EnvVar) of
false -> error("environment variable " ++ EnvVar ++ "not defined");
Expand Down
2 changes: 1 addition & 1 deletion src/cassandra/mongoose_cassandra_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ is_finished(_Req = #request{action = #read_action{}}, Result) ->
{abort, AbortReason :: term(), worker_state()} |
{retry, Timeout :: non_neg_integer(), worker_state()}.
retry_info(_, _, #request{retry_left = 0} = _Req, State) ->
{abort, retry_limit_exeeded, State};
{abort, retry_limit_exceeded, State};
retry_info({down, cqerl_client}, _Reason, _Req, State) ->
{retry, 5 + rand:uniform(20), State};
retry_info(cqerl_error, {16#1100 = _WriteTimout, _, _}, _Req, State) ->
Expand Down
10 changes: 9 additions & 1 deletion src/wpool/mongoose_wpool_cassandra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ start(Host, Tag, WpoolOptsIn, CqerlOpts) ->
PoolSize = proplists:get_value(workers, WpoolOptsIn, 20),
application:set_env(cqerl, num_clients, PoolSize),
ExtConfig = extend_config(CqerlOpts),
Res = cqerl_cluster:add_nodes(Tag, proplists:get_value(servers, ExtConfig), ExtConfig),
Servers = proplists:get_value(servers, ExtConfig),
set_cluster_config(Tag, Servers, ExtConfig),
Res = cqerl_cluster:add_nodes(Tag, Servers, ExtConfig),
case lists:keyfind(error, 1, Res) of
false ->
ok;
Expand All @@ -38,3 +40,9 @@ extend_config(PoolConfig) ->
ConfigMap = maps:merge(Defaults, maps:from_list(PoolConfig)),
maps:to_list(ConfigMap).

%% Stores {Tag, {Servers, ExtConfig}} in the `cassandra_clusters` environment
%% variable of the `cqerl` application, replacing an existing entry for Tag
%% or appending a new one.
%% This makes the config survive the restart of 'cqerl_cluster' in case of a
%% network failure.
set_cluster_config(Tag, Servers, ExtConfig) ->
    Entry = {Tag, {Servers, ExtConfig}},
    Known = application:get_env(cqerl, cassandra_clusters, []),
    application:set_env(cqerl, cassandra_clusters,
                        lists:keystore(Tag, 1, Known, Entry)).

0 comments on commit bd13278

Please sign in to comment.