From 1b451fb1261d988dae04f98416b4f6c51ecdb1d5 Mon Sep 17 00:00:00 2001 From: David Reimschussel Date: Thu, 30 Jul 2020 14:58:23 -0600 Subject: [PATCH 1/2] Fix tail following on EOF. Add test. --- plugins/common/encoding/decoder.go | 10 +- plugins/common/encoding/decoder_reader.go | 171 ++++++++++++++++++++++ plugins/inputs/tail/tail_test.go | 45 ++++++ 3 files changed, 220 insertions(+), 6 deletions(-) create mode 100644 plugins/common/encoding/decoder_reader.go diff --git a/plugins/common/encoding/decoder.go b/plugins/common/encoding/decoder.go index d1c282d2c9b87..8bc3b7f92fe1e 100644 --- a/plugins/common/encoding/decoder.go +++ b/plugins/common/encoding/decoder.go @@ -7,8 +7,6 @@ import ( "golang.org/x/text/encoding/unicode" ) -type Decoder = encoding.Decoder - // NewDecoder returns a x/text Decoder for the specified text encoding. The // Decoder converts a character encoding into utf-8 bytes. If a BOM is found // it will be converted into a utf-8 BOM, you can use @@ -24,13 +22,13 @@ type Decoder = encoding.Decoder func NewDecoder(enc string) (*Decoder, error) { switch enc { case "utf-8": - return unicode.UTF8.NewDecoder(), nil + return newDecoder(unicode.UTF8.NewDecoder()), nil case "utf-16le": - return unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewDecoder(), nil + return newDecoder(unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewDecoder()), nil case "utf-16be": - return unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewDecoder(), nil + return newDecoder(unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewDecoder()), nil case "none", "": - return encoding.Nop.NewDecoder(), nil + return newDecoder(encoding.Nop.NewDecoder()), nil } return nil, errors.New("unknown character encoding") } diff --git a/plugins/common/encoding/decoder_reader.go b/plugins/common/encoding/decoder_reader.go new file mode 100644 index 0000000000000..7324c8e72e883 --- /dev/null +++ b/plugins/common/encoding/decoder_reader.go @@ -0,0 +1,171 @@ 
+package encoding + +import ( + "errors" + "io" + + "golang.org/x/text/transform" +) + +// Other than resetting r.err and r.transformComplete in Read() this +// was copied from x/text + +func newDecoder(t transform.Transformer) *Decoder { + return &Decoder{Transformer: t} +} + +// A Decoder converts bytes to UTF-8. It implements transform.Transformer. +// +// Transforming source bytes that are not of that encoding will not result in an +// error per se. Each byte that cannot be transcoded will be represented in the +// output by the UTF-8 encoding of '\uFFFD', the replacement rune. +type Decoder struct { + transform.Transformer + + // This forces external creators of Decoders to use names in struct + // initializers, allowing for future extendibility without having to break + // code. + _ struct{} +} + +// Bytes converts the given encoded bytes to UTF-8. It returns the converted +// bytes or nil, err if any error occurred. +func (d *Decoder) Bytes(b []byte) ([]byte, error) { + b, _, err := transform.Bytes(d, b) + if err != nil { + return nil, err + } + return b, nil +} + +// String converts the given encoded string to UTF-8. It returns the converted +// string or "", err if any error occurred. +func (d *Decoder) String(s string) (string, error) { + s, _, err := transform.String(d, s) + if err != nil { + return "", err + } + return s, nil +} + +// Reader wraps another Reader to decode its bytes. +// +// The Decoder may not be used for any other operation as long as the returned +// Reader is in use. +func (d *Decoder) Reader(r io.Reader) io.Reader { + return NewReader(r, d) +} + +// Reader wraps another io.Reader by transforming the bytes read. +type Reader struct { + r io.Reader + t transform.Transformer + err error + + // dst[dst0:dst1] contains bytes that have been transformed by t but + // not yet copied out via Read. + dst []byte + dst0, dst1 int + + // src[src0:src1] contains bytes that have been read from r but not + // yet transformed through t. 
+ src []byte + src0, src1 int + + // transformComplete is whether the transformation is complete, + // regardless of whether or not it was successful. + transformComplete bool +} + +var ( + // ErrShortDst is transform.ErrShortDst (destination buffer too short); it + // must be that exact value so Read's == checks match Transformer results. + ErrShortDst = transform.ErrShortDst + + // ErrShortSrc is transform.ErrShortSrc (source buffer lacks data); it + // must be that exact value so Read's == checks match Transformer results. + ErrShortSrc = transform.ErrShortSrc + + // errInconsistentByteCount means that Transform returned success (nil + // error) but also returned nSrc inconsistent with the src argument. + errInconsistentByteCount = errors.New("transform: inconsistent byte count returned") +) + +const defaultBufSize = 4096 + +// NewReader returns a new Reader that wraps r by transforming the bytes read +// via t. It calls Reset on t. +func NewReader(r io.Reader, t transform.Transformer) *Reader { + t.Reset() + return &Reader{ + r: r, + t: t, + dst: make([]byte, defaultBufSize), + src: make([]byte, defaultBufSize), + } +} + +// Read implements the io.Reader interface. +func (r *Reader) Read(p []byte) (int, error) { + // Clear previous errors so a Read can be performed even if the last call + // returned EOF. + r.err = nil + r.transformComplete = false + + n, err := 0, error(nil) + for { + // Copy out any transformed bytes and return the final error if we are done. + if r.dst0 != r.dst1 { + n = copy(p, r.dst[r.dst0:r.dst1]) + r.dst0 += n + if r.dst0 == r.dst1 && r.transformComplete { + return n, r.err + } + return n, nil + } else if r.transformComplete { + return 0, r.err + } + + // Try to transform some source bytes, or to flush the transformer if we + // are out of source bytes. We do this even if r.r.Read returned an error. + // As the io.Reader documentation says, "process the n > 0 bytes returned + // before considering the error". 
+ if r.src0 != r.src1 || r.err != nil { + r.dst0 = 0 + r.dst1, n, err = r.t.Transform(r.dst, r.src[r.src0:r.src1], r.err == io.EOF) + r.src0 += n + + switch { + case err == nil: + if r.src0 != r.src1 { + r.err = errInconsistentByteCount + } + // The Transform call was successful; we are complete if we + // cannot read more bytes into src. + r.transformComplete = r.err != nil + continue + case err == ErrShortDst && (r.dst1 != 0 || n != 0): + // Make room in dst by copying out, and try again. + continue + case err == ErrShortSrc && r.src1-r.src0 != len(r.src) && r.err == nil: + // Read more bytes into src via the code below, and try again. + default: + r.transformComplete = true + // The reader error (r.err) takes precedence over the + // transformer error (err) unless r.err is nil or io.EOF. + if r.err == nil || r.err == io.EOF { + r.err = err + } + continue + } + } + + // Move any untransformed source bytes to the start of the buffer + // and read more bytes. + if r.src0 != 0 { + r.src0, r.src1 = 0, copy(r.src, r.src[r.src0:r.src1]) + } + n, r.err = r.r.Read(r.src[r.src1:]) + r.src1 += n + } +} diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index a8eae196ce588..0e13d214130be 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -346,3 +346,48 @@ func TestCharacterEncoding(t *testing.T) { }) } } + +func TestTailEOF(t *testing.T) { + tmpfile, err := ioutil.TempFile("", "") + require.NoError(t, err) + defer os.Remove(tmpfile.Name()) + _, err = tmpfile.WriteString("cpu usage_idle=100\r\n") + require.NoError(t, err) + err = tmpfile.Sync() + require.NoError(t, err) + + tt := NewTail() + tt.Log = testutil.Logger{} + tt.FromBeginning = true + tt.Files = []string{tmpfile.Name()} + //tt.WatchMethod = "inotify" + tt.SetParserFunc(parsers.NewInfluxParser) + + err = tt.Init() + require.NoError(t, err) + + acc := testutil.Accumulator{} + require.NoError(t, tt.Start(&acc)) + defer tt.Stop() + require.NoError(t, 
acc.GatherError(tt.Gather)) + acc.Wait(1) // input hits eof + + _, err = tmpfile.WriteString("cpu2 usage_idle=200\r\n") + require.NoError(t, err) + err = tmpfile.Sync() + require.NoError(t, err) + + acc.Wait(2) + require.NoError(t, acc.GatherError(tt.Gather)) + acc.AssertContainsFields(t, "cpu", + map[string]interface{}{ + "usage_idle": float64(100), + }) + acc.AssertContainsFields(t, "cpu2", + map[string]interface{}{ + "usage_idle": float64(200), + }) + + err = tmpfile.Close() + require.NoError(t, err) +} From 7e567913e184b8038c5a0ba922e16b0e97fd501f Mon Sep 17 00:00:00 2001 From: David Reimschussel Date: Thu, 30 Jul 2020 15:22:08 -0600 Subject: [PATCH 2/2] remove stray comment --- plugins/inputs/tail/tail_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index 0e13d214130be..38a7f22780a52 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -360,7 +360,6 @@ func TestTailEOF(t *testing.T) { tt.Log = testutil.Logger{} tt.FromBeginning = true tt.Files = []string{tmpfile.Name()} - //tt.WatchMethod = "inotify" tt.SetParserFunc(parsers.NewInfluxParser) err = tt.Init()