Skip to content

Commit

Permalink
[bugfix] recreate context on every gather call and error on slow esxc…
Browse files Browse the repository at this point in the history
…li response
  • Loading branch information
tbelda-ems committed Jun 18, 2023
1 parent f85c427 commit 9179a30
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 47 deletions.
9 changes: 3 additions & 6 deletions internal/vccollector/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,24 +280,21 @@ func (c *hostState) setNotResponding(resp bool) {
}
}

func (c *hostState) setMeanResponseTime(dur, max time.Duration) {
func (c *hostState) setMeanResponseTime(dur time.Duration) {
if c.responseTime == 0 {
c.responseTime = dur
} else {
c.responseTime = (c.responseTime + dur) / 2
}
if dur > max {
c.setNotResponding(true)
}
}

func (c *hostState) isHostConnectedAndResponding(skipDuration time.Duration) bool {
var connectedResponding bool

if !c.notConnected {
// limit notResponding in cache for skipDuration
if time.Since(c.lastNoResponse) > skipDuration {
c.notResponding = false
if !c.lastNoResponse.IsZero() && time.Since(c.lastNoResponse) > skipDuration {
c.setNotResponding(false)
}
connectedResponding = !c.notResponding
}
Expand Down
8 changes: 6 additions & 2 deletions internal/vccollector/graphics.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (c *VcCollector) CollectHostGraphics(
continue
}
res, err = x.Run([]string{"graphics", "device", "stats", "list"})
hostSt.setMeanResponseTime(time.Since(startTime), c.maxResponseDuration)
hostSt.setMeanResponseTime(time.Since(startTime))
if err != nil {
hostExecutorRunAddError(acc, "graphics device", host.Name(), err)
hostSt.setNotResponding(true)
Expand All @@ -65,8 +65,8 @@ func (c *VcCollector) CollectHostGraphics(
}
continue
}
t = time.Now()

t = time.Now()
for _, rv := range res.Values {
if len(rv) > 0 && len(rv["DeviceName"]) > 0 {
grtags["clustername"] = c.getClusternameFromHost(i, host)
Expand All @@ -84,6 +84,10 @@ func (c *VcCollector) CollectHostGraphics(
acc.AddFields("vcstat_host_graphics", grfields, grtags, t)
}
}
if t.Sub(startTime) >= c.maxResponseDuration {
hostSt.setNotResponding(true)
return fmt.Errorf("slow response from %s: %w", host.Name(), context.DeadlineExceeded)
}
}
}

Expand Down
24 changes: 18 additions & 6 deletions internal/vccollector/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (c *VcCollector) CollectHostHBA(
continue
}
res, err = x.Run([]string{"storage", "core", "adapter", "list"})
hostSt.setMeanResponseTime(time.Since(startTime), c.maxResponseDuration)
hostSt.setMeanResponseTime(time.Since(startTime))
if err != nil {
hostExecutorRunAddError(acc, "storage core", host.Name(), err)
hostSt.setNotResponding(true)
Expand All @@ -168,8 +168,8 @@ func (c *VcCollector) CollectHostHBA(
}
continue
}
t = time.Now()

t = time.Now()
for _, rv := range res.Values {
if len(rv) > 0 && len(rv["LinkState"]) > 0 {
hbatags["clustername"] = c.getClusternameFromHost(i, host)
Expand All @@ -185,6 +185,10 @@ func (c *VcCollector) CollectHostHBA(
acc.AddFields("vcstat_host_hba", hbafields, hbatags, t)
}
}
if t.Sub(startTime) >= c.maxResponseDuration {
hostSt.setNotResponding(true)
return fmt.Errorf("slow response from %s: %w", host.Name(), context.DeadlineExceeded)
}
}
}

Expand Down Expand Up @@ -231,7 +235,7 @@ func (c *VcCollector) CollectHostNIC(
continue
}
res, err = x.Run([]string{"network", "nic", "list"})
hostSt.setMeanResponseTime(time.Since(startTime), c.maxResponseDuration)
hostSt.setMeanResponseTime(time.Since(startTime))
if err != nil {
hostExecutorRunAddError(acc, "network nic", host.Name(), err)
hostSt.setNotResponding(true)
Expand All @@ -240,8 +244,8 @@ func (c *VcCollector) CollectHostNIC(
}
continue
}
t = time.Now()

t = time.Now()
for _, rv := range res.Values {
if len(rv) > 0 && len(rv["LinkStatus"]) > 0 {
nictags["clustername"] = c.getClusternameFromHost(i, host)
Expand All @@ -261,6 +265,10 @@ func (c *VcCollector) CollectHostNIC(
acc.AddFields("vcstat_host_nic", nicfields, nictags, t)
}
}
if t.Sub(startTime) >= c.maxResponseDuration {
hostSt.setNotResponding(true)
return fmt.Errorf("slow response from %s: %w", host.Name(), context.DeadlineExceeded)
}
}
}

Expand Down Expand Up @@ -307,7 +315,7 @@ func (c *VcCollector) CollectHostFw(
continue
}
res, err = x.Run([]string{"network", "firewall", "get"})
hostSt.setMeanResponseTime(time.Since(startTime), c.maxResponseDuration)
hostSt.setMeanResponseTime(time.Since(startTime))
if err != nil {
hostExecutorRunAddError(acc, "network firewall", host.Name(), err)
hostSt.setNotResponding(true)
Expand All @@ -316,8 +324,8 @@ func (c *VcCollector) CollectHostFw(
}
continue
}
t = time.Now()

t = time.Now()
if len(res.Values) > 0 && len(res.Values[0]["Enabled"]) > 0 {
fwtags["clustername"] = c.getClusternameFromHost(i, host)
fwtags["dcname"] = dc.Name()
Expand All @@ -340,6 +348,10 @@ func (c *VcCollector) CollectHostFw(

acc.AddFields("vcstat_host_firewall", fwfields, fwtags, t)
}
if t.Sub(startTime) >= c.maxResponseDuration {
hostSt.setNotResponding(true)
return fmt.Errorf("slow response from %s: %w", host.Name(), context.DeadlineExceeded)
}
}
}

Expand Down
17 changes: 9 additions & 8 deletions internal/vccollector/vccollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type VcCollector struct {

// New returns a new VcCollector associated with the provided vCenter URL
func New(
ctx context.Context,
vcenterURL, user, pass string,
clicfg *tls.ClientConfig,
dataDuration time.Duration,
Expand Down Expand Up @@ -128,20 +127,20 @@ func (c *VcCollector) SetSkipHostNotRespondingDuration(du time.Duration) {
}

// Open opens a vCenter connection session or relogin if session already exists
func (c *VcCollector) Open(ctx context.Context, timeout time.Duration) error {
func (c *VcCollector) Open(timeout time.Duration) error {
var err error

// set a login timeout
ctx1, cancel1 := context.WithTimeout(ctx, timeout)
defer cancel1()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if c.client != nil {
// Try to relogin and if not possible reopen session
if err = c.client.Login(ctx1, c.url.User); err == nil {
if err = c.client.Login(ctx, c.url.User); err == nil {
return nil
}
c.Close(ctx)
c.Close()
}
c.client, err = govplus.NewClient(ctx1, c.url, &c.ClientConfig)
c.client, err = govplus.NewClient(ctx, c.url, &c.ClientConfig)
if err == nil {
c.coll = property.DefaultCollector(c.client.Client)
}
Expand All @@ -155,11 +154,13 @@ func (c *VcCollector) IsActive(ctx context.Context) bool {
}

// Close closes vCenter connection
func (c *VcCollector) Close(ctx context.Context) {
func (c *VcCollector) Close() {
if c.client != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
_ = c.coll.Destroy(ctx) //nolint: destroy and forget old collector
govplus.CloseClient(ctx, c.client)
c.client, c.coll = nil, nil
cancel()
}
}

Expand Down
64 changes: 39 additions & 25 deletions plugins/inputs/vcstat/vcstat.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ type Config struct {

version string
pollInterval time.Duration
ctx context.Context
cancel context.CancelFunc
vcc *vccollector.VcCollector

GatherTime selfstat.Stat
Expand Down Expand Up @@ -153,12 +151,10 @@ func init() {
func (vcs *Config) Init() error {
var err error

vcs.ctx, vcs.cancel = context.WithCancel(context.Background())
if vcs.vcc != nil {
vcs.vcc.Close(vcs.ctx)
vcs.vcc.Close()
}
vcs.vcc, err = vccollector.New(
vcs.ctx,
vcs.VCenter,
vcs.Username,
vcs.Password,
Expand Down Expand Up @@ -201,9 +197,8 @@ func (vcs *Config) Init() error {
// perform shutdown tasks.
func (vcs *Config) Stop() {
if vcs.vcc != nil {
vcs.vcc.Close(vcs.ctx)
vcs.vcc.Close()
}
vcs.cancel()
}

// SetPollInterval allows telegraf shim to tell vcstat the configured polling interval
Expand Down Expand Up @@ -255,34 +250,35 @@ func (vcs *Config) Gather(acc telegraf.Accumulator) error {
err error
)

if err = vcs.keepActiveSession(acc); err != nil {
// poll using a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(vcs.Timeout))
defer cancel()

if err = vcs.keepActiveSession(ctx, acc); err != nil {
return tgplus.GatherError(acc, err)
}
acc.SetPrecision(tgplus.GetPrecision(vcs.pollInterval))

// poll using a context with timeout
ctxT, cancelT := context.WithTimeout(vcs.ctx, time.Duration(vcs.Timeout))
defer cancelT()
startTime = time.Now()

//--- Get vCenter, DCs and Clusters info
if err = vcs.gatherHighLevelEntities(ctxT, acc); err != nil {
if err = vcs.gatherHighLevelEntities(ctx, acc); err != nil {
return tgplus.GatherError(acc, err)
}

//--- Get Hosts, Networks and Storage info
if err = vcs.gatherHost(ctxT, acc); err != nil {
if err = vcs.gatherHost(ctx, acc); err != nil {
return tgplus.GatherError(acc, err)
}
if err = vcs.gatherNetwork(ctxT, acc); err != nil {
if err = vcs.gatherNetwork(ctx, acc); err != nil {
return tgplus.GatherError(acc, err)
}
if err = vcs.gatherStorage(ctxT, acc); err != nil {
if err = vcs.gatherStorage(ctx, acc); err != nil {
return tgplus.GatherError(acc, err)
}

//--- Get VM info
if err = vcs.gatherVM(ctxT, acc); err != nil {
if err = vcs.gatherVM(ctx, acc); err != nil {
return tgplus.GatherError(acc, err)
}

Expand All @@ -301,23 +297,26 @@ func (vcs *Config) Gather(acc telegraf.Accumulator) error {
}

// keepActiveSession keeps an active session with vsphere
func (vcs *Config) keepActiveSession(acc telegraf.Accumulator) error {
func (vcs *Config) keepActiveSession(
ctx context.Context,
acc telegraf.Accumulator,
) error {
var (
col *vccollector.VcCollector
err error
)

if vcs.ctx == nil || vcs.ctx.Err() != nil || vcs.vcc == nil {
if vcs.vcc == nil {
if err = vcs.Init(); err != nil {
return err
}
}
col = vcs.vcc
if !col.IsActive(vcs.ctx) {
if !col.IsActive(ctx) {
if vcs.SessionsCreated.Get() > 0 {
acc.AddError(fmt.Errorf("vCenter session not active, re-authenticating"))
}
if err = col.Open(vcs.ctx, time.Duration(vcs.Timeout)); err != nil {
if err = col.Open(time.Duration(vcs.Timeout)); err != nil {
return err
}
vcs.SessionsCreated.Incr(1)
Expand All @@ -327,7 +326,10 @@ func (vcs *Config) keepActiveSession(acc telegraf.Accumulator) error {
}

// gatherHighLevelEntities gathers datacenters and clusters stats
func (vcs *Config) gatherHighLevelEntities(ctx context.Context, acc telegraf.Accumulator) error {
func (vcs *Config) gatherHighLevelEntities(
ctx context.Context,
acc telegraf.Accumulator,
) error {
var (
col *vccollector.VcCollector
err error
Expand Down Expand Up @@ -358,7 +360,10 @@ func (vcs *Config) gatherHighLevelEntities(ctx context.Context, acc telegraf.Acc
}

// gatherHost gathers info and stats per host
func (vcs *Config) gatherHost(ctx context.Context, acc telegraf.Accumulator) error {
func (vcs *Config) gatherHost(
ctx context.Context,
acc telegraf.Accumulator,
) error {
var (
col *vccollector.VcCollector
hasEsxcliCollection bool
Expand Down Expand Up @@ -416,7 +421,10 @@ func (vcs *Config) gatherHost(ctx context.Context, acc telegraf.Accumulator) err
}

// gatherNetwork gathers network entities info
func (vcs *Config) gatherNetwork(ctx context.Context, acc telegraf.Accumulator) error {
func (vcs *Config) gatherNetwork(
ctx context.Context,
acc telegraf.Accumulator,
) error {
var (
col *vccollector.VcCollector
err error
Expand All @@ -439,7 +447,10 @@ func (vcs *Config) gatherNetwork(ctx context.Context, acc telegraf.Accumulator)
}

// gatherStorage gathers storage entities info
func (vcs *Config) gatherStorage(ctx context.Context, acc telegraf.Accumulator) error {
func (vcs *Config) gatherStorage(
ctx context.Context,
acc telegraf.Accumulator,
) error {
if vcs.DatastoreInstances {
var col *vccollector.VcCollector
var err error
Expand All @@ -456,7 +467,10 @@ func (vcs *Config) gatherStorage(ctx context.Context, acc telegraf.Accumulator)
}

// gatherVM gathers virtual machines info
func (vcs *Config) gatherVM(ctx context.Context, acc telegraf.Accumulator) error {
func (vcs *Config) gatherVM(
ctx context.Context,
acc telegraf.Accumulator,
) error {
if vcs.VMInstances {
var col *vccollector.VcCollector
var err error
Expand Down

0 comments on commit 9179a30

Please sign in to comment.