x-pack/filebeat/input/awss3: add support for CSV decoding
The test file txn.csv.gz was obtained from https://netskopepartnerlogfilebucket.s3.amazonaws.com/txn-1722875066329034-fe10b6a23cc643c4b282e6190de2352d.csv.gz
Showing 11 changed files with 1,041 additions and 72 deletions.
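The two new files below each implement the input's internal decoder interface. That interface's definition is not part of this excerpt, so the sketch that follows is a reconstruction inferred from the constructors' signatures and the methods both types define, together with a hypothetical stand-in implementation that shows how a decoder is driven; none of this sketch is code from the commit.

    package main

    import (
        "fmt"
        "io"
    )

    // decoder is assumed to look like this, inferred from newCSVDecoder and
    // newParquetDecoder both returning (decoder, error); the real definition
    // lives elsewhere in the awss3 package.
    type decoder interface {
        next() bool
        decode() ([]byte, error)
        close() error
    }

    // sliceDecoder is a hypothetical stand-in used only to demonstrate the
    // next/decode/close contract; it is not part of the commit.
    type sliceDecoder struct {
        recs [][]byte
        cur  []byte
        err  error
    }

    func (d *sliceDecoder) next() bool {
        if len(d.recs) == 0 {
            d.err = io.EOF
            return false
        }
        d.cur, d.recs = d.recs[0], d.recs[1:]
        return true
    }

    func (d *sliceDecoder) decode() ([]byte, error) { return d.cur, d.err }

    // close mirrors the csvDecoder convention below: a clean EOF is not an error.
    func (d *sliceDecoder) close() error {
        if d.err == io.EOF {
            return nil
        }
        return d.err
    }

    func main() {
        var dec decoder = &sliceDecoder{recs: [][]byte{[]byte(`{"a":1}`), []byte(`{"a":2}`)}}
        for dec.next() {
            b, err := dec.decode()
            if err != nil {
                fmt.Println("decode:", err)
                break
            }
            fmt.Println(string(b))
        }
        if err := dec.close(); err != nil {
            fmt.Println("close:", err)
        }
    }

The first new file is the CSV decoder itself: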
    // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
    // or more contributor license agreements. Licensed under the Elastic License;
    // you may not use this file except in compliance with the Elastic License.

    package awss3

    import (
        "encoding/csv"
        "encoding/json"
        "fmt"
        "io"
        "slices"
    )

    // csvDecoder is a decoder for CSV data.
    type csvDecoder struct {
        r *csv.Reader

        header  []string
        current []string

        err error
    }

    // newCSVDecoder creates a new CSV decoder.
    func newCSVDecoder(config decoderConfig, r io.Reader) (decoder, error) {
        d := csvDecoder{r: csv.NewReader(r)}
        d.r.ReuseRecord = true
        if config.Codec.CSV.Comma != nil {
            d.r.Comma = rune(*config.Codec.CSV.Comma)
        }
        d.r.Comment = rune(config.Codec.CSV.Comment)
        d.r.LazyQuotes = config.Codec.CSV.LazyQuotes
        d.r.TrimLeadingSpace = config.Codec.CSV.TrimLeadingSpace
        if len(config.Codec.CSV.Fields) != 0 {
            d.r.FieldsPerRecord = len(config.Codec.CSV.Fields)
            d.header = config.Codec.CSV.Fields
        } else {
            h, err := d.r.Read()
            if err != nil {
                return nil, err
            }
            d.header = slices.Clone(h)
        }
        return &d, nil
    }

    // next advances the decoder to the next data item and returns true if
    // there is more data to be decoded.
    func (d *csvDecoder) next() bool {
        if d.err != nil {
            return false
        }
        d.current, d.err = d.r.Read()
        return d.err == nil
    }

    // decode returns the JSON encoded value of the current CSV line. next must
    // have been called before any calls to decode.
    func (d *csvDecoder) decode() ([]byte, error) {
        v, err := d.decodeValue()
        if err != nil {
            return nil, err
        }
        return json.Marshal(v)
    }

    // decodeValue returns the value of the current CSV line interpreted as
    // an object with fields based on the header held by the receiver. next must
    // have been called before any calls to decode.
    func (d *csvDecoder) decodeValue() (any, error) {
        if d.err != nil {
            return nil, d.err
        }
        if len(d.current) == 0 {
            return nil, fmt.Errorf("decode called before next")
        }
        m := make(map[string]string, len(d.header))
        // By the time we are here, current must be the same length as
        // header: if no line had been read, current would be empty
        // (handled above), and if one has been, csv.Reader guarantees
        // the field count matches.
        for i, n := range d.header {
            m[n] = d.current[i]
        }
        return m, nil
    }

    // close closes the csv decoder and releases the resources.
    func (d *csvDecoder) close() error {
        if d.err == io.EOF {
            return nil
        }
        return d.err
    }
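For readers unfamiliar with the header-to-object mapping above, here is a minimal standalone sketch of the same technique using only the standard library: read a header row, then turn each subsequent record into a map keyed by the header fields and marshal it to JSON. The sample data is made up; the real decoder gets its reader from the S3 object stream.

    package main

    import (
        "encoding/csv"
        "encoding/json"
        "fmt"
        "io"
        "slices"
        "strings"
    )

    func main() {
        // Made-up sample data standing in for the S3 object body.
        const data = "ts,src,dst\n1,10.0.0.1,10.0.0.2\n2,10.0.0.3,10.0.0.4\n"

        r := csv.NewReader(strings.NewReader(data))
        // ReuseRecord makes Read return the same backing slice each call,
        // so the header must be cloned before the next Read, exactly as
        // the decoder does with slices.Clone.
        r.ReuseRecord = true

        h, err := r.Read()
        if err != nil {
            panic(err)
        }
        header := slices.Clone(h)

        for {
            rec, err := r.Read()
            if err == io.EOF {
                break
            }
            if err != nil {
                panic(err)
            }
            // csv.Reader guarantees rec has the same field count as the
            // first record, so indexing by the header is safe.
            m := make(map[string]string, len(header))
            for i, name := range header {
                m[name] = rec[i]
            }
            b, err := json.Marshal(m)
            if err != nil {
                panic(err)
            }
            fmt.Println(string(b))
        }
    }

The second new file, below, wires the existing libbeat parquet reader into the same decoder interface: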
    // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
    // or more contributor license agreements. Licensed under the Elastic License;
    // you may not use this file except in compliance with the Elastic License.

    package awss3

    import (
        "fmt"
        "io"

        "github.com/elastic/beats/v7/x-pack/libbeat/reader/parquet"
    )

    // parquetDecoder is a decoder for parquet data.
    type parquetDecoder struct {
        reader *parquet.BufferedReader
    }

    // newParquetDecoder creates a new parquet decoder. It uses the libbeat parquet reader under the hood.
    // It returns an error if the parquet reader cannot be created.
    func newParquetDecoder(config decoderConfig, r io.Reader) (decoder, error) {
        reader, err := parquet.NewBufferedReader(r, &parquet.Config{
            ProcessParallel: config.Codec.Parquet.ProcessParallel,
            BatchSize:       config.Codec.Parquet.BatchSize,
        })
        if err != nil {
            return nil, fmt.Errorf("failed to create parquet decoder: %w", err)
        }
        return &parquetDecoder{
            reader: reader,
        }, nil
    }

    // next advances the parquet decoder to the next data item and returns true if there is more data to be decoded.
    func (pd *parquetDecoder) next() bool {
        return pd.reader.Next()
    }

    // decode reads and decodes a parquet data stream. After reading the parquet data it decodes
    // the output to JSON and returns it as a byte slice. It returns an error if the data cannot be decoded.
    func (pd *parquetDecoder) decode() ([]byte, error) {
        data, err := pd.reader.Record()
        if err != nil {
            return nil, err
        }
        return data, nil
    }

    // close closes the parquet decoder and releases the resources.
    func (pd *parquetDecoder) close() error {
        return pd.reader.Close()
    }
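Note the asymmetry in close: csvDecoder.close treats a clean io.EOF as success and surfaces any other terminal read error, while parquetDecoder.close delegates to the buffered reader, which owns real resources. How the input chooses between the two codecs is not shown in this excerpt; a dispatch function would plausibly look like the sketch below, assuming Codec.CSV and Codec.Parquet are optional pointer-typed fields of decoderConfig. The names and nil checks here are assumptions, not the commit's actual factory.

    // Hypothetical codec dispatch; the commit's real selection logic is not
    // shown in this excerpt, and the pointer-typed option fields are assumed.
    func newDecoder(config decoderConfig, r io.Reader) (decoder, error) {
        switch {
        case config.Codec.Parquet != nil:
            return newParquetDecoder(config, r)
        case config.Codec.CSV != nil:
            return newCSVDecoder(config, r)
        default:
            return nil, fmt.Errorf("no known codec configured")
        }
    }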