Skip to content

Commit

Permalink
Unblock group of consumers on super stream partition
Browse files Browse the repository at this point in the history
A group of consumers on a super stream can end up blocked
without an active consumer. This can happen with consumer
churn: one consumer gets removed, which makes the active
consumer passive, but the former active consumer never
gets to know because it has been removed itself.

This commit changes the structure of the messages the SAC
coordinator sends to consumer connections, to embed enough
information to look up the group and to instruct it to choose
a new active consumer when the race condition mentioned above
comes up.

Because of the changes in the structure of messages, a feature
flag is required to make sure the SAC coordinator starts
sending the new messages only when all the nodes have been upgraded.

References #7743
  • Loading branch information
acogoluegnes authored and michaelklishin committed Apr 4, 2023
1 parent f1af3bb commit 70538c5
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 79 deletions.
9 changes: 9 additions & 0 deletions deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,12 @@
stability => stable,
depends_on => [stream_queue]
}}).


-rabbit_feature_flag(
{stream_sac_coordinator_unblock_group,
#{desc => "Bug fix to unblock a group of consumers in a super stream partition",
doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/7743",
stability => stable,
depends_on => [stream_single_active_consumer]
}}).
103 changes: 82 additions & 21 deletions deps/rabbit/src/rabbit_stream_sac_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
of
{value, Consumer} ->
G1 = remove_from_group(Consumer, Group0),
handle_consumer_removal(G1, Consumer);
handle_consumer_removal(G1, Consumer, Stream, ConsumerName);
false ->
{Group0, []}
end,
Expand All @@ -247,19 +247,24 @@ apply(#command_activate_consumer{vhost = VirtualHost,
stream = Stream,
consumer_name = ConsumerName},
#?MODULE{groups = StreamGroups0} = State0) ->
rabbit_log:debug("Activating consumer on ~tp, group ~p",
[Stream, ConsumerName]),
{G, Eff} =
case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of
undefined ->
rabbit_log:warning("trying to activate consumer in group ~tp, but "
rabbit_log:warning("Trying to activate consumer in group ~tp, but "
"the group does not longer exist",
[{VirtualHost, Stream, ConsumerName}]),
{undefined, []};
Group ->
#consumer{pid = Pid, subscription_id = SubId} =
evaluate_active_consumer(Group),
rabbit_log:debug("New active consumer on ~tp, group ~tp " ++
"is ~tp from ~tp",
[Stream, ConsumerName, SubId, Pid]),
Group1 =
update_consumer_state_in_group(Group, Pid, SubId, true),
{Group1, [notify_consumer_effect(Pid, SubId, true)]}
{Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]}
end,
StreamGroups1 =
update_groups(VirtualHost, Stream, ConsumerName, G, StreamGroups0),
Expand Down Expand Up @@ -499,7 +504,8 @@ do_register_consumer(VirtualHost,
Effects =
case Active of
true ->
[notify_consumer_effect(ConnectionPid, SubscriptionId, Active)];
[notify_consumer_effect(ConnectionPid, SubscriptionId,
Stream, ConsumerName, Active)];
_ ->
[]
end,
Expand Down Expand Up @@ -527,7 +533,8 @@ do_register_consumer(VirtualHost,
active = true},
G1 = add_to_group(Consumer0, Group0),
{G1,
[notify_consumer_effect(ConnectionPid, SubscriptionId, true)]};
[notify_consumer_effect(ConnectionPid, SubscriptionId,
Stream, ConsumerName, true)]};
_G ->
%% whatever the current state is, the newcomer will be passive
Consumer0 =
Expand All @@ -546,18 +553,28 @@ do_register_consumer(VirtualHost,
%% the current active stays the same
{G1, []};
_ ->
rabbit_log:debug("SAC consumer registration: " ++
"active consumer change on stream ~tp, group ~tp. " ++
"Notifying ~tp from ~tp it is no longer active.",
[Stream, ConsumerName, ActSubId, ActPid]),
%% there's a change, telling the active it's not longer active
{update_consumer_state_in_group(G1,
ActPid,
ActSubId,
false),
[notify_consumer_effect(ActPid,
ActSubId,
Stream,
ConsumerName,
false,
true)]}
end;
false ->
%% no active consumer in the (non-empty) group, we are waiting for the reply of a former active
rabbit_log:debug("SAC consumer registration: no active consumer on stream ~tp, group ~tp. " ++
"Likely waiting for a response from former active consumer.",
[Stream, ConsumerName]),
%% no active consumer in the (non-empty) group,
%% we are waiting for the reply of a former active
{G1, []}
end
end,
Expand All @@ -571,27 +588,27 @@ do_register_consumer(VirtualHost,
lookup_consumer(ConnectionPid, SubscriptionId, Group1),
{State#?MODULE{groups = StreamGroups1}, {ok, Active}, Effects}.

handle_consumer_removal(#group{consumers = []} = G, _) ->
handle_consumer_removal(#group{consumers = []} = G, _, _, _) ->
{G, []};
handle_consumer_removal(#group{partition_index = -1} = Group0,
Consumer) ->
Consumer, Stream, ConsumerName) ->
case Consumer of
#consumer{active = true} ->
%% this is the active consumer we remove, computing the new one
Group1 = compute_active_consumer(Group0),
case lookup_active_consumer(Group1) of
{value, #consumer{pid = Pid, subscription_id = SubId}} ->
%% creating the side effect to notify the new active consumer
{Group1, [notify_consumer_effect(Pid, SubId, true)]};
{Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]};
_ ->
%% no active consumer found in the group, nothing to do
{Group1, []}
end;
#consumer{active = false} ->
%% not the active consumer, nothing to do."),
%% not the active consumer, nothing to do.
{Group0, []}
end;
handle_consumer_removal(Group0, Consumer) ->
handle_consumer_removal(Group0, Consumer, Stream, ConsumerName) ->
case lookup_active_consumer(Group0) of
{value,
#consumer{pid = ActPid, subscription_id = ActSubId} =
Expand All @@ -601,40 +618,81 @@ handle_consumer_removal(Group0, Consumer) ->
%% the current active stays the same
{Group0, []};
_ ->
rabbit_log:debug("SAC consumer removal: " ++
"active consumer change on stream ~tp, group ~tp. " ++
"Notifying ~tp from ~tp it is no longer active.",
[Stream, ConsumerName, ActSubId, ActPid]),

%% there's a change, telling the active it's not longer active
{update_consumer_state_in_group(Group0,
ActPid,
ActSubId,
false),
[notify_consumer_effect(ActPid, ActSubId, false, true)]}
[notify_consumer_effect(ActPid, ActSubId,
Stream, ConsumerName, false, true)]}
end;
false ->
case Consumer#consumer.active of
true ->
%% the active one is going away, picking a new one
#consumer{pid = P, subscription_id = SID} =
evaluate_active_consumer(Group0),
rabbit_log:debug("SAC consumer removal: " ++
"active consumer change on stream ~tp, group ~tp. " ++
"Notifying ~tp from ~tp it is the new active consumer.",
[Stream, ConsumerName, SID, P]),
{update_consumer_state_in_group(Group0, P, SID, true),
[notify_consumer_effect(P, SID, true)]};
[notify_consumer_effect(P, SID,
Stream, ConsumerName, true)]};
false ->
%% no active consumer in the (non-empty) group, we are waiting for the reply of a former active
rabbit_log:debug("SAC consumer removal: no active consumer on stream ~tp, group ~tp. " ++
"Likely waiting for a response from former active consumer.",
[Stream, ConsumerName]),
%% no active consumer in the (non-empty) group,
%% we are waiting for the reply of a former active
{Group0, []}
end
end.

notify_consumer_effect(Pid, SubId, Active) ->
notify_consumer_effect(Pid, SubId, Active, false).
message_type() ->
case has_unblock_group_support() of
true ->
map;
false ->
tuple
end.

notify_consumer_effect(Pid, SubId, Stream, Name, Active) ->
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false).

notify_consumer_effect(Pid, SubId, Active, false = _SteppingDown) ->
notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown) ->
notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown, message_type()).

notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, false = _SteppingDown, tuple) ->
mod_call_effect(Pid,
{sac,
{{subscription_id, SubId}, {active, Active},
{{subscription_id, SubId},
{active, Active},
{extra, []}}});
notify_consumer_effect(Pid, SubId, Active, true = _SteppingDown) ->
notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, true = _SteppingDown, tuple) ->
mod_call_effect(Pid,
{sac,
{{subscription_id, SubId}, {active, Active},
{extra, [{stepping_down, true}]}}}).
{{subscription_id, SubId},
{active, Active},
{extra, [{stepping_down, true}]}}});
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false = _SteppingDown, map) ->
mod_call_effect(Pid,
{sac, #{subscription_id => SubId,
stream => Stream,
consumer_name => Name,
active => Active}});
notify_consumer_effect(Pid, SubId, Stream, Name, Active, true = _SteppingDown, map) ->
mod_call_effect(Pid,
{sac, #{subscription_id => SubId,
stream => Stream,
consumer_name => Name,
active => Active,
stepping_down => true}}).

maybe_create_group(VirtualHost,
Stream,
Expand Down Expand Up @@ -743,3 +801,6 @@ mod_call_effect(Pid, Msg) ->
send_message(ConnectionPid, Msg) ->
ConnectionPid ! Msg,
ok.

has_unblock_group_support() ->
rabbit_feature_flags:is_enabled(stream_sac_coordinator_unblock_group).
Loading

0 comments on commit 70538c5

Please sign in to comment.