Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

start and stop modules in correct order #285

Merged
merged 3 commits into from
Feb 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 19 additions & 25 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,12 @@ type Loki struct {
store chunk.Store

httpAuthMiddleware middleware.Interface

inited map[moduleName]struct{}
}

// New makes a new Loki.
func New(cfg Config) (*Loki, error) {
loki := &Loki{
cfg: cfg,
inited: map[moduleName]struct{}{},
cfg: cfg,
}

loki.setupAuthMiddleware()
Expand Down Expand Up @@ -111,24 +108,23 @@ func (t *Loki) setupAuthMiddleware() {
}

func (t *Loki) init(m moduleName) error {
if _, ok := t.inited[m]; ok {
return nil
}

for _, dep := range modules[m].deps {
if err := t.init(dep); err != nil {
// initialize all of our dependencies first
for _, dep := range orderedDeps(m) {
if err := t.initModule(dep); err != nil {
return err
}
}
// lastly, initialize the requested module
return t.initModule(m)
}

func (t *Loki) initModule(m moduleName) error {
level.Info(util.Logger).Log("msg", "initialising", "module", m)
if modules[m].init != nil {
if err := modules[m].init(t); err != nil {
return errors.Wrap(err, fmt.Sprintf("error initialising module: %s", m))
}
}

t.inited[m] = struct{}{}
return nil
}

Expand All @@ -145,21 +141,19 @@ func (t *Loki) Stop() error {
}

func (t *Loki) stop(m moduleName) {
if _, ok := t.inited[m]; !ok {
return
}
delete(t.inited, m)

for _, dep := range modules[m].deps {
t.stop(dep)
}

if modules[m].stop == nil {
return
t.stopModule(m)
deps := orderedDeps(m)
// iterate over our deps in reverse order and call stopModule
for i := len(deps) - 1; i >= 0; i-- {
t.stopModule(deps[i])
}
}

func (t *Loki) stopModule(m moduleName) {
level.Info(util.Logger).Log("msg", "stopping", "module", m)
if err := modules[m].stop(t); err != nil {
level.Error(util.Logger).Log("msg", "error stopping", "module", m, "err", err)
if modules[m].stop != nil {
if err := modules[m].stop(t); err != nil {
level.Error(util.Logger).Log("msg", "error stopping", "module", m, "err", err)
}
}
}
48 changes: 47 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (m moduleName) String() string {
case All:
return "all"
default:
panic(fmt.Sprintf("unknow module name: %d", m))
panic(fmt.Sprintf("unknown module name: %d", m))
}
}

Expand Down Expand Up @@ -178,6 +178,52 @@ func (t *Loki) stopStore() error {
return nil
}

// listDeps recursively gets a list of dependencies for a passed moduleName
func listDeps(m moduleName) []moduleName {
deps := modules[m].deps
for _, d := range modules[m].deps {
deps = append(deps, listDeps(d)...)
}
return deps
}
tomwilkie marked this conversation as resolved.
Show resolved Hide resolved

// orderedDeps gets a list of all dependencies ordered so that items are always after any of their dependencies.
func orderedDeps(m moduleName) []moduleName {
deps := listDeps(m)

// get a unique list of moduleNames, with a flag for whether they have been added to our result
uniq := map[moduleName]bool{}
for _, dep := range deps {
uniq[dep] = false
}

result := make([]moduleName, 0, len(uniq))

// keep looping through all modules until they have all been added to the result.

for len(result) < len(uniq) {
OUTER:
for name, added := range uniq {
if added {
continue
}
for _, dep := range modules[name].deps {
// stop processing this module if one of its dependencies has
// not been added to the result yet.
if !uniq[dep] {
continue OUTER
}
}

// if all of the module's dependencies have been added to the result slice,
// then we can safely add this module to the result slice as well.
uniq[name] = true
result = append(result, name)
}
}
return result
}

type module struct {
deps []moduleName
init func(t *Loki) error
Expand Down
21 changes: 21 additions & 0 deletions pkg/loki/modules_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package loki

import (
"testing"
)

func TestGetDeps(t *testing.T) {
for _, m := range []moduleName{All, Distributor, Ingester, Querier} {
deps := orderedDeps(m)
seen := make(map[moduleName]struct{})
// make sure that getDeps always orders dependencies correctly.
for _, d := range deps {
seen[d] = struct{}{}
for _, dep := range modules[d].deps {
if _, ok := seen[dep]; !ok {
t.Errorf("module %s has dependency %s which has not been seen.", d, dep)
}
}
}
}
}