diff --git a/cmd/relayproxy/config/config.go b/cmd/relayproxy/config/config.go
index 491f2015cf3..a6186464470 100644
--- a/cmd/relayproxy/config/config.go
+++ b/cmd/relayproxy/config/config.go
@@ -2,6 +2,12 @@ package config
 
 import (
 	"fmt"
+	"net/http"
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+
 	"github.com/knadh/koanf/parsers/json"
 	"github.com/knadh/koanf/parsers/toml"
 	"github.com/knadh/koanf/parsers/yaml"
@@ -10,13 +16,8 @@ import (
 	"github.com/knadh/koanf/providers/file"
 	"github.com/knadh/koanf/providers/posflag"
 	"github.com/knadh/koanf/v2"
-	"net/http"
-	"os"
-	"path/filepath"
-	"strings"
-	"time"
-
 	"github.com/spf13/pflag"
+	"github.com/xitongsys/parquet-go/parquet"
 	"go.uber.org/zap"
 )
@@ -32,20 +33,22 @@ var DefaultRetriever = struct {
 }
 
 var DefaultExporter = struct {
-	Format           string
-	LogFormat        string
-	FileName         string
-	CsvFormat        string
-	FlushInterval    time.Duration
-	MaxEventInMemory int64
+	Format                  string
+	LogFormat               string
+	FileName                string
+	CsvFormat               string
+	FlushInterval           time.Duration
+	MaxEventInMemory        int64
+	ParquetCompressionCodec string
 }{
 	Format:    "JSON",
 	LogFormat: "[{{ .FormattedDate}}] user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", value=\"{{ .Value}}\"",
 	FileName:  "flag-variation-{{ .Hostname}}-{{ .Timestamp}}.{{ .Format}}",
 	CsvFormat: "{{ .Kind}};{{ .ContextKind}};{{ .UserKey}};{{ .CreationDate}};{{ .Key}};{{ .Variation}};" +
 		"{{ .Value}};{{ .Default}}\\n",
-	FlushInterval:    60000 * time.Millisecond,
-	MaxEventInMemory: 100000,
+	FlushInterval:           60000 * time.Millisecond,
+	MaxEventInMemory:        100000,
+	ParquetCompressionCodec: parquet.CompressionCodec_SNAPPY.String(),
 }
 
 // New is reading the configuration file
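For context: the new default stores the codec as the string form of the parquet-go `CompressionCodec` enum, and the validation added in `exporter.go` below converts it back with the same helper. A minimal sketch of that round trip, not part of the PR; the demo program and values are illustrative only:

```go
package main

import (
	"fmt"

	"github.com/xitongsys/parquet-go/parquet"
)

func main() {
	// DefaultExporter keeps the codec as a string, e.g. "SNAPPY".
	def := parquet.CompressionCodec_SNAPPY.String()

	// The validation below relies on the same helper to turn the string
	// back into the enum; unknown names return an error.
	codec, err := parquet.CompressionCodecFromString(def)
	fmt.Println(def, codec, err) // SNAPPY SNAPPY <nil>
}
```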
diff --git a/cmd/relayproxy/config/exporter.go b/cmd/relayproxy/config/exporter.go
index 29b79e3e0b3..eee506f3b55 100644
--- a/cmd/relayproxy/config/exporter.go
+++ b/cmd/relayproxy/config/exporter.go
@@ -1,22 +1,27 @@
 package config
 
-import "fmt"
+import (
+	"fmt"
+
+	"github.com/xitongsys/parquet-go/parquet"
+)
 
 // ExporterConf contains all the field to configure an exporter
 type ExporterConf struct {
-	Kind             ExporterKind      `mapstructure:"kind" koanf:"kind"`
-	OutputDir        string            `mapstructure:"outputDir" koanf:"outputdir"`
-	Format           string            `mapstructure:"format" koanf:"format"`
-	Filename         string            `mapstructure:"filename" koanf:"filename"`
-	CsvTemplate      string            `mapstructure:"csvTemplate" koanf:"csvtemplate"`
-	Bucket           string            `mapstructure:"bucket" koanf:"bucket"`
-	Path             string            `mapstructure:"path" koanf:"path"`
-	EndpointURL      string            `mapstructure:"endpointUrl" koanf:"endpointurl"`
-	Secret           string            `mapstructure:"secret" koanf:"secret"`
-	Meta             map[string]string `mapstructure:"meta" koanf:"meta"`
-	LogFormat        string            `mapstructure:"logFormat" koanf:"logformat"`
-	FlushInterval    int64             `mapstructure:"flushInterval" koanf:"flushinterval"`
-	MaxEventInMemory int64             `mapstructure:"maxEventInMemory" koanf:"maxeventinmemory"`
+	Kind                    ExporterKind      `mapstructure:"kind" koanf:"kind"`
+	OutputDir               string            `mapstructure:"outputDir" koanf:"outputdir"`
+	Format                  string            `mapstructure:"format" koanf:"format"`
+	Filename                string            `mapstructure:"filename" koanf:"filename"`
+	CsvTemplate             string            `mapstructure:"csvTemplate" koanf:"csvtemplate"`
+	Bucket                  string            `mapstructure:"bucket" koanf:"bucket"`
+	Path                    string            `mapstructure:"path" koanf:"path"`
+	EndpointURL             string            `mapstructure:"endpointUrl" koanf:"endpointurl"`
+	Secret                  string            `mapstructure:"secret" koanf:"secret"`
+	Meta                    map[string]string `mapstructure:"meta" koanf:"meta"`
+	LogFormat               string            `mapstructure:"logFormat" koanf:"logformat"`
+	FlushInterval           int64             `mapstructure:"flushInterval" koanf:"flushinterval"`
+	MaxEventInMemory        int64             `mapstructure:"maxEventInMemory" koanf:"maxeventinmemory"`
+	ParquetCompressionCodec string            `mapstructure:"parquetCompressionCodec" koanf:"parquetcompressioncodec"`
 }
 
 func (c *ExporterConf) IsValid() error {
@@ -32,6 +37,11 @@ func (c *ExporterConf) IsValid() error {
 	if c.Kind == WebhookExporter && c.EndpointURL == "" {
 		return fmt.Errorf("invalid exporter: no \"endpointUrl\" property found for kind \"%s\"", c.Kind)
 	}
+	if len(c.ParquetCompressionCodec) > 0 {
+		if _, err := parquet.CompressionCodecFromString(c.ParquetCompressionCodec); err != nil {
+			return fmt.Errorf("invalid exporter: \"parquetCompressionCodec\" err: %v", err)
+		}
+	}
 	return nil
 }
diff --git a/cmd/relayproxy/config/exporter_test.go b/cmd/relayproxy/config/exporter_test.go
index 0519b129da5..425f0efe49c 100644
--- a/cmd/relayproxy/config/exporter_test.go
+++ b/cmd/relayproxy/config/exporter_test.go
@@ -9,16 +9,17 @@
 func TestExporterConf_IsValid(t *testing.T) {
 	type fields struct {
-		Kind        string
-		OutputDir   string
-		Format      string
-		Filename    string
-		CsvTemplate string
-		Bucket      string
-		Path        string
-		EndpointURL string
-		Secret      string
-		Meta        map[string]string
+		Kind                    string
+		OutputDir               string
+		Format                  string
+		Filename                string
+		CsvTemplate             string
+		Bucket                  string
+		Path                    string
+		EndpointURL             string
+		Secret                  string
+		Meta                    map[string]string
+		ParquetCompressionCodec string
 	}
 	tests := []struct {
 		name     string
@@ -127,20 +128,32 @@ func TestExporterConf_IsValid(t *testing.T) {
 			},
 			wantErr: false,
 		},
+		{
+			name: "invalid parquetCompressionCodec",
+			fields: fields{
+				Kind:                    "googleStorage",
+				Bucket:                  "testbucket",
+				Format:                  "parquet",
+				ParquetCompressionCodec: "invalid",
+			},
+			wantErr:  true,
+			errValue: "invalid exporter: \"parquetCompressionCodec\" err: not a valid CompressionCodec string",
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			c := &config.ExporterConf{
-				Kind:        config.ExporterKind(tt.fields.Kind),
-				OutputDir:   tt.fields.OutputDir,
-				Format:      tt.fields.Format,
-				Filename:    tt.fields.Filename,
-				CsvTemplate: tt.fields.CsvTemplate,
-				Bucket:      tt.fields.Bucket,
-				Path:        tt.fields.Path,
-				EndpointURL: tt.fields.EndpointURL,
-				Secret:      tt.fields.Secret,
-				Meta:        tt.fields.Meta,
+				Kind:                    config.ExporterKind(tt.fields.Kind),
+				OutputDir:               tt.fields.OutputDir,
+				Format:                  tt.fields.Format,
+				Filename:                tt.fields.Filename,
+				CsvTemplate:             tt.fields.CsvTemplate,
+				Bucket:                  tt.fields.Bucket,
+				Path:                    tt.fields.Path,
+				EndpointURL:             tt.fields.EndpointURL,
+				Secret:                  tt.fields.Secret,
+				Meta:                    tt.fields.Meta,
+				ParquetCompressionCodec: tt.fields.ParquetCompressionCodec,
 			}
 			err := c.IsValid()
 			assert.Equal(t, tt.wantErr, err != nil)
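To illustrate the new check, here is a minimal sketch of `ExporterConf.IsValid` from a caller's point of view. It is not part of the PR: the demo program, the values, and the assumption that a `file` exporter with an output directory passes the other validations are mine.

```go
package main

import (
	"fmt"

	"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/config"
)

func main() {
	// "GZIP" is one of the codec names known to parquet-go, so this passes.
	ok := config.ExporterConf{
		Kind:                    "file",
		OutputDir:               "/tmp/ff-events/",
		ParquetCompressionCodec: "GZIP",
	}

	// An unknown codec name is rejected by the new validation branch.
	bad := ok
	bad.ParquetCompressionCodec = "invalid"

	fmt.Println(ok.IsValid())  // expected: <nil>
	fmt.Println(bad.IsValid()) // expected: invalid exporter: "parquetCompressionCodec" err: ...
}
```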
diff --git a/cmd/relayproxy/main.go b/cmd/relayproxy/main.go
index cdce0cc3c0b..e40a7504d08 100644
--- a/cmd/relayproxy/main.go
+++ b/cmd/relayproxy/main.go
@@ -2,9 +2,9 @@ package main
 
 import (
 	"fmt"
-	"github.com/spf13/pflag"
 	"os"
+	"github.com/spf13/pflag"
 	"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/api"
 	"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/config"
 	"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/docs"
diff --git a/cmd/relayproxy/service/gofeatureflag.go b/cmd/relayproxy/service/gofeatureflag.go
index ace5f027d70..0fc423bd33f 100644
--- a/cmd/relayproxy/service/gofeatureflag.go
+++ b/cmd/relayproxy/service/gofeatureflag.go
@@ -167,6 +167,11 @@ func initExporter(c *config.ExporterConf) (ffclient.DataExporter, error) {
 		csvTemplate = c.CsvTemplate
 	}
 
+	parquetCompressionCodec := config.DefaultExporter.ParquetCompressionCodec
+	if c.ParquetCompressionCodec != "" {
+		parquetCompressionCodec = c.ParquetCompressionCodec
+	}
+
 	dataExp := ffclient.DataExporter{
 		FlushInterval: func() time.Duration {
 			if c.FlushInterval != 0 {
@@ -193,10 +198,11 @@ func initExporter(c *config.ExporterConf) (ffclient.DataExporter, error) {
 	case config.FileExporter:
 		dataExp.Exporter = &fileexporter.Exporter{
-			Format:      format,
-			OutputDir:   c.OutputDir,
-			Filename:    filename,
-			CsvTemplate: csvTemplate,
+			Format:                  format,
+			OutputDir:               c.OutputDir,
+			Filename:                filename,
+			CsvTemplate:             csvTemplate,
+			ParquetCompressionCodec: parquetCompressionCodec,
 		}
 		return dataExp, nil
 
@@ -213,21 +219,23 @@ func initExporter(c *config.ExporterConf) (ffclient.DataExporter, error) {
 	case config.S3Exporter:
 		dataExp.Exporter = &s3exporter.Exporter{
-			Bucket:      c.Bucket,
-			Format:      format,
-			S3Path:      c.Path,
-			Filename:    filename,
-			CsvTemplate: csvTemplate,
+			Bucket:                  c.Bucket,
+			Format:                  format,
+			S3Path:                  c.Path,
+			Filename:                filename,
+			CsvTemplate:             csvTemplate,
+			ParquetCompressionCodec: parquetCompressionCodec,
 		}
 		return dataExp, nil
 
 	case config.GoogleStorageExporter:
 		dataExp.Exporter = &gcstorageexporter.Exporter{
-			Bucket:      c.Bucket,
-			Format:      format,
-			Path:        c.Path,
-			Filename:    filename,
-			CsvTemplate: csvTemplate,
+			Bucket:                  c.Bucket,
+			Format:                  format,
+			Path:                    c.Path,
+			Filename:                filename,
+			CsvTemplate:             csvTemplate,
+			ParquetCompressionCodec: parquetCompressionCodec,
 		}
 		return dataExp, nil
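For a sense of what `initExporter` now assembles when the relay proxy is configured with the parquet format, here is a minimal sketch of the resulting `ffclient.DataExporter`. The file exporter import path and all values are assumptions for illustration, not taken from the PR:

```go
package main

import (
	"time"

	ffclient "github.com/thomaspoignant/go-feature-flag"
	"github.com/thomaspoignant/go-feature-flag/exporter/fileexporter"
	"github.com/xitongsys/parquet-go/parquet"
)

func main() {
	// Rough equivalent of a relay proxy exporter block with kind "file" and
	// format "parquet": the codec falls back to SNAPPY when not overridden.
	_ = ffclient.DataExporter{
		FlushInterval:    60 * time.Second,
		MaxEventInMemory: 100000,
		Exporter: &fileexporter.Exporter{
			Format:                  "parquet",
			OutputDir:               "/tmp/variation-events/",
			Filename:                "flag-variation-{{ .Timestamp}}.{{ .Format}}",
			ParquetCompressionCodec: parquet.CompressionCodec_SNAPPY.String(),
		},
	}
}
```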
diff --git a/cmd/relayproxy/service/gofeatureflag_test.go b/cmd/relayproxy/service/gofeatureflag_test.go
index db9590a0a21..6908d55e0ce 100644
--- a/cmd/relayproxy/service/gofeatureflag_test.go
+++ b/cmd/relayproxy/service/gofeatureflag_test.go
@@ -1,6 +1,10 @@
 package service
 
 import (
+	"net/http"
+	"testing"
+	"time"
+
 	"github.com/stretchr/testify/assert"
 	ffclient "github.com/thomaspoignant/go-feature-flag"
 	"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/config"
@@ -15,9 +19,7 @@ import (
 	"github.com/thomaspoignant/go-feature-flag/retriever/githubretriever"
 	"github.com/thomaspoignant/go-feature-flag/retriever/httpretriever"
 	"github.com/thomaspoignant/go-feature-flag/retriever/s3retriever"
-	"net/http"
-	"testing"
-	"time"
+	"github.com/xitongsys/parquet-go/parquet"
 )
 
 func Test_initRetriever(t *testing.T) {
@@ -149,17 +151,19 @@ func Test_initExporter(t *testing.T) {
 			name:    "Convert FileExporter",
 			wantErr: assert.NoError,
 			conf: &config.ExporterConf{
-				Kind:      "file",
-				OutputDir: "/outputfolder/",
+				Kind:                    "file",
+				OutputDir:               "/outputfolder/",
+				ParquetCompressionCodec: parquet.CompressionCodec_UNCOMPRESSED.String(),
 			},
 			want: ffclient.DataExporter{
 				FlushInterval:    config.DefaultExporter.FlushInterval,
 				MaxEventInMemory: config.DefaultExporter.MaxEventInMemory,
 				Exporter: &fileexporter.Exporter{
-					Format:      config.DefaultExporter.Format,
-					OutputDir:   "/outputfolder/",
-					Filename:    config.DefaultExporter.FileName,
-					CsvTemplate: config.DefaultExporter.CsvFormat,
+					Format:                  config.DefaultExporter.Format,
+					OutputDir:               "/outputfolder/",
+					Filename:                config.DefaultExporter.FileName,
+					CsvTemplate:             config.DefaultExporter.CsvFormat,
+					ParquetCompressionCodec: parquet.CompressionCodec_UNCOMPRESSED.String(),
 				},
 			},
 		},
@@ -190,11 +194,12 @@ func Test_initExporter(t *testing.T) {
 				FlushInterval:    10 * time.Millisecond,
 				MaxEventInMemory: config.DefaultExporter.MaxEventInMemory,
 				Exporter: &s3exporter.Exporter{
-					Bucket:      "my-bucket",
-					Format:      config.DefaultExporter.Format,
-					S3Path:      "/my-path/",
-					Filename:    config.DefaultExporter.FileName,
-					CsvTemplate: config.DefaultExporter.CsvFormat,
+					Bucket:                  "my-bucket",
+					Format:                  config.DefaultExporter.Format,
+					S3Path:                  "/my-path/",
+					Filename:                config.DefaultExporter.FileName,
+					CsvTemplate:             config.DefaultExporter.CsvFormat,
+					ParquetCompressionCodec: config.DefaultExporter.ParquetCompressionCodec,
 				},
 			},
 		},
@@ -211,11 +216,12 @@ func Test_initExporter(t *testing.T) {
 				FlushInterval:    config.DefaultExporter.FlushInterval,
 				MaxEventInMemory: 1990,
 				Exporter: &gcstorageexporter.Exporter{
-					Bucket:      "my-bucket",
-					Format:      config.DefaultExporter.Format,
-					Path:        "/my-path/",
-					Filename:    config.DefaultExporter.FileName,
-					CsvTemplate: config.DefaultExporter.CsvFormat,
+					Bucket:                  "my-bucket",
+					Format:                  config.DefaultExporter.Format,
+					Path:                    "/my-path/",
+					Filename:                config.DefaultExporter.FileName,
+					CsvTemplate:             config.DefaultExporter.CsvFormat,
+					ParquetCompressionCodec: config.DefaultExporter.ParquetCompressionCodec,
 				},
 			},
 		},