Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CQ: Fix shared store scanner missing messages #12392

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 91 additions & 88 deletions deps/rabbit/src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

-export([compact_file/2, truncate_file/4, delete_file/2]). %% internal

-export([scan_file_for_valid_messages/1]). %% salvage tool
-export([scan_file_for_valid_messages/1, scan_file_for_valid_messages/2]). %% salvage tool

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_call/4, prioritise_cast/3,
Expand Down Expand Up @@ -1472,31 +1472,28 @@ list_sorted_filenames(Dir, Ext) ->

-define(SCAN_BLOCK_SIZE, 4194304). %% 4MB

scan_file_for_valid_messages(Dir, FileName) ->
scan_file_for_valid_messages(form_filename(Dir, FileName)).

%% Exported as a salvage tool. Not as accurate as node recovery
%% because it doesn't have the queue index.
scan_file_for_valid_messages(Path) ->
scan_file_for_valid_messages(Path, fun(Obj) -> {valid, Obj} end).

scan_file_for_valid_messages(Path, Fun) ->
case file:open(Path, [read, binary, raw]) of
{ok, Fd} ->
{ok, FileSize} = file:position(Fd, eof),
{ok, _} = file:position(Fd, bof),
Messages = scan(<<>>, Fd, 0, FileSize, #{}, []),
Messages = scan(<<>>, Fd, Fun, 0, FileSize, #{}, []),
ok = file:close(Fd),
case Messages of
[] ->
{ok, [], 0};
[{_, TotalSize, Offset}|_] ->
{ok, Messages, Offset + TotalSize}
end;
{ok, Messages};
{error, enoent} ->
{ok, [], 0};
{ok, []};
{error, Reason} ->
{error, {unable_to_scan_file,
filename:basename(Path),
Reason}}
end.

scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
scan(Buffer, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
case file:read(Fd, ?SCAN_BLOCK_SIZE) of
eof ->
Acc;
Expand All @@ -1505,12 +1502,12 @@ scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
<<>> -> Data0;
_ -> <<Buffer/binary, Data0/binary>>
end,
scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
end.

%% Message might have been found.
scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
Fd, Offset, FileSize, MsgIdsFound, Acc)
Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
when Size >= 16 ->
<<MsgIdInt:128, _/bits>> = MsgIdAndMsg,
case MsgIdsFound of
Expand All @@ -1519,26 +1516,37 @@ scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
%% simply be a coincidence. Try the next byte.
#{MsgIdInt := true} ->
<<_, Rest2/bits>> = Data,
scan_data(Rest2, Fd, Offset + 1, FileSize, MsgIdsFound, Acc);
scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc);
%% Data looks to be a message.
_ ->
%% Avoid sub-binary construction.
MsgId = <<MsgIdInt:128>>,
TotalSize = Size + 9,
scan_data(Rest, Fd, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true},
[{MsgId, TotalSize, Offset}|Acc])
case Fun({MsgId, TotalSize, Offset}) of
%% Confirmed to be a message by the provided fun.
{valid, Entry} ->
scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true}, [Entry|Acc]);
%% Confirmed to be a message but we don't need it anymore.
previously_valid ->
scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true}, Acc);
%% Not a message, try the next byte.
invalid ->
<<_, Rest2/bits>> = Data,
scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc)
end
end;
%% This might be the start of a message.
scan_data(<<Size:64, Rest/bits>> = Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
scan_data(<<Size:64, Rest/bits>> = Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Rest) < Size + 1, Size < FileSize - Offset ->
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc);
scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc);
scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Data) < 8 ->
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc);
scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc);
%% This is definitely not a message. Try the next byte.
scan_data(<<_, Rest/bits>>, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
scan_data(Rest, Fd, Offset + 1, FileSize, MsgIdsFound, Acc).
scan_data(<<_, Rest/bits>>, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
scan_data(Rest, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc).

%%----------------------------------------------------------------------------
%% Ets index
Expand Down Expand Up @@ -1742,47 +1750,39 @@ build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},

build_index_worker(Gatherer, #msstate { index_ets = IndexEts, dir = Dir },
File, Files) ->
FileName = filenum_to_name(File),
Path = form_filename(Dir, filenum_to_name(File)),
rabbit_log:debug("Rebuilding message location index from ~ts (~B file(s) remaining)",
[form_filename(Dir, FileName), length(Files)]),
[Path, length(Files)]),
%% The scan function already dealt with duplicate messages
%% within the file. We then get messages in reverse order.
{ok, Messages, FileSize} =
scan_file_for_valid_messages(Dir, FileName),
%% Valid messages are in file order so the last message is
%% the last message from the list.
{ValidMessages, ValidTotalSize} =
lists:foldl(
fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
%% Fan-out may result in the same message data in multiple
%% files so we have to guard against it.
case index_lookup(IndexEts, MsgId) of
#msg_location { file = undefined } = StoreEntry ->
ok = index_update(IndexEts, StoreEntry #msg_location {
file = File, offset = Offset,
total_size = TotalSize }),
{[Obj | VMAcc], VTSAcc + TotalSize};
_ ->
{VMAcc, VTSAcc}
end
end, {[], 0}, Messages),
FileSize1 =
case Files of
%% if it's the last file, we'll truncate to remove any
%% rubbish above the last valid message. This affects the
%% file size.
[] -> case ValidMessages of
[] -> 0;
_ -> {_MsgId, TotalSize, Offset} =
lists:last(ValidMessages),
Offset + TotalSize
end;
[_|_] -> FileSize
end,
%% within the file, and only returns valid messages (we do
%% the index lookup in the fun). But we get messages in reverse order.
{ok, Messages} = scan_file_for_valid_messages(Path,
fun (Obj = {MsgId, TotalSize, Offset}) ->
%% Fan-out may result in the same message data in multiple
%% files so we have to guard against it.
case index_lookup(IndexEts, MsgId) of
#msg_location { file = undefined } = StoreEntry ->
ok = index_update(IndexEts, StoreEntry #msg_location {
file = File, offset = Offset,
total_size = TotalSize }),
{valid, Obj};
_ ->
invalid
end
end),
ValidTotalSize = lists:foldl(fun({_, TotalSize, _}, Acc) -> Acc + TotalSize end, 0, Messages),
%% Any file may have rubbish at the end of it that we will want truncated.
%% Note that the last message in the file is the first in the list.
FileSize = case Messages of
[] ->
0;
[{_, TotalSize, Offset}|_] ->
Offset + TotalSize
end,
ok = gatherer:in(Gatherer, #file_summary {
file = File,
valid_total_size = ValidTotalSize,
file_size = FileSize1,
file_size = FileSize,
locked = false }),
ok = gatherer:finish(Gatherer).

Expand Down Expand Up @@ -1933,7 +1933,7 @@ compact_file(File, State = #gc_state { index_ets = IndexEts,
%% Load the messages. It's possible to get 0 messages here;
%% that's OK. That means we have little to do as the file is
%% about to be deleted.
{Messages, _} = scan_and_vacuum_message_file(File, State),
Messages = scan_and_vacuum_message_file(File, State),
%% Blank holes. We must do this first otherwise the file is left
%% with data that may confuse the code (for example data that looks
%% like a message, isn't a message, but spans over a real message).
Expand Down Expand Up @@ -2087,7 +2087,7 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
_ ->
[#file_summary{ valid_total_size = 0,
file_size = FileSize }] = ets:lookup(FileSummaryEts, File),
{[], 0} = scan_and_vacuum_message_file(File, State),
[] = scan_and_vacuum_message_file(File, State),
ok = file:delete(form_filename(Dir, filenum_to_name(File))),
true = ets:delete(FileSummaryEts, File),
rabbit_log:debug("Deleted empty file number ~tp; reclaimed ~tp bytes", [File, FileSize]),
Expand All @@ -2096,28 +2096,31 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,

scan_and_vacuum_message_file(File, #gc_state{ index_ets = IndexEts, dir = Dir }) ->
%% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(
fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) ->
case index_lookup(IndexEts, MsgId) of
#msg_location { file = File, total_size = TotalSize,
offset = Offset, ref_count = 0 } = Entry ->
index_delete_object(IndexEts, Entry),
Acc;
#msg_location { file = File, total_size = TotalSize,
offset = Offset } = Entry ->
{[ Entry | List ], TotalSize + Size};
%% Fan-out may remove the entry but also write a new
%% entry in a different file when it needs to write
%% a message and the existing reference is in a file
%% that's about to be deleted. So we explicitly accept
%% these cases and ignore this message.
#msg_location { file = OtherFile, total_size = TotalSize }
when File =/= OtherFile ->
Acc;
not_found ->
Acc
end
end, {[], 0}, Messages).
Path = form_filename(Dir, filenum_to_name(File)),
{ok, Messages} = scan_file_for_valid_messages(Path,
fun ({MsgId, TotalSize, Offset}) ->
case index_lookup(IndexEts, MsgId) of
#msg_location { file = File, total_size = TotalSize,
offset = Offset, ref_count = 0 } = Entry ->
index_delete_object(IndexEts, Entry),
%% The message was valid, but since we have now deleted
%% it due to having no ref_count, it becomes invalid.
%% We still want to let the scan function skip though.
previously_valid;
#msg_location { file = File, total_size = TotalSize,
offset = Offset } = Entry ->
{valid, Entry};
%% Fan-out may remove the entry but also write a new
%% entry in a different file when it needs to write
%% a message and the existing reference is in a file
%% that's about to be deleted. So we explicitly accept
%% these cases and ignore this message.
#msg_location { file = OtherFile, total_size = TotalSize }
when File =/= OtherFile ->
invalid;
not_found ->
invalid
end
end),
%% @todo Do we really need to reverse messages?
lists:reverse(Messages).
23 changes: 17 additions & 6 deletions deps/rabbit/test/backing_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,22 @@ msg_store_file_scan1(Config) ->
%% Messages with no content.
ok = Scan([{bin, <<0:64, "deadbeefdeadbeef", 255>>}]),
ok = Scan([{msg, gen_id(), <<>>}]),
%% Tricky messages.
%%
%% These only get properly detected when the index is populated.
%% In this test case we simulate the index with a fun.
TrickyScan = fun (Blocks, Expected, Fun) ->
Path = gen_msg_file(Config, Blocks),
Result = rabbit_msg_store:scan_file_for_valid_messages(Path, Fun),
case Result of
Expected -> ok;
_ -> {expected, Expected, got, Result}
end
end,
ok = TrickyScan(
[{bin, <<0, 0:48, 17, 17, "idididididididid", 255, 0:4352/unit:8, 255>>}],
{ok, [{<<"idididididididid">>, 4378, 1}]},
fun(Obj = {<<"idididididididid">>, 4378, 1}) -> {valid, Obj}; (_) -> invalid end),
%% All good!!
passed.

Expand Down Expand Up @@ -662,12 +678,7 @@ gen_msg_file(Config, Blocks) ->

gen_result(Blocks) ->
Messages = gen_result(Blocks, 0, []),
case Messages of
[] ->
{ok, [], 0};
[{_, TotalSize, Offset}|_] ->
{ok, Messages, Offset + TotalSize}
end.
{ok, Messages}.

gen_result([], _, Acc) ->
Acc;
Expand Down
Loading