Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource/aws_msk_configuration: Implement Update and Delete support #14826

Merged
merged 1 commit into from
Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions aws/internal/service/kafka/waiter/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package waiter

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kafka"
"github.com/hashicorp/aws-sdk-go-base/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

const (
ConfigurationStateDeleted = "Deleted"
ConfigurationStateUnknown = "Unknown"
)

// ConfigurationState fetches the Operation and its Status
func ConfigurationState(conn *kafka.Kafka, arn string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
input := &kafka.DescribeConfigurationInput{
Arn: aws.String(arn),
}

output, err := conn.DescribeConfiguration(input)

if tfawserr.ErrMessageContains(err, kafka.ErrCodeBadRequestException, "Configuration ARN does not exist") {
return output, ConfigurationStateDeleted, nil
}

if err != nil {
return output, ConfigurationStateUnknown, err
}

if output == nil {
return output, ConfigurationStateUnknown, nil
}

return output, aws.StringValue(output.State), nil
}
}
31 changes: 31 additions & 0 deletions aws/internal/service/kafka/waiter/waiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package waiter

import (
"time"

"github.com/aws/aws-sdk-go/service/kafka"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

const (
// Maximum amount of time to wait for an Configuration to return Deleted
ConfigurationDeletedTimeout = 5 * time.Minute
)

// ConfigurationDeleted waits for an Configuration to return Deleted
func ConfigurationDeleted(conn *kafka.Kafka, arn string) (*kafka.DescribeConfigurationOutput, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kafka.ConfigurationStateDeleting},
Target: []string{ConfigurationStateDeleted},
Refresh: ConfigurationState(conn, arn),
Timeout: ConfigurationDeletedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if output, ok := outputRaw.(*kafka.DescribeConfigurationOutput); ok {
return output, err
}

return nil, err
}
51 changes: 46 additions & 5 deletions aws/resource_aws_msk_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kafka"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kafka/waiter"
)

func resourceAwsMskConfiguration() *schema.Resource {
return &schema.Resource{
Create: resourceAwsMskConfigurationCreate,
Read: resourceAwsMskConfigurationRead,
Delete: schema.Noop,
Update: resourceAwsMskConfigurationUpdate,
Delete: resourceAwsMskConfigurationDelete,

Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
Expand All @@ -27,7 +29,6 @@ func resourceAwsMskConfiguration() *schema.Resource {
"description": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"kafka_versions": {
Type: schema.TypeSet,
Expand All @@ -49,7 +50,6 @@ func resourceAwsMskConfiguration() *schema.Resource {
"server_properties": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
},
}
Expand Down Expand Up @@ -88,7 +88,7 @@ func resourceAwsMskConfigurationRead(d *schema.ResourceData, meta interface{}) e

configurationOutput, err := conn.DescribeConfiguration(configurationInput)

if isAWSErr(err, kafka.ErrCodeNotFoundException, "") {
if isAWSErr(err, kafka.ErrCodeBadRequestException, "Configuration ARN does not exist") {
log.Printf("[WARN] MSK Configuration (%s) not found, removing from state", d.Id())
d.SetId("")
return nil
Expand Down Expand Up @@ -123,7 +123,7 @@ func resourceAwsMskConfigurationRead(d *schema.ResourceData, meta interface{}) e
}

d.Set("arn", aws.StringValue(configurationOutput.Arn))
d.Set("description", aws.StringValue(configurationOutput.Description))
d.Set("description", aws.StringValue(revisionOutput.Description))

if err := d.Set("kafka_versions", aws.StringValueSlice(configurationOutput.KafkaVersions)); err != nil {
return fmt.Errorf("error setting kafka_versions: %s", err)
Expand All @@ -135,3 +135,44 @@ func resourceAwsMskConfigurationRead(d *schema.ResourceData, meta interface{}) e

return nil
}

func resourceAwsMskConfigurationUpdate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).kafkaconn

input := &kafka.UpdateConfigurationInput{
Arn: aws.String(d.Id()),
ServerProperties: []byte(d.Get("server_properties").(string)),
}

if v, ok := d.GetOk("description"); ok {
input.Description = aws.String(v.(string))
}

_, err := conn.UpdateConfiguration(input)

if err != nil {
return fmt.Errorf("error updating MSK Configuration (%s): %w", d.Id(), err)
}

return resourceAwsMskConfigurationRead(d, meta)
}

func resourceAwsMskConfigurationDelete(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).kafkaconn

input := &kafka.DeleteConfigurationInput{
Arn: aws.String(d.Id()),
}

_, err := conn.DeleteConfiguration(input)

if err != nil {
return fmt.Errorf("error deleting MSK Configuration (%s): %w", d.Id(), err)
}

if _, err := waiter.ConfigurationDeleted(conn, d.Id()); err != nil {
return fmt.Errorf("error waiting for MSK Configuration (%s): %w", d.Id(), err)
}

return nil
}
137 changes: 129 additions & 8 deletions aws/resource_aws_msk_configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,79 @@ package aws

import (
"fmt"
"log"
"regexp"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kafka"
"github.com/hashicorp/aws-sdk-go-base/tfawserr"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
)

func init() {
resource.AddTestSweepers("aws_msk_configuration", &resource.Sweeper{
Name: "aws_msk_configuration",
F: testSweepMskConfigurations,
Dependencies: []string{
"aws_msk_cluster",
},
})
}

func testSweepMskConfigurations(region string) error {
client, err := sharedClientForRegion(region)
if err != nil {
return fmt.Errorf("error getting client: %s", err)
}
conn := client.(*AWSClient).kafkaconn
var sweeperErrs *multierror.Error

input := &kafka.ListConfigurationsInput{}

err = conn.ListConfigurationsPages(input, func(page *kafka.ListConfigurationsOutput, isLast bool) bool {
if page == nil {
return !isLast
}

for _, configuration := range page.Configurations {
if configuration == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure the likelihood of catching a configuration in a deleting state at this point, but just curious if we should check the state here as well since the docs for DeleteConfiguration note the configuration must be in either the active or delete_failed state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is certainly in the realm of possibility (albeit very unlikely as these seem to delete very fast currently)! Rather than handling it just in our testing code, it would be something that should be handled in the resource Delete function. We won't be able to really test it, but let me see if I can grab the specific error via the AWS CLI.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's what I got with quickly doing aws kafka delete-configuration:

An error occurred (BadRequestException) when calling the DeleteConfiguration operation: This operation is only valid for resources that are in one of the following states :[ACTIVE, DELETE_FAILED]

I was hoping it would either have a more specific error code for this situation or include DELETING in the error message so we could safely skip it, but given the way this error is setup they could add other state values for configurations which we should always return the error instead of skipping it. 🙁 I think we'll want to leave this edge case in there for now unless it does become a real problem for practitioners or our sweepers.

continue
}

arn := aws.StringValue(configuration.Arn)
log.Printf("[INFO] Deleting MSK Configuration: %s", arn)

r := resourceAwsMskConfiguration()
d := r.Data(nil)
d.SetId(arn)
err := r.Delete(d, client)

if err != nil {
log.Printf("[ERROR] %s", err)
sweeperErrs = multierror.Append(sweeperErrs, err)
continue
}
}

return !isLast
})

if testSweepSkipSweepError(err) {
log.Printf("[WARN] Skipping MSK Configurations sweep for %s: %s", region, err)
return sweeperErrs.ErrorOrNil() // In case we have completed some pages, but had errors
}

if err != nil {
sweeperErrs = multierror.Append(sweeperErrs, fmt.Errorf("error retrieving MSK Configurations: %w", err))
}

return sweeperErrs.ErrorOrNil()
}

func TestAccAWSMskConfiguration_basic(t *testing.T) {
var configuration1 kafka.DescribeConfigurationOutput
rName := acctest.RandomWithPrefix("tf-acc-test")
Expand Down Expand Up @@ -43,11 +106,33 @@ func TestAccAWSMskConfiguration_basic(t *testing.T) {
})
}

func TestAccAWSMskConfiguration_Description(t *testing.T) {
func TestAccAWSMskConfiguration_disappears(t *testing.T) {
var configuration1 kafka.DescribeConfigurationOutput
rName := acctest.RandomWithPrefix("tf-acc-test")
resourceName := "aws_msk_configuration.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckMskConfigurationDestroy,
Steps: []resource.TestStep{
{
Config: testAccMskConfigurationConfig(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskConfigurationExists(resourceName, &configuration1),
testAccCheckResourceDisappears(testAccProvider, resourceAwsMskConfiguration(), resourceName),
),
ExpectNonEmptyPlan: true,
},
},
})
}

func TestAccAWSMskConfiguration_Description(t *testing.T) {
var configuration1, configuration2 kafka.DescribeConfigurationOutput
rName := acctest.RandomWithPrefix("tf-acc-test")
resourceName := "aws_msk_configuration.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) },
Providers: testAccProviders,
Expand All @@ -65,6 +150,14 @@ func TestAccAWSMskConfiguration_Description(t *testing.T) {
ImportState: true,
ImportStateVerify: true,
},
{
Config: testAccMskConfigurationConfigDescription(rName, "description2"),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskConfigurationExists(resourceName, &configuration2),
resource.TestCheckResourceAttr(resourceName, "description", "description2"),
resource.TestCheckResourceAttr(resourceName, "latest_revision", "2"),
),
},
},
})
}
Expand Down Expand Up @@ -96,38 +189,66 @@ func TestAccAWSMskConfiguration_KafkaVersions(t *testing.T) {
}

func TestAccAWSMskConfiguration_ServerProperties(t *testing.T) {
var configuration1 kafka.DescribeConfigurationOutput
var configuration1, configuration2 kafka.DescribeConfigurationOutput
rName := acctest.RandomWithPrefix("tf-acc-test")
resourceName := "aws_msk_configuration.test"
serverProperty1 := "auto.create.topics.enable = false"
serverProperty2 := "auto.create.topics.enable = true"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckMskConfigurationDestroy,
Steps: []resource.TestStep{
{
Config: testAccMskConfigurationConfigServerProperties(rName),
Config: testAccMskConfigurationConfigServerProperties(rName, serverProperty1),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskConfigurationExists(resourceName, &configuration1),
resource.TestMatchResourceAttr(resourceName, "server_properties", regexp.MustCompile(`auto.create.topics.enable = false`)),
resource.TestMatchResourceAttr(resourceName, "server_properties", regexp.MustCompile(serverProperty1)),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
{
Config: testAccMskConfigurationConfigServerProperties(rName, serverProperty2),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskConfigurationExists(resourceName, &configuration2),
resource.TestCheckResourceAttr(resourceName, "latest_revision", "2"),
resource.TestMatchResourceAttr(resourceName, "server_properties", regexp.MustCompile(serverProperty2)),
),
},
},
})
}

func testAccCheckMskConfigurationDestroy(s *terraform.State) error {
conn := testAccProvider.Meta().(*AWSClient).kafkaconn

for _, rs := range s.RootModule().Resources {
if rs.Type != "aws_msk_configuration" {
continue
}

// The API does not support deletions at this time
input := &kafka.DescribeConfigurationInput{
Arn: aws.String(rs.Primary.ID),
}

output, err := conn.DescribeConfiguration(input)

if tfawserr.ErrMessageContains(err, kafka.ErrCodeBadRequestException, "Configuration ARN does not exist") {
continue
}

if err != nil {
return err
}

if output != nil {
return fmt.Errorf("MSK Configuration (%s) still exists", rs.Primary.ID)
}
}

return nil
Expand Down Expand Up @@ -203,15 +324,15 @@ PROPERTIES
`, rName)
}

func testAccMskConfigurationConfigServerProperties(rName string) string {
func testAccMskConfigurationConfigServerProperties(rName string, serverProperty string) string {
return fmt.Sprintf(`
resource "aws_msk_configuration" "test" {
kafka_versions = ["2.1.0"]
name = %[1]q

server_properties = <<PROPERTIES
auto.create.topics.enable = false
%[2]s
PROPERTIES
}
`, rName)
`, rName, serverProperty)
}
2 changes: 0 additions & 2 deletions website/docs/r/msk_configuration.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ description: |-

Manages an Amazon Managed Streaming for Kafka configuration. More information can be found on the [MSK Developer Guide](https://docs.aws.amazon.com/msk/latest/developerguide/msk-configuration.html).

~> **NOTE:** The API does not support deleting MSK configurations. Removing this Terraform resource will only remove the Terraform state for it.

## Example Usage

```hcl
Expand Down