diff --git a/head.go b/head.go index 26a42fb3..cfb7cb89 100644 --- a/head.go +++ b/head.go @@ -75,6 +75,9 @@ type Head struct { symbols map[string]struct{} values map[string]stringset // label names to possible values + deletedMtx sync.Mutex + deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until. + postings *index.MemPostings // postings lists for terms } @@ -234,6 +237,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), + deleted: map[uint64]int{}, } h.metrics = newHeadMetrics(h, r) @@ -557,7 +561,13 @@ func (h *Head) Truncate(mint int64) (err error) { } keep := func(id uint64) bool { - return h.series.getByID(id) != nil + if h.series.getByID(id) != nil { + return true + } + h.deletedMtx.Lock() + _, ok := h.deleted[id] + h.deletedMtx.Unlock() + return ok } h.metrics.checkpointCreationTotal.Inc() if _, err = Checkpoint(h.wal, first, last, keep, mint); err != nil { @@ -570,6 +580,17 @@ func (h *Head) Truncate(mint int64) (err error) { // that supersedes them. level.Error(h.logger).Log("msg", "truncating segments failed", "err", err) } + + // The checkpoint is written and segments before it are truncated, so we no + // longer need to track deleted series that are before it. + h.deletedMtx.Lock() + for ref, segment := range h.deleted { + if segment < first { + delete(h.deleted, ref) + } + } + h.deletedMtx.Unlock() + h.metrics.checkpointDeleteTotal.Inc() if err := DeleteCheckpoints(h.wal.Dir(), last); err != nil { // Leftover old checkpoints do not cause problems down the line beyond @@ -953,6 +974,21 @@ func (h *Head) gc() { // Remove deleted series IDs from the postings lists.
h.postings.Delete(deleted) + if h.wal != nil { + _, last, _ := h.wal.Segments() + h.deletedMtx.Lock() + // Keep series records until we're past segment 'last' + // because the WAL will still have samples records with + // this ref ID. If we didn't keep these series records then + // the WAL replay on start up, or any other code + // that reads the WAL, wouldn't be able to use those + // samples since we would have no labels for that ref ID. + for ref := range deleted { + h.deleted[ref] = last + } + h.deletedMtx.Unlock() + } + // Rebuild symbols and label value indices from what is left in the postings terms. symbols := make(map[string]struct{}, len(h.symbols)) values := make(map[string]stringset, len(h.values)) diff --git a/head_test.go b/head_test.go index 7e8730b9..4fbdec03 100644 --- a/head_test.go +++ b/head_test.go @@ -501,6 +501,57 @@ func TestDeleteUntilCurMax(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, []tsdbutil.Sample{sample{11, 1}}, ressmpls) } + +func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { + dir, err := ioutil.TempDir("", "test_delete_wal") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + wlog, err := wal.NewSize(nil, nil, dir, 32768) + testutil.Ok(t, err) + + // Enough samples to cause a checkpoint. + numSamples := 10000 + hb, err := NewHead(nil, nil, wlog, int64(numSamples)*10) + testutil.Ok(t, err) + defer hb.Close() + for i := 0; i < numSamples; i++ { + app := hb.Appender() + _, err := app.Add(labels.Labels{{"a", "b"}}, int64(i), 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + } + testutil.Ok(t, hb.Delete(0, int64(numSamples), labels.NewEqualMatcher("a", "b"))) + testutil.Ok(t, hb.Truncate(1)) + testutil.Ok(t, hb.Close()) + + // Confirm there's been a checkpoint. + cdir, _, err := LastCheckpoint(dir) + testutil.Ok(t, err) + // Read in checkpoint and WAL. + recs := readTestWAL(t, cdir) + recs = append(recs, readTestWAL(t, dir)...)
+ + var series, samples, stones int + for _, rec := range recs { + switch rec.(type) { + case []RefSeries: + series++ + case []RefSample: + samples++ + case []Stone: + stones++ + default: + t.Fatalf("unknown record type") + } + } + testutil.Equals(t, 1, series) + testutil.Equals(t, 9999, samples) + testutil.Equals(t, 1, stones) + +} + func TestDelete_e2e(t *testing.T) { numDatapoints := 1000 numRanges := 1000