Skip to content

Commit

Permalink
Merge pull request #892 from remind101/lock-bug
Browse files Browse the repository at this point in the history
Ensure that stack operation lock is always unlocked
  • Loading branch information
ejholmes authored Jun 23, 2016
2 parents 375176a + eb06e5b commit 4aab678
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
**Bugs**

* Fixed a bug where multiple duplicate ECS services could be created by the CloudFormation backend, when using the `Custom::ECSService` resource [#884](https://github.com/remind101/empire/pull/884).
* Fixed a bug where the lock obtained during stack operations was not always unlocked. [#892](https://github.com/remind101/empire/pull/892)

**Performance**

Expand Down
101 changes: 59 additions & 42 deletions scheduler/cloudformation/cloudformation.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ type Scheduler struct {
// Any additional tags to add to stacks.
Tags []*cloudformation.Tag

// When true, alls to Submit won't return until the stack has
// successfully updated/created.
wait bool

// CloudFormation client for creating stacks.
cloudformation cloudformationClient

Expand Down Expand Up @@ -160,21 +164,13 @@ func (s *Scheduler) Submit(ctx context.Context, app *scheduler.App) error {

// SubmitOptions are options provided to SubmitWithOptions.
type SubmitOptions struct {
// Done is a channel that is sent on when the stack is fully created or
// updated.
Done chan error

// When true, does not make any changes to DNS. This is only used when
// migrating to this scheduler
NoDNS bool
}

// SubmitWithOptions submits (or updates) the CloudFormation stack for the app.
func (s *Scheduler) SubmitWithOptions(ctx context.Context, app *scheduler.App, opts SubmitOptions) error {
if opts.Done == nil {
opts.Done = make(chan error)
}

tx, err := s.db.Begin()
if err != nil {
return err
Expand Down Expand Up @@ -259,6 +255,7 @@ func (s *Scheduler) submit(ctx context.Context, tx *sql.Tx, app *scheduler.App,
})
}

done := make(chan error, 1)
_, err = s.cloudformation.DescribeStacks(&cloudformation.DescribeStacksInput{
StackName: aws.String(stackName),
})
Expand All @@ -268,7 +265,7 @@ func (s *Scheduler) submit(ctx context.Context, tx *sql.Tx, app *scheduler.App,
TemplateURL: aws.String(url),
Tags: tags,
Parameters: parameters,
}, opts.Done); err != nil {
}, done); err != nil {
return fmt.Errorf("error creating stack: %v", err)
}
} else if err == nil {
Expand All @@ -278,36 +275,59 @@ func (s *Scheduler) submit(ctx context.Context, tx *sql.Tx, app *scheduler.App,
Parameters: parameters,
// TODO: Update Go client
// Tags: tags,
}, opts.Done); err != nil {
return fmt.Errorf("error updating stack: %v", err)
}, done); err != nil {
return err
}
} else {
return fmt.Errorf("error describing stack: %v", err)
}

if s.wait {
if err := <-done; err != nil {
return err
}
}

return nil
}

// createStack creates a new CloudFormation stack with the given input. This
// function returns as soon as the stack creation has been submitted. It does
// not wait for the stack creation to complete.
func (s *Scheduler) createStack(input *cloudformation.CreateStackInput, done chan error) error {
p := func() error {
waiter := s.cloudformation.WaitUntilStackCreateComplete

submitted := make(chan error)
fn := func() error {
_, err := s.cloudformation.CreateStack(input)
submitted <- err
return err
}
waiter := s.cloudformation.WaitUntilStackCreateComplete
locked := make(chan struct{})
return s.performStackOperation(*input.StackName, p, waiter, locked, done)
go func() {
done <- s.performStackOperation(*input.StackName, fn, waiter)
}()

return <-submitted
}

// updateStack enqueues the stack update, and waits a set amount of time for the
// stack update to be submitted.
// updateStack updates an existing CloudFormation stack with the given input.
// If there are no other active updates, this function returns as soon as the
// stack update has been submitted. If there are other updates, the function
// returns after `lockTimeout` and the update continues in the background.
func (s *Scheduler) updateStack(input *cloudformation.UpdateStackInput, done chan error) error {
waiter := s.cloudformation.WaitUntilStackUpdateComplete

locked := make(chan struct{})
errCh := make(chan error)
submitted := make(chan error)
fn := func() error {
close(locked)
err := s.executeStackUpdate(input)
submitted <- err
return err
}

go func() {
p := func() error { return s.executeStackUpdate(input) }
waiter := s.cloudformation.WaitUntilStackUpdateComplete
errCh <- s.performStackOperation(*input.StackName, p, waiter, locked, done)
done <- s.performStackOperation(*input.StackName, fn, waiter)
}()

var err error
Expand All @@ -317,24 +337,26 @@ func (s *Scheduler) updateStack(input *cloudformation.UpdateStackInput, done cha
// around, so we return. But, if the stack update times out, or
// there's an error, that information is essentially silenced.
return nil
case err = <-errCh:
case err = <-submitted:
case <-locked:
// if a lock is obtained within the time frame, we might as well
// just wait for the update to get submitted.
err = <-errCh
err = <-submitted
}

return err
}

// performStackOperation enqueues the stack update/create. This function returns as soon
// as the stack update/create has been submitted.
// performStackOperation encapsulates the process of obtaining the stack
// operation lock, performing the stack operation, waiting for it to complete,
// then unlocking the stack operation lock.
//
// * If there are no operations currently in progress, the stack operation will execute.
// * If there is a currently active stack operation, the operation will be queued behind it.
//
// An error will be sent on the `done` channel when the stack operation completes.
func (s *Scheduler) performStackOperation(stackName string, fn func() error, waiter func(*cloudformation.DescribeStacksInput) error, locked chan struct{}, done chan error) error {
// * If there is a currently active stack operation, this operation will wait
// until the other stack operation has completed.
// * If there is another pending stack operation, it will be replaced by the new
// update.
func (s *Scheduler) performStackOperation(stackName string, fn func() error, waiter func(*cloudformation.DescribeStacksInput) error) error {
l, err := newAdvisoryLock(s.db, stackName)
if err != nil {
return err
Expand All @@ -356,12 +378,10 @@ func (s *Scheduler) performStackOperation(stackName string, fn func() error, wai
}
return fmt.Errorf("error obtaining stack operation lock %s: %v", stackName, err)
}

close(locked)
defer l.Unlock()

// Once the lock has been obtained, let's perform the stack operation.
err = fn()
if err != nil {
if err := fn(); err != nil {
return err
}

Expand All @@ -370,16 +390,14 @@ func (s *Scheduler) performStackOperation(stackName string, fn func() error, wai
StackName: aws.String(stackName),
})
}
// Start up a goroutine that will wait for this stack update to
// complete, and release the lock when it completes.
go s.waitUntilStackOperationComplete(l, wait, done)

return nil
// Wait until this stack operation has completed. The lock will be
// unlocked when this returns.
return s.waitUntilStackOperationComplete(l, wait)
}

// waitUntilStackOperationComplete waits until wait returns, or it times out. It
// also ensures that the advisory lock is released.
func (s *Scheduler) waitUntilStackOperationComplete(lock *pglock.AdvisoryLock, wait func() error, done chan error) {
// waitUntilStackOperationComplete waits until wait returns, or it times out.
func (s *Scheduler) waitUntilStackOperationComplete(lock *pglock.AdvisoryLock, wait func() error) error {
errCh := make(chan error)
go func() { errCh <- wait() }()

Expand All @@ -390,8 +408,7 @@ func (s *Scheduler) waitUntilStackOperationComplete(lock *pglock.AdvisoryLock, w
case err = <-errCh:
}

lock.Unlock()
done <- err
return err
}

// executeStackUpdate performs a stack update.
Expand Down
81 changes: 65 additions & 16 deletions scheduler/cloudformation/cloudformation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestScheduler_Submit_NewStack(t *testing.T) {
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
Expand Down Expand Up @@ -75,8 +76,7 @@ func TestScheduler_Submit_NewStack(t *testing.T) {
StackName: aws.String("acme-inc"),
}).Return(nil)

done := make(chan error)
err := s.SubmitWithOptions(context.Background(), &scheduler.App{
err := s.Submit(context.Background(), &scheduler.App{
ID: "c9366591-ab68-4d49-a333-95ce5a23df68",
Name: "acme-inc",
Processes: []*scheduler.Process{
Expand All @@ -85,13 +85,9 @@ func TestScheduler_Submit_NewStack(t *testing.T) {
Instances: 1,
},
},
}, SubmitOptions{
Done: done,
})
assert.NoError(t, err)

assert.NoError(t, <-done)

c.AssertExpectations(t)
x.AssertExpectations(t)
}
Expand All @@ -105,6 +101,7 @@ func TestScheduler_Submit_ExistingStack(t *testing.T) {
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
Expand Down Expand Up @@ -142,15 +139,64 @@ func TestScheduler_Submit_ExistingStack(t *testing.T) {
StackName: aws.String("acme-inc"),
}).Return(nil)

done := make(chan error)
err := s.SubmitWithOptions(context.Background(), &scheduler.App{
err := s.Submit(context.Background(), &scheduler.App{
ID: "c9366591-ab68-4d49-a333-95ce5a23df68",
Name: "acme-inc",
}, SubmitOptions{
Done: done,
})
assert.NoError(t, err)
assert.NoError(t, <-done)

c.AssertExpectations(t)
x.AssertExpectations(t)
}

func TestScheduler_Submit_UpdateError(t *testing.T) {
db := newDB(t)
defer db.Close()

x := new(mockS3Client)
c := new(mockCloudFormationClient)
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
}

x.On("PutObject", &s3.PutObjectInput{
Bucket: aws.String("bucket"),
Body: bytes.NewReader([]byte("{}")),
Key: aws.String("/acme-inc/c9366591-ab68-4d49-a333-95ce5a23df68/bf21a9e8fbc5a3846fb05b4fa0859e0917b2202f"),
ContentType: aws.String("application/json"),
}).Return(&s3.PutObjectOutput{}, nil)

c.On("ValidateTemplate", &cloudformation.ValidateTemplateInput{
TemplateURL: aws.String("https://bucket.s3.amazonaws.com/acme-inc/c9366591-ab68-4d49-a333-95ce5a23df68/bf21a9e8fbc5a3846fb05b4fa0859e0917b2202f"),
}).Return(&cloudformation.ValidateTemplateOutput{}, nil)

c.On("DescribeStacks", &cloudformation.DescribeStacksInput{
StackName: aws.String("acme-inc"),
}).Return(&cloudformation.DescribeStacksOutput{
Stacks: []*cloudformation.Stack{
{StackStatus: aws.String("CREATE_COMPLETE")},
},
}, nil)

c.On("UpdateStack", &cloudformation.UpdateStackInput{
StackName: aws.String("acme-inc"),
TemplateURL: aws.String("https://bucket.s3.amazonaws.com/acme-inc/c9366591-ab68-4d49-a333-95ce5a23df68/bf21a9e8fbc5a3846fb05b4fa0859e0917b2202f"),
Parameters: []*cloudformation.Parameter{
{ParameterKey: aws.String("DNS"), ParameterValue: aws.String("true")},
{ParameterKey: aws.String("RestartKey"), ParameterValue: aws.String("uuid")},
},
}).Return(&cloudformation.UpdateStackOutput{}, errors.New("stack update failed"))

err := s.Submit(context.Background(), &scheduler.App{
ID: "c9366591-ab68-4d49-a333-95ce5a23df68",
Name: "acme-inc",
})
assert.EqualError(t, err, `error updating stack: stack update failed`)

c.AssertExpectations(t)
x.AssertExpectations(t)
Expand All @@ -165,6 +211,7 @@ func TestScheduler_Submit_ExistingStack_RemovedProcess(t *testing.T) {
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
Expand Down Expand Up @@ -211,18 +258,14 @@ func TestScheduler_Submit_ExistingStack_RemovedProcess(t *testing.T) {
StackName: aws.String("acme-inc"),
}).Return(nil)

done := make(chan error)
err := s.SubmitWithOptions(context.Background(), &scheduler.App{
err := s.Submit(context.Background(), &scheduler.App{
ID: "c9366591-ab68-4d49-a333-95ce5a23df68",
Name: "acme-inc",
Processes: []*scheduler.Process{
{Type: "web", Instances: 1},
},
}, SubmitOptions{
Done: done,
})
assert.NoError(t, err)
assert.NoError(t, <-done)

c.AssertExpectations(t)
x.AssertExpectations(t)
Expand All @@ -237,6 +280,7 @@ func TestScheduler_Submit_TemplateTooLarge(t *testing.T) {
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
Expand Down Expand Up @@ -276,6 +320,7 @@ func TestScheduler_Remove(t *testing.T) {
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
Expand Down Expand Up @@ -319,6 +364,7 @@ func TestScheduler_Remove_NoCFStack(t *testing.T) {
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
Expand Down Expand Up @@ -347,6 +393,7 @@ func TestScheduler_Remove_NoDBStack_NoCFStack(t *testing.T) {
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
Expand All @@ -370,6 +417,7 @@ func TestScheduler_Instances(t *testing.T) {
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
Cluster: "cluster",
wait: true,
cloudformation: c,
s3: x,
ecs: e,
Expand Down Expand Up @@ -505,6 +553,7 @@ func TestScheduler_Instances_ManyTasks(t *testing.T) {
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
Cluster: "cluster",
wait: true,
cloudformation: c,
s3: x,
ecs: e,
Expand Down

0 comments on commit 4aab678

Please sign in to comment.