Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compactor: remove malformed blocks after delay #1053

Merged
9 changes: 9 additions & 0 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,12 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) {
id, err := ulid.Parse(filepath.Base(path))
return id, err == nil
}

// MetaExists checks whether the meta file exists for the provided block
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
func MetaExists(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (bool, error) {
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
exists, err := bkt.Exists(ctx, path.Join(id.String(), MetaFilename))
if err != nil {
return false, errors.Wrapf(err, "meta.json bkt get for %s", id.String())
}
return exists, nil
}
32 changes: 31 additions & 1 deletion pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
ResolutionLevelRaw = ResolutionLevel(downsample.ResLevel0)
ResolutionLevel5m = ResolutionLevel(downsample.ResLevel1)
ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2)

consistencyDelay = time.Duration(30 * time.Minute)
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
)

var blockTooFreshSentinelError = errors.New("Block too fresh")
Expand Down Expand Up @@ -149,7 +151,8 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
}

// SyncMetas synchronizes all meta files from blocks in the bucket into
// the memory.
// the memory. It removes any partial blocks older than consistencyDelay
// from the bucket.
func (c *Syncer) SyncMetas(ctx context.Context) error {
c.mtx.Lock()
defer c.mtx.Unlock()
Expand Down Expand Up @@ -194,6 +197,9 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
continue
}
if err != nil {
if removed := c.removeIfMalformed(workCtx, id); removed {
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
continue
}
errChan <- err
return
}
Expand Down Expand Up @@ -271,6 +277,30 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta
return &meta, nil
}

func (c *Syncer) removeIfMalformed(ctx context.Context, id ulid.ULID) bool {
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
exists, err := block.MetaExists(ctx, c.logger, c.bkt, id)
if err != nil {
level.Warn(c.logger).Log("msg", "failed to check if block is malformed", "block", id)
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
return false
}
if exists {
// Meta exists, block is not malformed.
return false
}

if ulid.Now()-id.Time() <= uint64(consistencyDelay/time.Millisecond) {
// Consistency delay has not expired, so can't say block is malformed yet.
return false
}

if err := block.Delete(ctx, c.bkt, id); err != nil {
level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id)
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
}
level.Info(c.logger).Log("msg", "deleted malformed block", "block", id)

return true
}

// GroupKey returns a unique identifier for the group the block belongs to. It considers
// the downsampling resolution and the block's labels.
func GroupKey(meta metadata.Meta) string {
Expand Down