From 6f994320b21f69ee14313367db32b87e0a08d17d Mon Sep 17 00:00:00 2001 From: Jesse Aukeman Date: Tue, 14 May 2019 13:11:09 -0400 Subject: [PATCH 1/4] Add AWS MSK cluster resource * implement aws_msk_cluster resource with support for tags * implement acceptance tests --- aws/provider.go | 1 + aws/resource_aws_msk_cluster.go | 372 ++++++++++++++++++ aws/resource_aws_msk_cluster_test.go | 481 +++++++++++++++++++++++ aws/tags_msk.go | 102 +++++ aws/tags_msk_test.go | 101 +++++ website/aws.erb | 10 + website/docs/r/msk_cluster.html.markdown | 124 ++++++ 7 files changed, 1191 insertions(+) create mode 100644 aws/resource_aws_msk_cluster.go create mode 100644 aws/resource_aws_msk_cluster_test.go create mode 100644 aws/tags_msk.go create mode 100644 aws/tags_msk_test.go create mode 100644 website/docs/r/msk_cluster.html.markdown diff --git a/aws/provider.go b/aws/provider.go index b0d6d2eb33b..fab15af72a3 100644 --- a/aws/provider.go +++ b/aws/provider.go @@ -559,6 +559,7 @@ func Provider() terraform.ResourceProvider { "aws_media_package_channel": resourceAwsMediaPackageChannel(), "aws_media_store_container": resourceAwsMediaStoreContainer(), "aws_media_store_container_policy": resourceAwsMediaStoreContainerPolicy(), + "aws_msk_cluster": resourceAwsMskCluster(), "aws_nat_gateway": resourceAwsNatGateway(), "aws_network_acl": resourceAwsNetworkAcl(), "aws_default_network_acl": resourceAwsDefaultNetworkAcl(), diff --git a/aws/resource_aws_msk_cluster.go b/aws/resource_aws_msk_cluster.go new file mode 100644 index 00000000000..5b4a91e30a4 --- /dev/null +++ b/aws/resource_aws_msk_cluster.go @@ -0,0 +1,372 @@ +package aws + +import ( + "fmt" + "log" + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kafka" + "github.com/hashicorp/terraform/helper/resource" + "github.com/hashicorp/terraform/helper/schema" + "github.com/hashicorp/terraform/helper/validation" +) + +func resourceAwsMskCluster() *schema.Resource { + return &schema.Resource{ + Create: resourceAwsMskClusterCreate, + Read: resourceAwsMskClusterRead, + Update: resourceAwsMskClusterUpdate, + Delete: resourceAwsMskClusterDelete, + Importer: &schema.ResourceImporter{ + State: schema.ImportStatePassthrough, + }, + Timeouts: &schema.ResourceTimeout{ + Create: schema.DefaultTimeout(45 * time.Minute), + Delete: schema.DefaultTimeout(45 * time.Minute), + }, + Schema: map[string]*schema.Schema{ + "arn": { + Type: schema.TypeString, + Computed: true, + }, + "bootstrap_brokers": { + Type: schema.TypeString, + Computed: true, + }, + "broker_node_group_info": { + Type: schema.TypeList, + Required: true, + ForceNew: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "az_distribution": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + Default: kafka.BrokerAZDistributionDefault, + ValidateFunc: validation.StringInSlice([]string{ + kafka.BrokerAZDistributionDefault, + }, true), + }, + "client_subnets": { + Type: schema.TypeList, + Required: true, + ForceNew: true, + Elem: &schema.Schema{ + Type: schema.TypeString, + }, + }, + "instance_type": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "security_groups": { + Type: schema.TypeList, + Required: true, + ForceNew: true, + Elem: &schema.Schema{ + Type: schema.TypeString, + }, + }, + "ebs_volume_size": { + Type: schema.TypeInt, + Required: true, + ForceNew: true, + ValidateFunc: validation.IntBetween(1, 16384), + }, + }, + }, + }, + "cluster_name": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + ValidateFunc: validation.StringLenBetween(1, 64), + }, + "encryption_info": { + Type: schema.TypeList, + Optional: true, + Computed: true, + ForceNew: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "encryption_at_rest_kms_id": { + Type: schema.TypeString, + Optional: true, + Computed: true, + ForceNew: true, + DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool { + // MSK api accepts either KMS short id or arn, but always returns arn. + // treat them as equivalent + return strings.Contains(old, new) + }, + }, + }, + }, + }, + "enhanced_monitoring": { + Type: schema.TypeString, + Optional: true, + Default: kafka.EnhancedMonitoringDefault, + ForceNew: true, + ValidateFunc: validation.StringInSlice([]string{ + kafka.EnhancedMonitoringDefault, + kafka.EnhancedMonitoringPerBroker, + kafka.EnhancedMonitoringPerTopicPerBroker, + }, true), + }, + "kafka_version": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + ValidateFunc: validation.StringLenBetween(1, 64), + }, + "number_of_broker_nodes": { + Type: schema.TypeInt, + Required: true, + ForceNew: true, + }, + "tags": tagsSchema(), + "zookeeper_connect_string": { + Type: schema.TypeString, + Computed: true, + }, + }, + } +} + +func resourceAwsMskClusterCreate(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kafkaconn + + nodeInfo := d.Get("broker_node_group_info").([]interface{})[0].(map[string]interface{}) + + input := &kafka.CreateClusterInput{ + ClusterName: aws.String(d.Get("cluster_name").(string)), + EnhancedMonitoring: aws.String(d.Get("enhanced_monitoring").(string)), + NumberOfBrokerNodes: aws.Int64(int64(d.Get("number_of_broker_nodes").(int))), + BrokerNodeGroupInfo: &kafka.BrokerNodeGroupInfo{ + BrokerAZDistribution: aws.String(nodeInfo["az_distribution"].(string)), + InstanceType: aws.String(nodeInfo["instance_type"].(string)), + StorageInfo: &kafka.StorageInfo{ + EbsStorageInfo: &kafka.EBSStorageInfo{ + VolumeSize: aws.Int64(int64(nodeInfo["ebs_volume_size"].(int))), + }, + }, + ClientSubnets: expandStringList(nodeInfo["client_subnets"].([]interface{})), + SecurityGroups: expandStringList(nodeInfo["security_groups"].([]interface{})), + }, + KafkaVersion: aws.String(d.Get("kafka_version").(string)), + } + + if v, ok := d.GetOk("encryption_info"); ok { + info := v.([]interface{}) + if len(info) == 1 { + if info[0] == nil { + return fmt.Errorf("At least one item is expected inside encryption_info") + + } + + i := info[0].(map[string]interface{}) + + input.EncryptionInfo = &kafka.EncryptionInfo{ + EncryptionAtRest: &kafka.EncryptionAtRest{ + DataVolumeKMSKeyId: aws.String(i["encryption_at_rest_kms_id"].(string)), + }, + } + } + } + + var out *kafka.CreateClusterOutput + err := resource.Retry(30*time.Second, func() *resource.RetryError { + var err error + out, err = conn.CreateCluster(input) + + if err != nil { + if isAWSErr(err, kafka.ErrCodeTooManyRequestsException, "Too Many Requests") { + return resource.RetryableError(err) + } + return resource.NonRetryableError(err) + } + return nil + }) + if err != nil { + return fmt.Errorf("error creating MSK cluster (%s): %s", d.Id(), err) + } + + d.SetId(aws.StringValue(out.ClusterArn)) + + // set tags while cluster is being created + tags := tagsFromMapMskCluster(d.Get("tags").(map[string]interface{})) + + if err := setTagsMskCluster(conn, d, aws.StringValue(out.ClusterArn)); err != nil { + return err + } + + d.Set("tags", tagsToMapMskCluster(tags)) + d.SetPartial("tags") + + log.Printf("[DEBUG] Waiting for MSK cluster %q to be created", d.Id()) + err = waitForClusterCreation(conn, d.Id()) + if err != nil { + return fmt.Errorf("error waiting for MSK cluster creation (%s): %s", d.Id(), err) + } + d.Partial(false) + + log.Printf("[DEBUG] MSK cluster %q created", d.Id()) + + return resourceAwsMskClusterRead(d, meta) +} + +func waitForClusterCreation(conn *kafka.Kafka, arn string) error { + return resource.Retry(60*time.Minute, func() *resource.RetryError { + out, err := conn.DescribeCluster(&kafka.DescribeClusterInput{ + ClusterArn: aws.String(arn), + }) + if err != nil { + return resource.NonRetryableError(err) + } + if out.ClusterInfo != nil { + if aws.StringValue(out.ClusterInfo.State) == kafka.ClusterStateFailed { + return resource.NonRetryableError(fmt.Errorf("Cluster creation failed with cluster state %q", kafka.ClusterStateFailed)) + } + if aws.StringValue(out.ClusterInfo.State) == kafka.ClusterStateActive { + return nil + } + } + return resource.RetryableError(fmt.Errorf("%q: cluster still creating", arn)) + }) +} + +func resourceAwsMskClusterRead(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kafkaconn + + out, err := conn.DescribeCluster(&kafka.DescribeClusterInput{ + ClusterArn: aws.String(d.Id()), + }) + if err != nil { + if isAWSErr(err, kafka.ErrCodeNotFoundException, "") { + log.Printf("[WARN] MSK Cluster (%s) not found, removing from state", d.Id()) + d.SetId("") + return nil + } + return fmt.Errorf("failed lookup cluster %s: %s", d.Id(), err) + } + + brokerOut, err := conn.GetBootstrapBrokers(&kafka.GetBootstrapBrokersInput{ + ClusterArn: aws.String(d.Id()), + }) + if err != nil { + return fmt.Errorf("failed requesting bootstrap broker info for %q : %s", d.Id(), err) + } + + cluster := out.ClusterInfo + + d.SetId(aws.StringValue(cluster.ClusterArn)) + d.Set("arn", aws.StringValue(cluster.ClusterArn)) + d.Set("bootstrap_brokers", brokerOut.BootstrapBrokerString) + + d.Set("broker_node_group_info", flattenMskBrokerNodeGroupInfo(cluster.BrokerNodeGroupInfo)) + + d.Set("cluster_name", aws.StringValue(cluster.ClusterName)) + d.Set("enhanced_monitoring", aws.StringValue(cluster.EnhancedMonitoring)) + d.Set("encryption_info", flattenMskEncryptionInfo(cluster.EncryptionInfo)) + d.Set("kafka_version", aws.StringValue(cluster.CurrentBrokerSoftwareInfo.KafkaVersion)) + d.Set("number_of_broker_nodes", aws.Int64Value(cluster.NumberOfBrokerNodes)) + d.Set("zookeeper_connect_string", cluster.ZookeeperConnectString) + + listTagsOut, err := conn.ListTagsForResource(&kafka.ListTagsForResourceInput{ + ResourceArn: cluster.ClusterArn, + }) + if err != nil { + return fmt.Errorf("failed listing tags for msk cluster %q: %s", d.Id(), err) + } + + d.Set("tags", tagsToMapMskCluster(listTagsOut.Tags)) + + return nil +} + +func resourceAwsMskClusterUpdate(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kafkaconn + + // currently tags are the only thing that are updatable.. + if err := setTagsMskCluster(conn, d, d.Id()); err != nil { + return fmt.Errorf("failed updating tags for msk cluster %q: %s", d.Id(), err) + } + + return resourceAwsMskClusterRead(d, meta) + +} + +func flattenMskBrokerNodeGroupInfo(b *kafka.BrokerNodeGroupInfo) []map[string]interface{} { + + if b == nil { + return []map[string]interface{}{} + } + + m := map[string]interface{}{ + "az_distribution": aws.StringValue(b.BrokerAZDistribution), + "client_subnets": flattenStringList(b.ClientSubnets), + "instance_type": aws.StringValue(b.InstanceType), + "security_groups": flattenStringList(b.SecurityGroups), + } + if b.StorageInfo != nil { + if b.StorageInfo.EbsStorageInfo != nil { + m["ebs_volume_size"] = int(aws.Int64Value(b.StorageInfo.EbsStorageInfo.VolumeSize)) + } + } + return []map[string]interface{}{m} +} + +func flattenMskEncryptionInfo(e *kafka.EncryptionInfo) []map[string]interface{} { + if e == nil || e.EncryptionAtRest == nil { + return []map[string]interface{}{} + } + + m := map[string]interface{}{ + "encryption_at_rest_kms_id": aws.StringValue(e.EncryptionAtRest.DataVolumeKMSKeyId), + } + + return []map[string]interface{}{m} +} + +func resourceAwsMskClusterDelete(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kafkaconn + + log.Printf("[DEBUG] Deleting MSK cluster: %q", d.Id()) + _, err := conn.DeleteCluster(&kafka.DeleteClusterInput{ + ClusterArn: aws.String(d.Id()), + }) + if err != nil { + if isAWSErr(err, kafka.ErrCodeNotFoundException, "") { + return nil + } + return fmt.Errorf("failed deleting MSK cluster %q: %s", d.Id(), err) + } + + log.Printf("[DEBUG] Waiting for MSK cluster %q to be deleted", d.Id()) + + return resourceAwsMskClusterDeleteWaiter(conn, d.Id()) +} + +func resourceAwsMskClusterDeleteWaiter(conn *kafka.Kafka, arn string) error { + return resource.Retry(60*time.Minute, func() *resource.RetryError { + _, err := conn.DescribeCluster(&kafka.DescribeClusterInput{ + ClusterArn: aws.String(arn), + }) + + if err != nil { + if isAWSErr(err, kafka.ErrCodeNotFoundException, "") { + return nil + } + return resource.NonRetryableError(err) + } + + return resource.RetryableError(fmt.Errorf("timeout while waiting for the cluster %q to be deleted", arn)) + }) +} diff --git a/aws/resource_aws_msk_cluster_test.go b/aws/resource_aws_msk_cluster_test.go new file mode 100644 index 00000000000..b49f31d7a49 --- /dev/null +++ b/aws/resource_aws_msk_cluster_test.go @@ -0,0 +1,481 @@ +package aws + +import ( + "fmt" + "log" + "regexp" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kafka" + + "github.com/hashicorp/terraform/helper/acctest" + "github.com/hashicorp/terraform/helper/resource" + "github.com/hashicorp/terraform/terraform" +) + +func init() { + resource.AddTestSweepers("aws_msk_cluster", &resource.Sweeper{ + Name: "aws_msk_cluster", + F: testSweepMskClusters, + }) +} + +func testSweepMskClusters(region string) error { + client, err := sharedClientForRegion(region) + if err != nil { + return fmt.Errorf("error getting client: %s", err) + } + + conn := client.(*AWSClient).kafkaconn + + out, err := conn.ListClusters(&kafka.ListClustersInput{}) + if err != nil { + if testSweepSkipSweepError(err) { + log.Printf("[WARN] skipping msk cluster domain sweep for %s: %s", region, err) + return nil + } + return fmt.Errorf("Error retrieving MSK clusters: %s", err) + } + + for _, cluster := range out.ClusterInfoList { + log.Printf("[INFO] Deleting Msk cluster: %s", *cluster.ClusterName) + _, err := conn.DeleteCluster(&kafka.DeleteClusterInput{ + ClusterArn: cluster.ClusterArn, + }) + if err != nil { + log.Printf("[ERROR] Failed to delete MSK cluster %s: %s", *cluster.ClusterName, err) + continue + } + err = resourceAwsMskClusterDeleteWaiter(conn, *cluster.ClusterArn) + if err != nil { + log.Printf("[ERROR] failed to wait for deletion of MSK cluster %s: %s", *cluster.ClusterName, err) + } + } + return nil +} + +func TestAccAWSMskCluster_basic(t *testing.T) { + var cluster kafka.ClusterInfo + var td kafka.ListTagsForResourceOutput + ri := acctest.RandInt() + resourceName := "aws_msk_cluster.example" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckMskClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccMskClusterConfig_basic(ri), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster), + testAccMatchResourceAttrRegionalARN(resourceName, "arn", "kafka", regexp.MustCompile(`cluster/.+`)), + testAccMatchResourceAttrRegionalARN(resourceName, "encryption_info.0.encryption_at_rest_kms_id", "kms", regexp.MustCompile(`key/.+`)), + resource.TestCheckResourceAttr(resourceName, "cluster_name", fmt.Sprintf("tf-test-%d", ri)), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.az_distribution", kafka.BrokerAZDistributionDefault), + resource.TestCheckResourceAttr(resourceName, "kafka_version", "1.1.1"), + resource.TestCheckResourceAttr(resourceName, "number_of_broker_nodes", "3"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.instance_type", "kafka.m5.large"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.ebs_volume_size", "10"), + resource.TestMatchResourceAttr(resourceName, "zookeeper_connect_string", regexp.MustCompile(`^\d+\.\d+\.\d+\.\d+:\d+,\d+\.\d+\.\d+\.\d+:\d+,\d+\.\d+\.\d+\.\d+:\d+$`)), + resource.TestMatchResourceAttr(resourceName, "bootstrap_brokers", regexp.MustCompile(`^(([-\w]+\.){1,}[\w]+:\d+,){2,}([-\w]+\.){1,}[\w]+:\d+$`)), + resource.TestCheckResourceAttr(resourceName, "enhanced_monitoring", kafka.EnhancedMonitoringDefault), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.client_subnets.#", "3"), + resource.TestCheckResourceAttrPair(resourceName, "broker_node_group_info.0.client_subnets.0", "aws_subnet.example_subnet_az1", "id"), + resource.TestCheckResourceAttrPair(resourceName, "broker_node_group_info.0.client_subnets.1", "aws_subnet.example_subnet_az2", "id"), + resource.TestCheckResourceAttrPair(resourceName, "broker_node_group_info.0.client_subnets.2", "aws_subnet.example_subnet_az3", "id"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.security_groups.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "broker_node_group_info.0.security_groups.0", "aws_security_group.example_sg", "id"), + testAccLoadMskTags(&cluster, &td), + testAccCheckMskClusterTags(&td, "foo", "bar"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "bootstrap_brokers", // API may mutate ordering and selection of brokers to return + }, + }, + }, + }) +} +func TestAccAWSMskCluster_kms(t *testing.T) { + var cluster kafka.ClusterInfo + ri := acctest.RandInt() + resourceName := "aws_msk_cluster.example" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckMskClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccMskClusterConfig_kms(ri), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster), + resource.TestCheckResourceAttrPair(resourceName, "encryption_info.0.encryption_at_rest_kms_id", "aws_kms_key.example_key", "arn"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "bootstrap_brokers", // API may mutate ordering and selection of brokers to return + }, + }, + }, + }) +} + +func TestAccAWSMskCluster_enhancedMonitoring(t *testing.T) { + var cluster kafka.ClusterInfo + ri := acctest.RandInt() + resourceName := "aws_msk_cluster.example" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckMskClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccMskClusterConfig_enhancedMonitoring(ri), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster), + resource.TestCheckResourceAttr(resourceName, "enhanced_monitoring", kafka.EnhancedMonitoringPerBroker), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "bootstrap_brokers", + }, + }, + }, + }) +} +func TestAccAWSMskCluster_tagsUpdate(t *testing.T) { + var cluster kafka.ClusterInfo + var td kafka.ListTagsForResourceOutput + ri := acctest.RandInt() + resourceName := "aws_msk_cluster.example" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckMskClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccMskClusterConfig_basic(ri), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster), + testAccLoadMskTags(&cluster, &td), + testAccCheckMskClusterTags(&td, "foo", "bar"), + ), + }, + { + Config: testAccMskClusterConfig_tagsUpdate(ri), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster), + testAccLoadMskTags(&cluster, &td), + testAccCheckMskClusterTags(&td, "foo", "baz"), + testAccCheckMskClusterTags(&td, "new", "type"), + ), + }, + }, + }) +} + +func TestAccAWSMskCluster_brokerNodes(t *testing.T) { + var cluster kafka.ClusterInfo + ri := acctest.RandInt() + resourceName := "aws_msk_cluster.example" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckMskClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccMskClusterConfig_brokerNodes(ri), + Check: resource.ComposeTestCheckFunc( + testAccCheckMskClusterExists(resourceName, &cluster), + resource.TestCheckResourceAttr(resourceName, "number_of_broker_nodes", "6"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.instance_type", "kafka.m5.large"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.ebs_volume_size", "1"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.client_subnets.#", "3"), + resource.TestCheckResourceAttrPair(resourceName, "broker_node_group_info.0.client_subnets.0", "aws_subnet.example_subnet_az1", "id"), + resource.TestCheckResourceAttrPair(resourceName, "broker_node_group_info.0.client_subnets.1", "aws_subnet.example_subnet_az2", "id"), + resource.TestCheckResourceAttrPair(resourceName, "broker_node_group_info.0.client_subnets.2", "aws_subnet.example_subnet_az3", "id"), + resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.security_groups.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "broker_node_group_info.0.security_groups.0", "aws_security_group.example_sg", "id"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "bootstrap_brokers", // API may mutate ordering and selection of brokers to return + }, + }, + }, + }) +} + +func testAccCheckMskClusterDestroy(s *terraform.State) error { + for _, rs := range s.RootModule().Resources { + if rs.Type != "aws_msk_cluster" { + continue + } + + conn := testAccProvider.Meta().(*AWSClient).kafkaconn + opts := &kafka.DescribeClusterInput{ + ClusterArn: aws.String(rs.Primary.ID), + } + + _, err := conn.DescribeCluster(opts) + if err != nil { + if isAWSErr(err, kafka.ErrCodeNotFoundException, "") { + continue + } + return err + } + } + return nil +} + +func testAccCheckMskClusterExists(n string, cluster *kafka.ClusterInfo) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[n] + if !ok { + return fmt.Errorf("Not found: %s", n) + } + + if rs.Primary.ID == "" { + return fmt.Errorf("No Cluster arn is set") + } + + conn := testAccProvider.Meta().(*AWSClient).kafkaconn + resp, err := conn.DescribeCluster(&kafka.DescribeClusterInput{ + ClusterArn: aws.String(rs.Primary.ID), + }) + if err != nil { + return fmt.Errorf("Error describing cluster: %s", err.Error()) + } + + *cluster = *resp.ClusterInfo + return nil + } +} + +func testAccLoadMskTags(cluster *kafka.ClusterInfo, td *kafka.ListTagsForResourceOutput) resource.TestCheckFunc { + return func(s *terraform.State) error { + conn := testAccProvider.Meta().(*AWSClient).kafkaconn + + tagOut, err := conn.ListTagsForResource(&kafka.ListTagsForResourceInput{ + ResourceArn: cluster.ClusterArn, + }) + if err != nil { + return err + } + if tagOut != nil { + *td = *tagOut + log.Printf("[DEBUG] loaded acceptance test tags: %v (from %v)", td, tagOut) + } + return nil + } +} + +func testAccCheckMskClusterTags(td *kafka.ListTagsForResourceOutput, key string, value string) resource.TestCheckFunc { + return func(s *terraform.State) error { + m := tagsToMapMskCluster(td.Tags) + v, ok := m[key] + if value != "" && !ok { + return fmt.Errorf("Missing tag: %s - (found tags %v)", key, m) + } else if value == "" && ok { + return fmt.Errorf("Extra tag: %s", key) + } + if value == "" { + return nil + } + if v != value { + return fmt.Errorf("%s: bad value: %s", key, v) + } + return nil + } +} + +func testAccMskClusterBaseConfig() string { + return fmt.Sprintf(` +resource "aws_vpc" "example_vpc" { + cidr_block = "192.168.0.0/22" + tags = { + Name = "tf-testacc-msk-cluster-vpc" + } +} + +data "aws_availability_zones" "available" { + state = "available" +} + +resource "aws_subnet" "example_subnet_az1" { + vpc_id = "${aws_vpc.example_vpc.id}" + cidr_block = "192.168.0.0/24" + availability_zone = "${data.aws_availability_zones.available.names[0]}" + tags = { + Name = "tf-testacc-msk-cluster-subnet-az1" + } +} +resource "aws_subnet" "example_subnet_az2" { + vpc_id = "${aws_vpc.example_vpc.id}" + cidr_block = "192.168.1.0/24" + availability_zone = "${data.aws_availability_zones.available.names[1]}" + tags = { + Name = "tf-testacc-msk-cluster-subnet-az2" + } +} +resource "aws_subnet" "example_subnet_az3" { + vpc_id = "${aws_vpc.example_vpc.id}" + cidr_block = "192.168.2.0/24" + availability_zone = "${data.aws_availability_zones.available.names[2]}" + tags = { + Name = "tf-testacc-msk-cluster-subnet-az3" + } +} + +resource "aws_security_group" "example_sg" { + vpc_id = "${aws_vpc.example_vpc.id}" +} +`) + +} +func testAccMskClusterConfig_basic(randInt int) string { + return testAccMskClusterBaseConfig() + fmt.Sprintf(` + +resource "aws_msk_cluster" "example" { + cluster_name = "tf-test-%d" + kafka_version = "1.1.1" + number_of_broker_nodes = 3 + broker_node_group_info { + instance_type = "kafka.m5.large" + ebs_volume_size = 10 + client_subnets = [ "${aws_subnet.example_subnet_az1.id}", "${aws_subnet.example_subnet_az2.id}", "${aws_subnet.example_subnet_az3.id}" ] + security_groups = [ "${aws_security_group.example_sg.id}" ] + } + tags = { + foo = "bar" + } +} +`, randInt) +} + +func testAccMskClusterConfig_kms(randInt int) string { + return testAccMskClusterBaseConfig() + fmt.Sprintf(` + +resource "aws_kms_key" "example_key" { + description = "tf-testacc-msk-cluster-kms" + tags = { + Name = "tf-testacc-msk-cluster-kms" + } +} + +resource "aws_msk_cluster" "example" { + cluster_name = "tf-test-%d" + kafka_version = "1.1.1" + number_of_broker_nodes = 3 + encryption_info { + encryption_at_rest_kms_id = "${aws_kms_key.example_key.key_id}" + } + broker_node_group_info { + instance_type = "kafka.m5.large" + ebs_volume_size = 10 + client_subnets = [ "${aws_subnet.example_subnet_az1.id}", "${aws_subnet.example_subnet_az2.id}", "${aws_subnet.example_subnet_az3.id}" ] + security_groups = [ "${aws_security_group.example_sg.id}" ] + } +} +`, randInt) + +} + +func testAccMskClusterConfig_enhancedMonitoring(randInt int) string { + return testAccMskClusterBaseConfig() + fmt.Sprintf(` + +resource "aws_msk_cluster" "example" { + cluster_name = "tf-test-%d" + kafka_version = "1.1.1" + number_of_broker_nodes = 3 + broker_node_group_info { + instance_type = "kafka.m5.large" + ebs_volume_size = 10 + client_subnets = [ "${aws_subnet.example_subnet_az1.id}", "${aws_subnet.example_subnet_az2.id}", "${aws_subnet.example_subnet_az3.id}" ] + security_groups = [ "${aws_security_group.example_sg.id}" ] + } + enhanced_monitoring = "PER_BROKER" +} +`, randInt) + +} + +func testAccMskClusterConfig_v2_1_0(randInt int) string { + return testAccMskClusterBaseConfig() + fmt.Sprintf(` + +resource "aws_msk_cluster" "example" { + cluster_name = "tf-test-%d" + kafka_version = "2.1.0" + number_of_broker_nodes = 3 + broker_node_group_info { + instance_type = "kafka.m5.large" + ebs_volume_size = 1 + client_subnets = [ "${aws_subnet.example_subnet_az1.id}", "${aws_subnet.example_subnet_az2.id}", "${aws_subnet.example_subnet_az3.id}" ] + security_groups = [ "${aws_security_group.example_sg.id}" ] + } +} +`, randInt) + +} + +func testAccMskClusterConfig_brokerNodes(randInt int) string { + return testAccMskClusterBaseConfig() + fmt.Sprintf(` + +resource "aws_msk_cluster" "example" { + cluster_name = "tf-test-%d" + kafka_version = "2.1.0" + number_of_broker_nodes = 6 + broker_node_group_info { + instance_type = "kafka.m5.large" + ebs_volume_size = 1 + client_subnets = [ "${aws_subnet.example_subnet_az1.id}", "${aws_subnet.example_subnet_az2.id}", "${aws_subnet.example_subnet_az3.id}" ] + security_groups = [ "${aws_security_group.example_sg.id}" ] + } +} +`, randInt) + +} + +func testAccMskClusterConfig_tagsUpdate(randInt int) string { + return testAccMskClusterBaseConfig() + fmt.Sprintf(` + +resource "aws_msk_cluster" "example" { + cluster_name = "tf-test-%d" + kafka_version = "1.1.1" + number_of_broker_nodes = 3 + broker_node_group_info { + instance_type = "kafka.m5.large" + ebs_volume_size = 10 + client_subnets = [ "${aws_subnet.example_subnet_az1.id}", "${aws_subnet.example_subnet_az2.id}", "${aws_subnet.example_subnet_az3.id}" ] + security_groups = [ "${aws_security_group.example_sg.id}" ] + } + tags = { + foo = "baz" + new = "type" + } +} +`, randInt) + +} diff --git a/aws/tags_msk.go b/aws/tags_msk.go new file mode 100644 index 00000000000..2e27426f69f --- /dev/null +++ b/aws/tags_msk.go @@ -0,0 +1,102 @@ +package aws + +import ( + "log" + "regexp" + + "github.com/aws/aws-sdk-go/aws" + kafka "github.com/aws/aws-sdk-go/service/kafka" + "github.com/hashicorp/terraform/helper/schema" +) + +// setTags is a helper to set the tags for a resource. It expects the +// tags field to be named "tags" +func setTagsMskCluster(conn *kafka.Kafka, d *schema.ResourceData, arn string) error { + if d.HasChange("tags") { + oraw, nraw := d.GetChange("tags") + o := oraw.(map[string]interface{}) + n := nraw.(map[string]interface{}) + create, remove := diffTagsMskCluster(tagsFromMapMskCluster(o), tagsFromMapMskCluster(n)) + + // Set tags + if len(remove) > 0 { + log.Printf("[DEBUG] Removing tags: %#v", remove) + keys := make([]*string, 0, len(remove)) + for k := range remove { + keys = append(keys, aws.String(k)) + } + _, err := conn.UntagResource(&kafka.UntagResourceInput{ + ResourceArn: aws.String(arn), + TagKeys: keys, + }) + if err != nil { + return err + } + } + if len(create) > 0 { + log.Printf("[DEBUG] Creating tags: %#v", create) + _, err := conn.TagResource(&kafka.TagResourceInput{ + ResourceArn: aws.String(arn), + Tags: create, + }) + if err != nil { + return err + } + } + } + return nil +} + +// diffTags takes our tags locally and the ones remotely and returns +// the set of tags that must be created, and the set of tags that must +// be destroyed. +func diffTagsMskCluster(oldTags, newTags map[string]*string) (map[string]*string, map[string]*string) { + + // Build the list of what to remove + remove := make(map[string]*string) + for k, v := range oldTags { + newVal, existsInNew := newTags[k] + if !existsInNew || *newVal != *v { + // Delete it! + remove[k] = v + } + } + return newTags, remove +} + +// tagsFromMap returns the tags for the given map of data. +func tagsFromMapMskCluster(m map[string]interface{}) map[string]*string { + result := make(map[string]*string) + for k, v := range m { + if !tagIgnoredMskCluster(k, v.(string)) { + result[k] = aws.String(v.(string)) + } + } + return result +} + +// tagsToMap turns the list of tags into a map. +func tagsToMapMskCluster(ts map[string]*string) map[string]string { + result := make(map[string]string) + for k, v := range ts { + if !tagIgnoredMskCluster(k, aws.StringValue(v)) { + result[k] = aws.StringValue(v) + } + } + return result +} + +// compare a tag against a list of strings and checks if it should +// be ignored or not +func tagIgnoredMskCluster(key, value string) bool { + filter := []string{"^aws:"} + for _, ignore := range filter { + log.Printf("[DEBUG] Matching %v with %v\n", ignore, key) + r, _ := regexp.MatchString(ignore, key) + if r { + log.Printf("[DEBUG] Found AWS specific tag %s (val %s), ignoring.\n", key, value) + return true + } + } + return false +} diff --git a/aws/tags_msk_test.go b/aws/tags_msk_test.go new file mode 100644 index 00000000000..a212c8aac73 --- /dev/null +++ b/aws/tags_msk_test.go @@ -0,0 +1,101 @@ +package aws + +import ( + "reflect" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kafka" +) + +func TestDiffMskClusterTags(t *testing.T) { + cases := []struct { + Old, New map[string]interface{} + Create, Remove map[string]string + }{ + // Basic add/remove + { + Old: map[string]interface{}{ + "foo": "bar", + }, + New: map[string]interface{}{ + "bar": "baz", + }, + Create: map[string]string{ + "bar": "baz", + }, + Remove: map[string]string{ + "foo": "bar", + }, + }, + + // Modify + { + Old: map[string]interface{}{ + "foo": "bar", + }, + New: map[string]interface{}{ + "foo": "baz", + }, + Create: map[string]string{ + "foo": "baz", + }, + Remove: map[string]string{ + "foo": "bar", + }, + }, + } + + for i, tc := range cases { + c, r := diffTagsMskCluster(tagsFromMapMskCluster(tc.Old), tagsFromMapMskCluster(tc.New)) + cm := tagsToMapMskCluster(c) + rm := tagsToMapMskCluster(r) + if !reflect.DeepEqual(cm, tc.Create) { + t.Fatalf("%d: bad create: %#v", i, cm) + } + if !reflect.DeepEqual(rm, tc.Remove) { + t.Fatalf("%d: bad remove: %#v", i, rm) + } + } +} + +func TestTagsToMapMskCluster(t *testing.T) { + source := map[string]*string{ + "foo": aws.String("bar"), + "bar": aws.String("baz"), + } + + inter := tagsToMapMskCluster(source) + t.Logf("%v -> %v", source, inter) + // final := tagsFromMapMskCluster(inter) + + //if !reflect.DeepEqual(source, final) { + //t.Fatalf("bad tag transformation: %v -> %v -> %v", source, inter, final) + //} +} + +func TestIgnoringTagsMskCluster(t *testing.T) { + ignoredTags := map[string]string{ + "aws:cloudformation:logical-id": "foo", + "aws:foo:bar": "baz", + } + for k, v := range ignoredTags { + if !tagIgnoredMskCluster(k, v) { + t.Fatalf("Tag %v with value %v not ignored, but should be!", k, v) + } + } +} + +func TestCheckMskClusterTags(t *testing.T) { + tags := make(map[string]*string) + tags["foo"] = aws.String("bar") + td := &kafka.ListTagsForResourceOutput{ + Tags: tags, + } + + testFunc := testAccCheckMskClusterTags(td, "foo", "bar") + err := testFunc(nil) + if err != nil { + t.Fatalf("Failed when expected to succeed: %s", err) + } +} diff --git a/website/aws.erb b/website/aws.erb index 4ff3f34ed0d..229824d9552 100644 --- a/website/aws.erb +++ b/website/aws.erb @@ -1876,6 +1876,16 @@ +
  • + MSK Resources + +
  • +
  • Neptune Resources