Skip to content

Commit

Permalink
provider/web now go through swaping/reloading engine so that it doesn…
Browse files Browse the repository at this point in the history
…'t reference things directly

removed all debug/runtime GC stuff
fixed domain max calculation for Y2 axis
  • Loading branch information
chrisruffalo committed Jun 20, 2019
1 parent 3c1eb94 commit c84cd4b
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 82 deletions.
10 changes: 10 additions & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"database/sql"
"fmt"
"github.com/chrisruffalo/gudgeon/events"
"net"
"path"
"strings"
Expand Down Expand Up @@ -98,6 +99,9 @@ type engine struct {

// map of group names to processed/configured engine groups
groups map[string]*group

// list of handles
handles []*events.Handle
}

func (engine *engine) Root() string {
Expand Down Expand Up @@ -592,6 +596,12 @@ func (engine *engine) QueryLog() QueryLog {

// clear lists and remove references
func (engine *engine) Close() {
// stop listening for events
for _, handle := range engine.handles {
if handle != nil {
handle.Close()
}
}
// close sources
engine.resolvers.Close()
// close rule store
Expand Down
79 changes: 63 additions & 16 deletions engine/engine_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"database/sql"
"encoding/base64"
"fmt"
"github.com/chrisruffalo/gudgeon/events"
"os"
"path"
"runtime"
"strings"

"github.com/GeertJohan/go.rice"
Expand Down Expand Up @@ -92,6 +92,7 @@ func newEngineWithComponents(conf *config.GudgeonConfig, db *sql.DB, recorder *r
recorder: recorder,
metrics: metrics,
qlog: queryLog,
handles: make([]*events.Handle, 0),
}

err := engine.bootstrap()
Expand Down Expand Up @@ -131,7 +132,6 @@ func (engine *engine) bootstrap() error {

// configure db if required
if *conf.Metrics.Enabled || *conf.QueryLog.Enabled {

// if persistence functions are enabled, create db if it doesn't exist
if engine.db == nil {
if (*conf.Metrics.Enabled && *conf.Metrics.Persist) || (*conf.QueryLog.Enabled && *conf.QueryLog.Persist) {
Expand Down Expand Up @@ -172,7 +172,7 @@ func (engine *engine) bootstrap() error {

// create recorder if none provided and one is required
if engine.db != nil && (engine.qlog != nil || engine.metrics != nil) && engine.recorder == nil {
engine.recorder, err = NewRecorder(engine)
engine.recorder, err = NewRecorder(conf, engine, engine.db, engine.metrics, engine.qlog)
if err != nil {
return err
}
Expand All @@ -193,13 +193,12 @@ func (engine *engine) bootstrap() error {

// process groups
for idx, configGroup := range workingGroups {
// create active group for gorup name
engineGroup := new(group)
engineGroup.engine = engine
engineGroup.configGroup = configGroup

// determine which lists belong to this group
engineGroup.lists = assignedLists(configGroup.Lists, configGroup.SafeTags(), lists)
// create active group for group name
engineGroup := &group{
engine: engine,
configGroup: configGroup,
lists: assignedLists(configGroup.Lists, configGroup.SafeTags(), lists),
}

// add created engine group to list of groups
groups[idx] = engineGroup
Expand Down Expand Up @@ -311,17 +310,65 @@ func (engine *engine) bootstrap() error {
totalRulesCounter.Inc(int64(totalCount))
}

// subscribe to rule list changes to update metrics/counts
listChangeHandle := events.Listen("store:list:changed", func(message *events.Message) {
// bail if engine metrics are nil
if message == nil{
return
}

// sends in listName, listShortName, and count
var listName string
if name, found := (*message)["listName"]; found {
if listNameString, ok := name.(string); ok {
listName = listNameString
}
}

var listShortName string
if shortName, found := (*message)["listShortName"]; found {
if shortNameString, ok := shortName.(string); ok {
listShortName = shortNameString
}
}

if "" == listName {
listName = listShortName
}

count := int64(0)
if countValue, found := (*message)["count"]; found {
if countUint64, ok := countValue.(uint64); ok {
count = int64(countUint64)
} else if countInt64, ok := countValue.(int64); ok {
count = countInt64
}
}

// just log change and leave early if no metrics are available
if engine.Metrics() == nil {
log.Infof("Reloaded list: %s (%d rules)", listName, count)
return
}

if "" != listShortName {
metric := engine.Metrics().Get("rules-list-" + listShortName)
oldCount := metric.Value()
engine.Metrics().Get(TotalRules).Inc(-1 * oldCount).Inc(int64(count))
metric.Clear().Inc(count)
}

// log info
log.Infof("Reloaded list: '%s' (%d rules, total rules: %d)", listName, count, engine.Metrics().Get(TotalRules).Value())
})
// ensure handler is closed later
engine.handles = append(engine.handles, listChangeHandle)

// set consumers as active on engine
engine.groups = groupMap
engine.consumers = consumers
engine.consumerMap = consumerMap

// force GC after loading the engine because
// of all the extra allocation that gets performed
// during the creation of the rule storage and
// all of the other bits/parts of this init
runtime.GC()

// done bootstrapping without errors
return nil
}
18 changes: 9 additions & 9 deletions engine/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
// aspects
type recorder struct {
// direct reference back to engine
engine *engine
engine Engine
conf *config.GudgeonConfig

// db access
Expand Down Expand Up @@ -96,13 +96,13 @@ type InfoRecord struct {
}

// created from raw engine
func NewRecorder(engine *engine) (*recorder, error) {
func NewRecorder(conf *config.GudgeonConfig, engine Engine, db *sql.DB, metrics Metrics, qlog QueryLog) (*recorder, error) {
recorder := &recorder{
conf: conf,
engine: engine,
db: engine.db,
conf: engine.config,
qlog: engine.qlog,
metrics: engine.metrics,
db: db,
qlog: qlog,
metrics: metrics,
infoQueue: make(chan *InfoRecord, recordQueueSize),
doneChan: make(chan bool),
}
Expand Down Expand Up @@ -220,7 +220,7 @@ func (recorder *recorder) reverseLookup(info *InfoRecord) string {
return name
}

// takes the inner (request, resposne, context, result) information
// takes the inner (request, response, context, result) information
// and moves it to relevant top-level InfoRecord information
func (recorder *recorder) condition(info *InfoRecord) {
// condition the info item
Expand Down Expand Up @@ -268,7 +268,7 @@ func (recorder *recorder) condition(info *InfoRecord) {

// the worker is intended as the goroutine that
// acts as the switchboard for async actions so
// that only one action is perormed at a time
// that only one action is performed at a time
func (recorder *recorder) worker() {
// make timer that is only activated in some ways
var (
Expand Down Expand Up @@ -516,4 +516,4 @@ func (recorder *recorder) shutdown() {
}
stmt.Close()
}
}
}
37 changes: 9 additions & 28 deletions engine/reloading_engine.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package engine

import (
"database/sql"
"fmt"
"github.com/chrisruffalo/gudgeon/events"
"net"
Expand Down Expand Up @@ -44,7 +43,7 @@ func NewReloadingEngine(confPath string, conf *config.GudgeonConfig) (Engine, er
events.Send("file:watch:clear", nil)

// reload configuration
config, warnings, err := config.Load(confPath)
conf, warnings, err := config.Load(confPath)
if err != nil {
log.Errorf("%s", err)
} else {
Expand All @@ -55,7 +54,7 @@ func NewReloadingEngine(confPath string, conf *config.GudgeonConfig) (Engine, er
}
}

reloading.swap(config)
reloading.swap(conf)

log.Infof("Configuration updated from: '%s'", confPath)
}
Expand All @@ -72,26 +71,15 @@ func NewReloadingEngine(confPath string, conf *config.GudgeonConfig) (Engine, er
// wait to swap engine until all rlocked processes have completed
// and then lock during the swap and release to resume normal operations
func (rEngine *reloadingEngine) swap(config *config.GudgeonConfig) {
// empty/nil components
var db *sql.DB
var recorder *recorder
var metrics Metrics
var qlog QueryLog
// lock engine
rEngine.mux.Lock()
defer rEngine.mux.Unlock()

// if the old engine has the proper components reuse them
if oldEngine, ok := rEngine.current.(*engine); ok {
db = oldEngine.db
recorder = oldEngine.recorder
metrics = oldEngine.metrics
qlog = oldEngine.qlog
}
// shutdown old engine
rEngine.current.Shutdown()

// build new engine
newEngine, err := newEngineWithComponents(config, db, recorder, metrics, qlog)

// lock and unlock after return
rEngine.mux.Lock()
defer rEngine.mux.Unlock()
newEngine, err := NewEngine(config)

// if engine fails then have no engine
if err != nil {
Expand All @@ -100,15 +88,8 @@ func (rEngine *reloadingEngine) swap(config *config.GudgeonConfig) {
return
}

// use new engine
oldEngine := rEngine.current
// use new engine after build (if no errors happened)
rEngine.current = newEngine

// remove references in old engine
if oldEngine != nil {
oldEngine.Close()
}

}

func (engine *reloadingEngine) IsDomainRuleMatched(consumer *net.IP, domain string) (rule.Match, *config.GudgeonList, string) {
Expand Down
8 changes: 4 additions & 4 deletions gudgeon.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func main() {

// load config
filename := string(opts.AppOptions.ConfigPath)
config, warnings, err := config.Load(filename)
conf, warnings, err := config.Load(filename)
if err != nil {
log.Errorf("%s", err)
os.Exit(1)
Expand All @@ -152,7 +152,7 @@ func main() {
}

// create new Gudgeon instance
instance := NewGudgeon(filename, config)
instance := NewGudgeon(filename, conf)

// start new instance
err = instance.Start()
Expand All @@ -167,8 +167,8 @@ func main() {
s := <-sig

// clean out session directory
if "" != config.SessionRoot() {
util.ClearDirectory(config.SessionRoot())
if "" != conf.SessionRoot() {
util.ClearDirectory(conf.SessionRoot())
}

log.Infof("Signal (%s) received, stopping", s)
Expand Down
10 changes: 8 additions & 2 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ func (provider *provider) handle(writer dns.ResponseWriter, request *dns.Msg) {
}

// write response to response writer
writer.WriteMsg(response)
err := writer.WriteMsg(response)
if err != nil {
log.Errorf("Writing response: %s", err)
}

// we were having some errors during write that we need to figure out
// and this is a good(??) way to try and find them out.
Expand All @@ -121,7 +124,10 @@ func (provider *provider) handle(writer dns.ResponseWriter, request *dns.Msg) {
}

// explicitly close the writer since it's done
writer.Close()
err = writer.Close()
if err != nil {
log.Errorf("Closing response: %s", err)
}
}

func (provider *provider) Host(config *config.GudgeonConfig, engine engine.Engine) error {
Expand Down
10 changes: 7 additions & 3 deletions rule/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,15 @@ func CreateStore(storeRoot string, config *config.GudgeonConfig) (RuleStore, []u
// save handle so it can later be used to close watchers
handle := events.Listen("file:" + config.PathToList(watchList), func(message *events.Message) {
store.Clear(config, watchList)
loadList(store, config, watchList)
log.Infof("Reloaded list: %s", watchList.CanonicalName())
newRuleCount := loadList(store, config, watchList)
// send message that a list value changed
events.Send("store:list:changed", &events.Message{
"listName": watchList.CanonicalName(),
"listShortName": watchList.ShortName(),
"count": newRuleCount,
})
// watch file again
events.Send("file:watch:start", &events.Message{"path": config.PathToList(watchList)})
// todo: update count?
})
if handle != nil {
store.handlers = append(store.handlers, handle)
Expand Down
Loading

0 comments on commit c84cd4b

Please sign in to comment.