Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix relay proxy config - support parquet for data collection #710

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions cmd/relayproxy/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package config

import (
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
"time"

"github.com/knadh/koanf/parsers/json"
"github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/parsers/yaml"
Expand All @@ -10,13 +16,8 @@ import (
"github.com/knadh/koanf/providers/file"
"github.com/knadh/koanf/providers/posflag"
"github.com/knadh/koanf/v2"
"net/http"
"os"
"path/filepath"
"strings"
"time"

"github.com/spf13/pflag"
"github.com/xitongsys/parquet-go/parquet"
"go.uber.org/zap"
)

Expand All @@ -32,20 +33,22 @@ var DefaultRetriever = struct {
}

var DefaultExporter = struct {
Format string
LogFormat string
FileName string
CsvFormat string
FlushInterval time.Duration
MaxEventInMemory int64
Format string
LogFormat string
FileName string
CsvFormat string
FlushInterval time.Duration
MaxEventInMemory int64
ParquetCompressionCodec string
}{
Format: "JSON",
LogFormat: "[{{ .FormattedDate}}] user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", value=\"{{ .Value}}\"",
FileName: "flag-variation-{{ .Hostname}}-{{ .Timestamp}}.{{ .Format}}",
CsvFormat: "{ .Kind}};{{ .ContextKind}};{{ .UserKey}};{{ .CreationDate}};{{ .Key}};{{ .Variation}};" +
"{{ .Value}};{{ .Default}}\\n",
FlushInterval: 60000 * time.Millisecond,
MaxEventInMemory: 100000,
FlushInterval: 60000 * time.Millisecond,
MaxEventInMemory: 100000,
ParquetCompressionCodec: parquet.CompressionCodec_SNAPPY.String(),
}

// New is reading the configuration file
Expand Down
38 changes: 24 additions & 14 deletions cmd/relayproxy/config/exporter.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
package config

import "fmt"
import (
"fmt"

"github.com/xitongsys/parquet-go/parquet"
)

// ExporterConf contains all the field to configure an exporter
type ExporterConf struct {
Kind ExporterKind `mapstructure:"kind" koanf:"kind"`
OutputDir string `mapstructure:"outputDir" koanf:"outputdir"`
Format string `mapstructure:"format" koanf:"format"`
Filename string `mapstructure:"filename" koanf:"filename"`
CsvTemplate string `mapstructure:"csvTemplate" koanf:"csvtemplate"`
Bucket string `mapstructure:"bucket" koanf:"bucket"`
Path string `mapstructure:"path" koanf:"path"`
EndpointURL string `mapstructure:"endpointUrl" koanf:"endpointurl"`
Secret string `mapstructure:"secret" koanf:"secret"`
Meta map[string]string `mapstructure:"meta" koanf:"meta"`
LogFormat string `mapstructure:"logFormat" koanf:"logformat"`
FlushInterval int64 `mapstructure:"flushInterval" koanf:"flushinterval"`
MaxEventInMemory int64 `mapstructure:"maxEventInMemory" koanf:"maxeventinmemory"`
Kind ExporterKind `mapstructure:"kind" koanf:"kind"`
OutputDir string `mapstructure:"outputDir" koanf:"outputdir"`
Format string `mapstructure:"format" koanf:"format"`
Filename string `mapstructure:"filename" koanf:"filename"`
CsvTemplate string `mapstructure:"csvTemplate" koanf:"csvtemplate"`
Bucket string `mapstructure:"bucket" koanf:"bucket"`
Path string `mapstructure:"path" koanf:"path"`
EndpointURL string `mapstructure:"endpointUrl" koanf:"endpointurl"`
Secret string `mapstructure:"secret" koanf:"secret"`
Meta map[string]string `mapstructure:"meta" koanf:"meta"`
LogFormat string `mapstructure:"logFormat" koanf:"logformat"`
FlushInterval int64 `mapstructure:"flushInterval" koanf:"flushinterval"`
MaxEventInMemory int64 `mapstructure:"maxEventInMemory" koanf:"maxeventinmemory"`
ParquetCompressionCodec string `mapstructure:"parquetCompressionCodec" koanf:"parquetCompressionCodec"`
thomaspoignant marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *ExporterConf) IsValid() error {
Expand All @@ -32,6 +37,11 @@ func (c *ExporterConf) IsValid() error {
if c.Kind == WebhookExporter && c.EndpointURL == "" {
return fmt.Errorf("invalid exporter: no \"endpointUrl\" property found for kind \"%s\"", c.Kind)
}
if len(c.ParquetCompressionCodec) > 0 {
if _, err := parquet.CompressionCodecFromString(c.ParquetCompressionCodec); err != nil {
return fmt.Errorf("invalid exporter: \"parquetCompressionCodec\" err: %v", err)
}
}
return nil
}

Expand Down
53 changes: 33 additions & 20 deletions cmd/relayproxy/config/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ import (

func TestExporterConf_IsValid(t *testing.T) {
type fields struct {
Kind string
OutputDir string
Format string
Filename string
CsvTemplate string
Bucket string
Path string
EndpointURL string
Secret string
Meta map[string]string
Kind string
OutputDir string
Format string
Filename string
CsvTemplate string
Bucket string
Path string
EndpointURL string
Secret string
Meta map[string]string
ParquetCompressionCodec string
}
tests := []struct {
name string
Expand Down Expand Up @@ -127,20 +128,32 @@ func TestExporterConf_IsValid(t *testing.T) {
},
wantErr: false,
},
{
name: "invalid parquetCompressionCodec",
fields: fields{
Kind: "googleStorage",
Bucket: "testbucket",
Format: "parquet",
ParquetCompressionCodec: "invalid",
},
wantErr: true,
errValue: "invalid exporter: \"parquetCompressionCodec\" err: not a valid CompressionCodec string",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &config.ExporterConf{
Kind: config.ExporterKind(tt.fields.Kind),
OutputDir: tt.fields.OutputDir,
Format: tt.fields.Format,
Filename: tt.fields.Filename,
CsvTemplate: tt.fields.CsvTemplate,
Bucket: tt.fields.Bucket,
Path: tt.fields.Path,
EndpointURL: tt.fields.EndpointURL,
Secret: tt.fields.Secret,
Meta: tt.fields.Meta,
Kind: config.ExporterKind(tt.fields.Kind),
OutputDir: tt.fields.OutputDir,
Format: tt.fields.Format,
Filename: tt.fields.Filename,
CsvTemplate: tt.fields.CsvTemplate,
Bucket: tt.fields.Bucket,
Path: tt.fields.Path,
EndpointURL: tt.fields.EndpointURL,
Secret: tt.fields.Secret,
Meta: tt.fields.Meta,
ParquetCompressionCodec: tt.fields.ParquetCompressionCodec,
}
err := c.IsValid()
assert.Equal(t, tt.wantErr, err != nil)
Expand Down
2 changes: 1 addition & 1 deletion cmd/relayproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package main

import (
"fmt"
"github.com/spf13/pflag"
"os"

"github.com/spf13/pflag"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will be great to avoid this kind of change in the PR.

"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/api"
"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/config"
"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/docs"
Expand Down
36 changes: 22 additions & 14 deletions cmd/relayproxy/service/gofeatureflag.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ func initExporter(c *config.ExporterConf) (ffclient.DataExporter, error) {
csvTemplate = c.CsvTemplate
}

parquetCompressionCodec := config.DefaultExporter.ParquetCompressionCodec
if c.ParquetCompressionCodec != "" {
parquetCompressionCodec = c.ParquetCompressionCodec
}

dataExp := ffclient.DataExporter{
FlushInterval: func() time.Duration {
if c.FlushInterval != 0 {
Expand All @@ -193,10 +198,11 @@ func initExporter(c *config.ExporterConf) (ffclient.DataExporter, error) {

case config.FileExporter:
dataExp.Exporter = &fileexporter.Exporter{
Format: format,
OutputDir: c.OutputDir,
Filename: filename,
CsvTemplate: csvTemplate,
Format: format,
OutputDir: c.OutputDir,
Filename: filename,
CsvTemplate: csvTemplate,
ParquetCompressionCodec: parquetCompressionCodec,
}
return dataExp, nil

Expand All @@ -213,21 +219,23 @@ func initExporter(c *config.ExporterConf) (ffclient.DataExporter, error) {

case config.S3Exporter:
dataExp.Exporter = &s3exporter.Exporter{
Bucket: c.Bucket,
Format: format,
S3Path: c.Path,
Filename: filename,
CsvTemplate: csvTemplate,
Bucket: c.Bucket,
Format: format,
S3Path: c.Path,
Filename: filename,
CsvTemplate: csvTemplate,
ParquetCompressionCodec: parquetCompressionCodec,
}
return dataExp, nil

case config.GoogleStorageExporter:
dataExp.Exporter = &gcstorageexporter.Exporter{
Bucket: c.Bucket,
Format: format,
Path: c.Path,
Filename: filename,
CsvTemplate: csvTemplate,
Bucket: c.Bucket,
Format: format,
Path: c.Path,
Filename: filename,
CsvTemplate: csvTemplate,
ParquetCompressionCodec: parquetCompressionCodec,
}
return dataExp, nil

Expand Down
44 changes: 25 additions & 19 deletions cmd/relayproxy/service/gofeatureflag_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package service

import (
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
ffclient "github.com/thomaspoignant/go-feature-flag"
"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/config"
Expand All @@ -15,9 +19,7 @@ import (
"github.com/thomaspoignant/go-feature-flag/retriever/githubretriever"
"github.com/thomaspoignant/go-feature-flag/retriever/httpretriever"
"github.com/thomaspoignant/go-feature-flag/retriever/s3retriever"
"net/http"
"testing"
"time"
"github.com/xitongsys/parquet-go/parquet"
)

func Test_initRetriever(t *testing.T) {
Expand Down Expand Up @@ -149,17 +151,19 @@ func Test_initExporter(t *testing.T) {
name: "Convert FileExporter",
wantErr: assert.NoError,
conf: &config.ExporterConf{
Kind: "file",
OutputDir: "/outputfolder/",
Kind: "file",
OutputDir: "/outputfolder/",
ParquetCompressionCodec: parquet.CompressionCodec_UNCOMPRESSED.String(),
},
want: ffclient.DataExporter{
FlushInterval: config.DefaultExporter.FlushInterval,
MaxEventInMemory: config.DefaultExporter.MaxEventInMemory,
Exporter: &fileexporter.Exporter{
Format: config.DefaultExporter.Format,
OutputDir: "/outputfolder/",
Filename: config.DefaultExporter.FileName,
CsvTemplate: config.DefaultExporter.CsvFormat,
Format: config.DefaultExporter.Format,
OutputDir: "/outputfolder/",
Filename: config.DefaultExporter.FileName,
CsvTemplate: config.DefaultExporter.CsvFormat,
ParquetCompressionCodec: parquet.CompressionCodec_UNCOMPRESSED.String(),
},
},
},
Expand Down Expand Up @@ -190,11 +194,12 @@ func Test_initExporter(t *testing.T) {
FlushInterval: 10 * time.Millisecond,
MaxEventInMemory: config.DefaultExporter.MaxEventInMemory,
Exporter: &s3exporter.Exporter{
Bucket: "my-bucket",
Format: config.DefaultExporter.Format,
S3Path: "/my-path/",
Filename: config.DefaultExporter.FileName,
CsvTemplate: config.DefaultExporter.CsvFormat,
Bucket: "my-bucket",
Format: config.DefaultExporter.Format,
S3Path: "/my-path/",
Filename: config.DefaultExporter.FileName,
CsvTemplate: config.DefaultExporter.CsvFormat,
ParquetCompressionCodec: config.DefaultExporter.ParquetCompressionCodec,
},
},
},
Expand All @@ -211,11 +216,12 @@ func Test_initExporter(t *testing.T) {
FlushInterval: config.DefaultExporter.FlushInterval,
MaxEventInMemory: 1990,
Exporter: &gcstorageexporter.Exporter{
Bucket: "my-bucket",
Format: config.DefaultExporter.Format,
Path: "/my-path/",
Filename: config.DefaultExporter.FileName,
CsvTemplate: config.DefaultExporter.CsvFormat,
Bucket: "my-bucket",
Format: config.DefaultExporter.Format,
Path: "/my-path/",
Filename: config.DefaultExporter.FileName,
CsvTemplate: config.DefaultExporter.CsvFormat,
ParquetCompressionCodec: config.DefaultExporter.ParquetCompressionCodec,
},
},
},
Expand Down