From db694e6105fc76ae4b0d1a6719487d9353b17e98 Mon Sep 17 00:00:00 2001 From: prydin Date: Fri, 16 Nov 2018 17:09:58 -0500 Subject: [PATCH 01/18] Scale and performance improvements --- plugins/inputs/vsphere/endpoint.go | 158 ++++++++++++++----------- plugins/outputs/wavefront/wavefront.go | 2 +- 2 files changed, 90 insertions(+), 70 deletions(-) diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go index dbc67dd959366..27f6653398437 100644 --- a/plugins/inputs/vsphere/endpoint.go +++ b/plugins/inputs/vsphere/endpoint.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "math/rand" "net/url" "regexp" "strconv" @@ -26,13 +27,15 @@ var isolateLUN = regexp.MustCompile(".*/([^/]+)/?$") const metricLookback = 3 +const rtMetricLookback = 3 + // Endpoint is a high-level representation of a connected vCenter endpoint. It is backed by the lower // level Client type. type Endpoint struct { - Parent *VSphere - URL *url.URL - lastColls map[string]time.Time - instanceInfo map[string]resourceInfo + Parent *VSphere + URL *url.URL + lastColls map[string]time.Time + //instanceInfo map[string]resourceInfo resourceKinds map[string]resourceKind hwMarks *TSCache lun2ds map[string]string @@ -52,6 +55,7 @@ type resourceKind struct { sampling int32 objects objectMap filters filter.Filter + metrics performance.MetricList collectInstances bool getObjects func(context.Context, *Endpoint, *view.ContainerView) (objectMap, error) } @@ -74,12 +78,6 @@ type objectRef struct { dcname string } -type resourceInfo struct { - name string - metrics performance.MetricList - parentRef *types.ManagedObjectReference -} - type metricQRequest struct { res *resourceKind obj objectRef @@ -100,7 +98,6 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, Parent: parent, lastColls: make(map[string]time.Time), hwMarks: NewTSCache(1 * time.Hour), - instanceInfo: make(map[string]resourceInfo), lun2ds: make(map[string]string), initialized: false, clientFactory: NewClientFactory(ctx, url, parent), @@ -276,6 +273,21 @@ func (e *Endpoint) getMetricNameMap(ctx context.Context) (map[int32]string, erro return names, nil } +func (e *Endpoint) getMetadata2(ctx context.Context, obj objectRef, sampling int32) (performance.MetricList, error) { + client, err := e.clientFactory.GetClient(ctx) + if err != nil { + return nil, err + } + + ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) + defer cancel1() + metrics, err := client.Perf.AvailableMetric(ctx1, obj.ref.Reference(), sampling) + if err != nil && err != context.Canceled { + return nil, err + } + return metrics, nil +} + func (e *Endpoint) getMetadata(ctx context.Context, in interface{}) interface{} { client, err := e.clientFactory.GetClient(ctx) if err != nil { @@ -350,7 +362,7 @@ func (e *Endpoint) discover(ctx context.Context) error { log.Printf("D! [input.vsphere]: Discover new objects for %s", e.URL.Host) - instInfo := make(map[string]resourceInfo) + instInfoMux := sync.Mutex{} resourceKinds := make(map[string]resourceKind) dcNameCache := make(map[string]string) @@ -374,40 +386,51 @@ func (e *Endpoint) discover(ctx context.Context) error { } } - // Set up a worker pool for processing metadata queries concurrently - wp := NewWorkerPool(10) - wp.Run(ctx, e.getMetadata, e.Parent.DiscoverConcurrency) - - // Fill the input channels with resources that need to be queried - // for metadata. - wp.Fill(ctx, func(ctx context.Context, f PushFunc) { - for _, obj := range objects { - f(ctx, &metricQRequest{obj: obj, res: &res}) + // Get metric metadata and filter metrics + prob := 100.0 / float64(len(objects)) + log.Printf("D! [input.vsphere] Probability of sampling a resource: %f", prob) + wg := sync.WaitGroup{} + limiter := make(chan struct{}, e.Parent.DiscoverConcurrency) + for _, obj := range objects { + if rand.Float64() > prob { + continue } - }) - - // Drain the resulting metadata and build instance infos. - wp.Drain(ctx, func(ctx context.Context, in interface{}) bool { - switch resp := in.(type) { - case *metricQResponse: - mList := make(performance.MetricList, 0) - if res.enabled { - for _, m := range *resp.metrics { - if m.Instance != "" && !res.collectInstances { - continue - } - if res.filters.Match(metricNames[m.CounterId]) { - mList = append(mList, m) - } + wg.Add(1) + go func(obj objectRef) { + defer wg.Done() + limiter <- struct{}{} + defer func() { + <-limiter + }() + metrics, err := e.getMetadata2(ctx, obj, res.sampling) + if err != nil { + log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err) + } + mMap := make(map[string]types.PerfMetricId) + for _, m := range metrics { + if m.Instance != "" && res.collectInstances { + m.Instance = "*" + } else { + m.Instance = "" + } + if res.filters.Match(metricNames[m.CounterId]) { + mMap[strconv.Itoa(int(m.CounterId))+"|"+m.Instance] = m } } - instInfo[resp.obj.ref.Value] = resourceInfo{name: resp.obj.name, metrics: mList, parentRef: resp.obj.parentRef} - case error: - log.Printf("W! [input.vsphere]: Error while discovering resources: %s", resp) - return false - } - return true - }) + log.Printf("D! [input.vsphere] Found %d metrics for %s", len(mMap), obj.name) + instInfoMux.Lock() + defer instInfoMux.Unlock() + if len(mMap) > len(res.metrics) { + res.metrics = make(performance.MetricList, len(mMap)) + i := 0 + for _, m := range mMap { + res.metrics[i] = m + i++ + } + } + }(obj) + } + wg.Wait() res.objects = objects resourceKinds[k] = res } @@ -428,12 +451,11 @@ func (e *Endpoint) discover(ctx context.Context) error { e.collectMux.Lock() defer e.collectMux.Unlock() - e.instanceInfo = instInfo e.resourceKinds = resourceKinds e.lun2ds = l2d sw.Stop() - SendInternalCounter("discovered_objects", e.URL.Host, int64(len(instInfo))) + // SendInternalCounter("discovered_objects", e.URL.Host, int64(len(instInfo))) TODO: Count the correct way return nil } @@ -505,12 +527,16 @@ func getVMs(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectM var resources []mo.VirtualMachine ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) defer cancel1() - err := root.Retrieve(ctx1, []string{"VirtualMachine"}, []string{"name", "runtime.host", "config.guestId", "config.uuid"}, &resources) + err := root.Retrieve(ctx1, []string{"VirtualMachine"}, []string{"name", "runtime.host", "runtime.powerState", "config.guestId", "config.uuid"}, &resources) if err != nil { return nil, err } m := make(objectMap) for _, r := range resources { + if r.Runtime.PowerState != "poweredOn" { + log.Printf("D! [input.vsphere] Skipped powered off VM: %s", r.Name) + continue + } guest := "unknown" uuid := "" // Sometimes Config is unknown and returns a nil pointer @@ -609,22 +635,18 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n total := 0 nRes := 0 for _, object := range res.objects { - info, found := e.instanceInfo[object.ref.Value] - if !found { - log.Printf("E! [input.vsphere]: Internal error: Instance info not found for MOID %s", object.ref) - } - mr := len(info.metrics) + mr := len(res.metrics) for mr > 0 { mc := mr headroom := maxMetrics - metrics if !res.realTime && mc > headroom { // Metric query limit only applies to non-realtime metrics mc = headroom } - fm := len(info.metrics) - mr + fm := len(res.metrics) - mr pq := types.PerfQuerySpec{ Entity: object.ref, - MaxSample: 1, - MetricId: info.metrics[fm : fm+mc], + MaxSample: rtMetricLookback, + MetricId: res.metrics[fm : fm+mc], IntervalId: res.sampling, Format: "normal", } @@ -694,7 +716,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc elapsed := now.Sub(latest).Seconds() + 5.0 // Allow 5 second jitter. log.Printf("D! [input.vsphere]: Latest: %s, elapsed: %f, resource: %s", latest, elapsed, resourceType) if !res.realTime && elapsed < float64(res.sampling) { - // No new data would be available. We're outta herE! [input.vsphere]: + // No new data would be available. We're outta here! log.Printf("D! [input.vsphere]: Sampling period for %s of %d has not elapsed on %s", resourceType, res.sampling, e.URL.Host) return nil @@ -715,7 +737,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc wp := NewWorkerPool(10) wp.Run(ctx, func(ctx context.Context, in interface{}) interface{} { chunk := in.([]types.PerfQuerySpec) - n, err := e.collectChunk(ctx, chunk, resourceType, res, acc) + n, err := e.collectChunk(ctx, chunk, resourceType, &res, acc) log.Printf("D! [input.vsphere] CollectChunk for %s returned %d metrics", resourceType, n) if err != nil { return err @@ -754,7 +776,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc } func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, resourceType string, - res resourceKind, acc telegraf.Accumulator) (int, error) { + res *resourceKind, acc telegraf.Accumulator) (int, error) { count := 0 prefix := "vsphere" + e.Parent.Separator + resourceType @@ -788,7 +810,7 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, // Iterate through results for _, em := range ems { moid := em.Entity.Reference().Value - instInfo, found := e.instanceInfo[moid] + instInfo, found := res.objects[moid] if !found { log.Printf("E! [input.vsphere]: MOID %s not found in cache. Skipping! (This should not happen!)", moid) continue @@ -808,16 +830,16 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, log.Printf("E! [input.vsphere]: MOID %s not found in cache. Skipping", moid) continue } - e.populateTags(&objectRef, resourceType, &res, t, &v) + e.populateTags(&objectRef, resourceType, res, t, &v) // Now deal with the values. Iterate backwards so we start with the latest value tsKey := moid + "|" + name + "|" + v.Instance for idx := len(v.Value) - 1; idx >= 0; idx-- { ts := em.SampleInfo[idx].Timestamp - // Since non-realtime metrics are queries with a lookback, we need to check the high-water mark + // For queries with a lookback, we need to check the high-water mark // to determine if this should be included. Only samples not seen before should be included. - if !(res.realTime || e.hwMarks.IsNew(tsKey, ts)) { + if !e.hwMarks.IsNew(tsKey, ts) { continue } value := v.Value[idx] @@ -850,9 +872,7 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, count++ // Update highwater marks for non-realtime metrics. - if !res.realTime { - e.hwMarks.Put(tsKey, ts) - } + e.hwMarks.Put(tsKey, ts) } } // We've iterated through all the metrics and collected buckets for each @@ -864,13 +884,13 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, return count, nil } -func (e *Endpoint) getParent(obj resourceInfo) (resourceInfo, bool) { +func (e *Endpoint) getParent(obj objectRef, res *resourceKind) (objectRef, bool) { p := obj.parentRef if p == nil { log.Printf("D! [input.vsphere] No parent found for %s", obj.name) - return resourceInfo{}, false + return objectRef{}, false } - r, ok := e.instanceInfo[p.Value] + r, ok := res.objects[p.Value] return r, ok } @@ -885,14 +905,14 @@ func (e *Endpoint) populateTags(objectRef *objectRef, resourceType string, resou } // Map parent reference - parent, found := e.instanceInfo[objectRef.parentRef.Value] + parent, found := resource.objects[objectRef.parentRef.Value] if found { t[resource.parentTag] = parent.name if resourceType == "vm" { if objectRef.guest != "" { t["guest"] = objectRef.guest } - if c, ok := e.getParent(parent); ok { + if c, ok := e.getParent(parent, resource); ok { t["clustername"] = c.name } } diff --git a/plugins/outputs/wavefront/wavefront.go b/plugins/outputs/wavefront/wavefront.go index ef36d1804045f..df1d42158dc07 100644 --- a/plugins/outputs/wavefront/wavefront.go +++ b/plugins/outputs/wavefront/wavefront.go @@ -122,11 +122,11 @@ func (w *Wavefront) Write(metrics []telegraf.Metric) error { return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error()) } defer connection.Close() - connection.SetWriteDeadline(time.Now().Add(5 * time.Second)) for _, m := range metrics { for _, metricPoint := range buildMetrics(m, w) { metricLine := formatMetricPoint(metricPoint, w) + connection.SetWriteDeadline(time.Now().Add(30 * time.Second)) _, err := connection.Write([]byte(metricLine)) if err != nil { return fmt.Errorf("Wavefront: TCP writing error %s", err.Error()) From 5fdabdbd18e63bb72f9493128e2999a0fb3f05f9 Mon Sep 17 00:00:00 2001 From: prydin Date: Sun, 18 Nov 2018 09:24:58 -0500 Subject: [PATCH 02/18] Use timestamp of latest sample as start point for next round --- plugins/inputs/vsphere/endpoint.go | 77 +++++++++++++++++------------- 1 file changed, 44 insertions(+), 33 deletions(-) diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go index 27f6653398437..44e6b408e5de2 100644 --- a/plugins/inputs/vsphere/endpoint.go +++ b/plugins/inputs/vsphere/endpoint.go @@ -29,6 +29,8 @@ const metricLookback = 3 const rtMetricLookback = 3 +const maxSampleConst = 10 + // Endpoint is a high-level representation of a connected vCenter endpoint. It is backed by the lower // level Client type. type Endpoint struct { @@ -57,6 +59,7 @@ type resourceKind struct { filters filter.Filter metrics performance.MetricList collectInstances bool + parent string getObjects func(context.Context, *Endpoint, *view.ContainerView) (objectMap, error) } @@ -90,6 +93,15 @@ type metricQResponse struct { type multiError []error +func (e *Endpoint) getParent(obj *objectRef, res *resourceKind) (*objectRef, bool) { + if pKind, ok := e.resourceKinds[res.parent]; ok { + if p, ok := pKind.objects[obj.parentRef.Value]; ok { + return &p, true + } + } + return nil, false +} + // NewEndpoint returns a new connection to a vCenter based on the URL and configuration passed // as parameters. func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, error) { @@ -115,6 +127,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, filters: newFilterOrPanic(parent.DatacenterMetricInclude, parent.DatacenterMetricExclude), collectInstances: parent.DatacenterInstances, getObjects: getDatacenters, + parent: "", }, "cluster": { name: "cluster", @@ -127,6 +140,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, filters: newFilterOrPanic(parent.ClusterMetricInclude, parent.ClusterMetricExclude), collectInstances: parent.ClusterInstances, getObjects: getClusters, + parent: "datacenter", }, "host": { name: "host", @@ -139,6 +153,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, filters: newFilterOrPanic(parent.HostMetricInclude, parent.HostMetricExclude), collectInstances: parent.HostInstances, getObjects: getHosts, + parent: "cluster", }, "vm": { name: "vm", @@ -151,6 +166,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, filters: newFilterOrPanic(parent.VMMetricInclude, parent.VMMetricExclude), collectInstances: parent.VMInstances, getObjects: getVMs, + parent: "host", }, "datastore": { name: "datastore", @@ -162,6 +178,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, filters: newFilterOrPanic(parent.DatastoreMetricInclude, parent.DatastoreMetricExclude), collectInstances: parent.DatastoreInstances, getObjects: getDatastores, + parent: "", }, } @@ -645,24 +662,17 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n fm := len(res.metrics) - mr pq := types.PerfQuerySpec{ Entity: object.ref, - MaxSample: rtMetricLookback, + MaxSample: maxSampleConst, MetricId: res.metrics[fm : fm+mc], IntervalId: res.sampling, Format: "normal", } - // For non-realtime metrics, we need to look back a few samples in case - // the vCenter is late reporting metrics. - if !res.realTime { - pq.MaxSample = metricLookback - } - // Look back 3 sampling periods - start := latest.Add(time.Duration(-res.sampling) * time.Second * (metricLookback - 1)) - if !res.realTime { - pq.StartTime = &start - pq.EndTime = &now - } + //start := latest.Add(time.Duration(-res.sampling) * time.Second * (metricLookback - 1)) + pq.StartTime = &latest + pq.EndTime = &now + pqs = append(pqs, pq) mr -= mc metrics += mc @@ -733,16 +743,23 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc count := int64(0) + var tsMux sync.Mutex + latestSample := time.Time{} // Set up a worker pool for collecting chunk metrics wp := NewWorkerPool(10) wp.Run(ctx, func(ctx context.Context, in interface{}) interface{} { chunk := in.([]types.PerfQuerySpec) - n, err := e.collectChunk(ctx, chunk, resourceType, &res, acc) + n, localLatest, err := e.collectChunk(ctx, chunk, resourceType, &res, acc) log.Printf("D! [input.vsphere] CollectChunk for %s returned %d metrics", resourceType, n) if err != nil { return err } atomic.AddInt64(&count, int64(n)) + tsMux.Lock() + defer tsMux.Unlock() + if localLatest.After(latestSample) { + latestSample = localLatest + } return nil }, e.Parent.CollectConcurrency) @@ -765,8 +782,8 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc } return true }) - e.lastColls[resourceType] = now // Use value captured at the beginning to avoid blind spots. - + log.Printf("D! [input.vsphere] Latest sample for %s set to %s", resourceType, latestSample) + e.lastColls[resourceType] = latestSample sw.Stop() SendInternalCounterWithTags("gather_count", e.URL.Host, internalTags, count) if len(merr) > 0 { @@ -776,34 +793,35 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc } func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, resourceType string, - res *resourceKind, acc telegraf.Accumulator) (int, error) { + res *resourceKind, acc telegraf.Accumulator) (int, time.Time, error) { + latestSample := time.Time{} count := 0 prefix := "vsphere" + e.Parent.Separator + resourceType client, err := e.clientFactory.GetClient(ctx) if err != nil { - return 0, err + return count, latestSample, err } ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) defer cancel1() metricInfo, err := client.Perf.CounterInfoByName(ctx1) if err != nil { - return count, err + return count, latestSample, err } ctx2, cancel2 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) defer cancel2() metrics, err := client.Perf.Query(ctx2, pqs) if err != nil { - return count, err + return count, latestSample, err } ctx3, cancel3 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) defer cancel3() ems, err := client.Perf.ToMetricSeries(ctx3, metrics) if err != nil { - return count, err + return count, latestSample, err } log.Printf("D! [input.vsphere] Query for %s returned metrics for %d objects", resourceType, len(ems)) @@ -836,6 +854,9 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, tsKey := moid + "|" + name + "|" + v.Instance for idx := len(v.Value) - 1; idx >= 0; idx-- { ts := em.SampleInfo[idx].Timestamp + if ts.After(latestSample) { + latestSample = ts + } // For queries with a lookback, we need to check the high-water mark // to determine if this should be included. Only samples not seen before should be included. @@ -881,17 +902,7 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, acc.AddFields(bucket.name, bucket.fields, bucket.tags, bucket.ts) } } - return count, nil -} - -func (e *Endpoint) getParent(obj objectRef, res *resourceKind) (objectRef, bool) { - p := obj.parentRef - if p == nil { - log.Printf("D! [input.vsphere] No parent found for %s", obj.name) - return objectRef{}, false - } - r, ok := res.objects[p.Value] - return r, ok + return count, latestSample, nil } func (e *Endpoint) populateTags(objectRef *objectRef, resourceType string, resource *resourceKind, t map[string]string, v *performance.MetricSeries) { @@ -905,14 +916,14 @@ func (e *Endpoint) populateTags(objectRef *objectRef, resourceType string, resou } // Map parent reference - parent, found := resource.objects[objectRef.parentRef.Value] + parent, found := e.getParent(objectRef, resource) if found { t[resource.parentTag] = parent.name if resourceType == "vm" { if objectRef.guest != "" { t["guest"] = objectRef.guest } - if c, ok := e.getParent(parent, resource); ok { + if c, ok := e.resourceKinds["cluster"].objects[parent.parentRef.Value]; ok { t["clustername"] = c.name } } From 6c4cba00a57e05af1af178db4bf78c931fe868ff Mon Sep 17 00:00:00 2001 From: prydin Date: Tue, 27 Nov 2018 16:42:12 -0500 Subject: [PATCH 03/18] * Improved collection concurrency (one goroutine per object type) * Don't stop collection on partial error * Compute average if query returned multiple samples for a metric --- plugins/inputs/vsphere/endpoint.go | 208 ++++++++++++++++------------- plugins/inputs/vsphere/tscache.go | 8 ++ 2 files changed, 121 insertions(+), 95 deletions(-) diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go index 44e6b408e5de2..864807b579a71 100644 --- a/plugins/inputs/vsphere/endpoint.go +++ b/plugins/inputs/vsphere/endpoint.go @@ -34,10 +34,10 @@ const maxSampleConst = 10 // Endpoint is a high-level representation of a connected vCenter endpoint. It is backed by the lower // level Client type. type Endpoint struct { - Parent *VSphere - URL *url.URL - lastColls map[string]time.Time - //instanceInfo map[string]resourceInfo + Parent *VSphere + URL *url.URL + lastColls map[string]time.Time + lastColl time.Time resourceKinds map[string]resourceKind hwMarks *TSCache lun2ds map[string]string @@ -403,51 +403,54 @@ func (e *Endpoint) discover(ctx context.Context) error { } } - // Get metric metadata and filter metrics - prob := 100.0 / float64(len(objects)) - log.Printf("D! [input.vsphere] Probability of sampling a resource: %f", prob) - wg := sync.WaitGroup{} - limiter := make(chan struct{}, e.Parent.DiscoverConcurrency) - for _, obj := range objects { - if rand.Float64() > prob { - continue - } - wg.Add(1) - go func(obj objectRef) { - defer wg.Done() - limiter <- struct{}{} - defer func() { - <-limiter - }() - metrics, err := e.getMetadata2(ctx, obj, res.sampling) - if err != nil { - log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err) + // No need to collect metric metadata if resource type is not enabled + if res.enabled { + // Get metric metadata and filter metrics + prob := 100.0 / float64(len(objects)) + log.Printf("D! [input.vsphere] Probability of sampling a resource: %f", prob) + wg := sync.WaitGroup{} + limiter := make(chan struct{}, e.Parent.DiscoverConcurrency) + for _, obj := range objects { + if rand.Float64() > prob { + continue } - mMap := make(map[string]types.PerfMetricId) - for _, m := range metrics { - if m.Instance != "" && res.collectInstances { - m.Instance = "*" - } else { - m.Instance = "" + wg.Add(1) + go func(obj objectRef) { + defer wg.Done() + limiter <- struct{}{} + defer func() { + <-limiter + }() + metrics, err := e.getMetadata2(ctx, obj, res.sampling) + if err != nil { + log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err) } - if res.filters.Match(metricNames[m.CounterId]) { - mMap[strconv.Itoa(int(m.CounterId))+"|"+m.Instance] = m + mMap := make(map[string]types.PerfMetricId) + for _, m := range metrics { + if m.Instance != "" && res.collectInstances { + m.Instance = "*" + } else { + m.Instance = "" + } + if res.filters.Match(metricNames[m.CounterId]) { + mMap[strconv.Itoa(int(m.CounterId))+"|"+m.Instance] = m + } } - } - log.Printf("D! [input.vsphere] Found %d metrics for %s", len(mMap), obj.name) - instInfoMux.Lock() - defer instInfoMux.Unlock() - if len(mMap) > len(res.metrics) { - res.metrics = make(performance.MetricList, len(mMap)) - i := 0 - for _, m := range mMap { - res.metrics[i] = m - i++ + log.Printf("D! [input.vsphere] Found %d metrics for %s", len(mMap), obj.name) + instInfoMux.Lock() + defer instInfoMux.Unlock() + if len(mMap) > len(res.metrics) { + res.metrics = make(performance.MetricList, len(mMap)) + i := 0 + for _, m := range mMap { + res.metrics[i] = m + i++ + } } - } - }(obj) + }(obj) + } + wg.Wait() } - wg.Wait() res.objects = objects resourceKinds[k] = res } @@ -601,7 +604,6 @@ func (e *Endpoint) Collect(ctx context.Context, acc telegraf.Accumulator) error // If we never managed to do a discovery, collection will be a no-op. Therefore, // we need to check that a connection is available, or the collection will // silently fail. - // if _, err := e.clientFactory.GetClient(ctx); err != nil { return err } @@ -614,21 +616,26 @@ func (e *Endpoint) Collect(ctx context.Context, acc telegraf.Accumulator) error } // If discovery interval is disabled (0), discover on each collection cycle - // if e.Parent.ObjectDiscoveryInterval.Duration == 0 { err := e.discover(ctx) if err != nil { return err } } + var wg sync.WaitGroup for k, res := range e.resourceKinds { if res.enabled { - err := e.collectResource(ctx, k, acc) - if err != nil { - return err - } + wg.Add(1) + go func(k string) { + defer wg.Done() + err := e.collectResource(ctx, k, acc) + if err != nil { + acc.AddError(err) + } + }(k) } } + wg.Wait() // Purge old timestamps from the cache e.hwMarks.Purge() @@ -668,9 +675,12 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n Format: "normal", } - // Look back 3 sampling periods - //start := latest.Add(time.Duration(-res.sampling) * time.Second * (metricLookback - 1)) - pq.StartTime = &latest + start, ok := e.hwMarks.Get(object.ref.Value) + if !ok { + // Look back 3 sampling periods by default + start = latest.Add(time.Duration(-res.sampling) * time.Second * (metricLookback - 1)) + } + pq.StartTime = &start pq.EndTime = &now pqs = append(pqs, pq) @@ -711,7 +721,6 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc telegraf.Accumulator) error { - // Do we have new data yet? res := e.resourceKinds[resourceType] client, err := e.clientFactory.GetClient(ctx) if err != nil { @@ -734,6 +743,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc } else { latest = now.Add(time.Duration(-res.sampling) * time.Second) } + e.lastColl = now internalTags := map[string]string{"resourcetype": resourceType} sw := NewStopwatchWithTags("gather_duration", e.URL.Host, internalTags) @@ -757,7 +767,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc atomic.AddInt64(&count, int64(n)) tsMux.Lock() defer tsMux.Unlock() - if localLatest.After(latestSample) { + if localLatest.After(latestSample) && !localLatest.IsZero() { latestSample = localLatest } return nil @@ -783,7 +793,9 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc return true }) log.Printf("D! [input.vsphere] Latest sample for %s set to %s", resourceType, latestSample) - e.lastColls[resourceType] = latestSample + if !latestSample.IsZero() { + e.lastColls[resourceType] = latestSample + } sw.Stop() SendInternalCounterWithTags("gather_count", e.URL.Host, internalTags, count) if len(merr) > 0 { @@ -850,51 +862,57 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, } e.populateTags(&objectRef, resourceType, res, t, &v) - // Now deal with the values. Iterate backwards so we start with the latest value - tsKey := moid + "|" + name + "|" + v.Instance - for idx := len(v.Value) - 1; idx >= 0; idx-- { - ts := em.SampleInfo[idx].Timestamp - if ts.After(latestSample) { - latestSample = ts - } - - // For queries with a lookback, we need to check the high-water mark - // to determine if this should be included. Only samples not seen before should be included. - if !e.hwMarks.IsNew(tsKey, ts) { - continue - } - value := v.Value[idx] - - // Organize the metrics into a bucket per measurement. - // Data SHOULD be presented to us with the same timestamp for all samples, but in case - // they don't we use the measurement name + timestamp as the key for the bucket. - mn, fn := e.makeMetricIdentifier(prefix, name) - bKey := mn + " " + v.Instance + " " + strconv.FormatInt(ts.UnixNano(), 10) - bucket, found := buckets[bKey] - if !found { - bucket = metricEntry{name: mn, ts: ts, fields: make(map[string]interface{}), tags: t} - buckets[bKey] = bucket - } + avg := float64(0) + nValues := 0 + //log.Printf("D! [input.vsphere] %s %d samples", name, len(v.Value)) + for idx, sample := range em.SampleInfo { + value := float64(v.Value[idx]) if value < 0 { - log.Printf("D! [input.vsphere]: Negative value for %s on %s. Indicates missing samples", name, objectRef.name) continue } - - // Percentage values must be scaled down by 100. - info, ok := metricInfo[name] - if !ok { - log.Printf("E! [input.vsphere]: Could not determine unit for %s. Skipping", name) - } - if info.UnitInfo.GetElementDescription().Key == "percent" { - bucket.fields[fn] = float64(value) / 100.0 - } else { - bucket.fields[fn] = value + ts := sample.Timestamp + if ts.After(latestSample) { + latestSample = ts } - count++ + avg += float64(value) + nValues++ + } + if nValues == 0 { + log.Printf("D! [input.vsphere]: Missing value for: %s, %s", name, objectRef.name) + continue + } + + // If we're catching up with metrics arriving late, calculate the average + // of them and pick the midpoint timestamp. This is a reasonable way of + // filling in missed collections that doesn't cause us to deliver metrics + // faster than the interval. + avg /= float64(nValues) + midTs := em.SampleInfo[len(em.SampleInfo)/2].Timestamp + + // Organize the metrics into a bucket per measurement. + mn, fn := e.makeMetricIdentifier(prefix, name) + bKey := mn + " " + v.Instance + " " + strconv.FormatInt(midTs.UnixNano(), 10) + bucket, found := buckets[bKey] + if !found { + bucket = metricEntry{name: mn, ts: midTs, fields: make(map[string]interface{}), tags: t} + buckets[bKey] = bucket + } - // Update highwater marks for non-realtime metrics. - e.hwMarks.Put(tsKey, ts) + // Percentage values must be scaled down by 100. + info, ok := metricInfo[name] + if !ok { + log.Printf("E! [input.vsphere]: Could not determine unit for %s. Skipping", name) } + if info.UnitInfo.GetElementDescription().Key == "percent" { + bucket.fields[fn] = float64(avg) / 100.0 + } else { + bucket.fields[fn] = avg + } + count++ + + // Update highwater marks + e.hwMarks.Put(moid, latestSample) + } // We've iterated through all the metrics and collected buckets for each // measurement name. Now emit them! diff --git a/plugins/inputs/vsphere/tscache.go b/plugins/inputs/vsphere/tscache.go index 9abe24ea725c5..1d1f00ebea3cc 100644 --- a/plugins/inputs/vsphere/tscache.go +++ b/plugins/inputs/vsphere/tscache.go @@ -49,6 +49,14 @@ func (t *TSCache) IsNew(key string, tm time.Time) bool { return !tm.Before(v) } +// Get returns a timestamp (if present) +func (t *TSCache) Get(key string) (time.Time, bool) { + t.mux.RLock() + defer t.mux.RUnlock() + ts, ok := t.table[key] + return ts, ok +} + // Put updates the latest timestamp for the supplied key. func (t *TSCache) Put(key string, time time.Time) { t.mux.Lock() From 26e153627a2cd0fce67a74a070a5b6de36dbbee4 Mon Sep 17 00:00:00 2001 From: prydin Date: Wed, 28 Nov 2018 11:18:51 -0500 Subject: [PATCH 04/18] Added hard 100000 metric query limit --- plugins/inputs/vsphere/endpoint.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go index 864807b579a71..de6a68311ca28 100644 --- a/plugins/inputs/vsphere/endpoint.go +++ b/plugins/inputs/vsphere/endpoint.go @@ -690,7 +690,8 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n // We need to dump the current chunk of metrics for one of two reasons: // 1) We filled up the metric quota while processing the current resource // 2) We are at the last resource and have no more data to process. - if mr > 0 || (!res.realTime && metrics >= maxMetrics) || nRes >= e.Parent.MaxQueryObjects { + // 3) The query contains more than 100,000 individual metrics + if mr > 0 || (!res.realTime && metrics >= maxMetrics) || nRes >= e.Parent.MaxQueryObjects || len(pqs) > 100000 { log.Printf("D! [input.vsphere]: Queueing query: %d objects, %d metrics (%d remaining) of type %s for %s. Processed objects: %d. Total objects %d", len(pqs), metrics, mr, res.name, e.URL.Host, total+1, len(res.objects)) @@ -710,7 +711,6 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n nRes++ } // There may be dangling stuff in the queue. Handle them - // if len(pqs) > 0 { // Call push function log.Printf("D! [input.vsphere]: Queuing query: %d objects, %d metrics (0 remaining) of type %s for %s. Total objects %d (final chunk)", From aaa675476fdf71de872d931214e80b4797d5741a Mon Sep 17 00:00:00 2001 From: prydin Date: Fri, 30 Nov 2018 09:15:48 -0500 Subject: [PATCH 05/18] Moved timeout logic to client.go --- plugins/inputs/vsphere/client.go | 32 ++++++++++++- plugins/inputs/vsphere/endpoint.go | 74 ++++++++---------------------- 2 files changed, 49 insertions(+), 57 deletions(-) diff --git a/plugins/inputs/vsphere/client.go b/plugins/inputs/vsphere/client.go index ebad2bea79d30..176bfb425f4c3 100644 --- a/plugins/inputs/vsphere/client.go +++ b/plugins/inputs/vsphere/client.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/vmware/govmomi/vim25/types" + "github.com/vmware/govmomi" "github.com/vmware/govmomi/object" "github.com/vmware/govmomi/performance" @@ -76,6 +78,7 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) { ctx2, cancel2 := context.WithTimeout(ctx, cf.parent.Timeout.Duration) defer cancel2() if cf.client.Client.SessionManager.Login(ctx2, url.UserPassword(cf.parent.Username, cf.parent.Password)) != nil { + log.Printf("W! [input.vsphere]: Client reauthentication failed.") return nil, err } } @@ -205,6 +208,8 @@ func (c *Client) close() { // GetServerTime returns the time at the vCenter server func (c *Client) GetServerTime(ctx context.Context) (time.Time, error) { + ctx, cancel := context.WithTimeout(ctx, c.Timeout) + defer cancel() t, err := methods.GetCurrentTime(ctx, c.Client) if err != nil { return time.Time{}, err @@ -235,7 +240,7 @@ func (c *Client) GetMaxQueryMetrics(ctx context.Context) (int, error) { // Fall through version-based inference if value isn't usable } } else { - log.Println("I! [input.vsphere] Option query for maxQueryMetrics failed. Using default") + log.Println("D! [input.vsphere] Option query for maxQueryMetrics failed. Using default") } // No usable maxQueryMetrics setting. Infer based on version @@ -255,3 +260,28 @@ func (c *Client) GetMaxQueryMetrics(ctx context.Context) (int, error) { } return 256, nil } + +func (c *Client) QueryMetrics(ctx context.Context, pqs []types.PerfQuerySpec) ([]performance.EntityMetric, error) { + ctx1, cancel1 := context.WithTimeout(ctx, c.Timeout) + defer cancel1() + metrics, err := c.Perf.Query(ctx1, pqs) + if err != nil { + return nil, err + } + + ctx2, cancel2 := context.WithTimeout(ctx, c.Timeout) + defer cancel2() + return c.Perf.ToMetricSeries(ctx2, metrics) +} + +func (c *Client) CounterInfoByName(ctx context.Context) (map[string]*types.PerfCounterInfo, error) { + ctx1, cancel1 := context.WithTimeout(ctx, c.Timeout) + defer cancel1() + return c.Perf.CounterInfoByName(ctx1) +} + +func (c *Client) ListResources(ctx context.Context, root *view.ContainerView, kind []string, ps []string, dst interface{}) error { + ctx1, cancel1 := context.WithTimeout(ctx, c.Timeout) + defer cancel1() + return root.Retrieve(ctx1, kind, ps, dst) +} diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go index de6a68311ca28..05492a97841c6 100644 --- a/plugins/inputs/vsphere/endpoint.go +++ b/plugins/inputs/vsphere/endpoint.go @@ -60,7 +60,7 @@ type resourceKind struct { metrics performance.MetricList collectInstances bool parent string - getObjects func(context.Context, *Endpoint, *view.ContainerView) (objectMap, error) + getObjects func(context.Context, *Client, *Endpoint, *view.ContainerView) (objectMap, error) } type metricEntry struct { @@ -276,10 +276,7 @@ func (e *Endpoint) getMetricNameMap(ctx context.Context) (map[int32]string, erro return nil, err } - ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel1() - mn, err := client.Perf.CounterInfoByName(ctx1) - + mn, err := client.CounterInfoByName(ctx) if err != nil { return nil, err } @@ -290,7 +287,7 @@ func (e *Endpoint) getMetricNameMap(ctx context.Context) (map[int32]string, erro return names, nil } -func (e *Endpoint) getMetadata2(ctx context.Context, obj objectRef, sampling int32) (performance.MetricList, error) { +func (e *Endpoint) getMetadata(ctx context.Context, obj objectRef, sampling int32) (performance.MetricList, error) { client, err := e.clientFactory.GetClient(ctx) if err != nil { return nil, err @@ -299,28 +296,12 @@ func (e *Endpoint) getMetadata2(ctx context.Context, obj objectRef, sampling int ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) defer cancel1() metrics, err := client.Perf.AvailableMetric(ctx1, obj.ref.Reference(), sampling) - if err != nil && err != context.Canceled { + if err != nil { return nil, err } return metrics, nil } -func (e *Endpoint) getMetadata(ctx context.Context, in interface{}) interface{} { - client, err := e.clientFactory.GetClient(ctx) - if err != nil { - return err - } - - rq := in.(*metricQRequest) - ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel1() - metrics, err := client.Perf.AvailableMetric(ctx1, rq.obj.ref.Reference(), rq.res.sampling) - if err != nil && err != context.Canceled { - log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err) - } - return &metricQResponse{metrics: &metrics, obj: rq.obj} -} - func (e *Endpoint) getDatacenterName(ctx context.Context, client *Client, cache map[string]string, r types.ManagedObjectReference) string { path := make([]string, 0) returnVal := "" @@ -388,7 +369,7 @@ func (e *Endpoint) discover(ctx context.Context) error { log.Printf("D! [input.vsphere] Discovering resources for %s", res.name) // Need to do this for all resource types even if they are not enabled if res.enabled || k != "vm" { - objects, err := res.getObjects(ctx, e, client.Root) + objects, err := res.getObjects(ctx, client, e, client.Root) if err != nil { return err } @@ -421,7 +402,7 @@ func (e *Endpoint) discover(ctx context.Context) error { defer func() { <-limiter }() - metrics, err := e.getMetadata2(ctx, obj, res.sampling) + metrics, err := e.getMetadata(ctx, obj, res.sampling) if err != nil { log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err) } @@ -479,11 +460,9 @@ func (e *Endpoint) discover(ctx context.Context) error { return nil } -func getDatacenters(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) { +func getDatacenters(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.Datacenter - ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel1() - err := root.Retrieve(ctx1, []string{"Datacenter"}, []string{"name", "parent"}, &resources) + err := client.ListResources(ctx, root, []string{"Datacenter"}, []string{"name", "parent"}, &resources) if err != nil { return nil, err } @@ -495,11 +474,9 @@ func getDatacenters(ctx context.Context, e *Endpoint, root *view.ContainerView) return m, nil } -func getClusters(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) { +func getClusters(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.ClusterComputeResource - ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel1() - err := root.Retrieve(ctx1, []string{"ClusterComputeResource"}, []string{"name", "parent"}, &resources) + err := client.ListResources(ctx, root, []string{"ClusterComputeResource"}, []string{"name", "parent"}, &resources) if err != nil { return nil, err } @@ -529,9 +506,9 @@ func getClusters(ctx context.Context, e *Endpoint, root *view.ContainerView) (ob return m, nil } -func getHosts(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) { +func getHosts(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.HostSystem - err := root.Retrieve(ctx, []string{"HostSystem"}, []string{"name", "parent"}, &resources) + err := client.ListResources(ctx, root, []string{"HostSystem"}, []string{"name", "parent"}, &resources) if err != nil { return nil, err } @@ -543,11 +520,9 @@ func getHosts(ctx context.Context, e *Endpoint, root *view.ContainerView) (objec return m, nil } -func getVMs(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) { +func getVMs(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.VirtualMachine - ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel1() - err := root.Retrieve(ctx1, []string{"VirtualMachine"}, []string{"name", "runtime.host", "runtime.powerState", "config.guestId", "config.uuid"}, &resources) + err := client.ListResources(ctx, root, []string{"VirtualMachine"}, []string{"name", "runtime.host", "runtime.powerState", "config.guestId", "config.uuid"}, &resources) if err != nil { return nil, err } @@ -571,11 +546,9 @@ func getVMs(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectM return m, nil } -func getDatastores(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) { +func getDatastores(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.Datastore - ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel1() - err := root.Retrieve(ctx1, []string{"Datastore"}, []string{"name", "parent", "info"}, &resources) + err := client.ListResources(ctx, root, []string{"Datastore"}, []string{"name", "parent", "info"}, &resources) if err != nil { return nil, err } @@ -720,7 +693,6 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n } func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc telegraf.Accumulator) error { - res := e.resourceKinds[resourceType] client, err := e.clientFactory.GetClient(ctx) if err != nil { @@ -815,26 +787,16 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, return count, latestSample, err } - ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel1() - metricInfo, err := client.Perf.CounterInfoByName(ctx1) + metricInfo, err := client.CounterInfoByName(ctx) if err != nil { return count, latestSample, err } - ctx2, cancel2 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel2() - metrics, err := client.Perf.Query(ctx2, pqs) + ems, err := client.QueryMetrics(ctx, pqs) if err != nil { return count, latestSample, err } - ctx3, cancel3 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel3() - ems, err := client.Perf.ToMetricSeries(ctx3, metrics) - if err != nil { - return count, latestSample, err - } log.Printf("D! [input.vsphere] Query for %s returned metrics for %d objects", resourceType, len(ems)) // Iterate through results From e9956cac9178336cd12b9bffea08d1bbdbd77649 Mon Sep 17 00:00:00 2001 From: prydin Date: Tue, 4 Dec 2018 14:45:26 -0500 Subject: [PATCH 06/18] Removed WorkerPool and added ThrottledExecutor instead --- plugins/inputs/vsphere/client.go | 10 + plugins/inputs/vsphere/endpoint.go | 247 ++++++++++++----------- plugins/inputs/vsphere/throttled_exec.go | 35 ++++ plugins/inputs/vsphere/vsphere.go | 5 - plugins/inputs/vsphere/vsphere_test.go | 46 +++-- plugins/inputs/vsphere/workerpool.go | 119 ----------- 6 files changed, 204 insertions(+), 258 deletions(-) create mode 100644 plugins/inputs/vsphere/throttled_exec.go delete mode 100644 plugins/inputs/vsphere/workerpool.go diff --git a/plugins/inputs/vsphere/client.go b/plugins/inputs/vsphere/client.go index 176bfb425f4c3..37f4b2c31776e 100644 --- a/plugins/inputs/vsphere/client.go +++ b/plugins/inputs/vsphere/client.go @@ -261,6 +261,7 @@ func (c *Client) GetMaxQueryMetrics(ctx context.Context) (int, error) { return 256, nil } +// QueryMetrics wraps performance.Query to give it proper timeouts func (c *Client) QueryMetrics(ctx context.Context, pqs []types.PerfQuerySpec) ([]performance.EntityMetric, error) { ctx1, cancel1 := context.WithTimeout(ctx, c.Timeout) defer cancel1() @@ -274,12 +275,21 @@ func (c *Client) QueryMetrics(ctx context.Context, pqs []types.PerfQuerySpec) ([ return c.Perf.ToMetricSeries(ctx2, metrics) } +// CounterInfoByName wraps performance.CounterInfoByName to give it proper timeouts func (c *Client) CounterInfoByName(ctx context.Context) (map[string]*types.PerfCounterInfo, error) { ctx1, cancel1 := context.WithTimeout(ctx, c.Timeout) defer cancel1() return c.Perf.CounterInfoByName(ctx1) } +// CounterInfoByKey wraps performance.CounterInfoByKey to give it proper timeouts +func (c *Client) CounterInfoByKey(ctx context.Context) (map[int32]*types.PerfCounterInfo, error) { + ctx1, cancel1 := context.WithTimeout(ctx, c.Timeout) + defer cancel1() + return c.Perf.CounterInfoByKey(ctx1) +} + +// ListResources wraps property.Collector.Retrieve to give it proper timeouts func (c *Client) ListResources(ctx context.Context, root *view.ContainerView, kind []string, ps []string, dst interface{}) error { ctx1, cancel1 := context.WithTimeout(ctx, c.Timeout) defer cancel1() diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go index 05492a97841c6..aba2a6ea08329 100644 --- a/plugins/inputs/vsphere/endpoint.go +++ b/plugins/inputs/vsphere/endpoint.go @@ -2,6 +2,7 @@ package vsphere import ( "context" + "errors" "fmt" "log" "math/rand" @@ -57,6 +58,8 @@ type resourceKind struct { sampling int32 objects objectMap filters filter.Filter + include []string + simple bool metrics performance.MetricList collectInstances bool parent string @@ -91,8 +94,6 @@ type metricQResponse struct { metrics *performance.MetricList } -type multiError []error - func (e *Endpoint) getParent(obj *objectRef, res *resourceKind) (*objectRef, bool) { if pKind, ok := e.resourceKinds[res.parent]; ok { if p, ok := pKind.objects[obj.parentRef.Value]; ok { @@ -125,6 +126,8 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, sampling: 300, objects: make(objectMap), filters: newFilterOrPanic(parent.DatacenterMetricInclude, parent.DatacenterMetricExclude), + simple: isSimple(parent.DatacenterMetricInclude, parent.DatacenterMetricExclude), + include: parent.DatacenterMetricInclude, collectInstances: parent.DatacenterInstances, getObjects: getDatacenters, parent: "", @@ -138,6 +141,8 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, sampling: 300, objects: make(objectMap), filters: newFilterOrPanic(parent.ClusterMetricInclude, parent.ClusterMetricExclude), + simple: isSimple(parent.ClusterMetricInclude, parent.ClusterMetricExclude), + include: parent.ClusterMetricInclude, collectInstances: parent.ClusterInstances, getObjects: getClusters, parent: "datacenter", @@ -151,6 +156,8 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, sampling: 20, objects: make(objectMap), filters: newFilterOrPanic(parent.HostMetricInclude, parent.HostMetricExclude), + simple: isSimple(parent.HostMetricInclude, parent.HostMetricExclude), + include: parent.HostMetricInclude, collectInstances: parent.HostInstances, getObjects: getHosts, parent: "cluster", @@ -164,6 +171,8 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, sampling: 20, objects: make(objectMap), filters: newFilterOrPanic(parent.VMMetricInclude, parent.VMMetricExclude), + simple: isSimple(parent.VMMetricInclude, parent.VMMetricExclude), + include: parent.VMMetricInclude, collectInstances: parent.VMInstances, getObjects: getVMs, parent: "host", @@ -176,6 +185,8 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, sampling: 300, objects: make(objectMap), filters: newFilterOrPanic(parent.DatastoreMetricInclude, parent.DatastoreMetricExclude), + simple: isSimple(parent.DatastoreMetricInclude, parent.DatastoreMetricExclude), + include: parent.DatastoreMetricInclude, collectInstances: parent.DatastoreInstances, getObjects: getDatastores, parent: "", @@ -188,24 +199,6 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, return &e, err } -func (m multiError) Error() string { - switch len(m) { - case 0: - return "No error recorded. Something is wrong!" - case 1: - return m[0].Error() - default: - s := "Multiple errors detected concurrently: " - for i, e := range m { - if i != 0 { - s += ", " - } - s += e.Error() - } - return s - } -} - func anythingEnabled(ex []string) bool { for _, s := range ex { if s == "*" { @@ -223,6 +216,18 @@ func newFilterOrPanic(include []string, exclude []string) filter.Filter { return f } +func isSimple(include []string, exclude []string) bool { + if len(exclude) > 0 || len(include) == 0 { + return false + } + for _, s := range include { + if strings.Contains(s, "*") { + return false + } + } + return true +} + func (e *Endpoint) startDiscovery(ctx context.Context) { e.discoveryTicker = time.NewTicker(e.Parent.ObjectDiscoveryInterval.Duration) go func() { @@ -359,8 +364,6 @@ func (e *Endpoint) discover(ctx context.Context) error { } log.Printf("D! [input.vsphere]: Discover new objects for %s", e.URL.Host) - - instInfoMux := sync.Mutex{} resourceKinds := make(map[string]resourceKind) dcNameCache := make(map[string]string) @@ -386,51 +389,11 @@ func (e *Endpoint) discover(ctx context.Context) error { // No need to collect metric metadata if resource type is not enabled if res.enabled { - // Get metric metadata and filter metrics - prob := 100.0 / float64(len(objects)) - log.Printf("D! [input.vsphere] Probability of sampling a resource: %f", prob) - wg := sync.WaitGroup{} - limiter := make(chan struct{}, e.Parent.DiscoverConcurrency) - for _, obj := range objects { - if rand.Float64() > prob { - continue - } - wg.Add(1) - go func(obj objectRef) { - defer wg.Done() - limiter <- struct{}{} - defer func() { - <-limiter - }() - metrics, err := e.getMetadata(ctx, obj, res.sampling) - if err != nil { - log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err) - } - mMap := make(map[string]types.PerfMetricId) - for _, m := range metrics { - if m.Instance != "" && res.collectInstances { - m.Instance = "*" - } else { - m.Instance = "" - } - if res.filters.Match(metricNames[m.CounterId]) { - mMap[strconv.Itoa(int(m.CounterId))+"|"+m.Instance] = m - } - } - log.Printf("D! [input.vsphere] Found %d metrics for %s", len(mMap), obj.name) - instInfoMux.Lock() - defer instInfoMux.Unlock() - if len(mMap) > len(res.metrics) { - res.metrics = make(performance.MetricList, len(mMap)) - i := 0 - for _, m := range mMap { - res.metrics[i] = m - i++ - } - } - }(obj) + if res.simple { + e.simpleMetadataSelect(ctx, client, &res) + } else { + e.complexMetadataSelect(ctx, &res, objects, metricNames) } - wg.Wait() } res.objects = objects resourceKinds[k] = res @@ -460,6 +423,74 @@ func (e *Endpoint) discover(ctx context.Context) error { return nil } +func (e *Endpoint) simpleMetadataSelect(ctx context.Context, client *Client, res *resourceKind) { + log.Printf("D! [input.vsphere] Using fast metric metadata selection for %s", res.name) + m, err := client.CounterInfoByName(ctx) + if err != nil { + log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err) + return + } + res.metrics = make(performance.MetricList, 0, len(res.include)) + for _, s := range res.include { + if pci, ok := m[s]; ok { + cnt := types.PerfMetricId{ + CounterId: pci.Key, + } + if res.collectInstances { + cnt.Instance = "*" + } else { + cnt.Instance = "" + } + res.metrics = append(res.metrics, cnt) + } else { + log.Printf("W! [input.vsphere] Metric name %s is unknown. Will not be collected", s) + } + } +} + +func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind, objects objectMap, metricNames map[int32]string) { + prob := 100.0 / float64(len(objects)) + log.Printf("D! [input.vsphere] Probability of sampling a resource: %f", prob) + instInfoMux := sync.Mutex{} + te := NewThrottledExecutor(e.Parent.DiscoverConcurrency) + for _, obj := range objects { + if rand.Float64() > prob { + continue + } + func(obj objectRef) { + te.Run(func() { + metrics, err := e.getMetadata(ctx, obj, res.sampling) + if err != nil { + log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err) + } + mMap := make(map[string]types.PerfMetricId) + for _, m := range metrics { + if m.Instance != "" && res.collectInstances { + m.Instance = "*" + } else { + m.Instance = "" + } + if res.filters.Match(metricNames[m.CounterId]) { + mMap[strconv.Itoa(int(m.CounterId))+"|"+m.Instance] = m + } + } + log.Printf("D! [input.vsphere] Found %d metrics for %s", len(mMap), obj.name) + instInfoMux.Lock() + defer instInfoMux.Unlock() + if len(mMap) > len(res.metrics) { + res.metrics = make(performance.MetricList, len(mMap)) + i := 0 + for _, m := range mMap { + res.metrics[i] = m + i++ + } + } + }) + }(obj) + } + te.Wait() +} + func getDatacenters(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.Datacenter err := client.ListResources(ctx, root, []string{"Datacenter"}, []string{"name", "parent"}, &resources) @@ -615,7 +646,15 @@ func (e *Endpoint) Collect(ctx context.Context, acc telegraf.Accumulator) error return nil } -func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, now time.Time, latest time.Time) { +// Workaround to make sure pqs is a copy of the loop variable and won't change. +func submitChunkJob(te *ThrottledExecutor, job func([]types.PerfQuerySpec), pqs []types.PerfQuerySpec) { + te.Run(func() { + job(pqs) + }) +} + +func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Time, latest time.Time, acc telegraf.Accumulator, job func([]types.PerfQuerySpec)) { + te := NewThrottledExecutor(e.Parent.CollectConcurrency) maxMetrics := e.Parent.MaxQueryMetrics if maxMetrics < 1 { maxMetrics = 1 @@ -664,17 +703,17 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n // 1) We filled up the metric quota while processing the current resource // 2) We are at the last resource and have no more data to process. // 3) The query contains more than 100,000 individual metrics - if mr > 0 || (!res.realTime && metrics >= maxMetrics) || nRes >= e.Parent.MaxQueryObjects || len(pqs) > 100000 { + if mr > 0 || nRes >= e.Parent.MaxQueryObjects || len(pqs) > 100000 { log.Printf("D! [input.vsphere]: Queueing query: %d objects, %d metrics (%d remaining) of type %s for %s. Processed objects: %d. Total objects %d", len(pqs), metrics, mr, res.name, e.URL.Host, total+1, len(res.objects)) - // To prevent deadlocks, don't send work items if the context has been cancelled. + // Don't send work items if the context has been cancelled. if ctx.Err() == context.Canceled { return } - // Call push function - f(ctx, pqs) + // Run collection job + submitChunkJob(te, job, pqs) pqs = make([]types.PerfQuerySpec, 0, e.Parent.MaxQueryObjects) metrics = 0 nRes = 0 @@ -683,13 +722,16 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n total++ nRes++ } - // There may be dangling stuff in the queue. Handle them + // Handle final partially filled chunk if len(pqs) > 0 { - // Call push function + // Run collection job log.Printf("D! [input.vsphere]: Queuing query: %d objects, %d metrics (0 remaining) of type %s for %s. Total objects %d (final chunk)", len(pqs), metrics, res.name, e.URL.Host, len(res.objects)) - f(ctx, pqs) + submitChunkJob(te, job, pqs) } + + // Wait for background collection to finish + te.Wait() } func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc telegraf.Accumulator) error { @@ -728,58 +770,35 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc var tsMux sync.Mutex latestSample := time.Time{} // Set up a worker pool for collecting chunk metrics - wp := NewWorkerPool(10) - wp.Run(ctx, func(ctx context.Context, in interface{}) interface{} { - chunk := in.([]types.PerfQuerySpec) - n, localLatest, err := e.collectChunk(ctx, chunk, resourceType, &res, acc) - log.Printf("D! [input.vsphere] CollectChunk for %s returned %d metrics", resourceType, n) - if err != nil { - return err - } - atomic.AddInt64(&count, int64(n)) - tsMux.Lock() - defer tsMux.Unlock() - if localLatest.After(latestSample) && !localLatest.IsZero() { - latestSample = localLatest - } - return nil - - }, e.Parent.CollectConcurrency) - - // Fill the input channel of the worker queue by running the chunking - // logic implemented in chunker() - wp.Fill(ctx, func(ctx context.Context, f PushFunc) { - e.chunker(ctx, f, &res, now, latest) - }) + e.chunkify(ctx, &res, now, latest, acc, + func(chunk []types.PerfQuerySpec) { + n, localLatest, err := e.collectChunk(ctx, chunk, &res, acc) + log.Printf("D! [input.vsphere] CollectChunk for %s returned %d metrics", resourceType, n) + if err != nil { + acc.AddError(errors.New("While collecting " + res.name + ": " + err.Error())) + } + atomic.AddInt64(&count, int64(n)) + tsMux.Lock() + defer tsMux.Unlock() + if localLatest.After(latestSample) && !localLatest.IsZero() { + latestSample = localLatest + } + }) - // Drain the pool. We're getting errors back. They should all be nil - var mux sync.Mutex - merr := make(multiError, 0) - wp.Drain(ctx, func(ctx context.Context, in interface{}) bool { - if in != nil { - mux.Lock() - defer mux.Unlock() - merr = append(merr, in.(error)) - return false - } - return true - }) log.Printf("D! [input.vsphere] Latest sample for %s set to %s", resourceType, latestSample) if !latestSample.IsZero() { e.lastColls[resourceType] = latestSample } sw.Stop() SendInternalCounterWithTags("gather_count", e.URL.Host, internalTags, count) - if len(merr) > 0 { - return merr - } return nil } -func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, resourceType string, - res *resourceKind, acc telegraf.Accumulator) (int, time.Time, error) { +func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, res *resourceKind, acc telegraf.Accumulator) (int, time.Time, error) { + log.Printf("D! [input.vsphere] Query for %s has %d QuerySpecs", res.name, len(pqs)) latestSample := time.Time{} count := 0 + resourceType := res.name prefix := "vsphere" + e.Parent.Separator + resourceType client, err := e.clientFactory.GetClient(ctx) diff --git a/plugins/inputs/vsphere/throttled_exec.go b/plugins/inputs/vsphere/throttled_exec.go new file mode 100644 index 0000000000000..83586dd4eeec0 --- /dev/null +++ b/plugins/inputs/vsphere/throttled_exec.go @@ -0,0 +1,35 @@ +package vsphere + +import "sync" + +// ThrottledExecutor provides a simple mechanism for running jobs in separate +// goroutines while limit the number of concurrent jobs running at any given time. +type ThrottledExecutor struct { + limiter chan struct{} + wg sync.WaitGroup +} + +// NewThrottledExecutor creates a new ThrottlesExecutor with a specified maximum +// number of concurrent jobs +func NewThrottledExecutor(limit int) *ThrottledExecutor { + return &ThrottledExecutor{limiter: make(chan struct{}, limit)} +} + +// Run schedules a job for execution as soon as possible while respecting the +// maximum concurrency limit. +func (t *ThrottledExecutor) Run(job func()) { + t.wg.Add(1) + t.limiter <- struct{}{} + go func() { + defer func() { + <-t.limiter + }() + defer t.wg.Done() + job() + }() +} + +// Wait blocks until all scheduled jobs have finished +func (t *ThrottledExecutor) Wait() { + t.wg.Wait() +} diff --git a/plugins/inputs/vsphere/vsphere.go b/plugins/inputs/vsphere/vsphere.go index f0bb5dca99c38..23ce52ed56f7c 100644 --- a/plugins/inputs/vsphere/vsphere.go +++ b/plugins/inputs/vsphere/vsphere.go @@ -260,7 +260,6 @@ func (v *VSphere) Stop() { // Gather is the main data collection function called by the Telegraf core. It performs all // the data collection and writes all metrics into the Accumulator passed as an argument. func (v *VSphere) Gather(acc telegraf.Accumulator) error { - merr := make(multiError, 0) var wg sync.WaitGroup for _, ep := range v.endpoints { wg.Add(1) @@ -274,15 +273,11 @@ func (v *VSphere) Gather(acc telegraf.Accumulator) error { } if err != nil { acc.AddError(err) - merr = append(merr, err) } }(ep) } wg.Wait() - if len(merr) > 0 { - return merr - } return nil } diff --git a/plugins/inputs/vsphere/vsphere_test.go b/plugins/inputs/vsphere/vsphere_test.go index 4eb3d28f810e6..f87d93d0f4670 100644 --- a/plugins/inputs/vsphere/vsphere_test.go +++ b/plugins/inputs/vsphere/vsphere_test.go @@ -7,6 +7,8 @@ import ( "regexp" "sort" "strings" + "sync" + "sync/atomic" "testing" "time" @@ -205,29 +207,33 @@ func TestParseConfig(t *testing.T) { } func TestWorkerPool(t *testing.T) { - wp := NewWorkerPool(100) - ctx := context.Background() - wp.Run(ctx, func(ctx context.Context, p interface{}) interface{} { - return p.(int) * 2 - }, 10) - - n := 100000 - wp.Fill(ctx, func(ctx context.Context, f PushFunc) { - for i := 0; i < n; i++ { - f(ctx, i) - } - }) - results := make([]int, n) - i := 0 - wp.Drain(ctx, func(ctx context.Context, p interface{}) bool { - results[i] = p.(int) - i++ - return true - }) + max := int64(0) + ngr := int64(0) + n := 10000 + var mux sync.Mutex + results := make([]int, 0, n) + te := NewThrottledExecutor(5) + for i := 0; i < n; i++ { + func(i int) { + te.Run(func() { + atomic.AddInt64(&ngr, 1) + mux.Lock() + defer mux.Unlock() + results = append(results, i*2) + if ngr > max { + max = ngr + } + time.Sleep(100 * time.Microsecond) + atomic.AddInt64(&ngr, -1) + }) + }(i) + } + te.Wait() sort.Ints(results) for i := 0; i < n; i++ { - require.Equal(t, results[i], i*2) + require.Equal(t, results[i], i*2, "Some jobs didn't run") } + require.Equal(t, int64(5), max, "Wrong number of goroutines spawned") } func TestTimeout(t *testing.T) { diff --git a/plugins/inputs/vsphere/workerpool.go b/plugins/inputs/vsphere/workerpool.go deleted file mode 100644 index 6695735ce3a22..0000000000000 --- a/plugins/inputs/vsphere/workerpool.go +++ /dev/null @@ -1,119 +0,0 @@ -package vsphere - -import ( - "context" - "log" - "sync" -) - -// WorkerFunc is a function that is supposed to do the actual work -// of the WorkerPool. It is similar to the "map" portion of the -// map/reduce semantics, in that it takes a single value as an input, -// does some processing and returns a single result. -type WorkerFunc func(context.Context, interface{}) interface{} - -// PushFunc is called from a FillerFunc to push a workitem onto -// the input channel. Wraps some logic for gracefulk shutdowns. -type PushFunc func(context.Context, interface{}) bool - -// DrainerFunc represents a function used to "drain" the WorkerPool, -// i.e. pull out all the results generated by the workers and processing -// them. The DrainerFunc is called once per result produced. -// If the function returns false, the draining of the pool is aborted. -type DrainerFunc func(context.Context, interface{}) bool - -// FillerFunc represents a function for filling the WorkerPool with jobs. -// It is called once and is responsible for pushing jobs onto the supplied channel. -type FillerFunc func(context.Context, PushFunc) - -// WorkerPool implements a simple work pooling mechanism. It runs a predefined -// number of goroutines to process jobs. Jobs are inserted using the Fill call -// and results are retrieved through the Drain function. -type WorkerPool struct { - wg sync.WaitGroup - In chan interface{} - Out chan interface{} -} - -// NewWorkerPool creates a worker pool -func NewWorkerPool(bufsize int) *WorkerPool { - return &WorkerPool{ - In: make(chan interface{}, bufsize), - Out: make(chan interface{}, bufsize), - } -} - -func (w *WorkerPool) push(ctx context.Context, job interface{}) bool { - select { - case w.In <- job: - return true - case <-ctx.Done(): - return false - } -} - -func (w *WorkerPool) pushOut(ctx context.Context, result interface{}) bool { - select { - case w.Out <- result: - return true - case <-ctx.Done(): - return false - } -} - -// Run takes a WorkerFunc and runs it in 'n' goroutines. -func (w *WorkerPool) Run(ctx context.Context, f WorkerFunc, n int) bool { - w.wg.Add(1) - go func() { - defer w.wg.Done() - var localWg sync.WaitGroup - localWg.Add(n) - for i := 0; i < n; i++ { - go func() { - defer localWg.Done() - for { - select { - case job, ok := <-w.In: - if !ok { - return - } - w.pushOut(ctx, f(ctx, job)) - case <-ctx.Done(): - log.Printf("D! [input.vsphere]: Stop requested for worker pool. Exiting.") - return - } - } - }() - } - localWg.Wait() - close(w.Out) - }() - return ctx.Err() == nil -} - -// Fill runs a FillerFunc responsible for supplying work to the pool. You may only -// call Fill once. Calling it twice will panic. -func (w *WorkerPool) Fill(ctx context.Context, f FillerFunc) bool { - w.wg.Add(1) - go func() { - defer w.wg.Done() - f(ctx, w.push) - close(w.In) - }() - return true -} - -// Drain runs a DrainerFunc for each result generated by the workers. -func (w *WorkerPool) Drain(ctx context.Context, f DrainerFunc) bool { - w.wg.Add(1) - go func() { - defer w.wg.Done() - for result := range w.Out { - if !f(ctx, result) { - break - } - } - }() - w.wg.Wait() - return ctx.Err() != nil -} From 94c6fb6d838bec0433b86056193b8ec8f95c60d3 Mon Sep 17 00:00:00 2001 From: prydin Date: Wed, 5 Dec 2018 12:40:10 -0500 Subject: [PATCH 07/18] Changed cluster_instances default value to false, since true causes problems with some versions of vCenter --- plugins/inputs/vsphere/README.md | 6 ++--- plugins/inputs/vsphere/endpoint.go | 36 +++++++++++++++++++++--------- plugins/inputs/vsphere/vsphere.go | 4 ++-- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/plugins/inputs/vsphere/README.md b/plugins/inputs/vsphere/README.md index 7ba323bc73e9e..4bccbb2c880e8 100644 --- a/plugins/inputs/vsphere/README.md +++ b/plugins/inputs/vsphere/README.md @@ -122,17 +122,17 @@ vm_metric_exclude = [ "*" ] ## Clusters # cluster_metric_include = [] ## if omitted or empty, all metrics are collected # cluster_metric_exclude = [] ## Nothing excluded by default - # cluster_instances = true ## true by default + # cluster_instances = false ## false by default ## Datastores # datastore_metric_include = [] ## if omitted or empty, all metrics are collected # datastore_metric_exclude = [] ## Nothing excluded by default - # datastore_instances = false ## false by default for Datastores only + # datastore_instances = false ## false by default ## Datacenters datacenter_metric_include = [] ## if omitted or empty, all metrics are collected datacenter_metric_exclude = [ "*" ] ## Datacenters are not collected by default. - # datacenter_instances = false ## false by default for Datastores only + # datacenter_instances = false ## false by default ## Plugin Settings ## separator character to use for measurement and field names (default: "_") diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go index aba2a6ea08329..47d64156d4a35 100644 --- a/plugins/inputs/vsphere/endpoint.go +++ b/plugins/inputs/vsphere/endpoint.go @@ -26,11 +26,13 @@ import ( var isolateLUN = regexp.MustCompile(".*/([^/]+)/?$") -const metricLookback = 3 +const metricLookback = 3 // Number of time periods to look back at for non-realtime metrics -const rtMetricLookback = 3 +const rtMetricLookback = 3 // Number of time periods to look back at for realtime metrics -const maxSampleConst = 10 +const maxSampleConst = 10 // Absolute maximim number of samples regardless of period + +const maxMetadataSamples = 100 // Number of resources to sample for metric metadata // Endpoint is a high-level representation of a connected vCenter endpoint. It is backed by the lower // level Client type. @@ -449,14 +451,29 @@ func (e *Endpoint) simpleMetadataSelect(ctx context.Context, client *Client, res } func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind, objects objectMap, metricNames map[int32]string) { - prob := 100.0 / float64(len(objects)) - log.Printf("D! [input.vsphere] Probability of sampling a resource: %f", prob) - instInfoMux := sync.Mutex{} - te := NewThrottledExecutor(e.Parent.DiscoverConcurrency) + // We're only going to get metadata from maxMetadataSamples resources. If we have + // more resources than that, we pick maxMetadataSamples samples at random. + sampledObjects := make([]objectRef, len(objects)) + i := 0 for _, obj := range objects { - if rand.Float64() > prob { - continue + sampledObjects[i] = obj + i++ + } + n := len(sampledObjects) + if n > maxMetadataSamples { + // Shuffle samples into the maxMetadatSamples positions + for i := 0; i < maxMetadataSamples; i++ { + j := int(rand.Int31n(int32(i + 1))) + t := sampledObjects[i] + sampledObjects[i] = sampledObjects[j] + sampledObjects[j] = t } + sampledObjects = sampledObjects[0:maxMetadataSamples] + } + + instInfoMux := sync.Mutex{} + te := NewThrottledExecutor(e.Parent.DiscoverConcurrency) + for _, obj := range sampledObjects { func(obj objectRef) { te.Run(func() { metrics, err := e.getMetadata(ctx, obj, res.sampling) @@ -560,7 +577,6 @@ func getVMs(ctx context.Context, client *Client, e *Endpoint, root *view.Contain m := make(objectMap) for _, r := range resources { if r.Runtime.PowerState != "poweredOn" { - log.Printf("D! [input.vsphere] Skipped powered off VM: %s", r.Name) continue } guest := "unknown" diff --git a/plugins/inputs/vsphere/vsphere.go b/plugins/inputs/vsphere/vsphere.go index 23ce52ed56f7c..13186634fb51d 100644 --- a/plugins/inputs/vsphere/vsphere.go +++ b/plugins/inputs/vsphere/vsphere.go @@ -155,7 +155,7 @@ var sampleConfig = ` ## Clusters # cluster_metric_include = [] ## if omitted or empty, all metrics are collected # cluster_metric_exclude = [] ## Nothing excluded by default - # cluster_instances = true ## true by default + # cluster_instances = false ## false by default ## Datastores # datastore_metric_include = [] ## if omitted or empty, all metrics are collected @@ -286,7 +286,7 @@ func init() { return &VSphere{ Vcenters: []string{}, - ClusterInstances: true, + ClusterInstances: false, ClusterMetricInclude: nil, ClusterMetricExclude: nil, HostInstances: true, From 646c59609327c5ceea31575cfa7fff6b1367fc39 Mon Sep 17 00:00:00 2001 From: prydin Date: Wed, 5 Dec 2018 16:28:38 -0500 Subject: [PATCH 08/18] Fixed broken test cases --- plugins/inputs/vsphere/throttled_exec.go | 3 +++ plugins/inputs/vsphere/vsphere_test.go | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/vsphere/throttled_exec.go b/plugins/inputs/vsphere/throttled_exec.go index 83586dd4eeec0..a7e07bedb7c67 100644 --- a/plugins/inputs/vsphere/throttled_exec.go +++ b/plugins/inputs/vsphere/throttled_exec.go @@ -12,6 +12,9 @@ type ThrottledExecutor struct { // NewThrottledExecutor creates a new ThrottlesExecutor with a specified maximum // number of concurrent jobs func NewThrottledExecutor(limit int) *ThrottledExecutor { + if limit == 0 { + panic("Limit must be > 0") + } return &ThrottledExecutor{limiter: make(chan struct{}, limit)} } diff --git a/plugins/inputs/vsphere/vsphere_test.go b/plugins/inputs/vsphere/vsphere_test.go index f87d93d0f4670..607493f5c0a5d 100644 --- a/plugins/inputs/vsphere/vsphere_test.go +++ b/plugins/inputs/vsphere/vsphere_test.go @@ -177,6 +177,8 @@ func defaultVSphere() *VSphere { ObjectDiscoveryInterval: internal.Duration{Duration: time.Second * 300}, Timeout: internal.Duration{Duration: time.Second * 20}, ForceDiscoverOnInit: true, + DiscoverConcurrency: 1, + CollectConcurrency: 1, } } @@ -251,7 +253,7 @@ func TestTimeout(t *testing.T) { require.NoError(t, v.Start(nil)) // We're not using the Accumulator, so it can be nil. defer v.Stop() err = v.Gather(&acc) - require.NotNil(t, err, "Error should not be nil here") + require.True(t, len(acc.Errors) > 0, "Errors should not be empty here") // The accumulator must contain exactly one error and it must be a deadline exceeded. require.Equal(t, 1, len(acc.Errors)) From 9ab5b945f5cdd5023f9da56956fb0fb71fdb5077 Mon Sep 17 00:00:00 2001 From: prydin Date: Thu, 6 Dec 2018 11:02:12 -0500 Subject: [PATCH 09/18] Reverted accidental change to wavefront.go --- plugins/outputs/wavefront/wavefront.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/outputs/wavefront/wavefront.go b/plugins/outputs/wavefront/wavefront.go index df1d42158dc07..ef36d1804045f 100644 --- a/plugins/outputs/wavefront/wavefront.go +++ b/plugins/outputs/wavefront/wavefront.go @@ -122,11 +122,11 @@ func (w *Wavefront) Write(metrics []telegraf.Metric) error { return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error()) } defer connection.Close() + connection.SetWriteDeadline(time.Now().Add(5 * time.Second)) for _, m := range metrics { for _, metricPoint := range buildMetrics(m, w) { metricLine := formatMetricPoint(metricPoint, w) - connection.SetWriteDeadline(time.Now().Add(30 * time.Second)) _, err := connection.Write([]byte(metricLine)) if err != nil { return fmt.Errorf("Wavefront: TCP writing error %s", err.Error()) From 466b1399869b4b6c2d851f36a19d7fb7e1f47b50 Mon Sep 17 00:00:00 2001 From: prydin Date: Fri, 7 Dec 2018 18:30:53 -0500 Subject: [PATCH 10/18] Added check for value indices --- plugins/inputs/vsphere/endpoint.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go index 47d64156d4a35..c68f18bd47d69 100644 --- a/plugins/inputs/vsphere/endpoint.go +++ b/plugins/inputs/vsphere/endpoint.go @@ -861,8 +861,13 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, avg := float64(0) nValues := 0 - //log.Printf("D! [input.vsphere] %s %d samples", name, len(v.Value)) for idx, sample := range em.SampleInfo { + // According to the docs, SampleInfo and Value should have the same length, but we've seen corrupted + // data coming back with missing values. Take care of that gracefully! + if idx >= len(v.Value) { + log.Printf("D! [input.vsphere] len(SampleInfo)>len(Value) %d > %d", len(em.SampleInfo), len(v.Value)) + break + } value := float64(v.Value[idx]) if value < 0 { continue From bd3fe0d0773fa9824dc7f4054e8738c92e67dc9d Mon Sep 17 00:00:00 2001 From: prydin Date: Mon, 10 Dec 2018 16:57:23 -0500 Subject: [PATCH 11/18] More robust panic handling --- Gopkg.lock | 6 +++--- Gopkg.toml | 2 +- plugins/inputs/vsphere/endpoint.go | 15 +++++++++++++-- plugins/inputs/vsphere/throttled_exec.go | 8 ++++++-- plugins/inputs/vsphere/vsphere.go | 1 + plugins/inputs/vsphere/vsphere_test.go | 3 ++- 6 files changed, 26 insertions(+), 9 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index a2df3c81dfc55..5cfc176af9258 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1043,7 +1043,7 @@ version = "v1.0.0" [[projects]] - digest = "1:f9fe29bf856d49f9a51d6001588cb5ee5d65c8a7ff5e8b0dd5423c3a510f0833" + digest = "1:6af52ce6dae9a912aa3113f247a63cd82599760ddc328a6721c3ef0426d31ca2" name = "github.com/vmware/govmomi" packages = [ ".", @@ -1069,8 +1069,8 @@ "vim25/xml", ] pruneopts = "" - revision = "e3a01f9611c32b2362366434bcd671516e78955d" - version = "v0.18.0" + revision = "3617f28d167d448f93f282a867870f109516d2a5" + version = "v0.19.0" [[projects]] branch = "master" diff --git a/Gopkg.toml b/Gopkg.toml index 791e265e82da4..835bae18aa233 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -228,7 +228,7 @@ [[constraint]] name = "github.com/vmware/govmomi" - version = "0.18.0" + version = "0.19.0" [[constraint]] name = "github.com/Azure/go-autorest" diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go index c68f18bd47d69..8efb9c6397345 100644 --- a/plugins/inputs/vsphere/endpoint.go +++ b/plugins/inputs/vsphere/endpoint.go @@ -233,6 +233,7 @@ func isSimple(include []string, exclude []string) bool { func (e *Endpoint) startDiscovery(ctx context.Context) { e.discoveryTicker = time.NewTicker(e.Parent.ObjectDiscoveryInterval.Duration) go func() { + defer HandlePanic() for { select { case <-e.discoveryTicker.C: @@ -270,7 +271,10 @@ func (e *Endpoint) init(ctx context.Context) error { } else { // Otherwise, just run it in the background. We'll probably have an incomplete first metric // collection this way. - go e.initalDiscovery(ctx) + go func() { + defer HandlePanic() + e.initalDiscovery(ctx) + }() } } e.initialized = true @@ -621,6 +625,7 @@ func (e *Endpoint) Close() { // Collect runs a round of data collections as specified in the configuration. func (e *Endpoint) Collect(ctx context.Context, acc telegraf.Accumulator) error { + // If we never managed to do a discovery, collection will be a no-op. Therefore, // we need to check that a connection is available, or the collection will // silently fail. @@ -647,6 +652,7 @@ func (e *Endpoint) Collect(ctx context.Context, acc telegraf.Accumulator) error if res.enabled { wg.Add(1) go func(k string) { + defer HandlePanic() defer wg.Done() err := e.collectResource(ctx, k, acc) if err != nil { @@ -785,9 +791,14 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc var tsMux sync.Mutex latestSample := time.Time{} - // Set up a worker pool for collecting chunk metrics + + // Divide workload into chunks and process them concurrently e.chunkify(ctx, &res, now, latest, acc, func(chunk []types.PerfQuerySpec) { + + // Handle panics gracefully + defer HandlePanicWithAcc(acc) + n, localLatest, err := e.collectChunk(ctx, chunk, &res, acc) log.Printf("D! [input.vsphere] CollectChunk for %s returned %d metrics", resourceType, n) if err != nil { diff --git a/plugins/inputs/vsphere/throttled_exec.go b/plugins/inputs/vsphere/throttled_exec.go index a7e07bedb7c67..15f66c5ab5d7e 100644 --- a/plugins/inputs/vsphere/throttled_exec.go +++ b/plugins/inputs/vsphere/throttled_exec.go @@ -1,6 +1,8 @@ package vsphere -import "sync" +import ( + "sync" +) // ThrottledExecutor provides a simple mechanism for running jobs in separate // goroutines while limit the number of concurrent jobs running at any given time. @@ -24,10 +26,12 @@ func (t *ThrottledExecutor) Run(job func()) { t.wg.Add(1) t.limiter <- struct{}{} go func() { + // Last resort panic handler. + defer HandlePanic() + defer t.wg.Done() defer func() { <-t.limiter }() - defer t.wg.Done() job() }() } diff --git a/plugins/inputs/vsphere/vsphere.go b/plugins/inputs/vsphere/vsphere.go index 13186634fb51d..5ceb14a7fd238 100644 --- a/plugins/inputs/vsphere/vsphere.go +++ b/plugins/inputs/vsphere/vsphere.go @@ -264,6 +264,7 @@ func (v *VSphere) Gather(acc telegraf.Accumulator) error { for _, ep := range v.endpoints { wg.Add(1) go func(endpoint *Endpoint) { + defer HandlePanicWithAcc(acc) defer wg.Done() err := endpoint.Collect(context.Background(), acc) if err == context.Canceled { diff --git a/plugins/inputs/vsphere/vsphere_test.go b/plugins/inputs/vsphere/vsphere_test.go index 607493f5c0a5d..1f05f36bf9b41 100644 --- a/plugins/inputs/vsphere/vsphere_test.go +++ b/plugins/inputs/vsphere/vsphere_test.go @@ -308,7 +308,8 @@ func TestAll(t *testing.T) { var acc testutil.Accumulator v := defaultVSphere() v.Vcenters = []string{s.URL.String()} - v.Start(nil) // We're not using the Accumulator, so it can be nil. + v.Start(&acc) defer v.Stop() require.NoError(t, v.Gather(&acc)) + require.Equal(t, 0, len(acc.Errors), fmt.Sprintf("Errors found: %s", acc.Errors)) } From 3ede8cc6ae8e31f7f39814bd8ec2ae3f4a27cf28 Mon Sep 17 00:00:00 2001 From: prydin Date: Mon, 10 Dec 2018 16:58:20 -0500 Subject: [PATCH 12/18] Added panic_handler.go --- plugins/inputs/vsphere/panic_handler.go | 28 +++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 plugins/inputs/vsphere/panic_handler.go diff --git a/plugins/inputs/vsphere/panic_handler.go b/plugins/inputs/vsphere/panic_handler.go new file mode 100644 index 0000000000000..1d6242510a972 --- /dev/null +++ b/plugins/inputs/vsphere/panic_handler.go @@ -0,0 +1,28 @@ +package vsphere + +import ( + "errors" + "fmt" + "log" + + "github.com/influxdata/telegraf" +) + +func HandlePanicWithAcc(acc telegraf.Accumulator) { + if p := recover(); p != nil { + switch p.(type) { + case string: + acc.AddError(errors.New(p.(string))) + case error: + acc.AddError(p.(error)) + default: + acc.AddError(fmt.Errorf("Unknown panic: %s", p)) + } + } +} + +func HandlePanic() { + if p := recover(); p != nil { + log.Printf("E! [input.vsphere] PANIC (recovered): %s", p) + } +} From 957762d31ee4e5e6250e8c7b353285c27a3b970e Mon Sep 17 00:00:00 2001 From: prydin Date: Mon, 10 Dec 2018 17:27:42 -0500 Subject: [PATCH 13/18] Reverted to govmomi 0.18.0 --- Gopkg.toml | 2 +- plugins/inputs/vsphere/endpoint.go | 10 ---------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/Gopkg.toml b/Gopkg.toml index 835bae18aa233..791e265e82da4 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -228,7 +228,7 @@ [[constraint]] name = "github.com/vmware/govmomi" - version = "0.19.0" + version = "0.18.0" [[constraint]] name = "github.com/Azure/go-autorest" diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go index 8efb9c6397345..6caebb4f77412 100644 --- a/plugins/inputs/vsphere/endpoint.go +++ b/plugins/inputs/vsphere/endpoint.go @@ -86,16 +86,6 @@ type objectRef struct { dcname string } -type metricQRequest struct { - res *resourceKind - obj objectRef -} - -type metricQResponse struct { - obj objectRef - metrics *performance.MetricList -} - func (e *Endpoint) getParent(obj *objectRef, res *resourceKind) (*objectRef, bool) { if pKind, ok := e.resourceKinds[res.parent]; ok { if p, ok := pKind.objects[obj.parentRef.Value]; ok { From f563cd8138a10036ea4441fc290835e8c67d109a Mon Sep 17 00:00:00 2001 From: prydin Date: Tue, 11 Dec 2018 10:05:38 -0500 Subject: [PATCH 14/18] Exclude tests requiring VPX simulator on 32-bit arch --- Gopkg.toml | 2 +- plugins/inputs/vsphere/vsphere_test.go | 21 +++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/Gopkg.toml b/Gopkg.toml index 791e265e82da4..835bae18aa233 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -228,7 +228,7 @@ [[constraint]] name = "github.com/vmware/govmomi" - version = "0.18.0" + version = "0.19.0" [[constraint]] name = "github.com/Azure/go-autorest" diff --git a/plugins/inputs/vsphere/vsphere_test.go b/plugins/inputs/vsphere/vsphere_test.go index 1f05f36bf9b41..0da0681cc0a93 100644 --- a/plugins/inputs/vsphere/vsphere_test.go +++ b/plugins/inputs/vsphere/vsphere_test.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "testing" "time" + "unsafe" "github.com/influxdata/telegraf/internal" itls "github.com/influxdata/telegraf/internal/tls" @@ -239,6 +240,13 @@ func TestWorkerPool(t *testing.T) { } func TestTimeout(t *testing.T) { + // Don't run test on 32-bit machines due to bug in simulator. + // https://github.com/vmware/govmomi/issues/1330 + var i int + if unsafe.Sizeof(i) < 8 { + return + } + m, s, err := createSim() if err != nil { t.Fatal(err) @@ -261,6 +269,12 @@ func TestTimeout(t *testing.T) { } func TestMaxQuery(t *testing.T) { + // Don't run test on 32-bit machines due to bug in simulator. + // https://github.com/vmware/govmomi/issues/1330 + var i int + if unsafe.Sizeof(i) < 8 { + return + } m, s, err := createSim() if err != nil { t.Fatal(err) @@ -298,6 +312,13 @@ func TestMaxQuery(t *testing.T) { } func TestAll(t *testing.T) { + // Don't run test on 32-bit machines due to bug in simulator. + // https://github.com/vmware/govmomi/issues/1330 + var i int + if unsafe.Sizeof(i) < 8 { + return + } + m, s, err := createSim() if err != nil { t.Fatal(err) From 180a7bfc5ec6e6e79ddea747be40928321e1d02b Mon Sep 17 00:00:00 2001 From: prydin Date: Fri, 21 Dec 2018 11:25:38 -0500 Subject: [PATCH 15/18] Added cancel-handler to ThrottledExecutor, removed unnecessary warning from GetClient(), fixed discovered object counting --- plugins/inputs/vsphere/client.go | 6 ++---- plugins/inputs/vsphere/endpoint.go | 17 +++++++++++------ plugins/inputs/vsphere/throttled_exec.go | 17 +++++++++++------ 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/plugins/inputs/vsphere/client.go b/plugins/inputs/vsphere/client.go index 37f4b2c31776e..485361c34d2f8 100644 --- a/plugins/inputs/vsphere/client.go +++ b/plugins/inputs/vsphere/client.go @@ -10,8 +10,6 @@ import ( "sync" "time" - "github.com/vmware/govmomi/vim25/types" - "github.com/vmware/govmomi" "github.com/vmware/govmomi/object" "github.com/vmware/govmomi/performance" @@ -20,6 +18,7 @@ import ( "github.com/vmware/govmomi/vim25" "github.com/vmware/govmomi/vim25/methods" "github.com/vmware/govmomi/vim25/soap" + "github.com/vmware/govmomi/vim25/types" ) // The highest number of metrics we can query for, no matter what settings @@ -78,8 +77,7 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) { ctx2, cancel2 := context.WithTimeout(ctx, cf.parent.Timeout.Duration) defer cancel2() if cf.client.Client.SessionManager.Login(ctx2, url.UserPassword(cf.parent.Username, cf.parent.Password)) != nil { - log.Printf("W! [input.vsphere]: Client reauthentication failed.") - return nil, err + return nil.fmt.Errorf("Renewing authentication failed: %v", err) } } diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go index 6caebb4f77412..9df23264348ac 100644 --- a/plugins/inputs/vsphere/endpoint.go +++ b/plugins/inputs/vsphere/endpoint.go @@ -363,6 +363,8 @@ func (e *Endpoint) discover(ctx context.Context) error { resourceKinds := make(map[string]resourceKind) dcNameCache := make(map[string]string) + numRes := int64(0) + // Populate resource objects, and endpoint instance info. for k, res := range e.resourceKinds { log.Printf("D! [input.vsphere] Discovering resources for %s", res.name) @@ -393,6 +395,9 @@ func (e *Endpoint) discover(ctx context.Context) error { } res.objects = objects resourceKinds[k] = res + + SendInternalCounterWithTags("discovered_objects", e.URL.Host, map[string]string{"type": res.name}, int64(len(objects))) + numRes += int64(len(objects)) } } @@ -415,7 +420,7 @@ func (e *Endpoint) discover(ctx context.Context) error { e.lun2ds = l2d sw.Stop() - // SendInternalCounter("discovered_objects", e.URL.Host, int64(len(instInfo))) TODO: Count the correct way + SendInternalCounterWithTags("discovered_objects", e.URL.Host, map[string]string{"type": "instance-total"}, numRes) return nil } @@ -469,7 +474,7 @@ func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind, te := NewThrottledExecutor(e.Parent.DiscoverConcurrency) for _, obj := range sampledObjects { func(obj objectRef) { - te.Run(func() { + te.Run(ctx, func() { metrics, err := e.getMetadata(ctx, obj, res.sampling) if err != nil { log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err) @@ -659,8 +664,8 @@ func (e *Endpoint) Collect(ctx context.Context, acc telegraf.Accumulator) error } // Workaround to make sure pqs is a copy of the loop variable and won't change. -func submitChunkJob(te *ThrottledExecutor, job func([]types.PerfQuerySpec), pqs []types.PerfQuerySpec) { - te.Run(func() { +func submitChunkJob(ctx context.Context, te *ThrottledExecutor, job func([]types.PerfQuerySpec), pqs []types.PerfQuerySpec) { + te.Run(ctx, func() { job(pqs) }) } @@ -725,7 +730,7 @@ func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Tim } // Run collection job - submitChunkJob(te, job, pqs) + submitChunkJob(ctx, te, job, pqs) pqs = make([]types.PerfQuerySpec, 0, e.Parent.MaxQueryObjects) metrics = 0 nRes = 0 @@ -739,7 +744,7 @@ func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Tim // Run collection job log.Printf("D! [input.vsphere]: Queuing query: %d objects, %d metrics (0 remaining) of type %s for %s. Total objects %d (final chunk)", len(pqs), metrics, res.name, e.URL.Host, len(res.objects)) - submitChunkJob(te, job, pqs) + submitChunkJob(ctx, te, job, pqs) } // Wait for background collection to finish diff --git a/plugins/inputs/vsphere/throttled_exec.go b/plugins/inputs/vsphere/throttled_exec.go index 15f66c5ab5d7e..0259a6de42ea8 100644 --- a/plugins/inputs/vsphere/throttled_exec.go +++ b/plugins/inputs/vsphere/throttled_exec.go @@ -1,6 +1,7 @@ package vsphere import ( + "context" "sync" ) @@ -22,17 +23,21 @@ func NewThrottledExecutor(limit int) *ThrottledExecutor { // Run schedules a job for execution as soon as possible while respecting the // maximum concurrency limit. -func (t *ThrottledExecutor) Run(job func()) { +func (t *ThrottledExecutor) Run(ctx context.Context, job func()) { t.wg.Add(1) - t.limiter <- struct{}{} go func() { // Last resort panic handler. defer HandlePanic() defer t.wg.Done() - defer func() { - <-t.limiter - }() - job() + select { + case t.limiter <- struct{}{}: + defer func() { + <-t.limiter + }() + job() + case <-ctx.Done(): + return + } }() } From a0ca6479c1806df498a85cd2bffc5672da48f110 Mon Sep 17 00:00:00 2001 From: prydin Date: Fri, 21 Dec 2018 11:33:31 -0500 Subject: [PATCH 16/18] Fixed test case issues --- plugins/inputs/vsphere/client.go | 3 ++- plugins/inputs/vsphere/vsphere_test.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/vsphere/client.go b/plugins/inputs/vsphere/client.go index 485361c34d2f8..8b1c4866ac9df 100644 --- a/plugins/inputs/vsphere/client.go +++ b/plugins/inputs/vsphere/client.go @@ -3,6 +3,7 @@ package vsphere import ( "context" "crypto/tls" + "fmt" "log" "net/url" "strconv" @@ -77,7 +78,7 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) { ctx2, cancel2 := context.WithTimeout(ctx, cf.parent.Timeout.Duration) defer cancel2() if cf.client.Client.SessionManager.Login(ctx2, url.UserPassword(cf.parent.Username, cf.parent.Password)) != nil { - return nil.fmt.Errorf("Renewing authentication failed: %v", err) + return nil, fmt.Errorf("Renewing authentication failed: %v", err) } } diff --git a/plugins/inputs/vsphere/vsphere_test.go b/plugins/inputs/vsphere/vsphere_test.go index 0da0681cc0a93..a4b931bd96568 100644 --- a/plugins/inputs/vsphere/vsphere_test.go +++ b/plugins/inputs/vsphere/vsphere_test.go @@ -218,7 +218,7 @@ func TestWorkerPool(t *testing.T) { te := NewThrottledExecutor(5) for i := 0; i < n; i++ { func(i int) { - te.Run(func() { + te.Run(context.Background(), func() { atomic.AddInt64(&ngr, 1) mux.Lock() defer mux.Unlock() From 1eb24a3bb02ad2ec7aeec09a3c32270fa85162b6 Mon Sep 17 00:00:00 2001 From: prydin Date: Sat, 22 Dec 2018 09:03:56 -0500 Subject: [PATCH 17/18] Back-ported timestamping fixes from pontus-issue-4790 --- plugins/inputs/vsphere/endpoint.go | 156 ++++++++++++++++++----------- 1 file changed, 96 insertions(+), 60 deletions(-) diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go index 9df23264348ac..aacf5bb2d20a4 100644 --- a/plugins/inputs/vsphere/endpoint.go +++ b/plugins/inputs/vsphere/endpoint.go @@ -39,8 +39,6 @@ const maxMetadataSamples = 100 // Number of resources to sample for metric metad type Endpoint struct { Parent *VSphere URL *url.URL - lastColls map[string]time.Time - lastColl time.Time resourceKinds map[string]resourceKind hwMarks *TSCache lun2ds map[string]string @@ -66,6 +64,8 @@ type resourceKind struct { collectInstances bool parent string getObjects func(context.Context, *Client, *Endpoint, *view.ContainerView) (objectMap, error) + latestSample time.Time + lastColl time.Time } type metricEntry struct { @@ -101,7 +101,6 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, e := Endpoint{ URL: url, Parent: parent, - lastColls: make(map[string]time.Time), hwMarks: NewTSCache(1 * time.Hour), lun2ds: make(map[string]string), initialized: false, @@ -761,25 +760,34 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc if err != nil { return err } - latest, hasLatest := e.lastColls[resourceType] - if hasLatest { + + // Estimate the interval at which we're invoked. Use local time (not server time) + // since this is about how we got invoked locally. + localNow := time.Now() + estInterval := time.Duration(time.Minute) + if !res.lastColl.IsZero() { + estInterval = localNow.Sub(res.lastColl).Truncate(time.Duration(res.sampling) * time.Second) + } + log.Printf("D! [inputs.vsphere] Interval estimated to %s", estInterval) + + latest := res.latestSample + if !latest.IsZero() { elapsed := now.Sub(latest).Seconds() + 5.0 // Allow 5 second jitter. - log.Printf("D! [input.vsphere]: Latest: %s, elapsed: %f, resource: %s", latest, elapsed, resourceType) + log.Printf("D! [inputs.vsphere]: Latest: %s, elapsed: %f, resource: %s", latest, elapsed, resourceType) if !res.realTime && elapsed < float64(res.sampling) { // No new data would be available. We're outta here! - log.Printf("D! [input.vsphere]: Sampling period for %s of %d has not elapsed on %s", + log.Printf("D! [inputs.vsphere]: Sampling period for %s of %d has not elapsed on %s", resourceType, res.sampling, e.URL.Host) return nil } } else { latest = now.Add(time.Duration(-res.sampling) * time.Second) } - e.lastColl = now internalTags := map[string]string{"resourcetype": resourceType} sw := NewStopwatchWithTags("gather_duration", e.URL.Host, internalTags) - log.Printf("D! [input.vsphere]: Collecting metrics for %d objects of type %s for %s", + log.Printf("D! [inputs.vsphere]: Collecting metrics for %d objects of type %s for %s", len(res.objects), resourceType, e.URL.Host) count := int64(0) @@ -794,8 +802,8 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc // Handle panics gracefully defer HandlePanicWithAcc(acc) - n, localLatest, err := e.collectChunk(ctx, chunk, &res, acc) - log.Printf("D! [input.vsphere] CollectChunk for %s returned %d metrics", resourceType, n) + n, localLatest, err := e.collectChunk(ctx, chunk, &res, acc, now, estInterval) + log.Printf("D! [inputs.vsphere] CollectChunk for %s returned %d metrics", resourceType, n) if err != nil { acc.AddError(errors.New("While collecting " + res.name + ": " + err.Error())) } @@ -807,17 +815,56 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc } }) - log.Printf("D! [input.vsphere] Latest sample for %s set to %s", resourceType, latestSample) + log.Printf("D! [inputs.vsphere] Latest sample for %s set to %s", resourceType, latestSample) if !latestSample.IsZero() { - e.lastColls[resourceType] = latestSample + res.latestSample = latestSample } sw.Stop() SendInternalCounterWithTags("gather_count", e.URL.Host, internalTags, count) return nil } -func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, res *resourceKind, acc telegraf.Accumulator) (int, time.Time, error) { - log.Printf("D! [input.vsphere] Query for %s has %d QuerySpecs", res.name, len(pqs)) +func alignSamples(info []types.PerfSampleInfo, values []int64, interval time.Duration) ([]types.PerfSampleInfo, []float64) { + rInfo := make([]types.PerfSampleInfo, 0, len(info)) + rValues := make([]float64, 0, len(values)) + bi := 1.0 + var lastBucket time.Time + for idx := range info { + // According to the docs, SampleInfo and Value should have the same length, but we've seen corrupted + // data coming back with missing values. Take care of that gracefully! + if idx >= len(values) { + log.Printf("D! [inputs.vsphere] len(SampleInfo)>len(Value) %d > %d", len(info), len(values)) + break + } + v := float64(values[idx]) + if v < 0 { + continue + } + ts := info[idx].Timestamp + roundedTs := ts.Truncate(interval) + + // Are we still working on the same bucket? + if roundedTs == lastBucket { + bi++ + p := len(rValues) - 1 + rValues[p] = ((bi-1)/bi)*float64(rValues[p]) + v/bi + } else { + rValues = append(rValues, v) + roundedInfo := types.PerfSampleInfo{ + Timestamp: roundedTs, + Interval: info[idx].Interval, + } + rInfo = append(rInfo, roundedInfo) + bi = 1.0 + lastBucket = roundedTs + } + } + //log.Printf("D! [inputs.vsphere] Aligned samples: %d collapsed into %d", len(info), len(rInfo)) + return rInfo, rValues +} + +func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, res *resourceKind, acc telegraf.Accumulator, now time.Time, interval time.Duration) (int, time.Time, error) { + log.Printf("D! [inputs.vsphere] Query for %s has %d QuerySpecs", res.name, len(pqs)) latestSample := time.Time{} count := 0 resourceType := res.name @@ -838,14 +885,14 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, return count, latestSample, err } - log.Printf("D! [input.vsphere] Query for %s returned metrics for %d objects", resourceType, len(ems)) + log.Printf("D! [inputs.vsphere] Query for %s returned metrics for %d objects", resourceType, len(ems)) // Iterate through results for _, em := range ems { moid := em.Entity.Reference().Value instInfo, found := res.objects[moid] if !found { - log.Printf("E! [input.vsphere]: MOID %s not found in cache. Skipping! (This should not happen!)", moid) + log.Printf("E! [inputs.vsphere]: MOID %s not found in cache. Skipping! (This should not happen!)", moid) continue } buckets := make(map[string]metricEntry) @@ -860,67 +907,56 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, // Populate tags objectRef, ok := res.objects[moid] if !ok { - log.Printf("E! [input.vsphere]: MOID %s not found in cache. Skipping", moid) + log.Printf("E! [inputs.vsphere]: MOID %s not found in cache. Skipping", moid) continue } e.populateTags(&objectRef, resourceType, res, t, &v) - avg := float64(0) nValues := 0 - for idx, sample := range em.SampleInfo { + alignedInfo, alignedValues := alignSamples(em.SampleInfo, v.Value, interval) // TODO: Estimate interval + + for idx, sample := range alignedInfo { // According to the docs, SampleInfo and Value should have the same length, but we've seen corrupted // data coming back with missing values. Take care of that gracefully! - if idx >= len(v.Value) { - log.Printf("D! [input.vsphere] len(SampleInfo)>len(Value) %d > %d", len(em.SampleInfo), len(v.Value)) + if idx >= len(alignedValues) { + log.Printf("D! [inputs.vsphere] len(SampleInfo)>len(Value) %d > %d", len(alignedInfo), len(alignedValues)) break } - value := float64(v.Value[idx]) - if value < 0 { - continue - } ts := sample.Timestamp if ts.After(latestSample) { latestSample = ts } - avg += float64(value) nValues++ - } - if nValues == 0 { - log.Printf("D! [input.vsphere]: Missing value for: %s, %s", name, objectRef.name) - continue - } - // If we're catching up with metrics arriving late, calculate the average - // of them and pick the midpoint timestamp. This is a reasonable way of - // filling in missed collections that doesn't cause us to deliver metrics - // faster than the interval. - avg /= float64(nValues) - midTs := em.SampleInfo[len(em.SampleInfo)/2].Timestamp - - // Organize the metrics into a bucket per measurement. - mn, fn := e.makeMetricIdentifier(prefix, name) - bKey := mn + " " + v.Instance + " " + strconv.FormatInt(midTs.UnixNano(), 10) - bucket, found := buckets[bKey] - if !found { - bucket = metricEntry{name: mn, ts: midTs, fields: make(map[string]interface{}), tags: t} - buckets[bKey] = bucket - } + // Organize the metrics into a bucket per measurement. + mn, fn := e.makeMetricIdentifier(prefix, name) + bKey := mn + " " + v.Instance + " " + strconv.FormatInt(ts.UnixNano(), 10) + bucket, found := buckets[bKey] + if !found { + bucket = metricEntry{name: mn, ts: ts, fields: make(map[string]interface{}), tags: t} + buckets[bKey] = bucket + } - // Percentage values must be scaled down by 100. - info, ok := metricInfo[name] - if !ok { - log.Printf("E! [input.vsphere]: Could not determine unit for %s. Skipping", name) + // Percentage values must be scaled down by 100. + info, ok := metricInfo[name] + if !ok { + log.Printf("E! [inputs.vsphere]: Could not determine unit for %s. Skipping", name) + } + v := alignedValues[idx] + if info.UnitInfo.GetElementDescription().Key == "percent" { + bucket.fields[fn] = float64(v) / 100.0 + } else { + bucket.fields[fn] = v + } + count++ + + // Update highwater marks + e.hwMarks.Put(moid, ts) } - if info.UnitInfo.GetElementDescription().Key == "percent" { - bucket.fields[fn] = float64(avg) / 100.0 - } else { - bucket.fields[fn] = avg + if nValues == 0 { + log.Printf("D! [inputs.vsphere]: Missing value for: %s, %s", name, objectRef.name) + continue } - count++ - - // Update highwater marks - e.hwMarks.Put(moid, latestSample) - } // We've iterated through all the metrics and collected buckets for each // measurement name. Now emit them! From 79e433e8e43f3bb2497eb0486606fb047a56b81d Mon Sep 17 00:00:00 2001 From: prydin Date: Fri, 28 Dec 2018 12:43:12 -0500 Subject: [PATCH 18/18] Removed panic handling --- plugins/inputs/vsphere/endpoint.go | 7 ------ plugins/inputs/vsphere/panic_handler.go | 28 ------------------------ plugins/inputs/vsphere/throttled_exec.go | 2 -- plugins/inputs/vsphere/vsphere.go | 1 - 4 files changed, 38 deletions(-) delete mode 100644 plugins/inputs/vsphere/panic_handler.go diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go index aacf5bb2d20a4..27aca331c9e02 100644 --- a/plugins/inputs/vsphere/endpoint.go +++ b/plugins/inputs/vsphere/endpoint.go @@ -222,7 +222,6 @@ func isSimple(include []string, exclude []string) bool { func (e *Endpoint) startDiscovery(ctx context.Context) { e.discoveryTicker = time.NewTicker(e.Parent.ObjectDiscoveryInterval.Duration) go func() { - defer HandlePanic() for { select { case <-e.discoveryTicker.C: @@ -261,7 +260,6 @@ func (e *Endpoint) init(ctx context.Context) error { // Otherwise, just run it in the background. We'll probably have an incomplete first metric // collection this way. go func() { - defer HandlePanic() e.initalDiscovery(ctx) }() } @@ -646,7 +644,6 @@ func (e *Endpoint) Collect(ctx context.Context, acc telegraf.Accumulator) error if res.enabled { wg.Add(1) go func(k string) { - defer HandlePanic() defer wg.Done() err := e.collectResource(ctx, k, acc) if err != nil { @@ -798,10 +795,6 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc // Divide workload into chunks and process them concurrently e.chunkify(ctx, &res, now, latest, acc, func(chunk []types.PerfQuerySpec) { - - // Handle panics gracefully - defer HandlePanicWithAcc(acc) - n, localLatest, err := e.collectChunk(ctx, chunk, &res, acc, now, estInterval) log.Printf("D! [inputs.vsphere] CollectChunk for %s returned %d metrics", resourceType, n) if err != nil { diff --git a/plugins/inputs/vsphere/panic_handler.go b/plugins/inputs/vsphere/panic_handler.go deleted file mode 100644 index 1d6242510a972..0000000000000 --- a/plugins/inputs/vsphere/panic_handler.go +++ /dev/null @@ -1,28 +0,0 @@ -package vsphere - -import ( - "errors" - "fmt" - "log" - - "github.com/influxdata/telegraf" -) - -func HandlePanicWithAcc(acc telegraf.Accumulator) { - if p := recover(); p != nil { - switch p.(type) { - case string: - acc.AddError(errors.New(p.(string))) - case error: - acc.AddError(p.(error)) - default: - acc.AddError(fmt.Errorf("Unknown panic: %s", p)) - } - } -} - -func HandlePanic() { - if p := recover(); p != nil { - log.Printf("E! [input.vsphere] PANIC (recovered): %s", p) - } -} diff --git a/plugins/inputs/vsphere/throttled_exec.go b/plugins/inputs/vsphere/throttled_exec.go index 0259a6de42ea8..ac95b496c97fa 100644 --- a/plugins/inputs/vsphere/throttled_exec.go +++ b/plugins/inputs/vsphere/throttled_exec.go @@ -26,8 +26,6 @@ func NewThrottledExecutor(limit int) *ThrottledExecutor { func (t *ThrottledExecutor) Run(ctx context.Context, job func()) { t.wg.Add(1) go func() { - // Last resort panic handler. - defer HandlePanic() defer t.wg.Done() select { case t.limiter <- struct{}{}: diff --git a/plugins/inputs/vsphere/vsphere.go b/plugins/inputs/vsphere/vsphere.go index 5ceb14a7fd238..13186634fb51d 100644 --- a/plugins/inputs/vsphere/vsphere.go +++ b/plugins/inputs/vsphere/vsphere.go @@ -264,7 +264,6 @@ func (v *VSphere) Gather(acc telegraf.Accumulator) error { for _, ep := range v.endpoints { wg.Add(1) go func(endpoint *Endpoint) { - defer HandlePanicWithAcc(acc) defer wg.Done() err := endpoint.Collect(context.Background(), acc) if err == context.Canceled {