Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tail following on EOF #7927

Merged
merged 2 commits into from
Jul 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions plugins/common/encoding/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"golang.org/x/text/encoding/unicode"
)

type Decoder = encoding.Decoder

// NewDecoder returns a x/text Decoder for the specified text encoding. The
// Decoder converts a character encoding into utf-8 bytes. If a BOM is found
// it will be converted into a utf-8 BOM, you can use
Expand All @@ -24,13 +22,13 @@ type Decoder = encoding.Decoder
func NewDecoder(enc string) (*Decoder, error) {
switch enc {
case "utf-8":
return unicode.UTF8.NewDecoder(), nil
return &Decoder{Transformer: unicode.UTF8.NewDecoder()}, nil
case "utf-16le":
return unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewDecoder(), nil
return newDecoder(unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewDecoder()), nil
case "utf-16be":
return unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewDecoder(), nil
return newDecoder(unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewDecoder()), nil
case "none", "":
return encoding.Nop.NewDecoder(), nil
return newDecoder(encoding.Nop.NewDecoder()), nil
}
return nil, errors.New("unknown character encoding")
}
171 changes: 171 additions & 0 deletions plugins/common/encoding/decoder_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package encoding

import (
"errors"
"io"

"golang.org/x/text/transform"
)

// Other than resetting r.err and r.transformComplete in Read() this
// was copied from x/text

func newDecoder(t transform.Transformer) *Decoder {
return &Decoder{Transformer: t}
}

// A Decoder converts bytes to UTF-8. It implements transform.Transformer.
//
// Transforming source bytes that are not of that encoding will not result in an
// error per se. Each byte that cannot be transcoded will be represented in the
// output by the UTF-8 encoding of '\uFFFD', the replacement rune.
type Decoder struct {
transform.Transformer

// This forces external creators of Decoders to use names in struct
// initializers, allowing for future extendibility without having to break
// code.
_ struct{}
}

// Bytes converts the given encoded bytes to UTF-8. It returns the converted
// bytes or nil, err if any error occurred.
func (d *Decoder) Bytes(b []byte) ([]byte, error) {
b, _, err := transform.Bytes(d, b)
if err != nil {
return nil, err
}
return b, nil
}

// String converts the given encoded string to UTF-8. It returns the converted
// string or "", err if any error occurred.
func (d *Decoder) String(s string) (string, error) {
s, _, err := transform.String(d, s)
if err != nil {
return "", err
}
return s, nil
}

// Reader wraps another Reader to decode its bytes.
//
// The Decoder may not be used for any other operation as long as the returned
// Reader is in use.
func (d *Decoder) Reader(r io.Reader) io.Reader {
return NewReader(r, d)
}

// Reader wraps another io.Reader by transforming the bytes read.
type Reader struct {
r io.Reader
t transform.Transformer
err error

// dst[dst0:dst1] contains bytes that have been transformed by t but
// not yet copied out via Read.
dst []byte
dst0, dst1 int

// src[src0:src1] contains bytes that have been read from r but not
// yet transformed through t.
src []byte
src0, src1 int

// transformComplete is whether the transformation is complete,
// regardless of whether or not it was successful.
transformComplete bool
}

var (
// ErrShortDst means that the destination buffer was too short to
// receive all of the transformed bytes.
ErrShortDst = errors.New("transform: short destination buffer")

// ErrShortSrc means that the source buffer has insufficient data to
// complete the transformation.
ErrShortSrc = errors.New("transform: short source buffer")

// errInconsistentByteCount means that Transform returned success (nil
// error) but also returned nSrc inconsistent with the src argument.
errInconsistentByteCount = errors.New("transform: inconsistent byte count returned")
)

const defaultBufSize = 4096

// NewReader returns a new Reader that wraps r by transforming the bytes read
// via t. It calls Reset on t.
func NewReader(r io.Reader, t transform.Transformer) *Reader {
t.Reset()
return &Reader{
r: r,
t: t,
dst: make([]byte, defaultBufSize),
src: make([]byte, defaultBufSize),
}
}

// Read implements the io.Reader interface.
func (r *Reader) Read(p []byte) (int, error) {
// Clear previous errors so a Read can be performed even if the last call
// returned EOF.
r.err = nil
r.transformComplete = false

n, err := 0, error(nil)
for {
// Copy out any transformed bytes and return the final error if we are done.
if r.dst0 != r.dst1 {
n = copy(p, r.dst[r.dst0:r.dst1])
r.dst0 += n
if r.dst0 == r.dst1 && r.transformComplete {
return n, r.err
}
return n, nil
} else if r.transformComplete {
return 0, r.err
}

// Try to transform some source bytes, or to flush the transformer if we
// are out of source bytes. We do this even if r.r.Read returned an error.
// As the io.Reader documentation says, "process the n > 0 bytes returned
// before considering the error".
if r.src0 != r.src1 || r.err != nil {
r.dst0 = 0
r.dst1, n, err = r.t.Transform(r.dst, r.src[r.src0:r.src1], r.err == io.EOF)
r.src0 += n

switch {
case err == nil:
if r.src0 != r.src1 {
r.err = errInconsistentByteCount
}
// The Transform call was successful; we are complete if we
// cannot read more bytes into src.
r.transformComplete = r.err != nil
continue
case err == ErrShortDst && (r.dst1 != 0 || n != 0):
// Make room in dst by copying out, and try again.
continue
case err == ErrShortSrc && r.src1-r.src0 != len(r.src) && r.err == nil:
// Read more bytes into src via the code below, and try again.
default:
r.transformComplete = true
// The reader error (r.err) takes precedence over the
// transformer error (err) unless r.err is nil or io.EOF.
if r.err == nil || r.err == io.EOF {
r.err = err
}
continue
}
}

// Move any untransformed source bytes to the start of the buffer
// and read more bytes.
if r.src0 != 0 {
r.src0, r.src1 = 0, copy(r.src, r.src[r.src0:r.src1])
}
n, r.err = r.r.Read(r.src[r.src1:])
r.src1 += n
}
}
44 changes: 44 additions & 0 deletions plugins/inputs/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,47 @@ func TestCharacterEncoding(t *testing.T) {
})
}
}

func TestTailEOF(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.WriteString("cpu usage_idle=100\r\n")
require.NoError(t, err)
err = tmpfile.Sync()
require.NoError(t, err)

tt := NewTail()
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(parsers.NewInfluxParser)

err = tt.Init()
require.NoError(t, err)

acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
defer tt.Stop()
require.NoError(t, acc.GatherError(tt.Gather))
acc.Wait(1) // input hits eof

_, err = tmpfile.WriteString("cpu2 usage_idle=200\r\n")
require.NoError(t, err)
err = tmpfile.Sync()
require.NoError(t, err)

acc.Wait(2)
require.NoError(t, acc.GatherError(tt.Gather))
acc.AssertContainsFields(t, "cpu",
map[string]interface{}{
"usage_idle": float64(100),
})
acc.AssertContainsFields(t, "cpu2",
map[string]interface{}{
"usage_idle": float64(200),
})

err = tmpfile.Close()
require.NoError(t, err)
}