From 5c630d2ec50e7968a2ffba69eaa17fd651ee703c Mon Sep 17 00:00:00 2001 From: Jacob Bohanon Date: Tue, 23 Jan 2024 19:08:25 -0500 Subject: [PATCH 1/3] ext_proc: send attributes (#31090) Introduce the ability to send attributes in the External Processing Request --------- Signed-off-by: Jacob Bohanon --- .../filters/http/ext_proc/v3/ext_proc.proto | 3 - bazel/repositories.bzl | 1 + bazel/repository_locations.bzl | 2 + changelogs/current.yaml | 7 ++ source/extensions/filters/http/ext_proc/BUILD | 31 ++++++ .../filters/http/ext_proc/config.cc | 17 +-- .../filters/http/ext_proc/ext_proc.cc | 30 +++++- .../filters/http/ext_proc/ext_proc.h | 32 +++--- .../filters/http/ext_proc/matching_utils.cc | 102 ++++++++++++++++++ .../filters/http/ext_proc/matching_utils.h | 65 +++++++++++ .../filters/http/ext_proc/processor_state.h | 6 ++ test/extensions/filters/http/ext_proc/BUILD | 35 ++++++ .../filters/http/ext_proc/config_test.cc | 16 ++- .../ext_proc/ext_proc_integration_test.cc | 68 +++++++++++- .../filters/http/ext_proc/filter_test.cc | 9 +- .../filters/http/ext_proc/ordering_test.cc | 9 +- .../http/ext_proc/unit_test_fuzz/BUILD | 3 + .../unit_test_fuzz/ext_proc_unit_test_fuzz.cc | 21 +++- 18 files changed, 417 insertions(+), 40 deletions(-) create mode 100644 source/extensions/filters/http/ext_proc/matching_utils.cc create mode 100644 source/extensions/filters/http/ext_proc/matching_utils.h diff --git a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto index c9500486a3a5..9874208aabdf 100644 --- a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto +++ b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto @@ -28,7 +28,6 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // **Current Implementation Status:** // All options and processing modes are implemented except for the following: // -// * Request and response attributes are not sent and not processed. // * Dynamic metadata in responses from the external processor is ignored. // * "async mode" is not implemented. @@ -125,7 +124,6 @@ message ExternalProcessor { // for a reply. bool async_mode = 4; - // [#not-implemented-hide:] // Envoy provides a number of :ref:`attributes ` // for expressive policies. Each attribute name provided in this field will be // matched against that list and populated in the request_headers message. @@ -133,7 +131,6 @@ message ExternalProcessor { // for the list of supported attributes and their types. repeated string request_attributes = 5; - // [#not-implemented-hide:] // Envoy provides a number of :ref:`attributes ` // for expressive policies. Each attribute name provided in this field will be // matched against that list and populated in the response_headers message. diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index aa93c9c838d8..65d6de41043d 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -18,6 +18,7 @@ WINDOWS_SKIP_TARGETS = [ "envoy.access_loggers.extension_filters.cel", "envoy.rate_limit_descriptors.expr", "envoy.filters.http.rate_limit_quota", + "envoy.filters.http.ext_proc", "envoy.formatter.cel", "envoy.matching.inputs.cel_data_input", "envoy.matching.matchers.cel_matcher", diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl index 65e74866380a..29b226ae7bc9 100644 --- a/bazel/repository_locations.bzl +++ b/bazel/repository_locations.bzl @@ -1214,6 +1214,7 @@ REPOSITORY_LOCATIONS_SPEC = dict( "envoy.access_loggers.wasm", "envoy.bootstrap.wasm", "envoy.rate_limit_descriptors.expr", + "envoy.filters.http.ext_proc", "envoy.filters.http.rate_limit_quota", "envoy.filters.http.rbac", "envoy.filters.http.wasm", @@ -1243,6 +1244,7 @@ REPOSITORY_LOCATIONS_SPEC = dict( "envoy.formatter.cel", "envoy.bootstrap.wasm", "envoy.rate_limit_descriptors.expr", + "envoy.filters.http.ext_proc", "envoy.filters.http.rate_limit_quota", "envoy.filters.http.rbac", "envoy.filters.http.wasm", diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 61fc804b4e27..3c5d29ced017 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -21,5 +21,12 @@ new_features: Added an off-by-default runtime flag ``envoy.reloadable_features.google_grpc_disable_tls_13`` to disable TLSv1.3 usage by gRPC SDK for ``google_grpc`` services. +- area: ext_proc + change: | + implemented + :ref:`request_attributes ` + and + :ref:`response_attributes ` + config APIs to enable sending and receiving attributes to/from the external processing server. deprecated: diff --git a/source/extensions/filters/http/ext_proc/BUILD b/source/extensions/filters/http/ext_proc/BUILD index 484395f170d3..976de7b294b8 100644 --- a/source/extensions/filters/http/ext_proc/BUILD +++ b/source/extensions/filters/http/ext_proc/BUILD @@ -19,8 +19,10 @@ envoy_cc_library( "ext_proc.h", "processor_state.h", ], + tags = ["skip_on_windows"], deps = [ ":client_interface", + ":matching_utils_lib", ":mutation_utils_lib", "//envoy/event:timer_interface", "//envoy/http:filter_interface", @@ -44,6 +46,7 @@ envoy_cc_extension( name = "config", srcs = ["config.cc"], hdrs = ["config.h"], + tags = ["skip_on_windows"], deps = [ ":client_lib", ":ext_proc", @@ -55,6 +58,7 @@ envoy_cc_extension( envoy_cc_library( name = "client_interface", hdrs = ["client.h"], + tags = ["skip_on_windows"], deps = [ "//envoy/grpc:async_client_manager_interface", "//envoy/grpc:status", @@ -80,10 +84,37 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "matching_utils_lib", + srcs = ["matching_utils.cc"], + hdrs = ["matching_utils.h"], + copts = select({ + "//bazel:windows_x86_64": [], + "//conditions:default": [ + "-DUSE_CEL_PARSER", + ], + }), + tags = ["skip_on_windows"], + deps = [ + "//envoy/http:header_map_interface", + "//source/common/protobuf", + "//source/extensions/filters/common/expr:evaluator_lib", + "@com_google_cel_cpp//eval/public:cel_expr_builder_factory", + ] + select( + { + "//bazel:windows_x86_64": [], + "//conditions:default": [ + "@com_google_cel_cpp//parser", + ], + }, + ), +) + envoy_cc_library( name = "client_lib", srcs = ["client_impl.cc"], hdrs = ["client_impl.h"], + tags = ["skip_on_windows"], deps = [ ":client_interface", "//envoy/grpc:async_client_interface", diff --git a/source/extensions/filters/http/ext_proc/config.cc b/source/extensions/filters/http/ext_proc/config.cc index d11953036843..ee45fc6a73c4 100644 --- a/source/extensions/filters/http/ext_proc/config.cc +++ b/source/extensions/filters/http/ext_proc/config.cc @@ -1,5 +1,6 @@ #include "source/extensions/filters/http/ext_proc/config.h" +#include "source/extensions/filters/common/expr/evaluator.h" #include "source/extensions/filters/http/ext_proc/client_impl.h" #include "source/extensions/filters/http/ext_proc/ext_proc.h" @@ -15,9 +16,11 @@ Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromPro PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs); const uint32_t max_message_timeout_ms = PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs); - const auto filter_config = - std::make_shared(proto_config, std::chrono::milliseconds(message_timeout_ms), - max_message_timeout_ms, context.scope(), stats_prefix); + const auto filter_config = std::make_shared( + proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms, + context.scope(), stats_prefix, + Envoy::Extensions::Filters::Common::Expr::getBuilder(context.serverFactoryContext()), + context.serverFactoryContext().localInfo()); return [filter_config, grpc_service = proto_config.grpc_service(), &context](Http::FilterChainFactoryCallbacks& callbacks) { @@ -44,9 +47,11 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoWithServerContextTyp PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs); const uint32_t max_message_timeout_ms = PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs); - const auto filter_config = - std::make_shared(proto_config, std::chrono::milliseconds(message_timeout_ms), - max_message_timeout_ms, server_context.scope(), stats_prefix); + const auto filter_config = std::make_shared( + proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms, + server_context.scope(), stats_prefix, + Envoy::Extensions::Filters::Common::Expr::getBuilder(server_context), + server_context.localInfo()); return [filter_config, grpc_service = proto_config.grpc_service(), &server_context](Http::FilterChainFactoryCallbacks& callbacks) { diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 82b6848f9d3f..2a3ac43d314c 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -221,7 +221,8 @@ void Filter::onDestroy() { } FilterHeadersStatus Filter::onHeaders(ProcessorState& state, - Http::RequestOrResponseHeaderMap& headers, bool end_stream) { + Http::RequestOrResponseHeaderMap& headers, bool end_stream, + ProtobufWkt::Struct* proto) { switch (openStream()) { case StreamOpenState::Error: return FilterHeadersStatus::StopIteration; @@ -239,6 +240,9 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(), *headers_req->mutable_headers()); headers_req->set_end_of_stream(end_stream); + if (proto != nullptr) { + (*headers_req->mutable_attributes())[FilterName] = *proto; + } state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::HeadersCallback); ENVOY_LOG(debug, "Sending headers message"); @@ -257,7 +261,17 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st FilterHeadersStatus status = FilterHeadersStatus::Continue; if (decoding_state_.sendHeaders()) { - status = onHeaders(decoding_state_, headers, end_stream); + ProtobufWkt::Struct proto; + + if (config_->expressionManager().hasRequestExpr()) { + auto activation_ptr = Filters::Common::Expr::createActivation( + &config_->expressionManager().localInfo(), decoding_state_.streamInfo(), &headers, + nullptr, nullptr); + proto = config_->expressionManager().evaluateRequestAttributes(*activation_ptr); + } + + status = onHeaders(decoding_state_, headers, end_stream, + config_->expressionManager().hasRequestExpr() ? &proto : nullptr); ENVOY_LOG(trace, "onHeaders returning {}", static_cast(status)); } else { ENVOY_LOG(trace, "decodeHeaders: Skipped header processing"); @@ -535,7 +549,17 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s FilterHeadersStatus status = FilterHeadersStatus::Continue; if (!processing_complete_ && encoding_state_.sendHeaders()) { - status = onHeaders(encoding_state_, headers, end_stream); + ProtobufWkt::Struct proto; + + if (config_->expressionManager().hasResponseExpr()) { + auto activation_ptr = Filters::Common::Expr::createActivation( + &config_->expressionManager().localInfo(), encoding_state_.streamInfo(), nullptr, + &headers, nullptr); + proto = config_->expressionManager().evaluateResponseAttributes(*activation_ptr); + } + + status = onHeaders(encoding_state_, headers, end_stream, + config_->expressionManager().hasResponseExpr() ? &proto : nullptr); ENVOY_LOG(trace, "onHeaders returns {}", static_cast(status)); } else { ENVOY_LOG(trace, "encodeHeaders: Skipped header processing"); diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 20a2e676719f..49954e6ac1e7 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -24,6 +24,7 @@ #include "source/extensions/filters/common/mutation_rules/mutation_rules.h" #include "source/extensions/filters/http/common/pass_through_filter.h" #include "source/extensions/filters/http/ext_proc/client.h" +#include "source/extensions/filters/http/ext_proc/matching_utils.h" #include "source/extensions/filters/http/ext_proc/processor_state.h" namespace Envoy { @@ -126,7 +127,9 @@ class FilterConfig { FilterConfig(const envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor& config, const std::chrono::milliseconds message_timeout, const uint32_t max_message_timeout_ms, Stats::Scope& scope, - const std::string& stats_prefix) + const std::string& stats_prefix, + Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder, + const LocalInfo::LocalInfo& local_info) : failure_mode_allow_(config.failure_mode_allow()), disable_clear_route_cache_(config.disable_clear_route_cache()), message_timeout_(message_timeout), max_message_timeout_ms_(max_message_timeout_ms), @@ -136,7 +139,9 @@ class FilterConfig { allow_mode_override_(config.allow_mode_override()), disable_immediate_response_(config.disable_immediate_response()), allowed_headers_(initHeaderMatchers(config.forward_rules().allowed_headers())), - disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())) {} + disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())), + expression_manager_(builder, local_info, config.request_attributes(), + config.response_attributes()) {} bool failureModeAllow() const { return failure_mode_allow_; } @@ -164,7 +169,9 @@ class FilterConfig { return disallowed_headers_; } - const Envoy::ProtobufWkt::Struct& filterMetadata() const { return filter_metadata_; } + const ProtobufWkt::Struct& filterMetadata() const { return filter_metadata_; } + + const ExpressionManager& expressionManager() const { return expression_manager_; } private: ExtProcFilterStats generateStats(const std::string& prefix, @@ -172,17 +179,6 @@ class FilterConfig { const std::string final_prefix = absl::StrCat(prefix, "ext_proc.", filter_stats_prefix); return {ALL_EXT_PROC_FILTER_STATS(POOL_COUNTER_PREFIX(scope, final_prefix))}; } - const std::vector - initHeaderMatchers(const envoy::type::matcher::v3::ListStringMatcher& header_list) { - std::vector header_matchers; - for (const auto& matcher : header_list.patterns()) { - header_matchers.push_back( - std::make_unique>( - matcher)); - } - return header_matchers; - } - const bool failure_mode_allow_; const bool disable_clear_route_cache_; const std::chrono::milliseconds message_timeout_; @@ -191,7 +187,7 @@ class FilterConfig { ExtProcFilterStats stats_; const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode processing_mode_; const Filters::Common::MutationRules::Checker mutation_checker_; - const Envoy::ProtobufWkt::Struct filter_metadata_; + const ProtobufWkt::Struct filter_metadata_; // If set to true, allow the processing mode to be modified by the ext_proc response. const bool allow_mode_override_; // If set to true, disable the immediate response from the ext_proc server, which means @@ -201,6 +197,8 @@ class FilterConfig { const std::vector allowed_headers_; // Empty disallowed_header_ means disallow nothing, i.e, allow all. const std::vector disallowed_headers_; + + const ExpressionManager expression_manager_; }; using FilterConfigSharedPtr = std::shared_ptr; @@ -315,7 +313,9 @@ class Filter : public Logger::Loggable, void sendImmediateResponse(const envoy::service::ext_proc::v3::ImmediateResponse& response); Http::FilterHeadersStatus onHeaders(ProcessorState& state, - Http::RequestOrResponseHeaderMap& headers, bool end_stream); + Http::RequestOrResponseHeaderMap& headers, bool end_stream, + ProtobufWkt::Struct* proto); + // Return a pair of whether to terminate returning the current result. std::pair sendStreamChunk(ProcessorState& state); Http::FilterDataStatus onData(ProcessorState& state, Buffer::Instance& data, bool end_stream); diff --git a/source/extensions/filters/http/ext_proc/matching_utils.cc b/source/extensions/filters/http/ext_proc/matching_utils.cc new file mode 100644 index 000000000000..e778be5ab945 --- /dev/null +++ b/source/extensions/filters/http/ext_proc/matching_utils.cc @@ -0,0 +1,102 @@ +#include "source/extensions/filters/http/ext_proc/matching_utils.h" + +#include + +#if defined(USE_CEL_PARSER) +#include "parser/parser.h" +#endif + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { + +absl::flat_hash_map +ExpressionManager::initExpressions(const Protobuf::RepeatedPtrField& matchers) { + absl::flat_hash_map expressions; +#if defined(USE_CEL_PARSER) + for (const auto& matcher : matchers) { + if (expressions.contains(matcher)) { + continue; + } + auto parse_status = google::api::expr::parser::Parse(matcher); + if (!parse_status.ok()) { + throw EnvoyException("Unable to parse descriptor expression: " + + parse_status.status().ToString()); + } + + Filters::Common::Expr::ExpressionPtr expression = + Extensions::Filters::Common::Expr::createExpression(builder_->builder(), + parse_status.value().expr()); + + expressions.emplace( + matcher, ExpressionManager::CelExpression{parse_status.value(), std::move(expression)}); + } +#else + ENVOY_LOG(warn, "CEL expression parsing is not available for use in this environment." + " Attempted to parse " + + std::to_string(matchers.size()) + " expressions"); +#endif + return expressions; +} + +ProtobufWkt::Struct +ExpressionManager::evaluateAttributes(const Filters::Common::Expr::Activation& activation, + const absl::flat_hash_map& expr) { + + ProtobufWkt::Struct proto; + + if (expr.empty()) { + return proto; + } + + for (const auto& hash_entry : expr) { + ProtobufWkt::Arena arena; + const auto result = hash_entry.second.compiled_expr_->Evaluate(activation, &arena); + if (!result.ok()) { + // TODO: Stats? + continue; + } + + if (result.value().IsError()) { + ENVOY_LOG(trace, "error parsing cel expression {}", hash_entry.first); + continue; + } + + ProtobufWkt::Value value; + switch (result.value().type()) { + case google::api::expr::runtime::CelValue::Type::kBool: + value.set_bool_value(result.value().BoolOrDie()); + break; + case google::api::expr::runtime::CelValue::Type::kNullType: + value.set_null_value(ProtobufWkt::NullValue{}); + break; + case google::api::expr::runtime::CelValue::Type::kDouble: + value.set_number_value(result.value().DoubleOrDie()); + break; + default: + value.set_string_value(Filters::Common::Expr::print(result.value())); + } + + auto proto_mut_fields = proto.mutable_fields(); + (*proto_mut_fields)[hash_entry.first] = value; + } + + return proto; +} + +std::vector +initHeaderMatchers(const envoy::type::matcher::v3::ListStringMatcher& header_list) { + std::vector header_matchers; + for (const auto& matcher : header_list.patterns()) { + header_matchers.push_back( + std::make_unique>( + matcher)); + } + return header_matchers; +} + +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/ext_proc/matching_utils.h b/source/extensions/filters/http/ext_proc/matching_utils.h new file mode 100644 index 000000000000..f02c51ac2728 --- /dev/null +++ b/source/extensions/filters/http/ext_proc/matching_utils.h @@ -0,0 +1,65 @@ +#pragma once + +#include "source/common/common/logger.h" +#include "source/common/common/matchers.h" +#include "source/common/protobuf/protobuf.h" +#include "source/extensions/filters/common/expr/evaluator.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { + +class ExpressionManager : public Logger::Loggable { +public: + struct CelExpression { + google::api::expr::v1alpha1::ParsedExpr parsed_expr_; + Filters::Common::Expr::ExpressionPtr compiled_expr_; + }; + + ExpressionManager(Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder, + const LocalInfo::LocalInfo& local_info, + const Protobuf::RepeatedPtrField& request_matchers, + const Protobuf::RepeatedPtrField& response_matchers) + : builder_(builder), local_info_(local_info), + request_expr_(initExpressions(request_matchers)), + response_expr_(initExpressions(response_matchers)){}; + + bool hasRequestExpr() const { return !request_expr_.empty(); }; + + bool hasResponseExpr() const { return !response_expr_.empty(); }; + + ProtobufWkt::Struct + evaluateRequestAttributes(const Filters::Common::Expr::Activation& activation) const { + return evaluateAttributes(activation, request_expr_); + } + + ProtobufWkt::Struct + evaluateResponseAttributes(const Filters::Common::Expr::Activation& activation) const { + return evaluateAttributes(activation, response_expr_); + } + + static ProtobufWkt::Struct + evaluateAttributes(const Filters::Common::Expr::Activation& activation, + const absl::flat_hash_map& expr); + + const LocalInfo::LocalInfo& localInfo() const { return local_info_; }; + +private: + absl::flat_hash_map + initExpressions(const Protobuf::RepeatedPtrField& matchers); + + Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder_; + const LocalInfo::LocalInfo& local_info_; + + const absl::flat_hash_map request_expr_; + const absl::flat_hash_map response_expr_; +}; + +std::vector +initHeaderMatchers(const envoy::type::matcher::v3::ListStringMatcher& header_list); + +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index de4628864941..c6c50fdecbbb 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -172,6 +172,8 @@ class ProcessorState : public Logger::Loggable { virtual envoy::service::ext_proc::v3::HttpTrailers* mutableTrailers(envoy::service::ext_proc::v3::ProcessingRequest& request) const PURE; + virtual StreamInfo::StreamInfo& streamInfo() PURE; + protected: void setBodyMode( envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode body_mode); @@ -283,6 +285,8 @@ class DecodingProcessorState : public ProcessorState { void requestWatermark() override; void clearWatermark() override; + StreamInfo::StreamInfo& streamInfo() override { return decoder_callbacks_->streamInfo(); } + private: void setProcessingModeInternal( const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode); @@ -356,6 +360,8 @@ class EncodingProcessorState : public ProcessorState { void requestWatermark() override; void clearWatermark() override; + StreamInfo::StreamInfo& streamInfo() override { return encoder_callbacks_->streamInfo(); } + private: void setProcessingModeInternal( const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode); diff --git a/test/extensions/filters/http/ext_proc/BUILD b/test/extensions/filters/http/ext_proc/BUILD index a99fde2191ee..0f00ed80ee5b 100644 --- a/test/extensions/filters/http/ext_proc/BUILD +++ b/test/extensions/filters/http/ext_proc/BUILD @@ -19,6 +19,7 @@ envoy_extension_cc_test( size = "small", srcs = ["config_test.cc"], extension_names = ["envoy.filters.http.ext_proc"], + tags = ["skip_on_windows"], deps = [ "//source/extensions/filters/http/ext_proc:config", "//test/mocks/server:factory_context_mocks", @@ -30,7 +31,14 @@ envoy_extension_cc_test( name = "filter_test", size = "small", srcs = ["filter_test.cc"], + copts = select({ + "//bazel:windows_x86_64": [], + "//conditions:default": [ + "-DUSE_CEL_PARSER", + ], + }), extension_names = ["envoy.filters.http.ext_proc"], + tags = ["skip_on_windows"], deps = [ ":mock_server_lib", ":utils_lib", @@ -61,6 +69,7 @@ envoy_extension_cc_test( size = "small", srcs = ["state_test.cc"], extension_names = ["envoy.filters.http.ext_proc"], + tags = ["skip_on_windows"], deps = [ "//source/extensions/filters/http/ext_proc", ], @@ -71,11 +80,13 @@ envoy_extension_cc_test( size = "small", srcs = ["ordering_test.cc"], extension_names = ["envoy.filters.http.ext_proc"], + tags = ["skip_on_windows"], deps = [ ":mock_server_lib", "//source/extensions/filters/http/ext_proc", "//test/common/http:common_lib", "//test/mocks/event:event_mocks", + "//test/mocks/local_info:local_info_mocks", "//test/mocks/server:factory_context_mocks", "//test/test_common:test_runtime_lib", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", @@ -87,6 +98,7 @@ envoy_extension_cc_test( size = "small", srcs = ["client_test.cc"], extension_names = ["envoy.filters.http.ext_proc"], + tags = ["skip_on_windows"], deps = [ "//source/common/http:header_map_lib", "//source/extensions/filters/http/ext_proc:client_lib", @@ -102,6 +114,7 @@ envoy_extension_cc_test( size = "small", srcs = ["mutation_utils_test.cc"], extension_names = ["envoy.filters.http.ext_proc"], + tags = ["skip_on_windows"], deps = [ ":utils_lib", "//source/extensions/filters/common/mutation_rules:mutation_rules_lib", @@ -117,10 +130,17 @@ envoy_extension_cc_test( name = "ext_proc_integration_test", size = "large", # This test can take a while under tsan. srcs = ["ext_proc_integration_test.cc"], + copts = select({ + "//bazel:windows_x86_64": [], + "//conditions:default": [ + "-DUSE_CEL_PARSER", + ], + }), extension_names = ["envoy.filters.http.ext_proc"], shard_count = 4, tags = [ "cpu:3", + "skip_on_windows", ], deps = [ ":logging_test_filter_lib", @@ -139,9 +159,16 @@ envoy_extension_cc_test( name = "streaming_integration_test", size = "large", srcs = ["streaming_integration_test.cc"], + copts = select({ + "//bazel:windows_x86_64": [], + "//conditions:default": [ + "-DUSE_CEL_PARSER", + ], + }), extension_names = ["envoy.filters.http.ext_proc"], tags = [ "cpu:3", + "skip_on_windows", ], deps = [ ":test_processor_lib", @@ -163,6 +190,7 @@ envoy_extension_cc_test_library( srcs = ["test_processor.cc"], hdrs = ["test_processor.h"], extension_names = ["envoy.filters.http.ext_proc"], + tags = ["skip_on_windows"], deps = [ "//envoy/network:address_interface", "//test/test_common:network_utility_lib", @@ -178,6 +206,7 @@ envoy_extension_cc_test_library( srcs = ["mock_server.cc"], hdrs = ["mock_server.h"], extension_names = ["envoy.filters.http.ext_proc"], + tags = ["skip_on_windows"], deps = [ "//source/extensions/filters/http/ext_proc:client_interface", ], @@ -188,6 +217,7 @@ envoy_extension_cc_test_library( srcs = ["utils.cc"], hdrs = ["utils.h"], extension_names = ["envoy.filters.http.ext_proc"], + tags = ["skip_on_windows"], deps = [ "//envoy/http:header_map_interface", "//test/test_common:utility_lib", @@ -201,6 +231,7 @@ envoy_extension_cc_test_library( srcs = ["ext_proc_grpc_fuzz_helper.cc"], hdrs = ["ext_proc_grpc_fuzz_helper.h"], extension_names = ["envoy.filters.http.ext_proc"], + tags = ["skip_on_windows"], deps = [ "//source/common/common:thread_lib", "//source/common/grpc:common_lib", @@ -243,6 +274,7 @@ envoy_cc_fuzz_test( srcs = ["ext_proc_grpc_fuzz.cc"], hdrs = ["ext_proc_grpc_fuzz.h"], corpus = "ext_proc_grpc_corpus", + tags = ["skip_on_windows"], deps = EXT_PROC_GRPC_FUZZ_TEST_DEPS, ) @@ -251,6 +283,7 @@ envoy_cc_fuzz_test( srcs = ["ext_proc_grpc_fuzz_persistent.cc"], hdrs = ["ext_proc_grpc_fuzz.h"], corpus = "ext_proc_grpc_corpus", + tags = ["skip_on_windows"], deps = EXT_PROC_GRPC_FUZZ_TEST_DEPS, ) @@ -258,6 +291,7 @@ envoy_extension_cc_test( name = "ext_proc_benchmark_test", srcs = ["ext_proc_benchmark_test.cc"], extension_names = ["envoy.filters.http.ext_proc"], + tags = ["skip_on_windows"], deps = [ ":test_processor_lib", "//envoy/http:header_map_interface", @@ -285,6 +319,7 @@ envoy_extension_cc_test_library( "logging_test_filter.cc", ], extension_names = ["envoy.filters.http.ext_proc"], + tags = ["skip_on_windows"], deps = [ ":logging_test_filter_proto_cc_proto", "//envoy/http:filter_interface", diff --git a/test/extensions/filters/http/ext_proc/config_test.cc b/test/extensions/filters/http/ext_proc/config_test.cc index 6cfa1d9196a9..47e36558a260 100644 --- a/test/extensions/filters/http/ext_proc/config_test.cc +++ b/test/extensions/filters/http/ext_proc/config_test.cc @@ -21,8 +21,12 @@ TEST(HttpExtProcConfigTest, CorrectConfig) { target_uri: ext_proc_server stat_prefix: google failure_mode_allow: true - request_attributes: 'Foo, Bar, Baz' - response_attributes: More + request_attributes: + - 'Foo' + - 'Bar' + - 'Baz' + response_attributes: + - 'More' processing_mode: request_header_mode: send response_header_mode: skip @@ -54,8 +58,12 @@ TEST(HttpExtProcConfigTest, CorrectConfigServerContext) { target_uri: ext_proc_server stat_prefix: google failure_mode_allow: true - request_attributes: 'Foo, Bar, Baz' - response_attributes: More + request_attributes: + - 'Foo' + - 'Bar' + - 'Baz' + response_attributes: + - 'More' processing_mode: request_header_mode: send response_header_mode: skip diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index b75f2eb4e142..18cbbc77fc5d 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -79,7 +79,7 @@ class ExtProcIntegrationTest : public HttpIntegrationTest, scoped_runtime_.mergeValues( {{"envoy.reloadable_features.send_header_raw_value", header_raw_value_}}); scoped_runtime_.mergeValues( - {{"envoy_reloadable_features_immediate_response_use_filter_mutation_rule", + {{"envoy.reloadable_features.immediate_response_use_filter_mutation_rule", filter_mutation_rule_}}); config_helper_.addConfigModifier([this, config_option]( @@ -1836,6 +1836,28 @@ TEST_P(ExtProcIntegrationTest, GetAndImmediateRespondMutationAllowEnvoy) { EXPECT_THAT(response->headers(), SingleHeaderValueIs("x-envoy-foo", "bar")); } +// Test the filter using an ext_proc server that responds to the request_header message +// by sending back an immediate_response message with x-envoy header mutation. +// The deprecated default checker allows x-envoy headers to be mutated and should +// override config-level checkers if the runtime guard is disabled. +TEST_P(ExtProcIntegrationTest, GetAndRespondImmediatelyWithDefaultHeaderMutationChecker) { + // this is default, but setting explicitly for test clarity + filter_mutation_rule_ = "false"; + proto_config_.mutable_mutation_rules()->mutable_allow_envoy()->set_value(false); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + processAndRespondImmediately(*grpc_upstreams_[0], true, [](ImmediateResponse& immediate) { + immediate.mutable_status()->set_code(envoy::type::v3::StatusCode::Unauthorized); + auto* hdr = immediate.mutable_headers()->add_set_headers(); + // Adding x-envoy header is allowed since default overrides config. + hdr->mutable_header()->set_key("x-envoy-foo"); + hdr->mutable_header()->set_value("bar"); + }); + verifyDownstreamResponse(*response, 401); + EXPECT_FALSE(response->headers().get(LowerCaseString("x-envoy-foo")).empty()); +} + // Test the filter with request body buffering enabled using // an ext_proc server that responds to the request_body message // by modifying a header that should cause an error. @@ -3285,4 +3307,48 @@ TEST_P(ExtProcIntegrationTest, SendBodyBufferedPartialWithTrailer) { verifyDownstreamResponse(*response, 200); } +#if defined(USE_CEL_PARSER) +// Test the filter using the default configuration by connecting to +// an ext_proc server that responds to the request_headers message +// by requesting to modify the request headers. +TEST_P(ExtProcIntegrationTest, GetAndSetRequestResponseAttributes) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SEND); + proto_config_.mutable_request_attributes()->Add("request.path"); + proto_config_.mutable_request_attributes()->Add("request.method"); + proto_config_.mutable_request_attributes()->Add("request.scheme"); + proto_config_.mutable_request_attributes()->Add("connection.mtls"); + proto_config_.mutable_response_attributes()->Add("response.code"); + proto_config_.mutable_response_attributes()->Add("response.code_details"); + + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + processRequestHeadersMessage( + *grpc_upstreams_[0], true, [](const HttpHeaders& req, HeadersResponse&) { + EXPECT_EQ(req.attributes().size(), 1); + auto proto_struct = req.attributes().at("envoy.filters.http.ext_proc"); + EXPECT_EQ(proto_struct.fields().at("request.path").string_value(), "/"); + EXPECT_EQ(proto_struct.fields().at("request.method").string_value(), "GET"); + EXPECT_EQ(proto_struct.fields().at("request.scheme").string_value(), "http"); + EXPECT_EQ(proto_struct.fields().at("connection.mtls").bool_value(), false); + return true; + }); + + handleUpstreamRequest(); + + processResponseHeadersMessage( + *grpc_upstreams_[0], false, [](const HttpHeaders& req, HeadersResponse&) { + EXPECT_EQ(req.attributes().size(), 1); + auto proto_struct = req.attributes().at("envoy.filters.http.ext_proc"); + EXPECT_EQ(proto_struct.fields().at("response.code").string_value(), "200"); + EXPECT_EQ(proto_struct.fields().at("response.code_details").string_value(), + StreamInfo::ResponseCodeDetails::get().ViaUpstream); + return true; + }); + + verifyDownstreamResponse(*response, 200); +} +#endif + } // namespace Envoy diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 47865bdbf334..7854a763dbac 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -21,6 +21,7 @@ #include "test/mocks/event/mocks.h" #include "test/mocks/http/mocks.h" #include "test/mocks/http/stream_encoder.h" +#include "test/mocks/local_info/mocks.h" #include "test/mocks/network/mocks.h" #include "test/mocks/router/mocks.h" #include "test/mocks/runtime/mocks.h" @@ -121,8 +122,11 @@ class HttpFilterTest : public testing::Test { if (!yaml.empty()) { TestUtility::loadFromYaml(yaml, proto_config); } - config_ = - std::make_shared(proto_config, 200ms, 10000, *stats_store_.rootScope(), ""); + config_ = std::make_shared( + proto_config, 200ms, 10000, *stats_store_.rootScope(), "", + std::make_shared( + Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr)), + local_info_); filter_ = std::make_unique(config_, std::move(client_), proto_config.grpc_service()); filter_->setEncoderFilterCallbacks(encoder_callbacks_); EXPECT_CALL(encoder_callbacks_, encoderBufferLimit()).WillRepeatedly(Return(BufferSize)); @@ -561,6 +565,7 @@ class HttpFilterTest : public testing::Test { std::vector timers_; TestScopedRuntime scoped_runtime_; Envoy::Event::SimulatedTimeSystem* test_time_; + testing::NiceMock local_info_; }; // Using the default configuration, test the filter with a processor that diff --git a/test/extensions/filters/http/ext_proc/ordering_test.cc b/test/extensions/filters/http/ext_proc/ordering_test.cc index 744692ef22ff..06d1e89e4aba 100644 --- a/test/extensions/filters/http/ext_proc/ordering_test.cc +++ b/test/extensions/filters/http/ext_proc/ordering_test.cc @@ -7,6 +7,7 @@ #include "test/extensions/filters/http/ext_proc/mock_server.h" #include "test/mocks/event/mocks.h" #include "test/mocks/http/mocks.h" +#include "test/mocks/local_info/mocks.h" #include "test/mocks/network/mocks.h" #include "test/mocks/router/mocks.h" #include "test/mocks/stream_info/mocks.h" @@ -70,8 +71,11 @@ class OrderingTest : public testing::Test { if (cb) { (*cb)(proto_config); } - config_ = std::make_shared(proto_config, kMessageTimeout, kMaxMessageTimeoutMs, - *stats_store_.rootScope(), ""); + config_ = std::make_shared( + proto_config, kMessageTimeout, kMaxMessageTimeoutMs, *stats_store_.rootScope(), "", + std::make_shared( + Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr)), + local_info_); filter_ = std::make_unique(config_, std::move(client_), proto_config.grpc_service()); filter_->setEncoderFilterCallbacks(encoder_callbacks_); filter_->setDecoderFilterCallbacks(decoder_callbacks_); @@ -212,6 +216,7 @@ class OrderingTest : public testing::Test { Http::TestResponseHeaderMapImpl response_headers_; Http::TestRequestTrailerMapImpl request_trailers_; Http::TestResponseTrailerMapImpl response_trailers_; + NiceMock local_info_; }; // A base class for tests that will check that gRPC streams fail while being created diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD b/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD index 2a7265e6a63b..630a47ea12df 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD @@ -13,6 +13,7 @@ envoy_package() envoy_cc_mock( name = "ext_proc_mocks", hdrs = ["mocks.h"], + tags = ["skip_on_windows"], deps = [ "//source/extensions/filters/http/ext_proc:client_interface", "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", @@ -33,12 +34,14 @@ envoy_cc_fuzz_test( name = "ext_proc_unit_test_fuzz", srcs = ["ext_proc_unit_test_fuzz.cc"], corpus = "ext_proc_corpus", + tags = ["skip_on_windows"], deps = [ ":ext_proc_mocks", ":ext_proc_unit_test_fuzz_proto_cc_proto", "//source/extensions/filters/http/ext_proc:config", "//test/extensions/filters/http/common/fuzz:http_filter_fuzzer_lib", "//test/mocks/http:http_mocks", + "//test/mocks/local_info:local_info_mocks", "//test/mocks/network:network_mocks", ], ) diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc index 984b828e0e02..184c4be32365 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc @@ -5,6 +5,7 @@ #include "test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h" #include "test/fuzz/fuzz_runner.h" #include "test/mocks/http/mocks.h" +#include "test/mocks/local_info/mocks.h" #include "test/mocks/network/mocks.h" using testing::Return; @@ -45,7 +46,8 @@ class FuzzerMocks { NiceMock request_trailers_; NiceMock response_trailers_; NiceMock buffer_; - testing::NiceMock async_client_stream_info_; + NiceMock async_client_stream_info_; + NiceMock local_info_; }; DEFINE_PROTO_FUZZER( @@ -56,6 +58,17 @@ DEFINE_PROTO_FUZZER( ENVOY_LOG_MISC(debug, "EnvoyException during validation: {}", e.what()); return; } + + // ASAN fuzz testing is producing an error on CEL parsing that is believed to be a false positive. + // It is determined that the error is unrelated to the implementation of request and response + // attributes as demonstrated here + // https://github.com/envoyproxy/envoy/compare/main...jbohanon:envoy:bug/cel-parser-antlr-exception-use-after-free. + // Discussion on this is located at https://github.com/envoyproxy/envoy/pull/31017 + if (!input.config().request_attributes().empty() || + !input.config().response_attributes().empty()) { + return; + } + static FuzzerMocks mocks; NiceMock stats_store; @@ -71,8 +84,10 @@ DEFINE_PROTO_FUZZER( try { config = std::make_shared( - proto_config, std::chrono::milliseconds(200), 200, *stats_store.rootScope(), - "ext_proc_prefix"); + proto_config, std::chrono::milliseconds(200), 200, *stats_store.rootScope(), "", + std::make_shared( + Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr)), + mocks.local_info_); } catch (const EnvoyException& e) { ENVOY_LOG_MISC(debug, "EnvoyException during ext_proc filter config validation: {}", e.what()); return; From 318a91564488443387d18b1b58380e104e45ff7e Mon Sep 17 00:00:00 2001 From: Jacob Bohanon Date: Tue, 30 Jan 2024 13:18:57 -0500 Subject: [PATCH 2/3] ext_proc: send and receive dynamic metadata (#30747) Introduce the ability to send dynamic metadata in the External Processing Request. Also implements the API for returning dynamic metadata as part of the External Processing Response. --------- Signed-off-by: Jacob Bohanon --- .../filters/http/ext_proc/v3/ext_proc.proto | 41 +- .../ext_proc/v3/external_processor.proto | 11 +- changelogs/current.yaml | 8 + .../filters/http/ext_proc/ext_proc.cc | 194 +++++- .../filters/http/ext_proc/ext_proc.h | 74 ++- .../filters/http/ext_proc/processor_state.h | 60 +- test/extensions/filters/http/ext_proc/BUILD | 11 +- .../filters/http/ext_proc/config_test.cc | 18 + .../ext_proc/ext_proc_integration_test.cc | 121 ++++ .../filters/http/ext_proc/filter_test.cc | 552 +++++++++++++++++- test/integration/filters/BUILD | 1 + .../filters/stream_info_to_headers_filter.cc | 44 ++ 12 files changed, 1106 insertions(+), 29 deletions(-) diff --git a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto index 9874208aabdf..ebcc53d774d4 100644 --- a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto +++ b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto @@ -28,7 +28,6 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // **Current Implementation Status:** // All options and processing modes are implemented except for the following: // -// * Dynamic metadata in responses from the external processor is ignored. // * "async mode" is not implemented. // The filter communicates with an external gRPC service called an "external processor" @@ -97,7 +96,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // ` object in a namespace matching the filter // name. // -// [#next-free-field: 16] +// [#next-free-field: 17] message ExternalProcessor { // Configuration for the gRPC service that the filter will communicate with. // The filter supports both the "Envoy" and "Google" gRPC clients. @@ -203,6 +202,35 @@ message ExternalProcessor { // Instead, the stream to the external processor will be closed. There will be no // more external processing for this stream from now on. bool disable_immediate_response = 15; + + // Options related to the sending and receiving of dynamic metadata. + MetadataOptions metadata_options = 16; +} + +// The MetadataOptions structure defines options for the sending and receiving of +// dynamic metadata. Specifically, which namespaces to send to the server, whether +// metadata returned by the server may be written, and how that metadata may be written. +message MetadataOptions { + message MetadataNamespaces { + // Specifies a list of metadata namespaces whose values, if present, + // will be passed to the ext_proc service as an opaque *protobuf::Struct*. + repeated string untyped = 1; + + // Specifies a list of metadata namespaces whose values, if present, + // will be passed to the ext_proc service as a *protobuf::Any*. This allows + // envoy and the external processing server to share the protobuf message + // definition for safe parsing. + repeated string typed = 2; + } + + // Describes which typed or untyped dynamic metadata namespaces to forward to + // the external processing server. + MetadataNamespaces forwarding_namespaces = 1; + + // Describes which typed or untyped dynamic metadata namespaces to accept from + // the external processing server. Set to empty or leave unset to disallow writing + // any received dynamic metadata. Receiving of typed metadata is not supported. + MetadataNamespaces receiving_namespaces = 2; } // The HeaderForwardingRules structure specifies what headers are @@ -245,7 +273,7 @@ message ExtProcPerRoute { } // Overrides that may be set on a per-route basis -// [#next-free-field: 6] +// [#next-free-field: 7] message ExtProcOverrides { // Set a different processing mode for this route than the default. ProcessingMode processing_mode = 1; @@ -266,4 +294,11 @@ message ExtProcOverrides { // Set a different gRPC service for this route than the default. config.core.v3.GrpcService grpc_service = 5; + + // Options related to the sending and receiving of dynamic metadata. + // Lists of forwarding and receiving namespaces will be overridden in their entirety, + // meaning the most-specific config that specifies this override will be the final + // config used. It is the prerogative of the control plane to ensure this + // most-specific config contains the correct final overrides. + MetadataOptions metadata_options = 6; } diff --git a/api/envoy/service/ext_proc/v3/external_processor.proto b/api/envoy/service/ext_proc/v3/external_processor.proto index 50fba503f846..0813bdf6d724 100644 --- a/api/envoy/service/ext_proc/v3/external_processor.proto +++ b/api/envoy/service/ext_proc/v3/external_processor.proto @@ -56,7 +56,7 @@ service ExternalProcessor { // This represents the different types of messages that Envoy can send // to an external processing server. -// [#next-free-field: 8] +// [#next-free-field: 9] message ProcessingRequest { // Specify whether the filter that sent this request is running in synchronous // or asynchronous mode. The choice of synchronous or asynchronous mode @@ -115,6 +115,9 @@ message ProcessingRequest { // in the filter configuration. HttpTrailers response_trailers = 7; } + + // Dynamic metadata associated with the request. + config.core.v3.Metadata metadata_context = 8; } // For every ProcessingRequest received by the server with the ``async_mode`` field @@ -158,9 +161,9 @@ message ProcessingResponse { ImmediateResponse immediate_response = 7; } - // [#not-implemented-hide:] - // Optional metadata that will be emitted as dynamic metadata to be consumed by the next - // filter. This metadata will be placed in the namespace ``envoy.filters.http.ext_proc``. + // Optional metadata that will be emitted as dynamic metadata to be consumed by + // following filters. This metadata will be placed in the namespace(s) specified by the top-level + // field name(s) of the struct. google.protobuf.Struct dynamic_metadata = 8; // Override how parts of the HTTP request and response are processed diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 3c5d29ced017..4efd550cd6be 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -28,5 +28,13 @@ new_features: and :ref:`response_attributes ` config APIs to enable sending and receiving attributes to/from the external processing server. +- area: ext_proc + change: | + added + :ref:`metadata_options ` + config API to enable sending and receiving metadata from/to the external processing server. Both typed and untyped dynamic + metadata may be sent to the server. If + :ref:`receiving_namespaces ` + is defined, returned metadata may be written to the specified allowed namespaces. deprecated: diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 2a3ac43d314c..4b41dbdc57a2 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -115,6 +115,49 @@ ExtProcLoggingInfo::grpcCalls(envoy::config::core::v3::TrafficDirection traffic_ : encoding_processor_grpc_calls_; } +std::vector +FilterConfigPerRoute::initNamespaces(const Protobuf::RepeatedPtrField& ns) { + if (ns.empty()) { + return {}; + } + + std::vector namespaces; + for (const auto& single_ns : ns) { + namespaces.emplace_back(single_ns); + } + return namespaces; +} + +absl::optional> +FilterConfigPerRoute::initUntypedForwardingNamespaces(const ExtProcPerRoute& config) { + if (!config.has_overrides() || !config.overrides().has_metadata_options() || + !config.overrides().metadata_options().has_forwarding_namespaces()) { + return absl::nullopt; + } + + return {initNamespaces(config.overrides().metadata_options().forwarding_namespaces().untyped())}; +} + +absl::optional> +FilterConfigPerRoute::initTypedForwardingNamespaces(const ExtProcPerRoute& config) { + if (!config.has_overrides() || !config.overrides().has_metadata_options() || + !config.overrides().metadata_options().has_forwarding_namespaces()) { + return absl::nullopt; + } + + return {initNamespaces(config.overrides().metadata_options().forwarding_namespaces().typed())}; +} + +absl::optional> +FilterConfigPerRoute::initUntypedReceivingNamespaces(const ExtProcPerRoute& config) { + if (!config.has_overrides() || !config.overrides().has_metadata_options() || + !config.overrides().metadata_options().has_receiving_namespaces()) { + return absl::nullopt; + } + + return {initNamespaces(config.overrides().metadata_options().receiving_namespaces().untyped())}; +} + absl::optional FilterConfigPerRoute::initProcessingMode(const ExtProcPerRoute& config) { if (!config.disabled() && config.has_overrides() && config.overrides().has_processing_mode()) { @@ -142,14 +185,26 @@ FilterConfigPerRoute::mergeProcessingMode(const FilterConfigPerRoute& less_speci FilterConfigPerRoute::FilterConfigPerRoute(const ExtProcPerRoute& config) : disabled_(config.disabled()), processing_mode_(initProcessingMode(config)), - grpc_service_(initGrpcService(config)) {} + grpc_service_(initGrpcService(config)), + untyped_forwarding_namespaces_(initUntypedForwardingNamespaces(config)), + typed_forwarding_namespaces_(initTypedForwardingNamespaces(config)), + untyped_receiving_namespaces_(initUntypedReceivingNamespaces(config)) {} FilterConfigPerRoute::FilterConfigPerRoute(const FilterConfigPerRoute& less_specific, const FilterConfigPerRoute& more_specific) : disabled_(more_specific.disabled()), processing_mode_(mergeProcessingMode(less_specific, more_specific)), grpc_service_(more_specific.grpcService().has_value() ? more_specific.grpcService() - : less_specific.grpcService()) {} + : less_specific.grpcService()), + untyped_forwarding_namespaces_(more_specific.untypedForwardingMetadataNamespaces().has_value() + ? more_specific.untypedForwardingMetadataNamespaces() + : less_specific.untypedForwardingMetadataNamespaces()), + typed_forwarding_namespaces_(more_specific.typedForwardingMetadataNamespaces().has_value() + ? more_specific.typedForwardingMetadataNamespaces() + : less_specific.typedForwardingMetadataNamespaces()), + untyped_receiving_namespaces_(more_specific.untypedReceivingMetadataNamespaces().has_value() + ? more_specific.untypedReceivingMetadataNamespaces() + : less_specific.untypedReceivingMetadataNamespaces()) {} void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) { Http::PassThroughFilter::setDecoderFilterCallbacks(callbacks); @@ -236,6 +291,7 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, state.setHeaders(&headers); state.setHasNoBody(end_stream); ProcessingRequest req; + addDynamicMetadata(state, req); auto* headers_req = state.mutableHeaders(req); MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(), *headers_req->mutable_headers()); @@ -265,8 +321,8 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st if (config_->expressionManager().hasRequestExpr()) { auto activation_ptr = Filters::Common::Expr::createActivation( - &config_->expressionManager().localInfo(), decoding_state_.streamInfo(), &headers, - nullptr, nullptr); + &config_->expressionManager().localInfo(), decoding_state_.callbacks()->streamInfo(), + &headers, nullptr, nullptr); proto = config_->expressionManager().evaluateRequestAttributes(*activation_ptr); } @@ -553,8 +609,8 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s if (config_->expressionManager().hasResponseExpr()) { auto activation_ptr = Filters::Common::Expr::createActivation( - &config_->expressionManager().localInfo(), encoding_state_.streamInfo(), nullptr, - &headers, nullptr); + &config_->expressionManager().localInfo(), encoding_state_.callbacks()->streamInfo(), + nullptr, &headers, nullptr); proto = config_->expressionManager().evaluateResponseAttributes(*activation_ptr); } @@ -594,6 +650,7 @@ ProcessingRequest Filter::setupBodyChunk(ProcessorState& state, const Buffer::In bool end_stream) { ENVOY_LOG(debug, "Sending a body chunk of {} bytes, end_stream {}", data.length(), end_stream); ProcessingRequest req; + addDynamicMetadata(state, req); auto* body_req = state.mutableBody(req); body_req->set_end_of_stream(end_stream); body_req->set_body(data.toString()); @@ -610,6 +667,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers) { ProcessingRequest req; + addDynamicMetadata(state, req); auto* trailers_req = state.mutableTrailers(req); MutationUtils::headersToProto(trailers, config_->allowedHeaders(), config_->disallowedHeaders(), *trailers_req->mutable_trailers()); @@ -663,6 +721,93 @@ void Filter::onNewTimeout(const ProtobufWkt::Duration& override_message_timeout) stats_.override_message_timeout_received_.inc(); } +void Filter::addDynamicMetadata(const ProcessorState& state, ProcessingRequest& req) { + // get the callbacks from the ProcessorState. This will be the appropriate + // callbacks for the current state of the filter + auto* cb = state.callbacks(); + envoy::config::core::v3::Metadata forwarding_metadata; + + // If metadata_context_namespaces is specified, pass matching filter metadata to the ext_proc + // service. If metadata key is set in both the connection and request metadata then the value + // will be the request metadata value. The metadata will only be searched for the callbacks + // corresponding to the traffic direction at the time of the external processing request. + const auto& request_metadata = cb->streamInfo().dynamicMetadata().filter_metadata(); + for (const auto& context_key : state.untypedForwardingMetadataNamespaces()) { + if (const auto metadata_it = request_metadata.find(context_key); + metadata_it != request_metadata.end()) { + (*forwarding_metadata.mutable_filter_metadata())[metadata_it->first] = metadata_it->second; + } else if (cb->connection().has_value()) { + const auto& connection_metadata = + cb->connection().value().get().streamInfo().dynamicMetadata().filter_metadata(); + if (const auto metadata_it = connection_metadata.find(context_key); + metadata_it != connection_metadata.end()) { + (*forwarding_metadata.mutable_filter_metadata())[metadata_it->first] = metadata_it->second; + } + } + } + + // If typed_metadata_context_namespaces is specified, pass matching typed filter metadata to the + // ext_proc service. If metadata key is set in both the connection and request metadata then + // the value will be the request metadata value. The metadata will only be searched for the + // callbacks corresponding to the traffic direction at the time of the external processing + // request. + const auto& request_typed_metadata = cb->streamInfo().dynamicMetadata().typed_filter_metadata(); + for (const auto& context_key : state.typedForwardingMetadataNamespaces()) { + if (const auto metadata_it = request_typed_metadata.find(context_key); + metadata_it != request_typed_metadata.end()) { + (*forwarding_metadata.mutable_typed_filter_metadata())[metadata_it->first] = + metadata_it->second; + } else if (cb->connection().has_value()) { + const auto& connection_typed_metadata = + cb->connection().value().get().streamInfo().dynamicMetadata().typed_filter_metadata(); + if (const auto metadata_it = connection_typed_metadata.find(context_key); + metadata_it != connection_typed_metadata.end()) { + (*forwarding_metadata.mutable_typed_filter_metadata())[metadata_it->first] = + metadata_it->second; + } + } + } + + *req.mutable_metadata_context() = forwarding_metadata; +} + +void Filter::setDynamicMetadata(Http::StreamFilterCallbacks* cb, const ProcessorState& state, + const ProcessingResponse& response) { + if (state.untypedReceivingMetadataNamespaces().empty() || !response.has_dynamic_metadata()) { + if (response.has_dynamic_metadata()) { + ENVOY_LOG(debug, "processing response included dynamic metadata, but no receiving " + "namespaces are configured."); + } + return; + } + + auto response_metadata = response.dynamic_metadata().fields(); + auto receiving_namespaces = state.untypedReceivingMetadataNamespaces(); + for (const auto& context_key : response_metadata) { + bool found_allowed_namespace = false; + if (auto metadata_it = + std::find(receiving_namespaces.begin(), receiving_namespaces.end(), context_key.first); + metadata_it != receiving_namespaces.end()) { + cb->streamInfo().setDynamicMetadata(context_key.first, + response_metadata.at(context_key.first).struct_value()); + found_allowed_namespace = true; + } + if (!found_allowed_namespace) { + ENVOY_LOG(debug, + "processing response included dynamic metadata for namespace not " + "configured for receiving: {}", + context_key.first); + } + } +} + +void Filter::setEncoderDynamicMetadata(const ProcessingResponse& response) { + setDynamicMetadata(encoder_callbacks_, encoding_state_, response); +} +void Filter::setDecoderDynamicMetadata(const ProcessingResponse& response) { + setDynamicMetadata(decoder_callbacks_, decoding_state_, response); +} + void Filter::onReceiveMessage(std::unique_ptr&& r) { if (processing_complete_) { ENVOY_LOG(debug, "Ignoring stream message received after processing complete"); @@ -693,21 +838,27 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { absl::Status processing_status; switch (response->response_case()) { case ProcessingResponse::ResponseCase::kRequestHeaders: + setDecoderDynamicMetadata(*response); processing_status = decoding_state_.handleHeadersResponse(response->request_headers()); break; case ProcessingResponse::ResponseCase::kResponseHeaders: + setEncoderDynamicMetadata(*response); processing_status = encoding_state_.handleHeadersResponse(response->response_headers()); break; case ProcessingResponse::ResponseCase::kRequestBody: + setDecoderDynamicMetadata(*response); processing_status = decoding_state_.handleBodyResponse(response->request_body()); break; case ProcessingResponse::ResponseCase::kResponseBody: + setEncoderDynamicMetadata(*response); processing_status = encoding_state_.handleBodyResponse(response->response_body()); break; case ProcessingResponse::ResponseCase::kRequestTrailers: + setDecoderDynamicMetadata(*response); processing_status = decoding_state_.handleTrailersResponse(response->request_trailers()); break; case ProcessingResponse::ResponseCase::kResponseTrailers: + setEncoderDynamicMetadata(*response); processing_status = encoding_state_.handleTrailersResponse(response->response_trailers()); break; case ProcessingResponse::ResponseCase::kImmediateResponse: @@ -717,6 +868,7 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { processing_status = absl::FailedPreconditionError("unhandled immediate response due to config disabled it"); } else { + setDecoderDynamicMetadata(*response); // We won't be sending anything more to the stream after we // receive this message. ENVOY_LOG(debug, "Sending immediate response"); @@ -906,7 +1058,7 @@ void Filter::mergePerRouteConfig() { ENVOY_LOG_MISC(debug, "Failed to retrieve the correct type of route specific filter config"); return; } - if (!merged_config) { + if (!merged_config.has_value()) { merged_config.emplace(*typed_cfg); } else { merged_config.emplace(FilterConfigPerRoute(merged_config.value(), *typed_cfg)); @@ -936,6 +1088,34 @@ void Filter::mergePerRouteConfig() { grpc_service_ = *merged_config->grpcService(); config_with_hash_key_.setConfig(*merged_config->grpcService()); } + + // For metadata namespaces, we only override the existing value if we have a + // value from our merged config. We indicate a lack of value from the merged + // config with absl::nullopt + + if (merged_config->untypedForwardingMetadataNamespaces().has_value()) { + untyped_forwarding_namespaces_ = merged_config->untypedForwardingMetadataNamespaces().value(); + ENVOY_LOG(trace, + "Setting new untyped forwarding metadata namespaces from per-route configuration"); + decoding_state_.setUntypedForwardingMetadataNamespaces(untyped_forwarding_namespaces_); + encoding_state_.setUntypedForwardingMetadataNamespaces(untyped_forwarding_namespaces_); + } + + if (merged_config->typedForwardingMetadataNamespaces().has_value()) { + typed_forwarding_namespaces_ = merged_config->typedForwardingMetadataNamespaces().value(); + ENVOY_LOG(trace, + "Setting new typed forwarding metadata namespaces from per-route configuration"); + decoding_state_.setTypedForwardingMetadataNamespaces(typed_forwarding_namespaces_); + encoding_state_.setTypedForwardingMetadataNamespaces(typed_forwarding_namespaces_); + } + + if (merged_config->untypedReceivingMetadataNamespaces().has_value()) { + untyped_receiving_namespaces_ = merged_config->untypedReceivingMetadataNamespaces().value(); + ENVOY_LOG(trace, + "Setting new untyped receiving metadata namespaces from per-route configuration"); + decoding_state_.setUntypedReceivingMetadataNamespaces(untyped_receiving_namespaces_); + encoding_state_.setUntypedReceivingMetadataNamespaces(untyped_receiving_namespaces_); + } } std::string responseCaseToString(const ProcessingResponse::ResponseCase response_case) { diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 49954e6ac1e7..78d97cbe9bab 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -140,6 +140,15 @@ class FilterConfig { disable_immediate_response_(config.disable_immediate_response()), allowed_headers_(initHeaderMatchers(config.forward_rules().allowed_headers())), disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())), + untyped_forwarding_namespaces_( + config.metadata_options().forwarding_namespaces().untyped().begin(), + config.metadata_options().forwarding_namespaces().untyped().end()), + typed_forwarding_namespaces_( + config.metadata_options().forwarding_namespaces().typed().begin(), + config.metadata_options().forwarding_namespaces().typed().end()), + untyped_receiving_namespaces_( + config.metadata_options().receiving_namespaces().untyped().begin(), + config.metadata_options().receiving_namespaces().untyped().end()), expression_manager_(builder, local_info, config.request_attributes(), config.response_attributes()) {} @@ -173,6 +182,18 @@ class FilterConfig { const ExpressionManager& expressionManager() const { return expression_manager_; } + const std::vector& untypedForwardingMetadataNamespaces() const { + return untyped_forwarding_namespaces_; + } + + const std::vector& typedForwardingMetadataNamespaces() const { + return typed_forwarding_namespaces_; + } + + const std::vector& untypedReceivingMetadataNamespaces() const { + return untyped_receiving_namespaces_; + } + private: ExtProcFilterStats generateStats(const std::string& prefix, const std::string& filter_stats_prefix, Stats::Scope& scope) { @@ -198,6 +219,10 @@ class FilterConfig { // Empty disallowed_header_ means disallow nothing, i.e, allow all. const std::vector disallowed_headers_; + const std::vector untyped_forwarding_namespaces_; + const std::vector typed_forwarding_namespaces_; + const std::vector untyped_receiving_namespaces_; + const ExpressionManager expression_manager_; }; @@ -223,6 +248,17 @@ class FilterConfigPerRoute : public Router::RouteSpecificFilterConfig { return grpc_service_; } + const absl::optional>& + untypedForwardingMetadataNamespaces() const { + return untyped_forwarding_namespaces_; + } + const absl::optional>& typedForwardingMetadataNamespaces() const { + return typed_forwarding_namespaces_; + } + const absl::optional>& untypedReceivingMetadataNamespaces() const { + return untyped_receiving_namespaces_; + } + private: absl::optional initProcessingMode(const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& config); @@ -230,6 +266,17 @@ class FilterConfigPerRoute : public Router::RouteSpecificFilterConfig { absl::optional initGrpcService(const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& config); + std::vector initNamespaces(const Protobuf::RepeatedPtrField& ns); + + absl::optional> initUntypedForwardingNamespaces( + const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& config); + + absl::optional> initTypedForwardingNamespaces( + const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& config); + + absl::optional> initUntypedReceivingNamespaces( + const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& config); + absl::optional mergeProcessingMode(const FilterConfigPerRoute& less_specific, const FilterConfigPerRoute& more_specific); @@ -238,6 +285,10 @@ class FilterConfigPerRoute : public Router::RouteSpecificFilterConfig { const absl::optional processing_mode_; const absl::optional grpc_service_; + + const absl::optional> untyped_forwarding_namespaces_; + const absl::optional> typed_forwarding_namespaces_; + const absl::optional> untyped_receiving_namespaces_; }; class Filter : public Logger::Loggable, @@ -260,8 +311,14 @@ class Filter : public Logger::Loggable, const envoy::config::core::v3::GrpcService& grpc_service) : config_(config), client_(std::move(client)), stats_(config->stats()), grpc_service_(grpc_service), config_with_hash_key_(grpc_service), - decoding_state_(*this, config->processingMode()), - encoding_state_(*this, config->processingMode()) {} + decoding_state_(*this, config->processingMode(), + config->untypedForwardingMetadataNamespaces(), + config->typedForwardingMetadataNamespaces(), + config->untypedReceivingMetadataNamespaces()), + encoding_state_(*this, config->processingMode(), + config->untypedForwardingMetadataNamespaces(), + config->typedForwardingMetadataNamespaces(), + config->untypedReceivingMetadataNamespaces()) {} const FilterConfig& config() const { return *config_; } @@ -303,6 +360,9 @@ class Filter : public Logger::Loggable, encoding_state_.callbackState() == ProcessorState::CallbackState::HeadersCallback); } + const ProcessorState& encodingState() { return encoding_state_; } + const ProcessorState& decodingState() { return decoding_state_; } + private: void mergePerRouteConfig(); StreamOpenState openStream(); @@ -320,6 +380,12 @@ class Filter : public Logger::Loggable, std::pair sendStreamChunk(ProcessorState& state); Http::FilterDataStatus onData(ProcessorState& state, Buffer::Instance& data, bool end_stream); Http::FilterTrailersStatus onTrailers(ProcessorState& state, Http::HeaderMap& trailers); + void setDynamicMetadata(Http::StreamFilterCallbacks* cb, const ProcessorState& state, + const envoy::service::ext_proc::v3::ProcessingResponse& response); + void setEncoderDynamicMetadata(const envoy::service::ext_proc::v3::ProcessingResponse& response); + void setDecoderDynamicMetadata(const envoy::service::ext_proc::v3::ProcessingResponse& response); + void addDynamicMetadata(const ProcessorState& state, + envoy::service::ext_proc::v3::ProcessingRequest& req); const FilterConfigSharedPtr config_; const ExternalProcessorClientPtr client_; @@ -347,6 +413,10 @@ class Filter : public Logger::Loggable, // Set to true when the mergePerRouteConfig() method has been called. bool route_config_merged_ = false; + + std::vector untyped_forwarding_namespaces_{}; + std::vector typed_forwarding_namespaces_{}; + std::vector untyped_receiving_namespaces_{}; }; extern std::string responseCaseToString( diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index c6c50fdecbbb..a3ebe27617f0 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -73,10 +73,16 @@ class ProcessorState : public Logger::Loggable { }; explicit ProcessorState(Filter& filter, - envoy::config::core::v3::TrafficDirection traffic_direction) + envoy::config::core::v3::TrafficDirection traffic_direction, + const std::vector& untyped_forwarding_namespaces, + const std::vector& typed_forwarding_namespaces, + const std::vector& untyped_receiving_namespaces) : filter_(filter), watermark_requested_(false), paused_(false), no_body_(false), complete_body_available_(false), trailers_available_(false), body_replaced_(false), - partial_body_processed_(false), traffic_direction_(traffic_direction) {} + partial_body_processed_(false), traffic_direction_(traffic_direction), + untyped_forwarding_namespaces_(&untyped_forwarding_namespaces), + typed_forwarding_namespaces_(&typed_forwarding_namespaces), + untyped_receiving_namespaces_(&untyped_receiving_namespaces) {} ProcessorState(const ProcessorState&) = delete; virtual ~ProcessorState() = default; ProcessorState& operator=(const ProcessorState&) = delete; @@ -100,6 +106,28 @@ class ProcessorState : public Logger::Loggable { virtual void setProcessingMode( const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode) PURE; + + const std::vector& untypedForwardingMetadataNamespaces() const { + return *untyped_forwarding_namespaces_; + }; + void setUntypedForwardingMetadataNamespaces(const std::vector& ns) { + untyped_forwarding_namespaces_ = &ns; + }; + + const std::vector& typedForwardingMetadataNamespaces() const { + return *typed_forwarding_namespaces_; + }; + void setTypedForwardingMetadataNamespaces(const std::vector& ns) { + typed_forwarding_namespaces_ = &ns; + }; + + const std::vector& untypedReceivingMetadataNamespaces() const { + return *untyped_receiving_namespaces_; + }; + void setUntypedReceivingMetadataNamespaces(const std::vector& ns) { + untyped_receiving_namespaces_ = &ns; + }; + bool sendHeaders() const { return send_headers_; } bool sendTrailers() const { return send_trailers_; } envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode bodyMode() const { @@ -172,7 +200,7 @@ class ProcessorState : public Logger::Loggable { virtual envoy::service::ext_proc::v3::HttpTrailers* mutableTrailers(envoy::service::ext_proc::v3::ProcessingRequest& request) const PURE; - virtual StreamInfo::StreamInfo& streamInfo() PURE; + virtual Http::StreamFilterCallbacks* callbacks() const PURE; protected: void setBodyMode( @@ -218,6 +246,10 @@ class ProcessorState : public Logger::Loggable { absl::optional call_start_time_ = absl::nullopt; const envoy::config::core::v3::TrafficDirection traffic_direction_; + const std::vector* untyped_forwarding_namespaces_{}; + const std::vector* typed_forwarding_namespaces_{}; + const std::vector* untyped_receiving_namespaces_{}; + private: virtual void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse&) {} }; @@ -225,8 +257,13 @@ class ProcessorState : public Logger::Loggable { class DecodingProcessorState : public ProcessorState { public: explicit DecodingProcessorState( - Filter& filter, const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode) - : ProcessorState(filter, envoy::config::core::v3::TrafficDirection::INBOUND) { + Filter& filter, const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode, + const std::vector& untyped_forwarding_namespaces, + const std::vector& typed_forwarding_namespaces, + const std::vector& untyped_receiving_namespaces) + : ProcessorState(filter, envoy::config::core::v3::TrafficDirection::INBOUND, + untyped_forwarding_namespaces, typed_forwarding_namespaces, + untyped_receiving_namespaces) { setProcessingModeInternal(mode); } DecodingProcessorState(const DecodingProcessorState&) = delete; @@ -285,7 +322,7 @@ class DecodingProcessorState : public ProcessorState { void requestWatermark() override; void clearWatermark() override; - StreamInfo::StreamInfo& streamInfo() override { return decoder_callbacks_->streamInfo(); } + Http::StreamFilterCallbacks* callbacks() const override { return decoder_callbacks_; } private: void setProcessingModeInternal( @@ -300,8 +337,13 @@ class DecodingProcessorState : public ProcessorState { class EncodingProcessorState : public ProcessorState { public: explicit EncodingProcessorState( - Filter& filter, const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode) - : ProcessorState(filter, envoy::config::core::v3::TrafficDirection::OUTBOUND) { + Filter& filter, const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode, + const std::vector& untyped_forwarding_namespaces, + const std::vector& typed_forwarding_namespaces, + const std::vector& untyped_receiving_namespaces) + : ProcessorState(filter, envoy::config::core::v3::TrafficDirection::OUTBOUND, + untyped_forwarding_namespaces, typed_forwarding_namespaces, + untyped_receiving_namespaces) { setProcessingModeInternal(mode); } EncodingProcessorState(const EncodingProcessorState&) = delete; @@ -360,7 +402,7 @@ class EncodingProcessorState : public ProcessorState { void requestWatermark() override; void clearWatermark() override; - StreamInfo::StreamInfo& streamInfo() override { return encoder_callbacks_->streamInfo(); } + Http::StreamFilterCallbacks* callbacks() const override { return encoder_callbacks_; } private: void setProcessingModeInternal( diff --git a/test/extensions/filters/http/ext_proc/BUILD b/test/extensions/filters/http/ext_proc/BUILD index 0f00ed80ee5b..9d511fbd9d8a 100644 --- a/test/extensions/filters/http/ext_proc/BUILD +++ b/test/extensions/filters/http/ext_proc/BUILD @@ -58,6 +58,7 @@ envoy_extension_cc_test( "//test/mocks/runtime:runtime_mocks", "//test/mocks/server:factory_context_mocks", "//test/mocks/server:overload_manager_mocks", + "//test/proto:helloworld_proto_cc_proto", "//test/test_common:test_runtime_lib", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", @@ -136,7 +137,11 @@ envoy_extension_cc_test( "-DUSE_CEL_PARSER", ], }), - extension_names = ["envoy.filters.http.ext_proc"], + extension_names = [ + "envoy.filters.http.ext_proc", + # TODO(jbohanon) use a test filter here instead of production filter + "envoy.filters.http.set_metadata", + ], shard_count = 4, tags = [ "cpu:3", @@ -146,11 +151,15 @@ envoy_extension_cc_test( ":logging_test_filter_lib", ":utils_lib", "//source/extensions/filters/http/ext_proc:config", + "//source/extensions/filters/http/set_metadata:config", "//test/common/http:common_lib", "//test/integration:http_integration_lib", + "//test/integration/filters:stream_info_to_headers_filter_lib", + "//test/proto:helloworld_proto_cc_proto", "//test/test_common:test_runtime_lib", "//test/test_common:utility_lib", "@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/filters/http/set_metadata/v3:pkg_cc_proto", "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", ], ) diff --git a/test/extensions/filters/http/ext_proc/config_test.cc b/test/extensions/filters/http/ext_proc/config_test.cc index 47e36558a260..fb982aee9b25 100644 --- a/test/extensions/filters/http/ext_proc/config_test.cc +++ b/test/extensions/filters/http/ext_proc/config_test.cc @@ -36,6 +36,15 @@ TEST(HttpExtProcConfigTest, CorrectConfig) { response_trailer_mode: send filter_metadata: hello: "world" + metadata_options: + forwarding_namespaces: + typed: + - ns1 + untyped: + - ns2 + receiving_namespaces: + untyped: + - ns2 )EOF"; ExternalProcessingFilterConfig factory; @@ -73,6 +82,15 @@ TEST(HttpExtProcConfigTest, CorrectConfigServerContext) { response_trailer_mode: send filter_metadata: hello: "world" + metadata_options: + forwarding_namespaces: + typed: + - ns1 + untyped: + - ns2 + receiving_namespaces: + untyped: + - ns2 )EOF"; ExternalProcessingFilterConfig factory; diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index 18cbbc77fc5d..d0becb1f2bab 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -1,6 +1,8 @@ #include +#include #include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h" +#include "envoy/extensions/filters/http/set_metadata/v3/set_metadata.pb.h" #include "envoy/network/address.h" #include "envoy/service/ext_proc/v3/external_processor.pb.h" @@ -48,6 +50,7 @@ struct ConfigOptions { bool valid_grpc_server = true; bool add_logging_filter = false; bool http1_codec = false; + bool add_metadata = false; }; // These tests exercise the ext_proc filter through Envoy's integration test @@ -116,6 +119,38 @@ class ExtProcIntegrationTest : public HttpIntegrationTest, ext_proc_filter.mutable_typed_config()->PackFrom(proto_config_); config_helper_.prependFilter(MessageUtil::getJsonStringFromMessageOrError(ext_proc_filter)); + // Add set_metadata filter to inject dynamic metadata used for testing + if (config_option.add_metadata) { + envoy::config::listener::v3::Filter set_metadata_filter; + std::string set_metadata_filter_name = "envoy.filters.http.set_metadata"; + set_metadata_filter.set_name(set_metadata_filter_name); + + envoy::extensions::filters::http::set_metadata::v3::Config set_metadata_config; + auto* untyped_md = set_metadata_config.add_metadata(); + untyped_md->set_metadata_namespace("forwarding_ns_untyped"); + untyped_md->set_allow_overwrite(true); + ProtobufWkt::Struct test_md_val; + (*test_md_val.mutable_fields())["foo"].set_string_value("value from set_metadata"); + (*untyped_md->mutable_value()) = test_md_val; + + auto* typed_md = set_metadata_config.add_metadata(); + typed_md->set_metadata_namespace("forwarding_ns_typed"); + typed_md->set_allow_overwrite(true); + envoy::extensions::filters::http::set_metadata::v3::Metadata typed_md_to_stuff; + typed_md_to_stuff.set_metadata_namespace("typed_value from set_metadata"); + typed_md->mutable_typed_value()->PackFrom(typed_md_to_stuff); + + set_metadata_filter.mutable_typed_config()->PackFrom(set_metadata_config); + config_helper_.prependFilter( + MessageUtil::getJsonStringFromMessageOrError(set_metadata_filter)); + + // Add filter that dumps streamInfo into headers so we can check our receiving + // namespaces + config_helper_.prependFilter(fmt::format(R"EOF( + name: stream-info-to-headers-filter + )EOF")); + } + // Add logging test filter only in Envoy gRPC mode. // gRPC side stream logging is only supported in Envoy gRPC mode at the moment. if (clientType() == Grpc::ClientType::EnvoyGrpc && config_option.add_logging_filter && @@ -249,6 +284,26 @@ class ExtProcIntegrationTest : public HttpIntegrationTest, ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, request)); } + void processGenericMessage( + FakeUpstream& grpc_upstream, bool first_message, + absl::optional> cb) { + ProcessingRequest request; + if (first_message) { + ASSERT_TRUE(grpc_upstream.waitForHttpConnection(*dispatcher_, processor_connection_)); + ASSERT_TRUE(processor_connection_->waitForNewStream(*dispatcher_, processor_stream_)); + } + ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, request)); + ASSERT_TRUE(request.has_request_headers()); + if (first_message) { + processor_stream_->startGrpcStream(); + } + ProcessingResponse response; + const bool sendReply = !cb || (*cb)(request, response); + if (sendReply) { + processor_stream_->sendGrpcMessage(response); + } + } + void processRequestHeadersMessage( FakeUpstream& grpc_upstream, bool first_message, absl::optional> cb) { @@ -3307,6 +3362,72 @@ TEST_P(ExtProcIntegrationTest, SendBodyBufferedPartialWithTrailer) { verifyDownstreamResponse(*response, 200); } +TEST_P(ExtProcIntegrationTest, SendAndReceiveDynamicMetadata) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + + auto* md_opts = proto_config_.mutable_metadata_options(); + md_opts->mutable_forwarding_namespaces()->add_untyped("forwarding_ns_untyped"); + md_opts->mutable_forwarding_namespaces()->add_typed("forwarding_ns_typed"); + md_opts->mutable_receiving_namespaces()->add_untyped("receiving_ns_untyped"); + + ConfigOptions config_option = {}; + config_option.add_metadata = true; + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + + auto response = sendDownstreamRequest(absl::nullopt); + + ProtobufWkt::Struct test_md_struct; + (*test_md_struct.mutable_fields())["foo"].set_string_value("value from ext_proc"); + + ProtobufWkt::Value md_val; + *(md_val.mutable_struct_value()) = test_md_struct; + + processGenericMessage( + *grpc_upstreams_[0], true, [md_val](const ProcessingRequest& req, ProcessingResponse& resp) { + // Verify the processing request contains the untyped metadata we injected. + EXPECT_TRUE(req.metadata_context().filter_metadata().contains("forwarding_ns_untyped")); + const ProtobufWkt::Struct& fwd_metadata = + req.metadata_context().filter_metadata().at("forwarding_ns_untyped"); + EXPECT_EQ(1, fwd_metadata.fields_size()); + EXPECT_TRUE(fwd_metadata.fields().contains("foo")); + EXPECT_EQ("value from set_metadata", fwd_metadata.fields().at("foo").string_value()); + + // Verify the processing request contains the typed metadata we injected. + EXPECT_TRUE(req.metadata_context().typed_filter_metadata().contains("forwarding_ns_typed")); + const ProtobufWkt::Any& fwd_typed_metadata = + req.metadata_context().typed_filter_metadata().at("forwarding_ns_typed"); + EXPECT_EQ("type.googleapis.com/envoy.extensions.filters.http.set_metadata.v3.Metadata", + fwd_typed_metadata.type_url()); + envoy::extensions::filters::http::set_metadata::v3::Metadata typed_md_from_req; + fwd_typed_metadata.UnpackTo(&typed_md_from_req); + EXPECT_EQ("typed_value from set_metadata", typed_md_from_req.metadata_namespace()); + + // Spoof the response to contain receiving metadata. + HeadersResponse headers_resp; + (*resp.mutable_request_headers()) = headers_resp; + auto mut_md_fields = resp.mutable_dynamic_metadata()->mutable_fields(); + (*mut_md_fields).emplace("receiving_ns_untyped", md_val); + + return true; + }); + + handleUpstreamRequest(); + + ASSERT_TRUE(response->waitForEndStream()); + ASSERT_TRUE(response->complete()); + + // Verify the response received contains the headers from dynamic metadata we expect. + ASSERT_FALSE((*response).headers().empty()); + auto md_header_result = + (*response).headers().get(Http::LowerCaseString("receiving_ns_untyped.foo")); + ASSERT_EQ(1, md_header_result.size()); + EXPECT_EQ("value from ext_proc", md_header_result[0]->value().getStringView()); + + verifyDownstreamResponse(*response, 200); +} + #if defined(USE_CEL_PARSER) // Test the filter using the default configuration by connecting to // an ext_proc server that responds to the request_headers message diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 7854a763dbac..907f52aef5c6 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -98,6 +98,16 @@ class HttpFilterTest : public testing::Test { EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_)); EXPECT_CALL(decoder_callbacks_, streamInfo()).WillRepeatedly(ReturnRef(stream_info_)); + EXPECT_CALL(encoder_callbacks_, streamInfo()).WillRepeatedly(ReturnRef(stream_info_)); + EXPECT_CALL(stream_info_, dynamicMetadata()).WillRepeatedly(ReturnRef(dynamic_metadata_)); + EXPECT_CALL(stream_info_, setDynamicMetadata(_, _)) + .Times(AnyNumber()) + .WillRepeatedly(Invoke(this, &HttpFilterTest::doSetDynamicMetadata)); + + EXPECT_CALL(decoder_callbacks_, connection()) + .WillRepeatedly(Return(OptRef{connection_})); + EXPECT_CALL(encoder_callbacks_, connection()) + .WillRepeatedly(Return(OptRef{connection_})); // Pointing dispatcher_.time_system_ to a SimulatedTimeSystem object. test_time_ = new Envoy::Event::SimulatedTimeSystem(); @@ -173,8 +183,6 @@ class HttpFilterTest : public testing::Test { if (final_expected_grpc_service_.has_value()) { EXPECT_TRUE(TestUtility::protoEqual(final_expected_grpc_service_.value(), config_with_hash_key.config())); - std::cout << final_expected_grpc_service_.value().DebugString(); - std::cout << config_with_hash_key.config().DebugString(); } stream_callbacks_ = &callbacks; @@ -190,6 +198,10 @@ class HttpFilterTest : public testing::Test { return stream; } + void doSetDynamicMetadata(const std::string& ns, const ProtobufWkt::Struct& val) { + (*dynamic_metadata_.mutable_filter_metadata())[ns] = val; + }; + void doSend(ProcessingRequest&& request, Unused) { last_request_ = std::move(request); } bool doSendClose() { return !server_closed_stream_; } @@ -549,7 +561,7 @@ class HttpFilterTest : public testing::Test { ExternalProcessorCallbacks* stream_callbacks_ = nullptr; ProcessingRequest last_request_; bool server_closed_stream_ = false; - NiceMock stats_store_; + testing::NiceMock stats_store_; FilterConfigSharedPtr config_; std::shared_ptr filter_; testing::NiceMock dispatcher_; @@ -565,6 +577,8 @@ class HttpFilterTest : public testing::Test { std::vector timers_; TestScopedRuntime scoped_runtime_; Envoy::Event::SimulatedTimeSystem* test_time_; + envoy::config::core::v3::Metadata dynamic_metadata_; + testing::NiceMock connection_; testing::NiceMock local_info_; }; @@ -3107,6 +3121,538 @@ TEST_F(HttpFilterTest, ResponseTrailerMutationExceedSizeLimit) { EXPECT_EQ(1, config_->stats().rejected_header_mutations_.value()); } +TEST_F(HttpFilterTest, MetadataOptionsOverride) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + metadata_options: + forwarding_namespaces: + untyped: + - untyped_ns_1 + typed: + - typed_ns_1 + receiving_namespaces: + untyped: + - untyped_receiving_ns_1 + )EOF"); + ExtProcPerRoute override_cfg; + const std::string override_yaml = R"EOF( + overrides: + metadata_options: + forwarding_namespaces: + untyped: + - untyped_ns_2 + typed: + - typed_ns_2 + receiving_namespaces: + untyped: + - untyped_receiving_ns_2 + )EOF"; + TestUtility::loadFromYaml(override_yaml, override_cfg); + + FilterConfigPerRoute route_config(override_cfg); + + EXPECT_CALL(decoder_callbacks_, traversePerFilterConfig(_)) + .WillOnce( + testing::Invoke([&](std::function cb) { + cb(route_config); + })); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + response_headers_.addCopy(LowerCaseString("content-length"), "3"); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + processResponseHeaders(false, absl::nullopt); + + ASSERT_EQ(filter_->encodingState().untypedForwardingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->encodingState().untypedForwardingMetadataNamespaces()[0], "untyped_ns_2"); + ASSERT_EQ(filter_->decodingState().untypedForwardingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->decodingState().untypedForwardingMetadataNamespaces()[0], "untyped_ns_2"); + + ASSERT_EQ(filter_->encodingState().typedForwardingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->decodingState().typedForwardingMetadataNamespaces()[0], "typed_ns_2"); + + ASSERT_EQ(filter_->encodingState().untypedReceivingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->encodingState().untypedReceivingMetadataNamespaces()[0], + "untyped_receiving_ns_2"); + ASSERT_EQ(filter_->decodingState().untypedReceivingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->decodingState().untypedReceivingMetadataNamespaces()[0], + "untyped_receiving_ns_2"); + + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + filter_->onDestroy(); +} + +// Validate that when metadata options are not specified as an override, the less-specific +// namespaces lists are used. +TEST_F(HttpFilterTest, MetadataOptionsNoOverride) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + metadata_options: + forwarding_namespaces: + untyped: + - untyped_ns_1 + typed: + - typed_ns_1 + receiving_namespaces: + untyped: + - untyped_receiving_ns_1 + )EOF"); + ExtProcPerRoute override_cfg; + const std::string override_yaml = R"EOF( + overrides: {} + )EOF"; + TestUtility::loadFromYaml(override_yaml, override_cfg); + + FilterConfigPerRoute route_config(override_cfg); + + EXPECT_CALL(decoder_callbacks_, traversePerFilterConfig(_)) + .WillOnce( + testing::Invoke([&](std::function cb) { + cb(route_config); + })); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + response_headers_.addCopy(LowerCaseString("content-length"), "3"); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + processResponseHeaders(false, absl::nullopt); + + ASSERT_EQ(filter_->encodingState().untypedForwardingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->encodingState().untypedForwardingMetadataNamespaces()[0], "untyped_ns_1"); + ASSERT_EQ(filter_->decodingState().untypedForwardingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->decodingState().untypedForwardingMetadataNamespaces()[0], "untyped_ns_1"); + + ASSERT_EQ(filter_->encodingState().typedForwardingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->decodingState().typedForwardingMetadataNamespaces()[0], "typed_ns_1"); + + ASSERT_EQ(filter_->encodingState().untypedReceivingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->encodingState().untypedReceivingMetadataNamespaces()[0], + "untyped_receiving_ns_1"); + ASSERT_EQ(filter_->decodingState().untypedReceivingMetadataNamespaces().size(), 1); + EXPECT_EQ(filter_->decodingState().untypedReceivingMetadataNamespaces()[0], + "untyped_receiving_ns_1"); + + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + filter_->onDestroy(); +} + +// Verify that the filter sets the processing request with dynamic metadata +// including when the metadata is on the connection stream info +TEST_F(HttpFilterTest, SendDynamicMetadata) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + metadata_options: + forwarding_namespaces: + untyped: + - connection.and.request.have.data + - connection.has.data + - request.has.data + - neither.have.data + - untyped.and.typed.connection.data + - typed.connection.data + - untyped.connection.data + typed: + - untyped.and.typed.connection.data + - typed.connection.data + - typed.request.data + - untyped.connection.data + )EOF"); + + const std::string request_yaml = R"EOF( + filter_metadata: + connection.and.request.have.data: + data: request + request.has.data: + data: request + typed_filter_metadata: + typed.request.data: + # We are using ExtProcOverrides just because we know it is built and imported already. + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcOverrides + request_attributes: + - request_typed + )EOF"; + + const std::string connection_yaml = R"EOF( + filter_metadata: + connection.and.request.have.data: + data: connection_untyped + connection.has.data: + data: connection_untyped + untyped.and.typed.connection.data: + data: connection_untyped + untyped.connection.data: + data: connection_untyped + not.selected.data: + data: connection_untyped + typed_filter_metadata: + untyped.and.typed.connection.data: + # We are using ExtProcOverrides just because we know it is built and imported already. + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcOverrides + request_attributes: + - connection_typed + typed.connection.data: + # We are using ExtProcOverrides just because we know it is built and imported already. + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcOverrides + request_attributes: + - connection_typed + not.selected.data: + # We are using ExtProcOverrides just because we know it is built and imported already. + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcOverrides + request_attributes: + - connection_typed + )EOF"; + + envoy::config::core::v3::Metadata connection_metadata; + TestUtility::loadFromYaml(request_yaml, dynamic_metadata_); + TestUtility::loadFromYaml(connection_yaml, connection_metadata); + connection_.stream_info_.metadata_ = connection_metadata; + + Buffer::OwnedImpl empty_chunk; + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + // ensure the metadata that is attached to the processing request is identical to + // the metadata we specified above + EXPECT_EQ("request", last_request_.metadata_context() + .filter_metadata() + .at("connection.and.request.have.data") + .fields() + .at("data") + .string_value()); + + EXPECT_EQ("request", last_request_.metadata_context() + .filter_metadata() + .at("request.has.data") + .fields() + .at("data") + .string_value()); + + EXPECT_EQ("connection_untyped", last_request_.metadata_context() + .filter_metadata() + .at("connection.has.data") + .fields() + .at("data") + .string_value()); + + EXPECT_EQ("connection_untyped", last_request_.metadata_context() + .filter_metadata() + .at("untyped.and.typed.connection.data") + .fields() + .at("data") + .string_value()); + + EXPECT_EQ(0, last_request_.metadata_context().filter_metadata().count("neither.have.data")); + + EXPECT_EQ(0, last_request_.metadata_context().filter_metadata().count("not.selected.data")); + + EXPECT_EQ(0, last_request_.metadata_context().filter_metadata().count("typed.connection.data")); + + envoy::extensions::filters::http::ext_proc::v3::ExtProcOverrides typed_any; + last_request_.metadata_context() + .typed_filter_metadata() + .at("typed.connection.data") + .UnpackTo(&typed_any); + ASSERT_EQ(1, typed_any.request_attributes().size()); + EXPECT_EQ("connection_typed", typed_any.request_attributes()[0]); + + last_request_.metadata_context() + .typed_filter_metadata() + .at("untyped.and.typed.connection.data") + .UnpackTo(&typed_any); + ASSERT_EQ(1, typed_any.request_attributes().size()); + EXPECT_EQ("connection_typed", typed_any.request_attributes()[0]); + + EXPECT_EQ( + 0, last_request_.metadata_context().typed_filter_metadata().count("untyped.connection.data")); + + EXPECT_EQ(0, last_request_.metadata_context().typed_filter_metadata().count("not.selected.data")); + + processResponseHeaders(false, absl::nullopt); + + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + filter_->onDestroy(); +} + +// Verify that when returning an response with dynamic_metadata field set, the filter emits +// dynamic metadata. +TEST_F(HttpFilterTest, EmitDynamicMetadata) { + // Configure the filter to only pass response headers to ext server. + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + metadata_options: + receiving_namespaces: + untyped: + - envoy.filters.http.ext_proc + )EOF"); + + Buffer::OwnedImpl empty_chunk; + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, false)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + processResponseHeaders(false, [](const HttpHeaders&, ProcessingResponse& resp, HeadersResponse&) { + ProtobufWkt::Struct foobar; + (*foobar.mutable_fields())["foo"].set_string_value("bar"); + auto metadata_mut = resp.mutable_dynamic_metadata()->mutable_fields(); + auto mut_struct = (*metadata_mut)["envoy.filters.http.ext_proc"].mutable_struct_value(); + *mut_struct = foobar; + }); + + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + EXPECT_EQ("bar", dynamic_metadata_.filter_metadata() + .at("envoy.filters.http.ext_proc") + .fields() + .at("foo") + .string_value()); + + filter_->onDestroy(); +} + +// Verify that when returning an response with dynamic_metadata field set, the filter emits +// dynamic metadata to namespaces other than its own. +TEST_F(HttpFilterTest, EmitDynamicMetadataArbitraryNamespace) { + // Configure the filter to only pass response headers to ext server. + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + metadata_options: + receiving_namespaces: + untyped: + - envoy.filters.http.ext_authz + )EOF"); + + Buffer::OwnedImpl empty_chunk; + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, false)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + processResponseHeaders(false, [](const HttpHeaders&, ProcessingResponse& resp, HeadersResponse&) { + ProtobufWkt::Struct foobar; + (*foobar.mutable_fields())["foo"].set_string_value("bar"); + auto metadata_mut = resp.mutable_dynamic_metadata()->mutable_fields(); + auto mut_struct = (*metadata_mut)["envoy.filters.http.ext_authz"].mutable_struct_value(); + *mut_struct = foobar; + }); + + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + EXPECT_EQ("bar", dynamic_metadata_.filter_metadata() + .at("envoy.filters.http.ext_authz") + .fields() + .at("foo") + .string_value()); + + filter_->onDestroy(); +} + +// Verify that when returning an response with dynamic_metadata field set, the +// filter does not emit metadata when no allowed namespaces are configured. +TEST_F(HttpFilterTest, DisableEmitDynamicMetadata) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + )EOF"); + + Buffer::OwnedImpl empty_chunk; + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders(false, absl::nullopt); + + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + processResponseHeaders(false, [](const HttpHeaders&, ProcessingResponse& resp, HeadersResponse&) { + ProtobufWkt::Struct foobar; + (*foobar.mutable_fields())["foo"].set_string_value("bar"); + auto metadata_mut = resp.mutable_dynamic_metadata()->mutable_fields(); + auto mut_struct = (*metadata_mut)["envoy.filters.http.ext_proc"].mutable_struct_value(); + *mut_struct = foobar; + }); + + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + EXPECT_EQ(0, dynamic_metadata_.filter_metadata().size()); + + filter_->onDestroy(); +} + +// Verify that when returning an response with dynamic_metadata field set, the +// filter does not emit metadata to namespaces which are not allowed. +TEST_F(HttpFilterTest, DisableEmittingDynamicMetadataToDisallowedNamespaces) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + metadata_options: + receiving_namespaces: + untyped: + - envoy.filters.http.ext_proc + )EOF"); + + Buffer::OwnedImpl empty_chunk; + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders(false, absl::nullopt); + + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + processResponseHeaders(false, [](const HttpHeaders&, ProcessingResponse& resp, HeadersResponse&) { + ProtobufWkt::Struct foobar; + (*foobar.mutable_fields())["foo"].set_string_value("bar"); + auto metadata_mut = resp.mutable_dynamic_metadata()->mutable_fields(); + auto mut_struct = (*metadata_mut)["envoy.filters.http.ext_authz"].mutable_struct_value(); + *mut_struct = foobar; + }); + + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + EXPECT_EQ(0, dynamic_metadata_.filter_metadata().size()); + + filter_->onDestroy(); +} + +// Verify that when returning an response with dynamic_metadata field set, the filter emits +// dynamic metadata and later emissions overwrite earlier ones. +TEST_F(HttpFilterTest, EmitDynamicMetadataUseLast) { + // Configure the filter to only pass response headers to ext server. + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SEND" + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + metadata_options: + receiving_namespaces: + untyped: + - envoy.filters.http.ext_proc + )EOF"); + + Buffer::OwnedImpl empty_chunk; + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders(false, [](const HttpHeaders&, ProcessingResponse& resp, HeadersResponse&) { + ProtobufWkt::Struct batbaz; + (*batbaz.mutable_fields())["bat"].set_string_value("baz"); + auto metadata_mut = resp.mutable_dynamic_metadata()->mutable_fields(); + auto mut_struct = (*metadata_mut)["envoy.filters.http.ext_proc"].mutable_struct_value(); + *mut_struct = batbaz; + }); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + processResponseHeaders(false, [](const HttpHeaders&, ProcessingResponse& resp, HeadersResponse&) { + ProtobufWkt::Struct foobar; + (*foobar.mutable_fields())["foo"].set_string_value("bar"); + auto metadata_mut = resp.mutable_dynamic_metadata()->mutable_fields(); + auto mut_struct = (*metadata_mut)["envoy.filters.http.ext_proc"].mutable_struct_value(); + *mut_struct = foobar; + }); + + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_chunk, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + + EXPECT_FALSE(dynamic_metadata_.filter_metadata() + .at("envoy.filters.http.ext_proc") + .fields() + .contains("bat")); + + EXPECT_EQ("bar", dynamic_metadata_.filter_metadata() + .at("envoy.filters.http.ext_proc") + .fields() + .at("foo") + .string_value()); + + filter_->onDestroy(); +} + class HttpFilter2Test : public HttpFilterTest, public ::Envoy::Http::HttpConnectionManagerImplMixin {}; diff --git a/test/integration/filters/BUILD b/test/integration/filters/BUILD index 867bca9cef3e..a81d62141edd 100644 --- a/test/integration/filters/BUILD +++ b/test/integration/filters/BUILD @@ -913,6 +913,7 @@ envoy_cc_test_library( "//envoy/http:filter_interface", "//envoy/registry", "//envoy/server:filter_config_interface", + "//source/common/protobuf", "//source/extensions/filters/http/common:pass_through_filter_lib", "//test/extensions/filters/http/common:empty_http_filter_config_lib", ], diff --git a/test/integration/filters/stream_info_to_headers_filter.cc b/test/integration/filters/stream_info_to_headers_filter.cc index 41077688ff15..489e4d561440 100644 --- a/test/integration/filters/stream_info_to_headers_filter.cc +++ b/test/integration/filters/stream_info_to_headers_filter.cc @@ -1,6 +1,7 @@ #include "envoy/registry/registry.h" #include "envoy/server/filter_config.h" +#include "source/common/protobuf/protobuf.h" #include "source/extensions/filters/http/common/pass_through_filter.h" #include "test/extensions/filters/http/common/empty_http_filter_config.h" @@ -15,6 +16,38 @@ std::string toUsec(MonotonicTime time) { return absl::StrCat(time.time_since_epo } // namespace +void addValueHeaders(Http::ResponseHeaderMap& headers, std::string key_prefix, + const ProtobufWkt::Value& val) { + switch (val.kind_case()) { + case ProtobufWkt::Value::kNullValue: + headers.addCopy(Http::LowerCaseString(key_prefix), "null"); + break; + case ProtobufWkt::Value::kNumberValue: + headers.addCopy(Http::LowerCaseString(key_prefix), std::to_string(val.number_value())); + break; + case ProtobufWkt::Value::kStringValue: + headers.addCopy(Http::LowerCaseString(key_prefix), val.string_value()); + break; + case ProtobufWkt::Value::kBoolValue: + headers.addCopy(Http::LowerCaseString(key_prefix), val.bool_value() ? "true" : "false"); + break; + case ProtobufWkt::Value::kListValue: { + const auto& vals = val.list_value().values(); + for (auto i = 0; i < vals.size(); ++i) { + addValueHeaders(headers, key_prefix + "." + std::to_string(i), vals[i]); + } + break; + } + case ProtobufWkt::Value::kStructValue: + for (const auto& field : val.struct_value().fields()) { + addValueHeaders(headers, key_prefix + "." + field.first, field.second); + } + break; + default: + break; + } +} + // A filter that sticks stream info into headers for integration testing. class StreamInfoToHeadersFilter : public Http::PassThroughFilter { public: @@ -97,6 +130,17 @@ class StreamInfoToHeadersFilter : public Http::PassThroughFilter { upstream_timing.connectionPoolCallbackLatency().value().count()); } } + + if (decoder_callbacks_->streamInfo().dynamicMetadata().filter_metadata_size() > 0) { + const auto& md = decoder_callbacks_->streamInfo().dynamicMetadata().filter_metadata(); + for (const auto& md_entry : md) { + std::string key_prefix = md_entry.first; + for (const auto& field : md_entry.second.fields()) { + addValueHeaders(headers, key_prefix + "." + field.first, field.second); + } + } + } + return Http::FilterHeadersStatus::Continue; } }; From f52f0b37a461975e9053b198375a347628dbbd7d Mon Sep 17 00:00:00 2001 From: Jacob Bohanon Date: Tue, 13 Feb 2024 11:50:10 -0500 Subject: [PATCH 3/3] ext_proc: revise service api for attributes (#32176) --------- Signed-off-by: Jacob Bohanon --- api/envoy/service/ext_proc/v3/BUILD | 1 + .../ext_proc/v3/external_processor.proto | 20 ++++--- .../filters/http/ext_proc/ext_proc.cc | 57 ++++++++++--------- .../filters/http/ext_proc/ext_proc.h | 4 +- .../filters/http/ext_proc/processor_state.h | 43 ++++++++++++++ test/extensions/filters/http/ext_proc/BUILD | 1 + .../ext_proc/ext_proc_integration_test.cc | 57 +++++++++++++++---- 7 files changed, 136 insertions(+), 47 deletions(-) diff --git a/api/envoy/service/ext_proc/v3/BUILD b/api/envoy/service/ext_proc/v3/BUILD index 0e337d5c3ed1..37704a324955 100644 --- a/api/envoy/service/ext_proc/v3/BUILD +++ b/api/envoy/service/ext_proc/v3/BUILD @@ -7,6 +7,7 @@ licenses(["notice"]) # Apache 2 api_proto_package( has_services = True, deps = [ + "//envoy/annotations:pkg", "//envoy/config/core/v3:pkg", "//envoy/extensions/filters/http/ext_proc/v3:pkg", "//envoy/type/v3:pkg", diff --git a/api/envoy/service/ext_proc/v3/external_processor.proto b/api/envoy/service/ext_proc/v3/external_processor.proto index 0813bdf6d724..273bf702a382 100644 --- a/api/envoy/service/ext_proc/v3/external_processor.proto +++ b/api/envoy/service/ext_proc/v3/external_processor.proto @@ -9,6 +9,7 @@ import "envoy/type/v3/http_status.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/struct.proto"; +import "envoy/annotations/deprecation.proto"; import "udpa/annotations/status.proto"; import "validate/validate.proto"; @@ -56,7 +57,7 @@ service ExternalProcessor { // This represents the different types of messages that Envoy can send // to an external processing server. -// [#next-free-field: 9] +// [#next-free-field: 10] message ProcessingRequest { // Specify whether the filter that sent this request is running in synchronous // or asynchronous mode. The choice of synchronous or asynchronous mode @@ -118,6 +119,12 @@ message ProcessingRequest { // Dynamic metadata associated with the request. config.core.v3.Metadata metadata_context = 8; + + // The values of properties selected by the ``request_attributes`` + // or ``response_attributes`` list in the configuration. Each entry + // in the list is populated from the standard + // :ref:`attributes ` supported across Envoy. + map attributes = 9; } // For every ProcessingRequest received by the server with the ``async_mode`` field @@ -210,12 +217,11 @@ message HttpHeaders { config.core.v3.HeaderMap headers = 1; // [#not-implemented-hide:] - // The values of properties selected by the ``request_attributes`` - // or ``response_attributes`` list in the configuration. Each entry - // in the list is populated - // from the standard :ref:`attributes ` - // supported across Envoy. - map attributes = 2; + // This field is deprecated and not implemented. Attributes will be sent in + // the top-level :ref:`attributes attributes = 2 + [deprecated = true, (envoy.annotations.deprecated_at_minor_version) = "3.0"]; // If true, then there is no message body associated with this // request or response. diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 4b41dbdc57a2..3e68fcae7568 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -1,5 +1,7 @@ #include "source/extensions/filters/http/ext_proc/ext_proc.h" +#include + #include "envoy/config/common/mutation_rules/v3/mutation_rules.pb.h" #include "envoy/config/core/v3/grpc_service.pb.h" #include "envoy/extensions/filters/http/ext_proc/v3/processing_mode.pb.h" @@ -276,8 +278,7 @@ void Filter::onDestroy() { } FilterHeadersStatus Filter::onHeaders(ProcessorState& state, - Http::RequestOrResponseHeaderMap& headers, bool end_stream, - ProtobufWkt::Struct* proto) { + Http::RequestOrResponseHeaderMap& headers, bool end_stream) { switch (openStream()) { case StreamOpenState::Error: return FilterHeadersStatus::StopIteration; @@ -291,14 +292,12 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, state.setHeaders(&headers); state.setHasNoBody(end_stream); ProcessingRequest req; + addAttributes(state, req); addDynamicMetadata(state, req); auto* headers_req = state.mutableHeaders(req); MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(), *headers_req->mutable_headers()); headers_req->set_end_of_stream(end_stream); - if (proto != nullptr) { - (*headers_req->mutable_attributes())[FilterName] = *proto; - } state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::HeadersCallback); ENVOY_LOG(debug, "Sending headers message"); @@ -315,19 +314,14 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st decoding_state_.setCompleteBodyAvailable(true); } + // Set the request headers on decoding and encoding state in case they are + // needed later. + decoding_state_.setRequestHeaders(&headers); + encoding_state_.setRequestHeaders(&headers); + FilterHeadersStatus status = FilterHeadersStatus::Continue; if (decoding_state_.sendHeaders()) { - ProtobufWkt::Struct proto; - - if (config_->expressionManager().hasRequestExpr()) { - auto activation_ptr = Filters::Common::Expr::createActivation( - &config_->expressionManager().localInfo(), decoding_state_.callbacks()->streamInfo(), - &headers, nullptr, nullptr); - proto = config_->expressionManager().evaluateRequestAttributes(*activation_ptr); - } - - status = onHeaders(decoding_state_, headers, end_stream, - config_->expressionManager().hasRequestExpr() ? &proto : nullptr); + status = onHeaders(decoding_state_, headers, end_stream); ENVOY_LOG(trace, "onHeaders returning {}", static_cast(status)); } else { ENVOY_LOG(trace, "decodeHeaders: Skipped header processing"); @@ -590,7 +584,7 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap& FilterTrailersStatus Filter::decodeTrailers(RequestTrailerMap& trailers) { ENVOY_LOG(trace, "decodeTrailers"); const auto status = onTrailers(decoding_state_, trailers); - ENVOY_LOG(trace, "encodeTrailers returning {}", static_cast(status)); + ENVOY_LOG(trace, "decodeTrailers returning {}", static_cast(status)); return status; } @@ -605,17 +599,7 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s FilterHeadersStatus status = FilterHeadersStatus::Continue; if (!processing_complete_ && encoding_state_.sendHeaders()) { - ProtobufWkt::Struct proto; - - if (config_->expressionManager().hasResponseExpr()) { - auto activation_ptr = Filters::Common::Expr::createActivation( - &config_->expressionManager().localInfo(), encoding_state_.callbacks()->streamInfo(), - nullptr, &headers, nullptr); - proto = config_->expressionManager().evaluateResponseAttributes(*activation_ptr); - } - - status = onHeaders(encoding_state_, headers, end_stream, - config_->expressionManager().hasResponseExpr() ? &proto : nullptr); + status = onHeaders(encoding_state_, headers, end_stream); ENVOY_LOG(trace, "onHeaders returns {}", static_cast(status)); } else { ENVOY_LOG(trace, "encodeHeaders: Skipped header processing"); @@ -650,6 +634,7 @@ ProcessingRequest Filter::setupBodyChunk(ProcessorState& state, const Buffer::In bool end_stream) { ENVOY_LOG(debug, "Sending a body chunk of {} bytes, end_stream {}", data.length(), end_stream); ProcessingRequest req; + addAttributes(state, req); addDynamicMetadata(state, req); auto* body_req = state.mutableBody(req); body_req->set_end_of_stream(end_stream); @@ -667,6 +652,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers) { ProcessingRequest req; + addAttributes(state, req); addDynamicMetadata(state, req); auto* trailers_req = state.mutableTrailers(req); MutationUtils::headersToProto(trailers, config_->allowedHeaders(), config_->disallowedHeaders(), @@ -771,6 +757,21 @@ void Filter::addDynamicMetadata(const ProcessorState& state, ProcessingRequest& *req.mutable_metadata_context() = forwarding_metadata; } +void Filter::addAttributes(ProcessorState& state, ProcessingRequest& req) { + if (!state.sendAttributes(config_->expressionManager())) { + return; + } + + auto activation_ptr = Filters::Common::Expr::createActivation( + &config_->expressionManager().localInfo(), state.callbacks()->streamInfo(), + state.requestHeaders(), dynamic_cast(state.responseHeaders()), + dynamic_cast(state.responseTrailers())); + auto attributes = state.evaluateAttributes(config_->expressionManager(), *activation_ptr); + + state.setSentAttributes(true); + (*req.mutable_attributes())[FilterName] = attributes; +} + void Filter::setDynamicMetadata(Http::StreamFilterCallbacks* cb, const ProcessorState& state, const ProcessingResponse& response) { if (state.untypedReceivingMetadataNamespaces().empty() || !response.has_dynamic_metadata()) { diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 78d97cbe9bab..1ef14a7c2e81 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -373,8 +373,7 @@ class Filter : public Logger::Loggable, void sendImmediateResponse(const envoy::service::ext_proc::v3::ImmediateResponse& response); Http::FilterHeadersStatus onHeaders(ProcessorState& state, - Http::RequestOrResponseHeaderMap& headers, bool end_stream, - ProtobufWkt::Struct* proto); + Http::RequestOrResponseHeaderMap& headers, bool end_stream); // Return a pair of whether to terminate returning the current result. std::pair sendStreamChunk(ProcessorState& state); @@ -386,6 +385,7 @@ class Filter : public Logger::Loggable, void setDecoderDynamicMetadata(const envoy::service::ext_proc::v3::ProcessingResponse& response); void addDynamicMetadata(const ProcessorState& state, envoy::service::ext_proc::v3::ProcessingRequest& req); + void addAttributes(ProcessorState& state, envoy::service::ext_proc::v3::ProcessingRequest& req); const FilterConfigSharedPtr config_; const ExternalProcessorClientPtr client_; diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index a3ebe27617f0..01e9bc57ae59 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -15,6 +15,7 @@ #include "source/common/common/logger.h" #include "absl/status/status.h" +#include "matching_utils.h" namespace Envoy { namespace Extensions { @@ -134,8 +135,12 @@ class ProcessorState : public Logger::Loggable { return body_mode_; } + void setRequestHeaders(Http::RequestHeaderMap* headers) { request_headers_ = headers; } void setHeaders(Http::RequestOrResponseHeaderMap* headers) { headers_ = headers; } void setTrailers(Http::HeaderMap* trailers) { trailers_ = trailers; } + const Http::RequestHeaderMap* requestHeaders() const { return request_headers_; }; + virtual const Http::RequestOrResponseHeaderMap* responseHeaders() const PURE; + const Http::HeaderMap* responseTrailers() const { return trailers_; } void onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout, CallbackState callback_state); @@ -202,6 +207,14 @@ class ProcessorState : public Logger::Loggable { virtual Http::StreamFilterCallbacks* callbacks() const PURE; + virtual bool sendAttributes(const ExpressionManager& mgr) const PURE; + + void setSentAttributes(bool sent) { attributes_sent_ = sent; } + + virtual ProtobufWkt::Struct + evaluateAttributes(const ExpressionManager& mgr, + const Filters::Common::Expr::Activation& activation) const PURE; + protected: void setBodyMode( envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode body_mode); @@ -236,6 +249,10 @@ class ProcessorState : public Logger::Loggable { // The specific mode for body handling envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode body_mode_; + // The request_headers_ field is guaranteed to hold the pointer to the request + // headers as set in decodeHeaders. This allows both decoding and encoding states + // to have access to the request headers map. + Http::RequestHeaderMap* request_headers_ = nullptr; Http::RequestOrResponseHeaderMap* headers_ = nullptr; Http::HeaderMap* trailers_ = nullptr; Event::TimerPtr message_timer_; @@ -250,6 +267,9 @@ class ProcessorState : public Logger::Loggable { const std::vector* typed_forwarding_namespaces_{}; const std::vector* untyped_receiving_namespaces_{}; + // If true, the attributes for this processing state have already been sent. + bool attributes_sent_{}; + private: virtual void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse&) {} }; @@ -324,6 +344,17 @@ class DecodingProcessorState : public ProcessorState { Http::StreamFilterCallbacks* callbacks() const override { return decoder_callbacks_; } + bool sendAttributes(const ExpressionManager& mgr) const override { + return !attributes_sent_ && mgr.hasRequestExpr(); + } + + const Http::RequestOrResponseHeaderMap* responseHeaders() const override { return nullptr; } + ProtobufWkt::Struct + evaluateAttributes(const ExpressionManager& mgr, + const Filters::Common::Expr::Activation& activation) const override { + return mgr.evaluateRequestAttributes(activation); + } + private: void setProcessingModeInternal( const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode); @@ -404,6 +435,18 @@ class EncodingProcessorState : public ProcessorState { Http::StreamFilterCallbacks* callbacks() const override { return encoder_callbacks_; } + bool sendAttributes(const ExpressionManager& mgr) const override { + return !attributes_sent_ && mgr.hasResponseExpr(); + } + + const Http::RequestOrResponseHeaderMap* responseHeaders() const override { return headers_; } + + ProtobufWkt::Struct + evaluateAttributes(const ExpressionManager& mgr, + const Filters::Common::Expr::Activation& activation) const override { + return mgr.evaluateResponseAttributes(activation); + } + private: void setProcessingModeInternal( const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode); diff --git a/test/extensions/filters/http/ext_proc/BUILD b/test/extensions/filters/http/ext_proc/BUILD index 9d511fbd9d8a..13971f8250e4 100644 --- a/test/extensions/filters/http/ext_proc/BUILD +++ b/test/extensions/filters/http/ext_proc/BUILD @@ -158,6 +158,7 @@ envoy_extension_cc_test( "//test/proto:helloworld_proto_cc_proto", "//test/test_common:test_runtime_lib", "//test/test_common:utility_lib", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/filters/http/set_metadata/v3:pkg_cc_proto", "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index d0becb1f2bab..fb936314551d 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -1,6 +1,7 @@ #include #include +#include "envoy/config/core/v3/base.pb.h" #include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h" #include "envoy/extensions/filters/http/set_metadata/v3/set_metadata.pb.h" #include "envoy/network/address.h" @@ -293,7 +294,6 @@ class ExtProcIntegrationTest : public HttpIntegrationTest, ASSERT_TRUE(processor_connection_->waitForNewStream(*dispatcher_, processor_stream_)); } ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, request)); - ASSERT_TRUE(request.has_request_headers()); if (first_message) { processor_stream_->startGrpcStream(); } @@ -3429,45 +3429,82 @@ TEST_P(ExtProcIntegrationTest, SendAndReceiveDynamicMetadata) { } #if defined(USE_CEL_PARSER) -// Test the filter using the default configuration by connecting to -// an ext_proc server that responds to the request_headers message -// by requesting to modify the request headers. -TEST_P(ExtProcIntegrationTest, GetAndSetRequestResponseAttributes) { +TEST_P(ExtProcIntegrationTest, RequestResponseAttributes) { proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_trailer_mode(ProcessingMode::SEND); proto_config_.mutable_request_attributes()->Add("request.path"); proto_config_.mutable_request_attributes()->Add("request.method"); proto_config_.mutable_request_attributes()->Add("request.scheme"); proto_config_.mutable_request_attributes()->Add("connection.mtls"); + proto_config_.mutable_request_attributes()->Add("response.code"); proto_config_.mutable_response_attributes()->Add("response.code"); proto_config_.mutable_response_attributes()->Add("response.code_details"); initializeConfig(); HttpIntegrationTest::initialize(); auto response = sendDownstreamRequest(absl::nullopt); - processRequestHeadersMessage( - *grpc_upstreams_[0], true, [](const HttpHeaders& req, HeadersResponse&) { + + // Handle request headers message. + processGenericMessage( + *grpc_upstreams_[0], true, [](const ProcessingRequest& req, ProcessingResponse& resp) { + // Add something to the response so the message isn't seen as spurious + envoy::service::ext_proc::v3::HeadersResponse headers_resp; + *(resp.mutable_request_headers()) = headers_resp; + + EXPECT_TRUE(req.has_request_headers()); EXPECT_EQ(req.attributes().size(), 1); auto proto_struct = req.attributes().at("envoy.filters.http.ext_proc"); EXPECT_EQ(proto_struct.fields().at("request.path").string_value(), "/"); EXPECT_EQ(proto_struct.fields().at("request.method").string_value(), "GET"); EXPECT_EQ(proto_struct.fields().at("request.scheme").string_value(), "http"); EXPECT_EQ(proto_struct.fields().at("connection.mtls").bool_value(), false); + // Make sure we did not include the attribute which was not yet available. + EXPECT_EQ(proto_struct.fields().size(), 4); + EXPECT_FALSE(proto_struct.fields().contains("response.code")); + + // Make sure we are not including any data in the deprecated HttpHeaders.attributes. + EXPECT_TRUE(req.request_headers().attributes().empty()); return true; }); - handleUpstreamRequest(); + handleUpstreamRequestWithTrailer(); - processResponseHeadersMessage( - *grpc_upstreams_[0], false, [](const HttpHeaders& req, HeadersResponse&) { + // Handle response headers message. + processGenericMessage( + *grpc_upstreams_[0], false, [](const ProcessingRequest& req, ProcessingResponse& resp) { + // Add something to the response so the message isn't seen as spurious + envoy::service::ext_proc::v3::HeadersResponse headers_resp; + *(resp.mutable_response_headers()) = headers_resp; + + EXPECT_TRUE(req.has_response_headers()); EXPECT_EQ(req.attributes().size(), 1); auto proto_struct = req.attributes().at("envoy.filters.http.ext_proc"); EXPECT_EQ(proto_struct.fields().at("response.code").string_value(), "200"); EXPECT_EQ(proto_struct.fields().at("response.code_details").string_value(), StreamInfo::ResponseCodeDetails::get().ViaUpstream); + + // Make sure we didn't include request attributes in the response-path processing request. + EXPECT_FALSE(proto_struct.fields().contains("request.method")); + + // Make sure we are not including any data in the deprecated HttpHeaders.attributes. + EXPECT_TRUE(req.response_headers().attributes().empty()); return true; }); + // Handle response trailers message, making sure we did not send request or response attributes + // again. + processGenericMessage(*grpc_upstreams_[0], false, + [](const ProcessingRequest& req, ProcessingResponse& resp) { + // Add something to the response so the message isn't seen as spurious + envoy::service::ext_proc::v3::TrailersResponse trailer_resp; + *(resp.mutable_response_trailers()) = trailer_resp; + + EXPECT_TRUE(req.has_response_trailers()); + EXPECT_TRUE(req.attributes().empty()); + return true; + }); + verifyDownstreamResponse(*response, 200); } #endif