Skip to content

Commit

Permalink
Merge pull request #34 from Shopify/populate-refactor
Browse files Browse the repository at this point in the history
Decouple queue instantiation and population
  • Loading branch information
casperisfine authored Nov 17, 2017
2 parents e932d30 + 1ff6b45 commit b7b3d10
Show file tree
Hide file tree
Showing 13 changed files with 174 additions and 57 deletions.
1 change: 1 addition & 0 deletions ruby/lib/ci/queue.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require 'ci/queue/version'
require 'ci/queue/index'
require 'ci/queue/static'
require 'ci/queue/file'
20 changes: 20 additions & 0 deletions ruby/lib/ci/queue/index.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module CI
module Queue
class Index
def initialize(objects, &indexer)
@index = objects.map { |o| [indexer.call(o), o] }.to_h
@indexer = indexer
end

def fetch(key)
@index.fetch(key)
end

def key(value)
key = @indexer.call(value)
raise KeyError, "value not found: #{value.inspect}" unless @index.key?(key)
key
end
end
end
end
16 changes: 11 additions & 5 deletions ruby/lib/ci/queue/redis/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ def initialize(redis:, build_id:)
@build_id = build_id
end

def empty?
size == 0
def exhausted?
queue_initialized? && size == 0
end

def size
Expand All @@ -22,7 +22,7 @@ def to_a
redis.multi do
redis.lrange(key('queue'), 0, -1)
redis.zrange(key('running'), 0, -1)
end.flatten.reverse
end.flatten.reverse.map { |k| index.fetch(k) }
end

def progress
Expand All @@ -32,8 +32,7 @@ def progress
def wait_for_master(timeout: 10)
return true if master?
(timeout * 10 + 1).to_i.times do
case master_status
when 'ready', 'finished'
if queue_initialized?
return true
else
sleep 0.1
Expand All @@ -58,6 +57,13 @@ def master_status
redis.get(key('master-status'))
end

def queue_initialized?
@queue_initialized ||= begin
status = master_status
status == 'ready' || status == 'finished'
end
end

def eval_script(script, *args)
redis.evalsha(load_script(script), *args)
end
Expand Down
2 changes: 1 addition & 1 deletion ruby/lib/ci/queue/redis/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def master?
def wait_for_workers
return false unless wait_for_master

sleep 0.1 until empty?
sleep 0.1 until exhausted?
true
end
end
Expand Down
41 changes: 26 additions & 15 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,24 @@ class << self
class Worker < Base
attr_reader :total

def initialize(tests, redis:, build_id:, worker_id:, timeout:, max_requeues: 0, requeue_tolerance: 0.0)
def initialize(redis:, build_id:, worker_id:, timeout:, max_requeues: 0, requeue_tolerance: 0.0)
@reserved_test = nil
@max_requeues = max_requeues
@global_max_requeues = (tests.size * requeue_tolerance).ceil
@requeue_tolerance = requeue_tolerance
@shutdown_required = false
super(redis: redis, build_id: build_id)
@worker_id = worker_id.to_s
@timeout = timeout
push(tests)
end

def populate(tests, &indexer)
@index = Index.new(tests, &indexer)
push(tests.map { |t| index.key(t) })
self
end

def populated?
!!defined?(@index)
end

def shutdown!
Expand All @@ -38,9 +47,9 @@ def master?

def poll
wait_for_master
until shutdown_required? || empty?
until shutdown_required? || exhausted?
if test = reserve
yield test
yield index.fetch(test)
else
sleep 0.05
end
Expand All @@ -49,8 +58,9 @@ def poll
end

def retry_queue(**args)
log = redis.lrange(key('worker', worker_id, 'queue'), 0, -1).reverse.uniq
Retry.new(
redis.lrange(key('worker', worker_id, 'queue'), 0, -1).reverse.uniq,
log,
redis: redis,
build_id: build_id,
worker_id: worker_id,
Expand All @@ -70,30 +80,32 @@ def minitest_reporters
end

def acknowledge(test)
raise_on_mismatching_test(test)
test_key = index.key(test)
raise_on_mismatching_test(test_key)
eval_script(
:acknowledge,
keys: [key('running'), key('processed')],
argv: [test],
argv: [test_key],
) == 1
end

def requeue(test, offset: Redis.requeue_offset)
raise_on_mismatching_test(test)
test_key = index.key(test)
raise_on_mismatching_test(test_key)

requeued = eval_script(
:requeue,
keys: [key('processed'), key('requeues-count'), key('queue'), key('running')],
argv: [max_requeues, global_max_requeues, test, offset],
argv: [max_requeues, global_max_requeues, test_key, offset],
) == 1

@reserved_test = test unless requeued
@reserved_test = test_key unless requeued
requeued
end

private

attr_reader :worker_id, :timeout, :max_requeues, :global_max_requeues
attr_reader :worker_id, :timeout, :max_requeues, :global_max_requeues, :requeue_tolerance, :index

def raise_on_mismatching_test(test)
if @reserved_test == test
Expand All @@ -112,9 +124,6 @@ def reserve
@reserved_test = (try_to_reserve_lost_test || try_to_reserve_test)
end

RESERVE_TEST = %{
}

def try_to_reserve_test
eval_script(
:reserve,
Expand All @@ -133,6 +142,8 @@ def try_to_reserve_lost_test

def push(tests)
@total = tests.size
@global_max_requeues = (tests.size * requeue_tolerance).ceil

if @master = redis.setnx(key('master-status'), 'setup')
redis.multi do
redis.lpush(key('queue'), tests)
Expand Down
28 changes: 19 additions & 9 deletions ruby/lib/ci/queue/static.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,17 @@ def initialize(tests, max_requeues: 0, requeue_tolerance: 0.0)
@global_max_requeues = (tests.size * requeue_tolerance).ceil
end

def populate(tests, &indexer)
@index = Index.new(tests, &indexer)
self
end

def populated?
!!defined?(@index)
end

def to_a
@queue.dup
@queue.map { |i| index.fetch(i) }
end

def size
Expand All @@ -21,12 +30,12 @@ def size

def poll
while test = @queue.shift
yield test
yield index.fetch(test)
@progress += 1
end
end

def empty?
def exhausted?
@queue.empty?
end

Expand All @@ -35,18 +44,19 @@ def acknowledge(test)
end

def requeue(test)
return false unless should_requeue?(test)
requeues[test] += 1
@queue.unshift(test)
key = index.key(test)
return false unless should_requeue?(key)
requeues[key] += 1
@queue.unshift(index.key(test))
true
end

private

attr_reader :max_requeues, :global_max_requeues
attr_reader :max_requeues, :global_max_requeues, :index

def should_requeue?(test)
requeues[test] < max_requeues && requeues.values.inject(0, :+) < global_max_requeues
def should_requeue?(key)
requeues[key] < max_requeues && requeues.values.inject(0, :+) < global_max_requeues
end

def requeues
Expand Down
8 changes: 5 additions & 3 deletions ruby/test/ci/queue/file_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ class CI::Queue::FileTest < Minitest::Test

TEST_LIST_PATH = '/tmp/queue-test.txt'.freeze

def setup
File.write(TEST_LIST_PATH, TEST_LIST.join("\n"))
@queue = CI::Queue::File.new(TEST_LIST_PATH, max_requeues: 1, requeue_tolerance: 0.1)
private

def build_queue
File.write(TEST_LIST_PATH, TEST_LIST.map(&:name).join("\n"))
CI::Queue::File.new(TEST_LIST_PATH, max_requeues: 1, requeue_tolerance: 0.1)
end
end
3 changes: 1 addition & 2 deletions ruby/test/ci/queue/redis_supervisor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,11 @@ def test_num_workers

def worker(id)
CI::Queue::Redis.new(
SharedQueueAssertions::TEST_LIST,
redis: @redis,
build_id: '42',
worker_id: id.to_s,
timeout: 0.2,
)
).populate(SharedQueueAssertions::TEST_LIST, &:name)
end

def supervisor
Expand Down
Loading

0 comments on commit b7b3d10

Please sign in to comment.