Fix 413 response handling and re-enable related couch replicator test #1234

Merged 2 commits on Mar 26, 2018
13 changes: 13 additions & 0 deletions src/couch/src/couch_httpd.erl
@@ -1170,6 +1170,19 @@ before_response(Req0, Code0, Headers0, Args0) ->

respond_(#httpd{mochi_req = MochiReq}, Code, Headers, _Args, start_response) ->
MochiReq:start_response({Code, Headers});
respond_(#httpd{mochi_req = MochiReq}, 413, Headers, Args, Type) ->
% Special handling for the 413 response. Make sure the socket is closed as
% we don't know how much data was read before the error was thrown. Also
% drain all the data in the receive buffer to avoid the connection being reset
% before the 413 response is parsed by the client. This is still racy; it
% just increases the chances of 413 being detected correctly by the client
% (rather than getting a brutal TCP reset).
erlang:put(mochiweb_request_force_close, true),
Socket = MochiReq:get(socket),
mochiweb_socket:recv(Socket, 0, 0),
Result = MochiReq:Type({413, Headers, Args}),
mochiweb_socket:recv(Socket, 0, 0),
Result;
respond_(#httpd{mochi_req = MochiReq}, Code, Headers, Args, Type) ->
MochiReq:Type({Code, Headers, Args}).
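
The comment above describes the underlying TCP problem: if the server closes while unread request bytes are still buffered, the close becomes an RST and the client may never get to parse the 413. A minimal sketch of the drain-then-respond idea in plain gen_tcp rather than mochiweb (the function name and hard-coded response line are invented for illustration; assumes a passive-mode socket):

respond_413_and_close(Socket) ->
    %% Drain whatever the client already sent; length 0 reads all available
    %% bytes in passive mode, and the 0 ms timeout means "take what's there".
    _ = gen_tcp:recv(Socket, 0, 0),
    ok = gen_tcp:send(Socket, <<"HTTP/1.1 413 Request Entity Too Large\r\n"
                                "Connection: close\r\n\r\n">>),
    %% Drain once more in case the client kept streaming while we responded,
    %% making a clean FIN more likely than an RST. As the comment notes, this
    %% narrows the race rather than eliminating it.
    _ = gen_tcp:recv(Socket, 0, 0),
    gen_tcp:close(Socket).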

20 changes: 13 additions & 7 deletions src/couch_replicator/src/couch_replicator_httpc.erl
@@ -28,7 +28,7 @@
-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-define(MAX_WAIT, 5 * 60 * 1000).
-define(STREAM_STATUS, ibrowse_stream_status).

-define(STOP_HTTP_WORKER, stop_http_worker).

% This limit is for the number of messages we're willing to discard
% from an HTTP stream in clean_mailbox/1 before killing the worker
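
For context, that bound feeds a drain loop; a hypothetical sketch of its shape (the message tuples are ibrowse's, the arity and names here are illustrative):

clean_mailbox(_ReqId, 0) ->
    %% Too many buffered messages; give up so the caller can kill the worker.
    {error, exceeded_discard_limit};
clean_mailbox(ReqId, N) when N > 0 ->
    receive
        {ibrowse_async_response, ReqId, _Chunk} ->
            clean_mailbox(ReqId, N - 1);
        {ibrowse_async_response_end, ReqId} ->
            ok
    after 0 ->
        ok
    end.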
@@ -78,10 +78,14 @@ send_req(HttpDb, Params1, Callback) ->
throw:{retry, NewHttpDb0, NewParams0} ->
{retry, NewHttpDb0, NewParams0}
after
ok = couch_replicator_httpc_pool:release_worker(
HttpDb#httpdb.httpc_pool,
Worker
),
Pool = HttpDb#httpdb.httpc_pool,
case get(?STOP_HTTP_WORKER) of
stop ->
ok = stop_and_release_worker(Pool, Worker),
erase(?STOP_HTTP_WORKER);
undefined ->
ok = couch_replicator_httpc_pool:release_worker(Pool, Worker)
end,
clean_mailbox(Response)
end,
% This is necessary to keep this tail-recursive. Calling
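
Reduced to a standalone sketch, the new release logic looks like this (checkout/release/stop_and_release are placeholder names, not the real pool API): the callee flags the worker through the process dictionary, and the caller's after-clause acts on the flag exactly once.

with_worker(Pool, Fun) ->
    {ok, Worker} = checkout(Pool),
    try
        Fun(Worker)
    after
        %% erase/1 returns the previous value and clears the flag in one step.
        case erase(stop_http_worker) of
            stop      -> ok = stop_and_release(Pool, Worker);
            undefined -> ok = release(Pool, Worker)
        end
    end.

Routing every release through this one place presumably avoids the worker being stopped inside process_response and then released a second time in the after-clause.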
@@ -138,7 +142,7 @@ stop_and_release_worker(Pool, Worker) ->
ok = couch_replicator_httpc_pool:release_worker_sync(Pool, Worker).

process_response({error, sel_conn_closed}, Worker, HttpDb, Params, _Cb) ->
stop_and_release_worker(HttpDb#httpdb.httpc_pool, Worker),
put(?STOP_HTTP_WORKER, stop),
maybe_retry(sel_conn_closed, Worker, HttpDb, Params);


@@ -147,7 +151,7 @@ process_response({error, sel_conn_closed}, Worker, HttpDb, Params, _Cb) ->
%% and closes the socket, ibrowse will detect that error when it sends
%% the next request.
process_response({error, connection_closing}, Worker, HttpDb, Params, _Cb) ->
stop_and_release_worker(HttpDb#httpdb.httpc_pool, Worker),
put(?STOP_HTTP_WORKER, stop),
maybe_retry({error, connection_closing}, Worker, HttpDb, Params);
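
Both clauses reduce to the same retry shape; a hypothetical standalone version (names invented):

request_with_retry(_Fun, 0) ->
    {error, retries_exhausted};
request_with_retry(Fun, Retries) when Retries > 0 ->
    case Fun() of
        %% Transport-level closes are routine with persistent connections;
        %% retry on a fresh worker instead of surfacing them to the caller.
        {error, sel_conn_closed}    -> request_with_retry(Fun, Retries - 1);
        {error, connection_closing} -> request_with_retry(Fun, Retries - 1);
        Other                       -> Other
    end.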

process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
@@ -167,6 +171,7 @@ process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
?JSON_DECODE(Json)
end,
process_auth_response(HttpDb, Ok, Headers, Params),
if Ok =:= 413 -> put(?STOP_HTTP_WORKER, stop); true -> ok end,
Callback(Ok, Headers, EJson);
R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
backoff_success(HttpDb, Params),
@@ -194,6 +199,7 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
stream_data_self(HttpDb1, Params, Worker, ReqId, Callback)
end,
put(?STREAM_STATUS, {streaming, Worker}),
if Ok =:= 413 -> put(?STOP_HTTP_WORKER, stop); true -> ok end,
ibrowse:stream_next(ReqId),
try
Ret = Callback(Ok, Headers, StreamDataFun),
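
For readers unfamiliar with the async ibrowse mode this function drives, a minimal self-contained sketch of the handshake (the URL is illustrative; the message tuples and ibrowse:stream_next/1 are ibrowse's actual protocol):

fetch(Url) ->
    {ibrowse_req_id, ReqId} =
        ibrowse:send_req(Url, [], get, [], [{stream_to, {self(), once}}]),
    receive
        {ibrowse_async_headers, ReqId, _Code, _Headers} ->
            ok = ibrowse:stream_next(ReqId),
            read_body(ReqId, [])
    end.

read_body(ReqId, Acc) ->
    receive
        {ibrowse_async_response, ReqId, Chunk} ->
            %% In {stream_to, {Pid, once}} mode each chunk must be pulled
            %% explicitly with stream_next/1, which is what lets the caller
            %% apply backpressure.
            ok = ibrowse:stream_next(ReqId),
            read_body(ReqId, [Chunk | Acc]);
        {ibrowse_async_response_end, ReqId} ->
            {ok, lists:reverse(Acc)}
    end.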
@@ -61,8 +61,8 @@ reduce_max_request_size_test_() ->
% attachments which exceed the maximum request size are simply
% closed instead of returning a 413 response. That makes these
% tests flaky.
% ++ [{Pair, fun should_replicate_one_with_attachment/2}
% || Pair <- Pairs]
++ [{Pair, fun should_replicate_one_with_attachment/2}
|| Pair <- Pairs]
}
}.
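
The generator uses EUnit's foreachx fixture; stripped to a runnable skeleton, each {X, Fun} pair runs Fun(X, SetupResult) with per-case setup and teardown (the atoms here are placeholders):

-include_lib("eunit/include/eunit.hrl").

example_test_() ->
    Pairs = [{remote, remote}, {remote, local}],
    {foreachx,
        fun(_Pair) -> ctx end,        %% per-case setup; receives the pair
        fun(_Pair, _Ctx) -> ok end,   %% per-case teardown
        [{Pair, fun(_P, Ctx) -> ?_assertEqual(ctx, Ctx) end}
         || Pair <- Pairs]}.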

@@ -90,12 +90,12 @@ should_replicate_one({From, To}, {_Ctx, {Source, Target}}) ->
% POST-ing individual documents directly and skip bulk_docs. Test that case
% separately
% See the note in the main test function for why this was disabled.
% should_replicate_one_with_attachment({From, To}, {_Ctx, {Source, Target}}) ->
% {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
% {inorder, [should_populate_source_one_large_attachment(Source),
% should_populate_source(Source),
% should_replicate(Source, Target),
% should_compare_databases(Source, Target, [<<"doc0">>])]}}.
should_replicate_one_with_attachment({From, To}, {_Ctx, {Source, Target}}) ->
{lists:flatten(io_lib:format("~p -> ~p", [From, To])),
{inorder, [should_populate_source_one_large_attachment(Source),
should_populate_source(Source),
should_replicate(Source, Target),
should_compare_databases(Source, Target, [<<"doc0">>])]}}.


should_populate_source({remote, Source}) ->
@@ -112,11 +112,11 @@ should_populate_source_one_large_one_small(Source) ->
{timeout, ?TIMEOUT_EUNIT, ?_test(one_large_one_small(Source, 12000, 3000))}.


% should_populate_source_one_large_attachment({remote, Source}) ->
% should_populate_source_one_large_attachment(Source);
should_populate_source_one_large_attachment({remote, Source}) ->
should_populate_source_one_large_attachment(Source);

% should_populate_source_one_large_attachment(Source) ->
% {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_attachment(Source, 70000, 70000))}.
should_populate_source_one_large_attachment(Source) ->
{timeout, ?TIMEOUT_EUNIT, ?_test(one_large_attachment(Source, 70000, 70000))}.


should_replicate({remote, Source}, Target) ->
@@ -156,8 +156,8 @@ one_large_one_small(DbName, Large, Small) ->
add_doc(DbName, <<"doc1">>, Small, 0).


% one_large_attachment(DbName, Size, AttSize) ->
% add_doc(DbName, <<"doc0">>, Size, AttSize).
one_large_attachment(DbName, Size, AttSize) ->
add_doc(DbName, <<"doc0">>, Size, AttSize).


add_doc(DbName, DocId, Size, AttSize) when is_binary(DocId) ->
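
What the re-enabled attachment test ultimately exercises, reduced to a hypothetical client-side check (URL, revision, and size are invented; ibrowse:send_req/4 returns the status code as a string):

attachment_413_check() ->
    Url = "http://127.0.0.1:5984/db/doc0/att?rev=1-abc",
    Body = binary:copy(<<"x">>, 70000),
    {ok, Code, _Headers, _RespBody} =
        ibrowse:send_req(Url, [{"Content-Type", "application/octet-stream"}],
                         put, Body),
    %% With the couch_httpd change above, an oversized request should yield
    %% a parseable "413" instead of an {error, connection_closed}.
    "413" = Code.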