diff --git a/collector/collector.go b/collector/collector.go index ff5a9582..104f1667 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -7,6 +7,7 @@ import ( "net/http" "net/http/httputil" "net/url" + "strconv" "time" ) @@ -17,6 +18,8 @@ const ( IdleAgentCount = "IdleAgentCount" BusyAgentCount = "BusyAgentCount" TotalAgentCount = "TotalAgentCount" + + PollDurationHeader = `Buildkite-Agent-Metrics-Poll-Duration` ) type Collector struct { @@ -30,9 +33,10 @@ type Collector struct { } type Result struct { - Totals map[string]int - Queues map[string]map[string]int - Org string + Totals map[string]int + Queues map[string]map[string]int + Org string + PollDuration time.Duration } type organizationResponse struct { @@ -120,6 +124,17 @@ func (c *Collector) Collect() (*Result, error) { var allMetrics allMetricsResponse defer res.Body.Close() + + // Check if we get a poll duration header from server + if pollSeconds := res.Header.Get(PollDurationHeader); pollSeconds != "" { + pollSecondsInt, err := strconv.ParseInt(pollSeconds, 10, 64) + if err != nil { + log.Printf("Failed to parse %s header: %v", PollDurationHeader, err) + } else { + result.PollDuration = time.Duration(pollSecondsInt) * time.Second + } + } + err = json.NewDecoder(res.Body).Decode(&allMetrics) if err != nil { return nil, err diff --git a/collector/collector_test.go b/collector/collector_test.go index 5eb0a419..ac74d7b8 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -12,14 +12,20 @@ func TestCollectorWithEmptyResponseForAllQueues(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/metrics" { w.WriteHeader(http.StatusOK) - io.WriteString(w, `{}`) + _, _ = io.WriteString(w, `{ + "organization": { + "slug": "test" + }, + "jobs": {}, + "agents": {} + }`) } else { w.WriteHeader(http.StatusNotFound) } })) c := &Collector{ - Endpoint: s.URL, - Token: "abc123", + Endpoint: s.URL, + Token: "abc123", UserAgent: "some-client/1.2.3", } res, err := c.Collect() @@ -57,14 +63,30 @@ func TestCollectorWithNoJobsForAllQueues(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/metrics" { w.WriteHeader(http.StatusOK) - io.WriteString(w, `{"jobs":{"scheduled":0,"running":0,"all":0,"queues":{}},"agents":{"idle":0,"busy":0,"all":0,"queues":{}}}`) + _, _ = io.WriteString(w, `{ + "organization": { + "slug": "test" + }, + "jobs": { + "scheduled": 0, + "running": 0, + "total": 0, + "queues": {} + }, + "agents": { + "idle": 0, + "busy": 0, + "total": 0, + "queues": {} + } + }`) } else { w.WriteHeader(http.StatusNotFound) } })) c := &Collector{ - Endpoint: s.URL, - Token: "abc123", + Endpoint: s.URL, + Token: "abc123", UserAgent: "some-client/1.2.3", } res, err := c.Collect() @@ -102,14 +124,47 @@ func TestCollectorWithSomeJobsAndAgentsForAllQueues(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/metrics" { w.WriteHeader(http.StatusOK) - io.WriteString(w, `{"jobs":{"scheduled":3,"running":1,"total":4,"queues":{"default":{"scheduled":2,"running":1,"total":3},"deploy":{"scheduled":1,"running":0,"total":1}}},"agents":{"idle":0,"busy":1,"total":1,"queues":{"default":{"idle":0,"busy":1,"total":1}}}}`) + _, _ = io.WriteString(w, `{ + "organization": { + "slug": "test" + }, + "jobs": { + "scheduled": 3, + "running": 1, + "total": 4, + "queues": { + "default": { + "scheduled": 2, + "running": 1, + "total": 3 + }, + "deploy": { + "scheduled": 1, + "running": 0, + "total": 1 + } + } + }, + "agents": { + "idle": 0, + "busy": 1, + "total": 1, + "queues": { + "default": { + "idle": 0, + "busy": 1, + "total": 1 + } + } + } + }`) } else { w.WriteHeader(http.StatusNotFound) } })) c := &Collector{ - Endpoint: s.URL, - Token: "abc123", + Endpoint: s.URL, + Token: "abc123", UserAgent: "some-client/1.2.3", } res, err := c.Collect() @@ -166,16 +221,30 @@ func TestCollectorWithSomeJobsAndAgentsForAQueue(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/metrics/queue" && r.URL.Query().Get("name") == "deploy" { w.WriteHeader(http.StatusOK) - io.WriteString(w, `{"jobs":{"scheduled":3,"running":1,"total":4},"agents":{"idle":0,"busy":1,"total":1}}`) + _, _ = io.WriteString(w, `{ + "organization": { + "slug": "test" + }, + "jobs": { + "scheduled": 3, + "running": 1, + "total": 4 + }, + "agents": { + "idle": 0, + "busy": 1, + "total": 1 + } + }`) } else { w.WriteHeader(http.StatusNotFound) } })) c := &Collector{ - Endpoint: s.URL, - Token: "abc123", + Endpoint: s.URL, + Token: "abc123", UserAgent: "some-client/1.2.3", - Queue: "deploy", + Queue: "deploy", } res, err := c.Collect() if err != nil { diff --git a/lambda/main.go b/lambda/main.go index 9032c45b..6d7f1f3e 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -16,6 +16,10 @@ import ( "github.com/buildkite/buildkite-agent-metrics/version" ) +var ( + nextPollTime time.Time +) + func main() { if os.Getenv(`DEBUG`) != "" { _, err := Handler(context.Background(), json.RawMessage([]byte{})) @@ -42,6 +46,12 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { t := time.Now() + if !nextPollTime.IsZero() && nextPollTime.After(t) { + log.Printf("Skipping polling, next poll time is in %v", + nextPollTime.Sub(t)) + return "", nil + } + if ssmTokenKey != "" { token = backend.RetrieveFromParameterStore(ssmTokenKey) } @@ -88,5 +98,9 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { } log.Printf("Finished in %s", time.Now().Sub(t)) + + // Store the next acceptable poll time in global state + nextPollTime = time.Now().Add(res.PollDuration) + return "", nil } diff --git a/main.go b/main.go index 6573a544..ba5dce5a 100644 --- a/main.go +++ b/main.go @@ -92,33 +92,46 @@ func main() { DebugHttp: *debugHttp, } - f := func() error { + f := func() (time.Duration, error) { t := time.Now() result, err := c.Collect() if err != nil { - return err + return time.Duration(0), err } if !*dryRun { err = bk.Collect(result) if err != nil { - return err + return time.Duration(0), err } } log.Printf("Finished in %s", time.Now().Sub(t)) - return nil + return result.PollDuration, nil } - if err := f(); err != nil { + minPollDuration, err := f() + if err != nil { fmt.Println(err) os.Exit(1) } if *interval > 0 { - for _ = range time.NewTicker(*interval).C { - if err := f(); err != nil { + for { + waitTime := *interval + + // Respect the min poll duration returned by the API + if *interval < minPollDuration { + log.Printf("Increasing poll duration based on rate-limit headers") + waitTime = minPollDuration + } + + log.Printf("Waiting for %v (minimum of %v)", waitTime, minPollDuration) + time.Sleep(waitTime) + + minPollDuration, err = f() + if err != nil { fmt.Println(err) os.Exit(1) }