diff --git a/pkg/storage/wal/manager.go b/pkg/storage/wal/manager.go index 6a4ef7f052ee..fd873ca71df0 100644 --- a/pkg/storage/wal/manager.go +++ b/pkg/storage/wal/manager.go @@ -103,30 +103,42 @@ type Manager struct { metrics *ManagerMetrics available *list.List pending *list.List - // firstAppend is the time of the first append to the segment at the - // front of the available list. It is used to know when the segment has - // exceeded the maximum age and should be moved to the pending list. - // It is reset each time this happens. - firstAppend time.Time - closed bool - mu sync.Mutex + closed bool + mu sync.Mutex // Used in tests. clock quartz.Clock } -// item is similar to PendingSegment, but it is an internal struct used in the -// available and pending lists. It contains a single-use result that is returned -// to callers appending to the WAL and a re-usable segment that is reset after -// each flush. -type item struct { +// segment is similar to PendingSegment, however it is an internal struct used +// in the available and pending lists. It contains a single-use result that is +// returned to callers appending to the WAL and a re-usable segment that is reset +// after each flush. +type segment struct { r *AppendResult w *SegmentWriter + + // firstAppend is the time of the first append to the segment. It is used to + // know when the segment has exceeded the maximum age and should be moved to + // the pending list. + firstAppend time.Time + + // moved is the time the segment was moved to the pending list. It is used + // to calculate the age of the segment. A segment is moved when it has + // exceeded the maximum age or the maximum size. + moved time.Time } // PendingSegment contains a result and the segment to be flushed. type PendingSegment struct { - Result *AppendResult - Writer *SegmentWriter + Result *AppendResult + Writer *SegmentWriter + FirstAppend time.Time + Moved time.Time +} + +// Age returns the age of the segment. +func (p *PendingSegment) Age() time.Duration { + return p.Moved.Sub(p.FirstAppend) } func NewManager(cfg Config, metrics *Metrics) (*Manager, error) { @@ -145,7 +157,7 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) { if err != nil { return nil, err } - m.available.PushBack(&item{ + m.available.PushBack(&segment{ r: &AppendResult{done: make(chan struct{})}, w: w, }) @@ -164,29 +176,29 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) { if el == nil { return nil, ErrFull } - it := el.Value.(*item) - if m.firstAppend.IsZero() { + s := el.Value.(*segment) + if s.firstAppend.IsZero() { // This is the first append to the segment. This time will be used in // know when the segment has exceeded its maximum age and should be // moved to the pending list. - m.firstAppend = m.clock.Now() + s.firstAppend = m.clock.Now() } - it.w.Append(r.TenantID, r.LabelsStr, r.Labels, r.Entries) - // If the segment exceeded the maximum age or the maximum size, move it to + s.w.Append(r.TenantID, r.LabelsStr, r.Labels, r.Entries) + // If the segment exceeded the maximum age or the maximum size, move s to // the closed list to be flushed. - if m.clock.Since(m.firstAppend) >= m.cfg.MaxAge || it.w.InputSize() >= m.cfg.MaxSegmentSize { - m.move(el, it) + if m.clock.Since(s.firstAppend) >= m.cfg.MaxAge || s.w.InputSize() >= m.cfg.MaxSegmentSize { + m.move(el, s) } - return it.r, nil + return s.r, nil } func (m *Manager) Close() { m.mu.Lock() defer m.mu.Unlock() if el := m.available.Front(); el != nil { - it := el.Value.(*item) - if it.w.InputSize() > 0 { - m.move(el, it) + s := el.Value.(*segment) + if s.w.InputSize() > 0 { + m.move(el, s) } } m.closed = true @@ -206,35 +218,40 @@ func (m *Manager) NextPending() (*PendingSegment, error) { return nil, nil } el := m.pending.Front() - it := el.Value.(*item) + s := el.Value.(*segment) m.pending.Remove(el) m.metrics.NumPending.Dec() m.metrics.NumFlushing.Inc() - return &PendingSegment{Result: it.r, Writer: it.w}, nil + return &PendingSegment{ + Result: s.r, + Writer: s.w, + FirstAppend: s.firstAppend, + Moved: s.moved, + }, nil } // Put resets the segment and puts it back in the available list to accept // writes. A PendingSegment should not be put back until it has been flushed. -func (m *Manager) Put(it *PendingSegment) { - it.Writer.Reset() +func (m *Manager) Put(s *PendingSegment) { + s.Writer.Reset() m.mu.Lock() defer m.mu.Unlock() m.metrics.NumFlushing.Dec() m.metrics.NumAvailable.Inc() - m.available.PushBack(&item{ + m.available.PushBack(&segment{ r: &AppendResult{done: make(chan struct{})}, - w: it.Writer, + w: s.Writer, }) } // move the element from the available list to the pending list and sets the // relevant metrics. -func (m *Manager) move(el *list.Element, it *item) { - m.pending.PushBack(it) +func (m *Manager) move(el *list.Element, s *segment) { + s.moved = m.clock.Now() + m.pending.PushBack(s) m.metrics.NumPending.Inc() m.available.Remove(el) m.metrics.NumAvailable.Dec() - m.firstAppend = time.Time{} } // moveFrontIfExpired moves the element from the front of the available list to @@ -242,9 +259,9 @@ func (m *Manager) move(el *list.Element, it *item) { // relevant metrics. func (m *Manager) moveFrontIfExpired() bool { if el := m.available.Front(); el != nil { - it := el.Value.(*item) - if !m.firstAppend.IsZero() && m.clock.Since(m.firstAppend) >= m.cfg.MaxAge { - m.move(el, it) + s := el.Value.(*segment) + if !s.firstAppend.IsZero() && m.clock.Since(s.firstAppend) >= m.cfg.MaxAge { + m.move(el, s) return true } } diff --git a/pkg/storage/wal/manager_test.go b/pkg/storage/wal/manager_test.go index 9dc49dce4759..a69fbee51ab2 100644 --- a/pkg/storage/wal/manager_test.go +++ b/pkg/storage/wal/manager_test.go @@ -255,9 +255,9 @@ func TestManager_NextPending(t *testing.T) { // There should be no segments waiting to be flushed as no data has been // written. - it, err := m.NextPending() + s, err := m.NextPending() require.NoError(t, err) - require.Nil(t, it) + require.Nil(t, s) // Append 1KB of data. lbs := labels.Labels{{Name: "a", Value: "b"}} @@ -271,14 +271,78 @@ func TestManager_NextPending(t *testing.T) { require.NoError(t, err) // There should be a segment waiting to be flushed. - it, err = m.NextPending() + s, err = m.NextPending() require.NoError(t, err) - require.NotNil(t, it) + require.NotNil(t, s) // There should be no more segments waiting to be flushed. - it, err = m.NextPending() + s, err = m.NextPending() require.NoError(t, err) - require.Nil(t, it) + require.Nil(t, s) +} + +func TestManager_NextPendingAge(t *testing.T) { + m, err := NewManager(Config{ + MaxAge: 100 * time.Millisecond, + MaxSegments: 1, + MaxSegmentSize: 1024, // 1KB + }, NewMetrics(nil)) + require.NoError(t, err) + + // Create a mock clock. + clock := quartz.NewMock(t) + m.clock = clock + + // Append 1B of data. + lbs := labels.Labels{{Name: "a", Value: "b"}} + entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} + res, err := m.Append(AppendRequest{ + TenantID: "1", + Labels: lbs, + LabelsStr: lbs.String(), + Entries: entries, + }) + require.NoError(t, err) + require.NotNil(t, res) + + // Wait 100ms. The segment that was just appended to should have reached + // the maximum age. + clock.Advance(100 * time.Millisecond) + s, err := m.NextPending() + require.NoError(t, err) + require.NotNil(t, s) + require.Equal(t, 100*time.Millisecond, s.Age()) + m.Put(s) + + // Append 1KB of data using two separate append requests, 1ms apart. + entries = []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}} + res, err = m.Append(AppendRequest{ + TenantID: "1", + Labels: lbs, + LabelsStr: lbs.String(), + Entries: entries, + }) + require.NoError(t, err) + require.NotNil(t, res) + + // Wait 1ms and then append the rest of the data. + clock.Advance(time.Millisecond) + entries = []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}} + res, err = m.Append(AppendRequest{ + TenantID: "1", + Labels: lbs, + LabelsStr: lbs.String(), + Entries: entries, + }) + require.NoError(t, err) + require.NotNil(t, res) + + // The segment that was just appended to should have reached the maximum + // size. + s, err = m.NextPending() + require.NoError(t, err) + require.NotNil(t, s) + require.Equal(t, time.Millisecond, s.Age()) } func TestManager_NextPendingMaxAgeExceeded(t *testing.T) { @@ -307,18 +371,18 @@ func TestManager_NextPendingMaxAgeExceeded(t *testing.T) { // The segment that was just appended to has neither reached the maximum // age nor maximum size to be flushed. - it, err := m.NextPending() + s, err := m.NextPending() require.NoError(t, err) - require.Nil(t, it) + require.Nil(t, s) require.Equal(t, 1, m.available.Len()) require.Equal(t, 0, m.pending.Len()) // Wait 100ms. The segment that was just appended to should have reached // the maximum age. clock.Advance(100 * time.Millisecond) - it, err = m.NextPending() + s, err = m.NextPending() require.NoError(t, err) - require.NotNil(t, it) + require.NotNil(t, s) require.Equal(t, 0, m.available.Len()) require.Equal(t, 0, m.pending.Len()) } @@ -345,24 +409,24 @@ func TestManager_NextPendingWALClosed(t *testing.T) { // There should be no segments waiting to be flushed as neither the maximum // age nor maximum size has been exceeded. - it, err := m.NextPending() + s, err := m.NextPending() require.NoError(t, err) - require.Nil(t, it) + require.Nil(t, s) // Close the WAL. m.Close() // There should be one segment waiting to be flushed. - it, err = m.NextPending() + s, err = m.NextPending() require.NoError(t, err) - require.NotNil(t, it) + require.NotNil(t, s) // There are no more segments waiting to be flushed, and since the WAL is // closed, successive calls should return ErrClosed. for i := 0; i < 10; i++ { - it, err = m.NextPending() + s, err = m.NextPending() require.ErrorIs(t, err, ErrClosed) - require.Nil(t, it) + require.Nil(t, s) } } @@ -395,22 +459,22 @@ func TestManager_Put(t *testing.T) { require.Equal(t, 1, m.pending.Len()) // Getting the pending segment should remove it from the list. - it, err := m.NextPending() + s, err := m.NextPending() require.NoError(t, err) - require.NotNil(t, it) + require.NotNil(t, s) require.Equal(t, 0, m.available.Len()) require.Equal(t, 0, m.pending.Len()) // The segment should contain 1KB of data. - require.Equal(t, int64(1024), it.Writer.InputSize()) + require.Equal(t, int64(1024), s.Writer.InputSize()) // Putting it back should add it to the available list. - m.Put(it) + m.Put(s) require.Equal(t, 1, m.available.Len()) require.Equal(t, 0, m.pending.Len()) // The segment should be reset. - require.Equal(t, int64(0), it.Writer.InputSize()) + require.Equal(t, int64(0), s.Writer.InputSize()) } func TestManager_Metrics(t *testing.T) { @@ -465,9 +529,9 @@ wal_segments_pending 1 require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...)) // Get the segment from the pending list. - it, err := m.NextPending() + s, err := m.NextPending() require.NoError(t, err) - require.NotNil(t, it) + require.NotNil(t, s) expected = ` # HELP wal_segments_available The number of WAL segments accepting writes. # TYPE wal_segments_available gauge @@ -482,7 +546,7 @@ wal_segments_pending 0 require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...)) // Reset the segment and put it back in the available list. - m.Put(it) + m.Put(s) expected = ` # HELP wal_segments_available The number of WAL segments accepting writes. # TYPE wal_segments_available gauge