Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce Buildkite-Agent-Metrics-Poll-Duration header #83

Merged
merged 2 commits into from
May 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"time"
)

Expand All @@ -17,6 +18,8 @@ const (
IdleAgentCount = "IdleAgentCount"
BusyAgentCount = "BusyAgentCount"
TotalAgentCount = "TotalAgentCount"

PollDurationHeader = `Buildkite-Agent-Metrics-Poll-Duration`
)

type Collector struct {
Expand All @@ -30,9 +33,10 @@ type Collector struct {
}

// Result is the value returned by Collector.Collect.
//
// NOTE(review): Totals, Queues and Org appear twice below, which looks like
// the diff rendering showing the removed lines followed by the re-added
// ones; presumably the real struct declares each field once — confirm
// against the repository.
type Result struct {
Totals map[string]int
Queues map[string]map[string]int
Org string
Totals map[string]int
Queues map[string]map[string]int
Org string
// PollDuration is derived from the Buildkite-Agent-Metrics-Poll-Duration
// response header, whose value is parsed as a base-10 integer number of
// seconds. Collect only assigns it when the header is present and parses
// successfully; a parse failure is logged and the field is left untouched.
PollDuration time.Duration
}

type organizationResponse struct {
Expand Down Expand Up @@ -120,6 +124,17 @@ func (c *Collector) Collect() (*Result, error) {

var allMetrics allMetricsResponse
defer res.Body.Close()

// Check if we get a poll duration header from server
if pollSeconds := res.Header.Get(PollDurationHeader); pollSeconds != "" {
pollSecondsInt, err := strconv.ParseInt(pollSeconds, 10, 64)
if err != nil {
log.Printf("Failed to parse %s header: %v", PollDurationHeader, err)
} else {
result.PollDuration = time.Duration(pollSecondsInt) * time.Second
}
}

err = json.NewDecoder(res.Body).Decode(&allMetrics)
if err != nil {
return nil, err
Expand Down
95 changes: 82 additions & 13 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,20 @@ func TestCollectorWithEmptyResponseForAllQueues(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/metrics" {
w.WriteHeader(http.StatusOK)
io.WriteString(w, `{}`)
_, _ = io.WriteString(w, `{
"organization": {
"slug": "test"
},
"jobs": {},
"agents": {}
}`)
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
c := &Collector{
Endpoint: s.URL,
Token: "abc123",
Endpoint: s.URL,
Token: "abc123",
UserAgent: "some-client/1.2.3",
}
res, err := c.Collect()
Expand Down Expand Up @@ -57,14 +63,30 @@ func TestCollectorWithNoJobsForAllQueues(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/metrics" {
w.WriteHeader(http.StatusOK)
io.WriteString(w, `{"jobs":{"scheduled":0,"running":0,"all":0,"queues":{}},"agents":{"idle":0,"busy":0,"all":0,"queues":{}}}`)
_, _ = io.WriteString(w, `{
"organization": {
"slug": "test"
},
"jobs": {
"scheduled": 0,
"running": 0,
"total": 0,
"queues": {}
},
"agents": {
"idle": 0,
"busy": 0,
"total": 0,
"queues": {}
}
}`)
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
c := &Collector{
Endpoint: s.URL,
Token: "abc123",
Endpoint: s.URL,
Token: "abc123",
UserAgent: "some-client/1.2.3",
}
res, err := c.Collect()
Expand Down Expand Up @@ -102,14 +124,47 @@ func TestCollectorWithSomeJobsAndAgentsForAllQueues(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/metrics" {
w.WriteHeader(http.StatusOK)
io.WriteString(w, `{"jobs":{"scheduled":3,"running":1,"total":4,"queues":{"default":{"scheduled":2,"running":1,"total":3},"deploy":{"scheduled":1,"running":0,"total":1}}},"agents":{"idle":0,"busy":1,"total":1,"queues":{"default":{"idle":0,"busy":1,"total":1}}}}`)
_, _ = io.WriteString(w, `{
"organization": {
"slug": "test"
},
"jobs": {
"scheduled": 3,
"running": 1,
"total": 4,
"queues": {
"default": {
"scheduled": 2,
"running": 1,
"total": 3
},
"deploy": {
"scheduled": 1,
"running": 0,
"total": 1
}
}
},
"agents": {
"idle": 0,
"busy": 1,
"total": 1,
"queues": {
"default": {
"idle": 0,
"busy": 1,
"total": 1
}
}
}
}`)
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
c := &Collector{
Endpoint: s.URL,
Token: "abc123",
Endpoint: s.URL,
Token: "abc123",
UserAgent: "some-client/1.2.3",
}
res, err := c.Collect()
Expand Down Expand Up @@ -166,16 +221,30 @@ func TestCollectorWithSomeJobsAndAgentsForAQueue(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/metrics/queue" && r.URL.Query().Get("name") == "deploy" {
w.WriteHeader(http.StatusOK)
io.WriteString(w, `{"jobs":{"scheduled":3,"running":1,"total":4},"agents":{"idle":0,"busy":1,"total":1}}`)
_, _ = io.WriteString(w, `{
"organization": {
"slug": "test"
},
"jobs": {
"scheduled": 3,
"running": 1,
"total": 4
},
"agents": {
"idle": 0,
"busy": 1,
"total": 1
}
}`)
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
c := &Collector{
Endpoint: s.URL,
Token: "abc123",
Endpoint: s.URL,
Token: "abc123",
UserAgent: "some-client/1.2.3",
Queue: "deploy",
Queue: "deploy",
}
res, err := c.Collect()
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions lambda/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (
"github.com/buildkite/buildkite-agent-metrics/version"
)

var (
// nextPollTime is the earliest time at which Handler will poll again.
// Handler returns early (logging that it is skipping) while this time is
// still in the future, and sets it to time.Now().Add(res.PollDuration) at
// the end of a run. While it holds the zero time.Time, no skip happens.
// NOTE(review): kept as package-level state, presumably so the value
// survives between invocations of a reused Lambda process — confirm.
nextPollTime time.Time
)

func main() {
if os.Getenv(`DEBUG`) != "" {
_, err := Handler(context.Background(), json.RawMessage([]byte{}))
Expand All @@ -42,6 +46,12 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) {

t := time.Now()

if !nextPollTime.IsZero() && nextPollTime.After(t) {
log.Printf("Skipping polling, next poll time is in %v",
nextPollTime.Sub(t))
return "", nil
}

if ssmTokenKey != "" {
token = backend.RetrieveFromParameterStore(ssmTokenKey)
}
Expand Down Expand Up @@ -88,5 +98,9 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) {
}

log.Printf("Finished in %s", time.Now().Sub(t))

// Store the next acceptable poll time in global state
nextPollTime = time.Now().Add(res.PollDuration)

return "", nil
}
27 changes: 20 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,33 +92,46 @@ func main() {
DebugHttp: *debugHttp,
}

f := func() error {
f := func() (time.Duration, error) {
t := time.Now()

result, err := c.Collect()
if err != nil {
return err
return time.Duration(0), err
}

if !*dryRun {
err = bk.Collect(result)
if err != nil {
return err
return time.Duration(0), err
}
}

log.Printf("Finished in %s", time.Now().Sub(t))
return nil
return result.PollDuration, nil
}

if err := f(); err != nil {
minPollDuration, err := f()
if err != nil {
fmt.Println(err)
os.Exit(1)
}

if *interval > 0 {
for _ = range time.NewTicker(*interval).C {
if err := f(); err != nil {
for {
waitTime := *interval

// Respect the min poll duration returned by the API
if *interval < minPollDuration {
log.Printf("Increasing poll duration based on rate-limit headers")
waitTime = minPollDuration
}

log.Printf("Waiting for %v (minimum of %v)", waitTime, minPollDuration)
time.Sleep(waitTime)

minPollDuration, err = f()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
Expand Down