diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 52d3ba7b2047..0cffabb7f08c 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -271,7 +271,7 @@ query_pid(StreamId, MFA) when is_list(StreamId) -> -spec stream_overview(stream_id()) -> {ok, #{epoch := osiris:epoch(), members := #{node() := #{state := term(), - role := writer | replica, + role := {writer | replica, osiris:epoch()}, current := term(), target := running | stopped}}, num_listeners := non_neg_integer(), diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 02ee2de9efd7..043d0e0e1a1b 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -650,7 +650,7 @@ status(Vhost, QueueName) -> {error, quorum_queue_not_supported}; {ok, Q} when ?amqqueue_is_stream(Q) -> [begin - [{role, Role}, + [get_key(role, C), get_key(node, C), get_key(epoch, C), get_key(offset, C), @@ -658,7 +658,7 @@ status(Vhost, QueueName) -> get_key(first_offset, C), get_key(readers, C), get_key(segments, C)] - end || {Role, C} <- get_counters(Q)]; + end || C <- get_counters(Q)]; {error, not_found} = E -> E end. @@ -666,33 +666,38 @@ status(Vhost, QueueName) -> get_key(Key, Cnt) -> {Key, maps:get(Key, Cnt, undefined)}. --spec is_writer({pid() | undefined, writer | replica}) -> boolean(). -is_writer({_, writer}) -> true; -is_writer(_Member) -> false. +-spec get_role({pid() | undefined, writer | replica}) -> writer | replica. +get_role({_, Role}) -> Role. get_counters(Q) -> #{name := StreamId} = amqqueue:get_type_state(Q), - {ok, #{members := Members}} = rabbit_stream_coordinator:stream_overview(StreamId), + {ok, Members} = rabbit_stream_coordinator:members(StreamId), %% split members to query the writer last %% this minimizes the risk of confusing output where replicas are ahead of the writer - Writer = maps:keys(maps:filter(fun (_, M) -> is_writer(M) end, Members)), - Replicas = maps:keys(maps:filter(fun (_, M) -> not is_writer(M) end, Members)), + NodeRoles = [{Node, get_role(M)} || {Node, M} <- maps:to_list(Members)], + {Writer, Replicas} = lists:partition(fun({_, Role}) -> Role =:= writer end, NodeRoles), QName = amqqueue:get_name(Q), - Counters0 = [begin - safe_get_overview(Node, QName) - end || Node <- lists:append(Replicas, Writer)], - Counters1 = lists:filter(fun (X) -> X =/= undefined end, Counters0), + Counters = [safe_get_overview(Node, QName, Role) + || {Node, Role} <- lists:append(Replicas, Writer)], %% sort again in the original order (by node) - lists:sort(fun ({_, M1}, {_, M2}) -> maps:get(node, M1) < maps:get(node, M2) end, Counters1). + lists:sort(fun (M1, M2) -> maps:get(node, M1) < maps:get(node, M2) end, Counters). -safe_get_overview(Node, QName) -> +-spec safe_get_overview(node(), rabbit_amqqueue:name(), writer | reader) -> + map(). +safe_get_overview(Node, QName, Role) -> case rpc:call(Node, ?MODULE, get_overview, [QName]) of {badrpc, _} -> - #{node => Node}; - Result -> - Result + #{role => Role, + node => Node}; + undefined -> + #{role => Role, + node => Node}; + {Role, C} -> + C#{role => Role} end. +-spec get_overview(rabbit_amqqueue:name()) -> + {writer | reader, map()} | undefined. get_overview(QName) -> case osiris_counters:overview({osiris_writer, QName}) of undefined ->