Skip to content

Commit

Permalink
hanging resource issue is fixed for cluster and added retry logic (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
aniket-Kumar-c authored Dec 7, 2023
1 parent 7fc1a00 commit 4fa8fe6
Show file tree
Hide file tree
Showing 27 changed files with 1,105 additions and 288 deletions.
107 changes: 106 additions & 1 deletion internal/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package api

import (
"bytes"
"context"
"encoding/json"
goer "errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -49,7 +51,10 @@ type EndpointCfg struct {
SuccessStatus int
}

// Execute is used to construct and execute an HTTP request.
// defaultWaitAttempt re-attempt http request after 2 seconds.
const defaultWaitAttempt = time.Second * 2

// Execute is used to construct and execute a HTTP request.
// It then returns the response.
func (c *Client) Execute(
endpointCfg EndpointCfg,
Expand Down Expand Up @@ -102,3 +107,103 @@ func (c *Client) Execute(
Body: responseBody,
}, nil
}

// ExecuteWithRetry is used to construct and execute a HTTP request with retry.
// It then returns the response.
func (c *Client) ExecuteWithRetry(
ctx context.Context,
endpointCfg EndpointCfg,
payload any,
authToken string,
headers map[string]string,
) (response *Response, err error) {
var requestBody []byte
if payload != nil {
requestBody, err = json.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("%s: %w", errors.ErrMarshallingPayload, err)
}
}

req, err := http.NewRequest(endpointCfg.Method, endpointCfg.Url, bytes.NewReader(requestBody))
if err != nil {
return nil, fmt.Errorf("%s: %w", errors.ErrConstructingRequest, err)
}

req.Header.Set("Authorization", "Bearer "+authToken)
for header, value := range headers {
req.Header.Set(header, value)
}

var fn = func() (response *Response, err error) {
apiRes, err := c.Do(req)
if err != nil {
return nil, fmt.Errorf("%s: %w", errors.ErrExecutingRequest, err)
}
defer apiRes.Body.Close()

responseBody, err := io.ReadAll(apiRes.Body)
if err != nil {
return
}

switch apiRes.StatusCode {
case endpointCfg.SuccessStatus:
// success case
case http.StatusGatewayTimeout:
return nil, errors.ErrGatewayTimeout
default:
var apiError Error
if err := json.Unmarshal(responseBody, &apiError); err != nil {
return nil, fmt.Errorf(
"unexpected code: %d, expected: %d, body: %s",
apiRes.StatusCode, endpointCfg.SuccessStatus, responseBody)
}
if apiError.Code == 0 {
return nil, fmt.Errorf(
"unexpected code: %d, expected: %d, body: %s",
apiRes.StatusCode, endpointCfg.SuccessStatus, responseBody)

}
return nil, &apiError
}

return &Response{
Response: apiRes,
Body: responseBody,
}, nil
}

return exec(ctx, fn, defaultWaitAttempt)
}

func exec(ctx context.Context, fn func() (response *Response, err error), waitOnReattempt time.Duration) (*Response, error) {
timer := time.NewTimer(time.Millisecond)

var (
err error
response *Response
)

const timeout = time.Minute * 10

var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()

for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("timed out executing request against api: %w", err)
case <-timer.C:
response, err = fn()
switch {
case err == nil:
return response, nil
case !goer.Is(err, errors.ErrGatewayTimeout):
return response, err
}
timer.Reset(waitOnReattempt)
}
}
}
3 changes: 2 additions & 1 deletion internal/api/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func GetPaginated[DataSchema ~[]T, T any](
cfg.Url = baseUrl + fmt.Sprintf("?page=%d&perPage=%d&sortBy=%s", page, perPage, string(sortBy))
cfg.Method = http.MethodGet

response, err := client.Execute(
response, err := client.ExecuteWithRetry(
ctx,
cfg,
nil,
token,
Expand Down
6 changes: 4 additions & 2 deletions internal/datasources/backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ func (d *Backups) Read(ctx context.Context, req datasource.ReadRequest, resp *da
// Get all the cycles
url := fmt.Sprintf("%s/v4/organizations/%s/projects/%s/clusters/%s/buckets/%s/backup/cycles", d.HostURL, organizationId, projectId, clusterId, bucketId)
cfg := api.EndpointCfg{Url: url, Method: http.MethodGet, SuccessStatus: http.StatusOK}
response, err := d.Client.Execute(
response, err := d.Client.ExecuteWithRetry(
ctx,
cfg,
nil,
d.Token,
Expand Down Expand Up @@ -89,7 +90,8 @@ func (d *Backups) Read(ctx context.Context, req datasource.ReadRequest, resp *da
for _, cycle := range cyclesResp.Data {
url := fmt.Sprintf("%s/v4/organizations/%s/projects/%s/clusters/%s/buckets/%s/backup/cycles/%s", d.HostURL, organizationId, projectId, clusterId, bucketId, cycle.CycleId)
cfg := api.EndpointCfg{Url: url, Method: http.MethodGet, SuccessStatus: http.StatusOK}
response, err := d.Client.Execute(
response, err := d.Client.ExecuteWithRetry(
ctx,
cfg,
nil,
d.Token,
Expand Down
3 changes: 2 additions & 1 deletion internal/datasources/certificate.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func (c *Certificate) Read(ctx context.Context, req datasource.ReadRequest, resp

url := fmt.Sprintf("%s/v4/organizations/%s/projects/%s/clusters/%s/certificates", c.HostURL, organizationId, projectId, clusterId)
cfg := api.EndpointCfg{Url: url, Method: http.MethodGet, SuccessStatus: http.StatusOK}
response, err := c.Client.Execute(
response, err := c.Client.ExecuteWithRetry(
ctx,
cfg,
nil,
c.Token,
Expand Down
3 changes: 2 additions & 1 deletion internal/datasources/organization.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func (o *Organization) Read(ctx context.Context, req datasource.ReadRequest, res
// Make request to get organization
url := fmt.Sprintf("%s/v4/organizations/%s", o.HostURL, organizationId)
cfg := api.EndpointCfg{Url: url, Method: http.MethodGet, SuccessStatus: http.StatusOK}
response, err := o.Client.Execute(
response, err := o.Client.ExecuteWithRetry(
ctx,
cfg,
nil,
o.Token,
Expand Down
3 changes: 3 additions & 0 deletions internal/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,7 @@ var (
// ErrClusterCreationTimeoutAfterInitiation is returned when cluster creation
// is timeout after initiation.
ErrClusterCreationTimeoutAfterInitiation = errors.New("cluster creation status transition timed out after initiation")

// ErrGatewayTimeout is returned when a gateway operation times out.
ErrGatewayTimeout = errors.New("gateway timeout")
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package acceptance_tests

import (
"context"
"fmt"
"log"
"net/http"
Expand Down Expand Up @@ -366,7 +367,8 @@ func testAccDeleteAllowIP(clusterResourceReference, projectResourceReference, al
authToken := os.Getenv("TF_VAR_auth_token")
url := fmt.Sprintf("%s/v4/organizations/%s/projects/%s/clusters/%s/allowedcidrs/%s", host, orgid, projectState["id"], clusterState["id"], allowListState["id"])
cfg := api.EndpointCfg{Url: url, Method: http.MethodDelete, SuccessStatus: http.StatusNoContent}
_, err = data.Client.Execute(
_, err = data.Client.ExecuteWithRetry(
context.Background(),
cfg,
nil,
authToken,
Expand Down
Loading

0 comments on commit 4fa8fe6

Please sign in to comment.