From 6612a1f43266acc777baf02e0e7cc7d52fe35007 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Wed, 17 Jul 2024 10:37:35 +1000 Subject: [PATCH 01/16] Use non-deprecated NewSession --- lambda/main.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lambda/main.go b/lambda/main.go index 900767a..373c633 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -170,7 +170,10 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { } // establish an AWS session to be re-used - sess := session.New() + sess, err := session.NewSession() + if err != nil { + return "", err + } // get last scale in and out from asg's activities // This is wrapped in a mutex to avoid multiple outbound requests if the From 8f9f14b344068d823463665952457c4e73b0abc6 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Wed, 17 Jul 2024 10:38:41 +1000 Subject: [PATCH 02/16] Log friendly scale-in/out zero times --- lambda/main.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lambda/main.go b/lambda/main.go index 373c633..f220fc9 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -207,15 +207,19 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { return } + lastScaleInStr := "never" if scaleInOutput != nil { lastScaleIn = *scaleInOutput.StartTime + lastScaleInStr = lastScaleIn.Format(time.RFC3339Nano) } + lastScaleOutStr := "never" if scaleOutOutput != nil { lastScaleOut = *scaleOutOutput.StartTime + lastScaleOutStr = lastScaleOut.Format(time.RFC3339Nano) } scalingTimeDiff := time.Since(scalingLastActivityStartTime) - log.Printf("Succesfully retrieved last scaling activity events. Last scale out %v, last scale in %v. Discovery took %s.", lastScaleOut, lastScaleIn, scalingTimeDiff) + log.Printf("Succesfully retrieved last scaling activity events. Last scale out %s, last scale in %s. 
Discovery took %s.", lastScaleOutStr, lastScaleInStr, scalingTimeDiff) }() token := os.Getenv("BUILDKITE_AGENT_TOKEN") From af8ca66d80b97115aa09d7a6b4607c74a8e7087b Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Wed, 17 Jul 2024 10:40:03 +1000 Subject: [PATCH 03/16] Tiny error handling reordering --- lambda/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lambda/main.go b/lambda/main.go index f220fc9..0ed0e75 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -226,11 +226,11 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { ssmTokenKey := os.Getenv("BUILDKITE_AGENT_TOKEN_SSM_KEY") if ssmTokenKey != "" { - var err error - token, err = scaler.RetrieveFromParameterStore(sess, ssmTokenKey) + tk, err := scaler.RetrieveFromParameterStore(sess, ssmTokenKey) if err != nil { return "", err } + token = tk } if token == "" { From 6eae195f9b527e660b341814af88218f7107c67e Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Wed, 17 Jul 2024 10:40:50 +1000 Subject: [PATCH 04/16] Run scaler before checking for timeout Rearrange the loop so that scaler.Run happens first. This also lets us check for timeout while sleeping. 
--- lambda/main.go | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/lambda/main.go b/lambda/main.go index 0ed0e75..d52ff77 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -266,28 +266,27 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { } for { - select { - case <-timeout: - return "", nil - - default: - minPollDuration, err := scaler.Run() - if err != nil { - log.Printf("Scaling error: %v", err) - } + minPollDuration, err := scaler.Run() + if err != nil { + log.Printf("Scaling error: %v", err) + } - if interval < minPollDuration { - interval = minPollDuration - log.Printf("Increasing poll interval to %v based on rate limit", - interval) - } + if interval < minPollDuration { + interval = minPollDuration + log.Printf("Increasing poll interval to %v based on rate limit", + interval) + } - // Persist the times back into the global state - lastScaleIn = scaler.LastScaleIn() - lastScaleOut = scaler.LastScaleOut() + // Persist the times back into the global state + lastScaleIn = scaler.LastScaleIn() + lastScaleOut = scaler.LastScaleOut() - log.Printf("Waiting for %v\n", interval) - time.Sleep(interval) + log.Printf("Waiting for %v\n", interval) + select { + case <-timeout: + return "", nil + case <-time.After(interval): + // Continue } } } From 2bd8330910d0bd0b941c4ddde679ec7ef89e2856 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Wed, 17 Jul 2024 10:42:34 +1000 Subject: [PATCH 05/16] Incredibly minor code flattening --- scaler/scaler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scaler/scaler.go b/scaler/scaler.go index fb63171..476fa81 100644 --- a/scaler/scaler.go +++ b/scaler/scaler.go @@ -115,7 +115,8 @@ func (s *Scaler) Run() (time.Duration, error) { if desired > asg.MaxSize { log.Printf("⚠️ Desired count exceed MaxSize, capping at %d", asg.MaxSize) desired = asg.MaxSize - } else if desired < asg.MinSize { + } + if desired < asg.MinSize { log.Printf("⚠️ 
Desired count is less than MinSize, capping at %d", asg.MinSize) desired = asg.MinSize } From 0daf1f042f84a36a363c465ef9d5d098f0af3209 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Tue, 16 Jul 2024 16:31:51 +1000 Subject: [PATCH 06/16] Reorder env var parsing error handling --- lambda/main.go | 46 ++++++++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/lambda/main.go b/lambda/main.go index d52ff77..9e788e5 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -81,69 +81,75 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { ) if v := os.Getenv("LAMBDA_INTERVAL"); v != "" { - if interval, err = time.ParseDuration(v); err != nil { + d, err := time.ParseDuration(v) + if err != nil { return "", err } + interval = d } if v := os.Getenv("LAMBDA_TIMEOUT"); v != "" { - if timeoutDuration, err := time.ParseDuration(v); err != nil { + d, err := time.ParseDuration(v) + if err != nil { return "", err - } else { - timeout = time.After(timeoutDuration) } + timeout = time.After(d) } if v := os.Getenv("ASG_ACTIVITY_TIMEOUT"); v != "" { - if timeoutDuration, err := time.ParseDuration(v); err != nil { + d, err := time.ParseDuration(v) + if err != nil { return "", err - } else { - asgActivityTimeoutDuration = timeoutDuration } + asgActivityTimeoutDuration = d } if v := os.Getenv("SCALE_IN_COOLDOWN_PERIOD"); v != "" { - if scaleInCooldownPeriod, err = time.ParseDuration(v); err != nil { + p, err := time.ParseDuration(v) + if err != nil { return "", err } + scaleInCooldownPeriod = p } if v := os.Getenv("SCALE_IN_FACTOR"); v != "" { - if scaleInFactor, err = strconv.ParseFloat(v, 64); err != nil { + f, err := strconv.ParseFloat(v, 64) + if err != nil { return "", err } - scaleInFactor = math.Abs(scaleInFactor) + scaleInFactor = math.Abs(f) } if v := os.Getenv("SCALE_ONLY_AFTER_ALL_EVENT"); v != "" { - if v == "true" || v == "1" { - scaleOnlyAfterAllEvent = true - } + scaleOnlyAfterAllEvent = v == "true" || v 
== "1" } if v := os.Getenv("SCALE_OUT_COOLDOWN_PERIOD"); v != "" { - if scaleOutCooldownPeriod, err = time.ParseDuration(v); err != nil { + p, err := time.ParseDuration(v) + if err != nil { return "", err } + scaleOutCooldownPeriod = p } if v := os.Getenv("SCALE_OUT_FACTOR"); v != "" { - if scaleOutFactor, err = strconv.ParseFloat(v, 64); err != nil { + f, err := strconv.ParseFloat(v, 64) + if err != nil { return "", err } - scaleOutFactor = math.Abs(scaleOutFactor) + scaleOutFactor = math.Abs(f) } if v := os.Getenv("INCLUDE_WAITING"); v != "" { - if v == "true" || v == "1" { - includeWaiting = true - } + includeWaiting = v == "true" || v == "1" } if v := os.Getenv("INSTANCE_BUFFER"); v != "" { - if instanceBuffer, err = strconv.Atoi(v); err != nil { + i, err := strconv.Atoi(v) + if err != nil { return "", err } + instanceBuffer = i } maxDescribeScalingActivitiesPages := -1 From 5e4acc0185397c9a5ec7ebea54b4c8b3f865e081 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Tue, 16 Jul 2024 17:16:38 +1000 Subject: [PATCH 07/16] Front-load checking required env vars --- lambda/main.go | 45 ++++++++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/lambda/main.go b/lambda/main.go index 9e788e5..be278d7 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -26,23 +26,6 @@ var ( lastScaleIn, lastScaleOut time.Time ) -func mustGetEnv(env string) string { - val := os.Getenv(env) - if val == "" { - log.Fatalf("Env %q not set", env) - } - return val -} - -func mustGetEnvInt(env string) int { - v := mustGetEnv(env) - vi, err := strconv.Atoi(v) - if err != nil { - log.Fatalf("Env %q is not an int: %v", env, v) - } - return vi -} - func main() { if os.Getenv("DEBUG") != "" { _, err := Handler(context.Background(), json.RawMessage([]byte{})) @@ -80,6 +63,26 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { disableScaleOut, disableScaleIn bool ) + // Required environment variables + buildkiteQueue := 
os.Getenv("BUILDKITE_QUEUE") + if buildkiteQueue == "" { + return "", errors.New("BUILDKITE_QUEUE is required") + } + + asgName := os.Getenv("ASG_NAME") + if asgName == "" { + return "", errors.New("ASG_NAME is required") + } + + agentsPerInstanceStr := os.Getenv("AGENTS_PER_INSTANCE") + if agentsPerInstanceStr == "" { + return "", errors.New("AGENTS_PER_INSTANCE is required") + } + agentsPerInstance, err := strconv.Atoi(agentsPerInstanceStr) + if err != nil { + return "", fmt.Errorf("AGENTS_PER_INSTANCE must be an integer: %w", err) + } + if v := os.Getenv("LAMBDA_INTERVAL"); v != "" { d, err := time.ParseDuration(v) if err != nil { @@ -194,7 +197,7 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { } asg := &scaler.ASGDriver{ - Name: mustGetEnv("ASG_NAME"), + Name: asgName, Sess: sess, MaxDescribeScalingActivitiesPages: maxDescribeScalingActivitiesPages, } @@ -245,9 +248,9 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { client := buildkite.NewClient(token) params := scaler.Params{ - BuildkiteQueue: mustGetEnv("BUILDKITE_QUEUE"), - AutoScalingGroupName: mustGetEnv("ASG_NAME"), - AgentsPerInstance: mustGetEnvInt("AGENTS_PER_INSTANCE"), + BuildkiteQueue: buildkiteQueue, + AutoScalingGroupName: asgName, + AgentsPerInstance: agentsPerInstance, IncludeWaiting: includeWaiting, ScaleInParams: scaler.ScaleParams{ CooldownPeriod: scaleInCooldownPeriod, From 6c6d1aa371acbc9936442429c461e87963f482be Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Wed, 17 Jul 2024 10:46:17 +1000 Subject: [PATCH 08/16] Nil time fields paranoia checks --- lambda/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lambda/main.go b/lambda/main.go index be278d7..b25ce83 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -217,12 +217,12 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { } lastScaleInStr := "never" - if scaleInOutput != nil { + if scaleInOutput != nil && scaleInOutput.StartTime 
!= nil { lastScaleIn = *scaleInOutput.StartTime lastScaleInStr = lastScaleIn.Format(time.RFC3339Nano) } lastScaleOutStr := "never" - if scaleOutOutput != nil { + if scaleOutOutput != nil && scaleOutOutput.StartTime != nil { lastScaleOut = *scaleOutOutput.StartTime lastScaleOutStr = lastScaleOut.Format(time.RFC3339Nano) } From 68bb3519c0ded80a4b4369d5e5e3c667ff642fd5 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Wed, 17 Jul 2024 10:47:29 +1000 Subject: [PATCH 09/16] Use boolean for tracking if fetch happened --- lambda/main.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lambda/main.go b/lambda/main.go index b25ce83..a75f7cb 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -23,6 +23,7 @@ import ( // On a cold start this will be reset to a zero value var ( lastScaleMu sync.Mutex + lastScaleTimesFetched bool lastScaleIn, lastScaleOut time.Time ) @@ -191,7 +192,7 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { lastScaleMu.Lock() defer lastScaleMu.Unlock() - if (disableScaleIn || !lastScaleIn.IsZero()) && (disableScaleOut || !lastScaleOut.IsZero()) { + if lastScaleTimesFetched { // We've already fetched the last scaling times that we need. return } @@ -227,6 +228,8 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { lastScaleOutStr = lastScaleOut.Format(time.RFC3339Nano) } + lastScaleTimesFetched = true + scalingTimeDiff := time.Since(scalingLastActivityStartTime) log.Printf("Succesfully retrieved last scaling activity events. Last scale out %s, last scale in %s. 
Discovery took %s.", lastScaleOutStr, lastScaleInStr, scalingTimeDiff) }() From 434ad806d486e4e05e73e8f9b455bb1e95e4fb5e Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Wed, 17 Jul 2024 10:48:26 +1000 Subject: [PATCH 10/16] Tweak 'Waiting for' log log.Printf includes newline Also we're now waiting for timeout or sleep --- lambda/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambda/main.go b/lambda/main.go index a75f7cb..9b701a8 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -293,7 +293,7 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { lastScaleIn = scaler.LastScaleIn() lastScaleOut = scaler.LastScaleOut() - log.Printf("Waiting for %v\n", interval) + log.Printf("Waiting for %v or timeout", interval) select { case <-timeout: return "", nil From 3a2f4abba32ad2e1480f6d27b6b1242e2007a3b0 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Wed, 17 Jul 2024 10:49:59 +1000 Subject: [PATCH 11/16] Flatten main flow in NewScaler --- scaler/scaler.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/scaler/scaler.go b/scaler/scaler.go index 476fa81..6f0ff9d 100644 --- a/scaler/scaler.go +++ b/scaler/scaler.go @@ -68,20 +68,20 @@ func NewScaler(client *buildkite.Client, sess *session.Session, params Params) ( if params.DryRun { scaler.autoscaling = &dryRunASG{} - if params.PublishCloudWatchMetrics { scaler.metrics = &dryRunMetricsPublisher{} } - } else { - scaler.autoscaling = &ASGDriver{ - Name: params.AutoScalingGroupName, - Sess: sess, - } + return scaler, nil + } - if params.PublishCloudWatchMetrics { - scaler.metrics = &cloudWatchMetricsPublisher{ - sess: sess, - } + scaler.autoscaling = &ASGDriver{ + Name: params.AutoScalingGroupName, + Sess: sess, + } + + if params.PublishCloudWatchMetrics { + scaler.metrics = &cloudWatchMetricsPublisher{ + sess: sess, } } From 4be5b0dcfb6d9558100bfd328ef2fe855d98ddba Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Wed, 17 Jul 2024 10:10:29 
+1000 Subject: [PATCH 12/16] Simplify boolean == true, use switches --- scaler/scaler.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/scaler/scaler.go b/scaler/scaler.go index 6f0ff9d..5da1cfe 100644 --- a/scaler/scaler.go +++ b/scaler/scaler.go @@ -143,7 +143,7 @@ func (s *Scaler) scaleIn(desired int64, current AutoscaleGroupDetails) error { lastScaleInEvent := s.scaleInParams.LastEvent lastScaleOutEvent := s.scaleOutParams.LastEvent lastEvent := lastScaleInEvent - if s.scaleOnlyAfterAllEvent == true && lastScaleInEvent.Before(lastScaleOutEvent) { + if s.scaleOnlyAfterAllEvent && lastScaleInEvent.Before(lastScaleOutEvent) { lastEvent = lastScaleOutEvent } cooldownRemaining := s.scaleInParams.CooldownPeriod - time.Since(lastEvent) @@ -162,13 +162,16 @@ func (s *Scaler) scaleIn(desired int64, current AutoscaleGroupDetails) error { // Use Floor to avoid never reaching upper bound factoredChange := int64(math.Floor(float64(change) * factor)) - if factoredChange < change { + switch { + case factoredChange < change: log.Printf("👮‍️ Increasing scale-in of %d by factor of %0.2f", change, factor) - } else if factoredChange > change { + + case factoredChange > change: log.Printf("👮‍️ Decreasing scale-in of %d by factor of %0.2f", change, factor) - } else { + + default: log.Printf("👮‍️ Scale-in factor of %0.2f was ignored", factor) } @@ -206,7 +209,7 @@ func (s *Scaler) scaleOut(desired int64, current AutoscaleGroupDetails) error { lastScaleInEvent := s.scaleInParams.LastEvent lastScaleOutEvent := s.scaleOutParams.LastEvent lastEvent := lastScaleOutEvent - if s.scaleOnlyAfterAllEvent == true && lastScaleOutEvent.Before(lastScaleInEvent) { + if s.scaleOnlyAfterAllEvent && lastScaleOutEvent.Before(lastScaleInEvent) { lastEvent = lastScaleInEvent } cooldownRemaining := s.scaleOutParams.CooldownPeriod - time.Since(lastEvent) @@ -225,13 +228,16 @@ func (s *Scaler) scaleOut(desired int64, current AutoscaleGroupDetails) error { // Use 
Ceil to avoid never reaching upper bound factoredChange := int64(math.Ceil(float64(change) * s.scaleOutParams.Factor)) - if factoredChange > change { + switch { + case factoredChange > change: log.Printf("👮‍️ Increasing scale-out of %d by factor of %0.2f", change, s.scaleOutParams.Factor) - } else if factoredChange < change { + + case factoredChange < change: log.Printf("👮‍️ Decreasing scale-out of %d by factor of %0.2f", change, s.scaleOutParams.Factor) - } else { + + default: log.Printf("👮‍️ Scale-out factor of %0.2f was ignored", s.scaleOutParams.Factor) } From 362507e8be317f4a7e10e79a6e3cee233113f2f9 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Wed, 17 Jul 2024 11:24:51 +1000 Subject: [PATCH 13/16] Update to Go 1.22 * Build container (Makefile) * Pipeline containers (pipeline.yaml) * go.mod * go get -u && go mod tidy --- .buildkite/pipeline.yaml | 128 +++++++++++++++++++-------------------- Makefile | 3 +- go.mod | 4 +- go.sum | 7 ++- 4 files changed, 73 insertions(+), 69 deletions(-) diff --git a/.buildkite/pipeline.yaml b/.buildkite/pipeline.yaml index 35a0af3..2ad7431 100644 --- a/.buildkite/pipeline.yaml +++ b/.buildkite/pipeline.yaml @@ -2,73 +2,73 @@ agents: queue: elastic-runners steps: -- name: ":go: Check go fmt" - key: go-fmt - command: .buildkite/steps/test-go-fmt.sh - plugins: - - docker#v5.8.0: - image: golang:1.19 - workdir: /go/src/github.com/buildkite/buildkite-agent-scaler + - name: ":go: Check go fmt" + key: go-fmt + command: .buildkite/steps/test-go-fmt.sh + plugins: + - docker#v5.8.0: + image: golang:1.22 + workdir: /go/src/github.com/buildkite/buildkite-agent-scaler -- name: ":golang: Run Tests" - key: test - command: .buildkite/steps/tests.sh - plugins: - - docker#v5.8.0: - image: golang:1.19 - workdir: /go/src/github.com/buildkite/buildkite-agent-scaler + - name: ":golang: Run Tests" + key: test + command: .buildkite/steps/tests.sh + plugins: + - docker#v5.8.0: + image: golang:1.22 + workdir: 
/go/src/github.com/buildkite/buildkite-agent-scaler -- name: ":lambda: Build Lambda" - key: build-lambda - depends_on: - - go-fmt - command: .buildkite/steps/build-lambda.sh - artifact_paths: - - handler.zip + - name: ":lambda: Build Lambda" + key: build-lambda + depends_on: + - go-fmt + command: .buildkite/steps/build-lambda.sh + artifact_paths: + - handler.zip -- label: ":s3: Pubish to S3 Branch Location" - key: s3-branch - depends_on: - - test - - build-lambda - command: .buildkite/steps/upload-to-s3.sh - plugins: - - aws-assume-role-with-web-identity: - role-arn: arn:aws:iam::032379705303:role/pipeline-buildkite-buildkite-agent-scaler + - label: ":s3: Pubish to S3 Branch Location" + key: s3-branch + depends_on: + - test + - build-lambda + command: .buildkite/steps/upload-to-s3.sh + plugins: + - aws-assume-role-with-web-identity: + role-arn: arn:aws:iam::032379705303:role/pipeline-buildkite-buildkite-agent-scaler -- if: build.tag =~ /^.+\$/ || build.env("RELEASE_DRY_RUN") == "true" - label: ":github: Draft GitHub Release" - key: github-release - depends_on: - - build-lambda - command: .buildkite/steps/github-release.sh - env: - BUILDKITE_AGENT_GIT_FETCH_FLAGS: -v --prune --tags - plugins: - - aws-assume-role-with-web-identity: - role-arn: arn:aws:iam::445615400570:role/pipeline-buildkite-buildkite-agent-scaler - - aws-ssm#v1.0.0: - parameters: - GITHUB_RELEASE_ACCESS_TOKEN: /pipelines/buildkite/buildkite-agent-scaler/GITHUB_RELEASE_ACCESS_TOKEN - - docker#v5.8.0: - image: alpine:3.18 - propagate-environment: true - mount-buildkite-agent: true - environment: - - GITHUB_RELEASE_ACCESS_TOKEN - - BUILDKITE_AGENT_ACCESS_TOKEN + - if: build.tag =~ /^.+\$/ || build.env("RELEASE_DRY_RUN") == "true" + label: ":github: Draft GitHub Release" + key: github-release + depends_on: + - build-lambda + command: .buildkite/steps/github-release.sh + env: + BUILDKITE_AGENT_GIT_FETCH_FLAGS: -v --prune --tags + plugins: + - aws-assume-role-with-web-identity: + role-arn: 
arn:aws:iam::445615400570:role/pipeline-buildkite-buildkite-agent-scaler + - aws-ssm#v1.0.0: + parameters: + GITHUB_RELEASE_ACCESS_TOKEN: /pipelines/buildkite/buildkite-agent-scaler/GITHUB_RELEASE_ACCESS_TOKEN + - docker#v5.8.0: + image: alpine:3.18 + propagate-environment: true + mount-buildkite-agent: true + environment: + - GITHUB_RELEASE_ACCESS_TOKEN + - BUILDKITE_AGENT_ACCESS_TOKEN -- if: build.tag =~ /^.+\$/ || build.env("RELEASE_DRY_RUN") == "true" - key: block-s3-release - block: ":rocket: Draft :github: Release Approved?" + - if: build.tag =~ /^.+\$/ || build.env("RELEASE_DRY_RUN") == "true" + key: block-s3-release + block: ":rocket: Draft :github: Release Approved?" -- if: build.tag =~ /^.+\$/ || build.env("RELEASE_DRY_RUN") == "true" - label: ":s3: Publish to S3 Release Location" - key: s3-release - depends_on: - - github-release - - block-s3-release - command: .buildkite/steps/upload-to-s3.sh release - plugins: - - aws-assume-role-with-web-identity: - role-arn: arn:aws:iam::032379705303:role/pipeline-buildkite-buildkite-agent-scaler + - if: build.tag =~ /^.+\$/ || build.env("RELEASE_DRY_RUN") == "true" + label: ":s3: Publish to S3 Release Location" + key: s3-release + depends_on: + - github-release + - block-s3-release + command: .buildkite/steps/upload-to-s3.sh release + plugins: + - aws-assume-role-with-web-identity: + role-arn: arn:aws:iam::032379705303:role/pipeline-buildkite-buildkite-agent-scaler diff --git a/Makefile b/Makefile index c9b69e1..a9896c6 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,8 @@ bootstrap: lambda/main.go --user $(USER) \ --volume $(PWD):/app \ --workdir /app \ - --rm golang:1.19 \ + --rm \ + golang:1.22 \ go build -ldflags="$(LD_FLAGS)" -buildvcs="$(BUILDVSC_FLAG)" -tags lambda.norpc -o bootstrap ./lambda lambda-sync: handler.zip diff --git a/go.mod b/go.mod index 49528cd..96e49b8 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,10 @@ module github.com/buildkite/buildkite-agent-scaler -go 1.19 +go 1.22 require ( 
github.com/aws/aws-lambda-go v1.47.0 - github.com/aws/aws-sdk-go v1.54.14 + github.com/aws/aws-sdk-go v1.54.19 ) require github.com/jmespath/go-jmespath v0.4.0 // indirect diff --git a/go.sum b/go.sum index 23a03f6..5ba2149 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,10 @@ github.com/aws/aws-lambda-go v1.47.0 h1:0H8s0vumYx/YKs4sE7YM0ktwL2eWse+kfopsRI1sXVI= github.com/aws/aws-lambda-go v1.47.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A= -github.com/aws/aws-sdk-go v1.54.14 h1:llJ60MzLzovyDE/rEDbUjS1cICh7krk1PwQwNlKRoeQ= -github.com/aws/aws-sdk-go v1.54.14/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= +github.com/aws/aws-sdk-go v1.54.19 h1:tyWV+07jagrNiCcGRzRhdtVjQs7Vy41NwsuOcl0IbVI= +github.com/aws/aws-sdk-go v1.54.19/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= @@ -12,7 +13,9 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s= +github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 
v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 3b43b8d7c7748d124bb8f8571f7122691dfcb365 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Wed, 17 Jul 2024 13:46:19 +1000 Subject: [PATCH 14/16] Fix gotestsum install --- .buildkite/steps/tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.buildkite/steps/tests.sh b/.buildkite/steps/tests.sh index 1d7224c..6b85ef1 100755 --- a/.buildkite/steps/tests.sh +++ b/.buildkite/steps/tests.sh @@ -1,7 +1,7 @@ #!/bin/bash set -euo pipefail -GO111MODULE=off go get gotest.tools/gotestsum +go install gotest.tools/gotestsum@v1.12.0 echo '+++ Running tests' gotestsum --junitfile "junit-${OSTYPE}.xml" -- -count=1 -failfast ./... From 5a27c3ac42817f7c323615041232810ab37a3f34 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Wed, 17 Jul 2024 15:40:31 +1000 Subject: [PATCH 15/16] Fix #73 --- lambda/main.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/lambda/main.go b/lambda/main.go index 9b701a8..6e18222 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -42,8 +42,9 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { log.Printf("buildkite-agent-scaler version %s", version.VersionString()) var ( - timeout <-chan time.Time = make(chan time.Time) - interval time.Duration = 10 * time.Second + timeoutDuration time.Duration + timeout <-chan time.Time + interval = 10 * time.Second asgActivityTimeoutDuration = 10 * time.Second @@ -97,6 +98,7 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { if err != nil { return "", err } + timeoutDuration = d timeout = time.After(d) } @@ -293,10 +295,16 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { lastScaleIn = scaler.LastScaleIn() lastScaleOut = scaler.LastScaleOut() - log.Printf("Waiting for %v or 
timeout", interval) + logMsg := "Waiting for LAMBDA_INTERVAL (%v)" + if timeout != nil { + logMsg += " or timeout" + } + log.Printf(logMsg, interval) select { case <-timeout: + log.Printf("Exiting due to LAMBDA_TIMEOUT (%v)", timeoutDuration) return "", nil + case <-time.After(interval): // Continue } From 4f64f3c9acbacaadac33a2faeb964c1c6f23e29c Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Thu, 18 Jul 2024 16:47:41 +1000 Subject: [PATCH 16/16] Refactor environment variable loading --- lambda/env.go | 89 +++++++++++++++++++++++++++ lambda/main.go | 159 +++++++++---------------------------------------- 2 files changed, 117 insertions(+), 131 deletions(-) create mode 100644 lambda/env.go diff --git a/lambda/env.go b/lambda/env.go new file mode 100644 index 0000000..3f7ab3b --- /dev/null +++ b/lambda/env.go @@ -0,0 +1,89 @@ +package main + +import ( + "log" + "os" + "strconv" + "time" +) + +// RequireEnvString reads an environment variable, and if it is not set, calls +// [log.Fatalf]. +func RequireEnvString(name string) string { + v := os.Getenv(name) + if v == "" { + log.Fatalf("%s is required", name) + } + return v +} + +// RequireEnvInt reads an environment variable and parses it as a decimal int. +// If it is not set or does not parse, it calls [log.Fatalf]. +func RequireEnvInt(name string) int { + v := RequireEnvString(name) + i, err := strconv.Atoi(v) + if err != nil { + log.Fatalf("%s must be an integer: %v", name, err) + } + return i +} + +// EnvInt reads an environment variable, and if it is set, parses it with +// [strconv.Atoi]. If it is not set, it returns def. If it does not parse, it calls +// [log.Fatalf]. +func EnvInt(name string, def int) int { + v := os.Getenv(name) + if v == "" { + return def + } + i, err := strconv.Atoi(v) + if err != nil { + log.Fatalf("%s must be an integer: %v", name, err) + } + return i +} + +// EnvDuration reads an environment variable, and if it is set, parses it as a +// [time.Duration]. If it is not set, it returns def. 
If parsing fails, it calls +// [log.Fatalf]. +func EnvDuration(name string, def time.Duration) time.Duration { + v := os.Getenv(name) + if v == "" { + return def + } + d, err := time.ParseDuration(v) + if err != nil { + log.Fatalf("%s must be a duration: %v", name, err) + } + return d +} + +// EnvFloat reads an environment variable, and if it is set, parses it using +// [strconv.ParseFloat]. If it is not set, it returns 0. If parsing fails, it +// calls [log.Fatalf]. +func EnvFloat(name string) float64 { + v := os.Getenv(name) + if v == "" { + return 0 + } + f, err := strconv.ParseFloat(v, 64) + if err != nil { + log.Fatalf("%s must be a number: %v", name, err) + } + return f +} + +// EnvBool reads an environment variable, and if it is set, parses it using +// [strconv.ParseBool]. If it is not set, it returns false. If parsing fails, it +// calls [log.Fatalf]. +func EnvBool(name string) bool { + v := os.Getenv(name) + if v == "" { + return false + } + b, err := strconv.ParseBool(v) + if err != nil { + log.Fatalf("%s must be a boolean: %v", name, err) + } + return b +} diff --git a/lambda/main.go b/lambda/main.go index 6e18222..e6bf1b8 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -4,11 +4,9 @@ import ( "context" "encoding/json" "errors" - "fmt" "log" "math" "os" - "strconv" "sync" "time" @@ -28,7 +26,7 @@ var ( ) func main() { - if os.Getenv("DEBUG") != "" { + if EnvBool("DEBUG") { _, err := Handler(context.Background(), json.RawMessage([]byte{})) if err != nil { log.Fatal(err) @@ -41,144 +39,43 @@ func main() { func Handler(ctx context.Context, evt json.RawMessage) (string, error) { log.Printf("buildkite-agent-scaler version %s", version.VersionString()) - var ( - timeoutDuration time.Duration - timeout <-chan time.Time - interval = 10 * time.Second - - asgActivityTimeoutDuration = 10 * time.Second - - scaleInCooldownPeriod time.Duration - scaleInFactor float64 - - scaleOutCooldownPeriod time.Duration - scaleOutFactor float64 - - instanceBuffer = 0 - - 
scaleOnlyAfterAllEvent bool - - includeWaiting bool - err error - - publishCloudWatchMetrics bool - disableScaleOut, disableScaleIn bool - ) - // Required environment variables - buildkiteQueue := os.Getenv("BUILDKITE_QUEUE") - if buildkiteQueue == "" { - return "", errors.New("BUILDKITE_QUEUE is required") - } - - asgName := os.Getenv("ASG_NAME") - if asgName == "" { - return "", errors.New("ASG_NAME is required") - } - - agentsPerInstanceStr := os.Getenv("AGENTS_PER_INSTANCE") - if agentsPerInstanceStr == "" { - return "", errors.New("AGENTS_PER_INSTANCE is required") - } - agentsPerInstance, err := strconv.Atoi(agentsPerInstanceStr) - if err != nil { - return "", fmt.Errorf("AGENTS_PER_INSTANCE must be an integer: %w", err) - } + buildkiteQueue := RequireEnvString("BUILDKITE_QUEUE") + asgName := RequireEnvString("ASG_NAME") + agentsPerInstance := RequireEnvInt("AGENTS_PER_INSTANCE") - if v := os.Getenv("LAMBDA_INTERVAL"); v != "" { - d, err := time.ParseDuration(v) - if err != nil { - return "", err - } - interval = d - } + // Optional environment variables (but they must parse correctly if set). 
+ interval := EnvDuration("LAMBDA_INTERVAL", 10*time.Second) - if v := os.Getenv("LAMBDA_TIMEOUT"); v != "" { - d, err := time.ParseDuration(v) - if err != nil { - return "", err - } - timeoutDuration = d - timeout = time.After(d) + timeoutDuration := EnvDuration("LAMBDA_TIMEOUT", -1) + var timeout <-chan time.Time + if timeoutDuration >= 0 { + timeout = time.After(timeoutDuration) } - if v := os.Getenv("ASG_ACTIVITY_TIMEOUT"); v != "" { - d, err := time.ParseDuration(v) - if err != nil { - return "", err - } - asgActivityTimeoutDuration = d - } - - if v := os.Getenv("SCALE_IN_COOLDOWN_PERIOD"); v != "" { - p, err := time.ParseDuration(v) - if err != nil { - return "", err - } - scaleInCooldownPeriod = p - } - - if v := os.Getenv("SCALE_IN_FACTOR"); v != "" { - f, err := strconv.ParseFloat(v, 64) - if err != nil { - return "", err - } - scaleInFactor = math.Abs(f) - } - - if v := os.Getenv("SCALE_ONLY_AFTER_ALL_EVENT"); v != "" { - scaleOnlyAfterAllEvent = v == "true" || v == "1" - } - - if v := os.Getenv("SCALE_OUT_COOLDOWN_PERIOD"); v != "" { - p, err := time.ParseDuration(v) - if err != nil { - return "", err - } - scaleOutCooldownPeriod = p - } - - if v := os.Getenv("SCALE_OUT_FACTOR"); v != "" { - f, err := strconv.ParseFloat(v, 64) - if err != nil { - return "", err - } - scaleOutFactor = math.Abs(f) - } - - if v := os.Getenv("INCLUDE_WAITING"); v != "" { - includeWaiting = v == "true" || v == "1" - } - - if v := os.Getenv("INSTANCE_BUFFER"); v != "" { - i, err := strconv.Atoi(v) - if err != nil { - return "", err - } - instanceBuffer = i - } - - maxDescribeScalingActivitiesPages := -1 - if v := os.Getenv("MAX_DESCRIBE_SCALING_ACTIVITIES_PAGES"); v != "" { - maxDescribeScalingActivitiesPages, err = strconv.Atoi(v) - if err != nil { - fmt.Fprintf(os.Stderr, "Failed to parse MAX_DESCRIBE_SCALING_ACTIVITIES_PAGES: %v", err) - } - } + asgActivityTimeoutDuration := EnvDuration("ASG_ACTIVITY_TIMEOUT", 10*time.Second) + scaleInCooldownPeriod := 
EnvDuration("SCALE_IN_COOLDOWN_PERIOD", 0) + scaleInFactor := math.Abs(EnvFloat("SCALE_IN_FACTOR")) + scaleOutCooldownPeriod := EnvDuration("SCALE_OUT_COOLDOWN_PERIOD", 0) + scaleOutFactor := math.Abs(EnvFloat("SCALE_OUT_FACTOR")) + scaleOnlyAfterAllEvent := EnvBool("SCALE_ONLY_AFTER_ALL_EVENT") + includeWaiting := EnvBool("INCLUDE_WAITING") + instanceBuffer := EnvInt("INSTANCE_BUFFER", 0) + maxDescribeScalingActivitiesPages := EnvInt("MAX_DESCRIBE_SCALING_ACTIVITIES_PAGES", -1) - if m := os.Getenv("CLOUDWATCH_METRICS"); m == "true" || m == "1" { - log.Printf("Publishing cloudwatch metrics") - publishCloudWatchMetrics = true + publishCloudWatchMetrics := EnvBool("CLOUDWATCH_METRICS") + if publishCloudWatchMetrics { + log.Print("Publishing cloudwatch metrics") } - if m := os.Getenv("DISABLE_SCALE_IN"); m == "true" || m == "1" { - log.Printf("Disabling scale-in 🙅🏼‍") - disableScaleIn = true + disableScaleIn := EnvBool("DISABLE_SCALE_IN") + if disableScaleIn { + log.Print("Disabling scale-in 🙅🏼‍") } - if m := os.Getenv("DISABLE_SCALE_OUT"); m == "true" || m == "1" { - log.Printf("Disabling scale-out 🙅🏼‍♂️") - disableScaleOut = true + disableScaleOut := EnvBool("DISABLE_SCALE_OUT") + if disableScaleOut { + log.Print("Disabling scale-out 🙅🏼‍♂️") } // establish an AWS session to be re-used