Skip to content

Commit

Permalink
Merge pull request #153 from fairfaxmedia/feature/aggregate-datacente…
Browse files Browse the repository at this point in the history
…r-combined

Implement aggregated datacenter metrics
  • Loading branch information
leklund authored May 29, 2024
2 parents f895b40 + 5582807 commit 11aba6d
Show file tree
Hide file tree
Showing 8 changed files with 662 additions and 16 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,19 @@ Different flags (for the same filter target) combine with AND semantics. For
example, `-metric-allowlist 'bytes_total$' -metric-blocklist imgopto` would only
export metrics whose names ended in bytes_total, but didn't include imgopto.

### Metrics Grouping: by datacenter or aggregate

The Fastly real-time stats API returns measurements grouped by datacenter as
well as aggregated measurements for all datacenters. By default, exported
metrics are grouped by datacenter. The response body size of the metrics
endpoint can potentially be very large. This will be exacerbated when using
the exporter with many services, many origins with Origin Inspector, and many
domains with Domain Inspector. One way to reduce the output size of the
metrics endpoint is by using the `-aggregate-only` flag. When this flag is
used only the `aggregated` metrics from the real-time stats API will be
exported. Metrics will still include the datacenter label but it will always
be set to "aggregate".

### Service discovery

Per-service metrics are available via `/metrics?target=<service ID>`. Available
Expand Down
3 changes: 3 additions & 0 deletions cmd/fastly-exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func main() {
serviceRefresh time.Duration
apiTimeout time.Duration
rtTimeout time.Duration
aggregateOnly bool
debug bool
versionFlag bool
configFileExample bool
Expand All @@ -67,6 +68,7 @@ func main() {
fs.DurationVar(&serviceRefresh, "api-refresh", 1*time.Minute, "DEPRECATED -- use service-refresh instead")
fs.DurationVar(&apiTimeout, "api-timeout", 15*time.Second, "HTTP client timeout for api.fastly.com requests (5–60s)")
fs.DurationVar(&rtTimeout, "rt-timeout", 45*time.Second, "HTTP client timeout for rt.fastly.com requests (45–120s)")
fs.BoolVar(&aggregateOnly, "aggregate-only", false, "Use aggregated data rather than per-datacenter")
fs.BoolVar(&debug, "debug", false, "log debug information")
fs.BoolVar(&versionFlag, "version", false, "print version information and exit")
fs.String("config-file", "", "config file (optional)")
Expand Down Expand Up @@ -341,6 +343,7 @@ func main() {
subscriberOptions = []rt.SubscriberOption{
rt.WithLogger(rtLogger),
rt.WithMetadataProvider(serviceCache),
rt.WithAggregateOnly(aggregateOnly),
}
)
manager = rt.NewManager(serviceCache, rtClient, token, registry, subscriberOptions, productCache, rtLogger)
Expand Down
12 changes: 11 additions & 1 deletion pkg/domain/process.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
package domain

// Process updates the metrics with data from the API response.
func Process(response *Response, serviceID, serviceName, serviceVersion string, m *Metrics) {
func Process(response *Response, serviceID, serviceName, _ string, m *Metrics, aggregateOnly bool) {
const aggregateDC = "aggregate"

for _, d := range response.Data {
if aggregateOnly {
for domain, stats := range d.Aggregated {
process(serviceID, serviceName, aggregateDC, domain, stats, m)
}

continue
}

for datacenter, byDomain := range d.Datacenter {
for domain, stats := range byDomain {
process(serviceID, serviceName, datacenter, domain, stats, m)
Expand Down
12 changes: 11 additions & 1 deletion pkg/origin/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,18 @@ const (
)

// Process updates the metrics with data from the API response.
func Process(response *Response, serviceID, serviceName, serviceVersion string, m *Metrics) {
func Process(response *Response, serviceID, serviceName, _ string, m *Metrics, aggregateOnly bool) {
const aggregateDC = "aggregate"

for _, d := range response.Data {
if aggregateOnly {
for origin, stats := range d.Aggregated {
process(serviceID, serviceName, aggregateDC, origin, stats, m)
}

continue
}

for datacenter, byOrigin := range d.Datacenter {
for origin, stats := range byOrigin {
process(serviceID, serviceName, datacenter, origin, stats, m)
Expand Down
10 changes: 9 additions & 1 deletion pkg/realtime/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,16 @@ import (
)

// Process updates the metrics with data from the API response.
func Process(response *Response, serviceID, serviceName, serviceVersion string, m *Metrics) {
func Process(response *Response, serviceID, serviceName, _ string, m *Metrics, aggregateOnly bool) {
const aggregateDC = "aggregate"

for _, d := range response.Data {
if aggregateOnly {
process(serviceID, serviceName, aggregateDC, d.Aggregated, m)

continue
}

for datacenter, stats := range d.Datacenter {
process(serviceID, serviceName, datacenter, stats, m)
}
Expand Down
450 changes: 450 additions & 0 deletions pkg/rt/common_test.go

Large diffs are not rendered by default.

34 changes: 21 additions & 13 deletions pkg/rt/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@ type MetadataProvider interface {
// Subscriber polls rt.fastly.com endpoints for a single service ID. It emits
// the received stats data to Prometheus metrics.
type Subscriber struct {
client HTTPClient
token string
serviceID string
provider MetadataProvider
metrics *prom.Metrics
postprocess func()
logger log.Logger
rtDelayCount int
oiDelayCount int
diDelayCount int
client HTTPClient
token string
serviceID string
provider MetadataProvider
metrics *prom.Metrics
postprocess func()
logger log.Logger
rtDelayCount int
oiDelayCount int
diDelayCount int
aggregateOnly bool
}

// SubscriberOption provides some additional behavior to a subscriber.
Expand All @@ -71,6 +72,13 @@ func WithPostprocess(f func()) SubscriberOption {
return func(s *Subscriber) { s.postprocess = f }
}

// WithAggregateOnly sets whether aggregate metrics are output instead of
// per datacenter metrics. By default, per datacenter are provided. Enabling
// this feature will significantly reduce the payload size of the metrics endpoint.
func WithAggregateOnly(aggregateOnly bool) SubscriberOption {
return func(s *Subscriber) { s.aggregateOnly = aggregateOnly }
}

// NewSubscriber returns a ready-to-use subscriber. Callers must be sure to
// invoke the Run method of the returned subscriber in order to actually update
// any metrics.
Expand Down Expand Up @@ -223,7 +231,7 @@ func (s *Subscriber) queryRealtime(ctx context.Context, ts uint64) (currentName
s.rtDelayCount = 0
result = apiResultSuccess
}
realtime.Process(&response, s.serviceID, name, version, s.metrics.Realtime)
realtime.Process(&response, s.serviceID, name, version, s.metrics.Realtime, s.aggregateOnly)
s.postprocess()

case http.StatusUnauthorized, http.StatusForbidden:
Expand Down Expand Up @@ -287,7 +295,7 @@ func (s *Subscriber) queryOrigins(ctx context.Context, ts uint64) (currentName s
s.oiDelayCount = 0
result = apiResultSuccess
}
origin.Process(&response, s.serviceID, name, version, s.metrics.Origin)
origin.Process(&response, s.serviceID, name, version, s.metrics.Origin, s.aggregateOnly)
s.postprocess()

case http.StatusUnauthorized, http.StatusForbidden:
Expand Down Expand Up @@ -351,7 +359,7 @@ func (s *Subscriber) queryDomains(ctx context.Context, ts uint64) (currentName s
s.diDelayCount = 0
result = apiResultSuccess
}
domain.Process(&response, s.serviceID, name, version, s.metrics.Domain)
domain.Process(&response, s.serviceID, name, version, s.metrics.Domain, s.aggregateOnly)
s.postprocess()

case http.StatusUnauthorized, http.StatusForbidden:
Expand Down
144 changes: 144 additions & 0 deletions pkg/rt/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,54 @@ func TestRTSubscriberFixture(t *testing.T) {
}
}

func TestRTSubscriberAggOnlyFixture(t *testing.T) {
var (
namespace = "testspace"
subsystem = "testsystem"
registry = prometheus.NewRegistry()
nameFilter = filter.Filter{}
metrics = prom.NewMetrics(namespace, subsystem, nameFilter, registry)
)

// Set up a subscriber.
var (
client = newMockRealtimeClient(rtResponseFixture, `{}`)
serviceID = "my-service-id"
serviceName = "my-service-name"
serviceVersion = 123
cache = &mockCache{}
processed = make(chan struct{})
postprocess = func() { close(processed) }
options = []rt.SubscriberOption{rt.WithMetadataProvider(cache), rt.WithPostprocess(postprocess), rt.WithAggregateOnly(true)}
subscriber = rt.NewSubscriber(client, "irrelevant token", serviceID, metrics, options...)
)

// Prep the mock cache.
cache.update([]api.Service{{ID: serviceID, Name: serviceName, Version: serviceVersion}})

// Tell the subscriber to fetch real-time stats.
ctx, cancel := context.WithCancel(context.Background())
errc := make(chan error, 1)
go func() { errc <- subscriber.RunRealtime(ctx) }()

// Block until the subscriber does finishes one fetch
<-processed

// Assert the Prometheus metrics.
output := prometheusOutput(t, registry, namespace+"_"+subsystem+"_")
assertMetricOutput(t, expectedRTMetricsAggOutputMap, output)

// Kill the subscriber's goroutine, and wait for it to finish.
cancel()
err := <-errc
switch {
case err == nil:
case errors.Is(err, context.Canceled):
case err != nil:
t.Fatal(err)
}
}

func TestOriginSubscriberFixture(t *testing.T) {
var (
namespace = "testspace"
Expand Down Expand Up @@ -110,6 +158,54 @@ func TestOriginSubscriberFixture(t *testing.T) {
}
}

func TestOriginSubscriberAggOnlyFixture(t *testing.T) {
var (
namespace = "testspace"
subsystem = "testsytem"
registry = prometheus.NewRegistry()
nameFilter = filter.Filter{}
metrics = prom.NewMetrics(namespace, subsystem, nameFilter, registry)
)

// Set up a subscriber.
var (
client = newMockRealtimeClient(originsResponseFixture, `{}`)
serviceID = "my-service-id"
serviceName = "my-service-name"
serviceVersion = 123
cache = &mockCache{}
processed = make(chan struct{})
postprocess = func() { close(processed) }
options = []rt.SubscriberOption{rt.WithMetadataProvider(cache), rt.WithPostprocess(postprocess), rt.WithAggregateOnly(true)}
subscriber = rt.NewSubscriber(client, "irrelevant token", serviceID, metrics, options...)
)

// Prep the mock cache.
cache.update([]api.Service{{ID: serviceID, Name: serviceName, Version: serviceVersion}})

// Tell the subscriber to fetch real-time stats.
ctx, cancel := context.WithCancel(context.Background())
errc := make(chan error, 1)
go func() { errc <- subscriber.RunOrigins(ctx) }()

// Block until the subscriber does finishes one fetch
<-processed

// Assert the Prometheus metrics.
output := prometheusOutput(t, registry, namespace+"_origin_")
assertMetricOutput(t, expectedOriginsMetricsAggOutputMap, output)

// Kill the subscriber's goroutine, and wait for it to finish.
cancel()
err := <-errc
switch {
case err == nil:
case errors.Is(err, context.Canceled):
case err != nil:
t.Fatal(err)
}
}

func TestDomainSubscriberFixture(t *testing.T) {
var (
namespace = "testspace"
Expand Down Expand Up @@ -158,6 +254,54 @@ func TestDomainSubscriberFixture(t *testing.T) {
}
}

func TestDomainSubscriberAggOnlyFixture(t *testing.T) {
var (
namespace = "testspace"
subsystem = "testsytem"
registry = prometheus.NewRegistry()
nameFilter = filter.Filter{}
metrics = prom.NewMetrics(namespace, subsystem, nameFilter, registry)
)

// Set up a subscriber.
var (
client = newMockRealtimeClient(domainsResponseFixture, `{}`)
serviceID = "my-service-id"
serviceName = "my-service-name"
serviceVersion = 123
cache = &mockCache{}
processed = make(chan struct{})
postprocess = func() { close(processed) }
options = []rt.SubscriberOption{rt.WithMetadataProvider(cache), rt.WithPostprocess(postprocess), rt.WithAggregateOnly(true)}
subscriber = rt.NewSubscriber(client, "irrelevant token", serviceID, metrics, options...)
)

// Prep the mock cache.
cache.update([]api.Service{{ID: serviceID, Name: serviceName, Version: serviceVersion}})

// Tell the subscriber to fetch real-time stats.
ctx, cancel := context.WithCancel(context.Background())
errc := make(chan error, 1)
go func() { errc <- subscriber.RunDomains(ctx) }()

// Block until the subscriber does finishes one fetch
<-processed

// Assert the Prometheus metrics.
output := prometheusOutput(t, registry, namespace+"_domain_")
assertMetricOutput(t, expectedDomainsMetricsAggOutputMap, output)

// Kill the subscriber's goroutine, and wait for it to finish.
cancel()
err := <-errc
switch {
case err == nil:
case errors.Is(err, context.Canceled):
case err != nil:
t.Fatal(err)
}
}

func TestSubscriberNoData(t *testing.T) {
var (
client = newMockRealtimeClient(`{"Error": "No data available, please retry"}`, `{}`)
Expand Down

0 comments on commit 11aba6d

Please sign in to comment.