diff --git a/go.mod b/go.mod index 5e238808a52..7a5b97024ab 100644 --- a/go.mod +++ b/go.mod @@ -61,7 +61,7 @@ require ( github.com/nats-io/nuid v1.0.0 // indirect github.com/onsi/ginkgo v1.7.0 // indirect github.com/onsi/gomega v1.4.3 // indirect - github.com/opentracing/opentracing-go v1.0.2 + github.com/opentracing/opentracing-go v1.1.0 github.com/philhofer/fwd v1.0.0 // indirect github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v0.9.0 diff --git a/go.sum b/go.sum index 1763d84d1ea..2c359e53bae 100644 --- a/go.sum +++ b/go.sum @@ -294,6 +294,8 @@ github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVo github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= +github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-buffruneio v0.2.0/go.mod h1:JkE26KsDizTr40EUHkXVtNPvgGtbSNq5BcowyYOWdKo= diff --git a/http/bucket_service.go b/http/bucket_service.go index 6b3a7aa14cb..c409765689c 100644 --- a/http/bucket_service.go +++ b/http/bucket_service.go @@ -605,7 +605,6 @@ func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*in return nil, err } SetToken(s.Token, req) - tracing.InjectToHTTPRequest(span, req) hc := NewClient(u.Scheme, s.InsecureSkipVerify) resp, err := hc.Do(req) @@ -692,7 +691,6 @@ func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketF req.URL.RawQuery = query.Encode() SetToken(s.Token, req) - tracing.InjectToHTTPRequest(span, req) hc := NewClient(u.Scheme, s.InsecureSkipVerify) resp, err := hc.Do(req) @@ -745,7 +743,6 @@ func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) er req.Header.Set("Content-Type", "application/json") SetToken(s.Token, req) - tracing.InjectToHTTPRequest(span, req) hc := NewClient(u.Scheme, s.InsecureSkipVerify) diff --git a/http/client.go b/http/client.go index 890fc9c60d0..0690c28f13f 100644 --- a/http/client.go +++ b/http/client.go @@ -54,6 +54,7 @@ func NewService(addr, token string) *Service { } } +// NewURL concats addr and path. func NewURL(addr, path string) (*url.URL, error) { u, err := url.Parse(addr) if err != nil { @@ -63,6 +64,7 @@ func NewURL(addr, path string) (*url.URL, error) { return u, nil } +// NewClient returns an http.Client that pools connections and injects a span. func NewClient(scheme string, insecure bool) *traceClient { hc := &traceClient{ Client: http.Client{ @@ -84,6 +86,7 @@ type traceClient struct { // Do injects the trace and then performs the request. func (c *traceClient) Do(r *http.Request) (*http.Response, error) { span, _ := tracing.StartSpanFromContext(r.Context()) + defer span.Finish() tracing.InjectToHTTPRequest(span, r) return c.Client.Do(r) } diff --git a/http/influxdb/client.go b/http/influxdb/client.go index a8020cd6075..260026845db 100644 --- a/http/influxdb/client.go +++ b/http/influxdb/client.go @@ -80,6 +80,7 @@ type traceClient struct { // Do injects the trace and then performs the request. func (c *traceClient) Do(r *http.Request) (*http.Response, error) { span, _ := tracing.StartSpanFromContext(r.Context()) + defer span.Finish() tracing.InjectToHTTPRequest(span, r) return c.Client.Do(r) } diff --git a/http/influxdb/source_proxy_query_service.go b/http/influxdb/source_proxy_query_service.go index a43d24125a4..dbc61fa4efc 100644 --- a/http/influxdb/source_proxy_query_service.go +++ b/http/influxdb/source_proxy_query_service.go @@ -92,7 +92,6 @@ func (s *SourceProxyQueryService) fluxQuery(ctx context.Context, w io.Writer, re hreq.Header.Set("Authorization", s.Token) hreq.Header.Set("Content-Type", "application/json") hreq = hreq.WithContext(ctx) - tracing.InjectToHTTPRequest(span, hreq) hc := newTraceClient(u.Scheme, s.InsecureSkipVerify) resp, err := hc.Do(hreq) @@ -140,7 +139,6 @@ func (s *SourceProxyQueryService) influxQuery(ctx context.Context, w io.Writer, params.Set("rp", compiler.RP) hreq.URL.RawQuery = params.Encode() - tracing.InjectToHTTPRequest(span, hreq) hc := newTraceClient(u.Scheme, s.InsecureSkipVerify) resp, err := hc.Do(hreq) diff --git a/http/org_service.go b/http/org_service.go index d2af8240bde..5787918ca38 100644 --- a/http/org_service.go +++ b/http/org_service.go @@ -656,7 +656,6 @@ func (s *OrganizationService) FindOrganizations(ctx context.Context, filter infl if err != nil { return nil, 0, tracing.LogError(span, err) } - tracing.InjectToHTTPRequest(span, req) SetToken(s.Token, req) hc := NewClient(url.Scheme, s.InsecureSkipVerify) @@ -707,7 +706,6 @@ func (s *OrganizationService) CreateOrganization(ctx context.Context, o *influxd if err != nil { return tracing.LogError(span, err) } - tracing.InjectToHTTPRequest(span, req) req.Header.Set("Content-Type", "application/json") SetToken(s.Token, req) @@ -754,7 +752,6 @@ func (s *OrganizationService) UpdateOrganization(ctx context.Context, id influxd if err != nil { return nil, tracing.LogError(span, err) } - tracing.InjectToHTTPRequest(span, req) req.Header.Set("Content-Type", "application/json") SetToken(s.Token, req) @@ -793,7 +790,6 @@ func (s *OrganizationService) DeleteOrganization(ctx context.Context, id influxd if err != nil { return tracing.LogError(span, err) } - tracing.InjectToHTTPRequest(span, req) SetToken(s.Token, req) diff --git a/http/query_handler.go b/http/query_handler.go index 81b784d7610..0302d458699 100644 --- a/http/query_handler.go +++ b/http/query_handler.go @@ -370,7 +370,6 @@ func (s *FluxService) Query(ctx context.Context, w io.Writer, r *query.ProxyRequ hreq.Header.Set("Content-Type", "application/json") hreq.Header.Set("Accept", "text/csv") hreq = hreq.WithContext(ctx) - tracing.InjectToHTTPRequest(span, hreq) hc := NewClient(u.Scheme, s.InsecureSkipVerify) resp, err := hc.Do(hreq) @@ -438,7 +437,6 @@ func (s *FluxQueryService) Query(ctx context.Context, r *query.Request) (flux.Re hreq.Header.Set("Content-Type", "application/json") hreq.Header.Set("Accept", "text/csv") hreq = hreq.WithContext(ctx) - tracing.InjectToHTTPRequest(span, hreq) hc := NewClient(u.Scheme, s.InsecureSkipVerify) resp, err := hc.Do(hreq) diff --git a/http/source_proxy_service.go b/http/source_proxy_service.go index 6400e09d276..f0a6e14788e 100644 --- a/http/source_proxy_service.go +++ b/http/source_proxy_service.go @@ -54,7 +54,6 @@ func (s *SourceProxyQueryService) queryFlux(ctx context.Context, w io.Writer, re hreq.Header.Set("Authorization", fmt.Sprintf("Token %s", s.Token)) hreq.Header.Set("Content-Type", "application/json") hreq = hreq.WithContext(ctx) - tracing.InjectToHTTPRequest(span, hreq) hc := NewClient(u.Scheme, s.InsecureSkipVerify) resp, err := hc.Do(hreq) diff --git a/http/task_service.go b/http/task_service.go index 92c3b679b86..deab81e00f3 100644 --- a/http/task_service.go +++ b/http/task_service.go @@ -1373,7 +1373,6 @@ func (t TaskService) FindTaskByID(ctx context.Context, id platform.ID) (*platfor return nil, err } SetToken(t.Token, req) - tracing.InjectToHTTPRequest(span, req) hc := NewClient(u.Scheme, t.InsecureSkipVerify) resp, err := hc.Do(req) @@ -1435,7 +1434,6 @@ func (t TaskService) FindTasks(ctx context.Context, filter platform.TaskFilter) return nil, 0, err } SetToken(t.Token, req) - tracing.InjectToHTTPRequest(span, req) hc := NewClient(u.Scheme, t.InsecureSkipVerify) resp, err := hc.Do(req) @@ -1482,7 +1480,6 @@ func (t TaskService) CreateTask(ctx context.Context, tc platform.TaskCreate) (*p req.Header.Set("Content-Type", "application/json") SetToken(t.Token, req) - tracing.InjectToHTTPRequest(span, req) hc := NewClient(u.Scheme, t.InsecureSkipVerify) @@ -1525,7 +1522,6 @@ func (t TaskService) UpdateTask(ctx context.Context, id platform.ID, upd platfor req.Header.Set("Content-Type", "application/json") SetToken(t.Token, req) - tracing.InjectToHTTPRequest(span, req) hc := NewClient(u.Scheme, t.InsecureSkipVerify) @@ -1564,7 +1560,6 @@ func (t TaskService) DeleteTask(ctx context.Context, id platform.ID) error { req.Header.Set("Content-Type", "application/json") SetToken(t.Token, req) - tracing.InjectToHTTPRequest(span, req) hc := NewClient(u.Scheme, t.InsecureSkipVerify) @@ -1603,7 +1598,6 @@ func (t TaskService) FindLogs(ctx context.Context, filter platform.LogFilter) ([ return nil, 0, err } SetToken(t.Token, req) - tracing.InjectToHTTPRequest(span, req) hc := NewClient(u.Scheme, t.InsecureSkipVerify) @@ -1657,7 +1651,6 @@ func (t TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([ req.Header.Set("Content-Type", "application/json") SetToken(t.Token, req) - tracing.InjectToHTTPRequest(span, req) hc := NewClient(u.Scheme, t.InsecureSkipVerify) @@ -1700,7 +1693,6 @@ func (t TaskService) FindRunByID(ctx context.Context, taskID, runID platform.ID) } SetToken(t.Token, req) - tracing.InjectToHTTPRequest(span, req) hc := NewClient(u.Scheme, t.InsecureSkipVerify) @@ -1744,7 +1736,6 @@ func (t TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID) (* } SetToken(t.Token, req) - tracing.InjectToHTTPRequest(span, req) hc := NewClient(u.Scheme, t.InsecureSkipVerify) @@ -1792,7 +1783,6 @@ func (t TaskService) ForceRun(ctx context.Context, taskID platform.ID, scheduled } SetToken(t.Token, req) - tracing.InjectToHTTPRequest(span, req) hc := NewClient(u.Scheme, t.InsecureSkipVerify) @@ -1843,7 +1833,6 @@ func (t TaskService) CancelRun(ctx context.Context, taskID, runID platform.ID) e } SetToken(t.Token, req) - tracing.InjectToHTTPRequest(span, req) hc := NewClient(u.Scheme, t.InsecureSkipVerify)