Skip to content

Commit

Permalink
Allow for retries of single requests in a batch on failure (#5355)
Browse files Browse the repository at this point in the history
Signed-off-by: Modular Magician <[email protected]>

Co-authored-by: emily <[email protected]>
  • Loading branch information
modular-magician and emilymye committed Jan 9, 2020
1 parent a3214a0 commit c7b7470
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 114 deletions.
199 changes: 129 additions & 70 deletions google/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@ package google
import (
"context"
"fmt"
"github.com/hashicorp/errwrap"
"log"
"sync"
"time"

"github.com/hashicorp/errwrap"
)

const defaultBatchSendIntervalSec = 3

// RequestBatcher is a global batcher object that keeps track of
// existing batches.
// In general, a batcher should be created per service that requires batching
// in order to prevent blocking batching for one service due to another,
// and to minimize the possibility of overlap in batchKey formats
// (see SendRequestWithTimeout)
// RequestBatcher keeps track of batched requests globally.
// It should be created at a provider level. In general, one
// should be created per service that requires batching to:
// - prevent blocking batching for one service due to another,
// - minimize the possibility of overlap in batchKey formats (see SendRequestWithTimeout)
type RequestBatcher struct {
sync.Mutex

Expand All @@ -27,56 +25,74 @@ type RequestBatcher struct {
debugId string
}

// BatchRequest represents a single request to a global batcher.
type BatchRequest struct {
// ResourceName represents the underlying resource for which
// a request is made. Its format is determined by what SendF expects, but
// typically should be the name of the parent GCP resource being changed.
ResourceName string

// Body is this request's data to be passed to SendF, and may be combined
// with other bodies using CombineF.
Body interface{}

// CombineF function determines how to combine bodies from two batches.
CombineF batcherCombineFunc

// SendF function determines how to actually send a batched request to a
// third party service. The arguments given to this function are
// (ResourceName, Body) where Body may have been combined with other request
// Bodies.
SendF batcherSendFunc

// ID for debugging request. This should be specific to a single request
// (i.e. per Terraform resource)
DebugId string
}

// These types are meant to be the public interface to batchers. They define
// logic to manage batch data type and behavior, and require service-specific
// implementations per type of request per service.
// Function type for combine existing batches and additional batch data
type batcherCombineFunc func(body interface{}, toAdd interface{}) (interface{}, error)
// batch data format and logic to send/combine batches, i.e. they require
// specific implementations per type of request.
type (
// BatchRequest represents a single request to a global batcher.
BatchRequest struct {
// ResourceName represents the underlying resource for which
// a request is made. Its format is determined by what SendF expects, but
// typically should be the name of the parent GCP resource being changed.
ResourceName string

// Body is this request's data to be passed to SendF, and may be combined
// with other bodies using CombineF.
Body interface{}

// CombineF function determines how to combine bodies from two batches.
CombineF BatcherCombineFunc

// SendF function determines how to actually send a batched request to a
// third party service. The arguments given to this function are
// (ResourceName, Body) where Body may have been combined with other request
// Bodies.
SendF BatcherSendFunc

// ID for debugging request. This should be specific to a single request
// (i.e. per Terraform resource)
DebugId string
}

// Function type for sending a batch request
type batcherSendFunc func(resourceName string, body interface{}) (interface{}, error)
// BatcherCombineFunc is a function type for combine existing batches and additional batch data
BatcherCombineFunc func(body interface{}, toAdd interface{}) (interface{}, error)

// BatcherSendFunc is a function type for sending a batch request
BatcherSendFunc func(resourceName string, body interface{}) (interface{}, error)
)

// batchResponse bundles an API response (data, error) tuple.
type batchResponse struct {
body interface{}
err error
}

// startedBatch refers to a processed batch whose timer to send the request has
// already been started. The responses for the request is sent to each listener
// channel, representing parallel callers that are waiting on requests
// combined into this batch.
func (br *batchResponse) IsError() bool {
return br.err != nil
}

// startedBatch refers to a registered batch to group batch requests coming in.
// The timer manages the time after which a given batch is sent.
type startedBatch struct {
batchKey string

// Combined Batch Request
*BatchRequest

listeners []chan batchResponse
timer *time.Timer
// subscribers is a registry of the requests (batchSubscriber) combined into this batcher.

subscribers []batchSubscriber

timer *time.Timer
}

// batchSubscriber contains information required for a single request for a startedBatch.
type batchSubscriber struct {
// singleRequest is the original request this subscriber represents
singleRequest *BatchRequest

// respCh is the channel created to communicate the result to a waiting goroutine.s
respCh chan batchResponse
}

// batchingConfig contains user configuration for controlling batch requests.
Expand All @@ -94,8 +110,12 @@ func NewRequestBatcher(debugId string, ctx context.Context, config *batchingConf
batches: make(map[string]*startedBatch),
}

// Start goroutine to managing stopping the batcher if the provider-level parent context is closed.
go func(b *RequestBatcher) {
<-ctx.Done()
// Block until parent context is closed
<-b.parentCtx.Done()

log.Printf("[DEBUG] parent context canceled, cleaning up batcher batches")
b.stop()
}(batcher)

Expand All @@ -108,19 +128,19 @@ func (b *RequestBatcher) stop() {

log.Printf("[DEBUG] Stopping batcher %q", b.debugId)
for batchKey, batch := range b.batches {
log.Printf("[DEBUG] Cleaning up batch request %q", batchKey)
log.Printf("[DEBUG] Cancelling started batch for batchKey %q", batchKey)
batch.timer.Stop()
for _, l := range batch.listeners {
close(l)
for _, l := range batch.subscribers {
close(l.respCh)
}
}
}

// SendRequestWithTimeout is expected to be called per parallel call.
// It manages waiting on the result of a batch request.
// SendRequestWithTimeout is a blocking call for making a single request, run alone or as part of a batch.
// It manages registering the single request with the batcher and waiting on the result.
//
// Batch requests are grouped by the given batchKey. batchKey
// should be unique to the API request being sent, most likely similar to
// Params:
// batchKey: A string to group batchable requests. It should be unique to the API request being sent, similar to
// the HTTP request URL with GCP resource ID included in the URL (the caller
// may choose to use a key with method if needed to diff GET/read and
// POST/create)
Expand Down Expand Up @@ -182,40 +202,75 @@ func (b *RequestBatcher) registerBatchRequest(batchKey string, newRequest *Batch
return batch.addRequest(newRequest)
}

// Batch doesn't exist for given batch key - create a new batch.

log.Printf("[DEBUG] Creating new batch %q from request %q", newRequest.DebugId, batchKey)

// The calling goroutine will need a channel to wait on for a response.
respCh := make(chan batchResponse, 1)
sub := batchSubscriber{
singleRequest: newRequest,
respCh: respCh,
}

// Create a new batch.
// Create a new batch with copy of the given batch request.
b.batches[batchKey] = &startedBatch{
BatchRequest: newRequest,
batchKey: batchKey,
listeners: []chan batchResponse{respCh},
BatchRequest: &BatchRequest{
ResourceName: newRequest.ResourceName,
Body: newRequest.Body,
CombineF: newRequest.CombineF,
SendF: newRequest.SendF,
DebugId: fmt.Sprintf("Combined batch for started batch %q", batchKey),
},
batchKey: batchKey,
subscribers: []batchSubscriber{sub},
}

// Start a timer to send the request
b.batches[batchKey].timer = time.AfterFunc(b.sendAfter, func() {
batch := b.popBatch(batchKey)

var resp batchResponse
if batch == nil {
log.Printf("[DEBUG] Batch not found in saved batches, running single request batch %q", batchKey)
resp = newRequest.send()
log.Printf("[ERROR] batch should have been added to saved batches - just run as single request %q", newRequest.DebugId)
respCh <- newRequest.send()
close(respCh)
} else {
log.Printf("[DEBUG] Sending batch %q combining %d requests)", batchKey, len(batch.listeners))
resp = batch.send()
}

// Send message to all goroutines waiting on result.
for _, ch := range batch.listeners {
ch <- resp
close(ch)
b.sendBatchWithSingleRetry(batchKey, batch)
}
})

return respCh, nil
}

func (b *RequestBatcher) sendBatchWithSingleRetry(batchKey string, batch *startedBatch) {
log.Printf("[DEBUG] Sending batch %q combining %d requests)", batchKey, len(batch.subscribers))
resp := batch.send()

// If the batch failed and combines more than one request, retry each single request.
if resp.IsError() && len(batch.subscribers) > 1 {
log.Printf("[DEBUG] Batch failed with error: %v", resp.err)
log.Printf("[DEBUG] Sending each request in batch separately")
for _, sub := range batch.subscribers {
log.Printf("[DEBUG] Retrying single request %q", sub.singleRequest.DebugId)
singleResp := sub.singleRequest.send()
log.Printf("[DEBUG] Retried single request %q returned response: %v", sub.singleRequest.DebugId, singleResp)

if singleResp.IsError() {
singleResp.err = errwrap.Wrapf(
"batch request and retry as single request failed - final error: {{err}}",
singleResp.err)
}
sub.respCh <- singleResp
close(sub.respCh)
}
} else {
// Send result to all subscribers
for _, sub := range batch.subscribers {
sub.respCh <- resp
close(sub.respCh)
}
}
}

// popBatch safely gets and removes a batch with given batchkey from the
// RequestBatcher's started batches.
func (b *RequestBatcher) popBatch(batchKey string) *startedBatch {
Expand Down Expand Up @@ -246,7 +301,11 @@ func (batch *startedBatch) addRequest(newRequest *BatchRequest) (<-chan batchRes
log.Printf("[DEBUG] Added batch request %q to batch. New batch body: %v", newRequest.DebugId, batch.Body)

respCh := make(chan batchResponse, 1)
batch.listeners = append(batch.listeners, respCh)
sub := batchSubscriber{
singleRequest: newRequest,
respCh: respCh,
}
batch.subscribers = append(batch.subscribers, sub)
return respCh, nil
}

Expand Down
57 changes: 40 additions & 17 deletions google/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -139,41 +140,63 @@ func TestRequestBatcher_errInSend(t *testing.T) {
enableBatching: true,
})

testResource := "resource for send error"
sendErrTmpl := "this is an expected error in send batch for resource %q"
// combineF keeps track of the batched indexes
testCombine := func(body interface{}, toAdd interface{}) (interface{}, error) {
return append(body.([]int), toAdd.([]int)...), nil
}

// combineF is no-op
testCombine := func(_ interface{}, _ interface{}) (interface{}, error) {
failIdx := 0
testResource := "RESOURCE-SEND-ERROR"
expectedErrMsg := fmt.Sprintf("Error - batch %q contains idx %d", testResource, failIdx)

testSendBatch := func(resourceName string, body interface{}) (interface{}, error) {
log.Printf("[DEBUG] sendBatch body: %+v", body)
for _, v := range body.([]int) {
if v == failIdx {
return nil, fmt.Errorf(expectedErrMsg)
}
}
return nil, nil
}

testSendBatch := func(resourceName string, cnt interface{}) (interface{}, error) {
return cnt, fmt.Errorf(sendErrTmpl, resourceName)
}
numRequests := 3

wg := sync.WaitGroup{}
wg.Add(2)
wg.Add(numRequests)

for i := 0; i < 2; i++ {
for i := 0; i < numRequests; i++ {
go func(idx int) {
defer wg.Done()

req := &BatchRequest{
DebugId: fmt.Sprintf("sendError %d", idx),
ResourceName: testResource,
Body: nil,
Body: []int{idx},
CombineF: testCombine,
SendF: testSendBatch,
}

_, err := testBatcher.SendRequestWithTimeout("batchSendError", req, time.Duration(10)*time.Second)
if err == nil {
t.Errorf("expected error, got none")
return
}
expectedErr := fmt.Sprintf(sendErrTmpl, testResource)
if !strings.Contains(err.Error(), fmt.Sprintf(sendErrTmpl, testResource)) {
t.Errorf("expected error %q, got error: %v", expectedErr, err)
// Requests without index 0 should have succeeded
if idx == failIdx {
// We expect an error
if err == nil {
t.Errorf("expected error for request %d, got none", idx)
}
// Check error message
expectedErrPrefix := "batch request and retry as single request failed - final error: "
if !strings.Contains(err.Error(), expectedErrPrefix) {
t.Errorf("expected error %q to contain %q", err, expectedErrPrefix)
}
if !strings.Contains(err.Error(), expectedErrMsg) {
t.Errorf("expected error %q to contain %q", err, expectedErrMsg)
}
} else {

// We shouldn't get error for non-failure index
if err != nil {
t.Errorf("expected request %d to succeed, got error: %v", idx, err)
}
}
}(i)
}
Expand Down
2 changes: 1 addition & 1 deletion google/iam_batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func combineBatchIamPolicyModifiers(currV interface{}, toAddV interface{}) (inte
return append(currModifiers, newModifiers...), nil
}

func sendBatchModifyIamPolicy(updater ResourceIamUpdater) batcherSendFunc {
func sendBatchModifyIamPolicy(updater ResourceIamUpdater) BatcherSendFunc {
return func(resourceName string, body interface{}) (interface{}, error) {
modifiers, ok := body.([]iamPolicyModifyFunc)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion google/resource_google_project_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func resourceGoogleProjectServiceCreate(d *schema.ResourceData, meta interface{}
}

srv := d.Get("service").(string)
err = BatchRequestEnableServices(map[string]struct{}{srv: {}}, project, d, config)
err = BatchRequestEnableService(srv, project, d, config)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit c7b7470

Please sign in to comment.