diff --git a/deps/rabbitmq_management/priv/www/js/main.js b/deps/rabbitmq_management/priv/www/js/main.js
index 1bb37646fac0..98b13a0d3cae 100644
--- a/deps/rabbitmq_management/priv/www/js/main.js
+++ b/deps/rabbitmq_management/priv/www/js/main.js
@@ -1286,7 +1286,11 @@ function get_msgs(params) {
with_req('POST', path, JSON.stringify(params), function(resp) {
var msgs = JSON.parse(resp.responseText);
if (msgs.length == 0) {
- show_popup('info', 'Queue is empty');
+ if ("offset" in params) {
+ show_popup('info', 'No messages in stream at given offset');
+ } else {
+ show_popup('info', 'Queue is empty');
+ }
} else {
$('#msg-wrapper').slideUp(200);
replace_content('msg-wrapper', format('messages', {'msgs': msgs}));
diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
index c9d7319bb4ff..d9bf53376131 100644
--- a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
+++ b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
@@ -309,19 +309,21 @@
<%= format('publish', {'mode': 'queue', 'queue': queue}) %>
-<% if (!is_stream(queue)) { %>
Get messages
+ <% if (!is_stream(queue)) { %>
Warning: getting messages from a queue is a destructive action.
+ <% } %>
-<% } %>
<% if (is_user_policymaker) { %>
diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_queue_get.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_queue_get.erl
index d08439b3a8a4..7de9eeccb828 100644
--- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_queue_get.erl
+++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_queue_get.erl
@@ -7,6 +7,7 @@
-module(rabbit_mgmt_wm_queue_get).
+-include_lib("kernel/include/logger.hrl").
-export([init/2, resource_exists/2, is_authorized/2, allow_missing_post/2,
allowed_methods/2, accept_content/2, content_types_provided/2,
content_types_accepted/2]).
@@ -47,34 +48,124 @@ accept_content(ReqData, Context) ->
do_it(ReqData0, Context) ->
VHost = rabbit_mgmt_util:vhost(ReqData0),
Q = rabbit_mgmt_util:id(queue, ReqData0),
- rabbit_mgmt_util:with_decode(
- [ackmode, count, encoding], ReqData0, Context,
- fun([AckModeBin, CountBin, EncBin], Body, ReqData) ->
- rabbit_mgmt_util:with_channel(
- VHost, ReqData, Context,
- fun (Ch) ->
- AckMode = list_to_atom(binary_to_list(AckModeBin)),
- Count = rabbit_mgmt_util:parse_int(CountBin),
- Enc = case EncBin of
- <<"auto">> -> auto;
- <<"base64">> -> base64;
- _ -> throw({error, <<"Unsupported encoding. Please use auto or base64.">>})
- end,
- Trunc = case maps:get(truncate, Body, undefined) of
- undefined -> none;
- TruncBin -> rabbit_mgmt_util:parse_int(
- TruncBin)
- end,
-
- Reply = basic_gets(Count, Ch, Q, AckMode, Enc, Trunc),
- maybe_return(Reply, Ch, AckMode),
- rabbit_mgmt_util:reply(remove_delivery_tag(Reply),
- ReqData, Context)
- end)
- end).
+ Resource = rabbit_misc:r(<<"/">>, queue, Q),
+ {ok, Queue} = rabbit_amqqueue:lookup(Resource),
+ case amqqueue:get_type(Queue) of
+ rabbit_stream_queue ->
+ rabbit_mgmt_util:with_decode(
+ [count, encoding, offset], ReqData0, Context,
+ fun([CountBin, EncBin, OffsetBin], Body, ReqData) ->
+ rabbit_mgmt_util:with_channel(
+ VHost, ReqData, Context,
+ fun (Ch) ->
+ Count = rabbit_mgmt_util:parse_int(CountBin),
+ Enc = case EncBin of
+ <<"auto">> -> auto;
+ <<"base64">> -> base64;
+ _ -> throw({error, <<"Unsupported encoding. Please use auto or base64.">>})
+ end,
+ Offset = rabbit_mgmt_util:parse_int(OffsetBin),
+ Trunc = case maps:get(truncate, Body, undefined) of
+ undefined -> none;
+ TruncBin -> rabbit_mgmt_util:parse_int(
+ TruncBin)
+ end,
+ CTag = <<"ctag">>,
+ Reply = start_subscription_gets(
+ Count, Ch, Q, CTag, Offset, Enc, Trunc),
+ rabbit_mgmt_util:reply(remove_delivery_tag(Reply),
+ ReqData, Context)
+ end)
+ end);
+ _ ->
+ rabbit_mgmt_util:with_decode(
+ [ackmode, count, encoding], ReqData0, Context,
+ fun([AckModeBin, CountBin, EncBin], Body, ReqData) ->
+ rabbit_mgmt_util:with_channel(
+ VHost, ReqData, Context,
+ fun (Ch) ->
+ AckMode = list_to_atom(binary_to_list(AckModeBin)),
+ Count = rabbit_mgmt_util:parse_int(CountBin),
+ Enc = case EncBin of
+ <<"auto">> -> auto;
+ <<"base64">> -> base64;
+ _ -> throw({error, <<"Unsupported encoding. Please use auto or base64.">>})
+ end,
+ Trunc = case maps:get(truncate, Body, undefined) of
+ undefined -> none;
+ TruncBin -> rabbit_mgmt_util:parse_int(
+ TruncBin)
+ end,
+ Reply = basic_gets(Count, Ch, Q, AckMode, Enc,
+ Trunc),
+ maybe_return(Reply, Ch, AckMode),
+ rabbit_mgmt_util:reply(remove_delivery_tag(Reply),
+ ReqData, Context)
+ end)
+ end)
+ end.
+
+start_subscription_gets(Count, Ch, Queue, CTag, Offset, Enc, Trunc) ->
+ qos(Ch, Count),
+ subscribe(Ch, Queue, false, Offset, CTag),
+ Replies = subscription_gets(Count, Ch, Queue, CTag, Offset, Enc, Trunc),
+ cancel_subscription(Ch, CTag),
+ Replies.
+
+subscription_gets(0, _Ch, _Queue, _CTag, _Offset, _Enc, _Trunc) ->
+ [];
+subscription_gets(Count, Ch, Queue, CTag, Offset, Enc, Trunc) ->
+ case subscription_get(Ch, Enc, Trunc) of
+ none -> [];
+ Reply -> [Reply | subscription_gets(Count - 1, Ch, Queue, CTag, Offset, Enc, Trunc)]
+ end.
+subscription_get(Ch, Enc, Trunc) ->
+ receive
+ {#'basic.deliver'{redelivered = Redelivered,
+ exchange = Exchange,
+ routing_key = RoutingKey,
+ delivery_tag = DeliveryTag,
+ consumer_tag = ConsumerTag},
+ #amqp_msg{props = Props, payload = Payload}} ->
+ ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ [{payload_bytes, size(Payload)},
+ {redelivered, Redelivered},
+ {exchange, Exchange},
+ {routing_key, RoutingKey},
+ {consumer_tag, ConsumerTag},
+ {properties, rabbit_mgmt_format:basic_properties(Props)}] ++
+ payload_part(maybe_truncate(Payload, Trunc), Enc)
+ after
+ 300 ->
+ none
+ end.
+subscribe(Ch, Queue, NoAck, Offset, CTag) ->
+ amqp_channel:subscribe(
+ Ch,
+ #'basic.consume'{queue = Queue,
+ no_ack = NoAck,
+ consumer_tag = CTag,
+ arguments = [{<<"x-stream-offset">>, long, Offset}]},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = CTag} ->
+ ok
+ end.
+qos(Ch, Prefetch) ->
+ #'basic.qos_ok'{} = amqp_channel:call(
+ Ch,
+ #'basic.qos'{global = false, prefetch_count = Prefetch}).
+
+cancel_subscription(Ch, CTag) ->
+ amqp_channel:call(
+ Ch,
+ #'basic.cancel'{
+ consumer_tag = CTag,
+ nowait = false}).
basic_gets(0, _, _, _, _, _) ->
[];