From 1ff6b451d616ebebfba15c407922faf317f641ea Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Thu, 16 Nov 2017 14:50:06 +0100 Subject: [PATCH] Decouple queue instantiation and population --- ruby/lib/ci/queue.rb | 1 + ruby/lib/ci/queue/index.rb | 20 +++++++ ruby/lib/ci/queue/redis/base.rb | 16 ++++-- ruby/lib/ci/queue/redis/supervisor.rb | 2 +- ruby/lib/ci/queue/redis/worker.rb | 41 ++++++++----- ruby/lib/ci/queue/static.rb | 28 ++++++--- ruby/test/ci/queue/file_test.rb | 8 ++- ruby/test/ci/queue/redis_supervisor_test.rb | 3 +- ruby/test/ci/queue/redis_test.rb | 57 +++++++++++++++---- ruby/test/ci/queue/static_test.rb | 6 +- ruby/test/fixtures/redis-runner.rb | 3 +- .../minitest/reporters/redis_reporter_test.rb | 3 +- ruby/test/support/shared_queue_assertions.rb | 43 ++++++++++++-- 13 files changed, 174 insertions(+), 57 deletions(-) create mode 100644 ruby/lib/ci/queue/index.rb diff --git a/ruby/lib/ci/queue.rb b/ruby/lib/ci/queue.rb index 4d042989..b266960e 100644 --- a/ruby/lib/ci/queue.rb +++ b/ruby/lib/ci/queue.rb @@ -1,3 +1,4 @@ require 'ci/queue/version' +require 'ci/queue/index' require 'ci/queue/static' require 'ci/queue/file' diff --git a/ruby/lib/ci/queue/index.rb b/ruby/lib/ci/queue/index.rb new file mode 100644 index 00000000..e39663be --- /dev/null +++ b/ruby/lib/ci/queue/index.rb @@ -0,0 +1,20 @@ +module CI + module Queue + class Index + def initialize(objects, &indexer) + @index = objects.map { |o| [indexer.call(o), o] }.to_h + @indexer = indexer + end + + def fetch(key) + @index.fetch(key) + end + + def key(value) + key = @indexer.call(value) + raise KeyError, "value not found: #{value.inspect}" unless @index.key?(key) + key + end + end + end +end diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 8821597c..b7ae2517 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -7,8 +7,8 @@ def initialize(redis:, build_id:) @build_id = build_id end - def empty? - size == 0 + def exhausted? + queue_initialized? && size == 0 end def size @@ -22,7 +22,7 @@ def to_a redis.multi do redis.lrange(key('queue'), 0, -1) redis.zrange(key('running'), 0, -1) - end.flatten.reverse + end.flatten.reverse.map { |k| index.fetch(k) } end def progress @@ -32,8 +32,7 @@ def progress def wait_for_master(timeout: 10) return true if master? (timeout * 10 + 1).to_i.times do - case master_status - when 'ready', 'finished' + if queue_initialized? return true else sleep 0.1 @@ -58,6 +57,13 @@ def master_status redis.get(key('master-status')) end + def queue_initialized? + @queue_initialized ||= begin + status = master_status + status == 'ready' || status == 'finished' + end + end + def eval_script(script, *args) redis.evalsha(load_script(script), *args) end diff --git a/ruby/lib/ci/queue/redis/supervisor.rb b/ruby/lib/ci/queue/redis/supervisor.rb index e2c10f5e..5e533b6b 100644 --- a/ruby/lib/ci/queue/redis/supervisor.rb +++ b/ruby/lib/ci/queue/redis/supervisor.rb @@ -9,7 +9,7 @@ def master? def wait_for_workers return false unless wait_for_master - sleep 0.1 until empty? + sleep 0.1 until exhausted? true end end diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 5b51eecb..9b15a25f 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -13,15 +13,24 @@ class << self class Worker < Base attr_reader :total - def initialize(tests, redis:, build_id:, worker_id:, timeout:, max_requeues: 0, requeue_tolerance: 0.0) + def initialize(redis:, build_id:, worker_id:, timeout:, max_requeues: 0, requeue_tolerance: 0.0) @reserved_test = nil @max_requeues = max_requeues - @global_max_requeues = (tests.size * requeue_tolerance).ceil + @requeue_tolerance = requeue_tolerance @shutdown_required = false super(redis: redis, build_id: build_id) @worker_id = worker_id.to_s @timeout = timeout - push(tests) + end + + def populate(tests, &indexer) + @index = Index.new(tests, &indexer) + push(tests.map { |t| index.key(t) }) + self + end + + def populated? + !!defined?(@index) end def shutdown! @@ -38,9 +47,9 @@ def master? def poll wait_for_master - until shutdown_required? || empty? + until shutdown_required? || exhausted? if test = reserve - yield test + yield index.fetch(test) else sleep 0.05 end @@ -49,8 +58,9 @@ def poll end def retry_queue(**args) + log = redis.lrange(key('worker', worker_id, 'queue'), 0, -1).reverse.uniq Retry.new( - redis.lrange(key('worker', worker_id, 'queue'), 0, -1).reverse.uniq, + log, redis: redis, build_id: build_id, worker_id: worker_id, @@ -70,30 +80,32 @@ def minitest_reporters end def acknowledge(test) - raise_on_mismatching_test(test) + test_key = index.key(test) + raise_on_mismatching_test(test_key) eval_script( :acknowledge, keys: [key('running'), key('processed')], - argv: [test], + argv: [test_key], ) == 1 end def requeue(test, offset: Redis.requeue_offset) - raise_on_mismatching_test(test) + test_key = index.key(test) + raise_on_mismatching_test(test_key) requeued = eval_script( :requeue, keys: [key('processed'), key('requeues-count'), key('queue'), key('running')], - argv: [max_requeues, global_max_requeues, test, offset], + argv: [max_requeues, global_max_requeues, test_key, offset], ) == 1 - @reserved_test = test unless requeued + @reserved_test = test_key unless requeued requeued end private - attr_reader :worker_id, :timeout, :max_requeues, :global_max_requeues + attr_reader :worker_id, :timeout, :max_requeues, :global_max_requeues, :requeue_tolerance, :index def raise_on_mismatching_test(test) if @reserved_test == test @@ -112,9 +124,6 @@ def reserve @reserved_test = (try_to_reserve_lost_test || try_to_reserve_test) end - RESERVE_TEST = %{ - } - def try_to_reserve_test eval_script( :reserve, @@ -133,6 +142,8 @@ def try_to_reserve_lost_test def push(tests) @total = tests.size + @global_max_requeues = (tests.size * requeue_tolerance).ceil + if @master = redis.setnx(key('master-status'), 'setup') redis.multi do redis.lpush(key('queue'), tests) diff --git a/ruby/lib/ci/queue/static.rb b/ruby/lib/ci/queue/static.rb index 2049cb38..070b014a 100644 --- a/ruby/lib/ci/queue/static.rb +++ b/ruby/lib/ci/queue/static.rb @@ -11,8 +11,17 @@ def initialize(tests, max_requeues: 0, requeue_tolerance: 0.0) @global_max_requeues = (tests.size * requeue_tolerance).ceil end + def populate(tests, &indexer) + @index = Index.new(tests, &indexer) + self + end + + def populated? + !!defined?(@index) + end + def to_a - @queue.dup + @queue.map { |i| index.fetch(i) } end def size @@ -21,12 +30,12 @@ def size def poll while test = @queue.shift - yield test + yield index.fetch(test) @progress += 1 end end - def empty? + def exhausted? @queue.empty? end @@ -35,18 +44,19 @@ def acknowledge(test) end def requeue(test) - return false unless should_requeue?(test) - requeues[test] += 1 - @queue.unshift(test) + key = index.key(test) + return false unless should_requeue?(key) + requeues[key] += 1 + @queue.unshift(index.key(test)) true end private - attr_reader :max_requeues, :global_max_requeues + attr_reader :max_requeues, :global_max_requeues, :index - def should_requeue?(test) - requeues[test] < max_requeues && requeues.values.inject(0, :+) < global_max_requeues + def should_requeue?(key) + requeues[key] < max_requeues && requeues.values.inject(0, :+) < global_max_requeues end def requeues diff --git a/ruby/test/ci/queue/file_test.rb b/ruby/test/ci/queue/file_test.rb index 6b4412b4..50e611b9 100644 --- a/ruby/test/ci/queue/file_test.rb +++ b/ruby/test/ci/queue/file_test.rb @@ -5,8 +5,10 @@ class CI::Queue::FileTest < Minitest::Test TEST_LIST_PATH = '/tmp/queue-test.txt'.freeze - def setup - File.write(TEST_LIST_PATH, TEST_LIST.join("\n")) - @queue = CI::Queue::File.new(TEST_LIST_PATH, max_requeues: 1, requeue_tolerance: 0.1) + private + + def build_queue + File.write(TEST_LIST_PATH, TEST_LIST.map(&:name).join("\n")) + CI::Queue::File.new(TEST_LIST_PATH, max_requeues: 1, requeue_tolerance: 0.1) end end diff --git a/ruby/test/ci/queue/redis_supervisor_test.rb b/ruby/test/ci/queue/redis_supervisor_test.rb index 2f11bdec..a0e1e2c2 100644 --- a/ruby/test/ci/queue/redis_supervisor_test.rb +++ b/ruby/test/ci/queue/redis_supervisor_test.rb @@ -47,12 +47,11 @@ def test_num_workers def worker(id) CI::Queue::Redis.new( - SharedQueueAssertions::TEST_LIST, redis: @redis, build_id: '42', worker_id: id.to_s, timeout: 0.2, - ) + ).populate(SharedQueueAssertions::TEST_LIST, &:name) end def supervisor diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index 02d10502..e16ba916 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -6,7 +6,7 @@ class CI::Queue::RedisTest < Minitest::Test def setup @redis = ::Redis.new(db: 7, host: ENV.fetch('REDIS_HOST', nil)) @redis.flushdb - @queue = worker(1, max_requeues: 1, requeue_tolerance: 0.1) + super end def test_requeue # redefine the shared one @@ -31,7 +31,11 @@ def test_requeue # redefine the shared one end def test_retry_queue - assert_equal poll(@queue), poll(@queue.retry_queue) + test_order = poll(@queue) + retry_queue = @queue.retry_queue + populate(retry_queue) + retry_test_order = poll(retry_queue) + assert_equal test_order, retry_test_order end def test_shutdown @@ -50,6 +54,20 @@ def test_master_election refute_predicate worker(1), :master? end + def test_exhausted_while_not_populated + assert_predicate @queue, :populated? + + second_worker = worker(2, populate: false) + + refute_predicate second_worker, :populated? + refute_predicate second_worker, :exhausted? + + poll(@queue) + + refute_predicate second_worker, :populated? + assert_predicate second_worker, :exhausted? + end + def test_timed_out_test_are_picked_up_by_other_workers second_queue = worker(2) acquired = false @@ -74,9 +92,9 @@ def test_timed_out_test_are_picked_up_by_other_workers end end - assert_predicate @queue, :empty? - assert_equal [TEST_LIST.first], @queue.retry_queue.to_a - assert_equal TEST_LIST.sort, second_queue.retry_queue.to_a.sort + assert_predicate @queue, :exhausted? + assert_equal [TEST_LIST.first], populate(@queue.retry_queue).to_a + assert_equal TEST_LIST.sort, populate(second_queue.retry_queue).to_a.sort end def test_test_isnt_requeued_if_it_was_picked_up_by_another_worker @@ -104,7 +122,7 @@ def test_test_isnt_requeued_if_it_was_picked_up_by_another_worker end end - assert_predicate @queue, :empty? + assert_predicate @queue, :exhausted? end def test_acknowledge_returns_false_if_the_test_was_picked_up_by_another_worker @@ -135,7 +153,7 @@ def test_acknowledge_returns_false_if_the_test_was_picked_up_by_another_worker end end - assert_predicate @queue, :empty? + assert_predicate @queue, :exhausted? end def test_workers_register @@ -150,7 +168,7 @@ def test_continuously_timing_out_tests begin threads = 2.times.map do |i| Thread.new do - queue = worker(i, tests: %w(a), build_id: '24') + queue = worker(i, tests: [TEST_LIST.first], build_id: '24') queue.poll do |test| sleep 1 # timeout queue.acknowledge(test) @@ -161,7 +179,9 @@ def test_continuously_timing_out_tests threads.each { |t| t.join(3) } threads.each { |t| refute_predicate t, :alive? } - assert_predicate @queue, :empty? + queue = worker(12, build_id: '24') + assert_predicate queue, :queue_initialized? + assert_predicate queue, :exhausted? ensure threads.each(&:kill) end @@ -170,15 +190,28 @@ def test_continuously_timing_out_tests private + def build_queue + worker(1, max_requeues: 1, requeue_tolerance: 0.1, populate: false) + end + + def populate(worker, tests: TEST_LIST.dup) + worker.populate(tests, &:name) + end + def worker(id, **args) - test_list = args.delete(:tests) || TEST_LIST.dup - CI::Queue::Redis.new( - test_list, + tests = args.delete(:tests) || TEST_LIST.dup + skip_populate = args.delete(:populate) == false + queue = CI::Queue::Redis.new( redis: @redis, build_id: '42', worker_id: id.to_s, timeout: 0.2, **args, ) + if skip_populate + return queue + else + populate(queue, tests: tests) + end end end diff --git a/ruby/test/ci/queue/static_test.rb b/ruby/test/ci/queue/static_test.rb index b48690aa..ba400c7b 100644 --- a/ruby/test/ci/queue/static_test.rb +++ b/ruby/test/ci/queue/static_test.rb @@ -3,7 +3,9 @@ class CI::Queue::StaticTest < Minitest::Test include SharedQueueAssertions - def setup - @queue = CI::Queue::Static.new(TEST_LIST.dup, max_requeues: 1, requeue_tolerance: 0.1) + private + + def build_queue + CI::Queue::Static.new(TEST_LIST.map(&:name), max_requeues: 1, requeue_tolerance: 0.1) end end diff --git a/ruby/test/fixtures/redis-runner.rb b/ruby/test/fixtures/redis-runner.rb index 97572460..e8fe41bf 100755 --- a/ruby/test/fixtures/redis-runner.rb +++ b/ruby/test/fixtures/redis-runner.rb @@ -10,7 +10,6 @@ Minitest::Reporters.use!([Minitest::Reporters::QueueReporter.new]) Minitest.queue = CI::Queue::Redis.new( - Minitest.loaded_tests, redis: ::Redis.new(host: ENV.fetch('REDIS_HOST', nil), db: 7, timeout: 1), build_id: 1, worker_id: 1, @@ -25,3 +24,5 @@ requeue_tolerance: 1.0, ) end + +Minitest.queue.populate(Minitest.loaded_tests, &:to_s) diff --git a/ruby/test/minitest/reporters/redis_reporter_test.rb b/ruby/test/minitest/reporters/redis_reporter_test.rb index aa5b4488..2c3da3ed 100644 --- a/ruby/test/minitest/reporters/redis_reporter_test.rb +++ b/ruby/test/minitest/reporters/redis_reporter_test.rb @@ -48,12 +48,11 @@ def test_retrying_test def worker(id) CI::Queue::Redis.new( - ['Foo'], redis: @redis, build_id: '42', worker_id: id.to_s, timeout: 0.2, - ) + ).populate(%w(a b c d e f g).map { |n| runnable(n) }, &:name) end def summary diff --git a/ruby/test/support/shared_queue_assertions.rb b/ruby/test/support/shared_queue_assertions.rb index 06782b69..48e00328 100644 --- a/ruby/test/support/shared_queue_assertions.rb +++ b/ruby/test/support/shared_queue_assertions.rb @@ -1,4 +1,24 @@ module SharedQueueAssertions + class TestCase + attr_reader :name + + def initialize(name) + @name = name + end + + def inspect + "#" + end + + def to_s + inspect + end + + def <=>(other) + self.name <=> other + end + end + include QueueHelper TEST_LIST = %w( @@ -6,7 +26,11 @@ module SharedQueueAssertions ATest#test_bar BTest#test_foo BTest#test_bar - ).freeze + ).map { |n| TestCase.new(n).freeze }.freeze + + def setup + @queue = populate(build_queue) + end def test_progess count = 0 @@ -26,10 +50,13 @@ def test_size assert_equal 0, @queue.size end - def test_empty? - refute_predicate @queue, :empty? - poll(@queue) - assert_predicate @queue, :empty? + def test_exhausted? + queue = build_queue + refute_predicate queue, :exhausted? + populate(queue) + refute_predicate queue, :exhausted? + poll(queue) + assert_predicate queue, :exhausted? end def test_to_a @@ -57,4 +84,10 @@ def test_acknowledge assert_equal true, @queue.acknowledge(test) end end + + private + + def populate(queue, tests: TEST_LIST.dup) + queue.populate(tests, &:name) + end end