Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Calculate the age of a WAL segment #13637

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 55 additions & 38 deletions pkg/storage/wal/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,30 +103,42 @@ type Manager struct {
metrics *ManagerMetrics
available *list.List
pending *list.List
// firstAppend is the time of the first append to the segment at the
// front of the available list. It is used to know when the segment has
// exceeded the maximum age and should be moved to the pending list.
// It is reset each time this happens.
firstAppend time.Time
closed bool
mu sync.Mutex
closed bool
mu sync.Mutex
// Used in tests.
clock quartz.Clock
}

// item is similar to PendingSegment, but it is an internal struct used in the
// available and pending lists. It contains a single-use result that is returned
// to callers appending to the WAL and a re-usable segment that is reset after
// each flush.
type item struct {
// segment is similar to PendingSegment, however it is an internal struct used
// in the available and pending lists. It contains a single-use result that is
// returned to callers appending to the WAL and a re-usable segment that is reset
// after each flush.
type segment struct {
r *AppendResult
w *SegmentWriter

// firstAppend is the time of the first append to the segment. It is used to
// know when the segment has exceeded the maximum age and should be moved to
// the pending list.
firstAppend time.Time

// moved is the time the segment was moved to the pending list. It is used
// to calculate the age of the segment. A segment is moved when it has
// exceeded the maximum age or the maximum size.
moved time.Time
}

// PendingSegment contains a result and the segment to be flushed.
type PendingSegment struct {
Result *AppendResult
Writer *SegmentWriter
Result *AppendResult
Writer *SegmentWriter
FirstAppend time.Time
Moved time.Time
}

// Age returns the age of the segment.
func (p *PendingSegment) Age() time.Duration {
return p.Moved.Sub(p.FirstAppend)
}

func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
Expand All @@ -145,7 +157,7 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
if err != nil {
return nil, err
}
m.available.PushBack(&item{
m.available.PushBack(&segment{
r: &AppendResult{done: make(chan struct{})},
w: w,
})
Expand All @@ -164,29 +176,29 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
if el == nil {
return nil, ErrFull
}
it := el.Value.(*item)
if m.firstAppend.IsZero() {
s := el.Value.(*segment)
if s.firstAppend.IsZero() {
// This is the first append to the segment. This time will be used in
// know when the segment has exceeded its maximum age and should be
// moved to the pending list.
m.firstAppend = m.clock.Now()
s.firstAppend = m.clock.Now()
}
it.w.Append(r.TenantID, r.LabelsStr, r.Labels, r.Entries)
// If the segment exceeded the maximum age or the maximum size, move it to
s.w.Append(r.TenantID, r.LabelsStr, r.Labels, r.Entries)
// If the segment exceeded the maximum age or the maximum size, move s to
// the closed list to be flushed.
if m.clock.Since(m.firstAppend) >= m.cfg.MaxAge || it.w.InputSize() >= m.cfg.MaxSegmentSize {
m.move(el, it)
if m.clock.Since(s.firstAppend) >= m.cfg.MaxAge || s.w.InputSize() >= m.cfg.MaxSegmentSize {
m.move(el, s)
}
return it.r, nil
return s.r, nil
}

func (m *Manager) Close() {
m.mu.Lock()
defer m.mu.Unlock()
if el := m.available.Front(); el != nil {
it := el.Value.(*item)
if it.w.InputSize() > 0 {
m.move(el, it)
s := el.Value.(*segment)
if s.w.InputSize() > 0 {
m.move(el, s)
}
}
m.closed = true
Expand All @@ -206,45 +218,50 @@ func (m *Manager) NextPending() (*PendingSegment, error) {
return nil, nil
}
el := m.pending.Front()
it := el.Value.(*item)
s := el.Value.(*segment)
m.pending.Remove(el)
m.metrics.NumPending.Dec()
m.metrics.NumFlushing.Inc()
return &PendingSegment{Result: it.r, Writer: it.w}, nil
return &PendingSegment{
Result: s.r,
Writer: s.w,
FirstAppend: s.firstAppend,
Moved: s.moved,
}, nil
}

// Put resets the segment and puts it back in the available list to accept
// writes. A PendingSegment should not be put back until it has been flushed.
func (m *Manager) Put(it *PendingSegment) {
it.Writer.Reset()
func (m *Manager) Put(s *PendingSegment) {
s.Writer.Reset()
m.mu.Lock()
defer m.mu.Unlock()
m.metrics.NumFlushing.Dec()
m.metrics.NumAvailable.Inc()
m.available.PushBack(&item{
m.available.PushBack(&segment{
r: &AppendResult{done: make(chan struct{})},
w: it.Writer,
w: s.Writer,
})
}

// move the element from the available list to the pending list and sets the
// relevant metrics.
func (m *Manager) move(el *list.Element, it *item) {
m.pending.PushBack(it)
func (m *Manager) move(el *list.Element, s *segment) {
s.moved = m.clock.Now()
m.pending.PushBack(s)
m.metrics.NumPending.Inc()
m.available.Remove(el)
m.metrics.NumAvailable.Dec()
m.firstAppend = time.Time{}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer needed as firstAppend is a field of segment, not Manager.

}

// moveFrontIfExpired moves the element from the front of the available list to
// the pending list if the segment has exceeded its maximum age and sets the
// relevant metrics.
func (m *Manager) moveFrontIfExpired() bool {
if el := m.available.Front(); el != nil {
it := el.Value.(*item)
if !m.firstAppend.IsZero() && m.clock.Since(m.firstAppend) >= m.cfg.MaxAge {
m.move(el, it)
s := el.Value.(*segment)
if !s.firstAppend.IsZero() && m.clock.Since(s.firstAppend) >= m.cfg.MaxAge {
m.move(el, s)
return true
}
}
Expand Down
112 changes: 88 additions & 24 deletions pkg/storage/wal/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ func TestManager_NextPending(t *testing.T) {

// There should be no segments waiting to be flushed as no data has been
// written.
it, err := m.NextPending()
s, err := m.NextPending()
require.NoError(t, err)
require.Nil(t, it)
require.Nil(t, s)

// Append 1KB of data.
lbs := labels.Labels{{Name: "a", Value: "b"}}
Expand All @@ -271,14 +271,78 @@ func TestManager_NextPending(t *testing.T) {
require.NoError(t, err)

// There should be a segment waiting to be flushed.
it, err = m.NextPending()
s, err = m.NextPending()
require.NoError(t, err)
require.NotNil(t, it)
require.NotNil(t, s)

// There should be no more segments waiting to be flushed.
it, err = m.NextPending()
s, err = m.NextPending()
require.NoError(t, err)
require.Nil(t, it)
require.Nil(t, s)
}

func TestManager_NextPendingAge(t *testing.T) {
m, err := NewManager(Config{
MaxAge: 100 * time.Millisecond,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
require.NoError(t, err)

// Create a mock clock.
clock := quartz.NewMock(t)
m.clock = clock

// Append 1B of data.
lbs := labels.Labels{{Name: "a", Value: "b"}}
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}}
res, err := m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: entries,
})
require.NoError(t, err)
require.NotNil(t, res)

// Wait 100ms. The segment that was just appended to should have reached
// the maximum age.
clock.Advance(100 * time.Millisecond)
s, err := m.NextPending()
require.NoError(t, err)
require.NotNil(t, s)
require.Equal(t, 100*time.Millisecond, s.Age())
m.Put(s)

// Append 1KB of data using two separate append requests, 1ms apart.
entries = []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}}
res, err = m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: entries,
})
require.NoError(t, err)
require.NotNil(t, res)

// Wait 1ms and then append the rest of the data.
clock.Advance(time.Millisecond)
entries = []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}}
res, err = m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: entries,
})
require.NoError(t, err)
require.NotNil(t, res)

// The segment that was just appended to should have reached the maximum
// size.
s, err = m.NextPending()
require.NoError(t, err)
require.NotNil(t, s)
require.Equal(t, time.Millisecond, s.Age())
}

func TestManager_NextPendingMaxAgeExceeded(t *testing.T) {
Expand Down Expand Up @@ -307,18 +371,18 @@ func TestManager_NextPendingMaxAgeExceeded(t *testing.T) {

// The segment that was just appended to has neither reached the maximum
// age nor maximum size to be flushed.
it, err := m.NextPending()
s, err := m.NextPending()
require.NoError(t, err)
require.Nil(t, it)
require.Nil(t, s)
require.Equal(t, 1, m.available.Len())
require.Equal(t, 0, m.pending.Len())

// Wait 100ms. The segment that was just appended to should have reached
// the maximum age.
clock.Advance(100 * time.Millisecond)
it, err = m.NextPending()
s, err = m.NextPending()
require.NoError(t, err)
require.NotNil(t, it)
require.NotNil(t, s)
require.Equal(t, 0, m.available.Len())
require.Equal(t, 0, m.pending.Len())
}
Expand All @@ -345,24 +409,24 @@ func TestManager_NextPendingWALClosed(t *testing.T) {

// There should be no segments waiting to be flushed as neither the maximum
// age nor maximum size has been exceeded.
it, err := m.NextPending()
s, err := m.NextPending()
require.NoError(t, err)
require.Nil(t, it)
require.Nil(t, s)

// Close the WAL.
m.Close()

// There should be one segment waiting to be flushed.
it, err = m.NextPending()
s, err = m.NextPending()
require.NoError(t, err)
require.NotNil(t, it)
require.NotNil(t, s)

// There are no more segments waiting to be flushed, and since the WAL is
// closed, successive calls should return ErrClosed.
for i := 0; i < 10; i++ {
it, err = m.NextPending()
s, err = m.NextPending()
require.ErrorIs(t, err, ErrClosed)
require.Nil(t, it)
require.Nil(t, s)
}
}

Expand Down Expand Up @@ -395,22 +459,22 @@ func TestManager_Put(t *testing.T) {
require.Equal(t, 1, m.pending.Len())

// Getting the pending segment should remove it from the list.
it, err := m.NextPending()
s, err := m.NextPending()
require.NoError(t, err)
require.NotNil(t, it)
require.NotNil(t, s)
require.Equal(t, 0, m.available.Len())
require.Equal(t, 0, m.pending.Len())

// The segment should contain 1KB of data.
require.Equal(t, int64(1024), it.Writer.InputSize())
require.Equal(t, int64(1024), s.Writer.InputSize())

// Putting it back should add it to the available list.
m.Put(it)
m.Put(s)
require.Equal(t, 1, m.available.Len())
require.Equal(t, 0, m.pending.Len())

// The segment should be reset.
require.Equal(t, int64(0), it.Writer.InputSize())
require.Equal(t, int64(0), s.Writer.InputSize())
}

func TestManager_Metrics(t *testing.T) {
Expand Down Expand Up @@ -465,9 +529,9 @@ wal_segments_pending 1
require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))

// Get the segment from the pending list.
it, err := m.NextPending()
s, err := m.NextPending()
require.NoError(t, err)
require.NotNil(t, it)
require.NotNil(t, s)
expected = `
# HELP wal_segments_available The number of WAL segments accepting writes.
# TYPE wal_segments_available gauge
Expand All @@ -482,7 +546,7 @@ wal_segments_pending 0
require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))

// Reset the segment and put it back in the available list.
m.Put(it)
m.Put(s)
expected = `
# HELP wal_segments_available The number of WAL segments accepting writes.
# TYPE wal_segments_available gauge
Expand Down
Loading