Skip to content

Commit

Permalink
Bugfixes and refactoring in osquery runtime
Browse files Browse the repository at this point in the history
- Correctly detect when error channel is closed (potential fix for #134).
  Previously the logic was inverted for whether the channel was closed, so
  recovery was not initiated. Unit test TestOsqueryDies repros the suspected
  issue.
- Allow logger to be set properly.
- Add logging around recovery scenarios.
- Check communication with both osquery and extension server in health check
  (previously only the extension server was checked).
- Add healthcheck on interval that causes recovery on failure (Closes #141).
  • Loading branch information
zwass committed Oct 7, 2017
1 parent 7041df6 commit 0fb56bc
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 26 deletions.
1 change: 1 addition & 0 deletions cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func main() {

// Start the osqueryd instance
instance, err := osquery.LaunchOsqueryInstance(
osquery.WithLogger(logger),
osquery.WithOsquerydBinary(opts.osquerydPath),
osquery.WithRootDirectory(rootDirectory),
osquery.WithConfigPluginFlag("kolide_grpc"),
Expand Down
91 changes: 67 additions & 24 deletions osquery/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
type OsqueryInstance struct {
*osqueryInstanceFields
instanceLock sync.Mutex
logger log.Logger
}

// osqueryInstanceFields is a type which is embedded in OsqueryInstance so that
Expand All @@ -48,6 +47,7 @@ type osqueryInstanceFields struct {
stdout io.Writer
stderr io.Writer
retries uint
logger log.Logger

// the following are instance artifacts that are created and held as a result
// of launching an osqueryd process
Expand Down Expand Up @@ -129,8 +129,12 @@ func createOsquerydCommand(osquerydBinary string, paths *osqueryFilePaths, confi
"--force=true",
"--disable_watchdog",
)
cmd.Stdout = stdout
cmd.Stderr = stderr
if stdout != nil {
cmd.Stdout = stdout
}
if stderr != nil {
cmd.Stderr = stderr
}

return cmd, nil
}
Expand All @@ -152,6 +156,14 @@ func osqueryTempDir() (string, func(), error) {
// https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
type OsqueryInstanceOption func(*OsqueryInstance)

// WithLogger returns a functional option that installs the supplied log.Logger
// on the osquery instance, where it is used for logging instance status.
func WithLogger(logger log.Logger) OsqueryInstanceOption {
	setLogger := func(instance *OsqueryInstance) {
		instance.logger = logger
	}
	return setLogger
}

// WithOsqueryExtensionPlugin is a functional option which allows the user to
// declare a number of osquery plugins (ie: config plugin, logger plugin, tables,
// etc) which can be loaded when calling LaunchOsqueryInstance. You can load as
Expand Down Expand Up @@ -267,11 +279,10 @@ func LaunchOsqueryInstance(opts ...OsqueryInstanceOption) (*OsqueryInstance, err
// caller.
o := &OsqueryInstance{
osqueryInstanceFields: &osqueryInstanceFields{
stdout: ioutil.Discard,
stderr: ioutil.Discard,
rmRootDirectory: func() {},
errs: make(chan error),
clientLock: new(sync.Mutex),
logger: log.NewNopLogger(),
},
}

Expand Down Expand Up @@ -436,21 +447,47 @@ func launchOsqueryInstance(o *OsqueryInstance) (*OsqueryInstance, error) {
// Launch a long-running recovery goroutine which can handle various errors
// that can occur
go func() {
// Block until an error is generated by the osqueryd process itself or the
// extension manager server. We don't select, because if one element of the
// runtime produces an error, it's likely that all of the other components
// will produce errors as well since everything is so interconnected. For
// this reason, when any error occurs, we attempt a total recovery.
runtimeError, done := <-errChannel
if done {
return
}
if recoveryError := o.Recover(runtimeError); recoveryError != nil {
// If we were not able to recover the osqueryd process for some reason,
// kill the process and hope that the operating system scheduling
// mechanism (launchd, etc) can relaunch the tool cleanly.
level.Info(o.logger).Log("err", errors.Wrap(recoveryError, "could not recover the osqueryd process"))
os.Exit(1)
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()

for {
needsRecovery := false
select {
case <-ticker.C:
healthy, err := o.Healthy()
if err != nil {
needsRecovery = true
level.Error(o.logger).Log("err", errors.Wrap(err, "checking instance health"))
}
if !healthy {
needsRecovery = true
level.Error(o.logger).Log("msg", "instance not healthy")
}

// Receive an error generated by the osqueryd process itself or the
// extension manager server. We don't distinguish between error sources,
// because if one element of the runtime produces an error, it's likely that
// all of the other components will produce errors as well since everything
// is so interconnected. For this reason, when any error occurs, we attempt
// a total recovery.
case runtimeError, open := <-errChannel:
if !open {
return
}
needsRecovery = true
level.Error(o.logger).Log("err", errors.Wrap(runtimeError, "osquery runtime error"))
}

if needsRecovery {
level.Info(o.logger).Log("msg", "recovering osquery instance")
if recoveryError := o.Recover(); recoveryError != nil {
// If we were not able to recover the osqueryd process for some reason,
// kill the process and hope that the operating system scheduling
// mechanism (launchd, etc) can relaunch the tool cleanly.
level.Error(o.logger).Log("err", errors.Wrap(recoveryError, "could not recover the osqueryd process"))
os.Exit(1)
}
return
}
}
}()

Expand All @@ -471,7 +508,7 @@ func (o *OsqueryInstance) beginTeardown() bool {
// release resources because Kill() expects the osquery instance to be healthy,
// whereas Recover() expects a hostile environment and is slightly more
// defensive in its actions.
func (o *OsqueryInstance) Recover(runtimeError error) error {
func (o *OsqueryInstance) Recover() error {
// If the user explicitly calls o.Kill(), as the components are shutdown, they
// may exit with errors. In this case, we shouldn't recover the
// instance.
Expand Down Expand Up @@ -545,11 +582,16 @@ func (o *OsqueryInstance) Restart() error {
// being managed by the current instantiation of this OsqueryInstance is
// healthy.
func (o *OsqueryInstance) Healthy() (bool, error) {
status, err := o.extensionManagerServer.Ping()
serverStatus, err := o.extensionManagerServer.Ping()
if err != nil {
return false, errors.Wrap(err, "could not ping extension server")
}

clientStatus, err := o.extensionManagerClient.Ping()
if err != nil {
return false, errors.Wrap(err, "could not ping osquery through extension interface")
return false, errors.Wrap(err, "could not ping osquery extension client")
}
return status.Code == 0, nil
return serverStatus.Code == 0 && clientStatus.Code == 0, nil
}

func (o *OsqueryInstance) Query(query string) ([]map[string]string, error) {
Expand Down Expand Up @@ -579,6 +621,7 @@ func (o *OsqueryInstance) relaunchAndReplace() error {
WithLoggerPluginFlag(o.loggerPluginFlag),
WithDistributedPluginFlag(o.distributedPluginFlag),
WithRetries(o.retries),
WithLogger(o.logger),
}
if !o.usingTempDir {
opts = append(opts, WithRootDirectory(o.rootDirectory))
Expand Down
28 changes: 26 additions & 2 deletions osquery/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,32 @@ func TestRecover(t *testing.T) {
)
require.NoError(t, err)

require.NoError(t, instance.Recover(errors.New("fabricated in a test")))
require.NoError(t, instance.Recover(errors.New("fabricated in a test")))
require.NoError(t, instance.Recover())
require.NoError(t, instance.Recover())
time.Sleep(1 * time.Second)

healthy, err := instance.Healthy()
require.NoError(t, err)
require.True(t, healthy)

require.NoError(t, instance.Kill())
}

func TestOsqueryDies(t *testing.T) {
t.Parallel()
rootDirectory, rmRootDirectory, err := osqueryTempDir()
require.NoError(t, err)
defer rmRootDirectory()

require.NoError(t, buildOsqueryExtensionInBinDir(getBinDir(t)))
instance, err := LaunchOsqueryInstance(
WithRootDirectory(rootDirectory),
WithRetries(3),
)
require.NoError(t, err)

require.NoError(t, instance.cmd.Process.Kill())
time.Sleep(3 * time.Second)

healthy, err := instance.Healthy()
require.NoError(t, err)
Expand Down

0 comments on commit 0fb56bc

Please sign in to comment.