From defca71d8bc578fd87e5904924fac896f30a8c07 Mon Sep 17 00:00:00 2001 From: Christian Bruckmayer Date: Thu, 12 Sep 2024 13:58:40 +0100 Subject: [PATCH 1/2] Make queue setup idempotent --- ruby/lib/ci/queue/redis/worker.rb | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 542f029d..1756e2e2 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -207,15 +207,26 @@ def push(tests) puts "Worker electected as leader, pushing #{@total} tests to the queue." puts + attempts = 0 duration = measure do - redis.multi do |transaction| - transaction.lpush(key('queue'), tests) unless tests.empty? - transaction.set(key('total'), @total) - transaction.set(key('master-status'), 'ready') - - transaction.expire(key('queue'), config.redis_ttl) - transaction.expire(key('total'), config.redis_ttl) - transaction.expire(key('master-status'), config.redis_ttl) + redis.without_reconnect do + redis.multi do |transaction| + transaction.lpush(key('queue'), tests) unless tests.empty? + transaction.set(key('total'), @total) + transaction.set(key('master-status'), 'ready') + + transaction.expire(key('queue'), config.redis_ttl) + transaction.expire(key('total'), config.redis_ttl) + transaction.expire(key('master-status'), config.redis_ttl) + end + rescue ::Redis::BaseError => error + if !queue_initialized? && attempts < 3 + puts "Retrying pushing #{@total} tests to the queue... (#{error})" + attempts += 1 + retry + end + + raise if !queue_initialized? end end From b7c10b88d28b36ebc703406595aba20faaf13589 Mon Sep 17 00:00:00 2001 From: Christian Bruckmayer Date: Thu, 12 Sep 2024 16:42:05 +0100 Subject: [PATCH 2/2] Bump Redis timeout for pushing to the queue --- ruby/lib/ci/queue/redis/base.rb | 8 ++++++++ ruby/lib/ci/queue/redis/worker.rb | 20 +++++++++++--------- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 750b729d..b351527a 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -185,6 +185,14 @@ def max_test_failed? attr_reader :redis, :redis_url + def with_redis_timeout(timeout) + prev = redis._client.timeout + redis._client.timeout = timeout + yield + ensure + redis._client.timeout = prev + end + def measure starting = Process.clock_gettime(Process::CLOCK_MONOTONIC) yield diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 1756e2e2..ff87b6e0 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -209,15 +209,17 @@ def push(tests) attempts = 0 duration = measure do - redis.without_reconnect do - redis.multi do |transaction| - transaction.lpush(key('queue'), tests) unless tests.empty? - transaction.set(key('total'), @total) - transaction.set(key('master-status'), 'ready') - - transaction.expire(key('queue'), config.redis_ttl) - transaction.expire(key('total'), config.redis_ttl) - transaction.expire(key('master-status'), config.redis_ttl) + with_redis_timeout(5) do + redis.without_reconnect do + redis.multi do |transaction| + transaction.lpush(key('queue'), tests) unless tests.empty? + transaction.set(key('total'), @total) + transaction.set(key('master-status'), 'ready') + + transaction.expire(key('queue'), config.redis_ttl) + transaction.expire(key('total'), config.redis_ttl) + transaction.expire(key('master-status'), config.redis_ttl) + end end rescue ::Redis::BaseError => error if !queue_initialized? && attempts < 3