Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext_proc: send attributes #30781

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,13 @@ message ExternalProcessor {
// for a reply.
bool async_mode = 4;

// [#not-implemented-hide:]
// Envoy provides a number of :ref:`attributes <arch_overview_attributes>`
// for expressive policies. Each attribute name provided in this field will be
// matched against that list and populated in the request_headers message.
// See the :ref:`attribute documentation <arch_overview_request_attributes>`
// for the list of supported attributes and their types.
repeated string request_attributes = 5;

// [#not-implemented-hide:]
// Envoy provides a number of :ref:`attributes <arch_overview_attributes>`
// for expressive policies. Each attribute name provided in this field will be
// matched against that list and populated in the response_headers message.
Expand Down Expand Up @@ -257,12 +255,10 @@ message ExtProcOverrides {
// Set a different asynchronous processing option than the default.
bool async_mode = 2;

// [#not-implemented-hide:]
// Set different optional attributes than the default setting of the
// ``request_attributes`` field.
repeated string request_attributes = 3;

// [#not-implemented-hide:]
// Set different optional properties than the default setting of the
// ``response_attributes`` field.
repeated string response_attributes = 4;
Expand Down
7 changes: 7 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -203,5 +203,12 @@ new_features:
- area: tracing
change: |
Provide initial span attributes to a sampler used in the OpenTelemetry tracer.
- area: ext_proc
change: |
implemented
:ref:`request_attributes <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.request_attributes>`
and
:ref:`response_attributes <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.response_attributes>`
config APIs to enable sending and receiving attributes from/to the external processing server.

deprecated:
25 changes: 24 additions & 1 deletion source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ envoy_cc_library(
"ext_proc.h",
"processor_state.h",
],
copts = select({
"//bazel:windows_x86_64": [],
"//conditions:default": [
"-DUSE_CEL_PARSER",
],
}),
deps = [
":client_interface",
":mutation_utils_lib",
Expand All @@ -29,24 +35,41 @@ envoy_cc_library(
"//source/common/buffer:buffer_lib",
"//source/common/protobuf",
"//source/common/runtime:runtime_features_lib",
"//source/extensions/filters/common/expr:evaluator_lib",
"//source/extensions/filters/common/mutation_rules:mutation_rules_lib",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings:str_format",
"@com_google_cel_cpp//eval/public:builtin_func_registrar",
"@com_google_cel_cpp//eval/public:cel_expr_builder_factory",
"@envoy_api//envoy/config/common/mutation_rules/v3:pkg_cc_proto",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto",
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
],
] + select(
{
"//bazel:windows_x86_64": [],
"//conditions:default": [
"@com_google_cel_cpp//parser",
],
},
),
)

envoy_cc_extension(
name = "config",
srcs = ["config.cc"],
hdrs = ["config.h"],
copts = select({
"//bazel:windows_x86_64": [],
"//conditions:default": [
"-DUSE_CEL_PARSER",
],
}),
deps = [
":client_lib",
":ext_proc",
"//source/extensions/filters/common/expr:evaluator_lib",
"//source/extensions/filters/http/common:factory_base_lib",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto",
],
Expand Down
13 changes: 7 additions & 6 deletions source/extensions/filters/http/ext_proc/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromPro
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs);
const uint32_t max_message_timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs);
const auto filter_config =
std::make_shared<FilterConfig>(proto_config, std::chrono::milliseconds(message_timeout_ms),
max_message_timeout_ms, context.scope(), stats_prefix);
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms,
context.scope(), stats_prefix, Envoy::Extensions::Filters::Common::Expr::getBuilder(context));

return [filter_config, grpc_service = proto_config.grpc_service(),
&context](Http::FilterChainFactoryCallbacks& callbacks) {
Expand Down Expand Up @@ -45,9 +45,10 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoWithServerContextTyp
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs);
const uint32_t max_message_timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs);
const auto filter_config =
std::make_shared<FilterConfig>(proto_config, std::chrono::milliseconds(message_timeout_ms),
max_message_timeout_ms, server_context.scope(), stats_prefix);
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms,
server_context.scope(), stats_prefix,
Envoy::Extensions::Filters::Common::Expr::getBuilder(server_context));

return [filter_config, grpc_service = proto_config.grpc_service(),
&server_context](Http::FilterChainFactoryCallbacks& callbacks) {
Expand Down
3 changes: 2 additions & 1 deletion source/extensions/filters/http/ext_proc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h"
#include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.validate.h"

#include "source/extensions/filters/common/expr/evaluator.h"
#include "source/extensions/filters/http/common/factory_base.h"

namespace Envoy {
Expand All @@ -29,7 +30,7 @@ class ExternalProcessingFilterConfig

Router::RouteSpecificFilterConfigConstSharedPtr createRouteSpecificFilterConfigTyped(
const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& proto_config,
Server::Configuration::ServerFactoryContext& context,
Server::Configuration::ServerFactoryContext&,
ProtobufMessage::ValidationVisitor& validator) override;

Http::FilterFactoryCb createFilterFactoryFromProtoWithServerContextTyped(
Expand Down
91 changes: 88 additions & 3 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

#include "absl/strings/str_format.h"

#if defined(USE_CEL_PARSER)
#include "parser/parser.h"
#endif

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
Expand Down Expand Up @@ -113,6 +117,27 @@ ExtProcLoggingInfo::grpcCalls(envoy::config::core::v3::TrafficDirection traffic_
: encoding_processor_grpc_calls_;
}

absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
FilterConfig::initExpressions(const Protobuf::RepeatedPtrField<std::string>& matchers) const {
absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr> expressions;
#if defined(USE_CEL_PARSER)
for (const auto& matcher : matchers) {
auto parse_status = google::api::expr::parser::Parse(matcher);
Copy link
Member

@tyxia tyxia Nov 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CEL usage here is not correct.

We need to extend the lifetime ofparse_status here because CEL lib specifically requires that parsed expression(return of parser::Parse() ) need to outlive the compiled expression (return ofExpr::createExpression)

You can see some correct examples here and here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to that comment, the source_info must also outlive the parsed expression, but we throw it away in this common method. Is this also a bug?

Copy link
Member

@tyxia tyxia Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good observation but that is probably intentional because CHECK is not performed in Envoy:

source_info is optional for EVALUATION phase (where the return of this function is used) and is used for CHECK phase( see my comment here and working example).

So, it is fine for my CEL use case because only EVAL is performed in Envoy and both PARSE and CHECK are performed in control plane, as the split model I mentioned before. (i.e., source_info is not used)

Back to common method, I think it is fine so far and in your use case here, because CHECK has not been performed in Envoy data plane.

If we really want CHECK functionality in Envoy(probably should avoid as performance overhead), we can update that common method to use the source info in that parsed expression, i.e., return of parser::Parse(matcher) which we want to save here.
cced @kyessenov the author of common method.

if (!parse_status.ok()) {
throw EnvoyException("Unable to parse descriptor expression: " +
parse_status.status().ToString());
}
expressions.emplace(matcher, Extensions::Filters::Common::Expr::createExpression(
builder_->builder(), parse_status.value().expr()));
}
#else
ENVOY_LOG(warn, "CEL expression parsing is not available for use in this environment."
" Attempted to parse " +
std::to_string(matchers.size()) + " expressions");
#endif
return expressions;
}

FilterConfigPerRoute::FilterConfigPerRoute(const ExtProcPerRoute& config)
: disabled_(config.disabled()) {
if (config.has_overrides()) {
Expand Down Expand Up @@ -201,7 +226,8 @@ void Filter::onDestroy() {
}

FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream) {
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
absl::optional<ProtobufWkt::Struct> proto) {
switch (openStream()) {
case StreamOpenState::Error:
return FilterHeadersStatus::StopIteration;
Expand All @@ -219,6 +245,9 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(),
*headers_req->mutable_headers());
headers_req->set_end_of_stream(end_stream);
if (proto.has_value()) {
(*headers_req->mutable_attributes())[FilterName] = proto.value();
}
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
ProcessorState::CallbackState::HeadersCallback);
ENVOY_LOG(debug, "Sending headers message");
Expand All @@ -228,6 +257,48 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
return FilterHeadersStatus::StopIteration;
}

const absl::optional<ProtobufWkt::Struct> Filter::evaluateAttributes(
Filters::Common::Expr::ActivationPtr activation,
const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
expr) {
absl::optional<ProtobufWkt::Struct> proto;
if (expr.size() > 0) {
proto.emplace(ProtobufWkt::Struct{});
for (const auto& hash_entry : expr) {
ProtobufWkt::Arena arena;
const auto result = hash_entry.second.get()->Evaluate(*activation, &arena);
if (!result.ok()) {
// TODO: Stats?
continue;
}

if (result.value().IsError()) {
ENVOY_LOG(trace, "error parsing cel expression {}", hash_entry.first);
continue;
}

ProtobufWkt::Value value;
switch (result.value().type()) {
case google::api::expr::runtime::CelValue::Type::kBool:
value.set_bool_value(result.value().BoolOrDie());
break;
case google::api::expr::runtime::CelValue::Type::kNullType:
value.set_null_value(ProtobufWkt::NullValue{});
break;
case google::api::expr::runtime::CelValue::Type::kDouble:
value.set_number_value(result.value().DoubleOrDie());
break;
default:
value.set_string_value(Filters::Common::Expr::print(result.value()));
}

(*(proto.value()).mutable_fields())[hash_entry.first] = value;
}
}

return proto;
}

FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_stream) {
ENVOY_LOG(trace, "decodeHeaders: end_stream = {}", end_stream);
mergePerRouteConfig();
Expand All @@ -237,7 +308,14 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (decoding_state_.sendHeaders()) {
status = onHeaders(decoding_state_, headers, end_stream);
absl::optional<Envoy::ProtobufWkt::Struct> proto;
if (!config_->requestExpr().empty()) {
auto activation_ptr = Filters::Common::Expr::createActivation(decoding_state_.streamInfo(),
&headers, nullptr, nullptr);
proto = evaluateAttributes(std::move(activation_ptr), config_->requestExpr());
}

status = onHeaders(decoding_state_, headers, end_stream, proto);
ENVOY_LOG(trace, "onHeaders returning {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "decodeHeaders: Skipped header processing");
Expand Down Expand Up @@ -515,7 +593,14 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (!processing_complete_ && encoding_state_.sendHeaders()) {
status = onHeaders(encoding_state_, headers, end_stream);
absl::optional<Envoy::ProtobufWkt::Struct> proto;
if (!config_->responseExpr().empty()) {
auto activation_ptr = Filters::Common::Expr::createActivation(encoding_state_.streamInfo(),
nullptr, &headers, nullptr);
proto = evaluateAttributes(std::move(activation_ptr), config_->responseExpr());
}

status = onHeaders(encoding_state_, headers, end_stream, proto);
ENVOY_LOG(trace, "onHeaders returns {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "encodeHeaders: Skipped header processing");
Expand Down
38 changes: 34 additions & 4 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "source/common/common/logger.h"
#include "source/common/common/matchers.h"
#include "source/common/protobuf/protobuf.h"
#include "source/extensions/filters/common/expr/evaluator.h"
#include "source/extensions/filters/common/mutation_rules/mutation_rules.h"
#include "source/extensions/filters/http/common/pass_through_filter.h"
#include "source/extensions/filters/http/ext_proc/client.h"
Expand Down Expand Up @@ -121,12 +122,13 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {
Upstream::HostDescriptionConstSharedPtr upstream_host_;
};

class FilterConfig {
class FilterConfig : public Logger::Loggable<Logger::Id::ext_proc> {
public:
FilterConfig(const envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor& config,
const std::chrono::milliseconds message_timeout,
const uint32_t max_message_timeout_ms, Stats::Scope& scope,
const std::string& stats_prefix)
const std::string& stats_prefix,
Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder)
: failure_mode_allow_(config.failure_mode_allow()),
disable_clear_route_cache_(config.disable_clear_route_cache()),
message_timeout_(message_timeout), max_message_timeout_ms_(max_message_timeout_ms),
Expand All @@ -136,7 +138,9 @@ class FilterConfig {
allow_mode_override_(config.allow_mode_override()),
disable_immediate_response_(config.disable_immediate_response()),
allowed_headers_(initHeaderMatchers(config.forward_rules().allowed_headers())),
disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())) {}
disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())),
builder_(builder), request_expr_(initExpressions(config.request_attributes())),
response_expr_(initExpressions(config.response_attributes())) {}

bool failureModeAllow() const { return failure_mode_allow_; }

Expand Down Expand Up @@ -166,6 +170,16 @@ class FilterConfig {

const Envoy::ProtobufWkt::Struct& filterMetadata() const { return filter_metadata_; }

const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
requestExpr() const {
return request_expr_;
}

const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
responseExpr() const {
return response_expr_;
}

private:
ExtProcFilterStats generateStats(const std::string& prefix,
const std::string& filter_stats_prefix, Stats::Scope& scope) {
Expand All @@ -183,6 +197,9 @@ class FilterConfig {
return header_matchers;
}

absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
initExpressions(const Protobuf::RepeatedPtrField<std::string>& matchers) const;

const bool failure_mode_allow_;
const bool disable_clear_route_cache_;
const std::chrono::milliseconds message_timeout_;
Expand All @@ -201,6 +218,13 @@ class FilterConfig {
const std::vector<Matchers::StringMatcherPtr> allowed_headers_;
// Empty disallowed_header_ means disallow nothing, i.e, allow all.
const std::vector<Matchers::StringMatcherPtr> disallowed_headers_;

Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder_;

const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
request_expr_;
const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
response_expr_;
};

using FilterConfigSharedPtr = std::shared_ptr<FilterConfig>;
Expand Down Expand Up @@ -300,7 +324,13 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
void sendImmediateResponse(const envoy::service::ext_proc::v3::ImmediateResponse& response);

Http::FilterHeadersStatus onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream);
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
absl::optional<ProtobufWkt::Struct> proto);

const absl::optional<ProtobufWkt::Struct> evaluateAttributes(
Filters::Common::Expr::ActivationPtr activation,
const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
expr);
// Return a pair of whether to terminate returning the current result.
std::pair<bool, Http::FilterDataStatus> sendStreamChunk(ProcessorState& state);
Http::FilterDataStatus onData(ProcessorState& state, Buffer::Instance& data, bool end_stream);
Expand Down
6 changes: 6 additions & 0 deletions source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
virtual envoy::service::ext_proc::v3::HttpTrailers*
mutableTrailers(envoy::service::ext_proc::v3::ProcessingRequest& request) const PURE;

virtual StreamInfo::StreamInfo& streamInfo() PURE;

protected:
void setBodyMode(
envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode body_mode);
Expand Down Expand Up @@ -283,6 +285,8 @@ class DecodingProcessorState : public ProcessorState {
void requestWatermark() override;
void clearWatermark() override;

StreamInfo::StreamInfo& streamInfo() override { return decoder_callbacks_->streamInfo(); }

private:
void setProcessingModeInternal(
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode);
Expand Down Expand Up @@ -356,6 +360,8 @@ class EncodingProcessorState : public ProcessorState {
void requestWatermark() override;
void clearWatermark() override;

StreamInfo::StreamInfo& streamInfo() override { return encoder_callbacks_->streamInfo(); }

private:
void setProcessingModeInternal(
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode);
Expand Down
Loading
Loading