Commit
Merge pull request #29269 from hashicorp/b-dynamodb-pitr-recreate-on-enable

dynamodb/table: Do not recreate replicas on PITR enable
YakDriver authored Feb 9, 2023
2 parents 8dcb7af + 4b76aeb commit 94ca3c4
Showing 7 changed files with 358 additions and 35 deletions.
7 changes: 7 additions & 0 deletions .changelog/29269.txt
@@ -0,0 +1,7 @@
```release-note:bug
resource/aws_dynamodb_table: Avoid recreating table replicas when enabling PITR on them
```

```release-note:enhancement
resource/aws_dynamodb_table: Add `arn`, `stream_arn`, and `stream_label` attributes to `replica` to obtain this information for replicas
```
18 changes: 18 additions & 0 deletions GNUmakefile
@@ -6,6 +6,7 @@ SVC_DIR ?= ./internal/service
TEST_COUNT ?= 1
ACCTEST_TIMEOUT ?= 180m
ACCTEST_PARALLELISM ?= 20
P ?= 20
GO_VER ?= go
SWEEP_TIMEOUT ?= 60m

@@ -14,10 +15,19 @@ ifneq ($(origin PKG), undefined)
TEST = ./$(PKG_NAME)/...
endif

ifneq ($(origin K), undefined)
PKG_NAME = internal/service/$(K)
TEST = ./$(PKG_NAME)/...
endif

ifneq ($(origin TESTS), undefined)
RUNARGS = -run='$(TESTS)'
endif

ifneq ($(origin T), undefined)
RUNARGS = -run='$(T)'
endif

ifneq ($(origin SWEEPERS), undefined)
SWEEPARGS = -sweep-run='$(SWEEPERS)'
endif
@@ -57,6 +67,10 @@ ifeq ($(PKG_NAME), internal/service/wavelength)
TEST = ./$(PKG_NAME)/...
endif

ifneq ($(P), 20)
ACCTEST_PARALLELISM = $(P)
endif

default: build

build: fmtcheck
@@ -101,6 +115,9 @@ testacc: fmtcheck
fi
TF_ACC=1 $(GO_VER) test ./$(PKG_NAME)/... -v -count $(TEST_COUNT) -parallel $(ACCTEST_PARALLELISM) $(RUNARGS) $(TESTARGS) -timeout $(ACCTEST_TIMEOUT)

t: fmtcheck
TF_ACC=1 $(GO_VER) test ./$(PKG_NAME)/... -v -count $(TEST_COUNT) -parallel $(ACCTEST_PARALLELISM) $(RUNARGS) $(TESTARGS) -timeout $(ACCTEST_TIMEOUT)

testacc-lint:
@echo "Checking acceptance tests with terrafmt"
find $(SVC_DIR) -type f -name '*_test.go' \
@@ -297,6 +314,7 @@ yamllint:
build \
gen \
sweep \
t \
test \
testacc \
testacc-lint \
2 changes: 1 addition & 1 deletion go.mod
@@ -48,7 +48,7 @@ require (
github.com/hashicorp/terraform-plugin-framework-timeouts v0.3.0
github.com/hashicorp/terraform-plugin-framework-validators v0.9.0
github.com/hashicorp/terraform-plugin-go v0.14.3
github.com/hashicorp/terraform-plugin-log v0.7.0
github.com/hashicorp/terraform-plugin-log v0.8.0
github.com/hashicorp/terraform-plugin-mux v0.8.0
github.com/hashicorp/terraform-plugin-sdk/v2 v2.24.1
github.com/jmespath/go-jmespath v0.4.0
4 changes: 2 additions & 2 deletions go.sum
@@ -205,8 +205,8 @@ github.com/hashicorp/terraform-plugin-framework-validators v0.9.0 h1:LYz4bXh3t7b
github.com/hashicorp/terraform-plugin-framework-validators v0.9.0/go.mod h1:+BVERsnfdlhYR2YkXMBtPnmn9UsL19U3qUtSZ+Y/5MY=
github.com/hashicorp/terraform-plugin-go v0.14.3 h1:nlnJ1GXKdMwsC8g1Nh05tK2wsC3+3BL/DBBxFEki+j0=
github.com/hashicorp/terraform-plugin-go v0.14.3/go.mod h1:7ees7DMZ263q8wQ6E4RdIdR6nHHJtrdt4ogX5lPkX1A=
github.com/hashicorp/terraform-plugin-log v0.7.0 h1:SDxJUyT8TwN4l5b5/VkiTIaQgY6R+Y2BQ0sRZftGKQs=
github.com/hashicorp/terraform-plugin-log v0.7.0/go.mod h1:p4R1jWBXRTvL4odmEkFfDdhUjHf9zcs/BCoNHAc7IK4=
github.com/hashicorp/terraform-plugin-log v0.8.0 h1:pX2VQ/TGKu+UU1rCay0OlzosNKe4Nz1pepLXj95oyy0=
github.com/hashicorp/terraform-plugin-log v0.8.0/go.mod h1:1myFrhVsBLeylQzYYEV17VVjtG8oYPRFdaZs7xdW2xs=
github.com/hashicorp/terraform-plugin-mux v0.8.0 h1:WCTP66mZ+iIaIrCNJnjPEYnVjawTshnDJu12BcXK1EI=
github.com/hashicorp/terraform-plugin-mux v0.8.0/go.mod h1:vdW0daEi8Kd4RFJmet5Ot+SIVB/B8SwQVJiYKQwdCy8=
github.com/hashicorp/terraform-plugin-sdk/v2 v2.24.1 h1:zHcMbxY0+rFO9gY99elV/XC/UnQVg7FhRCbj1i5b7vM=
167 changes: 143 additions & 24 deletions internal/service/dynamodb/table.go
@@ -260,6 +260,10 @@ func ResourceTable() *schema.Resource {
Optional: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"arn": {
Type: schema.TypeString,
Computed: true,
},
"kms_key_arn": {
Type: schema.TypeString,
Optional: true,
@@ -282,6 +286,14 @@
Required: true,
// update is equivalent of force a new *replica*, not table
},
"stream_arn": {
Type: schema.TypeString,
Computed: true,
},
"stream_label": {
Type: schema.TypeString,
Computed: true,
},
},
},
},
@@ -666,6 +678,10 @@ func resourceTableRead(ctx context.Context, d *schema.ResourceData, meta interfa
return create.DiagError(names.DynamoDB, create.ErrActionReading, ResNameTable, d.Id(), err)
}

if replicas, err = enrichReplicas(ctx, conn, aws.StringValue(table.TableArn), d.Id(), meta.(*conns.AWSClient).TerraformVersion, replicas); err != nil {
return create.DiagError(names.DynamoDB, create.ErrActionReading, ResNameTable, d.Id(), err)
}

replicas = addReplicaTagPropagates(d.Get("replica").(*schema.Set), replicas)
replicas = clearReplicaDefaultKeys(ctx, replicas, meta)

@@ -1099,9 +1115,14 @@ func createReplicas(ctx context.Context, conn *dynamodb.DynamoDB, tableName stri
},
}

// currently an update here would not be needed because, of the replica arguments:
// region_name can't be updated - new replica
// kms_key_arn can't be updated - remove/add replica
// propagate_tags - handled elsewhere
// point_in_time_recovery - handled elsewhere
// if provisioned_throughput_override or table_class_override were added, they could be updated here
if !create {
var replicaInput = &dynamodb.UpdateReplicationGroupMemberAction{}

if v, ok := tfMap["region_name"].(string); ok && v != "" {
replicaInput.RegionName = aws.String(v)
}
Expand Down Expand Up @@ -1145,6 +1166,10 @@ func createReplicas(ctx context.Context, conn *dynamodb.DynamoDB, tableName stri
_, err = conn.UpdateTableWithContext(ctx, input)
}

// An update that makes no changes (i.e., the same region_name and kms_key_arn
// as currently configured) throws an unhelpfully worded exception:
// ValidationException: One or more parameter values were invalid: KMSMasterKeyId must be specified for each replica.

if create && tfawserr.ErrMessageContains(err, "ValidationException", "already exist") {
return createReplicas(ctx, conn, tableName, tfList, tfVersion, false, timeout)
}
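
For reference, and not the provider's exact helper: a minimal sketch of the request shape behind the create-then-fall-back-to-update behavior above, using the AWS SDK for Go v1 types the diff already references (table and region names are illustrative).

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/dynamodb"
)

// replicaUpdateInput builds the UpdateTable request for one replica region.
// When create is true the replica is added via a CreateReplicationGroupMemberAction;
// when the region already exists (the "already exist" ValidationException path),
// the same settings are resent as an UpdateReplicationGroupMemberAction instead.
func replicaUpdateInput(tableName, region, kmsKeyARN string, create bool) *dynamodb.UpdateTableInput {
	update := &dynamodb.ReplicationGroupUpdate{}

	if create {
		action := &dynamodb.CreateReplicationGroupMemberAction{RegionName: aws.String(region)}
		if kmsKeyARN != "" {
			action.KMSMasterKeyId = aws.String(kmsKeyARN)
		}
		update.Create = action
	} else {
		action := &dynamodb.UpdateReplicationGroupMemberAction{RegionName: aws.String(region)}
		if kmsKeyARN != "" {
			action.KMSMasterKeyId = aws.String(kmsKeyARN)
		}
		update.Update = action
	}

	return &dynamodb.UpdateTableInput{
		TableName:      aws.String(tableName),
		ReplicaUpdates: []*dynamodb.ReplicationGroupUpdate{update},
	}
}

func main() {
	// First attempt creates the replica; on "already exist" the caller retries with create=false.
	fmt.Println(replicaUpdateInput("example", "eu-west-1", "", true))
}
```
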
@@ -1283,49 +1308,95 @@ func updateReplica(ctx context.Context, d *schema.ResourceData, conn *dynamodb.D
o := oRaw.(*schema.Set)
n := nRaw.(*schema.Set)

removed := o.Difference(n).List()
added := n.Difference(o).List()

// 1. changing replica kms keys requires recreation of the replica, like ForceNew, but we don't want to ForceNew the *table*
// 2. also, updating PITR on an encrypted replica (one with a KMS key) requires recreation (how would you recover from a backup encrypted with a different key?)
removeRaw := o.Difference(n).List()
addRaw := n.Difference(o).List()

var removeFirst []interface{} // replicas to delete before recreating (like ForceNew without recreating table)
var toAdd []interface{}
var toRemove []interface{}

// first pass - add replicas that don't have corresponding remove entry
for _, a := range addRaw {
add := true
ma := a.(map[string]interface{})
for _, r := range removeRaw {
mr := r.(map[string]interface{})

if ma["region_name"].(string) == mr["region_name"].(string) {
add = false
break
}
}

// For true updates, don't remove and add, just update (i.e., keep in added
// but remove from removed)
for _, a := range added {
for j, r := range removed {
if add {
toAdd = append(toAdd, ma)
}
}

// second pass - remove replicas that don't have corresponding add entry
for _, r := range removeRaw {
remove := true
mr := r.(map[string]interface{})
for _, a := range addRaw {
ma := a.(map[string]interface{})

if ma["region_name"].(string) == mr["region_name"].(string) {
remove = false
break
}
}

if remove {
toRemove = append(toRemove, mr)
}
}

// third pass - for replicas that exist in both add and remove
// For true updates, don't remove and add, just update
for _, a := range addRaw {
ma := a.(map[string]interface{})
for _, r := range removeRaw {
mr := r.(map[string]interface{})

if ma["region_name"].(string) == mr["region_name"].(string) && (ma["kms_key_arn"].(string) != "" || mr["kms_key_arn"].(string) != "") {
removeFirst = append(removeFirst, removed[j])
removed = append(removed[:j], removed[j+1:]...)
if ma["region_name"].(string) != mr["region_name"].(string) {
continue
}

if ma["region_name"].(string) == mr["region_name"].(string) {
removed = append(removed[:j], removed[j+1:]...)
continue
// like "ForceNew" for the replica - KMS change
if ma["kms_key_arn"].(string) != mr["kms_key_arn"].(string) {
toRemove = append(toRemove, mr)
toAdd = append(toAdd, ma)
break
}

// just update PITR
if ma["point_in_time_recovery"].(bool) != mr["point_in_time_recovery"].(bool) {
if err := updatePITR(ctx, conn, d.Id(), ma["point_in_time_recovery"].(bool), ma["region_name"].(string), tfVersion, d.Timeout(schema.TimeoutUpdate)); err != nil {
return fmt.Errorf("updating replica (%s) point in time recovery: %w", ma["region_name"].(string), err)
}
break
}

// nothing else changed; assume only propagate_tags changed (handled elsewhere), so do nothing here
break
}
}

if len(removeFirst) > 0 { // like ForceNew but doesn't recreate the table
if len(removeFirst) > 0 { // mini ForceNew, recreates replica but doesn't recreate the table
if err := deleteReplicas(ctx, conn, d.Id(), removeFirst, d.Timeout(schema.TimeoutUpdate)); err != nil {
return fmt.Errorf("updating replicas, while deleting: %w", err)
}
}

if len(added) > 0 {
if err := createReplicas(ctx, conn, d.Id(), added, tfVersion, true, d.Timeout(schema.TimeoutUpdate)); err != nil {
return fmt.Errorf("updating replicas, while creating: %w", err)
if len(toRemove) > 0 {
if err := deleteReplicas(ctx, conn, d.Id(), toRemove, d.Timeout(schema.TimeoutUpdate)); err != nil {
return fmt.Errorf("updating replicas, while deleting: %w", err)
}
}

if len(removed) > 0 {
if err := deleteReplicas(ctx, conn, d.Id(), removed, d.Timeout(schema.TimeoutUpdate)); err != nil {
return fmt.Errorf("updating replicas, while deleting: %w", err)
if len(toAdd) > 0 {
if err := createReplicas(ctx, conn, d.Id(), toAdd, tfVersion, true, d.Timeout(schema.TimeoutCreate)); err != nil {
return fmt.Errorf("updating replicas, while creating: %w", err)
}
}
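
For reference, the three passes above amount to classifying configured replicas by region_name. Below is a standalone sketch of that classification with simplified, illustrative types; the provider itself works on map[string]interface{} entries from the replica set.

```go
package main

import "fmt"

// replicaConfig is an illustrative stand-in for one entry of the "replica"
// configuration set.
type replicaConfig struct {
	Region    string
	KMSKeyARN string
	PITR      bool
}

// classify mirrors the three passes above: replicas only in the new config are
// created, replicas only in the old config are deleted, and replicas present in
// both are recreated only on a KMS key change, while a PITR-only change becomes
// an in-place update (no recreation).
func classify(oldCfg, newCfg []replicaConfig) (toAdd, toRemove, pitrOnly []replicaConfig) {
	oldByRegion := make(map[string]replicaConfig, len(oldCfg))
	for _, r := range oldCfg {
		oldByRegion[r.Region] = r
	}
	newByRegion := make(map[string]replicaConfig, len(newCfg))
	for _, r := range newCfg {
		newByRegion[r.Region] = r
	}

	for _, n := range newCfg {
		o, exists := oldByRegion[n.Region]
		switch {
		case !exists:
			toAdd = append(toAdd, n) // first pass: brand-new replica region
		case o.KMSKeyARN != n.KMSKeyARN:
			toRemove = append(toRemove, o) // third pass: "mini ForceNew" for the replica
			toAdd = append(toAdd, n)
		case o.PITR != n.PITR:
			pitrOnly = append(pitrOnly, n) // third pass: update PITR in place
		}
	}
	for _, o := range oldCfg {
		if _, exists := newByRegion[o.Region]; !exists {
			toRemove = append(toRemove, o) // second pass: replica removed from config
		}
	}
	return toAdd, toRemove, pitrOnly
}

func main() {
	oldCfg := []replicaConfig{{Region: "us-west-2"}}
	newCfg := []replicaConfig{{Region: "us-west-2", PITR: true}, {Region: "eu-west-1"}}
	add, remove, pitr := classify(oldCfg, newCfg)
	fmt.Printf("add=%v remove=%v pitrOnly=%v\n", add, remove, pitr)
	// add=[{eu-west-1  false}] remove=[] pitrOnly=[{us-west-2  true}]
}
```
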

@@ -1501,10 +1572,18 @@ func deleteReplicas(ctx context.Context, conn *dynamodb.DynamoDB, tableName stri

err := resource.RetryContext(ctx, updateTableTimeout, func() *resource.RetryError {
_, err := conn.UpdateTableWithContext(ctx, input)
notFoundRetries := 0
if err != nil {
if tfawserr.ErrCodeEquals(err, "ThrottlingException") {
return resource.RetryableError(err)
}
if tfawserr.ErrCodeEquals(err, dynamodb.ErrCodeResourceNotFoundException) {
notFoundRetries++
if notFoundRetries > 3 {
return resource.NonRetryableError(err)
}
return resource.RetryableError(err)
}
if tfawserr.ErrMessageContains(err, dynamodb.ErrCodeLimitExceededException, "can be created, updated, or deleted simultaneously") {
return resource.RetryableError(err)
}
@@ -1521,7 +1600,7 @@ func deleteReplicas(ctx context.Context, conn *dynamodb.DynamoDB, tableName stri
_, err = conn.UpdateTableWithContext(ctx, input)
}

if err != nil {
if err != nil && !tfawserr.ErrCodeEquals(err, dynamodb.ErrCodeResourceNotFoundException) {
return fmt.Errorf("deleting replica (%s): %w", regionName, err)
}
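
The retry and tolerance logic above boils down to: throttling and concurrent-replication errors are retried, and a ResourceNotFoundException means the replica is already gone, so the delete is treated as successful. A simplified sketch of that classification follows; the real code additionally checks the LimitExceededException message and caps not-found retries inside resource.RetryContext.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/dynamodb"
)

// deleteReplicaErr decides how a replica-delete error should be handled:
// retry on throttling/limit errors, ignore a not-found error because the
// replica no longer exists (idempotent delete).
func deleteReplicaErr(err error) (retry bool, ignore bool) {
	if err == nil {
		return false, false
	}
	var ae awserr.Error
	if !errors.As(err, &ae) {
		return false, false
	}
	switch ae.Code() {
	case "ThrottlingException", dynamodb.ErrCodeLimitExceededException:
		return true, false
	case dynamodb.ErrCodeResourceNotFoundException:
		return false, true
	}
	return false, false
}

func main() {
	err := awserr.New(dynamodb.ErrCodeResourceNotFoundException, "Requested resource not found", nil)
	retry, ignore := deleteReplicaErr(err)
	fmt.Println(retry, ignore) // false true
}
```
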

@@ -1570,6 +1649,25 @@ func replicaPITR(ctx context.Context, conn *dynamodb.DynamoDB, tableName string,
return enabled, nil
}

func replicaStream(ctx context.Context, conn *dynamodb.DynamoDB, tableName string, region string, tfVersion string) (string, string) {
// This does not return an error because it only adds "Computed" information to the replica; errors are tolerated and logged.
session, err := conns.NewSessionForRegion(&conn.Config, region, tfVersion)
if err != nil {
log.Printf("[WARN] Attempting to get replica (%s) stream information, ignoring encountered error: %s", tableName, err)
return "", ""
}

conn = dynamodb.New(session)

table, err := FindTableByName(ctx, conn, tableName)
if err != nil {
log.Printf("[WARN] When attempting to get replica (%s) stream information, ignoring encountered error: %s", tableName, err)
return "", ""
}

return aws.StringValue(table.LatestStreamArn), aws.StringValue(table.LatestStreamLabel)
}
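
conns.NewSessionForRegion is provider-internal and not shown in this diff; the following is a generic sketch of the same idea, under that assumption — a DynamoDB client pinned to the replica's region via the AWS SDK for Go v1 default credential chain, then a DescribeTable in that region to read the replica's stream attributes.

```go
package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
)

// regionClient returns a DynamoDB client pinned to the replica's region,
// approximating what conns.NewSessionForRegion does inside the provider.
func regionClient(region string) (*dynamodb.DynamoDB, error) {
	sess, err := session.NewSession(&aws.Config{Region: aws.String(region)})
	if err != nil {
		return nil, fmt.Errorf("creating session for region %s: %w", region, err)
	}
	return dynamodb.New(sess), nil
}

func main() {
	conn, err := regionClient("eu-west-1")
	if err != nil {
		log.Fatal(err)
	}
	// DescribeTable against the replica region returns that region's
	// LatestStreamArn / LatestStreamLabel for the same table name.
	out, err := conn.DescribeTable(&dynamodb.DescribeTableInput{TableName: aws.String("example")})
	if err != nil {
		log.Printf("[WARN] ignoring error while reading replica stream info: %s", err)
		return
	}
	fmt.Println(aws.StringValue(out.Table.LatestStreamArn), aws.StringValue(out.Table.LatestStreamLabel))
}
```
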

func addReplicaPITRs(ctx context.Context, conn *dynamodb.DynamoDB, tableName string, tfVersion string, replicas []interface{}) ([]interface{}, error) {
// This non-standard approach is needed because PITR info for a replica
// must come from a region-specific connection.
@@ -1588,6 +1686,27 @@ func addReplicaPITRs(ctx context.Context, conn *dynamodb.DynamoDB, tableName str
return replicas, nil
}

func enrichReplicas(ctx context.Context, conn *dynamodb.DynamoDB, arn, tableName, tfVersion string, replicas []interface{}) ([]interface{}, error) {
// This non-standard approach is needed because stream information for a replica
// must come from a region-specific connection.
for i, replicaRaw := range replicas {
replica := replicaRaw.(map[string]interface{})

newARN, err := ARNForNewRegion(arn, replica["region_name"].(string))
if err != nil {
return nil, fmt.Errorf("creating new-region ARN: %s", err)
}
replica["arn"] = newARN

streamARN, streamLabel := replicaStream(ctx, conn, tableName, replica["region_name"].(string), tfVersion)
replica["stream_arn"] = streamARN
replica["stream_label"] = streamLabel
replicas[i] = replica
}

return replicas, nil
}
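
ARNForNewRegion is also not shown in this diff; the sketch below shows what such a helper plausibly does — rewriting only the region component of the main table's ARN — using the aws-sdk-go arn package. This is an assumption about the implementation, not the provider's actual code.

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws/arn"
)

// arnForRegion rewrites the region component of a table ARN so the main
// table's ARN can be reused for a replica in another region, e.g.
// arn:aws:dynamodb:us-east-1:123456789012:table/example ->
// arn:aws:dynamodb:eu-west-1:123456789012:table/example.
func arnForRegion(tableARN, region string) (string, error) {
	parsed, err := arn.Parse(tableARN)
	if err != nil {
		return "", fmt.Errorf("parsing ARN (%s): %w", tableARN, err)
	}
	parsed.Region = region
	return parsed.String(), nil
}

func main() {
	out, err := arnForRegion("arn:aws:dynamodb:us-east-1:123456789012:table/example", "eu-west-1")
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```
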

func addReplicaTagPropagates(configReplicas *schema.Set, replicas []interface{}) []interface{} {
if configReplicas.Len() == 0 {
return replicas