Skip to content

Commit

Permalink
removed a few conditions where memory couldn't be released
Browse files Browse the repository at this point in the history
removed some goroutine leaks
added servicetime to requests and average service time to graphs
replaced message bus with another one
improved code reuse in chart/dashboard layout code (drop down options are reused)
  • Loading branch information
chrisruffalo committed Jun 21, 2019
1 parent c84cd4b commit c8e6c9c
Show file tree
Hide file tree
Showing 19 changed files with 301 additions and 181 deletions.
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ func Load(filename string) (*GudgeonConfig, []string, error) {
config = root.Config
}

if config == nil {
return nil, []string{}, fmt.Errorf("Loaded a nil configuration")
}

// get warnings and errors
addWarnings, errors := config.verifyAndInit()
warnings = append(warnings, addWarnings...)
Expand Down
54 changes: 43 additions & 11 deletions engine/mdns.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ var hostnameReadPriority = []string{"txt:fn", "txt:f", "txt:md", "name", "name6"
func MulticastMdnsQuery() {
m := &dns.Msg{}
m.Question = []dns.Question{
dns.Question{Name: questionString, Qtype: dns.TypeSRV, Qclass: dns.ClassINET},
dns.Question{Name: questionString, Qtype: dns.TypeTXT, Qclass: dns.ClassINET},
{Name: questionString, Qtype: dns.TypeSRV, Qclass: dns.ClassINET},
{Name: questionString, Qtype: dns.TypeTXT, Qclass: dns.ClassINET},
}
m.RecursionDesired = false

Expand All @@ -36,7 +36,7 @@ func MulticastMdnsQuery() {
log.Debug("Sent mDNS Multicast Query")
}

func MulticastMdnsListen(msgChan chan *dns.Msg) {
func MulticastMdnsListen(msgChan chan *dns.Msg, closeChan chan bool) {
addr, err := net.ResolveUDPAddr("udp", mdnsAddressString)
if err != nil {
log.Errorf("Address resolve failed: %s\n", err)
Expand All @@ -47,19 +47,51 @@ func MulticastMdnsListen(msgChan chan *dns.Msg) {
log.Errorf("Listen multicast failed")
return
}
// probably not needed but defer so it closes no matter what
defer co.Close()

// make query after open
// make query after open to start messages coming in
MulticastMdnsQuery()

for {
msg, err := co.ReadMsg()
if err != nil {
log.Debugf("Reading mDNS message: %s", err)
continue
// keep running after error?
keeprunning := true

// pipe messages to internal stop/start switch
internalChan := make(chan *dns.Msg)
go func() {
for {
msg, err := co.ReadMsg()
if err != nil {
if keeprunning {
log.Errorf("Reading mDNS message: %s", err)
continue
} else {
break
}
}
internalChan <- msg
}
if msgChan != nil && msg != nil {
msgChan <- msg
close(internalChan)
log.Debugf("Shutdown mDNS connection")
}()

// loop that decides to read/forward messages or close listener
for {
select {
case <- closeChan:
keeprunning = false
err := co.Close()
if err != nil {
log.Errorf("Could not close mDNS listener")
}
close(msgChan)
closeChan <- true
log.Debugf("Closed mDNS listener")
return
case msg := <- internalChan:
if msgChan != nil && msg != nil {
msgChan <- msg
}
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions engine/mdns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package engine

import (
"context"
"github.com/sirupsen/logrus"
"testing"
"time"

Expand All @@ -15,11 +16,12 @@ func TestMdns(t *testing.T) {
defer cancel()

msgChan := make(chan *dns.Msg)
go MulticastMdnsListen(msgChan)
go MulticastMdnsListen(msgChan, make(chan bool))
counter := 0
go func() {
MulticastMdnsQuery()
for _ = range msgChan {
for msg := range msgChan {
logrus.Infof("Got DNS message: %v", msg.String())
counter++
}
}()
Expand Down
2 changes: 1 addition & 1 deletion engine/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (metric *Metric) Inc(value int64) *Metric {
}

func (metric *Metric) RecordSample(value int64) *Metric {
metric.Inc(value)
metric.Count = metric.Count + value
metric.Records = metric.Records + 1
metric.Avg = metric.Count / metric.Records
return metric
Expand Down
60 changes: 60 additions & 0 deletions engine/migrations/000005_add_service_time_ms.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
-- nuke buffer and remake
DROP TABLE buffer;
CREATE TABLE buffer (
Id INTEGER PRIMARY KEY,
Address TEXT DEFAULT '',
Consumer TEXT DEFAULT '',
ClientName TEXT DEFAULT '',
RequestDomain TEXT DEFAULT '',
RequestType TEXT DEFAULT '',
ResponseText TEXT DEFAULT '',
Cached BOOLEAN DEFAULT false,
Blocked BOOLEAN DEFAULT false,
Match INT DEFAULT 0,
MatchList TEXT DEFAULT '',
MatchListShort TEXT DEFAULT '',
MatchRule TEXT DEFAULT '',
Rcode TEXT DEFAULT '',
Created DATETIME,
StartTime DATETIME,
EndTime DATETIME
);

-- move old qlog table
ALTER TABLE qlog RENAME TO _qlog_old;

-- create qlog schema with indexes for long-term storage/use
CREATE TABLE qlog (
Id INTEGER PRIMARY KEY,
Address TEXT DEFAULT '',
Consumer TEXT DEFAULT '',
ClientName TEXT DEFAULT '',
RequestDomain TEXT DEFAULT '',
RequestType TEXT DEFAULT '',
ResponseText TEXT DEFAULT '',
Cached BOOLEAN DEFAULT false,
Blocked BOOLEAN DEFAULT false,
Match INT DEFAULT 0,
MatchList TEXT DEFAULT '',
MatchListShort TEXT DEFAULT '',
MatchRule TEXT DEFAULT '',
Rcode TEXT DEFAULT '',
Created DATETIME,
StartTime DATETIME,
EndTime DATETIME
);

-- create qlog index columns
CREATE INDEX idx_qlog_Address ON qlog (Address);
CREATE INDEX idx_qlog_RequestDomain ON qlog (RequestDomain);
CREATE INDEX idx_qlog_Match ON qlog (Match);
CREATE INDEX idx_qlog_Created ON qlog (Created);
CREATE INDEX idx_qlog_Cached ON qlog (Cached);

-- move records
INSERT INTO qlog (Address, Consumer, ClientName, RequestDomain, RequestType, ResponseText, Cached, Blocked, Match, MatchList, MatchListShort, MatchRule, Rcode, Created, StartTime, EndTime)
SELECT Address, Consumer, ClientName, RequestDomain, RequestType, ResponseText, Cached, Blocked, Match, MatchList, MatchListShort, MatchRule, Rcode, Created, StartTime, EndTime
FROM _qlog_old;

-- drop old table
DROP TABLE _qlog_old;
7 changes: 7 additions & 0 deletions engine/migrations/000005_add_service_time_ms.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- add service time (ms) to buffer
ALTER TABLE buffer ADD COLUMN ServiceTime INTEGER DEFAULT 0;
UPDATE buffer SET ServiceTime = 0 WHERE ServiceTime = null;

-- add service time (ms) to qlog
ALTER TABLE qlog ADD COLUMN ServiceTime INTEGER DEFAULT 0;
UPDATE qlog SET ServiceTime = 0 WHERE ServiceTime = null;
34 changes: 19 additions & 15 deletions engine/qlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (

// lit of valid sort names (lower case for ease of use with util.StringIn)
var validSorts = []string{"address", "connectiontype", "requestdomain", "requesttype", "blocked", "blockedlist", "blockedrule", "created"}
const bufferFlushStmt = "INSERT INTO qlog (Address, Consumer, ClientName, RequestDomain, RequestType, ResponseText, Rcode, Cached, Blocked, Match, MatchList, MatchRule, ServiceTime, Created, EndTime) SELECT Address, Consumer, ClientName, RequestDomain, RequestType, ResponseText, Rcode, Cached, Blocked, Match, MatchList, MatchRule, ServiceTime, Created, EndTime FROM buffer WHERE true"

// allows a dependency injection-way of defining a reverse lookup function, takes a string address (should be an IP) and returns a string that contains the domain name result
type ReverseLookupFunction = func(addres string) string
type ReverseLookupFunction = func(address string) string

// the type that is used to make queries against the
// query log (should be used by the web interface to
Expand Down Expand Up @@ -92,7 +93,10 @@ func NewQueryLog(conf *config.GudgeonConfig, db *sql.DB) (QueryLog, error) {
// create destination and writer
dirpart := path.Dir(qlConf.File)
if _, err := os.Stat(dirpart); os.IsNotExist(err) {
os.MkdirAll(dirpart, os.ModePerm)
err = os.MkdirAll(dirpart, os.ModePerm)
if err != nil {
log.Errorf("While creating path for query log output file: %s", err)
}
}

// attempt to open file
Expand Down Expand Up @@ -131,7 +135,7 @@ func (qlog *qlog) prune(tx *sql.Tx) {
}

func (qlog *qlog) flush(tx *sql.Tx) {
_, err := tx.Exec("INSERT INTO qlog (Address, Consumer, ClientName, RequestDomain, RequestType, ResponseText, Rcode, Cached, Blocked, Match, MatchList, MatchRule, Created) SELECT Address, Consumer, ClientName, RequestDomain, RequestType, ResponseText, Rcode, Cached, Blocked, Match, MatchList, MatchRule, Created FROM buffer WHERE true")
_, err := tx.Exec(bufferFlushStmt)
if err != nil {
log.Errorf("Could not flush query log data: %s", err)
return
Expand Down Expand Up @@ -176,7 +180,7 @@ func (qlog *qlog) log(info *InfoRecord) {
builder.WriteString("|")
builder.WriteString(info.RequestType)
builder.WriteString("]->")
if qlog.fileLogger != nil {
if fields != nil {
fields["address"] = info.Address
fields["protocol"] = rCon.Protocol
fields["consumer"] = info.Consumer
Expand All @@ -202,28 +206,28 @@ func (qlog *qlog) log(info *InfoRecord) {
builder.WriteString("BLOCKED")
} else if result.Match == rule.MatchBlock {
builder.WriteString("RULE BLOCKED")
if qlog.fileLogger != nil {
if fields != nil {
fields["match"] = result.Match
fields["matchType"] = "BLOCKED"
}
if result.MatchList != nil {
builder.WriteString("[")
builder.WriteString(result.MatchList.CanonicalName())
if qlog.fileLogger != nil {
if fields != nil {
fields["matchList"] = result.MatchList.CanonicalName()
}
if result.MatchRule != "" {
builder.WriteString("|")
builder.WriteString(result.MatchRule)
if qlog.fileLogger != nil {
if fields != nil {
fields["matchRule"] = result.MatchRule
}
}
builder.WriteString("]")
}
} else {
if result.Match == rule.MatchAllow {
if qlog.fileLogger != nil {
if fields != nil {
fields["match"] = result.Match
fields["matchType"] = "ALLOWED"
}
Expand All @@ -232,7 +236,7 @@ func (qlog *qlog) log(info *InfoRecord) {
builder.WriteString("c:[")
builder.WriteString(result.Resolver)
builder.WriteString("]")
if qlog.fileLogger != nil {
if fields != nil {
fields["resolver"] = result.Resolver
fields["cached"] = "true"
}
Expand All @@ -244,7 +248,7 @@ func (qlog *qlog) log(info *InfoRecord) {
builder.WriteString("s:[")
builder.WriteString(result.Source)
builder.WriteString("]")
if qlog.fileLogger != nil {
if fields != nil {
fields["resolver"] = result.Resolver
fields["source"] = result.Source
}
Expand All @@ -256,21 +260,21 @@ func (qlog *qlog) log(info *InfoRecord) {
answerValues := util.GetAnswerValues(response)
if len(answerValues) > 0 {
builder.WriteString(answerValues[0])
if qlog.fileLogger != nil {
if fields != nil {
fields["answer"] = answerValues[0]
}
if len(answerValues) > 1 {
builder.WriteString(fmt.Sprintf(" (+%d)", len(answerValues)-1))
}
} else {
builder.WriteString("(EMPTY RESPONSE)")
if qlog.fileLogger != nil {
if fields != nil {
fields["answer"] = "<< EMPTY >>"
}
}
} else {
builder.WriteString("(NO INFO RESPONSE)")
if qlog.fileLogger != nil {
if fields != nil {
fields["answer"] = "<< NONE >>"
}
}
Expand All @@ -296,7 +300,7 @@ func (qlog *qlog) query(query *QueryLogQuery, accumulator queryAccumulator) {
}

// select entries from qlog
selectStmt := "SELECT Address, ClientName, Consumer, RequestDomain, RequestType, ResponseText, Rcode, Blocked, Match, MatchList, MatchRule, Cached, Created FROM qlog"
selectStmt := "SELECT Address, ClientName, Consumer, RequestDomain, RequestType, ResponseText, Rcode, Blocked, Match, MatchList, MatchRule, Cached, ServiceTime, Created, EndTime FROM qlog"
countStmt := "SELECT COUNT(*) FROM qlog"

// so we can dynamically build the where clause
Expand Down Expand Up @@ -436,7 +440,7 @@ func (qlog *qlog) query(query *QueryLogQuery, accumulator queryAccumulator) {
var info *InfoRecord
for rows.Next() {
info = &InfoRecord{}
err = rows.Scan(&info.Address, &info.ClientName, &info.Consumer, &info.RequestDomain, &info.RequestType, &info.ResponseText, &info.Rcode, &info.Blocked, &info.Match, &info.MatchList, &info.MatchRule, &info.Cached, &info.Created)
err = rows.Scan(&info.Address, &info.ClientName, &info.Consumer, &info.RequestDomain, &info.RequestType, &info.ResponseText, &info.Rcode, &info.Blocked, &info.Match, &info.MatchList, &info.MatchRule, &info.Cached, &info.ServiceMilliseconds, &info.Created, &info.Finished)
if err != nil {
log.Errorf("Scanning qlog results: %s", err)
continue
Expand Down
Loading

0 comments on commit c8e6c9c

Please sign in to comment.