Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext_proc: send attributes #31090

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
87cf9ba
extract attributes changes from #29069
jbohanon Nov 8, 2023
45ec23d
fix ordering test constructor
jbohanon Nov 8, 2023
0a9f228
don't do CEL stuff if no attributes are configured
jbohanon Nov 16, 2023
6826a1c
Merge branch 'main' into feature/ext_proc/request-response-attributes
jbohanon Nov 16, 2023
9857119
fix variable shadowing
jbohanon Nov 17, 2023
c26d44c
Merge branch 'main' into feature/ext_proc/request-response-attributes
jbohanon Nov 17, 2023
db599b9
Merge branch 'main' into feature/ext_proc/request-response-attributes
jbohanon Nov 27, 2023
a4454be
refactor matching utils out
jbohanon Nov 28, 2023
34b8fbc
refactor matching utils out
jbohanon Nov 28, 2023
d5ce702
Merge branch 'main' into feature/ext_proc/request-response-attributes
jbohanon Nov 28, 2023
3631683
fix pointer/references
jbohanon Nov 29, 2023
47c2b46
where are my CEL objects going
jbohanon Nov 30, 2023
86ae01f
fix lifetime issue and clean up
jbohanon Nov 30, 2023
0d8f5d4
fix runtime feature format
jbohanon Nov 30, 2023
dc692e1
declare iter as reference
jbohanon Nov 30, 2023
8a28f0d
Merge branch 'main' into feature/ext_proc/request-response-attributes
jbohanon Nov 30, 2023
8d34e78
fix tests and clean up
jbohanon Nov 30, 2023
fd598b9
skip fuzzing request and response attributes on ASAN_FUZZER due to un…
jbohanon Dec 1, 2023
2954fc7
use unique_ptr instead of absl::optional for struct, reference for ac…
jbohanon Dec 4, 2023
520d511
use object to store pair of parsed, compiled expression
jbohanon Dec 5, 2023
2778b97
Merge branch 'main' into feature/ext_proc/request-response-attributes
jbohanon Dec 5, 2023
f6bbcf7
inline CelExpression initialization into emplace call
jbohanon Dec 15, 2023
fbdbfb4
raw ptr working
jbohanon Dec 15, 2023
5192143
Merge branch 'main' into feature/ext_proc/request-response-attributes
jbohanon Dec 15, 2023
58faac1
update dep allowlist
jbohanon Dec 15, 2023
19fdb00
use serverFactoryContext to get builder singleton
jbohanon Dec 15, 2023
8948d0f
update dep allowlist
jbohanon Dec 15, 2023
fe68f15
PR comments
jbohanon Dec 15, 2023
d6cb932
kick CI
jbohanon Dec 15, 2023
ca13534
Merge branch 'main' into feature/ext_proc/request-response-attributes
jbohanon Dec 19, 2023
ae4258d
plumb through local info
jbohanon Dec 19, 2023
8e02036
kick CI -- RBE cache error in iOS
jbohanon Dec 19, 2023
d4d94e6
add inludes for MockLocalInfo
jbohanon Dec 19, 2023
906183f
Merge branch 'main' into feature/ext_proc/request-response-attributes
jbohanon Dec 20, 2023
ebd053c
Merge branch 'main' into feature/ext_proc/request-response-attributes
jbohanon Jan 8, 2024
7019dd1
kick CI
jbohanon Jan 8, 2024
defdd94
kick CI
jbohanon Jan 9, 2024
c86a731
kick CI
jbohanon Jan 9, 2024
0b3e715
kick CI
jbohanon Jan 15, 2024
374770f
Merge branch 'main' into feature/ext_proc/request-response-attributes
jbohanon Jan 15, 2024
a3e689f
revert unwanted local build change
jbohanon Jan 15, 2024
2c716b7
Merge branch 'main' into feature/ext_proc/request-response-attributes
jbohanon Jan 15, 2024
9948c57
add skip_on_windows due to recent CEL bump
jbohanon Jan 16, 2024
7d58055
Merge branch 'main' into feature/ext_proc/request-response-attributes
jbohanon Jan 16, 2024
fcaa5af
fix BUILD
jbohanon Jan 16, 2024
5fac8a5
skip client build on windows
jbohanon Jan 17, 2024
970fb4d
MORE SKIP ON WINDOWS
jbohanon Jan 17, 2024
a43136e
add filter to windows skip in bazel/repositories.bzl
jbohanon Jan 18, 2024
cfe7006
Merge branch 'main' into feature/ext_proc/request-response-attributes
jbohanon Jan 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ envoy_cc_library(
],
deps = [
":client_interface",
":matching_utils_lib",
":mutation_utils_lib",
"//envoy/event:timer_interface",
"//envoy/http:filter_interface",
Expand Down Expand Up @@ -80,6 +81,31 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "matching_utils_lib",
srcs = ["matching_utils.cc"],
hdrs = ["matching_utils.h"],
copts = select({
"//bazel:windows_x86_64": [],
"//conditions:default": [
"-DUSE_CEL_PARSER",
],
}),
deps = [
"//envoy/http:header_map_interface",
"//source/common/protobuf",
"//source/extensions/filters/common/expr:evaluator_lib",
"@com_google_cel_cpp//eval/public:cel_expr_builder_factory",
] + select(
{
"//bazel:windows_x86_64": [],
"//conditions:default": [
"@com_google_cel_cpp//parser",
],
},
),
)

envoy_cc_library(
name = "client_lib",
srcs = ["client_impl.cc"],
Expand Down
14 changes: 8 additions & 6 deletions source/extensions/filters/http/ext_proc/config.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "source/extensions/filters/http/ext_proc/config.h"

#include "source/extensions/filters/common/expr/evaluator.h"
#include "source/extensions/filters/http/ext_proc/client_impl.h"
#include "source/extensions/filters/http/ext_proc/ext_proc.h"

Expand All @@ -15,9 +16,9 @@ Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromPro
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs);
const uint32_t max_message_timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs);
const auto filter_config =
std::make_shared<FilterConfig>(proto_config, std::chrono::milliseconds(message_timeout_ms),
max_message_timeout_ms, context.scope(), stats_prefix);
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms,
context.scope(), stats_prefix, Envoy::Extensions::Filters::Common::Expr::getBuilder(context));

return [filter_config, grpc_service = proto_config.grpc_service(),
&context](Http::FilterChainFactoryCallbacks& callbacks) {
Expand Down Expand Up @@ -45,9 +46,10 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoWithServerContextTyp
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs);
const uint32_t max_message_timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs);
const auto filter_config =
std::make_shared<FilterConfig>(proto_config, std::chrono::milliseconds(message_timeout_ms),
max_message_timeout_ms, server_context.scope(), stats_prefix);
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms,
server_context.scope(), stats_prefix,
Envoy::Extensions::Filters::Common::Expr::getBuilder(server_context));

return [filter_config, grpc_service = proto_config.grpc_service(),
&server_context](Http::FilterChainFactoryCallbacks& callbacks) {
Expand Down
24 changes: 21 additions & 3 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ void Filter::onDestroy() {
}

FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream) {
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
absl::optional<ProtobufWkt::Struct> proto) {
jbohanon marked this conversation as resolved.
Show resolved Hide resolved
switch (openStream()) {
case StreamOpenState::Error:
return FilterHeadersStatus::StopIteration;
Expand All @@ -219,6 +220,9 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(),
*headers_req->mutable_headers());
headers_req->set_end_of_stream(end_stream);
if (proto.has_value()) {
(*headers_req->mutable_attributes())[FilterName] = proto.value();
}
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
ProcessorState::CallbackState::HeadersCallback);
ENVOY_LOG(debug, "Sending headers message");
Expand All @@ -237,7 +241,14 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (decoding_state_.sendHeaders()) {
status = onHeaders(decoding_state_, headers, end_stream);
absl::optional<Envoy::ProtobufWkt::Struct> proto;
if (config_->expressionManager().hasRequestExpr()) {
auto activation_ptr = Filters::Common::Expr::createActivation(decoding_state_.streamInfo(),
&headers, nullptr, nullptr);
proto = config_->expressionManager().evaluateRequestAttributes(std::move(activation_ptr));
}

status = onHeaders(decoding_state_, headers, end_stream, proto);
ENVOY_LOG(trace, "onHeaders returning {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "decodeHeaders: Skipped header processing");
Expand Down Expand Up @@ -515,7 +526,14 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (!processing_complete_ && encoding_state_.sendHeaders()) {
status = onHeaders(encoding_state_, headers, end_stream);
absl::optional<Envoy::ProtobufWkt::Struct> proto;
if (config_->expressionManager().hasResponseExpr()) {
auto activation_ptr = Filters::Common::Expr::createActivation(encoding_state_.streamInfo(),
nullptr, &headers, nullptr);
proto = config_->expressionManager().evaluateResponseAttributes(std::move(activation_ptr));
}

status = onHeaders(encoding_state_, headers, end_stream, proto);
ENVOY_LOG(trace, "onHeaders returns {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "encodeHeaders: Skipped header processing");
Expand Down
30 changes: 15 additions & 15 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "source/extensions/filters/common/mutation_rules/mutation_rules.h"
#include "source/extensions/filters/http/common/pass_through_filter.h"
#include "source/extensions/filters/http/ext_proc/client.h"
#include "source/extensions/filters/http/ext_proc/matching_utils.h"
#include "source/extensions/filters/http/ext_proc/processor_state.h"

namespace Envoy {
Expand Down Expand Up @@ -126,7 +127,8 @@ class FilterConfig {
FilterConfig(const envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor& config,
const std::chrono::milliseconds message_timeout,
const uint32_t max_message_timeout_ms, Stats::Scope& scope,
const std::string& stats_prefix)
const std::string& stats_prefix,
Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder)
: failure_mode_allow_(config.failure_mode_allow()),
disable_clear_route_cache_(config.disable_clear_route_cache()),
message_timeout_(message_timeout), max_message_timeout_ms_(max_message_timeout_ms),
Expand All @@ -135,8 +137,11 @@ class FilterConfig {
filter_metadata_(config.filter_metadata()),
allow_mode_override_(config.allow_mode_override()),
disable_immediate_response_(config.disable_immediate_response()),
allowed_headers_(initHeaderMatchers(config.forward_rules().allowed_headers())),
disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())) {}
allowed_headers_(
MatchingUtils::initHeaderMatchers(config.forward_rules().allowed_headers())),
disallowed_headers_(
MatchingUtils::initHeaderMatchers(config.forward_rules().disallowed_headers())),
expression_manager_(builder, config.request_attributes(), config.response_attributes()) {}

bool failureModeAllow() const { return failure_mode_allow_; }

Expand Down Expand Up @@ -166,23 +171,14 @@ class FilterConfig {

const Envoy::ProtobufWkt::Struct& filterMetadata() const { return filter_metadata_; }

const ExpressionManager& expressionManager() const { return expression_manager_; }

private:
ExtProcFilterStats generateStats(const std::string& prefix,
const std::string& filter_stats_prefix, Stats::Scope& scope) {
const std::string final_prefix = absl::StrCat(prefix, "ext_proc.", filter_stats_prefix);
return {ALL_EXT_PROC_FILTER_STATS(POOL_COUNTER_PREFIX(scope, final_prefix))};
}
const std::vector<Matchers::StringMatcherPtr>
initHeaderMatchers(const envoy::type::matcher::v3::ListStringMatcher& header_list) {
std::vector<Matchers::StringMatcherPtr> header_matchers;
for (const auto& matcher : header_list.patterns()) {
header_matchers.push_back(
std::make_unique<Matchers::StringMatcherImpl<envoy::type::matcher::v3::StringMatcher>>(
matcher));
}
return header_matchers;
}

const bool failure_mode_allow_;
const bool disable_clear_route_cache_;
const std::chrono::milliseconds message_timeout_;
Expand All @@ -201,6 +197,8 @@ class FilterConfig {
const std::vector<Matchers::StringMatcherPtr> allowed_headers_;
// Empty disallowed_header_ means disallow nothing, i.e, allow all.
const std::vector<Matchers::StringMatcherPtr> disallowed_headers_;

const ExpressionManager expression_manager_;
};

using FilterConfigSharedPtr = std::shared_ptr<FilterConfig>;
Expand Down Expand Up @@ -300,7 +298,9 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
void sendImmediateResponse(const envoy::service::ext_proc::v3::ImmediateResponse& response);

Http::FilterHeadersStatus onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream);
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
absl::optional<ProtobufWkt::Struct> proto);

// Return a pair of whether to terminate returning the current result.
std::pair<bool, Http::FilterDataStatus> sendStreamChunk(ProcessorState& state);
Http::FilterDataStatus onData(ProcessorState& state, Buffer::Instance& data, bool end_stream);
Expand Down
84 changes: 84 additions & 0 deletions source/extensions/filters/http/ext_proc/matching_utils.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#include "source/extensions/filters/http/ext_proc/matching_utils.h"

#include <memory>

#if defined(USE_CEL_PARSER)
#include "parser/parser.h"
#endif

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

absl::flat_hash_map<std::string, Filters::Common::Expr::ExpressionPtr>
ExpressionManager::initExpressions(const Protobuf::RepeatedPtrField<std::string>& matchers) {
absl::flat_hash_map<std::string, Filters::Common::Expr::ExpressionPtr> expressions;
#if defined(USE_CEL_PARSER)
for (const auto& matcher : matchers) {
auto parse_status = google::api::expr::parser::Parse(matcher);
jbohanon marked this conversation as resolved.
Show resolved Hide resolved
if (!parse_status.ok()) {
throw EnvoyException("Unable to parse descriptor expression: " +
parse_status.status().ToString());
}

auto iter = expr_list_.emplace_back(parse_status.value());
jbohanon marked this conversation as resolved.
Show resolved Hide resolved

Filters::Common::Expr::ExpressionPtr expression =
Extensions::Filters::Common::Expr::createExpression(builder_->builder(), iter.expr());

expressions.try_emplace(matcher, std::move(expression));
}
#else
ENVOY_LOG(warn, "CEL expression parsing is not available for use in this environment."
" Attempted to parse " +
std::to_string(matchers.size()) + " expressions");
#endif
return expressions;
}

const absl::optional<ProtobufWkt::Struct> ExpressionManager::evaluateAttributes(
jbohanon marked this conversation as resolved.
Show resolved Hide resolved
const Filters::Common::Expr::ActivationPtr& activation,
const absl::flat_hash_map<std::string, Filters::Common::Expr::ExpressionPtr>& expr) {
absl::optional<ProtobufWkt::Struct> proto;
if (!expr.empty()) {
proto.emplace(ProtobufWkt::Struct{});
for (const auto& hash_entry : expr) {
ProtobufWkt::Arena arena;
const auto result = hash_entry.second->Evaluate(*activation, &arena);
if (!result.ok()) {
// TODO: Stats?
continue;
}

if (result.value().IsError()) {
ENVOY_LOG(trace, "error parsing cel expression {}", hash_entry.first);
continue;
}

ProtobufWkt::Value value;
switch (result.value().type()) {
case google::api::expr::runtime::CelValue::Type::kBool:
value.set_bool_value(result.value().BoolOrDie());
break;
case google::api::expr::runtime::CelValue::Type::kNullType:
value.set_null_value(ProtobufWkt::NullValue{});
break;
case google::api::expr::runtime::CelValue::Type::kDouble:
value.set_number_value(result.value().DoubleOrDie());
break;
default:
value.set_string_value(Filters::Common::Expr::print(result.value()));
}

(*(proto.value()).mutable_fields())[hash_entry.first] = value;
}
}

return proto;
}

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
69 changes: 69 additions & 0 deletions source/extensions/filters/http/ext_proc/matching_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#pragma once

#include "source/common/common/logger.h"
#include "source/common/common/matchers.h"
#include "source/common/protobuf/protobuf.h"
#include "source/extensions/filters/common/expr/evaluator.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

class ExpressionManager : public Logger::Loggable<Logger::Id::ext_proc> {
public:
ExpressionManager(Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder,
const Protobuf::RepeatedPtrField<std::string>& request_matchers,
const Protobuf::RepeatedPtrField<std::string>& response_matchers)
: builder_(builder), request_expr_(initExpressions(request_matchers)),
response_expr_(initExpressions(response_matchers)){};

bool hasRequestExpr() const { return !request_expr_.empty(); };

bool hasResponseExpr() const { return !response_expr_.empty(); };

const absl::optional<ProtobufWkt::Struct>
evaluateRequestAttributes(const Filters::Common::Expr::ActivationPtr& activation) const {
jbohanon marked this conversation as resolved.
Show resolved Hide resolved
return evaluateAttributes(activation, request_expr_);
}

const absl::optional<ProtobufWkt::Struct>
evaluateResponseAttributes(const Filters::Common::Expr::ActivationPtr& activation) const {
return evaluateAttributes(activation, response_expr_);
}

static const absl::optional<ProtobufWkt::Struct> evaluateAttributes(
const Filters::Common::Expr::ActivationPtr& activation,
const absl::flat_hash_map<std::string, Filters::Common::Expr::ExpressionPtr>& expr);

private:
// This list is required to maintain the lifetimes of expr objects on which compiled expressions
// depend
std::list<google::api::expr::v1alpha1::ParsedExpr> expr_list_;
absl::flat_hash_map<std::string, Filters::Common::Expr::ExpressionPtr>
initExpressions(const Protobuf::RepeatedPtrField<std::string>& matchers);

Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder_;

const absl::flat_hash_map<std::string, Filters::Common::Expr::ExpressionPtr> request_expr_;
const absl::flat_hash_map<std::string, Filters::Common::Expr::ExpressionPtr> response_expr_;
};

class MatchingUtils : public Logger::Loggable<Logger::Id::ext_proc> {
jbohanon marked this conversation as resolved.
Show resolved Hide resolved
public:
static const std::vector<Matchers::StringMatcherPtr>
initHeaderMatchers(const envoy::type::matcher::v3::ListStringMatcher& header_list) {
std::vector<Matchers::StringMatcherPtr> header_matchers;
for (const auto& matcher : header_list.patterns()) {
header_matchers.push_back(
std::make_unique<Matchers::StringMatcherImpl<envoy::type::matcher::v3::StringMatcher>>(
matcher));
}
return header_matchers;
}
};

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
6 changes: 6 additions & 0 deletions source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
virtual envoy::service::ext_proc::v3::HttpTrailers*
mutableTrailers(envoy::service::ext_proc::v3::ProcessingRequest& request) const PURE;

virtual StreamInfo::StreamInfo& streamInfo() PURE;

protected:
void setBodyMode(
envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode body_mode);
Expand Down Expand Up @@ -283,6 +285,8 @@ class DecodingProcessorState : public ProcessorState {
void requestWatermark() override;
void clearWatermark() override;

StreamInfo::StreamInfo& streamInfo() override { return decoder_callbacks_->streamInfo(); }

private:
void setProcessingModeInternal(
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode);
Expand Down Expand Up @@ -356,6 +360,8 @@ class EncodingProcessorState : public ProcessorState {
void requestWatermark() override;
void clearWatermark() override;

StreamInfo::StreamInfo& streamInfo() override { return encoder_callbacks_->streamInfo(); }

private:
void setProcessingModeInternal(
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode);
Expand Down
Loading
Loading