Skip to content

Commit

Permalink
Add support for multiple formats in a single ORCA header.
Browse files Browse the repository at this point in the history
Signed-off-by: blake-snyder <[email protected]>
  • Loading branch information
blake-snyder committed Oct 3, 2024
1 parent fd94672 commit 691e5f4
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 63 deletions.
79 changes: 42 additions & 37 deletions source/common/orca/orca_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ const Http::LowerCaseString& endpointLoadMetricsHeaderBin() {
CONSTRUCT_ON_FIRST_USE(Http::LowerCaseString, kEndpointLoadMetricsHeaderBin);
}

const Http::LowerCaseString& endpointLoadMetricsHeaderJson() {
CONSTRUCT_ON_FIRST_USE(Http::LowerCaseString, kEndpointLoadMetricsHeaderJson);
}

absl::Status tryCopyNamedMetricToOrcaLoadReport(absl::string_view metric_name, double metric_value,
OrcaLoadReport& orca_load_report) {
if (metric_name.empty()) {
Expand All @@ -53,14 +49,11 @@ absl::Status tryCopyNamedMetricToOrcaLoadReport(absl::string_view metric_name, d
return absl::OkStatus();
}

std::vector<absl::string_view> parseCommaDelimitedHeader(const HeaderMap::GetResult& entry) {
std::vector<absl::string_view> parseCommaDelimitedHeader(const absl::string_view entry) {
std::vector<absl::string_view> values;
values.reserve(entry.size());
for (size_t i = 0; i < entry.size(); ++i) {
std::vector<absl::string_view> tokens =
Envoy::Http::HeaderUtility::parseCommaDelimitedHeader(entry[i]->value().getStringView());
values.insert(values.end(), tokens.begin(), tokens.end());
}
std::vector<absl::string_view> tokens =
Envoy::Http::HeaderUtility::parseCommaDelimitedHeader(entry);
values.insert(values.end(), tokens.begin(), tokens.end());
return values;
}

Expand Down Expand Up @@ -112,7 +105,7 @@ absl::Status tryCopyMetricToOrcaLoadReport(absl::string_view metric_name,
return absl::OkStatus();
}

absl::Status tryParseNativeHttpEncoded(const HeaderMap::GetResult& header,
absl::Status tryParseNativeHttpEncoded(const absl::string_view header,
OrcaLoadReport& orca_load_report) {
const std::vector<absl::string_view> values = parseCommaDelimitedHeader(header);

Expand All @@ -133,41 +126,53 @@ absl::Status tryParseNativeHttpEncoded(const HeaderMap::GetResult& header,
return absl::OkStatus();
}

absl::Status tryParseSerializedBinary(const absl::string_view header,
OrcaLoadReport& orca_load_report) {
if (header.empty()) {
return absl::InvalidArgumentError("ORCA binary header value is empty");
}
const std::string decoded_value = Envoy::Base64::decode(header);
if (decoded_value.empty()) {
return absl::InvalidArgumentError(
fmt::format("unable to decode ORCA binary header value: {}", header));
}
if (!orca_load_report.ParseFromString(decoded_value)) {
return absl::InvalidArgumentError(
fmt::format("unable to parse binaryheader to OrcaLoadReport: {}", header));
}
return absl::OkStatus();
}

} // namespace

absl::StatusOr<OrcaLoadReport> parseOrcaLoadReportHeaders(const HeaderMap& headers) {
OrcaLoadReport load_report;

// Binary protobuf format.
// Binary protobuf format. Lagacy header from gRPC implementation.
if (const auto header_bin = headers.get(endpointLoadMetricsHeaderBin()); !header_bin.empty()) {
const auto header_value = header_bin[0]->value().getStringView();
if (header_value.empty()) {
return absl::InvalidArgumentError("ORCA binary header value is empty");
}
const std::string decoded_value = Envoy::Base64::decode(header_value);
if (decoded_value.empty()) {
return absl::InvalidArgumentError(
fmt::format("unable to decode ORCA binary header value: {}", header_value));
}
if (!load_report.ParseFromString(decoded_value)) {
return absl::InvalidArgumentError(
fmt::format("unable to parse binaryheader to OrcaLoadReport: {}", header_value));
}
} else if (const auto header_native_http = headers.get(endpointLoadMetricsHeader());
!header_native_http.empty()) {
// Native HTTP format.
RETURN_IF_NOT_OK(tryParseNativeHttpEncoded(header_native_http, load_report));
} else if (const auto header_json = headers.get(endpointLoadMetricsHeaderJson());
!header_json.empty()) {
// JSON format.
RETURN_IF_NOT_OK(tryParseSerializedBinary(header_value, load_report));
} else if (const auto header = headers.get(endpointLoadMetricsHeader()); !header.empty()) {
std::pair<absl::string_view, absl::string_view> split_header =
absl::StrSplit(header[0]->value().getStringView(), absl::MaxSplits(' ', 1));

if (split_header.first == kHeaderFormatPrefixBin) { // Binary protobuf format.
RETURN_IF_NOT_OK(tryParseSerializedBinary(split_header.second, load_report));
} else if (split_header.first == kHeaderFormatPrefixText) { // Native HTTP format.
RETURN_IF_NOT_OK(tryParseNativeHttpEncoded(split_header.second, load_report));
} else if (split_header.first == kHeaderFormatPrefixJson) { // JSON format.
#if defined(ENVOY_ENABLE_FULL_PROTOS) && defined(ENVOY_ENABLE_YAML)
bool has_unknown_field = false;
const std::string json_string = std::string(header_json[0]->value().getStringView());
RETURN_IF_ERROR(
Envoy::MessageUtil::loadFromJsonNoThrow(json_string, load_report, has_unknown_field));
const std::string json_string = std::string(split_header.second);
bool has_unknown_field = false;
RETURN_IF_ERROR(
Envoy::MessageUtil::loadFromJsonNoThrow(json_string, load_report, has_unknown_field));
#else
IS_ENVOY_BUG("JSON formatted ORCA header support not implemented for this build");
IS_ENVOY_BUG("JSON formatted ORCA header support not implemented for this build");
#endif // !ENVOY_ENABLE_FULL_PROTOS || !ENVOY_ENABLE_YAML
} else {
return absl::InvalidArgumentError(
fmt::format("unsupported ORCA header format: {}", split_header.first));
}
} else {
return absl::NotFoundError("no ORCA data sent from the backend");
}
Expand Down
5 changes: 4 additions & 1 deletion source/common/orca/orca_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ namespace Orca {
// Headers used to send ORCA load metrics from the backend.
static constexpr absl::string_view kEndpointLoadMetricsHeader = "endpoint-load-metrics";
static constexpr absl::string_view kEndpointLoadMetricsHeaderBin = "endpoint-load-metrics-bin";
static constexpr absl::string_view kEndpointLoadMetricsHeaderJson = "endpoint-load-metrics-json";
// Prefix used to determine format expected in kEndpointLoadMetricsHeader.
static constexpr absl::string_view kHeaderFormatPrefixBin = "BIN";
static constexpr absl::string_view kHeaderFormatPrefixJson = "JSON";
static constexpr absl::string_view kHeaderFormatPrefixText = "TEXT";
// The following fields are the names of the metrics tracked in the ORCA load
// report proto.
static constexpr absl::string_view kApplicationUtilizationField = "application_utilization";
Expand Down
109 changes: 84 additions & 25 deletions test/common/orca/orca_parser_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ namespace Envoy {
namespace Orca {
namespace {

const std::string formattedHeaderPrefixText() {
CONSTRUCT_ON_FIRST_USE(std::string, absl::StrCat(kHeaderFormatPrefixText, " "));
}

const std::string formattedHeaderPrefixJson() {
CONSTRUCT_ON_FIRST_USE(std::string, absl::StrCat(kHeaderFormatPrefixJson, " "));
}

const std::string formattedHeaderPrefixBin() {
CONSTRUCT_ON_FIRST_USE(std::string, absl::StrCat(kHeaderFormatPrefixBin, " "));
}

// Returns an example OrcaLoadReport proto with all fields populated.
static xds::data::orca::v3::OrcaLoadReport exampleOrcaLoadReport() {
xds::data::orca::v3::OrcaLoadReport orca_load_report;
Expand Down Expand Up @@ -43,19 +55,38 @@ TEST(OrcaParserUtilTest, MissingOrcaHeaders) {
StatusHelpers::HasStatus(absl::NotFoundError("no ORCA data sent from the backend")));
}

TEST(OrcaParserUtilTest, InvalidOrcaHeaderPrefix) {
// Verify that error is returned when unknown/invalid prefix is found in ORCA
// header value.
Http::TestRequestHeaderMapImpl headers{
{std::string(kEndpointLoadMetricsHeader), "BAD random-value"}};
EXPECT_THAT(
parseOrcaLoadReportHeaders(headers),
StatusHelpers::HasStatus(absl::InvalidArgumentError("unsupported ORCA header format: BAD")));
}

TEST(OrcaParserUtilTest, EmptyOrcaHeader) {
Http::TestRequestHeaderMapImpl headers{{std::string(kEndpointLoadMetricsHeader), ""}};
EXPECT_THAT(
parseOrcaLoadReportHeaders(headers),
StatusHelpers::HasStatus(absl::InvalidArgumentError("unsupported ORCA header format: ")));
}

TEST(OrcaParserUtilTest, NativeHttpEncodedHeader) {
Http::TestRequestHeaderMapImpl headers{
{std::string(kEndpointLoadMetricsHeader),
"cpu_utilization:0.7,application_utilization:0.8,mem_utilization:0.9,"
"rps_fractional:1000,eps:2,"
"named_metrics.foo:123,named_metrics.bar:0.2"}};
absl::StrCat(formattedHeaderPrefixText(),
"cpu_utilization:0.7,application_utilization:0.8,mem_utilization:0.9,"
"rps_fractional:1000,eps:2,"
"named_metrics.foo:123,named_metrics.bar:0.2")}};
EXPECT_THAT(parseOrcaLoadReportHeaders(headers),
StatusHelpers::IsOkAndHolds(ProtoEq(exampleOrcaLoadReport())));
}

TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderIncorrectFieldType) {
Http::TestRequestHeaderMapImpl headers{
{std::string(kEndpointLoadMetricsHeader), "cpu_utilization:\"0.7\""}};
{std::string(kEndpointLoadMetricsHeader),
absl::StrCat(formattedHeaderPrefixText(), "cpu_utilization:\"0.7\"")}};
EXPECT_THAT(parseOrcaLoadReportHeaders(headers),
StatusHelpers::HasStatus(
absl::InvalidArgumentError("unable to parse custom backend load metric "
Expand All @@ -65,7 +96,8 @@ TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderIncorrectFieldType) {
TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderNanMetricValue) {
Http::TestRequestHeaderMapImpl headers{
{std::string(kEndpointLoadMetricsHeader),
absl::StrCat("cpu_utilization:", std::numeric_limits<double>::quiet_NaN())}};
absl::StrCat(formattedHeaderPrefixText(),
"cpu_utilization:", std::numeric_limits<double>::quiet_NaN())}};
EXPECT_THAT(parseOrcaLoadReportHeaders(headers),
StatusHelpers::HasStatus(absl::InvalidArgumentError(
"custom backend load metric value(cpu_utilization) cannot be NaN.")));
Expand All @@ -74,7 +106,8 @@ TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderNanMetricValue) {
TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderInfinityMetricValue) {
Http::TestRequestHeaderMapImpl headers{
{std::string(kEndpointLoadMetricsHeader),
absl::StrCat("cpu_utilization:", std::numeric_limits<double>::infinity())}};
absl::StrCat(formattedHeaderPrefixText(),
"cpu_utilization:", std::numeric_limits<double>::infinity())}};
EXPECT_THAT(parseOrcaLoadReportHeaders(headers),
StatusHelpers::HasStatus(absl::InvalidArgumentError(
"custom backend load metric value(cpu_utilization) cannot be "
Expand All @@ -83,24 +116,28 @@ TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderInfinityMetricValue) {

TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderContainsDuplicateMetric) {
Http::TestRequestHeaderMapImpl headers{
{std::string(kEndpointLoadMetricsHeader), "cpu_utilization:0.7,cpu_utilization:0.8"}};
{std::string(kEndpointLoadMetricsHeader),
absl::StrCat(formattedHeaderPrefixText(), "cpu_utilization:0.7,cpu_utilization:0.8")}};
EXPECT_THAT(parseOrcaLoadReportHeaders(headers),
StatusHelpers::HasStatus(absl::AlreadyExistsError(absl::StrCat(
kEndpointLoadMetricsHeader, " contains duplicate metric: cpu_utilization"))));
}

TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderUnsupportedMetric) {
Http::TestRequestHeaderMapImpl headers{
{std::string(kEndpointLoadMetricsHeader), "cpu_utilization:0.7,unsupported_metric:0.8"}};
{std::string(kEndpointLoadMetricsHeader),
absl::StrCat(formattedHeaderPrefixText(), "cpu_utilization:0.7,unsupported_metric:0.8")}};
EXPECT_THAT(parseOrcaLoadReportHeaders(headers),
StatusHelpers::HasStatus(
absl::InvalidArgumentError("unsupported metric name: unsupported_metric")));
}

TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderContainsDuplicateNamedMetric) {
Http::TestRequestHeaderMapImpl headers{{std::string(kEndpointLoadMetricsHeader),
"named_metrics.foo:123,named_metrics.duplicate:123,"
"named_metrics.duplicate:0.2"}};
Http::TestRequestHeaderMapImpl headers{
{std::string(kEndpointLoadMetricsHeader),
absl::StrCat(
formattedHeaderPrefixText(),
"named_metrics.foo:123,named_metrics.duplicate:123,named_metrics.duplicate:0.2")}};
EXPECT_THAT(
parseOrcaLoadReportHeaders(headers),
StatusHelpers::HasStatus(absl::AlreadyExistsError(absl::StrCat(
Expand All @@ -109,58 +146,66 @@ TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderContainsDuplicateNamedMetric) {

TEST(OrcaParserUtilTest, NativeHttpEncodedHeaderContainsEmptyNamedMetricKey) {
Http::TestRequestHeaderMapImpl headers{
{std::string(kEndpointLoadMetricsHeader), "named_metrics.:123"}};
{std::string(kEndpointLoadMetricsHeader),
absl::StrCat(formattedHeaderPrefixText(), "named_metrics.:123")}};
EXPECT_THAT(parseOrcaLoadReportHeaders(headers),
StatusHelpers::HasStatus(absl::InvalidArgumentError("named metric key is empty.")));
}

TEST(OrcaParserUtilTest, InvalidNativeHttpEncodedHeader) {
Http::TestRequestHeaderMapImpl headers{
{std::string(kEndpointLoadMetricsHeader), "not-a-list-of-key-value-pairs"}};
{std::string(kEndpointLoadMetricsHeader),
absl::StrCat(formattedHeaderPrefixText(), "not-a-list-of-key-value-pairs")}};
EXPECT_THAT(parseOrcaLoadReportHeaders(headers),
StatusHelpers::HasStatus(
absl::InvalidArgumentError("metric values cannot be empty strings")));
}

TEST(OrcaParserUtilTest, JsonHeader) {
Http::TestRequestHeaderMapImpl headers{
{std::string(kEndpointLoadMetricsHeaderJson),
"{\"cpu_utilization\": 0.7, \"application_utilization\": 0.8, "
"\"mem_utilization\": 0.9, \"rps_fractional\": 1000, \"eps\": 2, "
"\"named_metrics\": {\"foo\": 123,\"bar\": 0.2}}"}};
{std::string(kEndpointLoadMetricsHeader),
absl::StrCat(formattedHeaderPrefixJson(),
"{\"cpu_utilization\": 0.7, \"application_utilization\": 0.8, "
"\"mem_utilization\": 0.9, \"rps_fractional\": 1000, \"eps\": 2, "
"\"named_metrics\": {\"foo\": 123,\"bar\": 0.2}}")}};
EXPECT_THAT(parseOrcaLoadReportHeaders(headers),
StatusHelpers::IsOkAndHolds(ProtoEq(exampleOrcaLoadReport())));
}

TEST(OrcaParserUtilTest, InvalidJsonHeader) {
Http::TestRequestHeaderMapImpl headers{
{std::string(kEndpointLoadMetricsHeaderJson), "not-a-valid-json-string"}};
{std::string(kEndpointLoadMetricsHeader),
absl::StrCat(formattedHeaderPrefixJson(), "JSON not-a-valid-json-string")}};
EXPECT_THAT(parseOrcaLoadReportHeaders(headers),
StatusHelpers::HasStatus(absl::StatusCode::kInvalidArgument,
testing::HasSubstr("invalid JSON")));
}

TEST(OrcaParserUtilTest, JsonHeaderUnknownField) {
Http::TestRequestHeaderMapImpl headers{
{std::string(kEndpointLoadMetricsHeaderJson),
"{\"cpu_utilization\": 0.7, \"application_utilization\": 0.8, "
"\"mem_utilization\": 0.9, \"rps_fractional\": 1000, \"eps\": 2, "
"\"unknown_field\": 2,"
"\"named_metrics\": {\"foo\": 123,\"bar\": 0.2}}"}};
{std::string(kEndpointLoadMetricsHeader),
absl::StrCat(formattedHeaderPrefixJson(),
"{\"cpu_utilization\": 0.7, \"application_utilization\": 0.8, "
"\"mem_utilization\": 0.9, \"rps_fractional\": 1000, \"eps\": 2, "
"\"unknown_field\": 2,"
"\"named_metrics\": {\"foo\": 123,\"bar\": 0.2}}")}};
EXPECT_THAT(parseOrcaLoadReportHeaders(headers),
StatusHelpers::HasStatus(absl::StatusCode::kInvalidArgument,
testing::HasSubstr("invalid JSON")));
}

TEST(OrcaParserUtilTest, JsonHeaderIncorrectFieldType) {
Http::TestRequestHeaderMapImpl headers{
{std::string(kEndpointLoadMetricsHeaderJson), "{\"cpu_utilization\": \"0.7\""}};
{std::string(kEndpointLoadMetricsHeader),
absl::StrCat(formattedHeaderPrefixJson(), "{\"cpu_utilization\": \"0.7\"")}};
EXPECT_THAT(parseOrcaLoadReportHeaders(headers),
StatusHelpers::HasStatus(absl::StatusCode::kInvalidArgument,
testing::HasSubstr("invalid JSON")));
}

TEST(OrcaParserUtilTest, BinaryHeader) {
TEST(OrcaParserUtilTest, LegacyBinaryHeader) {
// Verify processing of headers sent in legacy ORCA header inherited from gRPC
// implmentation works as intended.
const std::string proto_string =
TestUtility::getProtobufBinaryStringFromMessage(exampleOrcaLoadReport());
const auto orca_load_report_header_bin =
Expand All @@ -171,6 +216,20 @@ TEST(OrcaParserUtilTest, BinaryHeader) {
StatusHelpers::IsOkAndHolds(ProtoEq(exampleOrcaLoadReport())));
}

TEST(OrcaParserUtilTest, BinaryHeader) {
// Verify serialized binary header processing when using default ORCA header
// and appropriate format prefix in the header value.
const std::string proto_string =
TestUtility::getProtobufBinaryStringFromMessage(exampleOrcaLoadReport());
const auto orca_load_report_header_bin =
Envoy::Base64::encode(proto_string.c_str(), proto_string.length());
Http::TestRequestHeaderMapImpl headers{
{std::string(kEndpointLoadMetricsHeader),
absl::StrCat(formattedHeaderPrefixBin(), orca_load_report_header_bin)}};
EXPECT_THAT(parseOrcaLoadReportHeaders(headers),
StatusHelpers::IsOkAndHolds(ProtoEq(exampleOrcaLoadReport())));
}

TEST(OrcaParserUtilTest, InvalidBinaryHeader) {
const std::string proto_string =
TestUtility::getProtobufBinaryStringFromMessage(exampleOrcaLoadReport());
Expand Down

0 comments on commit 691e5f4

Please sign in to comment.