-
Notifications
You must be signed in to change notification settings - Fork 28
/
ringbuffer.rb
116 lines (104 loc) · 3.02 KB
/
ringbuffer.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
module ConsulTimeline
# One link of the doubly linked list that backs the ring buffer.
# Each node stores a value together with references to the previous and
# next elements (either reference may be nil at the ends of the list).
class RingBufferNode
  attr_accessor :prev, :value, :next

  # value:  payload stored in the node
  # p_elem: node placed before this one (or nil)
  # n_elem: node placed after this one (or nil)
  def initialize(value, p_elem, n_elem)
    @value = value
    @prev = p_elem
    @next = n_elem
  end

  # Insert a new node holding obj right before the current node.
  # Returns the inserted node.
  def insert_before(obj)
    inserted = RingBufferNode.new(obj, @prev, self)
    # Re-link the former predecessor (if any) so it points to the new node
    @prev.next = inserted unless @prev.nil?
    @prev = inserted
  end

  # Insert a new node holding obj right after the current node.
  # Returns the inserted node.
  def append(obj)
    inserted = RingBufferNode.new(obj, self, @next)
    # Re-link the former successor (if any) so it points back to the new node
    @next.prev = inserted unless @next.nil?
    @next = inserted
  end

  # Debug representation: object ids of both neighbours and the value.
  def to_s
    "[prev=#{@prev.object_id}, next=#{@next.object_id}, value=#{@value}]"
  end
end
# A ring buffer that supports inserting out-of-order elements at the right place.
# This is needed as Consul might notify us of changes in any order, so we
# might receive the notification for t-2 AFTER the one for t-1, at t.
# The buffer discards old elements so it only keeps the "n" most recent ones.
#
# Internally this is a doubly linked list of exactly max_size nodes; nodes
# whose value is nil are empty slots and always sit on the head side.
class SortedRingBuffer
  include Enumerable

  # max_size:  capacity of the buffer, must be strictly positive
  # sort_func: comparator invoked as sort_func.call(a, b); a positive result
  #            means that a sorts after b (same convention as <=>)
  # Raises RuntimeError when max_size is not positive.
  def initialize(max_size, sort_func)
    raise "Invalid size #{max_size}" unless max_size.positive?

    @sort_func = sort_func
    @max_size = max_size
    # Pre-allocate every slot: start with a single empty node and grow the
    # list towards the head until it holds max_size nodes.
    @head = RingBufferNode.new(nil, nil, nil)
    @tail = @head
    (max_size - 1).times { @head = @head.insert_before(nil) }
  end

  # Insert obj at its sorted position, evicting the element at the head when
  # needed. nil (or false) is ignored, since nil marks an empty slot.
  # An element sorting before everything held by a full buffer is dropped.
  # The return value is not meaningful.
  def push(obj)
    return unless obj

    # Walk back from the tail until reaching an element that does not sort
    # after obj, an empty slot, or the end of the list (cur becomes nil).
    cur = @tail
    cur = cur.prev while cur&.value && @sort_func.call(cur.value, obj).positive?
    if cur.nil?
      # obj sorts before @head while the buffer is full: nothing to do
    elsif cur.equal?(@head)
      # The head is either empty or the element to evict: overwrite it.
      # This is also the only reachable branch for a buffer of size 1, which
      # therefore needs no successor on @head.
      @head.value = obj
    else
      # Drop the head slot and link a new node after cur, so the list keeps
      # exactly max_size nodes. cur is not the head here, hence @head.next
      # is never nil.
      @head = @head.next
      @head.prev = nil
      new_val = cur.append(obj)
      @tail = new_val if @tail.equal?(cur)
    end
  end

  # Yield every stored value, walking the list from head to tail and
  # skipping empty slots. Returns an Enumerator when no block is given.
  def each
    return enum_for(:each) unless block_given?

    cur = @head
    while cur
      yield cur.value if cur.value
      cur = cur.next
    end
  end

  # Array of the stored values, from head to tail, without empty slots.
  def to_a
    each.to_a
  end
end
end
# Debug driver, only active when the first command-line argument is 'debug'.
# The optional second argument is the buffer capacity (10 when absent).
# It fills a buffer with jittered increasing values, checks that exactly
# `capacity` elements remain and prints them as JSON.
if ARGV.first == 'debug'
  require 'json'
  capacity = ARGV.fetch(1, 10).to_i
  compare = ->(a, b) { a <=> b }
  buffer = ConsulTimeline::SortedRingBuffer.new(capacity, compare)
  buffer.push 0.5
  puts buffer.to_a
  (capacity * 10).times { |i| buffer.push(2 * i + Random.rand(capacity / 10)) }
  buffer.push 99_999_999_999_999
  contents = buffer.to_a
  raise "OOPS wrong size := #{contents.count} instead of #{capacity}" if contents.count != capacity

  $stdout.puts JSON.generate(contents)
end