From 4ff9629818699c04cb2095ebe0558be9eb390dec Mon Sep 17 00:00:00 2001 From: Amit Kumar Gupta Date: Sun, 27 Jul 2014 04:53:39 -0400 Subject: [PATCH] update app-manager to match AZ balance work in auction --- handler/handler.go | 11 ++++++---- handler/handler_test.go | 22 ++++++++++++++----- .../app_manager_runner/app_manager_runner.go | 5 +++++ integration/integration_test.go | 1 + main.go | 16 ++++++++++++-- .../start_message_builder.go | 6 ++++- .../start_message_builder_test.go | 4 +++- stop_message_builder/stop_message_builder.go | 22 +++++++++++++++++++ 8 files changed, 73 insertions(+), 14 deletions(-) create mode 100644 stop_message_builder/stop_message_builder.go diff --git a/handler/handler.go b/handler/handler.go index 33b1088..d240e4d 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/cloudfoundry-incubator/app-manager/start_message_builder" + "github.com/cloudfoundry-incubator/app-manager/stop_message_builder" "github.com/cloudfoundry-incubator/delta_force/delta_force" Bbs "github.com/cloudfoundry-incubator/runtime-schema/bbs" "github.com/cloudfoundry-incubator/runtime-schema/models" @@ -17,18 +18,21 @@ var ErrNoHealthCheckDefined = errors.New("no health check defined for stack") type Handler struct { bbs Bbs.AppManagerBBS startMessageBuilder *start_message_builder.StartMessageBuilder + stopMessageBuilder *stop_message_builder.StopMessageBuilder logger lager.Logger } func NewHandler( bbs Bbs.AppManagerBBS, startMessageBuilder *start_message_builder.StartMessageBuilder, + stopMessageBuilder *stop_message_builder.StopMessageBuilder, logger lager.Logger, ) Handler { handlerLogger := logger.Session("handler") return Handler{ bbs: bbs, startMessageBuilder: startMessageBuilder, + stopMessageBuilder: stopMessageBuilder, logger: handlerLogger, } } @@ -157,10 +161,9 @@ func (h Handler) processDesiredChange(desiredChange models.DesiredLRPChange) { "desired-app-message": desiredLRP, "stop-duplicate-index": indexToStopAllButOne, }) - err = h.bbs.RequestLRPStopAuction(models.LRPStopAuction{ - ProcessGuid: desiredLRP.ProcessGuid, - Index: indexToStopAllButOne, - }) + + stopMessage := h.stopMessageBuilder.Build(desiredLRP, indexToStopAllButOne) + err = h.bbs.RequestLRPStopAuction(stopMessage) if err != nil { changeLogger.Error("request-stop-auction-failed", err, lager.Data{ diff --git a/handler/handler_test.go b/handler/handler_test.go index d78a541..356b150 100644 --- a/handler/handler_test.go +++ b/handler/handler_test.go @@ -6,6 +6,7 @@ import ( . "github.com/cloudfoundry-incubator/app-manager/handler" "github.com/cloudfoundry-incubator/app-manager/start_message_builder" + "github.com/cloudfoundry-incubator/app-manager/stop_message_builder" "github.com/cloudfoundry-incubator/runtime-schema/bbs/fake_bbs" "github.com/cloudfoundry-incubator/runtime-schema/models" "github.com/cloudfoundry/storeadapter" @@ -20,9 +21,11 @@ import ( var _ = Describe("Handler", func() { var ( startMessageBuilder *start_message_builder.StartMessageBuilder + stopMessageBuilder *stop_message_builder.StopMessageBuilder bbs *fake_bbs.FakeAppManagerBBS logger *lagertest.TestLogger desiredLRP models.DesiredLRP + numAZs int repAddrRelativeToExecutor string healthChecks map[string]string @@ -32,6 +35,8 @@ var _ = Describe("Handler", func() { BeforeEach(func() { bbs = fake_bbs.NewFakeAppManagerBBS() + numAZs = 4 + repAddrRelativeToExecutor = "127.0.0.1:20515" healthChecks = map[string]string{ @@ -40,9 +45,10 @@ var _ = Describe("Handler", func() { logger = lagertest.NewTestLogger("test") - startMessageBuilder = start_message_builder.New(repAddrRelativeToExecutor, healthChecks, logger) + startMessageBuilder = start_message_builder.New(numAZs, repAddrRelativeToExecutor, healthChecks, logger) + stopMessageBuilder = stop_message_builder.New(numAZs) - handlerRunner := NewHandler(bbs, startMessageBuilder, logger) + handlerRunner := NewHandler(bbs, startMessageBuilder, stopMessageBuilder, logger) desiredLRP = models.DesiredLRP{ ProcessGuid: "the-app-guid-the-app-version", @@ -385,13 +391,17 @@ var _ = Describe("Handler", func() { stopAuctions := bbs.GetLRPStopAuctions() Ω(stopAuctions).Should(ContainElement(models.LRPStopAuction{ - ProcessGuid: "the-app-guid-the-app-version", - Index: 1, + ProcessGuid: "the-app-guid-the-app-version", + Index: 1, + NumInstances: desiredLRP.Instances, + NumAZs: numAZs, })) Ω(stopAuctions).Should(ContainElement(models.LRPStopAuction{ - ProcessGuid: "the-app-guid-the-app-version", - Index: 2, + ProcessGuid: "the-app-guid-the-app-version", + Index: 2, + NumInstances: desiredLRP.Instances, + NumAZs: numAZs, })) }) diff --git a/integration/app_manager_runner/app_manager_runner.go b/integration/app_manager_runner/app_manager_runner.go index eea409a..af246cb 100644 --- a/integration/app_manager_runner/app_manager_runner.go +++ b/integration/app_manager_runner/app_manager_runner.go @@ -3,6 +3,7 @@ package app_manager_runner import ( "encoding/json" "os/exec" + "strconv" "strings" "time" @@ -17,6 +18,7 @@ type AppManagerRunner struct { etcdCluster []string circuses map[string]string Session *gexec.Session + numAZs int repAddrRelativeToExecutor string } @@ -25,12 +27,14 @@ func New( appManagerBin string, etcdCluster []string, circuses map[string]string, + numAZs int, repAddrRelativeToExecutor string, ) *AppManagerRunner { return &AppManagerRunner{ appManagerBin: appManagerBin, etcdCluster: etcdCluster, circuses: circuses, + numAZs: numAZs, repAddrRelativeToExecutor: repAddrRelativeToExecutor, } @@ -50,6 +54,7 @@ func (r *AppManagerRunner) StartWithoutCheck() { r.appManagerBin, "-etcdCluster", strings.Join(r.etcdCluster, ","), "-circuses", string(circusesFlag), + "-numAZs", strconv.Itoa(r.numAZs), "-repAddrRelativeToExecutor", r.repAddrRelativeToExecutor, ), gexec.NewPrefixedWriter("\x1b[32m[o]\x1b[35m[app-manager]\x1b[0m ", ginkgo.GinkgoWriter), diff --git a/integration/integration_test.go b/integration/integration_test.go index 52bdb7f..81174f8 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -41,6 +41,7 @@ var _ = Describe("Starting apps", func() { appManagerPath, etcdRunner.NodeURLS(), map[string]string{"some-stack": "some-health-check.tar.gz"}, + 4, "127.0.0.1:20515", ) diff --git a/main.go b/main.go index 2cac313..a962159 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ import ( "github.com/cloudfoundry-incubator/app-manager/handler" "github.com/cloudfoundry-incubator/app-manager/start_message_builder" + "github.com/cloudfoundry-incubator/app-manager/stop_message_builder" ) var repAddrRelativeToExecutor = flag.String( @@ -38,9 +39,19 @@ var circuses = flag.String( "app lifecycle binary bundle mapping (stack => bundle filename in fileserver)", ) +var numAZs = flag.Int( + "numAZs", + -1, + "total number of AZs on which Executors are running", +) + func main() { flag.Parse() + if *numAZs < 0 { + panic("needs (non-negative) number of AZs") + } + logger := cf_lager.New("app-manager") bbs := initializeBbs(logger) @@ -51,10 +62,11 @@ func main() { logger.Fatal("invalid-health-checks", err) } - startMessageBuilder := start_message_builder.New(*repAddrRelativeToExecutor, circuseDownloadURLs, logger) + startMessageBuilder := start_message_builder.New(*numAZs, *repAddrRelativeToExecutor, circuseDownloadURLs, logger) + stopMessageBuilder := stop_message_builder.New(*numAZs) group := grouper.EnvokeGroup(grouper.RunGroup{ - "handler": handler.NewHandler(bbs, startMessageBuilder, logger), + "handler": handler.NewHandler(bbs, startMessageBuilder, stopMessageBuilder, logger), }) logger.Info("started") diff --git a/start_message_builder/start_message_builder.go b/start_message_builder/start_message_builder.go index af72242..0d5e4f3 100644 --- a/start_message_builder/start_message_builder.go +++ b/start_message_builder/start_message_builder.go @@ -19,13 +19,15 @@ import ( var ErrNoCircusDefined = errors.New("no lifecycle binary bundle defined for stack") type StartMessageBuilder struct { + numAZs int repAddrRelativeToExecutor string logger lager.Logger circuses map[string]string } -func New(repAddrRelativeToExecutor string, circuses map[string]string, logger lager.Logger) *StartMessageBuilder { +func New(numAZs int, repAddrRelativeToExecutor string, circuses map[string]string, logger lager.Logger) *StartMessageBuilder { return &StartMessageBuilder{ + numAZs: numAZs, repAddrRelativeToExecutor: repAddrRelativeToExecutor, circuses: circuses, logger: logger, @@ -84,6 +86,8 @@ func (b *StartMessageBuilder) Build(desiredLRP models.DesiredLRP, lrpIndex int, return models.LRPStartAuction{ ProcessGuid: lrpGuid, InstanceGuid: instanceGuid.String(), + NumInstances: desiredLRP.Instances, + NumAZs: b.numAZs, State: models.LRPStartAuctionStatePending, Index: lrpIndex, diff --git a/start_message_builder/start_message_builder_test.go b/start_message_builder/start_message_builder_test.go index 26f7a7e..8828d28 100644 --- a/start_message_builder/start_message_builder_test.go +++ b/start_message_builder/start_message_builder_test.go @@ -18,6 +18,7 @@ var _ = Describe("Start Message Builder", func() { desiredLRP models.DesiredLRP circuses map[string]string fileServerURL string + numAZs int ) BeforeEach(func() { @@ -27,7 +28,8 @@ var _ = Describe("Start Message Builder", func() { circuses = map[string]string{ "some-stack": "some-circus.tgz", } - builder = New(repAddrRelativeToExecutor, circuses, logger) + numAZs = 4 + builder = New(numAZs, repAddrRelativeToExecutor, circuses, logger) desiredLRP = models.DesiredLRP{ ProcessGuid: "the-app-guid-the-app-version", Source: "http://the-droplet.uri.com", diff --git a/stop_message_builder/stop_message_builder.go b/stop_message_builder/stop_message_builder.go new file mode 100644 index 0000000..eb9a4ad --- /dev/null +++ b/stop_message_builder/stop_message_builder.go @@ -0,0 +1,22 @@ +package stop_message_builder + +import "github.com/cloudfoundry-incubator/runtime-schema/models" + +type StopMessageBuilder struct { + numAZs int +} + +func New(numAZs int) *StopMessageBuilder { + return &StopMessageBuilder{ + numAZs: numAZs, + } +} + +func (b *StopMessageBuilder) Build(desiredLRP models.DesiredLRP, indexToStopAllButOne int) models.LRPStopAuction { + return models.LRPStopAuction{ + ProcessGuid: desiredLRP.ProcessGuid, + Index: indexToStopAllButOne, + NumInstances: desiredLRP.Instances, + NumAZs: b.numAZs, + } +}