
Unify Runner dynamic launching and reloading under RunnerList #7172

Merged 10 commits into elastic:master on May 29, 2018

Conversation


@exekias (Contributor) commented May 24, 2018

This PR unifies all uses of dynamic runner handling under a new RunnerList structure. RunnerList receives a list of configs describing the desired state and performs all the needed changes (starting and stopping runners) to transition to it.

I plan to use this as part of #7028

@exekias added the "in progress" and "libbeat" labels May 24, 2018
```go
return ok
}

// Start the given runner and add it to the list
```


comment on exported method RunnerList.Add should be of the form "Add ..."

```go
return nil
}

// StopAll runners
```


comment on exported method RunnerList.Stop should be of the form "Stop ..."

@exekias force-pushed the unify-reloadable-list branch 6 times, most recently from 26a84a5 to d975687 on May 25, 2018 10:42
@exekias added the "review" label and removed the "in progress" label May 25, 2018
```go
for hash, runner := range stopList {
	debugf("Stopping runner: %s", runner)
	delete(r.runners, hash)
	go runner.Stop()
```
Member:

This is a change in behaviour from the previous implementation, where it was synchronous. This could become a problem in Filebeat, or at least lead to more errors. Before a new prospector can be started on an existing state of a file, the state must be set to Finished. If the shutdown happens in a goroutine, this overlap can become bigger. But I think it should still work.

```go
func (r *RunnerList) Has(hash uint64) bool {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	_, ok := r.runners[hash]
```
Member:

A read write mutex could be used here.

```diff
@@ -1,48 +0,0 @@
package cfgfile
```
Member:

Any replacement for these tests?

Contributor Author:

I forgot to add the file to the commit 🤦‍♂️ pushed


```go
hash, err := hashstructure.Hash(rawCfg, nil)
if err != nil {
	// Make sure the next run also updates because some runners were not properly loaded
```
Member:

Is this case still handled?

Contributor Author:

Nope :/ I'll have a look and find a solution

Contributor Author:

See 4ea21f2

```go
for h, runner := range r.runners {
	debugf("Stopping runner: %s", runner)
	delete(r.runners, h)
	runner.Stop()
```
Member:

Before this change, runners were stopped in parallel. This is important to speed up the shutdown process, especially with a large number of modules or prospectors.

Contributor Author:

makes sense, will update the code to do that


```go
func (r *runnerFactory) Create(x beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (Runner, error) {
	config := struct {
		Id int64 `config:"id"`
```


struct field Id should be ID

@exekias force-pushed the unify-reloadable-list branch 2 times, most recently from b6727b3 to 18c4989 on May 27, 2018 19:57
@exekias (Contributor Author) commented May 27, 2018

Thanks @ruflin for the thorough review, I think this is ready for a 2nd look :)

@exekias (Contributor Author) commented May 28, 2018

I've added 7c8ac78 after our last discussion; it should cover cases where a Filebeat input fails to start due to unclosed states (it will keep retrying every 10s). It also unifies how autodiscover uses cfgfile.List.Reload.

@exekias force-pushed the unify-reloadable-list branch 3 times, most recently from 5f9ef83 to 7c8ac78 on May 28, 2018 17:37
@ruflin (Member) commented May 28, 2018

The race detector does not seem to be happy on Jenkins.

This change adds some overhead to autodiscover, as we need to hash all
configs for each new event, but we obtain two benefits:

 - We use the same code path (Reload) for autodiscover and config
 reload.
 - Autodiscover becomes resilient to module/input initialization errors;
 especially in the case of Filebeat, failed starts will be retried.
@exekias (Contributor Author) commented May 29, 2018

fixed


```go
err := a.runners.Reload(configs)

// On error, make sure the next run also updates because some runners were not properly loaded
```
Member:

Should we create a debug log entry for this err?


```go
go runner.Stop()
a.runners.Remove(hash)
if a.runners.Has(hash) {
	delete(a.configs, hash)
```
Member:

Can only one goroutine at a time access a.configs?

Where is the runner stopped now?

Contributor Author:

configs is only modified by the event loop (handleStart and handleStop), so there is only one goroutine using it. Start/stop is now handled by runners.Reload: https://github.com/elastic/beats/pull/7172/files/63c350a7ea841fd6f2b5bea7bea4debe1c0afdf5#diff-3b0ae03a54b2e31c58f8dcf308a6a7b8R127. This is the same code path used by the configuration reload mechanism.

@ruflin ruflin merged commit aed5529 into elastic:master May 29, 2018
exekias pushed a commit to exekias/beats that referenced this pull request Jun 1, 2018
Introduced by elastic#7172: some inputs/modules may modify the passed
configuration, which results in an effective change to the config's
hash. Before this change, the reload process was treating a running
config as a not-running config, resulting in a stop/start after the
first run.

Example (with some added debugging):

First run (add 1 config):
```
2018-06-01T02:08:20.184+0200    DEBUG   [autodiscover] cfgfile/list.go:53      Starting reload procedure, current runners: 0
2018-06-01T02:08:20.185+0200    INFO    cfgfile/list.go:143 map[type:docker containers:map[ids:[56322b70c8a3712494e559381c7b8a6ce62a6495e33630628e6624b75b5a7505]]]
2018-06-01T02:08:20.185+0200    INFO    cfgfile/list.go:144     Hash: %!s(uint64=12172188027786936243)
2018-06-01T02:08:20.185+0200    DEBUG   [autodiscover] cfgfile/list.go:71      Start list: 1, Stop list: 0
```

Second run (add another config, the first one gets restarted):
```
2018-06-01T02:08:20.185+0200    DEBUG   [autodiscover] cfgfile/list.go:53      Starting reload procedure, current runners: 1
2018-06-01T02:08:20.185+0200    INFO    cfgfile/list.go:143 map[paths:[/var/lib/docker/containers/56322b70c8a3712494e559381c7b8a6ce62a6495e33630628e6624b75b5a7505/*.log] docker-json:map[stream:all partial:true] type:docker containers:map[ids:[56322b70c8a3712494e559381c7b8a6ce62a6495e33630628e6624b75b5a7505]]]
2018-06-01T02:08:20.185+0200    INFO    cfgfile/list.go:144     Hash: %!s(uint64=12741034856879725532)
2018-06-01T02:08:20.185+0200    INFO    cfgfile/list.go:143 map[type:docker containers:map[ids:[a626e25679abd2b9af161277f1beee96c1bba6b9412771d17da7ebfacca640a7]]]
2018-06-01T02:08:20.185+0200    INFO    cfgfile/list.go:144     Hash: %!s(uint64=7080456881540055745)
2018-06-01T02:08:20.185+0200    DEBUG   [autodiscover] cfgfile/list.go:71      Start list: 2, Stop list: 1
```
ruflin pushed a commit that referenced this pull request Jun 1, 2018 (same commit message as above).
3 participants