Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Keep series that are still in WAL in checkpoints #577

Merged
merged 1 commit into from
Apr 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion head.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type Head struct {
symbols map[string]struct{}
values map[string]stringset // label names to possible values

deletedMtx sync.Mutex
deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

must be kept until what?

Suggested change
deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until.
deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until the last segment has samples referencing these.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your suggestion doesn't make sense. This is a data structure, not an algorithm.


postings *index.MemPostings // postings lists for terms
}

Expand Down Expand Up @@ -234,6 +237,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
values: map[string]stringset{},
symbols: map[string]struct{}{},
postings: index.NewUnorderedMemPostings(),
deleted: map[uint64]int{},
}
h.metrics = newHeadMetrics(h, r)

Expand Down Expand Up @@ -557,7 +561,13 @@ func (h *Head) Truncate(mint int64) (err error) {
}

keep := func(id uint64) bool {
return h.series.getByID(id) != nil
if h.series.getByID(id) != nil {
return true
}
h.deletedMtx.Lock()
_, ok := h.deleted[id]
h.deletedMtx.Unlock()
return ok
}
h.metrics.checkpointCreationTotal.Inc()
if _, err = Checkpoint(h.wal, first, last, keep, mint); err != nil {
Expand All @@ -570,6 +580,17 @@ func (h *Head) Truncate(mint int64) (err error) {
// that supersedes them.
level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
}

// The checkpoint is written and segments before it is truncated, so we no
// longer need to track deleted series that are before it.
h.deletedMtx.Lock()
for ref, segment := range h.deleted {
if segment < first {
delete(h.deleted, ref)
}
}
h.deletedMtx.Unlock()

h.metrics.checkpointDeleteTotal.Inc()
if err := DeleteCheckpoints(h.wal.Dir(), last); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
Expand Down Expand Up @@ -953,6 +974,21 @@ func (h *Head) gc() {
// Remove deleted series IDs from the postings lists.
h.postings.Delete(deleted)

if h.wal != nil {
_, last, _ := h.wal.Segments()
h.deletedMtx.Lock()
// Keep series records until we're past segment 'last'
// because the WAL will still have samples records with
// this ref ID. If we didn't keep these series records then
// on start up when we replay the WAL, or any other code
// that reads the WAL, wouldn't be able to use those
// samples since we would have no labels for that ref ID.
for ref := range deleted {
gouthamve marked this conversation as resolved.
Show resolved Hide resolved
h.deleted[ref] = last
}
h.deletedMtx.Unlock()
}

// Rebuild symbols and label value indices from what is left in the postings terms.
symbols := make(map[string]struct{}, len(h.symbols))
values := make(map[string]stringset, len(h.values))
Expand Down
51 changes: 51 additions & 0 deletions head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,57 @@ func TestDeleteUntilCurMax(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, []tsdbutil.Sample{sample{11, 1}}, ressmpls)
}

func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe extend the test a bit like:
After the checkpoint add more samples for existing series. Read the wal from scratch and ensure that there are no samples with unknown series references.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not quite sure what you're getting at. There are no existing series after we've read the checkpoint, as we've deleted them.

dir, err := ioutil.TempDir("", "test_delete_wal")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
wlog, err := wal.NewSize(nil, nil, dir, 32768)
testutil.Ok(t, err)

// Enough samples to cause a checkpoint.
numSamples := 10000
hb, err := NewHead(nil, nil, wlog, int64(numSamples)*10)
testutil.Ok(t, err)
defer hb.Close()
for i := 0; i < numSamples; i++ {
app := hb.Appender()
_, err := app.Add(labels.Labels{{"a", "b"}}, int64(i), 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets extract all this in a separate func like in my other PR
https://github.com/prometheus/tsdb/pull/467/files#diff-7ae027963734feb4c8722aa99f033363R165

createHead(t, genSeries(....))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That'd clash with the existing name, plus that function doesn't have the WAL handling I need.

testutil.Ok(t, hb.Delete(0, int64(numSamples), labels.NewEqualMatcher("a", "b")))
testutil.Ok(t, hb.Truncate(1))
testutil.Ok(t, hb.Close())

// Confirm there's been a checkpoint.
cdir, _, err := LastCheckpoint(dir)
testutil.Ok(t, err)
// Read in checkpoint and WAL.
recs := readTestWAL(t, cdir)
recs = append(recs, readTestWAL(t, dir)...)

var series, samples, stones int
for _, rec := range recs {
switch rec.(type) {
case []RefSeries:
series++
case []RefSample:
samples++
case []Stone:
stones++
default:
t.Fatalf("unknown record type")
}
}
testutil.Equals(t, 1, series)
testutil.Equals(t, 9999, samples)
testutil.Equals(t, 1, stones)

}

func TestDelete_e2e(t *testing.T) {
numDatapoints := 1000
numRanges := 1000
Expand Down