Skip to content

Commit

Permalink
Implementing Bucket index sync status file (cortexproject#5446)
Browse files Browse the repository at this point in the history
* Implementing Bucket index sync status

Signed-off-by: Alan Protasio <[email protected]>

* fixing bug when returning from cache

Signed-off-by: Alan Protasio <[email protected]>

* Addressing some comments

Signed-off-by: Alan Protasio <[email protected]>

* Changelog

Signed-off-by: Alan Protasio <[email protected]>

---------

Signed-off-by: Alan Protasio <[email protected]>
  • Loading branch information
alanprot authored Jul 11, 2023
1 parent 7ef74ea commit 2c37922
Show file tree
Hide file tree
Showing 15 changed files with 426 additions and 86 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
* [ENHANCEMENT] Add jitter to lifecycler heartbeat. #5404
* [ENHANCEMENT] Store Gateway: Add config `estimated_max_series_size_bytes` and `estimated_max_chunk_size_bytes` to address data overfetch. #5401
* [ENHANCEMENT] Distributor/Ingester: Add experimental `-distributor.sign_write_requests` flag to sign the write requests. #5430
* [ENHANCEMENT] Store Gateway/Querier/Compactor: Handling CMK Access Denied errors. #5420 #5442
* [ENHANCEMENT] Store Gateway/Querier/Compactor: Handling CMK Access Denied errors. #5420 #5442 #5446
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
34 changes: 32 additions & 2 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID
if err := bucketindex.DeleteIndex(ctx, c.bucketClient, userID, c.cfgProvider); err != nil {
return err
}

// Delete the bucket sync status
if err := bucketindex.DeleteIndexSyncStatus(ctx, c.bucketClient, userID); err != nil {
return err
}
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)

var deletedBlocks, failed int
Expand Down Expand Up @@ -321,15 +326,40 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
}
}

// Reading bucket index sync stats
idxs, err := bucketindex.ReadSyncStatus(ctx, c.bucketClient, userID, userLogger)

if err != nil {
level.Warn(userLogger).Log("msg", "error reading the bucket index status", "err", err)
idxs = bucketindex.Status{Version: bucketindex.SyncStatusFileVersion, NonQueryableReason: bucketindex.Unknown}
}

idxs.Status = bucketindex.Ok
idxs.SyncTime = time.Now().Unix()

// Read the bucket index.
idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger)

defer func() {
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, idxs, userLogger)
}()

if errors.Is(err, bucketindex.ErrIndexCorrupted) {
level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it")
} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
// Give up cleaning if we get access denied
level.Warn(userLogger).Log("msg", err.Error())
level.Warn(userLogger).Log("msg", "customer manager key access denied", "err", err)
idxs.Status = bucketindex.CustomerManagedKeyError
// Making the tenant non queryable until 3x the cleanup interval to give time to compactors and storegateways
// to reload the bucket index in case the key access is re-granted
idxs.NonQueryableUntil = time.Now().Add(3 * c.cfg.CleanupInterval).Unix()
idxs.NonQueryableReason = bucketindex.CustomerManagedKeyError

// Update the bucket index update time
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
return nil
} else if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) {
idxs.Status = bucketindex.GenericError
return err
}

Expand All @@ -348,6 +378,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)
idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx)
if err != nil {
idxs.Status = bucketindex.GenericError
return err
}

Expand Down Expand Up @@ -398,7 +429,6 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))

return nil
}

Expand Down
45 changes: 38 additions & 7 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ func TestBlocksCleaner(t *testing.T) {
func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
const userID = "user-1"

bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
bkt = bucketindex.BucketWithGlobalMarkers(bkt)

// Create blocks.
ctx := context.Background()
deletionDelay := 12 * time.Hour
bucketClient = &cortex_testutil.MockBucketFailure{
Bucket: bucketClient,
mbucket := &cortex_testutil.MockBucketFailure{
Bucket: bkt,
GetFailures: map[string]error{
path.Join(userID, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError,
},
Expand All @@ -77,12 +77,37 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
}

logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
scanner := tsdb.NewUsersScanner(mbucket, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
err := cleaner.cleanUser(ctx, userID, true)
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil)

// Clean User with no error
cleaner.bucketClient = bkt
err := cleaner.cleanUser(ctx, userID, false)
require.NoError(t, err)
s, err := bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
require.NoError(t, err)
require.Equal(t, bucketindex.Ok, s.Status)
require.Equal(t, int64(0), s.NonQueryableUntil)

// Clean with cmk error
cleaner.bucketClient = mbucket
err = cleaner.cleanUser(ctx, userID, false)
require.NoError(t, err)
s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
require.NoError(t, err)
require.Equal(t, bucketindex.CustomerManagedKeyError, s.Status)
require.Less(t, int64(0), s.NonQueryableUntil)

// Re grant access to the key
cleaner.bucketClient = bkt
err = cleaner.cleanUser(ctx, userID, false)
require.NoError(t, err)
s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
require.NoError(t, err)
require.Equal(t, bucketindex.Ok, s.Status)
require.Less(t, int64(0), s.NonQueryableUntil)
}

func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) {
Expand Down Expand Up @@ -232,6 +257,9 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
require.NoError(t, err)
assert.ElementsMatch(t, tc.expectedBlocks, idx.Blocks.GetULIDs())
assert.ElementsMatch(t, tc.expectedMarks, idx.BlockDeletionMarks.GetULIDs())
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, tc.userID, logger)
require.NoError(t, err)
require.Equal(t, bucketindex.Ok, s.Status)
}

assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
Expand Down Expand Up @@ -385,6 +413,9 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
require.NoError(t, err)
assert.ElementsMatch(t, []ulid.ULID{block1, block3}, idx.Blocks.GetULIDs())
assert.ElementsMatch(t, []ulid.ULID{block3}, idx.BlockDeletionMarks.GetULIDs())
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, userID, logger)
require.NoError(t, err)
require.Equal(t, bucketindex.Ok, s.Status)
}

func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShard(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,15 @@ func (c *Compactor) compactUsers(ctx context.Context) {
continue
}

// Skipping compaction if the bucket index failed to sync due to CMK errors.
if idxs, err := bucketindex.ReadSyncStatus(ctx, c.bucketClient, userID, util_log.WithUserID(userID, c.logger)); err == nil {
if idxs.Status == bucketindex.CustomerManagedKeyError {
c.compactionRunSkippedTenants.Inc()
level.Info(c.logger).Log("msg", "skipping compactUser due CustomerManagedKeyError", "user", userID)
continue
}
}

ownedUsers[userID] = struct{}{}

if markedForDeletion, err := cortex_tsdb.TenantDeletionMarkExists(ctx, c.bucketClient, userID); err != nil {
Expand Down
58 changes: 58 additions & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,38 @@ func TestConfig_Validate(t *testing.T) {
}
}

func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) {
t.Parallel()
userID := "user-1"

ss := bucketindex.Status{Status: bucketindex.CustomerManagedKeyError, Version: bucketindex.SyncStatusFileVersion}
content, err := json.Marshal(ss)
require.NoError(t, err)

// No user blocks stored in the bucket.
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{userID}, nil)
bucketClient.MockIter(userID+"/", []string{}, nil)
bucketClient.MockIter(userID+"/markers/", nil, nil)
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", string(content), nil)
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil)

cfg := prepareConfig()
c, _, _, logs, _ := prepare(t, cfg, bucketClient, nil)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

// Wait until a run has completed.
cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} {
return prom_testutil.ToFloat64(c.compactionRunsCompleted)
})

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
assert.Contains(t, strings.Split(strings.TrimSpace(logs.String()), "\n"), `level=info component=compactor msg="skipping compactUser due CustomerManagedKeyError" user=user-1`)
}

func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -466,6 +498,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant(
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", "", nil)
bucketClient.MockUpload(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
bucketClient.MockGet(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil)
bucketClient.MockGet(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", "", nil)
Expand All @@ -474,6 +507,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant(
bucketClient.MockUpload(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/visit-mark.json", nil)
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)

c, _, tsdbPlannerMock, _, registry := prepare(t, prepareConfig(), bucketClient, nil)
tsdbPlannerMock.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, errors.New("Failed to plan"))
Expand Down Expand Up @@ -521,6 +555,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil)
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil)
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", "", nil)
Expand All @@ -535,10 +570,14 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/visit-mark.json", "", nil)
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockIter("user-2/markers/", nil, nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)

c, _, tsdbPlanner, logs, registry := prepare(t, prepareConfig(), bucketClient, nil)

Expand Down Expand Up @@ -653,6 +692,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", mockDeletionMarkJSON("01DTVP434PA9VFXSW2JKB3392D", time.Now()), nil)
bucketClient.MockGet("user-1/markers/01DTVP434PA9VFXSW2JKB3392D-deletion-mark.json", mockDeletionMarkJSON("01DTVP434PA9VFXSW2JKB3392D", time.Now()), nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)

// This block will be deleted by cleaner.
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil)
Expand All @@ -674,7 +714,9 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
bucketClient.MockDelete("user-1/markers/01DTW0ZCPDDNV4BV83Q2SV4QAZ-deletion-mark.json", nil)
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil)
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)

c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil)

Expand Down Expand Up @@ -776,6 +818,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) {
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", mockNoCompactBlockJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockUpload("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil)
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil)
Expand All @@ -796,10 +839,14 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) {

bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockIter("user-2/markers/", nil, nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)

c, _, tsdbPlanner, _, registry := prepare(t, prepareConfig(), bucketClient, nil)

Expand Down Expand Up @@ -844,13 +891,15 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T)
bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTVP434PA9VFXSW2JKB3392D/index"}, nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/index", "some index content", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
bucketClient.MockUpload("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
bucketClient.MockExists("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", false, nil)

bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", nil)
bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/index", nil)
bucketClient.MockDelete("user-1/bucket-index.json.gz", nil)
bucketClient.MockDelete("user-1/bucket-index-sync-status.json", nil)

c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil)

Expand Down Expand Up @@ -1007,6 +1056,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockUpload("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil)
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil)
Expand All @@ -1025,8 +1075,12 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
bucketClient.MockUpload("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/visit-mark.json", nil)
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)

ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
Expand Down Expand Up @@ -1105,9 +1159,11 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", "", nil)
bucketClient.MockUpload(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
}

// Create a shared KV Store
Expand Down Expand Up @@ -1213,6 +1269,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit
Version: VisitMarkerVersion1,
}
visitMarkerFileContent, _ := json.Marshal(blockVisitMarker)
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", "", nil)
bucketClient.MockGet(userID+"/"+blockID+"/meta.json", mockBlockMetaJSONWithTime(blockID, userID, blockTimes["startTime"], blockTimes["endTime"]), nil)
bucketClient.MockGet(userID+"/"+blockID+"/deletion-mark.json", "", nil)
bucketClient.MockGet(userID+"/"+blockID+"/no-compact-mark.json", "", nil)
Expand All @@ -1232,6 +1289,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit
bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil)
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
}

// Create a shared KV Store
Expand Down
Loading

0 comments on commit 2c37922

Please sign in to comment.