Skip to content

Commit

Permalink
Update the cassandra tests for broken connections
Browse files Browse the repository at this point in the history
- Create a test to ensure the config survives restarts of cqerl_cluster
- Create a simplified version of the broken connection test
    that would not fail randomly.
- Improve the stress test for broken connections
  - Delete data from the DB for better repeatability (the queries were
    getting slower because of more data, resulting in timeouts)
  - Repeat failed queries to check if the connections recover
- Disable the stress test on CI as it can fail (unlikely but
    possible)

The stress test was run 100 times locally with various intervals for
the network failure. It survived the test even with the interval of 1
second, but with many failed queries (this was quite expected).
For 5 seconds it was not possible to cause a failed query locally.
  • Loading branch information
chrzaszcz committed Jan 14, 2021
1 parent 0193e15 commit 6233c37
Showing 1 changed file with 62 additions and 15 deletions.
77 changes: 62 additions & 15 deletions big_tests/tests/mongoose_cassandra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ all() ->
[
{group, happy_cases},
{group, not_so_happy_cases}
%{group, stress_cases} % not intended for CI, uncomment for manual stress test
].

groups() ->
G = [
{happy_cases, [parallel], happy_cases()},
{not_so_happy_cases, [sequence, {repeat_until_all_ok, 5}], not_so_happy_cases()}
{not_so_happy_cases, [sequence], not_so_happy_cases()},
{stress_cases, [sequence], stress_cases()}
],
ct_helper:repeat_all_until_all_ok(G).

Expand All @@ -81,7 +83,13 @@ happy_cases() ->

not_so_happy_cases() ->
[
big_batch_read_write_unstable_connection_should_succeed
should_work_after_restart_of_cqerl_cluster,
should_work_after_connection_reset
].

stress_cases() ->
[
big_batch_read_write_unstable_connection_should_succeed
].

init_per_suite(Config) ->
Expand Down Expand Up @@ -119,7 +127,8 @@ prepared_queries() ->
[
{insert, "INSERT INTO " ?TEST_TABLE_NAME " (f1, f2) VALUES (?, ?)"},
{select_by_f1, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f1 = ?"},
{select_by_f2, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f2 = ? ALLOW FILTERING"}
{select_by_f2, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f2 = ? ALLOW FILTERING"},
{delete, "DELETE FROM " ?TEST_TABLE_NAME " WHERE f1 = ?"}
].

%%.
Expand Down Expand Up @@ -189,51 +198,89 @@ big_objects_read_write_should_succeed(Config) ->

ok.

should_work_after_connection_reset(Config) ->
reset_all_cassandra_connections(),
mongoose_helper:wait_until(fun() ->
call(test_query, [?TEST_DB_POOL_NAME])
end, ok).

should_work_after_restart_of_cqerl_cluster(Config) ->
Pid1 = call(erlang, whereis, [cqerl_cluster]),
call(erlang, exit, [Pid1, kill]),
mongoose_helper:wait_until(fun() ->
Pid2 = call(erlang, whereis, [cqerl_cluster]),
Pid1 =/= Pid2 andalso Pid2 =/= undefined
end, true),
mongoose_helper:wait_until(fun() ->
call(test_query, [?TEST_DB_POOL_NAME])
end, ok).

big_batch_read_write_unstable_connection_should_succeed(Config) ->
Pid = spawn_link(fun reset_all_cassandra_connections_loop/0),
batch_main(Config, 30, timer:seconds(60)),
Pid ! exit,

ok.

batch_main(Config, 30, timer:seconds(20)),
Pid ! exit.

%%.
%%' Helpers
%%

%% Changing the Interval to 1 second makes some requests fail - cqerl might be improved there
reset_all_cassandra_connections_loop() ->
reset_all_cassandra_connections_loop(timer:seconds(5)).

reset_all_cassandra_connections_loop(Interval) ->
{ok, ProxyPid} = gun:open(?TCP_PROXY_API_HOST, ?TCP_PROXY_API_PORT),
%% More on this proxy's API: https://github.com/emicklei/zazkia
Ref = gun:post(ProxyPid, <<"/routes/" ?TCP_PROXY_SERVICE_NAME "/links/close">>, []),
gun:await(ProxyPid, Ref),
gun:close(ProxyPid),
reset_all_cassandra_connections(),
receive
exit ->
ok
after Interval ->
reset_all_cassandra_connections_loop(Interval)
end.

reset_all_cassandra_connections() ->
{ok, ProxyPid} = gun:open(?TCP_PROXY_API_HOST, ?TCP_PROXY_API_PORT),
%% More on this proxy's API: https://github.com/emicklei/zazkia
Ref = gun:post(ProxyPid, <<"/routes/" ?TCP_PROXY_SERVICE_NAME "/links/close">>, []),
gun:await(ProxyPid, Ref),
gun:close(ProxyPid).

batch_main(Config, RowCount, TimeLeft) when TimeLeft > 0 ->
StartTime = os:system_time(millisecond),
F2 = str(),
Rows = lists:sort([#{f1 => int(), f2 => F2} || _ <- lists:seq(1, RowCount)]),
?assertMatch(ok, cql_write(Config, insert, Rows)),
?assertMatch(ok, call_until_no_error(cql_write, Config, insert, Rows, 5)),

{Result, MaybeRows} = cql_read(Config, select_by_f2, #{f2 => F2}),
{Result, MaybeRows} = call_until_no_error(cql_read, Config, select_by_f2, #{f2 => F2}, 5),
?assertMatch({ok, _}, {Result, MaybeRows}),
?assertMatch(RowCount, length(MaybeRows)),
?assertMatch(Rows, lists:sort(MaybeRows)),

RowsToDelete = [maps:with([f1], Row) || Row <- Rows],
?assertMatch(ok, call_until_no_error(cql_write, Config, delete, RowsToDelete, 5)),

EndTime = os:system_time(millisecond),

batch_main(Config, RowCount, TimeLeft - (EndTime - StartTime));
batch_main(_Config, _RowCount, _TimeLeft) ->
ok.

%% The connection is unstable, so the query can fail sometimes.
%% In this case log the error and just repeat the failing query.
call_until_no_error(F, Config, QueryName, Arg, Retries) ->
try call(F, cql_args(Config, [QueryName, Arg])) of
{error, Error} when Retries > 0 ->
ct:pal("Got error ~p from ~p(~p, ~p)", [Error, F, QueryName, Arg]),
timer:sleep(timer:seconds(1)),
call_until_no_error(F, Config, QueryName, Arg, Retries - 1);
Result ->
Result
catch
error:{badrpc, _} = Error when Retries > 0 ->
ct:pal("Got exception error:~p from ~p(~p, ~p)", [Error, F, QueryName, Arg]),
timer:sleep(timer:seconds(1)),
call_until_no_error(F, Config, QueryName, Arg, Retries - 1)
end.

getenv_or_fail(EnvVar) ->
case os:getenv(EnvVar) of
false -> error("environment variable " ++ EnvVar ++ "not defined");
Expand Down

0 comments on commit 6233c37

Please sign in to comment.