Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature support-parquet-for-data-collection #667

33 changes: 24 additions & 9 deletions exporter/feature_event.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package exporter

import (
"encoding/json"
"time"

"github.com/thomaspoignant/go-feature-flag/ffuser"
Expand Down Expand Up @@ -33,38 +34,52 @@ func NewFeatureEvent(
}

// FeatureEvent represent an event that we store in the data storage
// nolint:lll
type FeatureEvent struct {
// Kind for a feature event is feature.
// A feature event will only be generated if the trackEvents attribute of the flag is set to true.
Kind string `json:"kind" example:"feature"`
Kind string `json:"kind" example:"feature" parquet:"name=kind, type=BYTE_ARRAY, convertedtype=UTF8"`

// ContextKind is the kind of context which generated an event. This will only be "anonymousUser" for events generated
// on behalf of an anonymous user or the reserved word "user" for events generated on behalf of a non-anonymous user
ContextKind string `json:"contextKind,omitempty" example:"user"`
ContextKind string `json:"contextKind,omitempty" example:"user" parquet:"name=contextKind, type=BYTE_ARRAY, convertedtype=UTF8"`

// UserKey The key of the user object used in a feature flag evaluation. Details for the user object used in a feature
// flag evaluation as reported by the "feature" event are transmitted periodically with a separate index event.
UserKey string `json:"userKey" example:"94a25909-20d8-40cc-8500-fee99b569345"`
UserKey string `json:"userKey" example:"94a25909-20d8-40cc-8500-fee99b569345" parquet:"name=userKey, type=BYTE_ARRAY, convertedtype=UTF8"`

// CreationDate When the feature flag was requested at Unix epoch time in milliseconds.
CreationDate int64 `json:"creationDate" example:"1680246000011"`
CreationDate int64 `json:"creationDate" example:"1680246000011" parquet:"name=creationDate, type=INT64"`

// Key of the feature flag requested.
Key string `json:"key" example:"my-feature-flag"`
Key string `json:"key" example:"my-feature-flag" parquet:"name=key, type=BYTE_ARRAY, convertedtype=UTF8"`

// Variation of the flag requested. Flag variation values can be "True", "False", "Default" or "SdkDefault"
// depending on which value was taken during flag evaluation. "SdkDefault" is used when an error is detected and the
// default value passed during the call to your variation is used.
Variation string `json:"variation" example:"admin-variation"`
Variation string `json:"variation" example:"admin-variation" parquet:"name=variation, type=BYTE_ARRAY, convertedtype=UTF8"`

// Value of the feature flag returned by feature flag evaluation.
Value interface{} `json:"value"`
Value interface{} `json:"value" parquet:"name=value, type=BYTE_ARRAY, convertedtype=UTF8"`
thomaspoignant marked this conversation as resolved.
Show resolved Hide resolved

// Default value is set to true if feature flag evaluation failed, in which case the value returned was the default
// value passed to variation. If the default field is omitted, it is assumed to be false.
Default bool `json:"default" example:"false"`
Default bool `json:"default" example:"false" parquet:"name=default, type=BOOLEAN"`

// Version contains the version of the flag. If the field is omitted for the flag in the configuration file
// the default version will be 0.
Version string `json:"version" example:"v1.0.0"`
Version string `json:"version" example:"v1.0.0" parquet:"name=version, type=BYTE_ARRAY, convertedtype=UTF8"`
}

// MarshalInterface marshals all interface type fields in FeatureEvent into JSON-encoded string.
func (f *FeatureEvent) MarshalInterface() error {
if f == nil {
return nil
}
b, err := json.Marshal(f.Value)
if err != nil {
return err
}
f.Value = string(b)
return nil
thomaspoignant marked this conversation as resolved.
Show resolved Hide resolved
}
67 changes: 67 additions & 0 deletions exporter/feature_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,70 @@ func TestNewFeatureEvent(t *testing.T) {
})
}
}

func TestFeatureEvent_MarshalInterface(t *testing.T) {
tests := []struct {
name string
featureEvent *exporter.FeatureEvent
want *exporter.FeatureEvent
wantErr bool
}{
{
name: "happy path",
featureEvent: &exporter.FeatureEvent{
Kind: "feature",
ContextKind: "anonymousUser",
UserKey: "ABCD",
CreationDate: 1617970547,
Key: "random-key",
Variation: "Default",
Value: map[string]interface{}{
"string": "string",
"bool": true,
"float": 1.23,
"int": 1,
},
Default: false,
},
want: &exporter.FeatureEvent{
Kind: "feature",
ContextKind: "anonymousUser",
UserKey: "ABCD",
CreationDate: 1617970547,
Key: "random-key",
Variation: "Default",
Value: `{"bool":true,"float":1.23,"int":1,"string":"string"}`,
Default: false,
},
},
{
name: "marshal failed",
featureEvent: &exporter.FeatureEvent{
Kind: "feature",
ContextKind: "anonymousUser",
UserKey: "ABCD",
CreationDate: 1617970547,
Key: "random-key",
Variation: "Default",
Value: make(chan int),
Default: false,
},
wantErr: true,
},
{
name: "nil featureEvent",
featureEvent: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := tt.featureEvent.MarshalInterface(); (err != nil) != tt.wantErr {
t.Errorf("FeatureEvent.MarshalInterface() error = %v, wantErr %v", err, tt.wantErr)
return
}
if tt.want != nil {
assert.Equal(t, tt.want, tt.featureEvent)
}
})
}
}
58 changes: 52 additions & 6 deletions exporter/fileexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@ import (
"fmt"
"log"
"os"
"runtime"
"strings"
"sync"
"text/template"

"github.com/thomaspoignant/go-feature-flag/exporter"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/writer"
)

type Exporter struct {
// Format is the output format you want in your exported file.
// Available format are JSON and CSV.
// Available format are JSON, CSV, and Parquet.
// Default: JSON
Format string

Expand All @@ -37,6 +41,11 @@ type Exporter struct {
// {{ .Kind}};{{ .ContextKind}};{{ .UserKey}};{{ .CreationDate}};{{ .Key}};{{ .Variation}};{{ .Value}};{{ .Default}}\n
CsvTemplate string

// ParquetCompressionCodec is the parquet compression codec for better space efficiency.
// Available options https://github.com/apache/parquet-format/blob/master/Compression.md
// Default: SNAPPY
ParquetCompressionCodec string

csvTemplate *template.Template
filenameTemplate *template.Template
initTemplates sync.Once
Expand All @@ -54,6 +63,7 @@ func (f *Exporter) Export(_ context.Context, _ *log.Logger, featureEvents []expo
if f.Format == "" {
f.Format = "json"
}
f.Format = strings.ToLower(f.Format)

// Get the filename
filename, err := exporter.ComputeFilename(f.filenameTemplate, f.Format)
Expand All @@ -63,6 +73,19 @@ func (f *Exporter) Export(_ context.Context, _ *log.Logger, featureEvents []expo

filePath := f.OutputDir + "/" + filename

if f.Format == "parquet" {
return f.writeParquet(filePath, featureEvents)
}
return f.writeFile(filePath, featureEvents)
}

// IsBulk return false if we should directly send the data as soon as it is produce
// and true if we collect the data to send them in bulk.
func (f *Exporter) IsBulk() bool {
return true
}

func (f *Exporter) writeFile(filePath string, featureEvents []exporter.FeatureEvent) error {
file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return err
Expand All @@ -73,7 +96,7 @@ func (f *Exporter) Export(_ context.Context, _ *log.Logger, featureEvents []expo
var err error

// Convert the line in the right format
switch strings.ToLower(f.Format) {
switch f.Format {
case "csv":
line, err = exporter.FormatEventInCSV(f.csvTemplate, event)
case "json":
Expand All @@ -94,8 +117,31 @@ func (f *Exporter) Export(_ context.Context, _ *log.Logger, featureEvents []expo
return nil
}

// IsBulk return false if we should directly send the data as soon as it is produce
// and true if we collect the data to send them in bulk.
func (f *Exporter) IsBulk() bool {
return true
func (f *Exporter) writeParquet(filePath string, featureEvents []exporter.FeatureEvent) error {
fw, err := local.NewLocalFileWriter(filePath)
if err != nil {
return err
}
defer fw.Close()

pw, err := writer.NewParquetWriter(fw, new(exporter.FeatureEvent), int64(runtime.NumCPU()))
if err != nil {
return err
}
thomaspoignant marked this conversation as resolved.
Show resolved Hide resolved

pw.CompressionType = parquet.CompressionCodec_SNAPPY
if ct, err := parquet.CompressionCodecFromString(f.ParquetCompressionCodec); err == nil {
pw.CompressionType = ct
}

for _, event := range featureEvents {
if err := event.MarshalInterface(); err != nil {
return err
}
if err = pw.Write(event); err != nil {
return fmt.Errorf("error while writing the export file: %v", err)
}
}

return pw.WriteStop()
}
Loading