From 7f785a9ef51ff30b15127a467e483e5e061bd28d Mon Sep 17 00:00:00 2001 From: Matthew Sladen Date: Fri, 23 Sep 2022 21:00:04 +0100 Subject: [PATCH] r/aws_kinesis_firehose_delivery_stream - Send defaults for BufferIntervalInSeconds and BufferSizeInMBs Send default processor parameter values to remove requirement to set both 'BufferSizeInMBs' and 'BufferIntervalInSeconds' even if one is using the default value. --- internal/service/firehose/delivery_stream.go | 113 ++++++++++++------ .../service/firehose/delivery_stream_test.go | 79 ++++++++++++ 2 files changed, 157 insertions(+), 35 deletions(-) diff --git a/internal/service/firehose/delivery_stream.go b/internal/service/firehose/delivery_stream.go index 74751aeb147..66175230953 100644 --- a/internal/service/firehose/delivery_stream.go +++ b/internal/service/firehose/delivery_stream.go @@ -197,7 +197,7 @@ func processingConfigurationSchema() *schema.Schema { Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "parameters": { - Type: schema.TypeList, + Type: schema.TypeSet, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ @@ -648,18 +648,9 @@ func flattenProcessingConfiguration(pc *firehose.ProcessingConfiguration, roleAr return []map[string]interface{}{} } + defaultLambdaParams := defaultProcessorParameters(roleArn) processingConfiguration := make([]map[string]interface{}, 1) - // It is necessary to explicitly filter this out - // to prevent diffs during routine use and retain the ability - // to show diffs if any field has drifted - defaultLambdaParams := map[string]string{ - "NumberOfRetries": "3", - "RoleArn": roleArn, - "BufferSizeInMBs": "3", - "BufferIntervalInSeconds": "60", - } - processors := make([]interface{}, len(pc.Processors)) for i, p := range pc.Processors { t := aws.StringValue(p.Type) @@ -1668,10 +1659,10 @@ func expandS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationConfi func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3DestinationConfiguration { s3 := d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{}) - + roleArn := s3["role_arn"].(string) configuration := &firehose.ExtendedS3DestinationConfiguration{ BucketARN: aws.String(s3["bucket_arn"].(string)), - RoleARN: aws.String(s3["role_arn"].(string)), + RoleARN: aws.String(roleArn), BufferingHints: &firehose.BufferingHints{ IntervalInSeconds: aws.Int64(int64(s3["buffer_interval"].(int))), SizeInMBs: aws.Int64(int64(s3["buffer_size"].(int))), @@ -1683,7 +1674,8 @@ func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat } if _, ok := s3["processing_configuration"]; ok { - configuration.ProcessingConfiguration = extractProcessingConfiguration(s3) + processingConfiguration := extractProcessingConfiguration(s3, defaultProcessorParameters(roleArn)) + configuration.ProcessingConfiguration = processingConfiguration } if _, ok := s3["dynamic_partitioning_configuration"]; ok { @@ -1762,9 +1754,15 @@ func updateS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationUpdat func updateExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3DestinationUpdate { s3 := d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{}) + roleArn := s3["role_arn"].(string) + + defaultParams := defaultProcessorParameters(roleArn) + + processingConfiguration := extractProcessingConfiguration(s3, defaultParams) + configuration := &firehose.ExtendedS3DestinationUpdate{ BucketARN: aws.String(s3["bucket_arn"].(string)), - RoleARN: aws.String(s3["role_arn"].(string)), + RoleARN: aws.String(roleArn), BufferingHints: &firehose.BufferingHints{ IntervalInSeconds: aws.Int64((int64)(s3["buffer_interval"].(int))), SizeInMBs: aws.Int64((int64)(s3["buffer_size"].(int))), @@ -1775,7 +1773,7 @@ func updateExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat EncryptionConfiguration: extractEncryptionConfiguration(s3), DataFormatConversionConfiguration: expandDataFormatConversionConfiguration(s3["data_format_conversion_configuration"].([]interface{})), CloudWatchLoggingOptions: extractCloudWatchLoggingConfiguration(s3), - ProcessingConfiguration: extractProcessingConfiguration(s3), + ProcessingConfiguration: processingConfiguration, } if _, ok := s3["cloudwatch_logging_options"]; ok { @@ -1992,8 +1990,8 @@ func extractDynamicPartitioningConfiguration(s3 map[string]interface{}) *firehos return DynamicPartitioningConfiguration } -func extractProcessingConfiguration(s3 map[string]interface{}) *firehose.ProcessingConfiguration { - config := s3["processing_configuration"].([]interface{}) +func extractProcessingConfiguration(configMap map[string]interface{}, defaultParams map[string]string) *firehose.ProcessingConfiguration { + config := configMap["processing_configuration"].([]interface{}) if len(config) == 0 || config[0] == nil { // It is possible to just pass nil here, but this seems to be the // canonical form that AWS uses, and is less likely to produce diffs. @@ -2007,16 +2005,17 @@ func extractProcessingConfiguration(s3 map[string]interface{}) *firehose.Process return &firehose.ProcessingConfiguration{ Enabled: aws.Bool(processingConfiguration["enabled"].(bool)), - Processors: extractProcessors(processingConfiguration["processors"].([]interface{})), + Processors: extractProcessors(processingConfiguration["processors"].([]interface{}), defaultParams), } } -func extractProcessors(processingConfigurationProcessors []interface{}) []*firehose.Processor { +func extractProcessors(processingConfigurationProcessors []interface{}, defaultParams map[string]string) []*firehose.Processor { processors := []*firehose.Processor{} for _, processor := range processingConfigurationProcessors { extractedProcessor := extractProcessor(processor.(map[string]interface{})) if extractedProcessor != nil { + extractedProcessor = mergeDefaultProcessingParameters(extractedProcessor, defaultParams) processors = append(processors, extractedProcessor) } } @@ -2030,7 +2029,7 @@ func extractProcessor(processingConfigurationProcessor map[string]interface{}) * if processorType != "" { processor = &firehose.Processor{ Type: aws.String(processorType), - Parameters: extractProcessorParameters(processingConfigurationProcessor["parameters"].([]interface{})), + Parameters: extractProcessorParameters(processingConfigurationProcessor["parameters"].(*schema.Set).List()), } } return processor @@ -2124,12 +2123,13 @@ func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3Destinati redshift := rl[0].(map[string]interface{}) + roleArn := redshift["role_arn"].(string) configuration := &firehose.RedshiftDestinationConfiguration{ ClusterJDBCURL: aws.String(redshift["cluster_jdbcurl"].(string)), RetryOptions: extractRedshiftRetryOptions(redshift), Password: aws.String(redshift["password"].(string)), Username: aws.String(redshift["username"].(string)), - RoleARN: aws.String(redshift["role_arn"].(string)), + RoleARN: aws.String(roleArn), CopyCommand: extractCopyCommandConfiguration(redshift), S3Configuration: s3Config, } @@ -2138,7 +2138,7 @@ func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3Destinati configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift) } if _, ok := redshift["processing_configuration"]; ok { - configuration.ProcessingConfiguration = extractProcessingConfiguration(redshift) + configuration.ProcessingConfiguration = extractProcessingConfiguration(redshift, defaultProcessorParameters(roleArn)) } if s3BackupMode, ok := redshift["s3_backup_mode"]; ok { configuration.S3BackupMode = aws.String(s3BackupMode.(string)) @@ -2157,12 +2157,13 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati redshift := rl[0].(map[string]interface{}) + roleArn := redshift["role_arn"].(string) configuration := &firehose.RedshiftDestinationUpdate{ ClusterJDBCURL: aws.String(redshift["cluster_jdbcurl"].(string)), RetryOptions: extractRedshiftRetryOptions(redshift), Password: aws.String(redshift["password"].(string)), Username: aws.String(redshift["username"].(string)), - RoleARN: aws.String(redshift["role_arn"].(string)), + RoleARN: aws.String(roleArn), CopyCommand: extractCopyCommandConfiguration(redshift), S3Update: s3Update, } @@ -2171,7 +2172,7 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift) } if _, ok := redshift["processing_configuration"]; ok { - configuration.ProcessingConfiguration = extractProcessingConfiguration(redshift) + configuration.ProcessingConfiguration = extractProcessingConfiguration(redshift, defaultProcessorParameters(roleArn)) } if s3BackupMode, ok := redshift["s3_backup_mode"]; ok { configuration.S3BackupMode = aws.String(s3BackupMode.(string)) @@ -2196,11 +2197,12 @@ func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3Dest es := esList[0].(map[string]interface{}) + roleArn := es["role_arn"].(string) config := &firehose.ElasticsearchDestinationConfiguration{ BufferingHints: extractBufferingHints(es), IndexName: aws.String(es["index_name"].(string)), RetryOptions: extractElasticsearchRetryOptions(es), - RoleARN: aws.String(es["role_arn"].(string)), + RoleARN: aws.String(roleArn), TypeName: aws.String(es["type_name"].(string)), S3Configuration: s3Config, } @@ -2218,7 +2220,7 @@ func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3Dest } if _, ok := es["processing_configuration"]; ok { - config.ProcessingConfiguration = extractProcessingConfiguration(es) + config.ProcessingConfiguration = extractProcessingConfiguration(es, defaultProcessorParameters(roleArn)) } if indexRotationPeriod, ok := es["index_rotation_period"]; ok { @@ -2244,11 +2246,12 @@ func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3Dest es := esList[0].(map[string]interface{}) + roleArn := es["role_arn"].(string) update := &firehose.ElasticsearchDestinationUpdate{ BufferingHints: extractBufferingHints(es), IndexName: aws.String(es["index_name"].(string)), RetryOptions: extractElasticsearchRetryOptions(es), - RoleARN: aws.String(es["role_arn"].(string)), + RoleARN: aws.String(roleArn), TypeName: aws.String(es["type_name"].(string)), S3Update: s3Update, } @@ -2266,7 +2269,7 @@ func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3Dest } if _, ok := es["processing_configuration"]; ok { - update.ProcessingConfiguration = extractProcessingConfiguration(es) + update.ProcessingConfiguration = extractProcessingConfiguration(es, defaultProcessorParameters(roleArn)) } if indexRotationPeriod, ok := es["index_rotation_period"]; ok { @@ -2295,7 +2298,7 @@ func createSplunkConfig(d *schema.ResourceData, s3Config *firehose.S3Destination } if _, ok := splunk["processing_configuration"]; ok { - configuration.ProcessingConfiguration = extractProcessingConfiguration(splunk) + configuration.ProcessingConfiguration = extractProcessingConfiguration(splunk, defaultProcessorParameters("")) } if _, ok := splunk["cloudwatch_logging_options"]; ok { @@ -2327,7 +2330,7 @@ func updateSplunkConfig(d *schema.ResourceData, s3Update *firehose.S3Destination } if _, ok := splunk["processing_configuration"]; ok { - configuration.ProcessingConfiguration = extractProcessingConfiguration(splunk) + configuration.ProcessingConfiguration = extractProcessingConfiguration(splunk, defaultProcessorParameters("")) } if _, ok := splunk["cloudwatch_logging_options"]; ok { @@ -2349,9 +2352,10 @@ func createHTTPEndpointConfig(d *schema.ResourceData, s3Config *firehose.S3Desti HttpEndpoint := sl[0].(map[string]interface{}) + roleArn := HttpEndpoint["role_arn"].(string) configuration := &firehose.HttpEndpointDestinationConfiguration{ RetryOptions: extractHTTPEndpointRetryOptions(HttpEndpoint), - RoleARN: aws.String(HttpEndpoint["role_arn"].(string)), + RoleARN: aws.String(roleArn), S3Configuration: s3Config, } @@ -2368,7 +2372,7 @@ func createHTTPEndpointConfig(d *schema.ResourceData, s3Config *firehose.S3Desti configuration.BufferingHints = bufferingHints if _, ok := HttpEndpoint["processing_configuration"]; ok { - configuration.ProcessingConfiguration = extractProcessingConfiguration(HttpEndpoint) + configuration.ProcessingConfiguration = extractProcessingConfiguration(HttpEndpoint, defaultProcessorParameters(roleArn)) } if _, ok := HttpEndpoint["request_configuration"]; ok { @@ -2394,9 +2398,10 @@ func updateHTTPEndpointConfig(d *schema.ResourceData, s3Update *firehose.S3Desti HttpEndpoint := sl[0].(map[string]interface{}) + roleArn := HttpEndpoint["role_arn"].(string) configuration := &firehose.HttpEndpointDestinationUpdate{ RetryOptions: extractHTTPEndpointRetryOptions(HttpEndpoint), - RoleARN: aws.String(HttpEndpoint["role_arn"].(string)), + RoleARN: aws.String(roleArn), S3Update: s3Update, } @@ -2413,7 +2418,7 @@ func updateHTTPEndpointConfig(d *schema.ResourceData, s3Update *firehose.S3Desti configuration.BufferingHints = bufferingHints if _, ok := HttpEndpoint["processing_configuration"]; ok { - configuration.ProcessingConfiguration = extractProcessingConfiguration(HttpEndpoint) + configuration.ProcessingConfiguration = extractProcessingConfiguration(HttpEndpoint, defaultProcessorParameters(roleArn)) } if _, ok := HttpEndpoint["request_configuration"]; ok { @@ -2965,3 +2970,41 @@ func expandDeliveryStreamEncryptionConfigurationInput(tfList []interface{}) *fir return apiObject } + +func defaultProcessorParameters(roleArn string) map[string]string { + defaultParams := map[string]string{ + "NumberOfRetries": "3", + "BufferSizeInMBs": "3", + "BufferIntervalInSeconds": "60", + } + if roleArn != "" { + defaultParams["RoleArn"] = roleArn + } + return defaultParams +} + +func mergeDefaultProcessingParameters(processor *firehose.Processor, toMerge map[string]string) *firehose.Processor { + if aws.StringValue(processor.Type) != firehose.ProcessorTypeLambda { + return processor + } + params := processor.Parameters + for key, value := range toMerge { + found := false + for _, param := range processor.Parameters { + if key == aws.StringValue(param.ParameterName) { + found = true + continue + } + } + if !found { + params = append(params, &firehose.ProcessorParameter{ + ParameterName: aws.String(key), + ParameterValue: aws.String(value), + }) + } + } + return &firehose.Processor{ + Type: processor.Type, + Parameters: params, + } +} diff --git a/internal/service/firehose/delivery_stream_test.go b/internal/service/firehose/delivery_stream_test.go index f9c37d2826f..a0103ebb2c4 100644 --- a/internal/service/firehose/delivery_stream_test.go +++ b/internal/service/firehose/delivery_stream_test.go @@ -975,6 +975,32 @@ func TestAccFirehoseDeliveryStream_extendedS3Updates(t *testing.T) { S3BackupMode: aws.String("Enabled"), } + secondUpdateExtendedS3DestinationConfig := &firehose.ExtendedS3DestinationDescription{ + BufferingHints: &firehose.BufferingHints{ + IntervalInSeconds: aws.Int64(400), + SizeInMBs: aws.Int64(10), + }, + ProcessingConfiguration: &firehose.ProcessingConfiguration{ + Enabled: aws.Bool(true), + Processors: []*firehose.Processor{ + { + Type: aws.String("Lambda"), + Parameters: []*firehose.ProcessorParameter{ + { + ParameterName: aws.String("LambdaArn"), + ParameterValue: aws.String("valueNotTested"), + }, + { + ParameterName: aws.String("BufferIntervalInSeconds"), + ParameterValue: aws.String("201"), + }, + }, + }, + }, + }, + S3BackupMode: aws.String("Enabled"), + } + removeProcessorsExtendedS3DestinationConfig := &firehose.ExtendedS3DestinationDescription{ BufferingHints: &firehose.BufferingHints{ IntervalInSeconds: aws.Int64(400), @@ -1012,6 +1038,13 @@ func TestAccFirehoseDeliveryStream_extendedS3Updates(t *testing.T) { testAccCheckDeliveryStreamAttributes(&stream, nil, firstUpdateExtendedS3DestinationConfig, nil, nil, nil, nil), ), }, + { + Config: testAccDeliveryStreamConfig_extendedS3UpdatesSetBufferIntervalNoBufferSize(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckDeliveryStreamExists(resourceName, &stream), + testAccCheckDeliveryStreamAttributes(&stream, nil, secondUpdateExtendedS3DestinationConfig, nil, nil, nil, nil), + ), + }, { Config: testAccDeliveryStreamConfig_extendedS3UpdatesRemoveProcessors(rName), Check: resource.ComposeTestCheckFunc( @@ -2994,6 +3027,52 @@ resource "aws_kinesis_firehose_delivery_stream" "test" { `, rName)) } +func testAccDeliveryStreamConfig_extendedS3UpdatesSetBufferIntervalNoBufferSize(rName string) string { + return acctest.ConfigCompose( + testAccLambdaBasicConfig(rName), + testAccDeliveryStreamBaseConfig(rName), + fmt.Sprintf(` +resource "aws_kinesis_firehose_delivery_stream" "test" { + depends_on = [aws_iam_role_policy.firehose] + name = %[1]q + destination = "extended_s3" + + extended_s3_configuration { + role_arn = aws_iam_role.firehose.arn + bucket_arn = aws_s3_bucket.bucket.arn + + processing_configuration { + enabled = true + + processors { + type = "Lambda" + + parameters { + parameter_name = "LambdaArn" + parameter_value = "${aws_lambda_function.lambda_function_test.arn}:$LATEST" + } + + parameters { + parameter_name = "BufferIntervalInSeconds" + parameter_value = 201 + } + } + } + + buffer_size = 10 + buffer_interval = 400 + compression_format = "GZIP" + s3_backup_mode = "Enabled" + + s3_backup_configuration { + role_arn = aws_iam_role.firehose.arn + bucket_arn = aws_s3_bucket.bucket.arn + } + } +} +`, rName)) +} + func testAccDeliveryStreamConfig_extendedS3UpdatesRemoveProcessors(rName string) string { return acctest.ConfigCompose( testAccLambdaBasicConfig(rName),