Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sidecar: Handle intermediate restarts of sidecar gracefully. #941

Merged
merged 1 commit into from
Mar 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error {
}

// DownloadMeta downloads only meta file from bucket by block ID.
// TODO(bwplotka): Differentiate between network error & partial upload.
func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) {
rc, err := bkt.Get(ctx, path.Join(id.String(), MetaFilename))
if err != nil {
Expand Down
24 changes: 12 additions & 12 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ func newLazyOverlapChecker(logger log.Logger, bucket objstore.Bucket, labels fun
}

func (c *lazyOverlapChecker) sync(ctx context.Context) error {
level.Info(c.logger).Log("msg", "gathering all existing blocks from the remote bucket")
if err := c.bucket.Iter(ctx, "", func(path string) error {
id, ok := block.IsBlockDir(path)
if !ok {
Expand Down Expand Up @@ -253,6 +252,7 @@ func (c *lazyOverlapChecker) sync(ctx context.Context) error {

func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.BlockMeta) error {
if !c.synced {
level.Info(c.logger).Log("msg", "gathering all existing blocks from the remote bucket for check", "id", newMeta.ULID.String())
if err := c.sync(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -280,8 +280,8 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
meta, err := ReadMetaFile(s.dir)
if err != nil {
// If we encounter any error, proceed with an empty meta file and overwrite it later.
// The meta file is only used to deduplicate uploads, which are properly handled
// by the system if their occur anyway.
// The meta file is only used to avoid unnecessary bucket.Exists call,
// which are properly handled by the system if their occur anyway.
if !os.IsNotExist(err) {
level.Warn(s.logger).Log("msg", "reading meta file failed, will override it", "err", err)
}
Expand Down Expand Up @@ -316,6 +316,15 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
return nil
}

// Check against bucket if the meta file for this block exists.
ok, err := s.bucket.Exists(ctx, path.Join(m.ULID.String(), block.MetaFilename))
if err != nil {
return errors.Wrap(err, "check exists")
}
if ok {
return nil
}

// We only ship of the first compacted block level as normal flow.
if m.Compaction.Level > 1 {
if !s.uploadCompacted {
Expand All @@ -329,15 +338,6 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
}
}

// Check against bucket if the meta file for this block exists.
ok, err := s.bucket.Exists(ctx, path.Join(m.ULID.String(), block.MetaFilename))
if err != nil {
return errors.Wrap(err, "check exists")
}
if ok {
return nil
}

if err := s.upload(ctx, m); err != nil {
level.Error(s.logger).Log("msg", "shipping failed", "block", m.ULID, "err", err)
// No error returned, just log line. This is because we want other blocks to be uploaded even
Expand Down