Skip to content

Commit

Permalink
Fix race conditions in runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
carabasdaniel committed Dec 12, 2023
1 parent 9ac9f1c commit 753faae
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 10 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@
"stretchr",
"TLSCA",
"Wrapf"
]
],
"go.testFlags": ["-race"]
}
32 changes: 27 additions & 5 deletions plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (r *Runtime) pluginsLoaded() bool {
return false
}

forceBundleUpdate := false

statusChannel := make(chan (bool), 1)

Check failure on line 77 in plugins.go

View workflow job for this annotation

GitHub Actions / test

typeUnparen: could simplify chan (bool) to chan bool (gocritic)

pluginStates := r.pluginsManager.PluginStatus()
for pluginName, status := range pluginStates {
if status == nil || status.State == plugins.StateOK {
Expand All @@ -88,20 +92,36 @@ func (r *Runtime) pluginsLoaded() bool {
if pluginName == bundlePluginName || status.State == plugins.StateNotReady {
bundles, err := getBundles(context.Background(), r)
if err == nil && len(bundles) > 0 {
// if bundle plugin state is not ready after a reconfiguration, forcefully update plugin state if bundles are loaded.
r.pluginsManager.UpdatePluginStatus(pluginName, &plugins.Status{State: plugins.StateOK})
forceBundleUpdate = true
}
}

r.Logger.Trace().Str("state", string(status.State)).Str("plugin-name", pluginName).Msg("plugin not ready")
return false
statusChannel <- false
}

return true
if forceBundleUpdate {
// if bundle plugin state is not ready after a reconfiguration, forcefully update plugin state if bundles are loaded.
r.pluginsManager.UpdatePluginStatus(bundlePluginName, &plugins.Status{State: plugins.StateOK})
}

select {
case result := <-statusChannel:
{

Check failure on line 110 in plugins.go

View workflow job for this annotation

GitHub Actions / test

unnecessaryBlock: case statement doesn't require a block statement (gocritic)
return result
}
default:
{

Check failure on line 114 in plugins.go

View workflow job for this annotation

GitHub Actions / test

unnecessaryBlock: case statement doesn't require a block statement (gocritic)
return true
}
}
}

// nolint TODO: This change would require upstream changes in OPA
func (r *Runtime) bundlesStatusCallback(status bundle.Status) {
if r.lock.TryLock() {
defer r.lock.Unlock()
}
errs := status.Errors
if status.Code == bundleErrorCode {
errs = append(errs, errors.Errorf("bundle error: %s", status.Message))
Expand All @@ -122,6 +142,9 @@ func (r *Runtime) bundlesStatusCallback(status bundle.Status) {
//nolint // hugeParam - the status is heavy 200 bytes, upstream changes might be welcomed

func (r *Runtime) pluginStatusCallback(statusDetails map[string]*plugins.Status) {
if r.lock.TryLock() {
defer r.lock.Unlock()
}
for n, s := range statusDetails {
if n == bundlePluginName && !r.bundlesCallbackRegistered {
plugin := r.pluginsManager.Plugin(bundlePluginName)
Expand Down Expand Up @@ -158,6 +181,5 @@ func (r *Runtime) pluginStatusCallback(statusDetails map[string]*plugins.Status)
r.pluginStates.Store(n, &pluginState{loaded: true})
}
}

r.latestState = r.status()
}
11 changes: 7 additions & 4 deletions runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ type Runtime struct {
bundleStates *sync.Map
bundlesCallbackRegistered bool
discoveryCallbackRegistered bool

storage storage.Store
latestState *State
storage storage.Store
latestState *State
lock sync.Mutex
}

type BundleState struct {
Expand All @@ -69,6 +69,7 @@ type State struct {
Ready bool
Errors []error
Bundles []BundleState
lock sync.Mutex
}

var builtinsLock sync.Mutex
Expand Down Expand Up @@ -256,6 +257,9 @@ func (r *Runtime) status() *State {
Errors: []error{},
Bundles: []BundleState{},
}
if result.lock.TryLock() {
defer result.lock.Unlock()
}

r.pluginStates.Range(func(key, value interface{}) bool {
pluginName := key.(string)
Expand All @@ -267,7 +271,6 @@ func (r *Runtime) status() *State {
if state.err != nil {
result.Errors = append(result.Errors, errors.Wrapf(state.err, "plugin '%s' encountered an error", pluginName))
}

return true
})

Expand Down

0 comments on commit 753faae

Please sign in to comment.