Skip to content

Commit

Permalink
Propagating headers to log on AM
Browse files Browse the repository at this point in the history
Signed-off-by: Alan Protasio <[email protected]>
  • Loading branch information
alanprot committed Nov 2, 2023
1 parent 752c354 commit 1e41bac
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 3 deletions.
24 changes: 23 additions & 1 deletion pkg/alertmanager/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"math/rand"
"net/http"
"net/textproto"
"path"
"strings"
"sync"
Expand Down Expand Up @@ -39,10 +40,12 @@ type Distributor struct {
alertmanagerClientsPool ClientsPool

logger log.Logger

targetHeaders []string
}

// NewDistributor constructs a new Distributor
func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *ring.Ring, alertmanagerClientsPool ClientsPool, logger log.Logger, reg prometheus.Registerer) (d *Distributor, err error) {
func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *ring.Ring, alertmanagerClientsPool ClientsPool, targetHeaders []string, logger log.Logger, reg prometheus.Registerer) (d *Distributor, err error) {
if alertmanagerClientsPool == nil {
alertmanagerClientsPool = newAlertmanagerClientsPool(client.NewRingServiceDiscovery(alertmanagersRing), cfg, logger, reg)
}
Expand All @@ -53,6 +56,7 @@ func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *r
maxRecvMsgSize: maxRecvMsgSize,
alertmanagerRing: alertmanagersRing,
alertmanagerClientsPool: alertmanagerClientsPool,
targetHeaders: targetHeaders,
}

d.Service = services.NewBasicService(nil, d.running, nil)
Expand Down Expand Up @@ -170,9 +174,11 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req
var responses []*httpgrpc.HTTPResponse
var responsesMtx sync.Mutex
grpcHeaders := httpToHttpgrpcHeaders(r.Header)

err = ring.DoBatch(r.Context(), RingOp, d.alertmanagerRing, []uint32{shardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
// Use a background context to make sure all alertmanagers get the request even if we return early.
localCtx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), userID), opentracing.SpanFromContext(r.Context()))

sp, localCtx := opentracing.StartSpanFromContext(localCtx, "Distributor.doQuorum")
defer sp.Finish()

Expand Down Expand Up @@ -305,6 +311,22 @@ func (d *Distributor) doRequest(ctx context.Context, am ring.InstanceDesc, req *
return nil, errors.Wrapf(err, "failed to get alertmanager client from pool (alertmanager address: %s)", am.Addr)
}

headers := make(map[string]string, 0)

for _, h := range req.Headers {
headers[h.Key] = h.Values[0]
}

headerMap := make(map[string]string, 0)
// Remove non-existent header.
for _, header := range d.targetHeaders {
if v, ok := headers[textproto.CanonicalMIMEHeaderKey(header)]; ok {
headerMap[header] = v
}
}

ctx = util_log.ContextWithHeaderMap(ctx, headerMap)

return amClient.HandleRequest(ctx, req)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/alertmanager/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func prepare(t *testing.T, numAM, numHappyAM, replicationFactor int, responseBod
cfg := &MultitenantAlertmanagerConfig{}
flagext.DefaultValues(cfg)

d, err := NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, amRing, newMockAlertmanagerClientFactory(amByAddr), util_log.Logger, prometheus.NewRegistry())
d, err := NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, amRing, newMockAlertmanagerClientFactory(amByAddr), []string{}, util_log.Logger, prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))

Expand Down
4 changes: 3 additions & 1 deletion pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type MultitenantAlertmanagerConfig struct {

EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`

TargetHeaders []string `yaml:"-"` // Propagated by config.
}

type ClusterConfig struct {
Expand Down Expand Up @@ -424,7 +426,7 @@ func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackC
am.grpcServer = server.NewServer(&handlerForGRPCServer{am: am})

am.alertmanagerClientsPool = newAlertmanagerClientsPool(client.NewRingServiceDiscovery(am.ring), cfg.AlertmanagerClient, logger, am.registry)
am.distributor, err = NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, am.ring, am.alertmanagerClientsPool, log.With(logger, "component", "AlertmanagerDistributor"), am.registry)
am.distributor, err = NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, am.ring, am.alertmanagerClientsPool, cfg.TargetHeaders, log.With(logger, "component", "AlertmanagerDistributor"), am.registry)
if err != nil {
return nil, errors.Wrap(err, "create distributor")
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ func (t *Cortex) initConfig() (serv services.Service, err error) {

func (t *Cortex) initAlertManager() (serv services.Service, err error) {
t.Cfg.Alertmanager.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Alertmanager.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog

// Initialise the store.
store, err := alertstore.NewAlertStore(context.Background(), t.Cfg.AlertmanagerStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer)
Expand Down

0 comments on commit 1e41bac

Please sign in to comment.