-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
x-pack/filebeat/input/gcs: add support for CSV decoding (#40979)
The test file txn.csv.gz was obtained from https://netskopepartnerlogfilebucket.s3.amazonaws.com/txn-1722875066329034-fe10b6a23cc643c4b282e6190de2352d.csv.gz
- Loading branch information
Showing
14 changed files
with
1,224 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package gcs | ||
|
||
import ( | ||
"fmt" | ||
"io" | ||
) | ||
|
||
// decoder is an interface for decoding data from an io.Reader. | ||
type decoder interface { | ||
// decode reads and decodes data from an io reader based on the codec type. | ||
// It returns the decoded data and an error if the data cannot be decoded. | ||
decode() ([]byte, error) | ||
// next advances the decoder to the next data item and returns true if there is more data to be decoded. | ||
next() bool | ||
// close closes the decoder and releases any resources associated with it. | ||
// It returns an error if the decoder cannot be closed. | ||
|
||
// more returns whether there are more records to read. | ||
more() bool | ||
|
||
close() error | ||
} | ||
|
||
// valueDecoder is a decoder that can decode directly to a JSON serialisable value. | ||
type valueDecoder interface { | ||
decoder | ||
|
||
decodeValue() ([]byte, map[string]any, error) | ||
} | ||
|
||
// newDecoder creates a new decoder based on the codec type. | ||
// It returns a decoder type and an error if the codec type is not supported. | ||
// If the reader config codec option is not set, it returns a nil decoder and nil error. | ||
func newDecoder(cfg decoderConfig, r io.Reader) (decoder, error) { | ||
switch { | ||
case cfg.Codec == nil: | ||
return nil, nil | ||
case cfg.Codec.CSV != nil: | ||
return newCSVDecoder(cfg, r) | ||
default: | ||
return nil, fmt.Errorf("unsupported config value: %v", cfg) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package gcs | ||
|
||
import ( | ||
"fmt" | ||
"unicode/utf8" | ||
) | ||
|
||
// decoderConfig contains the configuration options for instantiating a decoder. | ||
type decoderConfig struct { | ||
Codec *codecConfig `config:"codec"` | ||
} | ||
|
||
// codecConfig contains the configuration options for different codecs used by a decoder. | ||
type codecConfig struct { | ||
CSV *csvCodecConfig `config:"csv"` | ||
} | ||
|
||
// csvCodecConfig contains the configuration options for the CSV codec. | ||
type csvCodecConfig struct { | ||
Enabled bool `config:"enabled"` | ||
|
||
// Fields is the set of field names. If it is present | ||
// it is used to specify the object names of returned | ||
// values and the FieldsPerRecord field in the csv.Reader. | ||
// Otherwise, names are obtained from the first | ||
// line of the CSV data. | ||
Fields []string `config:"fields_names"` | ||
|
||
// The fields below have the same meaning as the | ||
// fields of the same name in csv.Reader. | ||
Comma *configRune `config:"comma"` | ||
Comment configRune `config:"comment"` | ||
LazyQuotes bool `config:"lazy_quotes"` | ||
TrimLeadingSpace bool `config:"trim_leading_space"` | ||
} | ||
|
||
type configRune rune | ||
|
||
func (r *configRune) Unpack(s string) error { | ||
if s == "" { | ||
return nil | ||
} | ||
n := utf8.RuneCountInString(s) | ||
if n != 1 { | ||
return fmt.Errorf("single character option given more than one character: %q", s) | ||
} | ||
_r, _ := utf8.DecodeRuneInString(s) | ||
*r = configRune(_r) | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package gcs | ||
|
||
import ( | ||
"bytes" | ||
"encoding/csv" | ||
"fmt" | ||
"io" | ||
"slices" | ||
) | ||
|
||
// csvDecoder is a decoder for CSV data. | ||
type csvDecoder struct { | ||
r *csv.Reader | ||
|
||
header []string | ||
current []string | ||
coming []string | ||
|
||
err error | ||
} | ||
|
||
// newCSVDecoder creates a new CSV decoder. | ||
func newCSVDecoder(config decoderConfig, r io.Reader) (decoder, error) { | ||
d := csvDecoder{r: csv.NewReader(r)} | ||
d.r.ReuseRecord = true | ||
if config.Codec.CSV.Comma != nil { | ||
d.r.Comma = rune(*config.Codec.CSV.Comma) | ||
} | ||
d.r.Comment = rune(config.Codec.CSV.Comment) | ||
d.r.LazyQuotes = config.Codec.CSV.LazyQuotes | ||
d.r.TrimLeadingSpace = config.Codec.CSV.TrimLeadingSpace | ||
if len(config.Codec.CSV.Fields) != 0 { | ||
d.r.FieldsPerRecord = len(config.Codec.CSV.Fields) | ||
d.header = config.Codec.CSV.Fields | ||
} else { | ||
h, err := d.r.Read() | ||
if err != nil { | ||
return nil, err | ||
} | ||
d.header = slices.Clone(h) | ||
} | ||
var err error | ||
d.coming, err = d.r.Read() | ||
if err != nil { | ||
return nil, err | ||
} | ||
d.current = make([]string, 0, len(d.header)) | ||
return &d, nil | ||
} | ||
|
||
func (d *csvDecoder) more() bool { return len(d.coming) == len(d.header) } | ||
|
||
// next advances the decoder to the next data item and returns true if | ||
// there is more data to be decoded. | ||
func (d *csvDecoder) next() bool { | ||
if !d.more() && d.err != nil { | ||
return false | ||
} | ||
d.current = d.current[:len(d.header)] | ||
copy(d.current, d.coming) | ||
d.coming, d.err = d.r.Read() | ||
if d.err == io.EOF { | ||
d.coming = nil | ||
} | ||
return true | ||
} | ||
|
||
// decode returns the JSON encoded value of the current CSV line. next must | ||
// have been called before any calls to decode. | ||
func (d *csvDecoder) decode() ([]byte, error) { | ||
err := d.check() | ||
if err != nil { | ||
return nil, err | ||
} | ||
var buf bytes.Buffer | ||
buf.WriteByte('{') | ||
for i, n := range d.header { | ||
if i != 0 { | ||
buf.WriteByte(',') | ||
} | ||
buf.WriteByte('"') | ||
buf.WriteString(n) | ||
buf.WriteString(`":"`) | ||
buf.WriteString(d.current[i]) | ||
buf.WriteByte('"') | ||
} | ||
buf.WriteByte('}') | ||
d.current = d.current[:0] | ||
return buf.Bytes(), nil | ||
} | ||
|
||
// decodeValue returns the value of the current CSV line interpreted as | ||
// an object with fields based on the header held by the receiver. next must | ||
// have been called before any calls to decode. | ||
func (d *csvDecoder) decodeValue() ([]byte, map[string]any, error) { | ||
err := d.check() | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
m := make(map[string]any, len(d.header)) | ||
for i, n := range d.header { | ||
m[n] = d.current[i] | ||
} | ||
d.current = d.current[:0] | ||
b, err := d.decode() | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
return b, m, nil | ||
} | ||
|
||
func (d *csvDecoder) check() error { | ||
if d.err != nil { | ||
if d.err == io.EOF && d.coming == nil { | ||
return nil | ||
} | ||
return d.err | ||
} | ||
if len(d.current) == 0 { | ||
return fmt.Errorf("decode called before next") | ||
} | ||
// By the time we are here, current must be the same | ||
// length as header; if it was not read, it would be | ||
// zero, but if it was, it must match by the contract | ||
// of the csv.Reader. | ||
return nil | ||
} | ||
|
||
// close closes the csv decoder and releases the resources. | ||
func (d *csvDecoder) close() error { | ||
if d.err == io.EOF { | ||
return nil | ||
} | ||
return d.err | ||
} |
Oops, something went wrong.