Skip to content

Commit

Permalink
[exporter/file] Refactor to keep the file exporter inside start/stop …
Browse files Browse the repository at this point in the history
…lifecycle (#31495)

**Description:**
Scope the behavior of the fileexporter to its lifecycle, so it is safe
to shut it down or restart it.

**Link to tracking Issue:**
Fixes #31494
  • Loading branch information
atoulme authored Feb 29, 2024
1 parent ce6e401 commit 89c6207
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 115 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fileexporter_lifecycle.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: fileexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Scope the behavior of the fileexporter to its lifecycle, so it is safe to shut it down or restart it.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27489]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
26 changes: 0 additions & 26 deletions exporter/fileexporter/error_component.go

This file was deleted.

54 changes: 10 additions & 44 deletions exporter/fileexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@ func createTracesExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Traces, error) {
fe, err := getOrCreateFileExporter(cfg)
if err != nil {
return nil, err
}
fe := getOrCreateFileExporter(cfg)
return exporterhelper.NewTracesExporter(
ctx,
set,
Expand All @@ -83,10 +80,7 @@ func createMetricsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Metrics, error) {
fe, err := getOrCreateFileExporter(cfg)
if err != nil {
return nil, err
}
fe := getOrCreateFileExporter(cfg)
return exporterhelper.NewMetricsExporter(
ctx,
set,
Expand All @@ -103,10 +97,7 @@ func createLogsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Logs, error) {
fe, err := getOrCreateFileExporter(cfg)
if err != nil {
return nil, err
}
fe := getOrCreateFileExporter(cfg)
return exporterhelper.NewLogsExporter(
ctx,
set,
Expand All @@ -122,45 +113,20 @@ func createLogsExporter(
// or returns the already cached one. Caching is required because the factory is asked trace and
// metric receivers separately when it gets CreateTracesReceiver() and CreateMetricsReceiver()
// but they must not create separate objects, they must use one Exporter object per configuration.
func getOrCreateFileExporter(cfg component.Config) (FileExporter, error) {
func getOrCreateFileExporter(cfg component.Config) FileExporter {
conf := cfg.(*Config)
fe := exporters.GetOrAdd(cfg, func() component.Component {
e, err := newFileExporter(conf)
if err != nil {
return &errorComponent{err: err}
}

return e
return newFileExporter(conf)
})

component := fe.Unwrap()
if errComponent, ok := component.(*errorComponent); ok {
return nil, errComponent.err
}

return component.(FileExporter), nil
c := fe.Unwrap()
return c.(FileExporter)
}

func newFileExporter(conf *Config) (FileExporter, error) {
marshaller := &marshaller{
formatType: conf.FormatType,
tracesMarshaler: tracesMarshalers[conf.FormatType],
metricsMarshaler: metricsMarshalers[conf.FormatType],
logsMarshaler: logsMarshalers[conf.FormatType],
compression: conf.Compression,
compressor: buildCompressor(conf.Compression),
}
export := buildExportFunc(conf)

writer, err := newFileWriter(conf.Path, conf.Rotation, conf.FlushInterval, export)
if err != nil {
return nil, err
}

func newFileExporter(conf *Config) FileExporter {
return &fileExporter{
marshaller: marshaller,
writer: writer,
}, nil
conf: conf,
}
}

func newFileWriter(path string, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) {
Expand Down
12 changes: 9 additions & 3 deletions exporter/fileexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ func TestCreateMetricsExporterError(t *testing.T) {
cfg := &Config{
FormatType: formatTypeJSON,
}
_, err := createMetricsExporter(
e, err := createMetricsExporter(
context.Background(),
exportertest.NewNopCreateSettings(),
cfg)
require.NoError(t, err)
err = e.Start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
}

Expand Down Expand Up @@ -65,10 +67,12 @@ func TestCreateTracesExporterError(t *testing.T) {
cfg := &Config{
FormatType: formatTypeJSON,
}
_, err := createTracesExporter(
e, err := createTracesExporter(
context.Background(),
exportertest.NewNopCreateSettings(),
cfg)
require.NoError(t, err)
err = e.Start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
}

Expand All @@ -90,10 +94,12 @@ func TestCreateLogsExporterError(t *testing.T) {
cfg := &Config{
FormatType: formatTypeJSON,
}
_, err := createLogsExporter(
e, err := createLogsExporter(
context.Background(),
exportertest.NewNopCreateSettings(),
cfg)
require.NoError(t, err)
err = e.Start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
}

Expand Down
28 changes: 25 additions & 3 deletions exporter/fileexporter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

// fileExporter is the implementation of file exporter that writes telemetry data to a file
type fileExporter struct {
conf *Config
marshaller *marshaller
writer *fileWriter
}
Expand Down Expand Up @@ -43,12 +44,33 @@ func (e *fileExporter) consumeLogs(_ context.Context, ld plog.Logs) error {
}

// Start starts the flush timer if set.
func (e *fileExporter) Start(ctx context.Context, _ component.Host) error {
return e.writer.start(ctx)
func (e *fileExporter) Start(_ context.Context, _ component.Host) error {
e.marshaller = &marshaller{
formatType: e.conf.FormatType,
tracesMarshaler: tracesMarshalers[e.conf.FormatType],
metricsMarshaler: metricsMarshalers[e.conf.FormatType],
logsMarshaler: logsMarshalers[e.conf.FormatType],
compression: e.conf.Compression,
compressor: buildCompressor(e.conf.Compression),
}
export := buildExportFunc(e.conf)

var err error
e.writer, err = newFileWriter(e.conf.Path, e.conf.Rotation, e.conf.FlushInterval, export)
if err != nil {
return err
}
e.writer.start()
return nil
}

// Shutdown stops the exporter and is invoked during shutdown.
// It stops the flush ticker if set.
func (e *fileExporter) Shutdown(context.Context) error {
return e.writer.shutdown()
if e.writer == nil {
return nil
}
w := e.writer
e.writer = nil
return w.shutdown()
}
62 changes: 29 additions & 33 deletions exporter/fileexporter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,17 @@ func TestFileTracesExporter(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := tt.args.conf
feI, err := newFileExporter(conf)
assert.NoError(t, err)
feI := newFileExporter(conf)
require.IsType(t, &fileExporter{}, feI)
fe := feI.(*fileExporter)

td := testdata.GenerateTracesTwoSpansSameResource()
assert.NoError(t, fe.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, fe.consumeTraces(context.Background(), td))
assert.NoError(t, fe.consumeTraces(context.Background(), td))
assert.NoError(t, fe.Shutdown(context.Background()))
defer func() {
assert.NoError(t, fe.Shutdown(context.Background()))
}()

fi, err := os.Open(fe.writer.path)
assert.NoError(t, err)
Expand Down Expand Up @@ -256,24 +257,18 @@ func TestFileMetricsExporter(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := tt.args.conf
writer, err := buildFileWriter(conf)
assert.NoError(t, err)
fe := &fileExporter{
marshaller: &marshaller{
formatType: conf.FormatType,
metricsMarshaler: metricsMarshalers[conf.FormatType],
compression: conf.Compression,
compressor: buildCompressor(conf.Compression),
},
writer: writer,
conf: conf,
}
require.NotNil(t, fe)

md := testdata.GenerateMetricsTwoMetrics()
assert.NoError(t, fe.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, fe.consumeMetrics(context.Background(), md))
assert.NoError(t, fe.consumeMetrics(context.Background(), md))
assert.NoError(t, fe.Shutdown(context.Background()))
defer func() {
assert.NoError(t, fe.Shutdown(context.Background()))
}()

fi, err := os.Open(fe.writer.path)
assert.NoError(t, err)
Expand Down Expand Up @@ -396,24 +391,18 @@ func TestFileLogsExporter(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := tt.args.conf
writer, err := buildFileWriter(conf)
assert.NoError(t, err)
fe := &fileExporter{
marshaller: &marshaller{
formatType: conf.FormatType,
logsMarshaler: logsMarshalers[conf.FormatType],
compression: conf.Compression,
compressor: buildCompressor(conf.Compression),
},
writer: writer,
conf: conf,
}
require.NotNil(t, fe)

ld := testdata.GenerateLogsTwoLogRecordsSameResource()
assert.NoError(t, fe.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, fe.consumeLogs(context.Background(), ld))
assert.NoError(t, fe.consumeLogs(context.Background(), ld))
assert.NoError(t, fe.Shutdown(context.Background()))
defer func() {
assert.NoError(t, fe.Shutdown(context.Background()))
}()

fi, err := os.Open(fe.writer.path)
assert.NoError(t, err)
Expand Down Expand Up @@ -631,11 +620,6 @@ func safeFileExporterWrite(e *fileExporter, d []byte) (int, error) {
return e.writer.file.Write(d)
}

func buildFileWriter(conf *Config) (*fileWriter, error) {
export := buildExportFunc(conf)
return newFileWriter(conf.Path, conf.Rotation, conf.FlushInterval, export)
}

func TestFlushing(t *testing.T) {
cfg := &Config{
Path: tempFileName(t),
Expand All @@ -648,16 +632,28 @@ func TestFlushing(t *testing.T) {
// Wrap the buffer with the buffered writer closer that implements flush() method.
bwc := newBufferedWriteCloser(buf)
// Create a file exporter with flushing enabled.
feI, err := newFileExporter(cfg)
assert.NoError(t, err)
feI := newFileExporter(cfg)
assert.IsType(t, &fileExporter{}, feI)
fe := feI.(*fileExporter)
fe.writer.file.Close()
fe.writer.file = bwc

// Start the flusher.
ctx := context.Background()
assert.NoError(t, fe.Start(ctx, nil))
fe.marshaller = &marshaller{
formatType: fe.conf.FormatType,
tracesMarshaler: tracesMarshalers[fe.conf.FormatType],
metricsMarshaler: metricsMarshalers[fe.conf.FormatType],
logsMarshaler: logsMarshalers[fe.conf.FormatType],
compression: fe.conf.Compression,
compressor: buildCompressor(fe.conf.Compression),
}
export := buildExportFunc(fe.conf)
var err error
fe.writer, err = newFileWriter(fe.conf.Path, fe.conf.Rotation, fe.conf.FlushInterval, export)
assert.NoError(t, err)
err = fe.writer.file.Close()
assert.NoError(t, err)
fe.writer.file = bwc
fe.writer.start()

// Write 10 bytes.
b := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
Expand Down
Loading

0 comments on commit 89c6207

Please sign in to comment.