From 6e02e73b2624731a35b8a531d6a7b656761a91af Mon Sep 17 00:00:00 2001 From: Nathan Burke Date: Fri, 13 Oct 2023 09:38:39 -0600 Subject: [PATCH] [extension/storage/filestorage] Add bbolt FSync as a config option (#27459) Description: Exposes bbolt fsync as a configuration option Link to tracking Issue: [20266](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/20266) Testing: Manual Testing, Updated unit tests for factory and client Documentation: Added change-log and documentation comments in config.go --------- Co-authored-by: Daniel Jaglowski --- .chloggen/feat_fsync_option_filestorage.yaml | 18 ++++++++++ extension/storage/filestorage/README.md | 3 ++ extension/storage/filestorage/client.go | 10 +++--- extension/storage/filestorage/client_test.go | 36 +++++++++---------- extension/storage/filestorage/config.go | 3 ++ extension/storage/filestorage/config_test.go | 1 + extension/storage/filestorage/extension.go | 2 +- extension/storage/filestorage/factory.go | 1 + extension/storage/filestorage/factory_test.go | 1 + .../storage/filestorage/testdata/config.yaml | 1 + 10 files changed, 52 insertions(+), 24 deletions(-) create mode 100644 .chloggen/feat_fsync_option_filestorage.yaml diff --git a/.chloggen/feat_fsync_option_filestorage.yaml b/.chloggen/feat_fsync_option_filestorage.yaml new file mode 100644 index 000000000000..34b3dbed80d9 --- /dev/null +++ b/.chloggen/feat_fsync_option_filestorage.yaml @@ -0,0 +1,18 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: 'entension/storage/filestorage' + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: 'Add support for setting bbolt fsync option' + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [20266] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/extension/storage/filestorage/README.md b/extension/storage/filestorage/README.md index 6c432438c4d6..24d72e93b193 100644 --- a/extension/storage/filestorage/README.md +++ b/extension/storage/filestorage/README.md @@ -25,6 +25,8 @@ The default directory is `%ProgramData%\Otelcol\FileStorage` on Windows and `/va `timeout` is the maximum time to wait for a file lock. This value does not need to be modified in most circumstances. The default timeout is `1s`. +`fsync` when set, will force the database to perform an fsync after each write. This helps to ensure database integretity if there is an interruption to the database process, but at the cost of performance. See [DB.NoSync](https://pkg.go.dev/go.etcd.io/bbolt#DB) for more information. + ## Compaction `compaction` defines how and when files should be compacted. There are two modes of compaction available (both of which can be set concurrently): - `compaction.on_start` (default: false), which happens when collector starts @@ -78,6 +80,7 @@ extensions: on_start: true directory: /tmp/ max_transaction_size: 65_536 + fsync: false service: extensions: [file_storage, file_storage/all_settings] diff --git a/extension/storage/filestorage/client.go b/extension/storage/filestorage/client.go index 24a1776cf38f..a2819a5f2988 100644 --- a/extension/storage/filestorage/client.go +++ b/extension/storage/filestorage/client.go @@ -37,17 +37,17 @@ type fileStorageClient struct { closed bool } -func bboltOptions(timeout time.Duration) *bbolt.Options { +func bboltOptions(timeout time.Duration, fSync bool) *bbolt.Options { return &bbolt.Options{ Timeout: timeout, - NoSync: true, + NoSync: !fSync, NoFreelistSync: true, FreelistType: bbolt.FreelistMapType, } } -func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig) (*fileStorageClient, error) { - options := bboltOptions(timeout) +func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig, fSync bool) (*fileStorageClient, error) { + options := bboltOptions(timeout, fSync) db, err := bbolt.Open(filePath, 0600, options) if err != nil { return nil, err @@ -172,7 +172,7 @@ func (c *fileStorageClient) Compact(compactionDirectory string, timeout time.Dur }() // use temporary file as compaction target - options := bboltOptions(timeout) + options := bboltOptions(timeout, false) c.compactionMutex.Lock() defer c.compactionMutex.Unlock() diff --git a/extension/storage/filestorage/client_test.go b/extension/storage/filestorage/client_test.go index 157b6eb3c533..8717f553b90e 100644 --- a/extension/storage/filestorage/client_test.go +++ b/extension/storage/filestorage/client_test.go @@ -21,7 +21,7 @@ import ( func TestClientOperations(t *testing.T) { dbFile := filepath.Join(t.TempDir(), "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, client.Close(context.TODO())) @@ -59,7 +59,7 @@ func TestClientBatchOperations(t *testing.T) { tempDir := t.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, client.Close(context.TODO())) @@ -180,7 +180,7 @@ func TestNewClientTransactionErrors(t *testing.T) { tempDir := t.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, timeout, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, timeout, &CompactionConfig{}, false) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, client.Close(context.TODO())) @@ -204,7 +204,7 @@ func TestNewClientErrorsOnInvalidBucket(t *testing.T) { tempDir := t.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.Error(t, err) require.Nil(t, client) @@ -259,7 +259,7 @@ func TestClientReboundCompaction(t *testing.T) { CheckInterval: checkInterval, ReboundNeededThresholdMiB: testCase.reboundNeededThresholdMiB, ReboundTriggerThresholdMiB: testCase.reboundTriggerThresholdMiB, - }) + }, false) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, client.Close(context.TODO())) @@ -348,7 +348,7 @@ func TestClientConcurrentCompaction(t *testing.T) { CheckInterval: stepInterval * 2, ReboundNeededThresholdMiB: 1, ReboundTriggerThresholdMiB: 5, - }) + }, false) require.NoError(t, err) t.Cleanup(func() { @@ -408,7 +408,7 @@ func BenchmarkClientGet(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -428,7 +428,7 @@ func BenchmarkClientGet100(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -451,7 +451,7 @@ func BenchmarkClientSet(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -471,7 +471,7 @@ func BenchmarkClientSet100(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -493,7 +493,7 @@ func BenchmarkClientDelete(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -519,7 +519,7 @@ func BenchmarkClientSetLargeDB(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -556,7 +556,7 @@ func BenchmarkClientInitLargeDB(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -575,7 +575,7 @@ func BenchmarkClientInitLargeDB(b *testing.B) { var tempClient *fileStorageClient b.ResetTimer() for n := 0; n < b.N; n++ { - tempClient, err = newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + tempClient, err = newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.StopTimer() err = tempClient.Close(ctx) @@ -593,7 +593,7 @@ func BenchmarkClientCompactLargeDBFile(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -620,7 +620,7 @@ func BenchmarkClientCompactLargeDBFile(b *testing.B) { testDbFile := filepath.Join(tempDir, fmt.Sprintf("my_db%d", n)) err = os.Link(dbFile, testDbFile) require.NoError(b, err) - client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}) + client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.StartTimer() require.NoError(b, client.Compact(tempDir, time.Second, 65536)) @@ -637,7 +637,7 @@ func BenchmarkClientCompactDb(b *testing.B) { tempDir := b.TempDir() dbFile := filepath.Join(tempDir, "my_db") - client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}) + client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.Cleanup(func() { require.NoError(b, client.Close(context.TODO())) @@ -664,7 +664,7 @@ func BenchmarkClientCompactDb(b *testing.B) { testDbFile := filepath.Join(tempDir, fmt.Sprintf("my_db%d", n)) err = os.Link(dbFile, testDbFile) require.NoError(b, err) - client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}) + client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}, false) require.NoError(b, err) b.StartTimer() require.NoError(b, client.Compact(tempDir, time.Second, 65536)) diff --git a/extension/storage/filestorage/config.go b/extension/storage/filestorage/config.go index fcc866d8226d..d71bbe0234fc 100644 --- a/extension/storage/filestorage/config.go +++ b/extension/storage/filestorage/config.go @@ -17,6 +17,9 @@ type Config struct { Timeout time.Duration `mapstructure:"timeout,omitempty"` Compaction *CompactionConfig `mapstructure:"compaction,omitempty"` + + // FSync specifies that fsync should be called after each database write + FSync bool `mapstructure:"fsync,omitempty"` } // CompactionConfig defines configuration for optional file storage compaction. diff --git a/extension/storage/filestorage/config_test.go b/extension/storage/filestorage/config_test.go index 73f3be89f987..11d898a55f17 100644 --- a/extension/storage/filestorage/config_test.go +++ b/extension/storage/filestorage/config_test.go @@ -47,6 +47,7 @@ func TestLoadConfig(t *testing.T) { CheckInterval: time.Second * 5, }, Timeout: 2 * time.Second, + FSync: true, }, }, } diff --git a/extension/storage/filestorage/extension.go b/extension/storage/filestorage/extension.go index 34ff70379670..fbac0e1ee7a6 100644 --- a/extension/storage/filestorage/extension.go +++ b/extension/storage/filestorage/extension.go @@ -64,7 +64,7 @@ func (lfs *localFileStorage) GetClient(_ context.Context, kind component.Kind, e rawName = sanitize(rawName) } absoluteName := filepath.Join(lfs.cfg.Directory, rawName) - client, err := newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction) + client, err := newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, lfs.cfg.FSync) if err != nil { return nil, err diff --git a/extension/storage/filestorage/factory.go b/extension/storage/filestorage/factory.go index 6d7f0355cfec..ef3e04e9d3c7 100644 --- a/extension/storage/filestorage/factory.go +++ b/extension/storage/filestorage/factory.go @@ -47,6 +47,7 @@ func createDefaultConfig() component.Config { CheckInterval: defaultCompactionInterval, }, Timeout: time.Second, + FSync: false, } } diff --git a/extension/storage/filestorage/factory_test.go b/extension/storage/filestorage/factory_test.go index dc77d1a59c35..c022d76ba921 100644 --- a/extension/storage/filestorage/factory_test.go +++ b/extension/storage/filestorage/factory_test.go @@ -29,6 +29,7 @@ func TestFactory(t *testing.T) { require.Equal(t, expected, cfg.Directory) } require.Equal(t, time.Second, cfg.Timeout) + require.Equal(t, false, cfg.FSync) tests := []struct { name string diff --git a/extension/storage/filestorage/testdata/config.yaml b/extension/storage/filestorage/testdata/config.yaml index dad1ef9bb50e..4a923aee71fe 100644 --- a/extension/storage/filestorage/testdata/config.yaml +++ b/extension/storage/filestorage/testdata/config.yaml @@ -13,3 +13,4 @@ file_storage/all_settings: rebound_needed_threshold_mib: 128 max_transaction_size: 2048 timeout: 2s + fsync: true