Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing Bucket index sync status file #5446

Merged
merged 4 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* [ENHANCEMENT] Add jitter to lifecycler heartbeat. #5404
* [ENHANCEMENT] Store Gateway: Add config `estimated_max_series_size_bytes` and `estimated_max_chunk_size_bytes` to address data overfetch. #5401
* [ENHANCEMENT] Distributor/Ingester: Add experimental `-distributor.sign_write_requests` flag to sign the write requests. #5430
* [ENHANCEMENT] Store Gateway/Querier/Compactor: Handling CMK Access Denied errors. #5420 #5442
* [ENHANCEMENT] Store Gateway/Querier/Compactor: Handling CMK Access Denied errors. #5420 #5442 #5446
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
34 changes: 32 additions & 2 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID
if err := bucketindex.DeleteIndex(ctx, c.bucketClient, userID, c.cfgProvider); err != nil {
return err
}

// Delete the bucket sync status
if err := bucketindex.DeleteIndexSyncStatus(ctx, c.bucketClient, userID); err != nil {
return err
}
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)

var deletedBlocks, failed int
Expand Down Expand Up @@ -321,15 +326,40 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
}
}

// Reading bucket index sync stats
idxs, err := bucketindex.ReadSyncStatus(ctx, c.bucketClient, userID, userLogger)

if err != nil {
level.Warn(userLogger).Log("msg", "error reading the bucket index status", "err", err)
idxs = bucketindex.Status{Version: bucketindex.SyncStatusFileVersion, NonQueryableReason: bucketindex.Unknown}
}

idxs.Status = bucketindex.Ok
alanprot marked this conversation as resolved.
Show resolved Hide resolved
idxs.SyncTime = time.Now().Unix()

// Read the bucket index.
idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger)

defer func() {
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, idxs, userLogger)
alanprot marked this conversation as resolved.
Show resolved Hide resolved
}()

if errors.Is(err, bucketindex.ErrIndexCorrupted) {
level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it")
} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
// Give up cleaning if we get access denied
level.Warn(userLogger).Log("msg", err.Error())
level.Warn(userLogger).Log("msg", "customer manager key access denied", "err", err)
idxs.Status = bucketindex.CustomerManagedKeyError
// Making the tenant non queryable until 3x the cleanup interval to give time to compactors and storegateways
// to reload the bucket index in case the key access is re-granted
idxs.NonQueryableUntil = time.Now().Add(3 * c.cfg.CleanupInterval).Unix()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can we extract the magic number 3 to some meaningfully name constant?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

idxs.NonQueryableReason = bucketindex.CustomerManagedKeyError

// Update the bucket index update time
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
return nil
} else if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) {
idxs.Status = bucketindex.GenericError
return err
}

Expand All @@ -348,6 +378,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)
idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx)
if err != nil {
idxs.Status = bucketindex.GenericError
return err
}

Expand Down Expand Up @@ -398,7 +429,6 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))

return nil
}

Expand Down
45 changes: 38 additions & 7 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ func TestBlocksCleaner(t *testing.T) {
func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
const userID = "user-1"

bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
bkt = bucketindex.BucketWithGlobalMarkers(bkt)

// Create blocks.
ctx := context.Background()
deletionDelay := 12 * time.Hour
bucketClient = &cortex_testutil.MockBucketFailure{
Bucket: bucketClient,
mbucket := &cortex_testutil.MockBucketFailure{
Bucket: bkt,
GetFailures: map[string]error{
path.Join(userID, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError,
},
Expand All @@ -77,12 +77,37 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
}

logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
scanner := tsdb.NewUsersScanner(mbucket, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
err := cleaner.cleanUser(ctx, userID, true)
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil)

// Clean User with no error
cleaner.bucketClient = bkt
err := cleaner.cleanUser(ctx, userID, false)
require.NoError(t, err)
s, err := bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
require.NoError(t, err)
require.Equal(t, bucketindex.Ok, s.Status)
require.Equal(t, int64(0), s.NonQueryableUntil)

// Clean with cmk error
cleaner.bucketClient = mbucket
err = cleaner.cleanUser(ctx, userID, false)
require.NoError(t, err)
s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
require.NoError(t, err)
require.Equal(t, bucketindex.CustomerManagedKeyError, s.Status)
require.Less(t, int64(0), s.NonQueryableUntil)

// Re grant access to the key
cleaner.bucketClient = bkt
err = cleaner.cleanUser(ctx, userID, false)
require.NoError(t, err)
s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
require.NoError(t, err)
require.Equal(t, bucketindex.Ok, s.Status)
require.Less(t, int64(0), s.NonQueryableUntil)
}

func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) {
Expand Down Expand Up @@ -232,6 +257,9 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
require.NoError(t, err)
assert.ElementsMatch(t, tc.expectedBlocks, idx.Blocks.GetULIDs())
assert.ElementsMatch(t, tc.expectedMarks, idx.BlockDeletionMarks.GetULIDs())
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, tc.userID, logger)
require.NoError(t, err)
require.Equal(t, bucketindex.Ok, s.Status)
}

assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
Expand Down Expand Up @@ -385,6 +413,9 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
require.NoError(t, err)
assert.ElementsMatch(t, []ulid.ULID{block1, block3}, idx.Blocks.GetULIDs())
assert.ElementsMatch(t, []ulid.ULID{block3}, idx.BlockDeletionMarks.GetULIDs())
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, userID, logger)
require.NoError(t, err)
require.Equal(t, bucketindex.Ok, s.Status)
}

func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShard(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,15 @@ func (c *Compactor) compactUsers(ctx context.Context) {
continue
}

// Skipping compaction if the bucket index failed to sync due to CMK errors.
if idxs, err := bucketindex.ReadSyncStatus(ctx, c.bucketClient, userID, util_log.WithUserID(userID, c.logger)); err == nil {
if idxs.Status == bucketindex.CustomerManagedKeyError {
c.compactionRunSkippedTenants.Inc()
level.Info(c.logger).Log("msg", "skipping compactUser due CustomerManagedKeyError", "user", userID)
continue
}
}

ownedUsers[userID] = struct{}{}

if markedForDeletion, err := cortex_tsdb.TenantDeletionMarkExists(ctx, c.bucketClient, userID); err != nil {
Expand Down
58 changes: 58 additions & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,38 @@ func TestConfig_Validate(t *testing.T) {
}
}

func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) {
t.Parallel()
userID := "user-1"

ss := bucketindex.Status{Status: bucketindex.CustomerManagedKeyError, Version: bucketindex.SyncStatusFileVersion}
content, err := json.Marshal(ss)
require.NoError(t, err)

// No user blocks stored in the bucket.
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{userID}, nil)
bucketClient.MockIter(userID+"/", []string{}, nil)
bucketClient.MockIter(userID+"/markers/", nil, nil)
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", string(content), nil)
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil)

cfg := prepareConfig()
c, _, _, logs, _ := prepare(t, cfg, bucketClient, nil)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

// Wait until a run has completed.
cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} {
return prom_testutil.ToFloat64(c.compactionRunsCompleted)
})

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
assert.Contains(t, strings.Split(strings.TrimSpace(logs.String()), "\n"), `level=info component=compactor msg="skipping compactUser due CustomerManagedKeyError" user=user-1`)
}

func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -465,6 +497,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant(
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", "", nil)
bucketClient.MockUpload(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
bucketClient.MockGet(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil)
bucketClient.MockGet(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", "", nil)
Expand All @@ -473,6 +506,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant(
bucketClient.MockUpload(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/visit-mark.json", nil)
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)

c, _, tsdbPlannerMock, _, registry := prepare(t, prepareConfig(), bucketClient, nil)
tsdbPlannerMock.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, errors.New("Failed to plan"))
Expand Down Expand Up @@ -520,6 +554,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil)
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil)
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", "", nil)
Expand All @@ -534,10 +569,14 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/visit-mark.json", "", nil)
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockIter("user-2/markers/", nil, nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)

c, _, tsdbPlanner, logs, registry := prepare(t, prepareConfig(), bucketClient, nil)

Expand Down Expand Up @@ -652,6 +691,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", mockDeletionMarkJSON("01DTVP434PA9VFXSW2JKB3392D", time.Now()), nil)
bucketClient.MockGet("user-1/markers/01DTVP434PA9VFXSW2JKB3392D-deletion-mark.json", mockDeletionMarkJSON("01DTVP434PA9VFXSW2JKB3392D", time.Now()), nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)

// This block will be deleted by cleaner.
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil)
Expand All @@ -673,7 +713,9 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
bucketClient.MockDelete("user-1/markers/01DTW0ZCPDDNV4BV83Q2SV4QAZ-deletion-mark.json", nil)
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil)
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)

c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil)

Expand Down Expand Up @@ -775,6 +817,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) {
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", mockNoCompactBlockJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockUpload("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil)
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil)
Expand All @@ -795,10 +838,14 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) {

bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockIter("user-2/markers/", nil, nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)

c, _, tsdbPlanner, _, registry := prepare(t, prepareConfig(), bucketClient, nil)

Expand Down Expand Up @@ -843,13 +890,15 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T)
bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTVP434PA9VFXSW2JKB3392D/index"}, nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/index", "some index content", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
bucketClient.MockUpload("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
bucketClient.MockExists("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", false, nil)

bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", nil)
bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/index", nil)
bucketClient.MockDelete("user-1/bucket-index.json.gz", nil)
bucketClient.MockDelete("user-1/bucket-index-sync-status.json", nil)

c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil)

Expand Down Expand Up @@ -1006,6 +1055,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockUpload("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil)
bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil)
Expand All @@ -1024,8 +1074,12 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
bucketClient.MockUpload("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/visit-mark.json", nil)
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)

ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
Expand Down Expand Up @@ -1104,9 +1158,11 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil)
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", "", nil)
bucketClient.MockUpload(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
}

// Create a shared KV Store
Expand Down Expand Up @@ -1212,6 +1268,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit
Version: VisitMarkerVersion1,
}
visitMarkerFileContent, _ := json.Marshal(blockVisitMarker)
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", "", nil)
bucketClient.MockGet(userID+"/"+blockID+"/meta.json", mockBlockMetaJSONWithTime(blockID, userID, blockTimes["startTime"], blockTimes["endTime"]), nil)
bucketClient.MockGet(userID+"/"+blockID+"/deletion-mark.json", "", nil)
bucketClient.MockGet(userID+"/"+blockID+"/no-compact-mark.json", "", nil)
Expand All @@ -1230,6 +1287,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit
bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil)
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
}

// Create a shared KV Store
Expand Down
Loading