Skip to content

Commit

Permalink
Add unpause_twice and pause_multiple_times testcases
Browse files Browse the repository at this point in the history
  • Loading branch information
arcusfelis committed May 9, 2022
1 parent e7a25c1 commit e203243
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 9 deletions.
27 changes: 18 additions & 9 deletions src/cets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ other_pids(Server) ->
pause(Server) ->
short_call(Server, pause).

-spec unpause(server_ref(), pause_monitor()) -> ok.
-spec unpause(server_ref(), pause_monitor()) -> ok | {error, unknown_pause_monitor}.
unpause(Server, PauseRef) ->
short_call(Server, {unpause, PauseRef}).

Expand Down Expand Up @@ -358,10 +358,14 @@ replicate2([RemotePid | Servers], Msg) ->
replicate2([], _Msg) ->
ok.

apply_backlog([{Msg, From} | Backlog], State) ->
apply_backlog(State = #{backlog := Backlog}) ->
apply_backlog_ops(lists:reverse(Backlog), State),
State#{backlog := []}.

apply_backlog_ops([{Msg, From} | Backlog], State) ->
handle_op(From, Msg, State),
apply_backlog(Backlog, State);
apply_backlog([], _State) ->
apply_backlog_ops(Backlog, State);
apply_backlog_ops([], _State) ->
ok.

-spec short_call(server_ref(), short_msg()) -> term().
Expand All @@ -378,17 +382,22 @@ short_call(Server, Msg) ->

%% We support multiple pauses
%% Only when all pause requests are unpaused we continue
handle_unpause(_Ref, State = #{pause_monitors := []}) ->
{reply, {error, already_unpaused}, State};
handle_unpause(Mon, State = #{backlog := Backlog, pause_monitors := Mons}) ->
handle_unpause(Mon, State = #{pause_monitors := Mons}) ->
case lists:member(Mon, Mons) of
true ->
handle_unpause2(Mon, Mons, State);
false ->
{reply, {error, unknown_pause_monitor}, State}
end.

handle_unpause2(Mon, Mons, State) ->
erlang:demonitor(Mon, [flush]),
Mons2 = lists:delete(Mon, Mons),
State2 = State#{pause_monitors := Mons2},
State3 =
case Mons2 of
[] ->
apply_backlog(lists:reverse(Backlog), State2),
State2#{backlog := []};
apply_backlog(State2);
_ ->
State2
end,
Expand Down
26 changes: 26 additions & 0 deletions test/cets_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ all() -> [test_multinode, node_list_is_correct,
test_multinode_auto_discovery, test_locally,
handle_down_is_called,
events_are_applied_in_the_correct_order_after_unpause,
pause_multiple_times,
unpause_twice,
write_returns_if_remote_server_crashes,
mon_cleaner_works, sync_using_name_works,
insert_many_request].
Expand Down Expand Up @@ -146,6 +148,30 @@ events_are_applied_in_the_correct_order_after_unpause(_Config) ->
[ok = cets:wait_response(R, 5000) || R <- [R1, R2, R3, R4]],
[{2}, {3}, {6}, {7}] = lists:sort(cets:dump(T)).

pause_multiple_times(_Config) ->
T = t5,
{ok, Pid} = cets:start(T, #{}),
PauseMon1 = cets:pause(Pid),
PauseMon2 = cets:pause(Pid),
Ref1 = cets:insert_request(Pid, {1}),
Ref2 = cets:insert_request(Pid, {2}),
[] = cets:dump(T), %% No records yet, even after pong
ok = cets:unpause(Pid, PauseMon1),
pong = cets:ping(Pid),
[] = cets:dump(T), %% No records yet, even after pong
ok = cets:unpause(Pid, PauseMon2),
pong = cets:ping(Pid),
cets:wait_response(Ref1, 5000),
cets:wait_response(Ref2, 5000),
[{1}, {2}] = lists:sort(cets:dump(T)).

unpause_twice(_Config) ->
T = t6,
{ok, Pid} = cets:start(T, #{}),
PauseMon = cets:pause(Pid),
ok = cets:unpause(Pid, PauseMon),
{error, unknown_pause_monitor} = cets:unpause(Pid, PauseMon).

write_returns_if_remote_server_crashes(_Config) ->
{ok, Pid1} = cets:start(c1, #{}),
{ok, Pid2} = cets:start(c2, #{}),
Expand Down

0 comments on commit e203243

Please sign in to comment.