Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert ext_proc: send attributes #30781 #31017

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,15 @@ message ExternalProcessor {
// for a reply.
bool async_mode = 4;

// [#not-implemented-hide:]
// Envoy provides a number of :ref:`attributes <arch_overview_attributes>`
// for expressive policies. Each attribute name provided in this field will be
// matched against that list and populated in the request_headers message.
// See the :ref:`attribute documentation <arch_overview_request_attributes>`
// for the list of supported attributes and their types.
repeated string request_attributes = 5;

// [#not-implemented-hide:]
// Envoy provides a number of :ref:`attributes <arch_overview_attributes>`
// for expressive policies. Each attribute name provided in this field will be
// matched against that list and populated in the response_headers message.
Expand Down Expand Up @@ -255,10 +257,12 @@ message ExtProcOverrides {
// Set a different asynchronous processing option than the default.
bool async_mode = 2;

// [#not-implemented-hide:]
// Set different optional attributes than the default setting of the
// ``request_attributes`` field.
repeated string request_attributes = 3;

// [#not-implemented-hide:]
// Set different optional properties than the default setting of the
// ``response_attributes`` field.
repeated string response_attributes = 4;
Expand Down
7 changes: 0 additions & 7 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,6 @@ new_features:
- area: tracing
change: |
Provide initial span attributes to a sampler used in the OpenTelemetry tracer.
- area: ext_proc
change: |
implemented
:ref:`request_attributes <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.request_attributes>`
and
:ref:`response_attributes <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.response_attributes>`
config APIs to enable sending and receiving attributes from/to the external processing server.
- area: tracing
change: |
Added support to configure a Dynatrace resource detector for the OpenTelemetry tracer.
Expand Down
25 changes: 1 addition & 24 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@ envoy_cc_library(
"ext_proc.h",
"processor_state.h",
],
copts = select({
"//bazel:windows_x86_64": [],
"//conditions:default": [
"-DUSE_CEL_PARSER",
],
}),
deps = [
":client_interface",
":mutation_utils_lib",
Expand All @@ -35,41 +29,24 @@ envoy_cc_library(
"//source/common/buffer:buffer_lib",
"//source/common/protobuf",
"//source/common/runtime:runtime_features_lib",
"//source/extensions/filters/common/expr:evaluator_lib",
"//source/extensions/filters/common/mutation_rules:mutation_rules_lib",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings:str_format",
"@com_google_cel_cpp//eval/public:builtin_func_registrar",
"@com_google_cel_cpp//eval/public:cel_expr_builder_factory",
"@envoy_api//envoy/config/common/mutation_rules/v3:pkg_cc_proto",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto",
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
] + select(
{
"//bazel:windows_x86_64": [],
"//conditions:default": [
"@com_google_cel_cpp//parser",
],
},
),
],
)

envoy_cc_extension(
name = "config",
srcs = ["config.cc"],
hdrs = ["config.h"],
copts = select({
"//bazel:windows_x86_64": [],
"//conditions:default": [
"-DUSE_CEL_PARSER",
],
}),
deps = [
":client_lib",
":ext_proc",
"//source/extensions/filters/common/expr:evaluator_lib",
"//source/extensions/filters/http/common:factory_base_lib",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto",
],
Expand Down
13 changes: 6 additions & 7 deletions source/extensions/filters/http/ext_proc/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromPro
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs);
const uint32_t max_message_timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs);
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms,
context.scope(), stats_prefix, Envoy::Extensions::Filters::Common::Expr::getBuilder(context));
const auto filter_config =
std::make_shared<FilterConfig>(proto_config, std::chrono::milliseconds(message_timeout_ms),
max_message_timeout_ms, context.scope(), stats_prefix);

return [filter_config, grpc_service = proto_config.grpc_service(),
&context](Http::FilterChainFactoryCallbacks& callbacks) {
Expand Down Expand Up @@ -45,10 +45,9 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoWithServerContextTyp
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs);
const uint32_t max_message_timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs);
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms,
server_context.scope(), stats_prefix,
Envoy::Extensions::Filters::Common::Expr::getBuilder(server_context));
const auto filter_config =
std::make_shared<FilterConfig>(proto_config, std::chrono::milliseconds(message_timeout_ms),
max_message_timeout_ms, server_context.scope(), stats_prefix);

return [filter_config, grpc_service = proto_config.grpc_service(),
&server_context](Http::FilterChainFactoryCallbacks& callbacks) {
Expand Down
3 changes: 1 addition & 2 deletions source/extensions/filters/http/ext_proc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h"
#include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.validate.h"

#include "source/extensions/filters/common/expr/evaluator.h"
#include "source/extensions/filters/http/common/factory_base.h"

namespace Envoy {
Expand All @@ -30,7 +29,7 @@ class ExternalProcessingFilterConfig

Router::RouteSpecificFilterConfigConstSharedPtr createRouteSpecificFilterConfigTyped(
const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& proto_config,
Server::Configuration::ServerFactoryContext&,
Server::Configuration::ServerFactoryContext& context,
ProtobufMessage::ValidationVisitor& validator) override;

Http::FilterFactoryCb createFilterFactoryFromProtoWithServerContextTyped(
Expand Down
91 changes: 3 additions & 88 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@

#include "absl/strings/str_format.h"

#if defined(USE_CEL_PARSER)
#include "parser/parser.h"
#endif

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
Expand Down Expand Up @@ -117,27 +113,6 @@ ExtProcLoggingInfo::grpcCalls(envoy::config::core::v3::TrafficDirection traffic_
: encoding_processor_grpc_calls_;
}

absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
FilterConfig::initExpressions(const Protobuf::RepeatedPtrField<std::string>& matchers) const {
absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr> expressions;
#if defined(USE_CEL_PARSER)
for (const auto& matcher : matchers) {
auto parse_status = google::api::expr::parser::Parse(matcher);
if (!parse_status.ok()) {
throw EnvoyException("Unable to parse descriptor expression: " +
parse_status.status().ToString());
}
expressions.emplace(matcher, Extensions::Filters::Common::Expr::createExpression(
builder_->builder(), parse_status.value().expr()));
}
#else
ENVOY_LOG(warn, "CEL expression parsing is not available for use in this environment."
" Attempted to parse " +
std::to_string(matchers.size()) + " expressions");
#endif
return expressions;
}

FilterConfigPerRoute::FilterConfigPerRoute(const ExtProcPerRoute& config)
: disabled_(config.disabled()) {
if (config.has_overrides()) {
Expand Down Expand Up @@ -226,8 +201,7 @@ void Filter::onDestroy() {
}

FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
absl::optional<ProtobufWkt::Struct> proto) {
Http::RequestOrResponseHeaderMap& headers, bool end_stream) {
switch (openStream()) {
case StreamOpenState::Error:
return FilterHeadersStatus::StopIteration;
Expand All @@ -245,9 +219,6 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(),
*headers_req->mutable_headers());
headers_req->set_end_of_stream(end_stream);
if (proto.has_value()) {
(*headers_req->mutable_attributes())[FilterName] = proto.value();
}
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
ProcessorState::CallbackState::HeadersCallback);
ENVOY_LOG(debug, "Sending headers message");
Expand All @@ -257,48 +228,6 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
return FilterHeadersStatus::StopIteration;
}

const absl::optional<ProtobufWkt::Struct> Filter::evaluateAttributes(
Filters::Common::Expr::ActivationPtr activation,
const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
expr) {
absl::optional<ProtobufWkt::Struct> proto;
if (expr.size() > 0) {
proto.emplace(ProtobufWkt::Struct{});
for (const auto& hash_entry : expr) {
ProtobufWkt::Arena arena;
const auto result = hash_entry.second.get()->Evaluate(*activation, &arena);
if (!result.ok()) {
// TODO: Stats?
continue;
}

if (result.value().IsError()) {
ENVOY_LOG(trace, "error parsing cel expression {}", hash_entry.first);
continue;
}

ProtobufWkt::Value value;
switch (result.value().type()) {
case google::api::expr::runtime::CelValue::Type::kBool:
value.set_bool_value(result.value().BoolOrDie());
break;
case google::api::expr::runtime::CelValue::Type::kNullType:
value.set_null_value(ProtobufWkt::NullValue{});
break;
case google::api::expr::runtime::CelValue::Type::kDouble:
value.set_number_value(result.value().DoubleOrDie());
break;
default:
value.set_string_value(Filters::Common::Expr::print(result.value()));
}

(*(proto.value()).mutable_fields())[hash_entry.first] = value;
}
}

return proto;
}

FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_stream) {
ENVOY_LOG(trace, "decodeHeaders: end_stream = {}", end_stream);
mergePerRouteConfig();
Expand All @@ -308,14 +237,7 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (decoding_state_.sendHeaders()) {
absl::optional<Envoy::ProtobufWkt::Struct> proto;
if (!config_->requestExpr().empty()) {
auto activation_ptr = Filters::Common::Expr::createActivation(decoding_state_.streamInfo(),
&headers, nullptr, nullptr);
proto = evaluateAttributes(std::move(activation_ptr), config_->requestExpr());
}

status = onHeaders(decoding_state_, headers, end_stream, proto);
status = onHeaders(decoding_state_, headers, end_stream);
ENVOY_LOG(trace, "onHeaders returning {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "decodeHeaders: Skipped header processing");
Expand Down Expand Up @@ -593,14 +515,7 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (!processing_complete_ && encoding_state_.sendHeaders()) {
absl::optional<Envoy::ProtobufWkt::Struct> proto;
if (!config_->responseExpr().empty()) {
auto activation_ptr = Filters::Common::Expr::createActivation(encoding_state_.streamInfo(),
nullptr, &headers, nullptr);
proto = evaluateAttributes(std::move(activation_ptr), config_->responseExpr());
}

status = onHeaders(encoding_state_, headers, end_stream, proto);
status = onHeaders(encoding_state_, headers, end_stream);
ENVOY_LOG(trace, "onHeaders returns {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "encodeHeaders: Skipped header processing");
Expand Down
38 changes: 4 additions & 34 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "source/common/common/logger.h"
#include "source/common/common/matchers.h"
#include "source/common/protobuf/protobuf.h"
#include "source/extensions/filters/common/expr/evaluator.h"
#include "source/extensions/filters/common/mutation_rules/mutation_rules.h"
#include "source/extensions/filters/http/common/pass_through_filter.h"
#include "source/extensions/filters/http/ext_proc/client.h"
Expand Down Expand Up @@ -122,13 +121,12 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {
Upstream::HostDescriptionConstSharedPtr upstream_host_;
};

class FilterConfig : public Logger::Loggable<Logger::Id::ext_proc> {
class FilterConfig {
public:
FilterConfig(const envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor& config,
const std::chrono::milliseconds message_timeout,
const uint32_t max_message_timeout_ms, Stats::Scope& scope,
const std::string& stats_prefix,
Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder)
const std::string& stats_prefix)
: failure_mode_allow_(config.failure_mode_allow()),
disable_clear_route_cache_(config.disable_clear_route_cache()),
message_timeout_(message_timeout), max_message_timeout_ms_(max_message_timeout_ms),
Expand All @@ -138,9 +136,7 @@ class FilterConfig : public Logger::Loggable<Logger::Id::ext_proc> {
allow_mode_override_(config.allow_mode_override()),
disable_immediate_response_(config.disable_immediate_response()),
allowed_headers_(initHeaderMatchers(config.forward_rules().allowed_headers())),
disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())),
builder_(builder), request_expr_(initExpressions(config.request_attributes())),
response_expr_(initExpressions(config.response_attributes())) {}
disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())) {}

bool failureModeAllow() const { return failure_mode_allow_; }

Expand Down Expand Up @@ -170,16 +166,6 @@ class FilterConfig : public Logger::Loggable<Logger::Id::ext_proc> {

const Envoy::ProtobufWkt::Struct& filterMetadata() const { return filter_metadata_; }

const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
requestExpr() const {
return request_expr_;
}

const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
responseExpr() const {
return response_expr_;
}

private:
ExtProcFilterStats generateStats(const std::string& prefix,
const std::string& filter_stats_prefix, Stats::Scope& scope) {
Expand All @@ -197,9 +183,6 @@ class FilterConfig : public Logger::Loggable<Logger::Id::ext_proc> {
return header_matchers;
}

absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
initExpressions(const Protobuf::RepeatedPtrField<std::string>& matchers) const;

const bool failure_mode_allow_;
const bool disable_clear_route_cache_;
const std::chrono::milliseconds message_timeout_;
Expand All @@ -218,13 +201,6 @@ class FilterConfig : public Logger::Loggable<Logger::Id::ext_proc> {
const std::vector<Matchers::StringMatcherPtr> allowed_headers_;
// Empty disallowed_header_ means disallow nothing, i.e, allow all.
const std::vector<Matchers::StringMatcherPtr> disallowed_headers_;

Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder_;

const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
request_expr_;
const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
response_expr_;
};

using FilterConfigSharedPtr = std::shared_ptr<FilterConfig>;
Expand Down Expand Up @@ -324,13 +300,7 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
void sendImmediateResponse(const envoy::service::ext_proc::v3::ImmediateResponse& response);

Http::FilterHeadersStatus onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
absl::optional<ProtobufWkt::Struct> proto);

const absl::optional<ProtobufWkt::Struct> evaluateAttributes(
Filters::Common::Expr::ActivationPtr activation,
const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
expr);
Http::RequestOrResponseHeaderMap& headers, bool end_stream);
// Return a pair of whether to terminate returning the current result.
std::pair<bool, Http::FilterDataStatus> sendStreamChunk(ProcessorState& state);
Http::FilterDataStatus onData(ProcessorState& state, Buffer::Instance& data, bool end_stream);
Expand Down
Loading
Loading