Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that stack operation lock is always unlocked #892

Merged
merged 3 commits into from
Jun 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
**Bugs**

* Fixed a bug where multiple duplicate ECS services could be created by the CloudFormation backend, when using the `Custom::ECSService` resource [#884](https://github.com/remind101/empire/pull/884).
* Fixed a bug where the lock obtained during stack operations was not always unlocked. [#892](https://github.com/remind101/empire/pull/892)

**Performance**

Expand Down
101 changes: 59 additions & 42 deletions scheduler/cloudformation/cloudformation.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ type Scheduler struct {
// Any additional tags to add to stacks.
Tags []*cloudformation.Tag

// When true, alls to Submit won't return until the stack has
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/alls/calls

// successfully updated/created.
wait bool

// CloudFormation client for creating stacks.
cloudformation cloudformationClient

Expand Down Expand Up @@ -160,21 +164,13 @@ func (s *Scheduler) Submit(ctx context.Context, app *scheduler.App) error {

// SubmitOptions are options provided to SubmitWithOptions.
type SubmitOptions struct {
// Done is a channel that is sent on when the stack is fully created or
// updated.
Done chan error

// When true, does not make any changes to DNS. This is only used when
// migrating to this scheduler
NoDNS bool
}

// SubmitWithOptions submits (or updates) the CloudFormation stack for the app.
func (s *Scheduler) SubmitWithOptions(ctx context.Context, app *scheduler.App, opts SubmitOptions) error {
if opts.Done == nil {
opts.Done = make(chan error)
}

tx, err := s.db.Begin()
if err != nil {
return err
Expand Down Expand Up @@ -259,6 +255,7 @@ func (s *Scheduler) submit(ctx context.Context, tx *sql.Tx, app *scheduler.App,
})
}

done := make(chan error, 1)
_, err = s.cloudformation.DescribeStacks(&cloudformation.DescribeStacksInput{
StackName: aws.String(stackName),
})
Expand All @@ -268,7 +265,7 @@ func (s *Scheduler) submit(ctx context.Context, tx *sql.Tx, app *scheduler.App,
TemplateURL: aws.String(url),
Tags: tags,
Parameters: parameters,
}, opts.Done); err != nil {
}, done); err != nil {
return fmt.Errorf("error creating stack: %v", err)
}
} else if err == nil {
Expand All @@ -278,36 +275,59 @@ func (s *Scheduler) submit(ctx context.Context, tx *sql.Tx, app *scheduler.App,
Parameters: parameters,
// TODO: Update Go client
// Tags: tags,
}, opts.Done); err != nil {
return fmt.Errorf("error updating stack: %v", err)
}, done); err != nil {
return err
}
} else {
return fmt.Errorf("error describing stack: %v", err)
}

if s.wait {
if err := <-done; err != nil {
return err
}
}

return nil
}

// createStack creates a new CloudFormation stack with the given input. This
// function returns as soon as the stack creation has been submitted. It does
// not wait for the stack creation to complete.
func (s *Scheduler) createStack(input *cloudformation.CreateStackInput, done chan error) error {
p := func() error {
waiter := s.cloudformation.WaitUntilStackCreateComplete

submitted := make(chan error)
fn := func() error {
_, err := s.cloudformation.CreateStack(input)
submitted <- err
return err
}
waiter := s.cloudformation.WaitUntilStackCreateComplete
locked := make(chan struct{})
return s.performStackOperation(*input.StackName, p, waiter, locked, done)
go func() {
done <- s.performStackOperation(*input.StackName, fn, waiter)
}()

return <-submitted
}

// updateStack enqueues the stack update, and waits a set amount of time for the
// stack update to be submitted.
// updateStack updates an existing CloudFormation stack with the given input.
// If there are no other active updates, this function returns as soon as the
// stack update has been submitted. If there are other updates, the function
// returns after `lockTimeout` and the update continues in the background.
func (s *Scheduler) updateStack(input *cloudformation.UpdateStackInput, done chan error) error {
waiter := s.cloudformation.WaitUntilStackUpdateComplete

locked := make(chan struct{})
errCh := make(chan error)
submitted := make(chan error)
fn := func() error {
close(locked)
err := s.executeStackUpdate(input)
submitted <- err
return err
}

go func() {
p := func() error { return s.executeStackUpdate(input) }
waiter := s.cloudformation.WaitUntilStackUpdateComplete
errCh <- s.performStackOperation(*input.StackName, p, waiter, locked, done)
done <- s.performStackOperation(*input.StackName, fn, waiter)
}()

var err error
Expand All @@ -317,24 +337,26 @@ func (s *Scheduler) updateStack(input *cloudformation.UpdateStackInput, done cha
// around, so we return. But, if the stack update times out, or
// there's an error, that information is essentially silenced.
return nil
case err = <-errCh:
case err = <-submitted:
case <-locked:
// if a lock is obtained within the time frame, we might as well
// just wait for the update to get submitted.
err = <-errCh
err = <-submitted
}

return err
}

// performStackOperation enqueues the stack update/create. This function returns as soon
// as the stack update/create has been submitted.
// performStackOperation encapsulates the process of obtaining the stack
// operation lock, performing the stack operation, waiting for it to complete,
// then unlocking the stack operation lock.
//
// * If there are no operations currently in progress, the stack operation will execute.
// * If there is a currently active stack operation, the operation will be queued behind it.
//
// An error will be sent on the `done` channel when the stack operation completes.
func (s *Scheduler) performStackOperation(stackName string, fn func() error, waiter func(*cloudformation.DescribeStacksInput) error, locked chan struct{}, done chan error) error {
// * If there is a currently active stack operation, this operation will wait
// until the other stack operation has completed.
// * If there is another pending stack operation, it will be replaced by the new
// update.
func (s *Scheduler) performStackOperation(stackName string, fn func() error, waiter func(*cloudformation.DescribeStacksInput) error) error {
l, err := newAdvisoryLock(s.db, stackName)
if err != nil {
return err
Expand All @@ -356,12 +378,10 @@ func (s *Scheduler) performStackOperation(stackName string, fn func() error, wai
}
return fmt.Errorf("error obtaining stack operation lock %s: %v", stackName, err)
}

close(locked)
defer l.Unlock()

// Once the lock has been obtained, let's perform the stack operation.
err = fn()
if err != nil {
if err := fn(); err != nil {
return err
}

Expand All @@ -370,16 +390,14 @@ func (s *Scheduler) performStackOperation(stackName string, fn func() error, wai
StackName: aws.String(stackName),
})
}
// Start up a goroutine that will wait for this stack update to
// complete, and release the lock when it completes.
go s.waitUntilStackOperationComplete(l, wait, done)

return nil
// Wait until this stack operation has completed. The lock will be
// unlocked when this returns.
return s.waitUntilStackOperationComplete(l, wait)
}

// waitUntilStackOperationComplete waits until wait returns, or it times out. It
// also ensures that the advisory lock is released.
func (s *Scheduler) waitUntilStackOperationComplete(lock *pglock.AdvisoryLock, wait func() error, done chan error) {
// waitUntilStackOperationComplete waits until wait returns, or it times out.
func (s *Scheduler) waitUntilStackOperationComplete(lock *pglock.AdvisoryLock, wait func() error) error {
errCh := make(chan error)
go func() { errCh <- wait() }()

Expand All @@ -390,8 +408,7 @@ func (s *Scheduler) waitUntilStackOperationComplete(lock *pglock.AdvisoryLock, w
case err = <-errCh:
}

lock.Unlock()
done <- err
return err
}

// executeStackUpdate performs a stack update.
Expand Down
81 changes: 65 additions & 16 deletions scheduler/cloudformation/cloudformation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestScheduler_Submit_NewStack(t *testing.T) {
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
Expand Down Expand Up @@ -75,8 +76,7 @@ func TestScheduler_Submit_NewStack(t *testing.T) {
StackName: aws.String("acme-inc"),
}).Return(nil)

done := make(chan error)
err := s.SubmitWithOptions(context.Background(), &scheduler.App{
err := s.Submit(context.Background(), &scheduler.App{
ID: "c9366591-ab68-4d49-a333-95ce5a23df68",
Name: "acme-inc",
Processes: []*scheduler.Process{
Expand All @@ -85,13 +85,9 @@ func TestScheduler_Submit_NewStack(t *testing.T) {
Instances: 1,
},
},
}, SubmitOptions{
Done: done,
})
assert.NoError(t, err)

assert.NoError(t, <-done)

c.AssertExpectations(t)
x.AssertExpectations(t)
}
Expand All @@ -105,6 +101,7 @@ func TestScheduler_Submit_ExistingStack(t *testing.T) {
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
Expand Down Expand Up @@ -142,15 +139,64 @@ func TestScheduler_Submit_ExistingStack(t *testing.T) {
StackName: aws.String("acme-inc"),
}).Return(nil)

done := make(chan error)
err := s.SubmitWithOptions(context.Background(), &scheduler.App{
err := s.Submit(context.Background(), &scheduler.App{
ID: "c9366591-ab68-4d49-a333-95ce5a23df68",
Name: "acme-inc",
}, SubmitOptions{
Done: done,
})
assert.NoError(t, err)
assert.NoError(t, <-done)

c.AssertExpectations(t)
x.AssertExpectations(t)
}

func TestScheduler_Submit_UpdateError(t *testing.T) {
db := newDB(t)
defer db.Close()

x := new(mockS3Client)
c := new(mockCloudFormationClient)
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
}

x.On("PutObject", &s3.PutObjectInput{
Bucket: aws.String("bucket"),
Body: bytes.NewReader([]byte("{}")),
Key: aws.String("/acme-inc/c9366591-ab68-4d49-a333-95ce5a23df68/bf21a9e8fbc5a3846fb05b4fa0859e0917b2202f"),
ContentType: aws.String("application/json"),
}).Return(&s3.PutObjectOutput{}, nil)

c.On("ValidateTemplate", &cloudformation.ValidateTemplateInput{
TemplateURL: aws.String("https://bucket.s3.amazonaws.com/acme-inc/c9366591-ab68-4d49-a333-95ce5a23df68/bf21a9e8fbc5a3846fb05b4fa0859e0917b2202f"),
}).Return(&cloudformation.ValidateTemplateOutput{}, nil)

c.On("DescribeStacks", &cloudformation.DescribeStacksInput{
StackName: aws.String("acme-inc"),
}).Return(&cloudformation.DescribeStacksOutput{
Stacks: []*cloudformation.Stack{
{StackStatus: aws.String("CREATE_COMPLETE")},
},
}, nil)

c.On("UpdateStack", &cloudformation.UpdateStackInput{
StackName: aws.String("acme-inc"),
TemplateURL: aws.String("https://bucket.s3.amazonaws.com/acme-inc/c9366591-ab68-4d49-a333-95ce5a23df68/bf21a9e8fbc5a3846fb05b4fa0859e0917b2202f"),
Parameters: []*cloudformation.Parameter{
{ParameterKey: aws.String("DNS"), ParameterValue: aws.String("true")},
{ParameterKey: aws.String("RestartKey"), ParameterValue: aws.String("uuid")},
},
}).Return(&cloudformation.UpdateStackOutput{}, errors.New("stack update failed"))

err := s.Submit(context.Background(), &scheduler.App{
ID: "c9366591-ab68-4d49-a333-95ce5a23df68",
Name: "acme-inc",
})
assert.EqualError(t, err, `error updating stack: stack update failed`)

c.AssertExpectations(t)
x.AssertExpectations(t)
Expand All @@ -165,6 +211,7 @@ func TestScheduler_Submit_ExistingStack_RemovedProcess(t *testing.T) {
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
Expand Down Expand Up @@ -211,18 +258,14 @@ func TestScheduler_Submit_ExistingStack_RemovedProcess(t *testing.T) {
StackName: aws.String("acme-inc"),
}).Return(nil)

done := make(chan error)
err := s.SubmitWithOptions(context.Background(), &scheduler.App{
err := s.Submit(context.Background(), &scheduler.App{
ID: "c9366591-ab68-4d49-a333-95ce5a23df68",
Name: "acme-inc",
Processes: []*scheduler.Process{
{Type: "web", Instances: 1},
},
}, SubmitOptions{
Done: done,
})
assert.NoError(t, err)
assert.NoError(t, <-done)

c.AssertExpectations(t)
x.AssertExpectations(t)
Expand All @@ -237,6 +280,7 @@ func TestScheduler_Submit_TemplateTooLarge(t *testing.T) {
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
Expand Down Expand Up @@ -276,6 +320,7 @@ func TestScheduler_Remove(t *testing.T) {
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
Expand Down Expand Up @@ -319,6 +364,7 @@ func TestScheduler_Remove_NoCFStack(t *testing.T) {
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
Expand Down Expand Up @@ -347,6 +393,7 @@ func TestScheduler_Remove_NoDBStack_NoCFStack(t *testing.T) {
s := &Scheduler{
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
wait: true,
cloudformation: c,
s3: x,
db: db,
Expand All @@ -370,6 +417,7 @@ func TestScheduler_Instances(t *testing.T) {
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
Cluster: "cluster",
wait: true,
cloudformation: c,
s3: x,
ecs: e,
Expand Down Expand Up @@ -505,6 +553,7 @@ func TestScheduler_Instances_ManyTasks(t *testing.T) {
Template: template.Must(template.New("t").Parse("{}")),
Bucket: "bucket",
Cluster: "cluster",
wait: true,
cloudformation: c,
s3: x,
ecs: e,
Expand Down