diff --git a/aws/internal/service/kafka/waiter/status.go b/aws/internal/service/kafka/waiter/status.go new file mode 100644 index 00000000000..52186f45f3f --- /dev/null +++ b/aws/internal/service/kafka/waiter/status.go @@ -0,0 +1,38 @@ +package waiter + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kafka" + "github.com/hashicorp/aws-sdk-go-base/tfawserr" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" +) + +const ( + ConfigurationStateDeleted = "Deleted" + ConfigurationStateUnknown = "Unknown" +) + +// ConfigurationState fetches the Operation and its Status +func ConfigurationState(conn *kafka.Kafka, arn string) resource.StateRefreshFunc { + return func() (interface{}, string, error) { + input := &kafka.DescribeConfigurationInput{ + Arn: aws.String(arn), + } + + output, err := conn.DescribeConfiguration(input) + + if tfawserr.ErrMessageContains(err, kafka.ErrCodeBadRequestException, "Configuration ARN does not exist") { + return output, ConfigurationStateDeleted, nil + } + + if err != nil { + return output, ConfigurationStateUnknown, err + } + + if output == nil { + return output, ConfigurationStateUnknown, nil + } + + return output, aws.StringValue(output.State), nil + } +} diff --git a/aws/internal/service/kafka/waiter/waiter.go b/aws/internal/service/kafka/waiter/waiter.go new file mode 100644 index 00000000000..2f84656adfe --- /dev/null +++ b/aws/internal/service/kafka/waiter/waiter.go @@ -0,0 +1,31 @@ +package waiter + +import ( + "time" + + "github.com/aws/aws-sdk-go/service/kafka" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" +) + +const ( + // Maximum amount of time to wait for an Configuration to return Deleted + ConfigurationDeletedTimeout = 5 * time.Minute +) + +// ConfigurationDeleted waits for an Configuration to return Deleted +func ConfigurationDeleted(conn *kafka.Kafka, arn string) (*kafka.DescribeConfigurationOutput, error) { + stateConf := &resource.StateChangeConf{ + Pending: []string{kafka.ConfigurationStateDeleting}, + Target: []string{ConfigurationStateDeleted}, + Refresh: ConfigurationState(conn, arn), + Timeout: ConfigurationDeletedTimeout, + } + + outputRaw, err := stateConf.WaitForState() + + if output, ok := outputRaw.(*kafka.DescribeConfigurationOutput); ok { + return output, err + } + + return nil, err +} diff --git a/aws/resource_aws_msk_configuration.go b/aws/resource_aws_msk_configuration.go index 86fa4d97b30..345aa0eed1a 100644 --- a/aws/resource_aws_msk_configuration.go +++ b/aws/resource_aws_msk_configuration.go @@ -7,13 +7,15 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/kafka" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kafka/waiter" ) func resourceAwsMskConfiguration() *schema.Resource { return &schema.Resource{ Create: resourceAwsMskConfigurationCreate, Read: resourceAwsMskConfigurationRead, - Delete: schema.Noop, + Update: resourceAwsMskConfigurationUpdate, + Delete: resourceAwsMskConfigurationDelete, Importer: &schema.ResourceImporter{ State: schema.ImportStatePassthrough, @@ -27,7 +29,6 @@ func resourceAwsMskConfiguration() *schema.Resource { "description": { Type: schema.TypeString, Optional: true, - ForceNew: true, }, "kafka_versions": { Type: schema.TypeSet, @@ -49,7 +50,6 @@ func resourceAwsMskConfiguration() *schema.Resource { "server_properties": { Type: schema.TypeString, Required: true, - ForceNew: true, }, }, } @@ -88,7 +88,7 @@ func resourceAwsMskConfigurationRead(d *schema.ResourceData, meta interface{}) e configurationOutput, err := conn.DescribeConfiguration(configurationInput) - if isAWSErr(err, kafka.ErrCodeNotFoundException, "") { + if isAWSErr(err, kafka.ErrCodeBadRequestException, "Configuration ARN does not exist") { log.Printf("[WARN] MSK Configuration (%s) not found, removing from state", d.Id()) d.SetId("") return nil @@ -123,7 +123,7 @@ func resourceAwsMskConfigurationRead(d *schema.ResourceData, meta interface{}) e } d.Set("arn", aws.StringValue(configurationOutput.Arn)) - d.Set("description", aws.StringValue(configurationOutput.Description)) + d.Set("description", aws.StringValue(revisionOutput.Description)) if err := d.Set("kafka_versions", aws.StringValueSlice(configurationOutput.KafkaVersions)); err != nil { return fmt.Errorf("error setting kafka_versions: %s", err) @@ -135,3 +135,44 @@ func resourceAwsMskConfigurationRead(d *schema.ResourceData, meta interface{}) e return nil } + +func resourceAwsMskConfigurationUpdate(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kafkaconn + + input := &kafka.UpdateConfigurationInput{ + Arn: aws.String(d.Id()), + ServerProperties: []byte(d.Get("server_properties").(string)), + } + + if v, ok := d.GetOk("description"); ok { + input.Description = aws.String(v.(string)) + } + + _, err := conn.UpdateConfiguration(input) + + if err != nil { + return fmt.Errorf("error updating MSK Configuration (%s): %w", d.Id(), err) + } + + return resourceAwsMskConfigurationRead(d, meta) +} + +func resourceAwsMskConfigurationDelete(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kafkaconn + + input := &kafka.DeleteConfigurationInput{ + Arn: aws.String(d.Id()), + } + + _, err := conn.DeleteConfiguration(input) + + if err != nil { + return fmt.Errorf("error deleting MSK Configuration (%s): %w", d.Id(), err) + } + + if _, err := waiter.ConfigurationDeleted(conn, d.Id()); err != nil { + return fmt.Errorf("error waiting for MSK Configuration (%s): %w", d.Id(), err) + } + + return nil +} diff --git a/aws/resource_aws_msk_configuration_test.go b/aws/resource_aws_msk_configuration_test.go index 47c863a9a29..398af256ee7 100644 --- a/aws/resource_aws_msk_configuration_test.go +++ b/aws/resource_aws_msk_configuration_test.go @@ -2,16 +2,79 @@ package aws import ( "fmt" + "log" "regexp" "testing" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/kafka" + "github.com/hashicorp/aws-sdk-go-base/tfawserr" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" ) +func init() { + resource.AddTestSweepers("aws_msk_configuration", &resource.Sweeper{ + Name: "aws_msk_configuration", + F: testSweepMskConfigurations, + Dependencies: []string{ + "aws_msk_cluster", + }, + }) +} + +func testSweepMskConfigurations(region string) error { + client, err := sharedClientForRegion(region) + if err != nil { + return fmt.Errorf("error getting client: %s", err) + } + conn := client.(*AWSClient).kafkaconn + var sweeperErrs *multierror.Error + + input := &kafka.ListConfigurationsInput{} + + err = conn.ListConfigurationsPages(input, func(page *kafka.ListConfigurationsOutput, isLast bool) bool { + if page == nil { + return !isLast + } + + for _, configuration := range page.Configurations { + if configuration == nil { + continue + } + + arn := aws.StringValue(configuration.Arn) + log.Printf("[INFO] Deleting MSK Configuration: %s", arn) + + r := resourceAwsMskConfiguration() + d := r.Data(nil) + d.SetId(arn) + err := r.Delete(d, client) + + if err != nil { + log.Printf("[ERROR] %s", err) + sweeperErrs = multierror.Append(sweeperErrs, err) + continue + } + } + + return !isLast + }) + + if testSweepSkipSweepError(err) { + log.Printf("[WARN] Skipping MSK Configurations sweep for %s: %s", region, err) + return sweeperErrs.ErrorOrNil() // In case we have completed some pages, but had errors + } + + if err != nil { + sweeperErrs = multierror.Append(sweeperErrs, fmt.Errorf("error retrieving MSK Configurations: %w", err)) + } + + return sweeperErrs.ErrorOrNil() +} + func TestAccAWSMskConfiguration_basic(t *testing.T) { var configuration1 kafka.DescribeConfigurationOutput rName := acctest.RandomWithPrefix("tf-acc-test") @@ -43,11 +106,33 @@ func TestAccAWSMskConfiguration_basic(t *testing.T) { }) } -func TestAccAWSMskConfiguration_Description(t *testing.T) { +func TestAccAWSMskConfiguration_disappears(t *testing.T) { var configuration1 kafka.DescribeConfigurationOutput rName := acctest.RandomWithPrefix("tf-acc-test") resourceName := "aws_msk_configuration.test" + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckMskConfigurationDestroy, + Steps: []resource.TestStep{ + { + Config: testAccMskConfigurationConfig(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskConfigurationExists(resourceName, &configuration1), + testAccCheckResourceDisappears(testAccProvider, resourceAwsMskConfiguration(), resourceName), + ), + ExpectNonEmptyPlan: true, + }, + }, + }) +} + +func TestAccAWSMskConfiguration_Description(t *testing.T) { + var configuration1, configuration2 kafka.DescribeConfigurationOutput + rName := acctest.RandomWithPrefix("tf-acc-test") + resourceName := "aws_msk_configuration.test" + resource.ParallelTest(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) }, Providers: testAccProviders, @@ -65,6 +150,14 @@ func TestAccAWSMskConfiguration_Description(t *testing.T) { ImportState: true, ImportStateVerify: true, }, + { + Config: testAccMskConfigurationConfigDescription(rName, "description2"), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskConfigurationExists(resourceName, &configuration2), + resource.TestCheckResourceAttr(resourceName, "description", "description2"), + resource.TestCheckResourceAttr(resourceName, "latest_revision", "2"), + ), + }, }, }) } @@ -96,9 +189,11 @@ func TestAccAWSMskConfiguration_KafkaVersions(t *testing.T) { } func TestAccAWSMskConfiguration_ServerProperties(t *testing.T) { - var configuration1 kafka.DescribeConfigurationOutput + var configuration1, configuration2 kafka.DescribeConfigurationOutput rName := acctest.RandomWithPrefix("tf-acc-test") resourceName := "aws_msk_configuration.test" + serverProperty1 := "auto.create.topics.enable = false" + serverProperty2 := "auto.create.topics.enable = true" resource.ParallelTest(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) }, @@ -106,10 +201,10 @@ func TestAccAWSMskConfiguration_ServerProperties(t *testing.T) { CheckDestroy: testAccCheckMskConfigurationDestroy, Steps: []resource.TestStep{ { - Config: testAccMskConfigurationConfigServerProperties(rName), + Config: testAccMskConfigurationConfigServerProperties(rName, serverProperty1), Check: resource.ComposeTestCheckFunc( testAccCheckMskConfigurationExists(resourceName, &configuration1), - resource.TestMatchResourceAttr(resourceName, "server_properties", regexp.MustCompile(`auto.create.topics.enable = false`)), + resource.TestMatchResourceAttr(resourceName, "server_properties", regexp.MustCompile(serverProperty1)), ), }, { @@ -117,17 +212,43 @@ func TestAccAWSMskConfiguration_ServerProperties(t *testing.T) { ImportState: true, ImportStateVerify: true, }, + { + Config: testAccMskConfigurationConfigServerProperties(rName, serverProperty2), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskConfigurationExists(resourceName, &configuration2), + resource.TestCheckResourceAttr(resourceName, "latest_revision", "2"), + resource.TestMatchResourceAttr(resourceName, "server_properties", regexp.MustCompile(serverProperty2)), + ), + }, }, }) } func testAccCheckMskConfigurationDestroy(s *terraform.State) error { + conn := testAccProvider.Meta().(*AWSClient).kafkaconn + for _, rs := range s.RootModule().Resources { if rs.Type != "aws_msk_configuration" { continue } - // The API does not support deletions at this time + input := &kafka.DescribeConfigurationInput{ + Arn: aws.String(rs.Primary.ID), + } + + output, err := conn.DescribeConfiguration(input) + + if tfawserr.ErrMessageContains(err, kafka.ErrCodeBadRequestException, "Configuration ARN does not exist") { + continue + } + + if err != nil { + return err + } + + if output != nil { + return fmt.Errorf("MSK Configuration (%s) still exists", rs.Primary.ID) + } } return nil @@ -203,15 +324,15 @@ PROPERTIES `, rName) } -func testAccMskConfigurationConfigServerProperties(rName string) string { +func testAccMskConfigurationConfigServerProperties(rName string, serverProperty string) string { return fmt.Sprintf(` resource "aws_msk_configuration" "test" { kafka_versions = ["2.1.0"] name = %[1]q server_properties = < **NOTE:** The API does not support deleting MSK configurations. Removing this Terraform resource will only remove the Terraform state for it. - ## Example Usage ```hcl