Skip to content

Commit

Permalink
x-pack/filebeat/input/http_endpoint: fix handling of deeply nested nu…
Browse files Browse the repository at this point in the history
…meric values (#40115)

In order to avoid precision loss during processing of numeric values
that cannot be exactly represented in a double-precision floating point
value, the json.Number unmarshaller is used. This has the effect that
conversion to native types on exit from the CEL program fails since
protobuf does not handle json.Number. For non-nested values that are
referenced by the CEL runtime, this does not caues an issue since the
native-to-value conversion will attempt to convert the string
representation of the numeric value. This conversion is not recursively
applied.

Recursively apply the type conversion to all child values on referencing
a value; converting numeric values to the most exact type and falling
back to a string representation if no native numeric represetation is
possible.

Also add an undocumented debug function equivalent to the debug that is
available in the CEL input and improve error logging.
  • Loading branch information
efd6 authored Jul 8, 2024
1 parent 2cb3a86 commit f0401c6
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix filestream not correctly tracking the offset of a file when using the `include_message` parser. {pull}39873[39873] {issue}39653[39653]
- Upgrade github.com/hashicorp/go-retryablehttp to mitigate CVE-2024-6104 {pull}40036[40036]
- Fix for Google Workspace duplicate events issue by adding canonical sorting over fingerprint keys array to maintain key order. {pull}40055[40055] {issue}39859[39859]
- Fix handling of deeply nested numeric values in HTTP Endpoint CEL programs. {pull}40115[40115]

*Heartbeat*

Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ In certain scenarios when the source of the request is not able to do that, it c

The normal operation of the input treats the body either as a single event when the body is an object, or as a set of events when the body is an array. If the body should be handled differently, for example a set of events in an array field of an object to be handled as a set of events, then a https://opensource.google.com/projects/cel[Common Expression Language (CEL)] program can be provided through this configuration field. The name of the object in the CEL program is `obj`. No CEL extensions are provided beyond the function in the CEL https://github.com/google/cel-spec/blob/master/doc/langdef.md#standard[standard library]. CEL https://pkg.go.dev/github.com/google/cel-go/cel#OptionalTypes[optional types] are supported.

Note that during evaluation, numbers that are not representable exactly within a double floating point value will be converted to a string to avoid data corruption.

[float]
==== `response_code`

Expand Down
45 changes: 40 additions & 5 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ func getTimeoutWait(u *url.URL, log *logp.Logger) (time.Duration, error) {
}

func (h *handler) sendAPIErrorResponse(txID string, w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) {
log.Errorw("request error", "tx_id", txID, "status_code", status, "error", apiError)

w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)

Expand Down Expand Up @@ -382,7 +384,7 @@ func decodeJSON(body io.Reader, prg *program) (objs []mapstr.M, rawMessages []js
objs = append(objs, nobjs...)
rawMessages = append(rawMessages, nrawMessages...)
default:
return nil, nil, errUnsupportedType
return nil, nil, fmt.Errorf("%w: %T", errUnsupportedType, v)
}
}
for i := range objs {
Expand All @@ -396,7 +398,7 @@ type program struct {
ast *cel.Ast
}

func newProgram(src string) (*program, error) {
func newProgram(src string, log *logp.Logger) (*program, error) {
if src == "" {
return nil, nil
}
Expand All @@ -410,6 +412,7 @@ func newProgram(src string) (*program, error) {
cel.OptionalTypes(cel.OptionalTypesVersion(lib.OptionalTypesVersion)),
cel.CustomTypeAdapter(&numberAdapter{registry}),
cel.CustomTypeProvider(registry),
lib.Debug(debug(log)),
)
if err != nil {
return nil, fmt.Errorf("failed to create env: %w", err)
Expand All @@ -427,22 +430,54 @@ func newProgram(src string) (*program, error) {
return &program{prg: prg, ast: ast}, nil
}

func debug(log *logp.Logger) func(string, any) {
log = log.Named("http_endpoint_cel_debug")
return func(tag string, value any) {
level := "DEBUG"
if _, ok := value.(error); ok {
level = "ERROR"
}
log.Debugw(level, "tag", tag, "value", value)
}
}

var _ types.Adapter = (*numberAdapter)(nil)

type numberAdapter struct {
fallback types.Adapter
}

func (a *numberAdapter) NativeToValue(value any) ref.Val {
if n, ok := value.(json.Number); ok {
switch value := value.(type) {
case []any:
for i, v := range value {
value[i] = a.NativeToValue(v)
}
case map[string]any:
for k, v := range value {
value[k] = a.NativeToValue(v)
}
case json.Number:
var errs []error
i, err := n.Int64()
i, err := value.Int64()
if err == nil {
return types.Int(i)
}
errs = append(errs, err)
f, err := n.Float64()
f, err := value.Float64()
if err == nil {
// Literalise floats that could have been an integer greater than
// can be stored without loss of precision in a double.
// This is any integer wider than the IEEE-754 double mantissa.
// As a heuristic, allow anything that includes a decimal point
// or uses scientific notation. We could be more careful, but
// it is likely not important, and other languages use the same
// rule.
if f >= 0x1p53 && !strings.ContainsFunc(string(value), func(r rune) bool {
return r == '.' || r == 'e' || r == 'E'
}) {
return types.String(value)
}
return types.Double(f)
}
errs = append(errs, err)
Expand Down
38 changes: 30 additions & 8 deletions x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ var withTraces = flag.Bool("log-traces", false, "specify logging request traces
const traceLogsDir = "trace_logs"

func Test_httpReadJSON(t *testing.T) {
log := logp.NewLogger("http_endpoint_test")

tests := []struct {
name string
body string
Expand Down Expand Up @@ -143,32 +145,52 @@ func Test_httpReadJSON(t *testing.T) {
"timestamp": 1578090901599,
"records": [
{
"data": "aGVsbG8="
"data": "aGVsbG8=",
"number": 1
},
{
"data": "c21hbGwgd29ybGQ=",
"number": 9007199254740991
},
{
"data": "aGVsbG8gd29ybGQ=",
"number": 9007199254740992
},
{
"data": "YmlnIHdvcmxk",
"number": 9223372036854775808
},
{
"data": "aGVsbG8gd29ybGQ="
"data": "d2lsbCBpdCBiZSBmcmllbmRzIHdpdGggbWU=",
"number": 3.14
}
]
}`,
program: `obj.records.map(r, {
"requestId": obj.requestId,
"requestId": debug("REQID", obj.requestId),
"timestamp": string(obj.timestamp), // leave timestamp in unix milli for ingest to handle.
"event": r,
})`,
wantRawMessage: []json.RawMessage{
[]byte(`{"event":{"data":"aGVsbG8="},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
[]byte(`{"event":{"data":"aGVsbG8gd29ybGQ="},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
[]byte(`{"event":{"data":"aGVsbG8=","number":1},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
[]byte(`{"event":{"data":"c21hbGwgd29ybGQ=","number":9007199254740991},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
[]byte(`{"event":{"data":"aGVsbG8gd29ybGQ=","number":"9007199254740992"},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
[]byte(`{"event":{"data":"YmlnIHdvcmxk","number":"9223372036854775808"},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
[]byte(`{"event":{"data":"d2lsbCBpdCBiZSBmcmllbmRzIHdpdGggbWU=","number":3.14},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
},
wantObjs: []mapstr.M{
{"event": map[string]any{"data": "aGVsbG8="}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
{"event": map[string]any{"data": "aGVsbG8gd29ybGQ="}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
{"event": map[string]any{"data": "aGVsbG8=", "number": int64(1)}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
{"event": map[string]any{"data": "c21hbGwgd29ybGQ=", "number": int64(9007199254740991)}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
{"event": map[string]any{"data": "aGVsbG8gd29ybGQ=", "number": "9007199254740992"}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
{"event": map[string]any{"data": "YmlnIHdvcmxk", "number": "9223372036854775808"}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
{"event": map[string]any{"data": "d2lsbCBpdCBiZSBmcmllbmRzIHdpdGggbWU=", "number": 3.14}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
},
wantStatus: http.StatusOK,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
prg, err := newProgram(tt.program)
prg, err := newProgram(tt.program, log)
if err != nil {
t.Fatalf("failed to compile program: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub func(beat.Event), metr

var prg *program
if e.config.Program != "" {
prg, err = newProgram(e.config.Program)
prg, err = newProgram(e.config.Program, log)
if err != nil {
return err
}
Expand Down

0 comments on commit f0401c6

Please sign in to comment.