Skip to content

Commit

Permalink
Bloomshipper: Use model.Time in MetaRef and BlockRef (#11566)
Browse files Browse the repository at this point in the history
The `Ref` of `MetaRef` and `BlockRef` do have start and end timestamps,
which are of type `int64`.
This however, can lead to the problem that different components might
use different precisions (`s`, `ms`, `ns`) for these timestamps.

This PR changes the current implementation of the bloom shipper to use
`ms` precision timestamps (using `model.Time`) instead of `s`, to match
the behaviour that we have to encode ChunkRefs to object store keys.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Jan 2, 2024
1 parent 76bf505 commit 0a0b7c8
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 98 deletions.
8 changes: 4 additions & 4 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,10 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
}
metaSearchParams := bloomshipper.MetaSearchParams{
TenantID: job.tenantID,
MinFingerprint: uint64(job.minFp),
MaxFingerprint: uint64(job.maxFp),
StartTimestamp: int64(job.from),
EndTimestamp: int64(job.through),
MinFingerprint: job.minFp,
MaxFingerprint: job.maxFp,
StartTimestamp: job.from,
EndTimestamp: job.through,
}
var metas []bloomshipper.Meta
//TODO Configure pool for these to avoid allocations
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloomcompactor/chunkcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ func buildBlockFromBlooms(
TableName: job.tableName,
MinFingerprint: uint64(job.minFp),
MaxFingerprint: uint64(job.maxFp),
StartTimestamp: int64(job.from),
EndTimestamp: int64(job.through),
StartTimestamp: job.from,
EndTimestamp: job.through,
Checksum: checksum,
},
IndexPath: job.indexPath,
Expand All @@ -148,7 +148,7 @@ func buildBlockFromBlooms(
}

func createLocalDirName(workingDir string, job Job) string {
dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%s-%s", job.tableName, job.tenantID, job.minFp, job.maxFp, job.from, job.through)
dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%d-%d", job.tableName, job.tenantID, job.minFp, job.maxFp, job.from, job.through)
return filepath.Join(workingDir, dir)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/chunkcompactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func TestChunkCompactor_CompactNewChunks(t *testing.T) {
require.Equal(t, job.tableName, compactedBlock.TableName)
require.Equal(t, uint64(fp1), compactedBlock.MinFingerprint)
require.Equal(t, uint64(fp2), compactedBlock.MaxFingerprint)
require.Equal(t, chunkRef1.MinTime, compactedBlock.StartTimestamp)
require.Equal(t, chunkRef2.MaxTime, compactedBlock.EndTimestamp)
require.Equal(t, model.Time(chunkRef1.MinTime), compactedBlock.StartTimestamp)
require.Equal(t, model.Time(chunkRef2.MaxTime), compactedBlock.EndTimestamp)
require.Equal(t, indexPath, compactedBlock.IndexPath)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/mergecompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ func mergeCompactChunks(logger log.Logger,
TableName: job.tableName,
MinFingerprint: uint64(job.minFp),
MaxFingerprint: uint64(job.maxFp),
StartTimestamp: int64(job.from),
EndTimestamp: int64(job.through),
StartTimestamp: job.from,
EndTimestamp: job.through,
Checksum: checksum,
},
IndexPath: job.indexPath,
Expand Down
16 changes: 14 additions & 2 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,13 @@ func TestBloomGateway_StartStopService(t *testing.T) {
t.Cleanup(cm.Unregister)

p := config.PeriodConfig{
From: parseDayTime("2023-09-01"),
From: parseDayTime("2023-09-01"),
IndexTables: config.IndexPeriodicTableConfig{
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: 24 * time.Hour,
},
},
IndexType: config.TSDBType,
ObjectType: config.StorageTypeFileSystem,
Schema: "v13",
Expand Down Expand Up @@ -137,7 +143,13 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
t.Cleanup(cm.Unregister)

p := config.PeriodConfig{
From: parseDayTime("2023-09-01"),
From: parseDayTime("2023-09-01"),
IndexTables: config.IndexPeriodicTableConfig{
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: 24 * time.Hour,
},
},
IndexType: config.TSDBType,
ObjectType: config.StorageTypeFileSystem,
Schema: "v13",
Expand Down
44 changes: 19 additions & 25 deletions pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ type Ref struct {
TenantID string
TableName string
MinFingerprint, MaxFingerprint uint64
StartTimestamp, EndTimestamp int64
StartTimestamp, EndTimestamp model.Time
Checksum uint32
}

// Cmp returns the fingerprint's position relative to the bounds
func (b Ref) Cmp(fp uint64) v1.BoundsCheck {
if fp < b.MinFingerprint {
func (r Ref) Cmp(fp uint64) v1.BoundsCheck {
if fp < r.MinFingerprint {
return v1.Before
} else if fp > b.MaxFingerprint {
} else if fp > r.MaxFingerprint {
return v1.After
}
return v1.Overlap
Expand All @@ -67,11 +67,9 @@ type Meta struct {
}

type MetaSearchParams struct {
TenantID string
MinFingerprint uint64
MaxFingerprint uint64
StartTimestamp int64
EndTimestamp int64
TenantID string
MinFingerprint, MaxFingerprint model.Fingerprint
StartTimestamp, EndTimestamp model.Time
}

type MetaClient interface {
Expand Down Expand Up @@ -128,9 +126,7 @@ type BloomClient struct {
}

func (b *BloomClient) GetMetas(ctx context.Context, params MetaSearchParams) ([]Meta, error) {
start := model.TimeFromUnix(params.StartTimestamp)
end := model.TimeFromUnix(params.EndTimestamp)
tablesByPeriod := tablesByPeriod(b.periodicConfigs, start, end)
tablesByPeriod := tablesByPeriod(b.periodicConfigs, params.StartTimestamp, params.EndTimestamp)

var metas []Meta
for periodFrom, tables := range tablesByPeriod {
Expand All @@ -146,8 +142,8 @@ func (b *BloomClient) GetMetas(ctx context.Context, params MetaSearchParams) ([]
if err != nil {
return nil, err
}
if metaRef.MaxFingerprint < params.MinFingerprint || params.MaxFingerprint < metaRef.MinFingerprint ||
metaRef.StartTimestamp < params.StartTimestamp || params.EndTimestamp < metaRef.EndTimestamp {
if metaRef.MaxFingerprint < uint64(params.MinFingerprint) || uint64(params.MaxFingerprint) < metaRef.MinFingerprint ||
metaRef.StartTimestamp.Before(params.StartTimestamp) || metaRef.EndTimestamp.After(params.EndTimestamp) {
continue
}
meta, err := b.downloadMeta(ctx, metaRef, periodClient)
Expand Down Expand Up @@ -176,24 +172,23 @@ func (b *BloomClient) PutMeta(ctx context.Context, meta Meta) error {

func createBlockObjectKey(meta Ref) string {
blockParentFolder := fmt.Sprintf("%x-%x", meta.MinFingerprint, meta.MaxFingerprint)
filename := fmt.Sprintf("%v-%v-%x", meta.StartTimestamp, meta.EndTimestamp, meta.Checksum)
filename := fmt.Sprintf("%d-%d-%x", meta.StartTimestamp, meta.EndTimestamp, meta.Checksum)
return strings.Join([]string{rootFolder, meta.TableName, meta.TenantID, bloomsFolder, blockParentFolder, filename}, delimiter)
}

func createMetaObjectKey(meta Ref) string {
filename := fmt.Sprintf("%x-%x-%v-%v-%x", meta.MinFingerprint, meta.MaxFingerprint, meta.StartTimestamp, meta.EndTimestamp, meta.Checksum)
filename := fmt.Sprintf("%x-%x-%d-%d-%x", meta.MinFingerprint, meta.MaxFingerprint, meta.StartTimestamp, meta.EndTimestamp, meta.Checksum)
return strings.Join([]string{rootFolder, meta.TableName, meta.TenantID, metasFolder, filename}, delimiter)
}

func findPeriod(configs []config.PeriodConfig, timestamp int64) (config.DayTime, error) {
ts := model.TimeFromUnix(timestamp)
func findPeriod(configs []config.PeriodConfig, ts model.Time) (config.DayTime, error) {
for i := len(configs) - 1; i >= 0; i-- {
periodConfig := configs[i]
if periodConfig.From.Before(ts) || periodConfig.From.Equal(ts) {
return periodConfig.From, nil
}
}
return config.DayTime{}, fmt.Errorf("can not find period for timestamp %d", timestamp)
return config.DayTime{}, fmt.Errorf("can not find period for timestamp %d", ts)
}

func (b *BloomClient) DeleteMeta(ctx context.Context, meta Meta) error {
Expand Down Expand Up @@ -289,7 +284,6 @@ func (b *BloomClient) downloadMeta(ctx context.Context, metaRef MetaRef, client
return meta, nil
}

// todo cover with tests
func createMetaRef(objectKey string, tenantID string, tableName string) (MetaRef, error) {
fileName := objectKey[strings.LastIndex(objectKey, delimiter)+1:]
parts := strings.Split(fileName, fileNamePartDelimiter)
Expand Down Expand Up @@ -323,8 +317,8 @@ func createMetaRef(objectKey string, tenantID string, tableName string) (MetaRef
TableName: tableName,
MinFingerprint: minFingerprint,
MaxFingerprint: maxFingerprint,
StartTimestamp: startTimestamp,
EndTimestamp: endTimestamp,
StartTimestamp: model.Time(startTimestamp),
EndTimestamp: model.Time(endTimestamp),
Checksum: uint32(checksum),
},
FilePath: objectKey,
Expand Down Expand Up @@ -354,9 +348,9 @@ func tablesByPeriod(periodicConfigs []config.PeriodConfig, start, end model.Time

func tablesForRange(periodConfig config.PeriodConfig, from, to int64) []string {
interval := periodConfig.IndexTables.Period
intervalSeconds := interval.Seconds()
lower := from / int64(intervalSeconds)
upper := to / int64(intervalSeconds)
step := int64(interval.Seconds())
lower := from / step
upper := to / step
tables := make([]string, 0, 1+upper-lower)
prefix := periodConfig.IndexTables.Prefix
for i := lower; i <= upper; i++ {
Expand Down
Loading

0 comments on commit 0a0b7c8

Please sign in to comment.