Skip to content

Commit

Permalink
Provide isolatedCollectd option for native collectd SA monitors (open-telemetry#2957)
Browse files Browse the repository at this point in the history

* Add ability to run isolated collectd subprocess

* sort logrus fields for zap consistency

* tests: add collectd-mysql and isolated logging coverage
  • Loading branch information
rmfitzpatrick authored Apr 14, 2023
1 parent 00a9165 commit 05192a3
Show file tree
Hide file tree
Showing 15 changed files with 1,003 additions and 64 deletions.
6 changes: 6 additions & 0 deletions pkg/receiver/smartagentreceiver/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package smartagentreceiver
import (
"fmt"
"io"
"sort"
"strings"
"sync"

Expand Down Expand Up @@ -136,6 +137,11 @@ func (l *logrusToZap) Fire(entry *logrus.Entry) error {
monitorID: monitorID,
})

sort.Slice(fields, func(i, j int) bool {
fI, fJ := fields[i], fields[j]
return fI.Key < fJ.Key
})

if ce := zapLogger.Check(logrusToZapLevel[entry.Level], entry.Message); ce != nil {
ce.Time = entry.Time
// clear stack so that it's not for parent Check()
Expand Down
13 changes: 13 additions & 0 deletions pkg/receiver/smartagentreceiver/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ func TestRedirectMonitorLogsWithMissingMapEntryUsesDefaultLogger(t *testing.T) {
assert.Equal(t, 0, zap1Logs.Len())
require.Equal(t, 1, defaultZapLogs.Len())
require.Equal(t, msg1, defaultZapLogs.All()[0].Message)
require.Equal(t, []zapcore.Field{
{Key: "monitorID", Type: zapcore.StringType, String: "id1"},
{Key: "monitorType", Type: zapcore.StringType, String: "monitor1"},
}, defaultZapLogs.All()[0].Context)

})
}
}
Expand Down Expand Up @@ -348,6 +353,14 @@ func TestRedirectSameMonitorManyInstancesLogs(t *testing.T) {
require.Equal(t, msg1, zap1Logs.All()[0].Message)
require.Equal(t, 1, zap2Logs.Len())
require.Equal(t, msg2, zap2Logs.All()[0].Message)
require.Equal(t, []zapcore.Field{
{Key: "monitorID", Type: zapcore.StringType, String: "id1"},
{Key: "monitorType", Type: zapcore.StringType, String: "monitor1"},
}, zap1Logs.All()[0].Context)
require.Equal(t, []zapcore.Field{
{Key: "monitorID", Type: zapcore.StringType, String: "id2"},
{Key: "monitorType", Type: zapcore.StringType, String: "monitor1"},
}, zap2Logs.All()[0].Context)
})
}
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/signalfx-agent/pkg/core/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (
"runtime"
"strings"

"github.com/signalfx/signalfx-agent/pkg/utils/timeutil"

"github.com/mitchellh/hashstructure"
log "github.com/sirupsen/logrus"

"github.com/signalfx/signalfx-agent/pkg/core/common/constants"
"github.com/signalfx/signalfx-agent/pkg/core/config/sources"
"github.com/signalfx/signalfx-agent/pkg/core/config/validation"
"github.com/signalfx/signalfx-agent/pkg/utils"
"github.com/signalfx/signalfx-agent/pkg/utils/hostfs"
log "github.com/sirupsen/logrus"
"github.com/signalfx/signalfx-agent/pkg/utils/timeutil"
)

const (
Expand Down Expand Up @@ -389,7 +389,8 @@ type CollectdConfig struct {
InstanceName string `yaml:"-"`
// A hack to allow custom collectd to easily specify a single monitorID via
// query parameter
WriteServerQuery string `yaml:"-"`
WriteServerQuery string `yaml:"-"`
Logger log.FieldLogger `yaml:"-"`
}

// Validate the collectd specific config
Expand Down
8 changes: 6 additions & 2 deletions pkg/signalfx-agent/pkg/core/config/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"strings"

"github.com/mitchellh/hashstructure"
log "github.com/sirupsen/logrus"
yaml "gopkg.in/yaml.v2"

"github.com/signalfx/signalfx-agent/pkg/core/common/constants"
"github.com/signalfx/signalfx-agent/pkg/core/dpfilters"
"github.com/signalfx/signalfx-agent/pkg/monitors/types"
log "github.com/sirupsen/logrus"
yaml "gopkg.in/yaml.v2"
)

// MonitorConfig is used to configure monitor instances. One instance of
Expand Down Expand Up @@ -132,6 +133,9 @@ type MonitorConfig struct {
// emitted by default. A metric group is simply a collection of metrics,
// and they are defined in each monitor's documentation.
ExtraGroups []string `yaml:"extraGroups" json:"extraGroups"`
// If true and this is a native collectd plugin-based monitor, it will
// run in its own isolated collectd subprocess. No effect otherwise.
IsolatedCollectd bool `yaml:"isolatedCollectd" json:"isolatedCollectd"`
// OtherConfig is everything else that is custom to a particular monitor
OtherConfig map[string]interface{} `yaml:",inline" neverLog:"omit"`
Hostname string `yaml:"-" json:"-"`
Expand Down
10 changes: 8 additions & 2 deletions pkg/signalfx-agent/pkg/monitors/collectd/collectd.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,18 @@ func MainInstance() *Manager {
// InitCollectd makes a new instance of a manager and initializes it, but does
// not start collectd
func InitCollectd(conf *config.CollectdConfig) *Manager {
logger := conf.Logger
if logger == nil {
logger = log.StandardLogger()
}
logger = logger.WithField("collectdInstance", conf.InstanceName)

manager := &Manager{
conf: conf,
activeMonitors: make(map[types.MonitorID]types.Output),
genericJMXUsers: make(map[types.MonitorID]bool),
requestRestart: make(chan struct{}),
logger: log.WithField("collectdInstance", conf.InstanceName),
logger: logger,
}
manager.deleteExistingConfig()

Expand Down Expand Up @@ -305,7 +311,7 @@ func (cm *Manager) manageCollectd(initCh chan<- struct{}, terminated chan struct
go func() {
scanner := utils.ChunkScanner(output)
for scanner.Scan() {
logLine(scanner.Text(), cm.logger)
cm.logLine(scanner.Text())
}
}()

Expand Down
17 changes: 2 additions & 15 deletions pkg/signalfx-agent/pkg/monitors/collectd/custom/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/signalfx/signalfx-agent/pkg/core/config"
"github.com/signalfx/signalfx-agent/pkg/monitors"
"github.com/signalfx/signalfx-agent/pkg/monitors/collectd"
"github.com/signalfx/signalfx-agent/pkg/utils"
)

func init() {
Expand Down Expand Up @@ -120,19 +119,7 @@ func (cm *Monitor) Configure(conf *Config) error {
if err != nil {
return err
}

collectdConf := *collectd.MainInstance().Config()

collectdConf.WriteServerPort = 0
collectdConf.WriteServerQuery = "?monitorID=" + string(conf.MonitorID)
collectdConf.InstanceName = "monitor-" + string(conf.MonitorID)
collectdConf.ReadThreads = utils.FirstNonZero(conf.CollectdReadThreads, utils.MinInt(len(conf.allTemplates()), 10))
collectdConf.WriteThreads = 1
collectdConf.WriteQueueLimitHigh = 10000
collectdConf.WriteQueueLimitLow = 10000
collectdConf.IntervalSeconds = conf.IntervalSeconds

cm.MonitorCore.SetCollectdInstance(collectd.InitCollectd(&collectdConf))

// always run an isolated collectd instance per monitor instance
conf.IsolatedCollectd = true
return cm.SetConfigurationAndRun(conf)
}
9 changes: 4 additions & 5 deletions pkg/signalfx-agent/pkg/monitors/collectd/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"regexp"
"strings"

log "github.com/sirupsen/logrus"

"github.com/signalfx/signalfx-agent/pkg/utils"
)

Expand All @@ -18,17 +16,18 @@ var logRE = regexp.MustCompile(
`(?:\[(?P<level>\w+?)\] )?` +
`(?P<message>(?:(?P<plugin>[\w-]+?): )?.*)`)

func logLine(line string, logger log.FieldLogger) {
func (cm *Manager) logLine(line string) {
groups := utils.RegexpGroupMap(logRE, line)

logger := cm.logger
var level string
var message string
if groups == nil {
level = "info"
message = line
} else {
if groups["plugin"] != "" {
logger = logger.WithField("plugin", groups["plugin"])
if plugin := groups["plugin"]; plugin != "" {
logger = logger.WithField("plugin", plugin)
}

level = groups["level"]
Expand Down
91 changes: 55 additions & 36 deletions pkg/signalfx-agent/pkg/monitors/collectd/monitorcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,101 +31,120 @@ type MonitorCore struct {
lock sync.Mutex
UsesGenericJMX bool
collectdInstanceOverride *Manager
logger log.FieldLogger
}

// NewMonitorCore creates a new initialized but unconfigured MonitorCore with
// the given template. The core's logger starts out as log.StandardLogger().
func NewMonitorCore(template *template.Template) *MonitorCore {
return &MonitorCore{
Template: template,
// Default logger used until the core is configured.
logger: log.StandardLogger(),
}
}

// Init calls InjectTemplateFuncs on the monitor's template and always returns nil.
func (bm *MonitorCore) Init() error {
InjectTemplateFuncs(bm.Template)
func (mc *MonitorCore) Init() error {
InjectTemplateFuncs(mc.Template)

return nil
}

// SetCollectdInstance allows you to override the instance of collectd used by
// this monitor
func (bm *MonitorCore) SetCollectdInstance(instance *Manager) {
bm.collectdInstanceOverride = instance
func (mc *MonitorCore) SetCollectdInstance(instance *Manager) {
mc.collectdInstanceOverride = instance
}

func (bm *MonitorCore) collectdInstance() *Manager {
if bm.collectdInstanceOverride != nil {
return bm.collectdInstanceOverride
func (mc *MonitorCore) collectdInstance() *Manager {
if mc.collectdInstanceOverride != nil {
return mc.collectdInstanceOverride
}
return MainInstance()
}

// SetConfigurationAndRun sets the configuration to be used when rendering
// templates, and writes config before queueing a collectd restart.
func (bm *MonitorCore) SetConfigurationAndRun(conf config.MonitorCustomConfig) error {
bm.lock.Lock()
defer bm.lock.Unlock()

bm.config = conf
bm.monitorID = conf.MonitorConfigCore().MonitorID
func (mc *MonitorCore) SetConfigurationAndRun(conf config.MonitorCustomConfig) error {
mc.lock.Lock()
defer mc.lock.Unlock()

mConf := conf.MonitorConfigCore()
mc.monitorID = mConf.MonitorID
mc.logger = mc.logger.WithFields(log.Fields{"monitorType": conf.MonitorConfigCore().Type, "monitorID": string(mc.monitorID)})

if mConf.IsolatedCollectd {
cconf := *MainInstance().Config()
cconf.WriteServerPort = 0
cconf.WriteServerQuery = "?monitorID=" + string(mConf.MonitorID)
cconf.InstanceName = "monitor-" + string(mConf.MonitorID)
cconf.ReadThreads = 10
cconf.WriteThreads = 1
cconf.WriteQueueLimitHigh = 10000
cconf.WriteQueueLimitLow = 10000
cconf.IntervalSeconds = mConf.IntervalSeconds
cconf.Logger = mc.logger
mc.logger.Info(fmt.Sprintf("starting isolated configd instance %q", cconf.InstanceName))
mc.SetCollectdInstance(InitCollectd(&cconf))
}

bm.configFilename = fmt.Sprintf("20-%s.%s.conf", bm.Template.Name(), string(bm.monitorID))
mc.config = conf
mc.configFilename = fmt.Sprintf("20-%s.%s.conf", mc.Template.Name(), string(mc.monitorID))

if err := bm.WriteConfigForPlugin(); err != nil {
if err := mc.WriteConfigForPlugin(); err != nil {
return err
}
return bm.SetConfiguration(conf)
return mc.SetConfiguration()
}

// SetConfiguration hands this monitor's ID, output, and GenericJMX flag to the
// active collectd instance via ConfigureFromMonitor; it does not render the config.
func (bm *MonitorCore) SetConfiguration(conf config.MonitorCustomConfig) error {
return bm.collectdInstance().ConfigureFromMonitor(bm.monitorID, bm.Output, bm.UsesGenericJMX)
func (mc *MonitorCore) SetConfiguration() error {
return mc.collectdInstance().ConfigureFromMonitor(mc.monitorID, mc.Output, mc.UsesGenericJMX)
}

// WriteConfigForPlugin will render the config template to the filesystem and
// queue a collectd restart
func (bm *MonitorCore) WriteConfigForPlugin() error {
func (mc *MonitorCore) WriteConfigForPlugin() error {
pluginConfigText := bytes.Buffer{}

err := bm.Template.Execute(&pluginConfigText, bm.config)
err := mc.Template.Execute(&pluginConfigText, mc.config)
if err != nil {
return fmt.Errorf("Could not render collectd config file for %s. Context was %#v %w",
bm.Template.Name(), bm.config, err)
mc.Template.Name(), mc.config, err)
}

log.WithFields(log.Fields{
"renderPath": bm.renderPath(),
"context": bm.config,
mc.logger.WithFields(log.Fields{
"renderPath": mc.renderPath(),
"context": mc.config,
}).Debug("Writing collectd plugin config file")

if err := WriteConfFile(pluginConfigText.String(), bm.renderPath()); err != nil {
log.WithFields(log.Fields{
if err := WriteConfFile(pluginConfigText.String(), mc.renderPath()); err != nil {
mc.logger.WithFields(log.Fields{
"error": err,
"path": bm.renderPath(),
"path": mc.renderPath(),
}).Error("Could not render collectd plugin config")
return err
}

return nil
}

func (bm *MonitorCore) renderPath() string {
return filepath.Join(bm.collectdInstance().ManagedConfigDir(), bm.configFilename)
func (mc *MonitorCore) renderPath() string {
return filepath.Join(mc.collectdInstance().ManagedConfigDir(), mc.configFilename)
}

// RemoveConfFile deletes the collectd config file for this monitor
func (bm *MonitorCore) RemoveConfFile() {
os.Remove(bm.renderPath())
func (mc *MonitorCore) RemoveConfFile() {
os.Remove(mc.renderPath())
}

// Shutdown removes the config file and restarts collectd
func (bm *MonitorCore) Shutdown() {
log.WithFields(log.Fields{
"path": bm.renderPath(),
func (mc *MonitorCore) Shutdown() {
mc.logger.WithFields(log.Fields{
"path": mc.renderPath(),
}).Debug("Removing collectd plugin config")

bm.RemoveConfFile()
bm.collectdInstance().MonitorDidShutdown(bm.monitorID)
mc.RemoveConfFile()
mc.collectdInstance().MonitorDidShutdown(mc.monitorID)
}
1 change: 1 addition & 0 deletions tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/docker/docker v23.0.3+incompatible
github.com/docker/go-connections v0.4.0
github.com/go-sql-driver/mysql v1.4.0
github.com/google/uuid v1.3.0
github.com/shirou/gopsutil/v3 v3.23.3
github.com/signalfx/com_signalfx_metrics_protobuf v0.0.3
Expand Down
1 change: 1 addition & 0 deletions tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ github.com/go-openapi/validate v0.19.3/go.mod h1:90Vh6jjkTn+OT1Eefm0ZixWNFjhtOH7
github.com/go-openapi/validate v0.19.8/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85nY1c2z52x1Gk4=
github.com/go-openapi/validate v0.21.0/go.mod h1:rjnrwK57VJ7A8xqfpAOEKRH8yQSGUriMu5/zuPSQ1hg=
github.com/go-openapi/validate v0.22.0/go.mod h1:rjnrwK57VJ7A8xqfpAOEKRH8yQSGUriMu5/zuPSQ1hg=
github.com/go-sql-driver/mysql v1.4.0 h1:7LxgVwFb2hIQtMm87NdgAVfXjnt4OePseqT1tKx+opk=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4=
Expand Down
Loading

0 comments on commit 05192a3

Please sign in to comment.