Make Logparser Plugin Check For New Files #2141

Merged 3 commits on Feb 1, 2017. Changes shown are from 1 commit.
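The change teaches the logparser input to re-check its configured file globs on every Gather call: tailers are now tracked in a map keyed by file path, and any file that starts matching the globs after startup is tailed from the beginning. A minimal sketch of a configuration that exercises this behavior (paths are hypothetical, and the grok option names follow the plugin's sample config of the time; check them against the README):

[[inputs.logparser]]
  ## Glob of files to tail. Files created later that match this glob are
  ## picked up on a subsequent Gather and always read from the beginning.
  files = ["/var/log/apache/*.access.log"]
  ## Only affects files that already exist when telegraf starts.
  from_beginning = false

  ## Parse logstash-style "grok" patterns:
  [inputs.logparser.grok]
    patterns = ["%{COMBINED_LOG_FORMAT}"]
    measurement = "apache_access_log"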
36 changes: 29 additions & 7 deletions plugins/inputs/logparser/logparser.go
@@ -26,7 +26,7 @@ type LogParserPlugin struct {
Files []string
FromBeginning bool

tailers []*tail.Tail
tailers map[string]*tail.Tail
lines chan string
done chan struct{}
wg sync.WaitGroup
@@ -46,7 +46,9 @@ const sampleConfig = `
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"]
## Read file from beginning.
## Read files that currently exist from the beginning. Files that are created
## while telegraf is running (and that match the "files" globs) will always
## be read from the beginning.
from_beginning = false

## Parse logstash-style "grok" patterns:
@@ -77,7 +79,11 @@ func (l *LogParserPlugin) Description() string {
}

func (l *LogParserPlugin) Gather(acc telegraf.Accumulator) error {
return nil
l.Lock()
defer l.Unlock()

// always start from the beginning of files that appear while we're running
return l.tailNewfiles(true)
}

func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
@@ -87,6 +93,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
l.acc = acc
l.lines = make(chan string, 1000)
l.done = make(chan struct{})
l.tailers = make(map[string]*tail.Tail)

// Looks for fields which implement LogParser interface
l.parsers = []LogParser{}
@@ -121,14 +128,22 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
return err
}

l.wg.Add(1)
go l.parser()

return l.tailNewfiles(l.FromBeginning)
}

// check the globs against files on disk, and start tailing any new files.
// Assumes l's lock is held!
func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
var seek tail.SeekInfo
if !l.FromBeginning {
if !fromBeginning {
seek.Whence = 2
seek.Offset = 0
}

l.wg.Add(1)
go l.parser()
errChan := errchan.New(len(l.Files))

// Create a "tailer" for each file
for _, filepath := range l.Files {
@@ -139,7 +154,13 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
}
files := g.Match()
errChan = errchan.New(len(files))

for file, _ := range files {
if _, ok := l.tailers[file]; ok {
// we're already tailing this file
continue
}

tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
@@ -152,7 +173,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
// create a goroutine for each "tailer"
l.wg.Add(1)
go l.receiver(tailer)
l.tailers = append(l.tailers, tailer)
l.tailers[file] = tailer
}
}

@@ -166,6 +187,7 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {

var line *tail.Line
for line = range tailer.Lines {

if line.Err != nil {
log.Printf("E! Error tailing file %s, Error: %s\n",
tailer.Filename, line.Err)
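The heart of the change is the switch from a slice of tailers to a map keyed by file path, which lets tailNewfiles run repeatedly (from both Start and Gather) and only start a tailer for paths it has not seen before. A standalone sketch of that pattern, using the standard library's path/filepath glob in place of Telegraf's globpath and a print in place of tail.TailFile (the names here are illustrative, not the plugin's API):

package main

import (
	"fmt"
	"path/filepath"
)

// watcher mirrors the plugin's bookkeeping: one entry per file that is
// already being tailed, keyed by its path.
type watcher struct {
	tailing map[string]bool
}

// tailNewFiles globs for matching files and "starts" a tailer only for
// paths it has not seen, the same check LogParserPlugin.tailNewfiles makes.
func (w *watcher) tailNewFiles(pattern string) error {
	matches, err := filepath.Glob(pattern)
	if err != nil {
		return err
	}
	for _, file := range matches {
		if w.tailing[file] {
			continue // already tailing this file
		}
		fmt.Println("starting tailer for", file)
		w.tailing[file] = true
	}
	return nil
}

func main() {
	w := &watcher{tailing: map[string]bool{}}
	// Called once at startup and again on every gather interval; the
	// second call only picks up files that appeared in between.
	w.tailNewFiles("/tmp/logs/*.log")
	w.tailNewFiles("/tmp/logs/*.log")
}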
43 changes: 43 additions & 0 deletions plugins/inputs/logparser/logparser_test.go
@@ -1,6 +1,8 @@
package logparser

import (
"io/ioutil"
"os"
"runtime"
"strings"
"testing"
@@ -80,6 +82,47 @@ func TestGrokParseLogFiles(t *testing.T) {
map[string]string{})
}

func TestGrokParseLogFilesAppearLater(t *testing.T) {
emptydir, err := ioutil.TempDir("", "TestGrokParseLogFilesAppearLater")
defer os.RemoveAll(emptydir)
assert.NoError(t, err)

thisdir := getCurrentDir()
p := &grok.Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
}

logparser := &LogParserPlugin{
FromBeginning: true,
Files: []string{emptydir + "/*.log"},
GrokParser: p,
}

acc := testutil.Accumulator{}
assert.NoError(t, logparser.Start(&acc))

time.Sleep(time.Millisecond * 500)
assert.Equal(t, acc.NFields(), 0)

os.Symlink(
thisdir+"grok/testdata/test_a.log",
emptydir+"/test_a.log")
assert.NoError(t, logparser.Gather(&acc))
time.Sleep(time.Millisecond * 500)

logparser.Stop()

acc.AssertContainsTaggedFields(t, "logparser_grok",
map[string]interface{}{
"clientip": "192.168.1.1",
"myfloat": float64(1.25),
"response_time": int64(5432),
"myint": int64(101),
},
map[string]string{"response_code": "200"})
}

// Test that test_a.log line gets parsed even though we don't have the correct
// pattern available for test_b.log
func TestGrokParseLogFilesOneBad(t *testing.T) {
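The new TestGrokParseLogFilesAppearLater exercises the start-with-an-empty-directory, symlink-a-log-file-later path end to end. It can be run on its own from the repository root with a standard go test invocation, for example:

go test -run TestGrokParseLogFilesAppearLater ./plugins/inputs/logparser/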