Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/{cel,httpjson,http_endpoint,internal/httplog}: add debugging bread crumbs #38636

Merged
merged 1 commit into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Parse more fields from Elasticsearch slowlogs {pull}38295[38295]
- Update CEL mito extensions to v1.10.0 to add keys/values helper. {pull}38504[38504]
- Add support for Active Directory an entity analytics provider. {pull}37919[37919]
- Add debugging breadcrumb to logs when writing request trace log. {pull}38636[38636]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@

const margin = 1e3 // 1OkB ought to be enough room for all the remainder of the trace details.
maxSize := cfg.Resource.Tracer.MaxSize * 1e6
trace = httplog.NewLoggingRoundTripper(c.Transport, traceLogger, max(0, maxSize-margin))
trace = httplog.NewLoggingRoundTripper(c.Transport, traceLogger, max(0, maxSize-margin), log)
c.Transport = trace
}

Expand Down Expand Up @@ -1156,7 +1156,7 @@
// walkMap walks to all ends of the provided path in m and applies fn to the
// final element of each walk. Nested arrays are not handled.
func walkMap(m mapstr.M, path string, fn func(parent mapstr.M, key string)) {
key, rest, more := strings.Cut(path, ".")

Check failure on line 1159 in x-pack/filebeat/input/cel/input.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

rest declared and not used (typecheck)
v, ok := m[key]
if !ok {
return
Expand Down
25 changes: 20 additions & 5 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"net"
"net/http"
"reflect"
"strconv"
"time"

"github.com/google/cel-go/cel"
"github.com/google/cel-go/checker/decls"
"github.com/google/cel-go/common/types"
"github.com/google/cel-go/common/types/ref"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/types/known/structpb"
Expand All @@ -42,10 +44,12 @@ var (
)

type handler struct {
metrics *inputMetrics
publisher stateless.Publisher
log *logp.Logger
validator apiValidator
metrics *inputMetrics
publisher stateless.Publisher
log *logp.Logger
validator apiValidator
txBaseID string // Random value to make transaction IDs unique.
txIDCounter *atomic.Uint64 // Transaction ID counter that is incremented for each request.

reqLogger *zap.Logger
host, scheme string
Expand Down Expand Up @@ -185,9 +189,11 @@ func (h *handler) logRequest(r *http.Request, status int, respBody []byte) {
zap.ByteString("http.response.body.content", respBody),
)
}
txID := h.nextTxID()
h.log.Debugw("new request trace transaction", "id", txID)
// Limit request logging body size to 10kiB.
const maxBodyLen = 10 * (1 << 10)
httplog.LogRequest(h.reqLogger, r, maxBodyLen, extra...)
httplog.LogRequest(h.reqLogger.With(zap.String("transaction.id", txID)), r, maxBodyLen, extra...)
if scheme != "" {
r.URL.Scheme = scheme
}
Expand All @@ -196,6 +202,15 @@ func (h *handler) logRequest(r *http.Request, status int, respBody []byte) {
}
}

func (h *handler) nextTxID() string {
count := h.txIDCounter.Inc()
return h.formatTxID(count)
}

func (h *handler) formatTxID(count uint64) string {
return h.txBaseID + "-" + strconv.FormatUint(count, 10)
}

func (h *handler) sendResponse(w http.ResponseWriter, status int, message string) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)
Expand Down
15 changes: 14 additions & 1 deletion x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package http_endpoint
import (
"context"
"crypto/tls"
"encoding/base32"
"encoding/binary"
"errors"
"fmt"
"net"
Expand All @@ -18,6 +20,7 @@ import (

"github.com/rcrowley/go-metrics"
"go.elastic.co/ecszap"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -297,7 +300,10 @@ func (s *server) getErr() error {

func newHandler(ctx context.Context, c config, prg *program, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler {
h := &handler{
log: log,
log: log,
txBaseID: newID(),
txIDCounter: atomic.NewUint64(0),

publisher: pub,
metrics: metrics,
validator: apiValidator{
Expand Down Expand Up @@ -344,6 +350,13 @@ func newHandler(ctx context.Context, c config, prg *program, pub stateless.Publi
return h
}

// newID returns an ID derived from the current time.
func newID() string {
var data [8]byte
binary.LittleEndian.PutUint64(data[:], uint64(time.Now().UnixNano()))
return base32.HexEncoding.WithPadding(base32.NoPadding).EncodeToString(data[:])
}

// inputMetrics handles the input's metric reporting.
type inputMetrics struct {
unregister func()
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func newNetHTTPClient(ctx context.Context, cfg *requestConfig, log *logp.Logger,
if maxSize < 0 {
maxSize = 0
}
netHTTPClient.Transport = httplog.NewLoggingRoundTripper(netHTTPClient.Transport, traceLogger, maxSize)
netHTTPClient.Transport = httplog.NewLoggingRoundTripper(netHTTPClient.Transport, traceLogger, maxSize, log)
}

if reg != nil {
Expand Down
17 changes: 11 additions & 6 deletions x-pack/filebeat/input/internal/httplog/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strconv"
"time"

"github.com/elastic/elastic-agent-libs/logp"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -31,24 +32,26 @@ const TraceIDKey = contextKey("trace.id")
type contextKey string

// NewLoggingRoundTripper returns a LoggingRoundTripper that logs requests and
// responses to the provided logger.
func NewLoggingRoundTripper(next http.RoundTripper, logger *zap.Logger, maxBodyLen int) *LoggingRoundTripper {
// responses to the provided logger. Transaction creation is logged to log.
func NewLoggingRoundTripper(next http.RoundTripper, logger *zap.Logger, maxBodyLen int, log *logp.Logger) *LoggingRoundTripper {
return &LoggingRoundTripper{
transport: next,
maxBodyLen: maxBodyLen,
logger: logger,
txLog: logger,
txBaseID: newID(),
txIDCounter: atomic.NewUint64(0),
log: log,
}
}

// LoggingRoundTripper is an http.RoundTripper that logs requests and responses.
type LoggingRoundTripper struct {
transport http.RoundTripper
maxBodyLen int // The maximum length of a body. Longer bodies will be truncated.
logger *zap.Logger // Destination logger.
txLog *zap.Logger // Destination logger.
txBaseID string // Random value to make transaction IDs unique.
txIDCounter *atomic.Uint64 // Transaction ID counter that is incremented for each request.
log *logp.Logger
}

// RoundTrip implements the http.RoundTripper interface, logging
Expand Down Expand Up @@ -80,8 +83,10 @@ type LoggingRoundTripper struct {
// event.original (the response without body from httputil.DumpResponse)
func (rt *LoggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
// Create a child logger for this request.
log := rt.logger.With(
zap.String("transaction.id", rt.nextTxID()),
txID := rt.nextTxID()
rt.log.Debugw("new request trace transaction", "id", txID)
log := rt.txLog.With(
zap.String("transaction.id", txID),
)

if v := req.Context().Value(TraceIDKey); v != nil {
Expand Down
Loading