Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Keep series that are still in WAL in checkpoints (#577)
Browse files Browse the repository at this point in the history
If all the samples are deleted for a series,
we should still keep the series in the WAL as
anything else reading the WAL will still care
about it in order to understand the samples.

Signed-off-by: Brian Brazil <[email protected]>
  • Loading branch information
brian-brazil committed Apr 9, 2019
1 parent 259847a commit dfed85e
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 1 deletion.
38 changes: 37 additions & 1 deletion head.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type Head struct {
symbols map[string]struct{}
values map[string]stringset // label names to possible values

deletedMtx sync.Mutex
deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until.

postings *index.MemPostings // postings lists for terms
}

Expand Down Expand Up @@ -234,6 +237,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
values: map[string]stringset{},
symbols: map[string]struct{}{},
postings: index.NewUnorderedMemPostings(),
deleted: map[uint64]int{},
}
h.metrics = newHeadMetrics(h, r)

Expand Down Expand Up @@ -557,7 +561,13 @@ func (h *Head) Truncate(mint int64) (err error) {
}

keep := func(id uint64) bool {
return h.series.getByID(id) != nil
if h.series.getByID(id) != nil {
return true
}
h.deletedMtx.Lock()
_, ok := h.deleted[id]
h.deletedMtx.Unlock()
return ok
}
h.metrics.checkpointCreationTotal.Inc()
if _, err = Checkpoint(h.wal, first, last, keep, mint); err != nil {
Expand All @@ -570,6 +580,17 @@ func (h *Head) Truncate(mint int64) (err error) {
// that supersedes them.
level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
}

// The checkpoint is written and the segments before it are truncated, so we no
// longer need to track deleted series that are before it.
h.deletedMtx.Lock()
for ref, segment := range h.deleted {
if segment < first {
delete(h.deleted, ref)
}
}
h.deletedMtx.Unlock()

h.metrics.checkpointDeleteTotal.Inc()
if err := DeleteCheckpoints(h.wal.Dir(), last); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
Expand Down Expand Up @@ -953,6 +974,21 @@ func (h *Head) gc() {
// Remove deleted series IDs from the postings lists.
h.postings.Delete(deleted)

if h.wal != nil {
_, last, _ := h.wal.Segments()
h.deletedMtx.Lock()
// Keep series records until we're past segment 'last'
// because the WAL will still have sample records with
// this ref ID. If we didn't keep these series records, then
// WAL replay on start up, or any other code that reads the
// WAL, wouldn't be able to use those samples since it
// would have no labels for that ref ID.
for ref := range deleted {
h.deleted[ref] = last
}
h.deletedMtx.Unlock()
}

// Rebuild symbols and label value indices from what is left in the postings terms.
symbols := make(map[string]struct{}, len(h.symbols))
values := make(map[string]stringset, len(h.values))
Expand Down
51 changes: 51 additions & 0 deletions head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,57 @@ func TestDeleteUntilCurMax(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, []tsdbutil.Sample{sample{11, 1}}, ressmpls)
}

// TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint appends samples for a
// single series, deletes all of them, truncates the head, and then reads back
// the last checkpoint plus the remaining WAL to confirm that the series record
// is still present alongside the sample and tombstone records.
func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
	// Enough samples to cause a checkpoint.
	const numSamples = 10000

	tmpDir, err := ioutil.TempDir("", "test_delete_wal")
	testutil.Ok(t, err)
	defer func() {
		testutil.Ok(t, os.RemoveAll(tmpDir))
	}()

	// NOTE(review): 32768 is presumably a small segment size so that the
	// appended samples span several segments — confirm against wal.NewSize.
	w, err := wal.NewSize(nil, nil, tmpDir, 32768)
	testutil.Ok(t, err)

	head, err := NewHead(nil, nil, w, int64(numSamples)*10)
	testutil.Ok(t, err)
	defer head.Close()

	// One appender and one commit per sample, all for the series {a="b"}.
	for ts := int64(0); ts < numSamples; ts++ {
		app := head.Appender()
		_, err := app.Add(labels.Labels{{"a", "b"}}, ts, 0)
		testutil.Ok(t, err)
		testutil.Ok(t, app.Commit())
	}

	// Delete every sample of the series, then truncate the head.
	testutil.Ok(t, head.Delete(0, int64(numSamples), labels.NewEqualMatcher("a", "b")))
	testutil.Ok(t, head.Truncate(1))
	// Close the head before reading the WAL back from disk.
	testutil.Ok(t, head.Close())

	// Confirm there's been a checkpoint.
	checkpointDir, _, err := LastCheckpoint(tmpDir)
	testutil.Ok(t, err)

	// Read in the checkpoint first, then the WAL itself.
	checkpointRecs := readTestWAL(t, checkpointDir)
	walRecs := readTestWAL(t, tmpDir)
	allRecs := append(checkpointRecs, walRecs...)

	// Tally the records by type; anything unexpected fails the test.
	var numSeries, numSampleRecs, numStones int
	for _, r := range allRecs {
		switch r.(type) {
		case []RefSeries:
			numSeries++
		case []RefSample:
			numSampleRecs++
		case []Stone:
			numStones++
		default:
			t.Fatalf("unknown record type")
		}
	}

	// The series record must have been kept even though all of its samples
	// were deleted.
	testutil.Equals(t, 1, numSeries)
	// NOTE(review): presumably one fewer than numSamples because the sample
	// at t=0 falls below the Truncate(1) cutoff — confirm against Checkpoint.
	testutil.Equals(t, 9999, numSampleRecs)
	testutil.Equals(t, 1, numStones)
}

func TestDelete_e2e(t *testing.T) {
numDatapoints := 1000
numRanges := 1000
Expand Down

0 comments on commit dfed85e

Please sign in to comment.