Skip to content

Commit

Permalink
fix: recover in-progress operations on broker restart (#983)
Browse files Browse the repository at this point in the history
- previously if an operation was in progress when the broker terminated
  it would forever be recorded as "in progress" in the broker database
  and was hard to clean up
- now on startup, the broker marks any in-progress operations as failed
- this allows the service instance to be cleaned up without intervention
  from a CloudFoundry administrator

[#178142867](https://www.pivotaltracker.com/story/show/178142867)
  • Loading branch information
blgm authored Mar 27, 2024
1 parent 92921cb commit c845267
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 19 deletions.
6 changes: 4 additions & 2 deletions dbservice/dbservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ import (
"sync"

"code.cloudfoundry.org/lager/v3"
"gorm.io/gorm"

_ "gorm.io/driver/sqlite"
"gorm.io/gorm"
)

var once sync.Once
Expand All @@ -35,6 +34,9 @@ func New(logger lager.Logger) *gorm.DB {
if err := RunMigrations(db); err != nil {
panic(fmt.Sprintf("Error migrating database: %s", err))
}
if err := recoverInProgressOperations(db, logger); err != nil {
panic(fmt.Sprintf("Error recovering in-progress operations: %s", err))
}
})
return db
}
24 changes: 24 additions & 0 deletions dbservice/recover_in_progress_operations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package dbservice

import (
"code.cloudfoundry.org/lager/v3"
"github.com/cloudfoundry/cloud-service-broker/dbservice/models"
"gorm.io/gorm"
)

func recoverInProgressOperations(db *gorm.DB, logger lager.Logger) error {
logger = logger.Session("recover-in-progress-operations")

var terraformDeploymentBatch []models.TerraformDeployment
result := db.Where("last_operation_state = ?", "in progress").FindInBatches(&terraformDeploymentBatch, 100, func(tx *gorm.DB, batchNumber int) error {
for i := range terraformDeploymentBatch {
terraformDeploymentBatch[i].LastOperationState = "failed"
terraformDeploymentBatch[i].LastOperationMessage = "the broker restarted while the operation was in progress"
logger.Info("mark-as-failed", lager.Data{"workspace_id": terraformDeploymentBatch[i].ID})
}

return tx.Save(&terraformDeploymentBatch).Error
})

return result.Error
}
83 changes: 83 additions & 0 deletions dbservice/recover_in_progress_operations_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package dbservice

import (
"strings"
"testing"

"code.cloudfoundry.org/lager/v3/lagertest"
"github.com/cloudfoundry/cloud-service-broker/dbservice/models"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)

func TestRecoverInProgressOperations(t *testing.T) {
// Setup
db, err := gorm.Open(sqlite.Open(":memory:"), nil)
if err != nil {
t.Errorf("failed to create test database: %s", err)
}

if err = db.Migrator().CreateTable(&models.TerraformDeployment{}); err != nil {
t.Errorf("failed to create test table: %s", err)
}

const recoverID = "fake-id-to-recover"
err = db.Create(&models.TerraformDeployment{
ID: recoverID,
LastOperationType: "fake-type",
LastOperationState: "in progress",
LastOperationMessage: "fake-type in progress",
}).Error
if err != nil {
t.Errorf("failed to create test database data: %s", err)
}
const okID = "fake-id-that-does-not-need-to-be-recovered"
err = db.Create(&models.TerraformDeployment{
ID: okID,
LastOperationType: "fake-type",
LastOperationState: "succeeded",
LastOperationMessage: "fake-type succeeded",
}).Error
if err != nil {
t.Errorf("failed to create test database data: %s", err)
}

// Call the function
logger := lagertest.NewTestLogger("test")
recoverInProgressOperations(db, logger)

// It marks the in-progress operation as failed
var r1 models.TerraformDeployment
err = db.Where("id = ?", recoverID).First(&r1).Error
if err != nil {
t.Errorf("failed to load updated test data: %s", err)
}

const expState = "failed"
if r1.LastOperationState != expState {
t.Errorf("LastOperationState, expected %q, got %q", expState, r1.LastOperationState)
}

const expMessage = "the broker restarted while the operation was in progress"
if r1.LastOperationMessage != expMessage {
t.Errorf("LastOperationMessage, expected %q, got %q", expMessage, r1.LastOperationMessage)
}

// It does not update other operations
var r2 models.TerraformDeployment
err = db.Where("id = ?", okID).First(&r2).Error
if err != nil {
t.Errorf("failed to load updated test data: %s", err)
}
if r2.LastOperationState != "succeeded" || r2.LastOperationMessage != "fake-type succeeded" {
t.Error("row corruption")
}

// It logs the expected message
const expLog1 = `"message":"test.recover-in-progress-operations.mark-as-failed"`
const expLog2 = `"workspace_id":"fake-id-to-recover"`
logMessage := string(logger.Buffer().Contents())
if !strings.Contains(logMessage, expLog1) || !strings.Contains(logMessage, expLog2) {
t.Errorf("log, expected to contain %q and %q, got %q", expLog1, expLog2, logMessage)
}
}
46 changes: 29 additions & 17 deletions integrationtest/termination_recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var _ = Describe("Recovery From Broker Termination", func() {
})
})

It("cannot recover from a terminated create", func() {
It("can recover from a terminated create", func() {
By("starting to provision")
instanceGUID := uuid.New()
response := broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.New(), nil)
Expand All @@ -48,24 +48,28 @@ var _ = Describe("Recovery From Broker Termination", func() {

By("terminating and restarting the broker")
Expect(broker.Stop()).To(Succeed())
broker = must(testdrive.StartBroker(csb, brokerpak, database))
broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr)))

By("INCORRECTLY reporting that an operation is still in progress")
By("reporting that an operation failed")
lastOperation, err := broker.LastOperation(instanceGUID)
Expect(err).NotTo(HaveOccurred())
Expect(lastOperation.Description).To(Equal("provision in progress"))
Expect(lastOperation.State).To(BeEquivalentTo("in progress"))
Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress"))
Expect(lastOperation.State).To(BeEquivalentTo("failed"))

By("logging a message")
ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instanceGUID)
Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws)))

// OSBAPI requires that HTTP 409 (Conflict) is returned
By("refusing to allow a duplicate instance")
response = broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.New(), nil)
Expect(response.Error).NotTo(HaveOccurred())
Expect(response.StatusCode).To(Equal(http.StatusConflict))

By("INCORRECTLY failing to allow the instance to be cleaned up")
By("allowing the instance to be cleaned up")
response = broker.Client.Deprovision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.New())
Expect(response.Error).NotTo(HaveOccurred())
Expect(response.StatusCode).To(Equal(http.StatusInternalServerError))
Expect(response.StatusCode).To(Equal(http.StatusOK))
})

It("can recover from a terminated update", func() {
Expand All @@ -80,13 +84,17 @@ var _ = Describe("Recovery From Broker Termination", func() {

By("terminating and restarting the broker")
Expect(broker.Stop()).To(Succeed())
broker = must(testdrive.StartBroker(csb, brokerpak, database))
broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr)))

By("INCORRECTLY reporting that an operation is still in progress")
By("reporting that an operation failed")
lastOperation, err := broker.LastOperation(instance.GUID)
Expect(err).NotTo(HaveOccurred())
Expect(lastOperation.Description).To(Equal("update in progress"))
Expect(lastOperation.State).To(BeEquivalentTo("in progress"))
Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress"))
Expect(lastOperation.State).To(BeEquivalentTo("failed"))

By("logging a message")
ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID)
Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws)))

By("allowing the operation to be restarted")
Expect(broker.UpdateService(instance)).To(Succeed())
Expand All @@ -104,13 +112,17 @@ var _ = Describe("Recovery From Broker Termination", func() {

By("terminating and restarting the broker")
Expect(broker.Stop()).To(Succeed())
broker = must(testdrive.StartBroker(csb, brokerpak, database))
broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr)))

By("INCORRECTLY reporting that an operation is still in progress")
By("reporting that an operation failed")
lastOperation, err := broker.LastOperation(instance.GUID)
Expect(err).NotTo(HaveOccurred())
Expect(lastOperation.Description).To(Equal("deprovision in progress"))
Expect(lastOperation.State).To(BeEquivalentTo("in progress"))
Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress"))
Expect(lastOperation.State).To(BeEquivalentTo("failed"))

By("logging a message")
ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID)
Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws)))

By("allowing the operation to be restarted")
Expect(broker.Deprovision(instance)).To(Succeed())
Expand All @@ -129,7 +141,7 @@ var _ = Describe("Recovery From Broker Termination", func() {

By("terminating and restarting the broker")
Expect(broker.Stop()).To(Succeed())
broker = must(testdrive.StartBroker(csb, brokerpak, database))
broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr)))

By("allowing the operation to be restarted")
_, err = broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID))
Expand All @@ -152,7 +164,7 @@ var _ = Describe("Recovery From Broker Termination", func() {

By("terminating and restarting the broker")
Expect(broker.Stop()).To(Succeed())
broker = must(testdrive.StartBroker(csb, brokerpak, database))
broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr)))

By("allowing the operation to be restarted")
Expect(broker.DeleteBinding(instance, bindingGUID)).To(Succeed())
Expand Down

0 comments on commit c845267

Please sign in to comment.