diff --git a/operator/internal/handlers/internal/storage/storage.go b/operator/internal/handlers/internal/storage/storage.go index b2d40d6cf32c..6276942fe32e 100644 --- a/operator/internal/handlers/internal/storage/storage.go +++ b/operator/internal/handlers/internal/storage/storage.go @@ -45,8 +45,9 @@ func BuildOptions(ctx context.Context, k k8s.Client, stack *lokiv1.LokiStack, fg } } + now := time.Now().UTC() storageSchemas, err := storage.BuildSchemaConfig( - time.Now().UTC(), + now, stack.Spec.Storage, stack.Status.Storage, ) @@ -59,6 +60,7 @@ func BuildOptions(ctx context.Context, k k8s.Client, stack *lokiv1.LokiStack, fg } objStore.Schemas = storageSchemas + objStore.AllowStructuredMetadata = allowStructuredMetadata(storageSchemas, now) if stack.Spec.Storage.TLS == nil { return objStore, nil @@ -98,3 +100,16 @@ func BuildOptions(ctx context.Context, k k8s.Client, stack *lokiv1.LokiStack, fg return objStore, nil } + +func allowStructuredMetadata(schemas []lokiv1.ObjectStorageSchema, now time.Time) bool { + activeVersion := lokiv1.ObjectStorageSchemaV11 + for _, s := range schemas { + time, _ := s.EffectiveDate.UTCTime() + if time.Before(now) { + activeVersion = s.Version + } + } + + return activeVersion != lokiv1.ObjectStorageSchemaV11 && + activeVersion != lokiv1.ObjectStorageSchemaV12 +} diff --git a/operator/internal/handlers/internal/storage/storage_test.go b/operator/internal/handlers/internal/storage/storage_test.go index 857d0be4de4a..05cfd88bb5ff 100644 --- a/operator/internal/handlers/internal/storage/storage_test.go +++ b/operator/internal/handlers/internal/storage/storage_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" @@ -560,3 +561,124 @@ func TestBuildOptions_WhenInvalidCAConfigMap_SetDegraded(t *testing.T) { require.Error(t, err) require.Equal(t, degradedErr, err) } + +func TestAllowStructuredMetadata(t *testing.T) { + testTime := time.Date(2024, 7, 1, 1, 0, 0, 0, time.UTC) + tt := []struct { + desc string + schemas []lokiv1.ObjectStorageSchema + wantAllow bool + }{ + { + desc: "disallow - no schemas", + schemas: []lokiv1.ObjectStorageSchema{}, + wantAllow: false, + }, + { + desc: "disallow - only v12", + schemas: []lokiv1.ObjectStorageSchema{ + { + Version: lokiv1.ObjectStorageSchemaV12, + EffectiveDate: "2024-07-01", + }, + }, + wantAllow: false, + }, + { + desc: "allow - only v13", + schemas: []lokiv1.ObjectStorageSchema{ + { + Version: lokiv1.ObjectStorageSchemaV13, + EffectiveDate: "2024-07-01", + }, + }, + wantAllow: true, + }, + { + desc: "disallow - v13 in future", + schemas: []lokiv1.ObjectStorageSchema{ + { + Version: lokiv1.ObjectStorageSchemaV12, + EffectiveDate: "2024-07-01", + }, + { + Version: lokiv1.ObjectStorageSchemaV13, + EffectiveDate: "2024-07-02", + }, + }, + wantAllow: false, + }, + { + desc: "disallow - v13 in past", + schemas: []lokiv1.ObjectStorageSchema{ + { + Version: lokiv1.ObjectStorageSchemaV13, + EffectiveDate: "2024-06-01", + }, + { + Version: lokiv1.ObjectStorageSchemaV12, + EffectiveDate: "2024-07-01", + }, + }, + wantAllow: false, + }, + { + desc: "disallow - v13 in past and future", + schemas: []lokiv1.ObjectStorageSchema{ + { + Version: lokiv1.ObjectStorageSchemaV13, + EffectiveDate: "2024-06-01", + }, + { + Version: lokiv1.ObjectStorageSchemaV12, + EffectiveDate: "2024-07-01", + }, + { + Version: lokiv1.ObjectStorageSchemaV13, + EffectiveDate: "2024-07-02", + }, + }, + wantAllow: false, + }, + { + desc: "allow - v13 active", + schemas: []lokiv1.ObjectStorageSchema{ + { + Version: lokiv1.ObjectStorageSchemaV12, + EffectiveDate: "2024-06-01", + }, + { + Version: lokiv1.ObjectStorageSchemaV13, + EffectiveDate: "2024-07-01", + }, + }, + wantAllow: true, + }, + { + desc: "allow - v13 active, v12 in future", + schemas: []lokiv1.ObjectStorageSchema{ + { + Version: lokiv1.ObjectStorageSchemaV13, + EffectiveDate: "2024-07-01", + }, + { + Version: lokiv1.ObjectStorageSchemaV12, + EffectiveDate: "2024-08-01", + }, + }, + wantAllow: true, + }, + } + + for _, tc := range tt { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + allow := allowStructuredMetadata(tc.schemas, testTime) + if allow != tc.wantAllow { + t.Errorf("got %v, want %v", allow, tc.wantAllow) + } + }) + } +} diff --git a/operator/internal/manifests/internal/config/build_test.go b/operator/internal/manifests/internal/config/build_test.go index f7131d217b89..1d52d98692df 100644 --- a/operator/internal/manifests/internal/config/build_test.go +++ b/operator/internal/manifests/internal/config/build_test.go @@ -114,7 +114,7 @@ limits_config: shard_streams: enabled: true desired_rate: 3MB - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 @@ -371,7 +371,7 @@ limits_config: shard_streams: enabled: true desired_rate: 3MB - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 @@ -797,7 +797,7 @@ limits_config: shard_streams: enabled: true desired_rate: 3MB - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 @@ -1155,7 +1155,7 @@ limits_config: shard_streams: enabled: true desired_rate: 3MB - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 @@ -1514,7 +1514,7 @@ limits_config: shard_streams: enabled: true desired_rate: 3MB - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 @@ -1911,7 +1911,7 @@ limits_config: shard_streams: enabled: true desired_rate: 3MB - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 @@ -2241,7 +2241,7 @@ limits_config: shard_streams: enabled: true desired_rate: 3MB - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 @@ -2680,7 +2680,7 @@ limits_config: shard_streams: enabled: true desired_rate: 3MB - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 @@ -3004,7 +3004,7 @@ limits_config: shard_streams: enabled: true desired_rate: 3MB - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 @@ -3501,7 +3501,7 @@ limits_config: shard_streams: enabled: true desired_rate: 3MB - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_addr: ${HASH_RING_INSTANCE_ADDR} @@ -3762,7 +3762,7 @@ limits_config: shard_streams: enabled: true desired_rate: 3MB - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_addr: ${HASH_RING_INSTANCE_ADDR} @@ -4024,7 +4024,7 @@ limits_config: shard_streams: enabled: true desired_rate: 3MB - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 @@ -4287,7 +4287,7 @@ limits_config: shard_streams: enabled: true desired_rate: 3MB - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 @@ -4586,7 +4586,7 @@ limits_config: shard_streams: enabled: true desired_rate: 3MB - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 @@ -4880,7 +4880,7 @@ limits_config: query_timeout: 1m volume_enabled: true volume_max_series: 1000 - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 @@ -5124,11 +5124,13 @@ func defaultOptions() Options { func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) { for _, tc := range []struct { - name string - schemaConfig []lokiv1.ObjectStorageSchema - shippers []string - expSchemaConfig string - expStorageConfig string + name string + schemaConfig []lokiv1.ObjectStorageSchema + shippers []string + allowStructuredMetadata bool + expSchemaConfig string + expStorageConfig string + expStructuredMetadata string }{ { name: "default_config_v11_schema", @@ -5156,6 +5158,8 @@ func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) { resync_interval: 5m index_gateway_client: server_address: dns:///loki-index-gateway-grpc-lokistack-dev.default.svc.cluster.local:9095`, + expStructuredMetadata: ` + allow_structured_metadata: false`, }, { name: "v12_schema", @@ -5183,6 +5187,8 @@ func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) { resync_interval: 5m index_gateway_client: server_address: dns:///loki-index-gateway-grpc-lokistack-dev.default.svc.cluster.local:9095`, + expStructuredMetadata: ` + allow_structured_metadata: false`, }, { name: "v13_schema", @@ -5192,7 +5198,8 @@ func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) { EffectiveDate: "2024-01-01", }, }, - shippers: []string{"tsdb"}, + allowStructuredMetadata: true, + shippers: []string{"tsdb"}, expSchemaConfig: ` configs: - from: "2024-01-01" @@ -5210,6 +5217,8 @@ func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) { resync_interval: 5m index_gateway_client: server_address: dns:///loki-index-gateway-grpc-lokistack-dev.default.svc.cluster.local:9095`, + expStructuredMetadata: ` + allow_structured_metadata: true`, }, { name: "multiple_schema", @@ -5227,7 +5236,8 @@ func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) { EffectiveDate: "2024-01-01", }, }, - shippers: []string{"boltdb", "tsdb"}, + shippers: []string{"boltdb", "tsdb"}, + allowStructuredMetadata: true, expSchemaConfig: ` configs: - from: "2020-01-01" @@ -5266,6 +5276,8 @@ func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) { resync_interval: 5m index_gateway_client: server_address: dns:///loki-index-gateway-grpc-lokistack-dev.default.svc.cluster.local:9095`, + expStructuredMetadata: ` + allow_structured_metadata: true`, }, } { t.Run(tc.name, func(t *testing.T) { @@ -5366,7 +5378,7 @@ limits_config: query_timeout: 1m volume_enabled: true volume_max_series: 1000 - allow_structured_metadata: true + ${STORAGE_STRUCTURED_METADATA} memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 @@ -5416,9 +5428,11 @@ analytics: ` expCfg = strings.Replace(expCfg, "${SCHEMA_CONFIG}", tc.expSchemaConfig, 1) expCfg = strings.Replace(expCfg, "${STORAGE_CONFIG}", tc.expStorageConfig, 1) + expCfg = strings.Replace(expCfg, "${STORAGE_STRUCTURED_METADATA}", tc.expStructuredMetadata, 1) opts := defaultOptions() opts.ObjectStorage.Schemas = tc.schemaConfig + opts.ObjectStorage.AllowStructuredMetadata = tc.allowStructuredMetadata opts.Shippers = tc.shippers cfg, _, err := Build(opts) @@ -5540,7 +5554,7 @@ limits_config: query_timeout: 1m volume_enabled: true volume_max_series: 1000 - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 @@ -5712,7 +5726,7 @@ limits_config: shard_streams: enabled: true desired_rate: 3MB - allow_structured_metadata: true + allow_structured_metadata: false memberlist: abort_if_cluster_join_fails: true advertise_port: 7946 diff --git a/operator/internal/manifests/internal/config/loki-config.yaml b/operator/internal/manifests/internal/config/loki-config.yaml index a68ad96cd4c2..da13920b770e 100644 --- a/operator/internal/manifests/internal/config/loki-config.yaml +++ b/operator/internal/manifests/internal/config/loki-config.yaml @@ -220,7 +220,7 @@ limits_config: enabled: true desired_rate: {{ . }}MB {{- end }} - allow_structured_metadata: true + allow_structured_metadata: {{ .ObjectStorage.AllowStructuredMetadata }} {{- with .GossipRing }} memberlist: abort_if_cluster_join_fails: true diff --git a/operator/internal/manifests/storage/options.go b/operator/internal/manifests/storage/options.go index 1f8d3d904b71..1adb80235b09 100644 --- a/operator/internal/manifests/storage/options.go +++ b/operator/internal/manifests/storage/options.go @@ -7,9 +7,10 @@ import ( // Options is used to configure Loki to integrate with // supported object storages. type Options struct { - Schemas []lokiv1.ObjectStorageSchema - SharedStore lokiv1.ObjectStorageSecretType - CredentialMode lokiv1.CredentialMode + Schemas []lokiv1.ObjectStorageSchema + SharedStore lokiv1.ObjectStorageSecretType + CredentialMode lokiv1.CredentialMode + AllowStructuredMetadata bool Azure *AzureStorageConfig GCS *GCSStorageConfig