Skip to content

Commit

Permalink
refactor: remove redundant delta calculations for rater (#795)
Browse files Browse the repository at this point in the history
When we calculate processing rate, we calculate deltas between all pairs of timestamped counts within lookback seconds. If two consecutive timestamped counts all finished collecting metrics, then the delta calculation should only execute once. This PR introduce the concept of `window`. When a timestamp counts finish collecting counts, we mark the it as window-closed and calculate the delta and save the delta. Such that future GetRate calls won't need to re-calculate the delta and instead directly use the delta attribute of that closed window. This brings down the time complexity of calculating vertex level rate from O(lookback seconds * no. of pods) to O(lookback seconds)

Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Jun 16, 2023
1 parent 402d827 commit 97db198
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 37 deletions.
74 changes: 42 additions & 32 deletions pkg/daemon/server/service/rater/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,31 @@ const IndexNotFound = -1

// UpdateCount updates the count of processed messages for a pod at a given time
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podName string, count float64) {
items := q.Items()

// find the element matching the input timestamp and update it
for _, i := range q.Items() {
for _, i := range items {
if i.timestamp == time {
i.Update(podName, count)
return
}
}

// if we cannot find a matching element, it means we need to add a new timestamped count to the queue
tc := NewTimestampedCounts(time)
tc.Update(podName, count)

// close the window for the most recent timestamped count
switch n := q.Length(); n {
case 0:
// if the queue is empty, we just append the new timestamped count
case 1:
// if the queue has only one element, we close the window for this element
items[0].CloseWindow(nil)
default:
// if the queue has more than one element, we close the window for the most recent element
items[n-1].CloseWindow(items[n-2])
}
q.Append(tc)
}

Expand All @@ -47,43 +62,25 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec
}
counts := q.Items()
startIndex := findStartIndex(lookbackSeconds, counts)
if startIndex == IndexNotFound {
endIndex := findEndIndex(counts)
if startIndex == IndexNotFound || endIndex == IndexNotFound {
return 0
}

delta := float64(0)
// time diff in seconds.
timeDiff := counts[n-1].timestamp - counts[startIndex].timestamp
timeDiff := counts[endIndex].timestamp - counts[startIndex].timestamp
if timeDiff == 0 {
// if the time difference is 0, we return 0 to avoid division by 0
// this should not happen in practice because we are using a 10s interval
return 0
}
for i := startIndex; i < n-1; i++ {
delta = delta + calculateDelta(counts[i], counts[i+1])
}
return delta / float64(timeDiff)
}

func calculateDelta(c1, c2 *TimestampedCounts) float64 {
tc1 := c1.Snapshot()
tc2 := c2.Snapshot()
delta := float64(0)
// Iterate over the podCounts of the second TimestampedCounts
for pod, count2 := range tc2 {
// If the pod also exists in the first TimestampedCounts
if count1, ok := tc1[pod]; ok {
// If the count has decreased, it means the pod restarted
if count2 < count1 {
delta += count2
} else { // If the count has increased or stayed the same
delta += count2 - count1
}
} else { // If the pod only exists in the second TimestampedCounts, it's a new pod
delta += count2
for i := startIndex; i < endIndex; i++ {
if counts[i+1] != nil && counts[i+1].IsWindowClosed() {
delta += counts[i+1].delta
}
}
return delta
return delta / float64(timeDiff)
}

// CalculatePodRate calculates the rate of a pod in the last lookback seconds
Expand All @@ -94,20 +91,23 @@ func CalculatePodRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookback
}
counts := q.Items()
startIndex := findStartIndex(lookbackSeconds, counts)
if startIndex == IndexNotFound {
endIndex := findEndIndex(counts)
if startIndex == IndexNotFound || endIndex == IndexNotFound {
return 0
}

delta := float64(0)
// time diff in seconds.
timeDiff := counts[n-1].timestamp - counts[startIndex].timestamp
timeDiff := counts[endIndex].timestamp - counts[startIndex].timestamp
if timeDiff == 0 {
// if the time difference is 0, we return 0 to avoid division by 0
// this should not happen in practice because we are using a 10s interval
return 0
}
for i := startIndex; i < n-1; i++ {
delta = delta + calculatePodDelta(counts[i], counts[i+1], podName)
for i := startIndex; i < endIndex; i++ {
if c1, c2 := counts[i], counts[i+1]; c1 != nil && c2 != nil && c1.IsWindowClosed() && c2.IsWindowClosed() {
delta += calculatePodDelta(c1, c2, podName)
}
}
return delta / float64(timeDiff)
}
Expand All @@ -133,18 +133,28 @@ func calculatePodDelta(c1, c2 *TimestampedCounts, podName string) float64 {
func findStartIndex(lookbackSeconds int64, counts []*TimestampedCounts) int {
n := len(counts)
now := time.Now().Truncate(time.Second * 10).Unix()
if now-counts[n-2].timestamp > lookbackSeconds {
if n < 2 || now-counts[n-2].timestamp > lookbackSeconds {
// if the second last element is already outside the lookback window, we return IndexNotFound
return IndexNotFound
}

startIndex := n - 2
for i := n - 2; i >= 0; i-- {
if now-counts[i].timestamp <= lookbackSeconds {
if now-counts[i].timestamp <= lookbackSeconds && counts[i].IsWindowClosed() {
startIndex = i
} else {
break
}
}
return startIndex
}

func findEndIndex(counts []*TimestampedCounts) int {
for i := len(counts) - 1; i >= 0; i-- {
// if a window is not closed, we exclude it from the rate calculation
if counts[i].IsWindowClosed() {
return i
}
}
return IndexNotFound
}
50 changes: 49 additions & 1 deletion pkg/daemon/server/service/rater/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestUpdateCount(t *testing.T) {
assert.Equal(t, 10.0, q.Items()[0].podCounts["pod1"])
})

t.Run("givenTimeNotExistsCountAvailable_whenUpdate_thenUpdateNewTimeWithPod", func(t *testing.T) {
t.Run("givenTimeNotExistsCountAvailable_whenUpdate_thenUpdateNewTimeWithPodAndCloseWindowForPrevTime", func(t *testing.T) {
q := sharedqueue.New[*TimestampedCounts](1800)
tc := NewTimestampedCounts(TestTime)
tc.Update("pod1", 10.0)
Expand All @@ -89,6 +89,8 @@ func TestUpdateCount(t *testing.T) {
assert.Equal(t, 2, q.Length())
assert.Equal(t, 10.0, q.Items()[0].podCounts["pod1"])
assert.Equal(t, 20.0, q.Items()[1].podCounts["pod1"])
assert.Equal(t, true, tc.IsWindowClosed())
assert.Equal(t, 10.0, tc.delta)
})

t.Run("givenTimeNotExistsCountNotAvailable_whenUpdate_thenAddEmptyItem", func(t *testing.T) {
Expand Down Expand Up @@ -128,12 +130,15 @@ func TestCalculateRate(t *testing.T) {
tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20)
tc1.Update("pod1", 5.0)
q.Append(tc1)
tc1.CloseWindow(nil)
tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10)
tc2.Update("pod1", 10.0)
q.Append(tc2)
tc2.CloseWindow(tc1)
tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix())
tc3.Update("pod1", 20.0)
q.Append(tc3)
tc3.CloseWindow(tc2)

assert.Equal(t, 0.0, CalculateRate(q, 5))
assert.Equal(t, 1.0, CalculateRate(q, 15))
Expand All @@ -146,22 +151,53 @@ func TestCalculateRate(t *testing.T) {
assert.Equal(t, 0.75, CalculatePodRate(q, 100, "pod1"))
})

t.Run("singlePod_givenCountIncreases_whenCalculateRate_thenReturnRate_excludeOpenWindow", func(t *testing.T) {
q := sharedqueue.New[*TimestampedCounts](1800)
now := time.Now()

tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20)
tc1.Update("pod1", 5.0)
q.Append(tc1)
tc1.CloseWindow(nil)
tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10)
tc2.Update("pod1", 10.0)
q.Append(tc2)
tc2.CloseWindow(tc1)
tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix())
tc3.Update("pod1", 20.0)
q.Append(tc3)

assert.Equal(t, 0.0, CalculateRate(q, 5))
assert.Equal(t, 0.0, CalculateRate(q, 15))
assert.Equal(t, 0.5, CalculateRate(q, 25))
assert.Equal(t, 0.5, CalculateRate(q, 100))

assert.Equal(t, 0.0, CalculatePodRate(q, 5, "pod1"))
assert.Equal(t, 0.0, CalculatePodRate(q, 15, "pod1"))
assert.Equal(t, 0.5, CalculatePodRate(q, 25, "pod1"))
assert.Equal(t, 0.5, CalculatePodRate(q, 100, "pod1"))
})

t.Run("singlePod_givenCountDecreases_whenCalculateRate_thenReturnRate", func(t *testing.T) {
q := sharedqueue.New[*TimestampedCounts](1800)
now := time.Now()

tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 30)
tc1.Update("pod1", 200.0)
q.Append(tc1)
tc1.CloseWindow(nil)
tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20)
tc2.Update("pod1", 100.0)
q.Append(tc2)
tc2.CloseWindow(tc1)
tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10)
tc3.Update("pod1", 50.0)
q.Append(tc3)
tc3.CloseWindow(tc2)
tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix())
tc4.Update("pod1", 80.0)
q.Append(tc4)
tc4.CloseWindow(tc3)

assert.Equal(t, 0.0, CalculateRate(q, 5))
assert.Equal(t, 3.0, CalculateRate(q, 15))
Expand All @@ -184,18 +220,22 @@ func TestCalculateRate(t *testing.T) {
tc1.Update("pod1", 200.0)
tc1.Update("pod2", 100.0)
q.Append(tc1)
tc1.CloseWindow(nil)
tc2 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 20)
tc2.Update("pod1", 100.0)
tc2.Update("pod2", 200.0)
q.Append(tc2)
tc2.CloseWindow(tc1)
tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10)
tc3.Update("pod1", 50.0)
tc3.Update("pod2", 300.0)
q.Append(tc3)
tc3.CloseWindow(tc2)
tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix())
tc4.Update("pod1", 80.0)
tc4.Update("pod2", 400.0)
q.Append(tc4)
tc4.CloseWindow(tc3)

// vertex rate
assert.Equal(t, 0.0, CalculateRate(q, 5))
Expand Down Expand Up @@ -228,20 +268,24 @@ func TestCalculateRate(t *testing.T) {
tc1.Update("pod2", 90.0)
tc1.Update("pod3", 50.0)
q.Append(tc1)
tc1.CloseWindow(nil)
tc2 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 20)
tc2.Update("pod1", 100.0)
tc2.Update("pod2", 200.0)
q.Append(tc2)
tc2.CloseWindow(tc1)
tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10)
tc3.Update("pod1", 50.0)
tc3.Update("pod2", 300.0)
tc3.Update("pod4", 100.0)
q.Append(tc3)
tc3.CloseWindow(tc2)
tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix())
tc4.Update("pod2", 400.0)
tc4.Update("pod3", 200.0)
tc4.Update("pod100", 200.0)
q.Append(tc4)
tc4.CloseWindow(tc3)

// vertex rate
assert.Equal(t, 0.0, CalculateRate(q, 5))
Expand Down Expand Up @@ -295,20 +339,24 @@ func TestCalculateRate(t *testing.T) {
tc1.Update("pod2", 90.0)
tc1.Update("pod3", 50.0)
q.Append(tc1)
tc1.CloseWindow(nil)
tc2 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 20)
tc2.Update("pod1", 100.0)
tc2.Update("pod2", 200.0)
q.Append(tc2)
tc2.CloseWindow(tc1)
tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10)
tc3.Update("pod1", 50.0)
tc3.Update("pod2", 300.0)
tc3.Update("pod4", 100.0)
q.Append(tc3)
tc3.CloseWindow(tc2)
tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix())
tc4.Update("pod2", 400.0)
tc4.Update("pod3", 200.0)
tc4.Update("pod100", 200.0)
q.Append(tc4)
tc4.CloseWindow(tc3)

assert.Equal(t, 0.0, CalculateRate(q, 5))
assert.Equal(t, 50.0, CalculateRate(q, 15))
Expand Down
57 changes: 53 additions & 4 deletions pkg/daemon/server/service/rater/timestamped_counts.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,24 @@ type TimestampedCounts struct {
timestamp int64
// podName to count mapping
podCounts map[string]float64
lock *sync.RWMutex
// isWindowClosed indicates whether we have finished collecting pod counts for this timestamp
isWindowClosed bool
// delta is the total count change from the previous window, it's valid only when isWindowClosed is true
delta float64
lock *sync.RWMutex
}

func NewTimestampedCounts(t int64) *TimestampedCounts {
return &TimestampedCounts{
timestamp: t,
podCounts: make(map[string]float64),
lock: new(sync.RWMutex),
timestamp: t,
podCounts: make(map[string]float64),
isWindowClosed: false,
delta: 0,
lock: new(sync.RWMutex),
}
}

// Update updates the count for a pod if the current window is not closed
func (tc *TimestampedCounts) Update(podName string, count float64) {
tc.lock.Lock()
defer tc.lock.Unlock()
Expand All @@ -56,6 +63,10 @@ func (tc *TimestampedCounts) Update(podName string, count float64) {
// hence we'd rather keep the count as it is to avoid wrong rate calculation.
return
}
if tc.isWindowClosed {
// we skip updating if the window is already closed.
return
}
tc.podCounts[podName] = count
}

Expand All @@ -71,6 +82,44 @@ func (tc *TimestampedCounts) Snapshot() map[string]float64 {
return counts
}

// IsWindowClosed returns whether the window is closed
func (tc *TimestampedCounts) IsWindowClosed() bool {
tc.lock.RLock()
defer tc.lock.RUnlock()
return tc.isWindowClosed
}

// CloseWindow closes the window and calculates the delta by comparing the current pod counts with the previous window
func (tc *TimestampedCounts) CloseWindow(prev *TimestampedCounts) {
// prepare pod counts for both current and previous window for delta calculation
var prevPodCounts map[string]float64
if prev == nil {
prevPodCounts = make(map[string]float64)
} else {
prevPodCounts = prev.Snapshot()
}
currPodCounts := tc.Snapshot()

// calculate the delta by comparing the current pod counts with the previous window
delta := 0.0
for key, currCount := range currPodCounts {
prevCount := prevPodCounts[key] // if key doesn't exist in prevPodCounts, prevCount is 0
if currCount < prevCount {
// this can happen when a pod is restarted during the window
// we count the new count as the delta
delta += currCount
} else {
delta += currCount - prevCount
}
}

// finalize the window by setting isWindowClosed to true and delta to the calculated value
tc.lock.Lock()
defer tc.lock.Unlock()
tc.isWindowClosed = true
tc.delta = delta
}

// ToString returns a string representation of the TimestampedCounts
// it's used for debugging purpose
func (tc *TimestampedCounts) ToString() string {
Expand Down
Loading

0 comments on commit 97db198

Please sign in to comment.