r/aws_kinesis_firehose_delivery_stream - Send defaults for BufferIntervalInSeconds and BufferSizeInMBs

Send default processor parameter values to remove the requirement to set
both 'BufferSizeInMBs' and 'BufferIntervalInSeconds' even when one of them
is left at its default value.
mtt88 committed Sep 23, 2022
1 parent da38070 commit 7f785a9
Showing 2 changed files with 157 additions and 35 deletions.
113 changes: 78 additions & 35 deletions internal/service/firehose/delivery_stream.go
@@ -197,7 +197,7 @@ func processingConfigurationSchema() *schema.Schema {
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"parameters": {
Type: schema.TypeList,
Type: schema.TypeSet,
Optional: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
@@ -648,18 +648,9 @@ func flattenProcessingConfiguration(pc *firehose.ProcessingConfiguration, roleAr
return []map[string]interface{}{}
}

defaultLambdaParams := defaultProcessorParameters(roleArn)
processingConfiguration := make([]map[string]interface{}, 1)

// It is necessary to explicitly filter this out
// to prevent diffs during routine use and retain the ability
// to show diffs if any field has drifted
defaultLambdaParams := map[string]string{
"NumberOfRetries": "3",
"RoleArn": roleArn,
"BufferSizeInMBs": "3",
"BufferIntervalInSeconds": "60",
}

processors := make([]interface{}, len(pc.Processors))
for i, p := range pc.Processors {
t := aws.StringValue(p.Type)
@@ -1668,10 +1659,10 @@ func expandS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationConfi

func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3DestinationConfiguration {
s3 := d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{})

roleArn := s3["role_arn"].(string)
configuration := &firehose.ExtendedS3DestinationConfiguration{
BucketARN: aws.String(s3["bucket_arn"].(string)),
RoleARN: aws.String(s3["role_arn"].(string)),
RoleARN: aws.String(roleArn),
BufferingHints: &firehose.BufferingHints{
IntervalInSeconds: aws.Int64(int64(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64(int64(s3["buffer_size"].(int))),
@@ -1683,7 +1674,8 @@ func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat
}

if _, ok := s3["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(s3)
processingConfiguration := extractProcessingConfiguration(s3, defaultProcessorParameters(roleArn))
configuration.ProcessingConfiguration = processingConfiguration
}

if _, ok := s3["dynamic_partitioning_configuration"]; ok {
@@ -1762,9 +1754,15 @@ func updateS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationUpdat
func updateExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3DestinationUpdate {
s3 := d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{})

roleArn := s3["role_arn"].(string)

defaultParams := defaultProcessorParameters(roleArn)

processingConfiguration := extractProcessingConfiguration(s3, defaultParams)

configuration := &firehose.ExtendedS3DestinationUpdate{
BucketARN: aws.String(s3["bucket_arn"].(string)),
RoleARN: aws.String(s3["role_arn"].(string)),
RoleARN: aws.String(roleArn),
BufferingHints: &firehose.BufferingHints{
IntervalInSeconds: aws.Int64((int64)(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64((int64)(s3["buffer_size"].(int))),
@@ -1775,7 +1773,7 @@ func updateExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat
EncryptionConfiguration: extractEncryptionConfiguration(s3),
DataFormatConversionConfiguration: expandDataFormatConversionConfiguration(s3["data_format_conversion_configuration"].([]interface{})),
CloudWatchLoggingOptions: extractCloudWatchLoggingConfiguration(s3),
ProcessingConfiguration: extractProcessingConfiguration(s3),
ProcessingConfiguration: processingConfiguration,
}

if _, ok := s3["cloudwatch_logging_options"]; ok {
Expand Down Expand Up @@ -1992,8 +1990,8 @@ func extractDynamicPartitioningConfiguration(s3 map[string]interface{}) *firehos
return DynamicPartitioningConfiguration
}

func extractProcessingConfiguration(s3 map[string]interface{}) *firehose.ProcessingConfiguration {
config := s3["processing_configuration"].([]interface{})
func extractProcessingConfiguration(configMap map[string]interface{}, defaultParams map[string]string) *firehose.ProcessingConfiguration {
config := configMap["processing_configuration"].([]interface{})
if len(config) == 0 || config[0] == nil {
// It is possible to just pass nil here, but this seems to be the
// canonical form that AWS uses, and is less likely to produce diffs.
@@ -2007,16 +2005,17 @@ func extractProcessingConfiguration(s3 map[string]interface{}) *firehose.Process

return &firehose.ProcessingConfiguration{
Enabled: aws.Bool(processingConfiguration["enabled"].(bool)),
Processors: extractProcessors(processingConfiguration["processors"].([]interface{})),
Processors: extractProcessors(processingConfiguration["processors"].([]interface{}), defaultParams),
}
}

func extractProcessors(processingConfigurationProcessors []interface{}) []*firehose.Processor {
func extractProcessors(processingConfigurationProcessors []interface{}, defaultParams map[string]string) []*firehose.Processor {
processors := []*firehose.Processor{}

for _, processor := range processingConfigurationProcessors {
extractedProcessor := extractProcessor(processor.(map[string]interface{}))
if extractedProcessor != nil {
extractedProcessor = mergeDefaultProcessingParameters(extractedProcessor, defaultParams)
processors = append(processors, extractedProcessor)
}
}
@@ -2030,7 +2029,7 @@ func extractProcessor(processingConfigurationProcessor map[string]interface{}) *
if processorType != "" {
processor = &firehose.Processor{
Type: aws.String(processorType),
Parameters: extractProcessorParameters(processingConfigurationProcessor["parameters"].([]interface{})),
Parameters: extractProcessorParameters(processingConfigurationProcessor["parameters"].(*schema.Set).List()),
}
}
return processor
@@ -2124,12 +2123,13 @@ func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3Destinati

redshift := rl[0].(map[string]interface{})

roleArn := redshift["role_arn"].(string)
configuration := &firehose.RedshiftDestinationConfiguration{
ClusterJDBCURL: aws.String(redshift["cluster_jdbcurl"].(string)),
RetryOptions: extractRedshiftRetryOptions(redshift),
Password: aws.String(redshift["password"].(string)),
Username: aws.String(redshift["username"].(string)),
RoleARN: aws.String(redshift["role_arn"].(string)),
RoleARN: aws.String(roleArn),
CopyCommand: extractCopyCommandConfiguration(redshift),
S3Configuration: s3Config,
}
@@ -2138,7 +2138,7 @@ func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3Destinati
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
}
if _, ok := redshift["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(redshift)
configuration.ProcessingConfiguration = extractProcessingConfiguration(redshift, defaultProcessorParameters(roleArn))
}
if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
@@ -2157,12 +2157,13 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati

redshift := rl[0].(map[string]interface{})

roleArn := redshift["role_arn"].(string)
configuration := &firehose.RedshiftDestinationUpdate{
ClusterJDBCURL: aws.String(redshift["cluster_jdbcurl"].(string)),
RetryOptions: extractRedshiftRetryOptions(redshift),
Password: aws.String(redshift["password"].(string)),
Username: aws.String(redshift["username"].(string)),
RoleARN: aws.String(redshift["role_arn"].(string)),
RoleARN: aws.String(roleArn),
CopyCommand: extractCopyCommandConfiguration(redshift),
S3Update: s3Update,
}
@@ -2171,7 +2172,7 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
}
if _, ok := redshift["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(redshift)
configuration.ProcessingConfiguration = extractProcessingConfiguration(redshift, defaultProcessorParameters(roleArn))
}
if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
@@ -2196,11 +2197,12 @@ func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3Dest

es := esList[0].(map[string]interface{})

roleArn := es["role_arn"].(string)
config := &firehose.ElasticsearchDestinationConfiguration{
BufferingHints: extractBufferingHints(es),
IndexName: aws.String(es["index_name"].(string)),
RetryOptions: extractElasticsearchRetryOptions(es),
RoleARN: aws.String(es["role_arn"].(string)),
RoleARN: aws.String(roleArn),
TypeName: aws.String(es["type_name"].(string)),
S3Configuration: s3Config,
}
@@ -2218,7 +2220,7 @@ func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3Dest
}

if _, ok := es["processing_configuration"]; ok {
config.ProcessingConfiguration = extractProcessingConfiguration(es)
config.ProcessingConfiguration = extractProcessingConfiguration(es, defaultProcessorParameters(roleArn))
}

if indexRotationPeriod, ok := es["index_rotation_period"]; ok {
@@ -2244,11 +2246,12 @@ func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3Dest

es := esList[0].(map[string]interface{})

roleArn := es["role_arn"].(string)
update := &firehose.ElasticsearchDestinationUpdate{
BufferingHints: extractBufferingHints(es),
IndexName: aws.String(es["index_name"].(string)),
RetryOptions: extractElasticsearchRetryOptions(es),
RoleARN: aws.String(es["role_arn"].(string)),
RoleARN: aws.String(roleArn),
TypeName: aws.String(es["type_name"].(string)),
S3Update: s3Update,
}
@@ -2266,7 +2269,7 @@ func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3Dest
}

if _, ok := es["processing_configuration"]; ok {
update.ProcessingConfiguration = extractProcessingConfiguration(es)
update.ProcessingConfiguration = extractProcessingConfiguration(es, defaultProcessorParameters(roleArn))
}

if indexRotationPeriod, ok := es["index_rotation_period"]; ok {
@@ -2295,7 +2298,7 @@ func createSplunkConfig(d *schema.ResourceData, s3Config *firehose.S3Destination
}

if _, ok := splunk["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(splunk)
configuration.ProcessingConfiguration = extractProcessingConfiguration(splunk, defaultProcessorParameters(""))
}

if _, ok := splunk["cloudwatch_logging_options"]; ok {
@@ -2327,7 +2330,7 @@ func updateSplunkConfig(d *schema.ResourceData, s3Update *firehose.S3Destination
}

if _, ok := splunk["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(splunk)
configuration.ProcessingConfiguration = extractProcessingConfiguration(splunk, defaultProcessorParameters(""))
}

if _, ok := splunk["cloudwatch_logging_options"]; ok {
@@ -2349,9 +2352,10 @@ func createHTTPEndpointConfig(d *schema.ResourceData, s3Config *firehose.S3Desti

HttpEndpoint := sl[0].(map[string]interface{})

roleArn := HttpEndpoint["role_arn"].(string)
configuration := &firehose.HttpEndpointDestinationConfiguration{
RetryOptions: extractHTTPEndpointRetryOptions(HttpEndpoint),
RoleARN: aws.String(HttpEndpoint["role_arn"].(string)),
RoleARN: aws.String(roleArn),
S3Configuration: s3Config,
}

@@ -2368,7 +2372,7 @@ func createHTTPEndpointConfig(d *schema.ResourceData, s3Config *firehose.S3Desti
configuration.BufferingHints = bufferingHints

if _, ok := HttpEndpoint["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(HttpEndpoint)
configuration.ProcessingConfiguration = extractProcessingConfiguration(HttpEndpoint, defaultProcessorParameters(roleArn))
}

if _, ok := HttpEndpoint["request_configuration"]; ok {
@@ -2394,9 +2398,10 @@ func updateHTTPEndpointConfig(d *schema.ResourceData, s3Update *firehose.S3Desti

HttpEndpoint := sl[0].(map[string]interface{})

roleArn := HttpEndpoint["role_arn"].(string)
configuration := &firehose.HttpEndpointDestinationUpdate{
RetryOptions: extractHTTPEndpointRetryOptions(HttpEndpoint),
RoleARN: aws.String(HttpEndpoint["role_arn"].(string)),
RoleARN: aws.String(roleArn),
S3Update: s3Update,
}

@@ -2413,7 +2418,7 @@ func updateHTTPEndpointConfig(d *schema.ResourceData, s3Update *firehose.S3Desti
configuration.BufferingHints = bufferingHints

if _, ok := HttpEndpoint["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(HttpEndpoint)
configuration.ProcessingConfiguration = extractProcessingConfiguration(HttpEndpoint, defaultProcessorParameters(roleArn))
}

if _, ok := HttpEndpoint["request_configuration"]; ok {
@@ -2965,3 +2970,41 @@ func expandDeliveryStreamEncryptionConfigurationInput(tfList []interface{}) *fir

return apiObject
}

func defaultProcessorParameters(roleArn string) map[string]string {
defaultParams := map[string]string{
"NumberOfRetries": "3",
"BufferSizeInMBs": "3",
"BufferIntervalInSeconds": "60",
}
if roleArn != "" {
defaultParams["RoleArn"] = roleArn
}
return defaultParams
}

func mergeDefaultProcessingParameters(processor *firehose.Processor, toMerge map[string]string) *firehose.Processor {
if aws.StringValue(processor.Type) != firehose.ProcessorTypeLambda {
return processor
}
params := processor.Parameters
for key, value := range toMerge {
found := false
for _, param := range processor.Parameters {
if key == aws.StringValue(param.ParameterName) {
found = true
continue
}
}
if !found {
params = append(params, &firehose.ProcessorParameter{
ParameterName: aws.String(key),
ParameterValue: aws.String(value),
})
}
}
return &firehose.Processor{
Type: processor.Type,
Parameters: params,
}
}
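
For illustration, a minimal, self-contained Go sketch of the behaviour the new helpers add for a Lambda processor. It mirrors defaultProcessorParameters and mergeDefaultProcessingParameters outside the provider package purely as an example; the Lambda and IAM role ARNs are placeholders, not values from the commit.

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/firehose"
)

func main() {
	// A Lambda processor the way a user typically configures it:
	// only LambdaArn is set explicitly (placeholder ARN).
	processor := &firehose.Processor{
		Type: aws.String(firehose.ProcessorTypeLambda),
		Parameters: []*firehose.ProcessorParameter{
			{
				ParameterName:  aws.String("LambdaArn"),
				ParameterValue: aws.String("arn:aws:lambda:us-east-1:111122223333:function:example"),
			},
		},
	}

	// Equivalent of defaultProcessorParameters(roleArn) above (placeholder role ARN).
	defaults := map[string]string{
		"NumberOfRetries":         "3",
		"BufferSizeInMBs":         "3",
		"BufferIntervalInSeconds": "60",
		"RoleArn":                 "arn:aws:iam::111122223333:role/firehose-example",
	}

	// Same idea as mergeDefaultProcessingParameters: append any default
	// parameter the user did not set explicitly.
	for name, value := range defaults {
		found := false
		for _, p := range processor.Parameters {
			if aws.StringValue(p.ParameterName) == name {
				found = true
				break
			}
		}
		if !found {
			processor.Parameters = append(processor.Parameters, &firehose.ProcessorParameter{
				ParameterName:  aws.String(name),
				ParameterValue: aws.String(value),
			})
		}
	}

	// The processor now carries LambdaArn plus the defaults, so the request
	// sent to Firehose no longer requires the user to spell out both
	// BufferSizeInMBs and BufferIntervalInSeconds.
	for _, p := range processor.Parameters {
		fmt.Printf("%s=%s\n", aws.StringValue(p.ParameterName), aws.StringValue(p.ParameterValue))
	}
}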