Skip to content

Commit

Permalink
Fix Resty logging, timeout and retry configuration (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
anandswaminathan committed Jun 17, 2019
1 parent 9e35f64 commit 93f0e01
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 29 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ required = [

[[constraint]]
name = "github.com/lyft/flytestdlib"
version = "0.2.6"
version = "0.2.8"
39 changes: 20 additions & 19 deletions pkg/controller/flink/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ const httpGet = "GET"
const httpPost = "POST"
const httpPatch = "PATCH"
const retryCount = 3
const timeOut = 5 * time.Second
const httpGetTimeOut = 5 * time.Second
const defaultTimeOut = 1 * time.Minute

type FlinkAPIInterface interface {
CancelJobWithSavepoint(ctx context.Context, url string, jobID string) (string, error)
Expand All @@ -47,7 +48,6 @@ type FlinkAPIInterface interface {
}

type FlinkJobManagerClient struct {
client *resty.Client
metrics *flinkJobManagerClientMetrics
}

Expand Down Expand Up @@ -98,7 +98,7 @@ func (c *FlinkJobManagerClient) GetJobConfig(ctx context.Context, url, jobID str
path := fmt.Sprintf(getJobConfigURL, jobID)
url = url + path

response, err := c.executeRequest(httpGet, url, nil)
response, err := c.executeRequest(ctx, httpGet, url, nil)
if err != nil {
c.metrics.getJobConfigFailureCounter.Inc(ctx)
return nil, errors.Wrap(err, "GetJobConfig API request failed")
Expand All @@ -120,7 +120,7 @@ func (c *FlinkJobManagerClient) GetJobConfig(ctx context.Context, url, jobID str

func (c *FlinkJobManagerClient) GetClusterOverview(ctx context.Context, url string) (*ClusterOverviewResponse, error) {
url = url + getOverviewURL
response, err := c.executeRequest(httpGet, url, nil)
response, err := c.executeRequest(ctx, httpGet, url, nil)
if err != nil {
c.metrics.getClusterFailureCounter.Inc(ctx)
return nil, errors.Wrap(err, "GetClusterOverview API request failed")
Expand All @@ -142,16 +142,19 @@ func (c *FlinkJobManagerClient) GetClusterOverview(ctx context.Context, url stri
}

// Helper method to execute the requests
func (c *FlinkJobManagerClient) executeRequest(
func (c *FlinkJobManagerClient) executeRequest(ctx context.Context,
method string, url string, payload interface{}) (*resty.Response, error) {
client := resty.SetLogger(logger.GetLogWriter(ctx)).SetTimeout(defaultTimeOut)

var resp *resty.Response
var err error
if method == httpGet {
resp, err = c.client.R().Get(url)
client.SetTimeout(httpGetTimeOut).SetRetryCount(retryCount)
resp, err = client.R().Get(url)
} else if method == httpPatch {
resp, err = c.client.R().Patch(url)
resp, err = client.R().Patch(url)
} else if method == httpPost {
resp, err = c.client.R().
resp, err = client.R().
SetHeader("Content-Type", "application/json").
SetBody(payload).
Post(url)
Expand All @@ -168,7 +171,7 @@ func (c *FlinkJobManagerClient) CancelJobWithSavepoint(ctx context.Context, url
cancelJobRequest := CancelJobRequest{
CancelJob: true,
}
response, err := c.executeRequest(httpPost, url, cancelJobRequest)
response, err := c.executeRequest(ctx, httpPost, url, cancelJobRequest)
if err != nil {
c.metrics.cancelJobFailureCounter.Inc(ctx)
return "", errors.Wrap(err, "Cancel job API request failed")
Expand All @@ -192,7 +195,7 @@ func (c *FlinkJobManagerClient) ForceCancelJob(ctx context.Context, url string,

url = url + path + "?mode=cancel"

response, err := c.executeRequest(httpPatch, url, nil)
response, err := c.executeRequest(ctx, httpPatch, url, nil)
if err != nil {
c.metrics.forceCancelJobFailureCounter.Inc(ctx)
return errors.Wrap(err, "Force cancel job API request failed")
Expand All @@ -211,7 +214,7 @@ func (c *FlinkJobManagerClient) SubmitJob(ctx context.Context, url string, jarID
path := fmt.Sprintf(submitJobURL, jarID)
url = url + path

response, err := c.executeRequest(httpPost, url, submitJobRequest)
response, err := c.executeRequest(ctx, httpPost, url, submitJobRequest)
if err != nil {
c.metrics.submitJobFailureCounter.Inc(ctx)
return nil, errors.Wrap(err, "Submit job API request failed")
Expand All @@ -236,7 +239,7 @@ func (c *FlinkJobManagerClient) CheckSavepointStatus(ctx context.Context, url st
path := fmt.Sprintf(checkSavepointStatusURL, jobID, triggerID)
url = url + path

response, err := c.executeRequest(httpGet, url, nil)
response, err := c.executeRequest(ctx, httpGet, url, nil)
if err != nil {
c.metrics.checkSavepointFailureCounter.Inc(ctx)
return nil, errors.Wrap(err, "Check savepoint status API request failed")
Expand All @@ -257,7 +260,7 @@ func (c *FlinkJobManagerClient) CheckSavepointStatus(ctx context.Context, url st

func (c *FlinkJobManagerClient) GetJobs(ctx context.Context, url string) (*GetJobsResponse, error) {
url = url + getJobsURL
response, err := c.executeRequest(httpGet, url, nil)
response, err := c.executeRequest(ctx, httpGet, url, nil)
if err != nil {
c.metrics.getJobsFailureCounter.Inc(ctx)
return nil, errors.Wrap(err, "Get jobs API request failed")
Expand All @@ -279,7 +282,7 @@ func (c *FlinkJobManagerClient) GetJobs(ctx context.Context, url string) (*GetJo

func (c *FlinkJobManagerClient) GetLatestCheckpoint(ctx context.Context, url string, jobID string) (*CheckpointStatistics, error) {
endpoint := fmt.Sprintf(url+checkpointsURL, jobID)
response, err := c.executeRequest(httpGet, endpoint, nil)
response, err := c.executeRequest(ctx, httpGet, endpoint, nil)
if err != nil {
c.metrics.getCheckpointsFailureCounter.Inc(ctx)
return nil, errors.Wrap(err, "get checkpoints failed")
Expand All @@ -300,7 +303,7 @@ func (c *FlinkJobManagerClient) GetLatestCheckpoint(ctx context.Context, url str

func (c *FlinkJobManagerClient) GetTaskManagers(ctx context.Context, url string) (*TaskManagersResponse, error) {
endpoint := url + taskmanagersURL
response, err := c.executeRequest(httpGet, endpoint, nil)
response, err := c.executeRequest(ctx, httpGet, endpoint, nil)
if err != nil {
return nil, errors.Wrap(err, "get taskmanagers failed")
}
Expand All @@ -320,7 +323,7 @@ func (c *FlinkJobManagerClient) GetTaskManagers(ctx context.Context, url string)

func (c *FlinkJobManagerClient) GetCheckpointCounts(ctx context.Context, url string, jobID string) (*CheckpointResponse, error) {
endpoint := fmt.Sprintf(url+checkpointsURL, jobID)
response, err := c.executeRequest(httpGet, endpoint, nil)
response, err := c.executeRequest(ctx, httpGet, endpoint, nil)
if err != nil {
c.metrics.getCheckpointsFailureCounter.Inc(ctx)
return nil, errors.Wrap(err, "get checkpoints failed")
Expand All @@ -341,7 +344,7 @@ func (c *FlinkJobManagerClient) GetCheckpointCounts(ctx context.Context, url str

func (c *FlinkJobManagerClient) GetJobOverview(ctx context.Context, url string, jobID string) (*FlinkJobOverview, error) {
endpoint := fmt.Sprintf(url+getJobsOverviewURL, jobID)
response, err := c.executeRequest(httpGet, endpoint, nil)
response, err := c.executeRequest(ctx, httpGet, endpoint, nil)
if err != nil {
return nil, errors.Wrap(err, "get job overview failed")
}
Expand All @@ -359,10 +362,8 @@ func (c *FlinkJobManagerClient) GetJobOverview(ctx context.Context, url string,
}

func NewFlinkJobManagerClient(config config.RuntimeConfig) FlinkAPIInterface {
client := resty.SetRetryCount(retryCount).SetTimeout(timeOut)
metrics := newFlinkJobManagerClientMetrics(config.MetricsScope)
return &FlinkJobManagerClient{
client: client,
metrics: metrics,
}
}
8 changes: 2 additions & 6 deletions pkg/controller/flink/client/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"testing"

"github.com/go-resty/resty"
"github.com/jarcoal/httpmock"
mockScope "github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/promutils/labeled"
Expand All @@ -27,10 +26,7 @@ const fakeCancelURL = "http://abc.com/jobs/1/savepoints"
const fakeTaskmanagersURL = "http://abc.com/taskmanagers"

func getTestClient() FlinkJobManagerClient {
client := resty.SetRetryCount(1)
return FlinkJobManagerClient{
client: client,
}
return FlinkJobManagerClient{}
}

func getTestJobManagerClient() FlinkAPIInterface {
Expand Down Expand Up @@ -438,7 +434,7 @@ func TestClientInvalidMethod(t *testing.T) {
defer httpmock.DeactivateAndReset()

client := getTestClient()
_, err := client.executeRequest("random", testURL, nil)
_, err := client.executeRequest(context.Background(), "random", testURL, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, "Invalid method random in request")
}
Expand Down

0 comments on commit 93f0e01

Please sign in to comment.