From c8452677ef3d141efd150c5adbd4392377e2439c Mon Sep 17 00:00:00 2001 From: George Blue Date: Wed, 27 Mar 2024 15:01:49 +0000 Subject: [PATCH] fix: recover in-progress operations on broker restart (#983) - previously if an operation was in progress when the broker terminated it would forever be recorded as "in progress" in the broker database and was hard to clean up - now on startup, the broker marks any in-progress operations as failed - this allows the service instance to be cleaned up without intervention from a CloudFoundry administrator [#178142867](https://www.pivotaltracker.com/story/show/178142867) --- dbservice/dbservice.go | 6 +- dbservice/recover_in_progress_operations.go | 24 ++++++ .../recover_in_progress_operations_test.go | 83 +++++++++++++++++++ integrationtest/termination_recovery_test.go | 46 ++++++---- 4 files changed, 140 insertions(+), 19 deletions(-) create mode 100644 dbservice/recover_in_progress_operations.go create mode 100644 dbservice/recover_in_progress_operations_test.go diff --git a/dbservice/dbservice.go b/dbservice/dbservice.go index 45270649d..9ec304b0c 100644 --- a/dbservice/dbservice.go +++ b/dbservice/dbservice.go @@ -20,9 +20,8 @@ import ( "sync" "code.cloudfoundry.org/lager/v3" - "gorm.io/gorm" - _ "gorm.io/driver/sqlite" + "gorm.io/gorm" ) var once sync.Once @@ -35,6 +34,9 @@ func New(logger lager.Logger) *gorm.DB { if err := RunMigrations(db); err != nil { panic(fmt.Sprintf("Error migrating database: %s", err)) } + if err := recoverInProgressOperations(db, logger); err != nil { + panic(fmt.Sprintf("Error recovering in-progress operations: %s", err)) + } }) return db } diff --git a/dbservice/recover_in_progress_operations.go b/dbservice/recover_in_progress_operations.go new file mode 100644 index 000000000..31a0a7b37 --- /dev/null +++ b/dbservice/recover_in_progress_operations.go @@ -0,0 +1,24 @@ +package dbservice + +import ( + "code.cloudfoundry.org/lager/v3" + "github.com/cloudfoundry/cloud-service-broker/dbservice/models" + "gorm.io/gorm" +) + +func recoverInProgressOperations(db *gorm.DB, logger lager.Logger) error { + logger = logger.Session("recover-in-progress-operations") + + var terraformDeploymentBatch []models.TerraformDeployment + result := db.Where("last_operation_state = ?", "in progress").FindInBatches(&terraformDeploymentBatch, 100, func(tx *gorm.DB, batchNumber int) error { + for i := range terraformDeploymentBatch { + terraformDeploymentBatch[i].LastOperationState = "failed" + terraformDeploymentBatch[i].LastOperationMessage = "the broker restarted while the operation was in progress" + logger.Info("mark-as-failed", lager.Data{"workspace_id": terraformDeploymentBatch[i].ID}) + } + + return tx.Save(&terraformDeploymentBatch).Error + }) + + return result.Error +} diff --git a/dbservice/recover_in_progress_operations_test.go b/dbservice/recover_in_progress_operations_test.go new file mode 100644 index 000000000..ff14d2b37 --- /dev/null +++ b/dbservice/recover_in_progress_operations_test.go @@ -0,0 +1,83 @@ +package dbservice + +import ( + "strings" + "testing" + + "code.cloudfoundry.org/lager/v3/lagertest" + "github.com/cloudfoundry/cloud-service-broker/dbservice/models" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +func TestRecoverInProgressOperations(t *testing.T) { + // Setup + db, err := gorm.Open(sqlite.Open(":memory:"), nil) + if err != nil { + t.Errorf("failed to create test database: %s", err) + } + + if err = db.Migrator().CreateTable(&models.TerraformDeployment{}); err != nil { + t.Errorf("failed to create test table: 
%s", err) + } + + const recoverID = "fake-id-to-recover" + err = db.Create(&models.TerraformDeployment{ + ID: recoverID, + LastOperationType: "fake-type", + LastOperationState: "in progress", + LastOperationMessage: "fake-type in progress", + }).Error + if err != nil { + t.Errorf("failed to create test database data: %s", err) + } + const okID = "fake-id-that-does-not-need-to-be-recovered" + err = db.Create(&models.TerraformDeployment{ + ID: okID, + LastOperationType: "fake-type", + LastOperationState: "succeeded", + LastOperationMessage: "fake-type succeeded", + }).Error + if err != nil { + t.Errorf("failed to create test database data: %s", err) + } + + // Call the function + logger := lagertest.NewTestLogger("test") + recoverInProgressOperations(db, logger) + + // It marks the in-progress operation as failed + var r1 models.TerraformDeployment + err = db.Where("id = ?", recoverID).First(&r1).Error + if err != nil { + t.Errorf("failed to load updated test data: %s", err) + } + + const expState = "failed" + if r1.LastOperationState != expState { + t.Errorf("LastOperationState, expected %q, got %q", expState, r1.LastOperationState) + } + + const expMessage = "the broker restarted while the operation was in progress" + if r1.LastOperationMessage != expMessage { + t.Errorf("LastOperationMessage, expected %q, got %q", expMessage, r1.LastOperationMessage) + } + + // It does not update other operations + var r2 models.TerraformDeployment + err = db.Where("id = ?", okID).First(&r2).Error + if err != nil { + t.Errorf("failed to load updated test data: %s", err) + } + if r2.LastOperationState != "succeeded" || r2.LastOperationMessage != "fake-type succeeded" { + t.Error("row corruption") + } + + // It logs the expected message + const expLog1 = `"message":"test.recover-in-progress-operations.mark-as-failed"` + const expLog2 = `"workspace_id":"fake-id-to-recover"` + logMessage := string(logger.Buffer().Contents()) + if !strings.Contains(logMessage, expLog1) || !strings.Contains(logMessage, expLog2) { + t.Errorf("log, expected to contain %q and %q, got %q", expLog1, expLog2, logMessage) + } +} diff --git a/integrationtest/termination_recovery_test.go b/integrationtest/termination_recovery_test.go index 9ed7f07a4..c98b3dafe 100644 --- a/integrationtest/termination_recovery_test.go +++ b/integrationtest/termination_recovery_test.go @@ -39,7 +39,7 @@ var _ = Describe("Recovery From Broker Termination", func() { }) }) - It("cannot recover from a terminated create", func() { + It("can recover from a terminated create", func() { By("starting to provision") instanceGUID := uuid.New() response := broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.New(), nil) @@ -48,13 +48,17 @@ var _ = Describe("Recovery From Broker Termination", func() { By("terminating and restarting the broker") Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database)) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) - By("INCORRECTLY reporting that an operation is still in progress") + By("reporting that an operation failed") lastOperation, err := broker.LastOperation(instanceGUID) Expect(err).NotTo(HaveOccurred()) - Expect(lastOperation.Description).To(Equal("provision in progress")) - Expect(lastOperation.State).To(BeEquivalentTo("in progress")) + Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) + Expect(lastOperation.State).To(BeEquivalentTo("failed")) + 
+ By("logging a message") + ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instanceGUID) + Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) // OSBAPI requires that HTTP 409 (Conflict) is returned By("refusing to allow a duplicate instance") @@ -62,10 +66,10 @@ var _ = Describe("Recovery From Broker Termination", func() { Expect(response.Error).NotTo(HaveOccurred()) Expect(response.StatusCode).To(Equal(http.StatusConflict)) - By("INCORRECTLY failing to allow the instance to be cleaned up") + By("allowing the instance to be cleaned up") response = broker.Client.Deprovision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.New()) Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusInternalServerError)) + Expect(response.StatusCode).To(Equal(http.StatusOK)) }) It("can recover from a terminated update", func() { @@ -80,13 +84,17 @@ var _ = Describe("Recovery From Broker Termination", func() { By("terminating and restarting the broker") Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database)) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) - By("INCORRECTLY reporting that an operation is still in progress") + By("reporting that an operation failed") lastOperation, err := broker.LastOperation(instance.GUID) Expect(err).NotTo(HaveOccurred()) - Expect(lastOperation.Description).To(Equal("update in progress")) - Expect(lastOperation.State).To(BeEquivalentTo("in progress")) + Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) + Expect(lastOperation.State).To(BeEquivalentTo("failed")) + + By("logging a message") + ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID) + Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) By("allowing the operation to be restarted") Expect(broker.UpdateService(instance)).To(Succeed()) @@ -104,13 +112,17 @@ var _ = Describe("Recovery From Broker Termination", func() { By("terminating and restarting the broker") Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database)) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) - By("INCORRECTLY reporting that an operation is still in progress") + By("reporting that an operation failed") lastOperation, err := broker.LastOperation(instance.GUID) Expect(err).NotTo(HaveOccurred()) - Expect(lastOperation.Description).To(Equal("deprovision in progress")) - Expect(lastOperation.State).To(BeEquivalentTo("in progress")) + Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) + Expect(lastOperation.State).To(BeEquivalentTo("failed")) + + By("logging a message") + ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID) + Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) By("allowing the operation to be restarted") Expect(broker.Deprovision(instance)).To(Succeed()) @@ -129,7 +141,7 @@ var _ = Describe("Recovery From Broker Termination", func() { By("terminating and restarting the broker") Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database)) + broker = must(testdrive.StartBroker(csb, 
brokerpak, database, testdrive.WithOutputs(stdout, stderr))) By("allowing the operation to be restarted") _, err = broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) @@ -152,7 +164,7 @@ var _ = Describe("Recovery From Broker Termination", func() { By("terminating and restarting the broker") Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database)) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) By("allowing the operation to be restarted") Expect(broker.DeleteBinding(instance, bindingGUID)).To(Succeed())
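
Note on the recovery pattern: the sketch below is a minimal, self-contained illustration of the approach this patch introduces in dbservice/recover_in_progress_operations.go, namely selecting rows whose last operation is still "in progress" with gorm's FindInBatches and flipping them to "failed" before the broker serves traffic. The Deployment type, the recoverInProgress helper and the main function are illustrative stand-ins, not the broker's code: the real implementation operates on models.TerraformDeployment and is invoked from the once-guarded New() right after RunMigrations.

// Minimal sketch of the startup-recovery technique used by this patch.
// Deployment is a stand-in for the broker's models.TerraformDeployment.
package main

import (
	"fmt"
	"log"

	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

type Deployment struct {
	ID                   string `gorm:"primaryKey"`
	LastOperationState   string
	LastOperationMessage string
}

// recoverInProgress marks any operation left "in progress" as failed, in
// batches of 100, mirroring the FindInBatches + Save approach in the patch.
func recoverInProgress(db *gorm.DB) error {
	var batch []Deployment
	result := db.Where("last_operation_state = ?", "in progress").
		FindInBatches(&batch, 100, func(tx *gorm.DB, _ int) error {
			for i := range batch {
				batch[i].LastOperationState = "failed"
				batch[i].LastOperationMessage = "the broker restarted while the operation was in progress"
			}
			// Save persists every row in the current batch.
			return tx.Save(&batch).Error
		})
	return result.Error
}

func main() {
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	if err != nil {
		log.Fatal(err)
	}
	if err := db.AutoMigrate(&Deployment{}); err != nil {
		log.Fatal(err)
	}

	// One stuck operation and one that finished normally.
	db.Create(&Deployment{ID: "stuck", LastOperationState: "in progress"})
	db.Create(&Deployment{ID: "done", LastOperationState: "succeeded"})

	if err := recoverInProgress(db); err != nil {
		log.Fatal(err)
	}

	var stuck Deployment
	db.First(&stuck, "id = ?", "stuck")
	fmt.Println(stuck.LastOperationState, "-", stuck.LastOperationMessage)
	// prints: failed - the broker restarted while the operation was in progress
}

Processing the rows in batches keeps memory bounded when many deployments are stuck, and because the real call sits in New() alongside the migrations, the cleanup happens as part of database initialisation rather than as a separate task for a Cloud Foundry administrator.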