Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/http_endpoint: fix handling of deeply nested numeric values #40115

Merged
merged 1 commit into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix filestream not correctly tracking the offset of a file when using the `include_message` parser. {pull}39873[39873] {issue}39653[39653]
- Upgrade github.com/hashicorp/go-retryablehttp to mitigate CVE-2024-6104 {pull}40036[40036]
- Fix for Google Workspace duplicate events issue by adding canonical sorting over fingerprint keys array to maintain key order. {pull}40055[40055] {issue}39859[39859]
- Fix handling of deeply nested numeric values in HTTP Endpoint CEL programs. {pull}40115[40115]

*Heartbeat*

Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ In certain scenarios when the source of the request is not able to do that, it c

The normal operation of the input treats the body either as a single event when the body is an object, or as a set of events when the body is an array. If the body should be handled differently, for example a set of events in an array field of an object to be handled as a set of events, then a https://opensource.google.com/projects/cel[Common Expression Language (CEL)] program can be provided through this configuration field. The name of the object in the CEL program is `obj`. No CEL extensions are provided beyond the function in the CEL https://github.com/google/cel-spec/blob/master/doc/langdef.md#standard[standard library]. CEL https://pkg.go.dev/github.com/google/cel-go/cel#OptionalTypes[optional types] are supported.

Note that during evaluation, numbers that are not representable exactly within a double floating point value will be converted to a string to avoid data corruption.

[float]
==== `response_code`

Expand Down
45 changes: 40 additions & 5 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ func getTimeoutWait(u *url.URL, log *logp.Logger) (time.Duration, error) {
}

func (h *handler) sendAPIErrorResponse(txID string, w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) {
log.Errorw("request error", "tx_id", txID, "status_code", status, "error", apiError)

w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)

Expand Down Expand Up @@ -382,7 +384,7 @@ func decodeJSON(body io.Reader, prg *program) (objs []mapstr.M, rawMessages []js
objs = append(objs, nobjs...)
rawMessages = append(rawMessages, nrawMessages...)
default:
return nil, nil, errUnsupportedType
return nil, nil, fmt.Errorf("%w: %T", errUnsupportedType, v)
}
}
for i := range objs {
Expand All @@ -396,7 +398,7 @@ type program struct {
ast *cel.Ast
}

func newProgram(src string) (*program, error) {
func newProgram(src string, log *logp.Logger) (*program, error) {
if src == "" {
return nil, nil
}
Expand All @@ -410,6 +412,7 @@ func newProgram(src string) (*program, error) {
cel.OptionalTypes(cel.OptionalTypesVersion(lib.OptionalTypesVersion)),
cel.CustomTypeAdapter(&numberAdapter{registry}),
cel.CustomTypeProvider(registry),
lib.Debug(debug(log)),
)
if err != nil {
return nil, fmt.Errorf("failed to create env: %w", err)
Expand All @@ -427,22 +430,54 @@ func newProgram(src string) (*program, error) {
return &program{prg: prg, ast: ast}, nil
}

func debug(log *logp.Logger) func(string, any) {
log = log.Named("http_endpoint_cel_debug")
return func(tag string, value any) {
level := "DEBUG"
if _, ok := value.(error); ok {
level = "ERROR"
}
log.Debugw(level, "tag", tag, "value", value)
}
}

var _ types.Adapter = (*numberAdapter)(nil)

type numberAdapter struct {
fallback types.Adapter
}

func (a *numberAdapter) NativeToValue(value any) ref.Val {
if n, ok := value.(json.Number); ok {
switch value := value.(type) {
case []any:
for i, v := range value {
value[i] = a.NativeToValue(v)
}
case map[string]any:
for k, v := range value {
value[k] = a.NativeToValue(v)
}
case json.Number:
var errs []error
i, err := n.Int64()
i, err := value.Int64()
if err == nil {
return types.Int(i)
}
errs = append(errs, err)
f, err := n.Float64()
f, err := value.Float64()
if err == nil {
// Literalise floats that could have been an integer greater than
// can be stored without loss of precision in a double.
// This is any integer wider than the IEEE-754 double mantissa.
// As a heuristic, allow anything that includes a decimal point
// or uses scientific notation. We could be more careful, but
// it is likely not important, and other languages use the same
// rule.
if f >= 0x1p53 && !strings.ContainsFunc(string(value), func(r rune) bool {
return r == '.' || r == 'e' || r == 'E'
}) {
return types.String(value)
}
return types.Double(f)
}
errs = append(errs, err)
Expand Down
38 changes: 30 additions & 8 deletions x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ var withTraces = flag.Bool("log-traces", false, "specify logging request traces
const traceLogsDir = "trace_logs"

func Test_httpReadJSON(t *testing.T) {
log := logp.NewLogger("http_endpoint_test")

tests := []struct {
name string
body string
Expand Down Expand Up @@ -143,32 +145,52 @@ func Test_httpReadJSON(t *testing.T) {
"timestamp": 1578090901599,
"records": [
{
"data": "aGVsbG8="
"data": "aGVsbG8=",
"number": 1
},
{
"data": "c21hbGwgd29ybGQ=",
"number": 9007199254740991
},
{
"data": "aGVsbG8gd29ybGQ=",
"number": 9007199254740992
},
{
"data": "YmlnIHdvcmxk",
"number": 9223372036854775808
},
{
"data": "aGVsbG8gd29ybGQ="
"data": "d2lsbCBpdCBiZSBmcmllbmRzIHdpdGggbWU=",
"number": 3.14
}
]
}`,
program: `obj.records.map(r, {
"requestId": obj.requestId,
"requestId": debug("REQID", obj.requestId),
"timestamp": string(obj.timestamp), // leave timestamp in unix milli for ingest to handle.
"event": r,
})`,
wantRawMessage: []json.RawMessage{
[]byte(`{"event":{"data":"aGVsbG8="},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
[]byte(`{"event":{"data":"aGVsbG8gd29ybGQ="},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
[]byte(`{"event":{"data":"aGVsbG8=","number":1},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
[]byte(`{"event":{"data":"c21hbGwgd29ybGQ=","number":9007199254740991},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
[]byte(`{"event":{"data":"aGVsbG8gd29ybGQ=","number":"9007199254740992"},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
[]byte(`{"event":{"data":"YmlnIHdvcmxk","number":"9223372036854775808"},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
[]byte(`{"event":{"data":"d2lsbCBpdCBiZSBmcmllbmRzIHdpdGggbWU=","number":3.14},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
},
wantObjs: []mapstr.M{
{"event": map[string]any{"data": "aGVsbG8="}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
{"event": map[string]any{"data": "aGVsbG8gd29ybGQ="}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
{"event": map[string]any{"data": "aGVsbG8=", "number": int64(1)}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
{"event": map[string]any{"data": "c21hbGwgd29ybGQ=", "number": int64(9007199254740991)}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
{"event": map[string]any{"data": "aGVsbG8gd29ybGQ=", "number": "9007199254740992"}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
{"event": map[string]any{"data": "YmlnIHdvcmxk", "number": "9223372036854775808"}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
{"event": map[string]any{"data": "d2lsbCBpdCBiZSBmcmllbmRzIHdpdGggbWU=", "number": 3.14}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
},
wantStatus: http.StatusOK,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
prg, err := newProgram(tt.program)
prg, err := newProgram(tt.program, log)
if err != nil {
t.Fatalf("failed to compile program: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub func(beat.Event), metr

var prg *program
if e.config.Program != "" {
prg, err = newProgram(e.config.Program)
prg, err = newProgram(e.config.Program, log)
if err != nil {
return err
}
Expand Down
Loading