From 7796d07434aa7245855f6f5c1791db749efdd2d1 Mon Sep 17 00:00:00 2001 From: "Eric J. Holmes" Date: Thu, 23 Jun 2016 10:49:19 +0700 Subject: [PATCH 1/3] Fix bug with lock not unlocking. --- scheduler/cloudformation/cloudformation.go | 101 ++++++++++-------- .../cloudformation/cloudformation_test.go | 81 +++++++++++--- 2 files changed, 124 insertions(+), 58 deletions(-) diff --git a/scheduler/cloudformation/cloudformation.go b/scheduler/cloudformation/cloudformation.go index 18af4bb21..9d52590eb 100644 --- a/scheduler/cloudformation/cloudformation.go +++ b/scheduler/cloudformation/cloudformation.go @@ -131,6 +131,10 @@ type Scheduler struct { // Any additional tags to add to stacks. Tags []*cloudformation.Tag + // When true, calls to Submit won't return until the stack has + // successfully updated/created. + wait bool + // CloudFormation client for creating stacks. cloudformation cloudformationClient @@ -160,10 +164,6 @@ func (s *Scheduler) Submit(ctx context.Context, app *scheduler.App) error { // SubmitOptions are options provided to SubmitWithOptions. type SubmitOptions struct { - // Done is a channel that is sent on when the stack is fully created or - // updated. - Done chan error - // When true, does not make any changes to DNS. This is only used when // migrating to this scheduler NoDNS bool @@ -171,10 +171,6 @@ type SubmitOptions struct { // SubmitWithOptions submits (or updates) the CloudFormation stack for the app. 
func (s *Scheduler) SubmitWithOptions(ctx context.Context, app *scheduler.App, opts SubmitOptions) error { - if opts.Done == nil { - opts.Done = make(chan error) - } - tx, err := s.db.Begin() if err != nil { return err @@ -259,6 +255,7 @@ func (s *Scheduler) submit(ctx context.Context, tx *sql.Tx, app *scheduler.App, }) } + done := make(chan error) _, err = s.cloudformation.DescribeStacks(&cloudformation.DescribeStacksInput{ StackName: aws.String(stackName), }) @@ -268,7 +265,7 @@ func (s *Scheduler) submit(ctx context.Context, tx *sql.Tx, app *scheduler.App, TemplateURL: aws.String(url), Tags: tags, Parameters: parameters, - }, opts.Done); err != nil { + }, done); err != nil { return fmt.Errorf("error creating stack: %v", err) } } else if err == nil { @@ -278,36 +275,59 @@ func (s *Scheduler) submit(ctx context.Context, tx *sql.Tx, app *scheduler.App, Parameters: parameters, // TODO: Update Go client // Tags: tags, - }, opts.Done); err != nil { - return fmt.Errorf("error updating stack: %v", err) + }, done); err != nil { + return err } } else { return fmt.Errorf("error describing stack: %v", err) } + if s.wait { + if err := <-done; err != nil { + return err + } + } + return nil } +// createStack creates a new CloudFormation stack with the given input. This +// function returns as soon as the stack creation has been submitted. It does +// not wait for the stack creation to complete. 
func (s *Scheduler) createStack(input *cloudformation.CreateStackInput, done chan error) error { - p := func() error { + waiter := s.cloudformation.WaitUntilStackCreateComplete + + submitted := make(chan error) + fn := func() error { _, err := s.cloudformation.CreateStack(input) + submitted <- err return err } - waiter := s.cloudformation.WaitUntilStackCreateComplete - locked := make(chan struct{}) - return s.performStackOperation(*input.StackName, p, waiter, locked, done) + go func() { + done <- s.performStackOperation(*input.StackName, fn, waiter) + }() + + return <-submitted } -// updateStack enqueues the stack update, and waits a set amount of time for the -// stack update to be submitted. +// updateStack updates an existing CloudFormation stack with the given input. +// If there are no other active updates, this function returns as soon as the +// stack update has been submitted. If there are other updates, the function +// returns after `lockTimeout` and the update continues in the background. func (s *Scheduler) updateStack(input *cloudformation.UpdateStackInput, done chan error) error { + waiter := s.cloudformation.WaitUntilStackUpdateComplete + locked := make(chan struct{}) - errCh := make(chan error) + submitted := make(chan error) + fn := func() error { + close(locked) + err := s.executeStackUpdate(input) + submitted <- err + return err + } go func() { - p := func() error { return s.executeStackUpdate(input) } - waiter := s.cloudformation.WaitUntilStackUpdateComplete - errCh <- s.performStackOperation(*input.StackName, p, waiter, locked, done) + done <- s.performStackOperation(*input.StackName, fn, waiter) }() var err error @@ -317,24 +337,26 @@ func (s *Scheduler) updateStack(input *cloudformation.UpdateStackInput, done cha // around, so we return. But, if the stack update times out, or // there's an error, that information is essentially silenced. 
return nil - case err = <-errCh: + case err = <-submitted: case <-locked: // if a lock is obtained within the time frame, we might as well // just wait for the update to get submitted. - err = <-errCh + err = <-submitted } return err } -// performStackOperation enqueues the stack update/create. This function returns as soon -// as the stack update/create has been submitted. +// performStackOperation encapsulates the process of obtaining the stack +// operation lock, performing the stack operation, waiting for it to complete, +// then unlocking the stack operation lock. // // * If there are no operations currently in progress, the stack operation will execute. -// * If there is a currently active stack operation, the operation will be queued behind it. -// -// An error will be sent on the `done` channel when the stack operation completes. -func (s *Scheduler) performStackOperation(stackName string, fn func() error, waiter func(*cloudformation.DescribeStacksInput) error, locked chan struct{}, done chan error) error { +// * If there is a currently active stack operation, this operation will wait +// until the other stack operation has completed. +// * If there is another pending stack operation, it will be replaced by the new +// update. +func (s *Scheduler) performStackOperation(stackName string, fn func() error, waiter func(*cloudformation.DescribeStacksInput) error) error { l, err := newAdvisoryLock(s.db, stackName) if err != nil { return err @@ -356,12 +378,10 @@ func (s *Scheduler) performStackOperation(stackName string, fn func() error, wai } return fmt.Errorf("error obtaining stack operation lock %s: %v", stackName, err) } - - close(locked) + defer l.Unlock() // Once the lock has been obtained, let's perform the stack operation. 
- err = fn() - if err != nil { + if err := fn(); err != nil { return err } @@ -370,16 +390,14 @@ func (s *Scheduler) performStackOperation(stackName string, fn func() error, wai StackName: aws.String(stackName), }) } - // Start up a goroutine that will wait for this stack update to - // complete, and release the lock when it completes. - go s.waitUntilStackOperationComplete(l, wait, done) - return nil + // Wait until this stack operation has completed. The lock will be + // unlocked when this returns. + return s.waitUntilStackOperationComplete(l, wait) } -// waitUntilStackOperationComplete waits until wait returns, or it times out. It -// also ensures that the advisory lock is released. -func (s *Scheduler) waitUntilStackOperationComplete(lock *pglock.AdvisoryLock, wait func() error, done chan error) { +// waitUntilStackOperationComplete waits until wait returns, or it times out. +func (s *Scheduler) waitUntilStackOperationComplete(lock *pglock.AdvisoryLock, wait func() error) error { errCh := make(chan error) go func() { errCh <- wait() }() @@ -390,8 +408,7 @@ func (s *Scheduler) waitUntilStackOperationComplete(lock *pglock.AdvisoryLock, w case err = <-errCh: } - lock.Unlock() - done <- err + return err } // executeStackUpdate performs a stack update. 
diff --git a/scheduler/cloudformation/cloudformation_test.go b/scheduler/cloudformation/cloudformation_test.go index 67847c9c5..ac0444ac3 100644 --- a/scheduler/cloudformation/cloudformation_test.go +++ b/scheduler/cloudformation/cloudformation_test.go @@ -37,6 +37,7 @@ func TestScheduler_Submit_NewStack(t *testing.T) { s := &Scheduler{ Template: template.Must(template.New("t").Parse("{}")), Bucket: "bucket", + wait: true, cloudformation: c, s3: x, db: db, @@ -75,8 +76,7 @@ func TestScheduler_Submit_NewStack(t *testing.T) { StackName: aws.String("acme-inc"), }).Return(nil) - done := make(chan error) - err := s.SubmitWithOptions(context.Background(), &scheduler.App{ + err := s.Submit(context.Background(), &scheduler.App{ ID: "c9366591-ab68-4d49-a333-95ce5a23df68", Name: "acme-inc", Processes: []*scheduler.Process{ @@ -85,13 +85,9 @@ func TestScheduler_Submit_NewStack(t *testing.T) { Instances: 1, }, }, - }, SubmitOptions{ - Done: done, }) assert.NoError(t, err) - assert.NoError(t, <-done) - c.AssertExpectations(t) x.AssertExpectations(t) } @@ -105,6 +101,7 @@ func TestScheduler_Submit_ExistingStack(t *testing.T) { s := &Scheduler{ Template: template.Must(template.New("t").Parse("{}")), Bucket: "bucket", + wait: true, cloudformation: c, s3: x, db: db, @@ -142,15 +139,64 @@ func TestScheduler_Submit_ExistingStack(t *testing.T) { StackName: aws.String("acme-inc"), }).Return(nil) - done := make(chan error) - err := s.SubmitWithOptions(context.Background(), &scheduler.App{ + err := s.Submit(context.Background(), &scheduler.App{ ID: "c9366591-ab68-4d49-a333-95ce5a23df68", Name: "acme-inc", - }, SubmitOptions{ - Done: done, }) assert.NoError(t, err) - assert.NoError(t, <-done) + + c.AssertExpectations(t) + x.AssertExpectations(t) +} + +func TestScheduler_Submit_UpdateError(t *testing.T) { + db := newDB(t) + defer db.Close() + + x := new(mockS3Client) + c := new(mockCloudFormationClient) + s := &Scheduler{ + Template: template.Must(template.New("t").Parse("{}")), + Bucket: 
"bucket", + wait: true, + cloudformation: c, + s3: x, + db: db, + } + + x.On("PutObject", &s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Body: bytes.NewReader([]byte("{}")), + Key: aws.String("/acme-inc/c9366591-ab68-4d49-a333-95ce5a23df68/bf21a9e8fbc5a3846fb05b4fa0859e0917b2202f"), + ContentType: aws.String("application/json"), + }).Return(&s3.PutObjectOutput{}, nil) + + c.On("ValidateTemplate", &cloudformation.ValidateTemplateInput{ + TemplateURL: aws.String("https://bucket.s3.amazonaws.com/acme-inc/c9366591-ab68-4d49-a333-95ce5a23df68/bf21a9e8fbc5a3846fb05b4fa0859e0917b2202f"), + }).Return(&cloudformation.ValidateTemplateOutput{}, nil) + + c.On("DescribeStacks", &cloudformation.DescribeStacksInput{ + StackName: aws.String("acme-inc"), + }).Return(&cloudformation.DescribeStacksOutput{ + Stacks: []*cloudformation.Stack{ + {StackStatus: aws.String("CREATE_COMPLETE")}, + }, + }, nil) + + c.On("UpdateStack", &cloudformation.UpdateStackInput{ + StackName: aws.String("acme-inc"), + TemplateURL: aws.String("https://bucket.s3.amazonaws.com/acme-inc/c9366591-ab68-4d49-a333-95ce5a23df68/bf21a9e8fbc5a3846fb05b4fa0859e0917b2202f"), + Parameters: []*cloudformation.Parameter{ + {ParameterKey: aws.String("DNS"), ParameterValue: aws.String("true")}, + {ParameterKey: aws.String("RestartKey"), ParameterValue: aws.String("uuid")}, + }, + }).Return(&cloudformation.UpdateStackOutput{}, errors.New("stack update failed")) + + err := s.Submit(context.Background(), &scheduler.App{ + ID: "c9366591-ab68-4d49-a333-95ce5a23df68", + Name: "acme-inc", + }) + assert.EqualError(t, err, `error updating stack: stack update failed`) c.AssertExpectations(t) x.AssertExpectations(t) @@ -165,6 +211,7 @@ func TestScheduler_Submit_ExistingStack_RemovedProcess(t *testing.T) { s := &Scheduler{ Template: template.Must(template.New("t").Parse("{}")), Bucket: "bucket", + wait: true, cloudformation: c, s3: x, db: db, @@ -211,18 +258,14 @@ func TestScheduler_Submit_ExistingStack_RemovedProcess(t 
*testing.T) { StackName: aws.String("acme-inc"), }).Return(nil) - done := make(chan error) - err := s.SubmitWithOptions(context.Background(), &scheduler.App{ + err := s.Submit(context.Background(), &scheduler.App{ ID: "c9366591-ab68-4d49-a333-95ce5a23df68", Name: "acme-inc", Processes: []*scheduler.Process{ {Type: "web", Instances: 1}, }, - }, SubmitOptions{ - Done: done, }) assert.NoError(t, err) - assert.NoError(t, <-done) c.AssertExpectations(t) x.AssertExpectations(t) @@ -237,6 +280,7 @@ func TestScheduler_Submit_TemplateTooLarge(t *testing.T) { s := &Scheduler{ Template: template.Must(template.New("t").Parse("{}")), Bucket: "bucket", + wait: true, cloudformation: c, s3: x, db: db, @@ -276,6 +320,7 @@ func TestScheduler_Remove(t *testing.T) { s := &Scheduler{ Template: template.Must(template.New("t").Parse("{}")), Bucket: "bucket", + wait: true, cloudformation: c, s3: x, db: db, @@ -319,6 +364,7 @@ func TestScheduler_Remove_NoCFStack(t *testing.T) { s := &Scheduler{ Template: template.Must(template.New("t").Parse("{}")), Bucket: "bucket", + wait: true, cloudformation: c, s3: x, db: db, @@ -347,6 +393,7 @@ func TestScheduler_Remove_NoDBStack_NoCFStack(t *testing.T) { s := &Scheduler{ Template: template.Must(template.New("t").Parse("{}")), Bucket: "bucket", + wait: true, cloudformation: c, s3: x, db: db, @@ -370,6 +417,7 @@ func TestScheduler_Instances(t *testing.T) { Template: template.Must(template.New("t").Parse("{}")), Bucket: "bucket", Cluster: "cluster", + wait: true, cloudformation: c, s3: x, ecs: e, @@ -505,6 +553,7 @@ func TestScheduler_Instances_ManyTasks(t *testing.T) { Template: template.Must(template.New("t").Parse("{}")), Bucket: "bucket", Cluster: "cluster", + wait: true, cloudformation: c, s3: x, ecs: e, From a82a2c9f17a8ce8bf30df0aaa873b5595ce09b76 Mon Sep 17 00:00:00 2001 From: "Eric J. Holmes" Date: Thu, 23 Jun 2016 12:23:18 +0700 Subject: [PATCH 2/3] Update CHANGELOG. 
--- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 51948cdcd..df3d205c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ **Bugs** * Fixed a bug where multiple duplicate ECS services could be created by the CloudFormation backend, when using the `Custom::ECSService` resource [#884](https://github.com/remind101/empire/pull/884). +* Fixed a bug where the lock obtained during stack operations was not always unlocked. [#892](https://github.com/remind101/empire/pull/892) **Performance** From eb06e5b70a63a56a3d9332e605c657a0706eabdc Mon Sep 17 00:00:00 2001 From: "Eric J. Holmes" Date: Fri, 24 Jun 2016 05:35:33 +0700 Subject: [PATCH 3/3] Buffer the `done` channel so that goroutine always exits. --- scheduler/cloudformation/cloudformation.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler/cloudformation/cloudformation.go b/scheduler/cloudformation/cloudformation.go index 9d52590eb..324261198 100644 --- a/scheduler/cloudformation/cloudformation.go +++ b/scheduler/cloudformation/cloudformation.go @@ -255,7 +255,7 @@ func (s *Scheduler) submit(ctx context.Context, tx *sql.Tx, app *scheduler.App, }) } - done := make(chan error) + done := make(chan error, 1) _, err = s.cloudformation.DescribeStacks(&cloudformation.DescribeStacksInput{ StackName: aws.String(stackName), })