Skip to content

Commit

Permalink
Avoid send empty body to ext_proc server if decodeData() never called (
Browse files Browse the repository at this point in the history
…#28672)

* Avoid send empty body to ext_proc server if decodeData() not called

Signed-off-by: Yanjun Xiang <[email protected]>
  • Loading branch information
yanjunxiang-google authored Aug 3, 2023
1 parent 8094523 commit 5e4f350
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 43 deletions.
7 changes: 3 additions & 4 deletions api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,15 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// * Whether subsequent HTTP requests are transmitted synchronously or whether they are
// sent asynchronously.
// * To modify request or response trailers if they already exist
// * To add request or response trailers where they are not present
//
// The filter supports up to six different processing steps. Each is represented by
// a gRPC stream message that is sent to the external processor. For each message, the
// processor must send a matching response.
//
// * Request headers: Contains the headers from the original HTTP request.
// * Request body: Sent in a single message if the BUFFERED or BUFFERED_PARTIAL
// mode is chosen, in multiple messages if the STREAMED mode is chosen, and not
// at all otherwise.
// * Request body: Delivered if they are present and sent in a single message if
// the BUFFERED or BUFFERED_PARTIAL mode is chosen, in multiple messages if the
// STREAMED mode is chosen, and not at all otherwise.
// * Request trailers: Delivered if they are present and if the trailer mode is set
// to SEND.
// * Response headers: Contains the headers from the HTTP response. Keep in mind
Expand Down
18 changes: 4 additions & 14 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap&
return FilterTrailersStatus::StopIteration;
}

if (!body_delivered && state.bodyMode() == ProcessingMode::BUFFERED) {
if (!body_delivered && state.bufferedData() && state.bodyMode() == ProcessingMode::BUFFERED) {
// If no gRPC stream yet, opens it before sending data.
switch (openStream()) {
case StreamOpenState::Error:
Expand All @@ -477,7 +477,8 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap&
// We would like to process the body in a buffered way, but until now the complete
// body has not arrived. With the arrival of trailers, we now know that the body
// has arrived.
sendBufferedData(state, ProcessorState::CallbackState::BufferedBodyCallback, true);
sendBodyChunk(state, *state.bufferedData(), ProcessorState::CallbackState::BufferedBodyCallback,
false);
state.setPaused(true);
return FilterTrailersStatus::StopIteration;
}
Expand Down Expand Up @@ -544,7 +545,7 @@ FilterTrailersStatus Filter::encodeTrailers(ResponseTrailerMap& trailers) {

void Filter::sendBodyChunk(ProcessorState& state, const Buffer::Instance& data,
ProcessorState::CallbackState new_state, bool end_stream) {
ENVOY_LOG(debug, "Sending a body chunk of {} bytes", data.length());
ENVOY_LOG(debug, "Sending a body chunk of {} bytes, end_stram {}", data.length(), end_stream);
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
new_state);
ProcessingRequest req;
Expand All @@ -555,17 +556,6 @@ void Filter::sendBodyChunk(ProcessorState& state, const Buffer::Instance& data,
stats_.stream_msgs_sent_.inc();
}

void Filter::sendBufferedData(ProcessorState& state, ProcessorState::CallbackState new_state,
bool end_stream) {
if (state.hasBufferedData()) {
sendBodyChunk(state, *state.bufferedData(), new_state, end_stream);
} else {
// If there is no buffered data, sends an empty body.
Buffer::OwnedImpl data("");
sendBodyChunk(state, data, new_state, end_stream);
}
}

void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers) {
ProcessingRequest req;
auto* trailers_req = state.mutableTrailers(req);
Expand Down
3 changes: 0 additions & 3 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,6 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
void onMessageTimeout();
void onNewTimeout(const ProtobufWkt::Duration& override_message_timeout);

void sendBufferedData(ProcessorState& state, ProcessorState::CallbackState new_state,
bool end_stream);

void sendBodyChunk(ProcessorState& state, const Buffer::Instance& data,
ProcessorState::CallbackState new_state, bool end_stream);

Expand Down
19 changes: 12 additions & 7 deletions source/extensions/filters/http/ext_proc/processor_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,18 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon
} else if (complete_body_available_ && body_mode_ != ProcessingMode::NONE) {
// If we get here, then all the body data came in before the header message
// was complete, and the server wants the body. It doesn't matter whether the
// processing mode is buffered, streamed, or partially streamed -- if we get
// here then the whole body is in the buffer and we can proceed as if the
// "buffered" processing mode was set.
ENVOY_LOG(debug, "Sending buffered request body message");
filter_.sendBufferedData(*this, ProcessorState::CallbackState::BufferedBodyCallback, true);
clearWatermark();
return absl::OkStatus();
// processing mode is buffered, streamed, or partially buffered.
if (bufferedData()) {
// Get here, no_body_ = false, and complete_body_available_ = true, the end_stream
// flag of decodeData() can be determined by whether the trailers are received.
// Also, bufferedData() is not nullptr means decodeData() is called, even though
// the data can be an empty chunk.
filter_.sendBodyChunk(*this, *bufferedData(),
ProcessorState::CallbackState::BufferedBodyCallback,
!trailers_available_);
clearWatermark();
return absl::OkStatus();
}
} else if (body_mode_ == ProcessingMode::BUFFERED) {
// Here, we're not ready to continue processing because then
// we won't be able to modify the headers any more, so do nothing and
Expand Down
108 changes: 93 additions & 15 deletions test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,8 @@ TEST_P(ExtProcIntegrationTest, GetAndSetBodyAndHeadersOnResponsePartialBuffered)

// Should get just one message with the body
processResponseBodyMessage(
*grpc_upstreams_[0], false, [](const HttpBody&, BodyResponse& body_resp) {
*grpc_upstreams_[0], false, [](const HttpBody& body, BodyResponse& body_resp) {
EXPECT_TRUE(body.end_of_stream());
auto* header_mut = body_resp.mutable_response()->mutable_header_mutation();
auto* header_add = header_mut->add_set_headers();
header_add->mutable_header()->set_key("x-testing-response-header");
Expand Down Expand Up @@ -1017,7 +1018,7 @@ TEST_P(ExtProcIntegrationTest, GetAndSetBodyAndHeadersAndTrailersOnResponse) {

// Should get just one message with the body
processResponseBodyMessage(*grpc_upstreams_[0], false, [](const HttpBody& body, BodyResponse&) {
EXPECT_TRUE(body.end_of_stream());
EXPECT_FALSE(body.end_of_stream());
return true;
});

Expand Down Expand Up @@ -1424,7 +1425,8 @@ TEST_P(ExtProcIntegrationTest, GetAndIncorrectlyModifyHeaderOnBody) {
auto response = sendDownstreamRequestWithBody("Original body", absl::nullopt);
processRequestHeadersMessage(*grpc_upstreams_[0], true, absl::nullopt);
processRequestBodyMessage(
*grpc_upstreams_[0], false, [](const HttpBody&, BodyResponse& response) {
*grpc_upstreams_[0], false, [](const HttpBody& body, BodyResponse& response) {
EXPECT_TRUE(body.end_of_stream());
auto* mut = response.mutable_response()->mutable_header_mutation()->add_set_headers();
mut->mutable_header()->set_key(":scheme");
mut->mutable_header()->set_value("tcp");
Expand All @@ -1445,7 +1447,8 @@ TEST_P(ExtProcIntegrationTest, GetAndIncorrectlyModifyHeaderOnBodyPartialBuffer)
auto response = sendDownstreamRequestWithBody("Original body", absl::nullopt);
processRequestHeadersMessage(*grpc_upstreams_[0], true, absl::nullopt);
processRequestBodyMessage(
*grpc_upstreams_[0], false, [](const HttpBody&, BodyResponse& response) {
*grpc_upstreams_[0], false, [](const HttpBody& body, BodyResponse& response) {
EXPECT_TRUE(body.end_of_stream());
auto* mut = response.mutable_response()->mutable_header_mutation()->add_set_headers();
mut->mutable_header()->set_key(":scheme");
mut->mutable_header()->set_value("tcp");
Expand Down Expand Up @@ -1678,6 +1681,25 @@ TEST_P(ExtProcIntegrationTest, BufferBodyOverridePostWithEmptyBody) {
verifyDownstreamResponse(*response, 200);
}

TEST_P(ExtProcIntegrationTest, BufferEmptyBodyNotSendingHeader) {
proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SKIP);
proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::BUFFERED);
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP);
initializeConfig();
HttpIntegrationTest::initialize();
auto response = sendDownstreamRequestWithBody("", absl::nullopt);

// We should get an empty body message this time
processRequestBodyMessage(*grpc_upstreams_[0], true, [](const HttpBody& body, BodyResponse&) {
EXPECT_TRUE(body.end_of_stream());
EXPECT_EQ(body.body().size(), 0);
return true;
});

handleUpstreamRequest();
verifyDownstreamResponse(*response, 200);
}

// Test how the filter responds when asked to buffer a response body for a POST
// request with an empty body. We should get an empty body message because
// the Envoy filter stream received the body after all the headers.
Expand Down Expand Up @@ -2163,9 +2185,9 @@ TEST_P(ExtProcIntegrationTest, RequestMessageNewTimeoutOutOfBounds) {
newTimeoutWrongConfigTest(override_message_timeout);
}

// Set the ext_proc filter in SKIP header, BUFFERED body mode.
// Send a request with headers and trailers.
TEST_P(ExtProcIntegrationTest, SendHeaderAndTrailerInBufferedMode) {
// Set the ext_proc filter in SKIP header, SEND trailer, and BUFFERED body mode.
// Send a request with headers and trailers. No body is sent to the ext_proc server.
TEST_P(ExtProcIntegrationTest, SkipHeaderSendTrailerInBufferedMode) {
proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::BUFFERED);
proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SKIP);
proto_config_.mutable_processing_mode()->set_request_trailer_mode(ProcessingMode::SEND);
Expand All @@ -2180,13 +2202,37 @@ TEST_P(ExtProcIntegrationTest, SendHeaderAndTrailerInBufferedMode) {
IntegrationStreamDecoderPtr response = std::move(encoder_decoder.second);
Http::TestRequestTrailerMapImpl request_trailers{{"request", "trailer"}};
codec_client_->sendTrailers(*request_encoder_, request_trailers);
processRequestBodyMessage(*grpc_upstreams_[0], true, absl::nullopt);
processRequestTrailersMessage(*grpc_upstreams_[0], false, absl::nullopt);
processRequestTrailersMessage(*grpc_upstreams_[0], true, absl::nullopt);
handleUpstreamRequest();
processResponseHeadersMessage(*grpc_upstreams_[0], false, absl::nullopt);
verifyDownstreamResponse(*response, 200);
}

// Set the ext_proc filter processing mode to send request header, body and trailer.
// Then have the client send header and trailer.
TEST_P(ExtProcIntegrationTest, ClientSendHeaderTrailerFilterConfigedSendAll) {
proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND);
proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::BUFFERED);
proto_config_.mutable_processing_mode()->set_request_trailer_mode(ProcessingMode::SEND);
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP);
initializeConfig();
HttpIntegrationTest::initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));
Http::TestRequestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
auto encoder_decoder = codec_client_->startRequest(headers);
request_encoder_ = &encoder_decoder.first;
IntegrationStreamDecoderPtr response = std::move(encoder_decoder.second);
Http::TestRequestTrailerMapImpl request_trailers{{"request", "trailer"}};
codec_client_->sendTrailers(*request_encoder_, request_trailers);
processRequestHeadersMessage(*grpc_upstreams_[0], true, absl::nullopt);
processRequestTrailersMessage(*grpc_upstreams_[0], false, absl::nullopt);

handleUpstreamRequest();
verifyDownstreamResponse(*response, 200);
}

// Test the filter with the header allow list set and disallow list empty and
// verify only the allowed headers are sent to the ext_proc server.
TEST_P(ExtProcIntegrationTest, GetAndSetHeadersAndTrailersWithAllowedHeader) {
Expand Down Expand Up @@ -2404,7 +2450,8 @@ TEST_P(ExtProcIntegrationTest, GetAndSetBodyOnBothWithClearRouteCache) {
});
upstream_request_->encodeData(100, true);
processResponseBodyMessage(
*grpc_upstreams_[0], false, [](const HttpBody&, BodyResponse& body_resp) {
*grpc_upstreams_[0], false, [](const HttpBody& body, BodyResponse& body_resp) {
EXPECT_TRUE(body.end_of_stream());
auto* header_mut = body_resp.mutable_response()->mutable_header_mutation();
auto* header_add = header_mut->add_set_headers();
header_add->mutable_header()->set_key("x-testing-response-header");
Expand Down Expand Up @@ -2462,7 +2509,7 @@ TEST_P(ExtProcIntegrationTest, HeaderMutationCheckPassWithHcmSizeConfig) {
});
processRequestBodyMessage(
*grpc_upstreams_[0], false, [this](const HttpBody& body, BodyResponse& body_resp) {
EXPECT_TRUE(body.end_of_stream());
EXPECT_FALSE(body.end_of_stream());
addMutationSetHeaders(20, *body_resp.mutable_response()->mutable_header_mutation());
return true;
});
Expand All @@ -2480,7 +2527,7 @@ TEST_P(ExtProcIntegrationTest, HeaderMutationCheckPassWithHcmSizeConfig) {
});
processResponseBodyMessage(
*grpc_upstreams_[0], false, [this](const HttpBody& body, BodyResponse& body_resp) {
EXPECT_TRUE(body.end_of_stream());
EXPECT_FALSE(body.end_of_stream());
addMutationSetHeaders(20, *body_resp.mutable_response()->mutable_header_mutation());
return true;
});
Expand Down Expand Up @@ -2558,7 +2605,8 @@ TEST_P(ExtProcIntegrationTest, SetHeaderMutationFailWithRequestBody) {
auto response = sendDownstreamRequestWithBody("Original body", absl::nullopt);
processRequestHeadersMessage(*grpc_upstreams_[0], true, absl::nullopt);
processRequestBodyMessage(
*grpc_upstreams_[0], false, [this](const HttpBody&, BodyResponse& body_resp) {
*grpc_upstreams_[0], false, [this](const HttpBody& body, BodyResponse& body_resp) {
EXPECT_TRUE(body.end_of_stream());
addMutationSetHeaders(60, *body_resp.mutable_response()->mutable_header_mutation());
return true;
});
Expand All @@ -2585,7 +2633,8 @@ TEST_P(ExtProcIntegrationTest, RemoveHeaderMutationFailWithResponseBody) {
handleUpstreamRequest();
processResponseHeadersMessage(*grpc_upstreams_[0], true, absl::nullopt);
processResponseBodyMessage(
*grpc_upstreams_[0], false, [this](const HttpBody&, BodyResponse& body_resp) {
*grpc_upstreams_[0], false, [this](const HttpBody& body, BodyResponse& body_resp) {
EXPECT_TRUE(body.end_of_stream());
addMutationRemoveHeaders(60, *body_resp.mutable_response()->mutable_header_mutation());
return true;
});
Expand Down Expand Up @@ -2661,7 +2710,11 @@ TEST_P(ExtProcIntegrationTest, HeaderMutationResultSizeFailWithResponseTrailer)
{"x-trailer-foo-4", "foo-4"},
{"x-trailer-foo-5", "foo-5"}};
upstream_request_->encodeTrailers(response_trailers);
processResponseBodyMessage(*grpc_upstreams_[0], true, absl::nullopt);
// processResponseBodyMessage(*grpc_upstreams_[0], true, absl::nullopt);
processResponseBodyMessage(*grpc_upstreams_[0], true, [](const HttpBody& body, BodyResponse&) {
EXPECT_FALSE(body.end_of_stream());
return true;
});
processResponseTrailersMessage(
*grpc_upstreams_[0], false, [this](const HttpTrailers&, TrailersResponse& trailer_resp) {
// End result header count 46 + 5 > header count limit 50.
Expand Down Expand Up @@ -2706,4 +2759,29 @@ TEST_P(ExtProcIntegrationTest, ClientNoTrailerProcessingModeSendTrailer) {
verifyDownstreamResponse(*response, 200);
}

// Test when request trailer is received, it sends out the buffered body to ext_proc server.
TEST_P(ExtProcIntegrationTest, SkipHeaderTrailerSendBodyClientSendAll) {
proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SKIP);
proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::BUFFERED);
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP);
initializeConfig();
HttpIntegrationTest::initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));
Http::TestRequestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
auto encoder_decoder = codec_client_->startRequest(headers);
request_encoder_ = &encoder_decoder.first;
codec_client_->sendData(*request_encoder_, 10, false);
IntegrationStreamDecoderPtr response = std::move(encoder_decoder.second);
Http::TestRequestTrailerMapImpl request_trailers{{"x-trailer-foo", "yes"}};
codec_client_->sendTrailers(*request_encoder_, request_trailers);
processRequestBodyMessage(*grpc_upstreams_[0], true, [](const HttpBody& body, BodyResponse&) {
EXPECT_FALSE(body.end_of_stream());
return true;
});
handleUpstreamRequest();
verifyDownstreamResponse(*response, 200);
}

} // namespace Envoy

0 comments on commit 5e4f350

Please sign in to comment.