Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix queueing of historical metrics collection #14695

Merged
merged 5 commits into from
Apr 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions app/models/metric/ci_mixin/capture.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,31 @@ def perf_capture_queue(interval_name, options = {})

log_target = "#{self.class.name} name: [#{name}], id: [#{id}]"

# Determine the items to queue up
# cb is the task used to group cluster realtime metrics
cb = nil
if interval_name == 'historical'
start_time = Metric::Capture.historical_start_time if start_time.nil?
end_time = Time.now.utc if end_time.nil?
end_time ||= 1.day.from_now.utc.beginning_of_day # Ensure no more than one historical collection is queue up in the same day
items = split_capture_intervals(interval_name, start_time, end_time)
else
start_time = last_perf_capture_on unless start_time
end_time = Time.now.utc unless end_time
# if last_perf_capture_on is earlier than 4.hour.ago.beginning_of_day,
# then create *one* realtime capture for start_time = 4.hours.ago.beginning_of_day (no end_time)
# and create historical captures for each day from last_perf_capture_on until 4.hours.ago.beginning_of_day
realtime_cut_off = 4.hours.ago.utc.beginning_of_day
items =
if last_perf_capture_on.nil?
[[interval_name, realtime_cut_off]]
elsif last_perf_capture_on < realtime_cut_off
[[interval_name, realtime_cut_off]] +
split_capture_intervals("historical", last_perf_capture_on, realtime_cut_off)
else
[interval_name]
end

cb = {:class_name => self.class.name, :instance_id => id, :method_name => :perf_capture_callback, :args => [[task_id]]} if task_id
end

# Determine what items we should be queuing up
items = [interval_name]
items = split_capture_intervals(interval_name, start_time, end_time) if start_time

# Queue up the actual items
queue_item = {
:class_name => self.class.name,
Expand All @@ -78,15 +89,17 @@ def perf_capture_queue(interval_name, options = {})
if msg.nil?
qi[:priority] = priority
qi.delete(:state)
qi[:miq_callback] = cb if cb
if cb && item_interval == "realtime"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see this before. very nice

qi[:miq_callback] = cb
end
qi
elsif msg.state == "ready" && (task_id || MiqQueue.higher_priority?(priority, msg.priority))
qi[:priority] = priority
# rerun the job (either with new task or higher priority)
qi.delete(:state)
if task_id
existing_tasks = (((msg.miq_callback || {})[:args] || []).first) || []
qi[:miq_callback] = cb.merge(:args => [existing_tasks + [task_id]])
qi[:miq_callback] = cb.merge(:args => [existing_tasks + [task_id]]) if item_interval == "realtime"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can get rid of the check on upgrading priority, then it will be easier to go over to put_unless_exists.

Real-time will potentially still need this

end
qi
else
Expand Down Expand Up @@ -177,6 +190,8 @@ def perf_capture(interval_name, start_time = nil, end_time = nil)

if start_range.nil?
_log.info "#{log_header} Skipping processing for #{log_target} as no metrics were captured."
# Set the last capture on to end_time to prevent forever queueing up the same collection range
update_attributes(:last_perf_capture_on => end_time || Time.now.utc) if interval_name == 'realtime'
else
if expected_start_range && start_range > expected_start_range
_log.warn "#{log_header} For #{log_target}, expected to get data as of [#{expected_start_range}], but got data as of [#{start_range}]."
Expand Down
97 changes: 69 additions & 28 deletions spec/models/metric/ci_mixin/capture_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,45 +36,86 @@ def expected_stats_period_end
parse_datetime('2013-08-28T12:41:40Z')
end

context "#perf_capture_queue" do
def test_perf_capture_queue(time_since_last_perf_capture, total_queue_items, verify_queue_items_count)
# There are usually some lingering queue items from creating the provider above. Notably `stop_event_monitor`
MiqQueue.delete_all
context "#perf_capture_queue('realtime')" do
def verify_realtime_queue_item(queue_item, expected_start_time = nil)
expect(queue_item.method_name).to eq "perf_capture_realtime"
if expected_start_time
q_start_time = queue_item.args.first
expect(q_start_time).to be_same_time_as expected_start_time
end
end

def verify_historical_queue_item(queue_item, expected_start_time, expected_end_time)
expect(queue_item.method_name).to eq "perf_capture_historical"
q_start_time, q_end_time = queue_item.args
expect(q_start_time).to be_same_time_as expected_start_time
expect(q_end_time).to be_same_time_as expected_end_time
end

def verify_perf_capture_queue(last_perf_capture_on, total_queue_items)
Timecop.freeze do
start_time = (Time.now.utc - time_since_last_perf_capture)
@vm.last_perf_capture_on = start_time
@vm.last_perf_capture_on = last_perf_capture_on
@vm.perf_capture_queue("realtime")
expect(MiqQueue.count).to eq total_queue_items

# make sure the queue items are in the correct order
queue_items = MiqQueue.order(:id).limit(verify_queue_items_count)
days_ago = (time_since_last_perf_capture.to_i / 1.day.to_i).days
partial_days = time_since_last_perf_capture - days_ago
interval_start_time = (start_time + days_ago).utc
interval_end_time = (interval_start_time + partial_days).utc
queue_items.each do |q_item|
q_start_time, q_end_time = q_item.args
expect(q_start_time).to be_same_time_as interval_start_time
expect(q_end_time).to be_same_time_as interval_end_time
interval_end_time = interval_start_time
# if the collection threshold is ever parameterized, then this 1.day will have to change
interval_start_time -= 1.day
queue_items = MiqQueue.order(:id).to_a

# first queue item is realtime and only has a start time
realtime_cut_off = 4.hours.ago.utc.beginning_of_day
realtime_start_time = realtime_cut_off if last_perf_capture_on.nil? || last_perf_capture_on < realtime_cut_off
verify_realtime_queue_item(queue_items.shift, realtime_start_time)

# rest of the queue items should be historical
if queue_items.any? && realtime_start_time
interval_start_time = @vm.last_perf_capture_on
interval_end_time = interval_start_time + 1.day
queue_items.reverse.each do |q_item|
verify_historical_queue_item(q_item, interval_start_time, interval_end_time)

interval_start_time = interval_end_time
interval_end_time += 1.day # if collection threshold is parameterized, this increment should change
interval_end_time = realtime_start_time if interval_end_time > realtime_start_time
end
end
end
end

it "splits up long perf_capture durations for old last_perf_capture_on" do
# test when last perf capture was many days ago
# total queue items == 11
# verify last 3 queue items
test_perf_capture_queue(10.days + 5.hours + 23.minutes, 11, 3)
it "when last_perf_capture_on is nil (first time)" do
MiqQueue.delete_all
Timecop.freeze do
Timecop.travel(Time.now.end_of_day - 6.hours)
verify_perf_capture_queue(nil, 1)
end
end

it "when last_perf_capture_on is very old (older than the realtime_cut_off of 4.hours.ago)" do
MiqQueue.delete_all
Timecop.freeze do
Timecop.travel(Time.now.end_of_day - 6.hours)
verify_perf_capture_queue((10.days + 5.hours + 23.minutes).ago, 11)
end
end

it "does not get confused when dealing with a single day" do
# test when perf capture is just a few hours ago
# total queue items == 1
# verify last 1 queue item
test_perf_capture_queue(0.days + 2.hours + 5.minutes, 1, 1)
it "when last_perf_capture_on is recent (before the realtime_cut_off of 4.hours.ago)" do
MiqQueue.delete_all
Timecop.freeze do
Timecop.travel(Time.now.end_of_day - 6.hours)
verify_perf_capture_queue((0.days + 2.hours + 5.minutes).ago, 1)
end
end

it "is able to handle multiple attempts to queue perf_captures and not add new items" do
MiqQueue.delete_all
Timecop.freeze do
# set a specific time of day to avoid sporadic test failures that fall on the exact right time to bump the
# queue items to 12 instead of 11
Timecop.travel(Time.now.end_of_day - 6.hours)
last_perf_capture_on = (10.days + 5.hours + 23.minutes).ago
verify_perf_capture_queue(last_perf_capture_on, 11)
Timecop.travel(Time.now + 20.minutes)
verify_perf_capture_queue(last_perf_capture_on, 11)
end
end
end

Expand Down