diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index 4f2a965e3dd2..d9a83ec5b710 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -472,10 +472,10 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, } metaSearchParams := bloomshipper.MetaSearchParams{ TenantID: job.tenantID, - MinFingerprint: uint64(job.minFp), - MaxFingerprint: uint64(job.maxFp), - StartTimestamp: int64(job.from), - EndTimestamp: int64(job.through), + MinFingerprint: job.minFp, + MaxFingerprint: job.maxFp, + StartTimestamp: job.from, + EndTimestamp: job.through, } var metas []bloomshipper.Meta //TODO Configure pool for these to avoid allocations diff --git a/pkg/bloomcompactor/chunkcompactor.go b/pkg/bloomcompactor/chunkcompactor.go index a949f26452d9..744a38b1ad5a 100644 --- a/pkg/bloomcompactor/chunkcompactor.go +++ b/pkg/bloomcompactor/chunkcompactor.go @@ -135,8 +135,8 @@ func buildBlockFromBlooms( TableName: job.tableName, MinFingerprint: uint64(job.minFp), MaxFingerprint: uint64(job.maxFp), - StartTimestamp: int64(job.from), - EndTimestamp: int64(job.through), + StartTimestamp: job.from, + EndTimestamp: job.through, Checksum: checksum, }, IndexPath: job.indexPath, @@ -148,7 +148,7 @@ func buildBlockFromBlooms( } func createLocalDirName(workingDir string, job Job) string { - dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%s-%s", job.tableName, job.tenantID, job.minFp, job.maxFp, job.from, job.through) + dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%d-%d", job.tableName, job.tenantID, job.minFp, job.maxFp, job.from, job.through) return filepath.Join(workingDir, dir) } diff --git a/pkg/bloomcompactor/chunkcompactor_test.go b/pkg/bloomcompactor/chunkcompactor_test.go index 4d19f24417d4..a89e4e967a1d 100644 --- a/pkg/bloomcompactor/chunkcompactor_test.go +++ b/pkg/bloomcompactor/chunkcompactor_test.go @@ -121,8 +121,8 @@ func TestChunkCompactor_CompactNewChunks(t *testing.T) { require.Equal(t, job.tableName, compactedBlock.TableName) require.Equal(t, uint64(fp1), compactedBlock.MinFingerprint) require.Equal(t, uint64(fp2), compactedBlock.MaxFingerprint) - require.Equal(t, chunkRef1.MinTime, compactedBlock.StartTimestamp) - require.Equal(t, chunkRef2.MaxTime, compactedBlock.EndTimestamp) + require.Equal(t, model.Time(chunkRef1.MinTime), compactedBlock.StartTimestamp) + require.Equal(t, model.Time(chunkRef2.MaxTime), compactedBlock.EndTimestamp) require.Equal(t, indexPath, compactedBlock.IndexPath) } diff --git a/pkg/bloomcompactor/mergecompactor.go b/pkg/bloomcompactor/mergecompactor.go index 94682579ac9e..0cf55cef86a7 100644 --- a/pkg/bloomcompactor/mergecompactor.go +++ b/pkg/bloomcompactor/mergecompactor.go @@ -137,8 +137,8 @@ func mergeCompactChunks(logger log.Logger, TableName: job.tableName, MinFingerprint: uint64(job.minFp), MaxFingerprint: uint64(job.maxFp), - StartTimestamp: int64(job.from), - EndTimestamp: int64(job.through), + StartTimestamp: job.from, + EndTimestamp: job.through, Checksum: checksum, }, IndexPath: job.indexPath, diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index f24b8bc8a4e2..b34e3d55852a 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -75,7 +75,13 @@ func TestBloomGateway_StartStopService(t *testing.T) { t.Cleanup(cm.Unregister) p := config.PeriodConfig{ - From: parseDayTime("2023-09-01"), + From: parseDayTime("2023-09-01"), + IndexTables: config.IndexPeriodicTableConfig{ + PeriodicTableConfig: config.PeriodicTableConfig{ + Prefix: "index_", + Period: 24 * time.Hour, + }, + }, IndexType: config.TSDBType, ObjectType: config.StorageTypeFileSystem, Schema: "v13", @@ -137,7 +143,13 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { t.Cleanup(cm.Unregister) p := config.PeriodConfig{ - From: parseDayTime("2023-09-01"), + From: parseDayTime("2023-09-01"), + IndexTables: config.IndexPeriodicTableConfig{ + PeriodicTableConfig: config.PeriodicTableConfig{ + Prefix: "index_", + Period: 24 * time.Hour, + }, + }, IndexType: config.TSDBType, ObjectType: config.StorageTypeFileSystem, Schema: "v13", diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index 5636d1916f18..50b26d57a3a7 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -33,15 +33,15 @@ type Ref struct { TenantID string TableName string MinFingerprint, MaxFingerprint uint64 - StartTimestamp, EndTimestamp int64 + StartTimestamp, EndTimestamp model.Time Checksum uint32 } // Cmp returns the fingerprint's position relative to the bounds -func (b Ref) Cmp(fp uint64) v1.BoundsCheck { - if fp < b.MinFingerprint { +func (r Ref) Cmp(fp uint64) v1.BoundsCheck { + if fp < r.MinFingerprint { return v1.Before - } else if fp > b.MaxFingerprint { + } else if fp > r.MaxFingerprint { return v1.After } return v1.Overlap @@ -67,11 +67,9 @@ type Meta struct { } type MetaSearchParams struct { - TenantID string - MinFingerprint uint64 - MaxFingerprint uint64 - StartTimestamp int64 - EndTimestamp int64 + TenantID string + MinFingerprint, MaxFingerprint model.Fingerprint + StartTimestamp, EndTimestamp model.Time } type MetaClient interface { @@ -128,9 +126,7 @@ type BloomClient struct { } func (b *BloomClient) GetMetas(ctx context.Context, params MetaSearchParams) ([]Meta, error) { - start := model.TimeFromUnix(params.StartTimestamp) - end := model.TimeFromUnix(params.EndTimestamp) - tablesByPeriod := tablesByPeriod(b.periodicConfigs, start, end) + tablesByPeriod := tablesByPeriod(b.periodicConfigs, params.StartTimestamp, params.EndTimestamp) var metas []Meta for periodFrom, tables := range tablesByPeriod { @@ -146,8 +142,8 @@ func (b *BloomClient) GetMetas(ctx context.Context, params MetaSearchParams) ([] if err != nil { return nil, err } - if metaRef.MaxFingerprint < params.MinFingerprint || params.MaxFingerprint < metaRef.MinFingerprint || - metaRef.StartTimestamp < params.StartTimestamp || params.EndTimestamp < metaRef.EndTimestamp { + if metaRef.MaxFingerprint < uint64(params.MinFingerprint) || uint64(params.MaxFingerprint) < metaRef.MinFingerprint || + metaRef.StartTimestamp.Before(params.StartTimestamp) || metaRef.EndTimestamp.After(params.EndTimestamp) { continue } meta, err := b.downloadMeta(ctx, metaRef, periodClient) @@ -176,24 +172,23 @@ func (b *BloomClient) PutMeta(ctx context.Context, meta Meta) error { func createBlockObjectKey(meta Ref) string { blockParentFolder := fmt.Sprintf("%x-%x", meta.MinFingerprint, meta.MaxFingerprint) - filename := fmt.Sprintf("%v-%v-%x", meta.StartTimestamp, meta.EndTimestamp, meta.Checksum) + filename := fmt.Sprintf("%d-%d-%x", meta.StartTimestamp, meta.EndTimestamp, meta.Checksum) return strings.Join([]string{rootFolder, meta.TableName, meta.TenantID, bloomsFolder, blockParentFolder, filename}, delimiter) } func createMetaObjectKey(meta Ref) string { - filename := fmt.Sprintf("%x-%x-%v-%v-%x", meta.MinFingerprint, meta.MaxFingerprint, meta.StartTimestamp, meta.EndTimestamp, meta.Checksum) + filename := fmt.Sprintf("%x-%x-%d-%d-%x", meta.MinFingerprint, meta.MaxFingerprint, meta.StartTimestamp, meta.EndTimestamp, meta.Checksum) return strings.Join([]string{rootFolder, meta.TableName, meta.TenantID, metasFolder, filename}, delimiter) } -func findPeriod(configs []config.PeriodConfig, timestamp int64) (config.DayTime, error) { - ts := model.TimeFromUnix(timestamp) +func findPeriod(configs []config.PeriodConfig, ts model.Time) (config.DayTime, error) { for i := len(configs) - 1; i >= 0; i-- { periodConfig := configs[i] if periodConfig.From.Before(ts) || periodConfig.From.Equal(ts) { return periodConfig.From, nil } } - return config.DayTime{}, fmt.Errorf("can not find period for timestamp %d", timestamp) + return config.DayTime{}, fmt.Errorf("can not find period for timestamp %d", ts) } func (b *BloomClient) DeleteMeta(ctx context.Context, meta Meta) error { @@ -289,7 +284,6 @@ func (b *BloomClient) downloadMeta(ctx context.Context, metaRef MetaRef, client return meta, nil } -// todo cover with tests func createMetaRef(objectKey string, tenantID string, tableName string) (MetaRef, error) { fileName := objectKey[strings.LastIndex(objectKey, delimiter)+1:] parts := strings.Split(fileName, fileNamePartDelimiter) @@ -323,8 +317,8 @@ func createMetaRef(objectKey string, tenantID string, tableName string) (MetaRef TableName: tableName, MinFingerprint: minFingerprint, MaxFingerprint: maxFingerprint, - StartTimestamp: startTimestamp, - EndTimestamp: endTimestamp, + StartTimestamp: model.Time(startTimestamp), + EndTimestamp: model.Time(endTimestamp), Checksum: uint32(checksum), }, FilePath: objectKey, @@ -354,9 +348,9 @@ func tablesByPeriod(periodicConfigs []config.PeriodConfig, start, end model.Time func tablesForRange(periodConfig config.PeriodConfig, from, to int64) []string { interval := periodConfig.IndexTables.Period - intervalSeconds := interval.Seconds() - lower := from / int64(intervalSeconds) - upper := to / int64(intervalSeconds) + step := int64(interval.Seconds()) + lower := from / step + upper := to / step tables := make([]string, 0, 1+upper-lower) prefix := periodConfig.IndexTables.Prefix for i := lower; i <= upper; i++ { diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go index 7267856a4315..d6043febb48c 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go @@ -13,7 +13,7 @@ import ( "testing" "time" - aws_io "github.com/aws/smithy-go/io" + awsio "github.com/aws/smithy-go/io" "github.com/google/uuid" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -28,9 +28,24 @@ const ( var ( // table 19627 - fixedDay = model.TimeFromUnix(time.Date(2023, time.September, 27, 0, 0, 0, 0, time.UTC).Unix()) + fixedDay = Date(2023, time.September, 27, 0, 0, 0) ) +func Date(year int, month time.Month, day, hour, min, sec int) model.Time { + date := time.Date(year, month, day, hour, min, sec, 0, time.UTC) + return model.TimeFromUnixNano(date.UnixNano()) +} + +func parseDayTime(s string) config.DayTime { + t, err := time.Parse("2006-01-02", s) + if err != nil { + panic(err) + } + return config.DayTime{ + Time: model.TimeFromUnix(t.Unix()), + } +} + func Test_BloomClient_GetMetas(t *testing.T) { shipper := createClient(t) @@ -57,8 +72,8 @@ func Test_BloomClient_GetMetas(t *testing.T) { TenantID: "tenantA", MinFingerprint: 50, MaxFingerprint: 150, - StartTimestamp: fixedDay.Add(-6 * day).Unix(), - EndTimestamp: fixedDay.Add(-1*day - 1*time.Hour).Unix(), + StartTimestamp: fixedDay.Add(-6 * day), + EndTimestamp: fixedDay.Add(-1*day - 1*time.Hour), }) require.NoError(t, err) require.ElementsMatch(t, expected, actual) @@ -75,26 +90,26 @@ func Test_BloomClient_PutMeta(t *testing.T) { "first-period-19621", 0xff, 0xfff, - time.Date(2023, time.September, 21, 5, 0, 0, 0, time.UTC).Unix(), - time.Date(2023, time.September, 21, 6, 0, 0, 0, time.UTC).Unix(), + Date(2023, time.September, 21, 5, 0, 0), + Date(2023, time.September, 21, 6, 0, 0), 0xaaa, "ignored-file-path-during-uploading", ), expectedStorage: "folder-1", - expectedFilePath: "bloom/first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa", + expectedFilePath: "bloom/first-period-19621/tenantA/metas/ff-fff-1695272400000-1695276000000-aaa", }, "expected meta to be uploaded to the second folder": { source: createMetaEntity("tenantA", "second-period-19625", 200, 300, - time.Date(2023, time.September, 25, 0, 0, 0, 0, time.UTC).Unix(), - time.Date(2023, time.September, 25, 1, 0, 0, 0, time.UTC).Unix(), + Date(2023, time.September, 25, 0, 0, 0), + Date(2023, time.September, 25, 1, 0, 0), 0xbbb, "ignored-file-path-during-uploading", ), expectedStorage: "folder-2", - expectedFilePath: "bloom/second-period-19625/tenantA/metas/c8-12c-1695600000-1695603600-bbb", + expectedFilePath: "bloom/second-period-19625/tenantA/metas/c8-12c-1695600000000-1695603600000-bbb", }, } for name, data := range tests { @@ -131,26 +146,26 @@ func Test_BloomClient_DeleteMeta(t *testing.T) { "first-period-19621", 0xff, 0xfff, - time.Date(2023, time.September, 21, 5, 0, 0, 0, time.UTC).Unix(), - time.Date(2023, time.September, 21, 6, 0, 0, 0, time.UTC).Unix(), + Date(2023, time.September, 21, 5, 0, 0), + Date(2023, time.September, 21, 6, 0, 0), 0xaaa, "ignored-file-path-during-uploading", ), expectedStorage: "folder-1", - expectedFilePath: "bloom/first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa", + expectedFilePath: "bloom/first-period-19621/tenantA/metas/ff-fff-1695272400000-1695276000000-aaa", }, "expected meta to be delete from the second folder": { source: createMetaEntity("tenantA", "second-period-19625", 200, 300, - time.Date(2023, time.September, 25, 0, 0, 0, 0, time.UTC).Unix(), - time.Date(2023, time.September, 25, 1, 0, 0, 0, time.UTC).Unix(), + Date(2023, time.September, 25, 0, 0, 0), + Date(2023, time.September, 25, 1, 0, 0), 0xbbb, "ignored-file-path-during-uploading", ), expectedStorage: "folder-2", - expectedFilePath: "bloom/second-period-19625/tenantA/metas/c8-12c-1695600000-1695603600-bbb", + expectedFilePath: "bloom/second-period-19625/tenantA/metas/c8-12c-1695600000000-1695603600000-bbb", }, } for name, data := range tests { @@ -175,10 +190,10 @@ func Test_BloomClient_DeleteMeta(t *testing.T) { func Test_BloomClient_GetBlocks(t *testing.T) { bloomClient := createClient(t) fsNamedStores := bloomClient.storageConfig.NamedStores.Filesystem - firstBlockPath := "bloom/first-period-19621/tenantA/blooms/eeee-ffff/1695272400-1695276000-1" + firstBlockPath := "bloom/first-period-19621/tenantA/blooms/eeee-ffff/1695272400000-1695276000000-1" firstBlockFullPath := filepath.Join(fsNamedStores["folder-1"].Directory, firstBlockPath) firstBlockData := createBlockFile(t, firstBlockFullPath) - secondBlockPath := "bloom/second-period-19624/tenantA/blooms/aaaa-bbbb/1695531600-1695535200-2" + secondBlockPath := "bloom/second-period-19624/tenantA/blooms/aaaa-bbbb/1695531600000-1695535200000-2" secondBlockFullPath := filepath.Join(fsNamedStores["folder-2"].Directory, secondBlockPath) secondBlockData := createBlockFile(t, secondBlockFullPath) require.FileExists(t, firstBlockFullPath) @@ -190,8 +205,8 @@ func Test_BloomClient_GetBlocks(t *testing.T) { TableName: "first-period-19621", MinFingerprint: 0xeeee, MaxFingerprint: 0xffff, - StartTimestamp: time.Date(2023, time.September, 21, 5, 0, 0, 0, time.UTC).Unix(), - EndTimestamp: time.Date(2023, time.September, 21, 6, 0, 0, 0, time.UTC).Unix(), + StartTimestamp: Date(2023, time.September, 21, 5, 0, 0), + EndTimestamp: Date(2023, time.September, 21, 6, 0, 0), Checksum: 1, }, BlockPath: firstBlockPath, @@ -202,8 +217,8 @@ func Test_BloomClient_GetBlocks(t *testing.T) { TableName: "second-period-19624", MinFingerprint: 0xaaaa, MaxFingerprint: 0xbbbb, - StartTimestamp: time.Date(2023, time.September, 24, 5, 0, 0, 0, time.UTC).Unix(), - EndTimestamp: time.Date(2023, time.September, 24, 6, 0, 0, 0, time.UTC).Unix(), + StartTimestamp: Date(2023, time.September, 24, 5, 0, 0), + EndTimestamp: Date(2023, time.September, 24, 6, 0, 0), Checksum: 2, }, BlockPath: secondBlockPath, @@ -232,13 +247,13 @@ func Test_BloomClient_PutBlocks(t *testing.T) { TableName: "first-period-19621", MinFingerprint: 0xeeee, MaxFingerprint: 0xffff, - StartTimestamp: time.Date(2023, time.September, 21, 5, 0, 0, 0, time.UTC).Unix(), - EndTimestamp: time.Date(2023, time.September, 21, 6, 0, 0, 0, time.UTC).Unix(), + StartTimestamp: Date(2023, time.September, 21, 5, 0, 0), + EndTimestamp: Date(2023, time.September, 21, 6, 0, 0), Checksum: 1, }, IndexPath: uuid.New().String(), }, - Data: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForFirstFolderData))}, + Data: awsio.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForFirstFolderData))}, } blockForSecondFolderData := "data2" @@ -249,13 +264,13 @@ func Test_BloomClient_PutBlocks(t *testing.T) { TableName: "second-period-19624", MinFingerprint: 0xaaaa, MaxFingerprint: 0xbbbb, - StartTimestamp: time.Date(2023, time.September, 24, 5, 0, 0, 0, time.UTC).Unix(), - EndTimestamp: time.Date(2023, time.September, 24, 6, 0, 0, 0, time.UTC).Unix(), + StartTimestamp: Date(2023, time.September, 24, 5, 0, 0), + EndTimestamp: Date(2023, time.September, 24, 6, 0, 0), Checksum: 2, }, IndexPath: uuid.New().String(), }, - Data: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForSecondFolderData))}, + Data: awsio.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForSecondFolderData))}, } results, err := bloomClient.PutBlocks(context.Background(), []Block{blockForFirstFolder, blockForSecondFolder}) @@ -263,7 +278,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) { require.Len(t, results, 2) firstResultBlock := results[0] path := firstResultBlock.BlockPath - require.Equal(t, "bloom/first-period-19621/tenantA/blooms/eeee-ffff/1695272400-1695276000-1", path) + require.Equal(t, "bloom/first-period-19621/tenantA/blooms/eeee-ffff/1695272400000-1695276000000-1", path) require.Equal(t, blockForFirstFolder.TenantID, firstResultBlock.TenantID) require.Equal(t, blockForFirstFolder.TableName, firstResultBlock.TableName) require.Equal(t, blockForFirstFolder.MinFingerprint, firstResultBlock.MinFingerprint) @@ -281,7 +296,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) { secondResultBlock := results[1] path = secondResultBlock.BlockPath - require.Equal(t, "bloom/second-period-19624/tenantA/blooms/aaaa-bbbb/1695531600-1695535200-2", path) + require.Equal(t, "bloom/second-period-19624/tenantA/blooms/aaaa-bbbb/1695531600000-1695535200000-2", path) require.Equal(t, blockForSecondFolder.TenantID, secondResultBlock.TenantID) require.Equal(t, blockForSecondFolder.TableName, secondResultBlock.TableName) require.Equal(t, blockForSecondFolder.MinFingerprint, secondResultBlock.MinFingerprint) @@ -302,9 +317,9 @@ func Test_BloomClient_PutBlocks(t *testing.T) { func Test_BloomClient_DeleteBlocks(t *testing.T) { bloomClient := createClient(t) fsNamedStores := bloomClient.storageConfig.NamedStores.Filesystem - block1Path := filepath.Join(fsNamedStores["folder-1"].Directory, "bloom/first-period-19621/tenantA/blooms/eeee-ffff/1695272400-1695276000-1") + block1Path := filepath.Join(fsNamedStores["folder-1"].Directory, "bloom/first-period-19621/tenantA/blooms/eeee-ffff/1695272400000-1695276000000-1") createBlockFile(t, block1Path) - block2Path := filepath.Join(fsNamedStores["folder-2"].Directory, "bloom/second-period-19624/tenantA/blooms/aaaa-bbbb/1695531600-1695535200-2") + block2Path := filepath.Join(fsNamedStores["folder-2"].Directory, "bloom/second-period-19624/tenantA/blooms/aaaa-bbbb/1695531600000-1695535200000-2") createBlockFile(t, block2Path) require.FileExists(t, block1Path) require.FileExists(t, block2Path) @@ -316,8 +331,8 @@ func Test_BloomClient_DeleteBlocks(t *testing.T) { TableName: "second-period-19624", MinFingerprint: 0xaaaa, MaxFingerprint: 0xbbbb, - StartTimestamp: time.Date(2023, time.September, 24, 5, 0, 0, 0, time.UTC).Unix(), - EndTimestamp: time.Date(2023, time.September, 24, 6, 0, 0, 0, time.UTC).Unix(), + StartTimestamp: Date(2023, time.September, 24, 5, 0, 0), + EndTimestamp: Date(2023, time.September, 24, 6, 0, 0), Checksum: 2, }, IndexPath: uuid.New().String(), @@ -328,8 +343,8 @@ func Test_BloomClient_DeleteBlocks(t *testing.T) { TableName: "first-period-19621", MinFingerprint: 0xeeee, MaxFingerprint: 0xffff, - StartTimestamp: time.Date(2023, time.September, 21, 5, 0, 0, 0, time.UTC).Unix(), - EndTimestamp: time.Date(2023, time.September, 21, 6, 0, 0, 0, time.UTC).Unix(), + StartTimestamp: Date(2023, time.September, 21, 5, 0, 0), + EndTimestamp: Date(2023, time.September, 21, 6, 0, 0), Checksum: 1, }, IndexPath: uuid.New().String(), @@ -500,7 +515,7 @@ func createPeriodConfigs() []config.PeriodConfig { { ObjectType: "folder-1", // from 2023-09-20: table range [19620:19623] - From: config.DayTime{Time: model.TimeFromUnix(time.Date(2023, time.September, 20, 0, 0, 0, 0, time.UTC).Unix())}, + From: parseDayTime("2023-09-20"), IndexTables: config.IndexPeriodicTableConfig{ PeriodicTableConfig: config.PeriodicTableConfig{ Period: day, @@ -510,7 +525,7 @@ func createPeriodConfigs() []config.PeriodConfig { { ObjectType: "folder-2", // from 2023-09-24: table range [19624:19627] - From: config.DayTime{Time: model.TimeFromUnix(time.Date(2023, time.September, 24, 0, 0, 0, 0, time.UTC).Unix())}, + From: parseDayTime("2023-09-24"), IndexTables: config.IndexPeriodicTableConfig{ PeriodicTableConfig: config.PeriodicTableConfig{ Period: day, @@ -522,15 +537,15 @@ func createPeriodConfigs() []config.PeriodConfig { } func createMetaInStorage(t *testing.T, folder string, tableName string, tenant string, minFingerprint uint64, maxFingerprint uint64, start model.Time) Meta { - startTimestamp := start.Unix() - endTimestamp := start.Add(12 * time.Hour).Unix() + end := start.Add(12 * time.Hour) metaChecksum := rand.Uint32() - metaFileName := fmt.Sprintf("%x-%x-%v-%v-%x", minFingerprint, maxFingerprint, startTimestamp, endTimestamp, metaChecksum) + // make sure this is equal to the createMetaObjectKey() + metaFileName := fmt.Sprintf("%x-%x-%d-%d-%x", minFingerprint, maxFingerprint, start, end, metaChecksum) metaFilePath := filepath.Join(rootFolder, tableName, tenant, metasFolder, metaFileName) err := os.MkdirAll(filepath.Join(folder, metaFilePath[:strings.LastIndex(metaFilePath, delimiter)]), 0700) require.NoError(t, err) - meta := createMetaEntity(tenant, tableName, minFingerprint, maxFingerprint, startTimestamp, endTimestamp, metaChecksum, metaFilePath) + meta := createMetaEntity(tenant, tableName, minFingerprint, maxFingerprint, start, end, metaChecksum, metaFilePath) metaFileContent, err := json.Marshal(meta) require.NoError(t, err) @@ -544,8 +559,8 @@ func createMetaEntity( tableName string, minFingerprint uint64, maxFingerprint uint64, - startTimestamp int64, - endTimestamp int64, + startTimestamp model.Time, + endTimestamp model.Time, metaChecksum uint32, metaFilePath string) Meta { return Meta{ diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper.go b/pkg/storage/stores/shipper/bloomshipper/shipper.go index ee0665c4f6c3..d7038fc13761 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper.go @@ -4,11 +4,12 @@ import ( "cmp" "context" "fmt" - "time" + "math" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "golang.org/x/exp/slices" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" @@ -39,10 +40,10 @@ func NewShipper(client Client, config config.Config, limits Limits, logger log.L }, nil } -func (s *Shipper) GetBlockRefs(ctx context.Context, tenantID string, from, through time.Time) ([]BlockRef, error) { +func (s *Shipper) GetBlockRefs(ctx context.Context, tenantID string, from, through model.Time) ([]BlockRef, error) { level.Debug(s.logger).Log("msg", "GetBlockRefs", "tenant", tenantID, "from", from, "through", through) - blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, from.UnixNano(), through.UnixNano(), nil) + blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, from, through, []uint64{0, math.MaxUint64}) if err != nil { return nil, fmt.Errorf("error fetching active block references : %w", err) } @@ -85,10 +86,10 @@ func runCallback(callback ForEachBlockCallback, block blockWithQuerier) error { return nil } -func (s *Shipper) ForEachBlock(ctx context.Context, tenantID string, from, through time.Time, fingerprints []uint64, callback ForEachBlockCallback) error { +func (s *Shipper) ForEachBlock(ctx context.Context, tenantID string, from, through model.Time, fingerprints []uint64, callback ForEachBlockCallback) error { level.Debug(s.logger).Log("msg", "ForEachBlock", "tenant", tenantID, "from", from, "through", through, "fingerprints", len(fingerprints)) - blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, from.UnixNano(), through.UnixNano(), fingerprints) + blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, from, through, fingerprints) if err != nil { return fmt.Errorf("error fetching active block references : %w", err) } @@ -111,12 +112,12 @@ func getFirstLast[T any](s []T) (T, T) { return s[0], s[len(s)-1] } -func (s *Shipper) getActiveBlockRefs(ctx context.Context, tenantID string, from, through int64, fingerprints []uint64) ([]BlockRef, error) { +func (s *Shipper) getActiveBlockRefs(ctx context.Context, tenantID string, from, through model.Time, fingerprints []uint64) ([]BlockRef, error) { minFingerprint, maxFingerprint := getFirstLast(fingerprints) metas, err := s.client.GetMetas(ctx, MetaSearchParams{ TenantID: tenantID, - MinFingerprint: minFingerprint, - MaxFingerprint: maxFingerprint, + MinFingerprint: model.Fingerprint(minFingerprint), + MaxFingerprint: model.Fingerprint(maxFingerprint), StartTimestamp: from, EndTimestamp: through, }) @@ -137,7 +138,7 @@ func (s *Shipper) getActiveBlockRefs(ctx context.Context, tenantID string, from, return activeBlocks, nil } -func (s *Shipper) findBlocks(metas []Meta, startTimestamp, endTimestamp int64, fingerprints []uint64) []BlockRef { +func (s *Shipper) findBlocks(metas []Meta, startTimestamp, endTimestamp model.Time, fingerprints []uint64) []BlockRef { outdatedBlocks := make(map[string]interface{}) for _, meta := range metas { for _, tombstone := range meta.Tombstones { @@ -175,7 +176,7 @@ func getPosition[S ~[]E, E cmp.Ordered](s S, v E) int { return len(s) } -func isOutsideRange(b *BlockRef, startTimestamp, endTimestamp int64, fingerprints []uint64) bool { +func isOutsideRange(b *BlockRef, startTimestamp, endTimestamp model.Time, fingerprints []uint64) bool { // First, check time range if b.EndTimestamp < startTimestamp || b.StartTimestamp > endTimestamp { return true diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go index 17f21793680c..83c9379cd44c 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go @@ -5,6 +5,7 @@ import ( "math" "testing" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" ) @@ -190,8 +191,8 @@ func createBlockRef( TableName: "16600", MinFingerprint: minFingerprint, MaxFingerprint: maxFingerprint, - StartTimestamp: startTimestamp, - EndTimestamp: endTimestamp, + StartTimestamp: model.Time(startTimestamp), + EndTimestamp: model.Time(endTimestamp), Checksum: 0, }, // block path is unique, and it's used to distinguish the blocks so the rest of the fields might be skipped in this test diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go index e24d7e35c412..06e1d7a4675b 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store.go +++ b/pkg/storage/stores/shipper/bloomshipper/store.go @@ -13,8 +13,8 @@ import ( type ForEachBlockCallback func(bq *v1.BlockQuerier, minFp, maxFp uint64) error type ReadShipper interface { - GetBlockRefs(ctx context.Context, tenant string, from, through time.Time) ([]BlockRef, error) - ForEachBlock(ctx context.Context, tenant string, from, through time.Time, fingerprints []uint64, callback ForEachBlockCallback) error + GetBlockRefs(ctx context.Context, tenant string, from, through model.Time) ([]BlockRef, error) + ForEachBlock(ctx context.Context, tenant string, from, through model.Time, fingerprints []uint64, callback ForEachBlockCallback) error Fetch(ctx context.Context, tenant string, blocks []BlockRef, callback ForEachBlockCallback) error } @@ -52,7 +52,7 @@ func (bs *BloomStore) Stop() { // GetBlockRefs implements Store func (bs *BloomStore) GetBlockRefs(ctx context.Context, tenant string, from, through time.Time) ([]BlockRef, error) { - return bs.shipper.GetBlockRefs(ctx, tenant, from, through) + return bs.shipper.GetBlockRefs(ctx, tenant, toModelTime(from), toModelTime(through)) } // ForEach implements Store @@ -80,7 +80,7 @@ func (bs *BloomStore) GetBlockQueriersForBlockRefs(ctx context.Context, tenant s // BlockQueriers implements Store func (bs *BloomStore) GetBlockQueriers(ctx context.Context, tenant string, from, through time.Time, fingerprints []uint64) ([]BlockQuerierWithFingerprintRange, error) { bqs := make([]BlockQuerierWithFingerprintRange, 0, 32) - err := bs.shipper.ForEachBlock(ctx, tenant, from, through, fingerprints, func(bq *v1.BlockQuerier, minFp uint64, maxFp uint64) error { + err := bs.shipper.ForEachBlock(ctx, tenant, toModelTime(from), toModelTime(through), fingerprints, func(bq *v1.BlockQuerier, minFp uint64, maxFp uint64) error { bqs = append(bqs, BlockQuerierWithFingerprintRange{ BlockQuerier: bq, MinFp: model.Fingerprint(minFp), @@ -93,3 +93,7 @@ func (bs *BloomStore) GetBlockQueriers(ctx context.Context, tenant string, from, }) return bqs, err } + +func toModelTime(t time.Time) model.Time { + return model.TimeFromUnixNano(t.UnixNano()) +}