Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext_proc: revise service api for attributes #32176

Merged
merged 14 commits into from
Feb 13, 2024
Merged
1 change: 1 addition & 0 deletions api/envoy/service/ext_proc/v3/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ licenses(["notice"]) # Apache 2
api_proto_package(
has_services = True,
deps = [
"//envoy/annotations:pkg",
"//envoy/config/core/v3:pkg",
"//envoy/extensions/filters/http/ext_proc/v3:pkg",
"//envoy/type/v3:pkg",
Expand Down
20 changes: 13 additions & 7 deletions api/envoy/service/ext_proc/v3/external_processor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import "envoy/type/v3/http_status.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";

import "envoy/annotations/deprecation.proto";
import "udpa/annotations/status.proto";
import "validate/validate.proto";

Expand Down Expand Up @@ -56,7 +57,7 @@ service ExternalProcessor {

// This represents the different types of messages that Envoy can send
// to an external processing server.
// [#next-free-field: 9]
// [#next-free-field: 10]
message ProcessingRequest {
// Specify whether the filter that sent this request is running in synchronous
// or asynchronous mode. The choice of synchronous or asynchronous mode
Expand Down Expand Up @@ -112,6 +113,12 @@ message ProcessingRequest {

// Dynamic metadata associated with the request.
config.core.v3.Metadata metadata_context = 8;

// The values of properties selected by the ``request_attributes``
// or ``response_attributes`` list in the configuration. Each entry
// in the list is populated from the standard
// :ref:`attributes <arch_overview_attributes>` supported across Envoy.
map<string, google.protobuf.Struct> attributes = 9;
}

// For every ProcessingRequest received by the server with the ``async_mode`` field
Expand Down Expand Up @@ -204,12 +211,11 @@ message HttpHeaders {
config.core.v3.HeaderMap headers = 1;

// [#not-implemented-hide:]
// The values of properties selected by the ``request_attributes``
// or ``response_attributes`` list in the configuration. Each entry
// in the list is populated
// from the standard :ref:`attributes <arch_overview_attributes>`
// supported across Envoy.
map<string, google.protobuf.Struct> attributes = 2;
// This field is deprecated and not implemented. Attributes will be sent in
// the top-level :ref:`attributes <envoy_v3_api_field_service.ext_proc.v3.ProcessingRequest.attributes`
// field.
map<string, google.protobuf.Struct> attributes = 2
[deprecated = true, (envoy.annotations.deprecated_at_minor_version) = "3.0"];

// If true, then there is no message body associated with this
// request or response.
Expand Down
57 changes: 29 additions & 28 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "source/extensions/filters/http/ext_proc/ext_proc.h"

#include <memory>

#include "envoy/config/common/mutation_rules/v3/mutation_rules.pb.h"
#include "envoy/config/core/v3/grpc_service.pb.h"
#include "envoy/extensions/filters/http/ext_proc/v3/processing_mode.pb.h"
Expand Down Expand Up @@ -276,8 +278,7 @@ void Filter::onDestroy() {
}

FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
ProtobufWkt::Struct* proto) {
Http::RequestOrResponseHeaderMap& headers, bool end_stream) {
switch (openStream()) {
case StreamOpenState::Error:
return FilterHeadersStatus::StopIteration;
Expand All @@ -291,14 +292,12 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
state.setHeaders(&headers);
state.setHasNoBody(end_stream);
ProcessingRequest req;
addAttributes(state, req);
addDynamicMetadata(state, req);
auto* headers_req = state.mutableHeaders(req);
MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(),
*headers_req->mutable_headers());
headers_req->set_end_of_stream(end_stream);
if (proto != nullptr) {
(*headers_req->mutable_attributes())[FilterName] = *proto;
}
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
ProcessorState::CallbackState::HeadersCallback);
ENVOY_LOG(debug, "Sending headers message");
Expand All @@ -315,19 +314,14 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st
decoding_state_.setCompleteBodyAvailable(true);
}

// Set the request headers on decoding and encoding state in case they are
// needed later.
decoding_state_.setRequestHeaders(&headers);
encoding_state_.setRequestHeaders(&headers);

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (decoding_state_.sendHeaders()) {
ProtobufWkt::Struct proto;

if (config_->expressionManager().hasRequestExpr()) {
auto activation_ptr = Filters::Common::Expr::createActivation(
&config_->expressionManager().localInfo(), decoding_state_.callbacks()->streamInfo(),
&headers, nullptr, nullptr);
proto = config_->expressionManager().evaluateRequestAttributes(*activation_ptr);
}

status = onHeaders(decoding_state_, headers, end_stream,
config_->expressionManager().hasRequestExpr() ? &proto : nullptr);
status = onHeaders(decoding_state_, headers, end_stream);
ENVOY_LOG(trace, "onHeaders returning {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "decodeHeaders: Skipped header processing");
Expand Down Expand Up @@ -590,7 +584,7 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap&
FilterTrailersStatus Filter::decodeTrailers(RequestTrailerMap& trailers) {
ENVOY_LOG(trace, "decodeTrailers");
const auto status = onTrailers(decoding_state_, trailers);
ENVOY_LOG(trace, "encodeTrailers returning {}", static_cast<int>(status));
ENVOY_LOG(trace, "decodeTrailers returning {}", static_cast<int>(status));
return status;
}

Expand All @@ -605,17 +599,7 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (!processing_complete_ && encoding_state_.sendHeaders()) {
ProtobufWkt::Struct proto;

if (config_->expressionManager().hasResponseExpr()) {
auto activation_ptr = Filters::Common::Expr::createActivation(
&config_->expressionManager().localInfo(), encoding_state_.callbacks()->streamInfo(),
nullptr, &headers, nullptr);
proto = config_->expressionManager().evaluateResponseAttributes(*activation_ptr);
}

status = onHeaders(encoding_state_, headers, end_stream,
config_->expressionManager().hasResponseExpr() ? &proto : nullptr);
status = onHeaders(encoding_state_, headers, end_stream);
ENVOY_LOG(trace, "onHeaders returns {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "encodeHeaders: Skipped header processing");
Expand Down Expand Up @@ -650,6 +634,7 @@ ProcessingRequest Filter::setupBodyChunk(ProcessorState& state, const Buffer::In
bool end_stream) {
ENVOY_LOG(debug, "Sending a body chunk of {} bytes, end_stream {}", data.length(), end_stream);
ProcessingRequest req;
addAttributes(state, req);
addDynamicMetadata(state, req);
auto* body_req = state.mutableBody(req);
body_req->set_end_of_stream(end_stream);
Expand All @@ -667,6 +652,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState

void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers) {
ProcessingRequest req;
addAttributes(state, req);
addDynamicMetadata(state, req);
auto* trailers_req = state.mutableTrailers(req);
MutationUtils::headersToProto(trailers, config_->allowedHeaders(), config_->disallowedHeaders(),
Expand Down Expand Up @@ -771,6 +757,21 @@ void Filter::addDynamicMetadata(const ProcessorState& state, ProcessingRequest&
*req.mutable_metadata_context() = forwarding_metadata;
}

void Filter::addAttributes(ProcessorState& state, ProcessingRequest& req) {
if (!state.sendAttributes(config_->expressionManager())) {
return;
}

auto activation_ptr = Filters::Common::Expr::createActivation(
&config_->expressionManager().localInfo(), state.callbacks()->streamInfo(),
state.requestHeaders(), dynamic_cast<const Http::ResponseHeaderMap*>(state.responseHeaders()),
dynamic_cast<const Http::ResponseTrailerMap*>(state.responseTrailers()));
auto attributes = state.evaluateAttributes(config_->expressionManager(), *activation_ptr);

state.setSentAttributes(true);
(*req.mutable_attributes())[FilterName] = attributes;
}

void Filter::setDynamicMetadata(Http::StreamFilterCallbacks* cb, const ProcessorState& state,
const ProcessingResponse& response) {
if (state.untypedReceivingMetadataNamespaces().empty() || !response.has_dynamic_metadata()) {
Expand Down
4 changes: 2 additions & 2 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,7 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
void sendImmediateResponse(const envoy::service::ext_proc::v3::ImmediateResponse& response);

Http::FilterHeadersStatus onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
ProtobufWkt::Struct* proto);
Http::RequestOrResponseHeaderMap& headers, bool end_stream);

// Return a pair of whether to terminate returning the current result.
std::pair<bool, Http::FilterDataStatus> sendStreamChunk(ProcessorState& state);
Expand All @@ -386,6 +385,7 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
void setDecoderDynamicMetadata(const envoy::service::ext_proc::v3::ProcessingResponse& response);
void addDynamicMetadata(const ProcessorState& state,
envoy::service::ext_proc::v3::ProcessingRequest& req);
void addAttributes(ProcessorState& state, envoy::service::ext_proc::v3::ProcessingRequest& req);

const FilterConfigSharedPtr config_;
const ExternalProcessorClientPtr client_;
Expand Down
43 changes: 43 additions & 0 deletions source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "source/common/common/logger.h"

#include "absl/status/status.h"
#include "matching_utils.h"

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -134,8 +135,12 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
return body_mode_;
}

void setRequestHeaders(Http::RequestHeaderMap* headers) { request_headers_ = headers; }
void setHeaders(Http::RequestOrResponseHeaderMap* headers) { headers_ = headers; }
void setTrailers(Http::HeaderMap* trailers) { trailers_ = trailers; }
const Http::RequestHeaderMap* requestHeaders() const { return request_headers_; };
virtual const Http::RequestOrResponseHeaderMap* responseHeaders() const PURE;
const Http::HeaderMap* responseTrailers() const { return trailers_; }

void onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout,
CallbackState callback_state);
Expand Down Expand Up @@ -202,6 +207,14 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {

virtual Http::StreamFilterCallbacks* callbacks() const PURE;

virtual bool sendAttributes(const ExpressionManager& mgr) const PURE;

void setSentAttributes(bool sent) { attributes_sent_ = sent; }

virtual ProtobufWkt::Struct
evaluateAttributes(const ExpressionManager& mgr,
const Filters::Common::Expr::Activation& activation) const PURE;

protected:
void setBodyMode(
envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode body_mode);
Expand Down Expand Up @@ -236,6 +249,10 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
// The specific mode for body handling
envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode body_mode_;

// The request_headers_ field is guaranteed to hold the pointer to the request
// headers as set in decodeHeaders. This allows both decoding and encoding states
// to have access to the request headers map.
Http::RequestHeaderMap* request_headers_ = nullptr;
Http::RequestOrResponseHeaderMap* headers_ = nullptr;
Http::HeaderMap* trailers_ = nullptr;
Event::TimerPtr message_timer_;
Expand All @@ -250,6 +267,9 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
const std::vector<std::string>* typed_forwarding_namespaces_{};
const std::vector<std::string>* untyped_receiving_namespaces_{};

// If true, the attributes for this processing state have already been sent.
bool attributes_sent_{};

private:
virtual void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse&) {}
};
Expand Down Expand Up @@ -324,6 +344,17 @@ class DecodingProcessorState : public ProcessorState {

Http::StreamFilterCallbacks* callbacks() const override { return decoder_callbacks_; }

bool sendAttributes(const ExpressionManager& mgr) const override {
return !attributes_sent_ && mgr.hasRequestExpr();
}

const Http::RequestOrResponseHeaderMap* responseHeaders() const override { return nullptr; }
ProtobufWkt::Struct
evaluateAttributes(const ExpressionManager& mgr,
const Filters::Common::Expr::Activation& activation) const override {
return mgr.evaluateRequestAttributes(activation);
}

private:
void setProcessingModeInternal(
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode);
Expand Down Expand Up @@ -404,6 +435,18 @@ class EncodingProcessorState : public ProcessorState {

Http::StreamFilterCallbacks* callbacks() const override { return encoder_callbacks_; }

bool sendAttributes(const ExpressionManager& mgr) const override {
return !attributes_sent_ && mgr.hasResponseExpr();
}

const Http::RequestOrResponseHeaderMap* responseHeaders() const override { return headers_; }

ProtobufWkt::Struct
evaluateAttributes(const ExpressionManager& mgr,
const Filters::Common::Expr::Activation& activation) const override {
return mgr.evaluateResponseAttributes(activation);
}

private:
void setProcessingModeInternal(
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode);
Expand Down
1 change: 1 addition & 0 deletions test/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ envoy_extension_cc_test(
"//test/proto:helloworld_proto_cc_proto",
"//test/test_common:test_runtime_lib",
"//test/test_common:utility_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/http/set_metadata/v3:pkg_cc_proto",
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
Expand Down
Loading
Loading