Skip to content

Commit

Permalink
Bump rdkafka gem version and support cooperative-sticky
Browse files Browse the repository at this point in the history
Features:
- Support for cooperative-sticky assignment strategy
- Consumer class partition assign and revoke callbacks

Tests:
- Refactor to share a common consumer
- Consistent and performant communication mechanism with test consumers (using pipes)
  • Loading branch information
bestie committed Aug 17, 2023
1 parent 04c76cf commit 4255529
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 76 deletions.
4 changes: 2 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ PATH
specs:
racecar (2.9.0.beta1)
king_konf (~> 1.0.0)
rdkafka (~> 0.12.0)
rdkafka (~> 0.13.0)

GEM
remote: https://rubygems.org/
Expand Down Expand Up @@ -33,7 +33,7 @@ GEM
byebug (~> 11.0)
pry (>= 0.13, < 0.15)
rake (13.0.6)
rdkafka (0.12.0)
rdkafka (0.13.0)
ffi (~> 1.15)
mini_portile2 (~> 2.6)
rake (> 12)
Expand Down
7 changes: 3 additions & 4 deletions lib/racecar/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class Config < KingKonf::Config
desc "The minimum number of messages in the local consumer queue"
integer :min_message_queue_size, default: 2000

desc "Which partition assignment strategy to use, range, roundrobin or cooperative-sticky. -- https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md"
string :partition_assignment_strategy, default: "range,roundrobin"

desc "Kafka consumer configuration options, separated with '=' -- https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md"
list :consumer, default: []

Expand Down Expand Up @@ -296,10 +299,6 @@ def liveness_probe
)
end

def rebalance_listener
RebalanceListener.new(self)
end

private

def rdkafka_security_config
Expand Down
4 changes: 4 additions & 0 deletions lib/racecar/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ def subscribes_to(
subscriptions << Subscription.new(topic, start_from_beginning, max_bytes_per_partition, additional_config)
end
end

# Rebalance hooks for subclasses to override
def on_partitions_assigned(_partitions_by_topic); end
def on_partitions_revoked(_partitions_by_topic); end
end

def configure(producer:, consumer:, instrumenter: NullInstrumenter, config: Racecar.config)
Expand Down
9 changes: 6 additions & 3 deletions lib/racecar/consumer_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ def close
def current
@consumers[@consumer_id_iterator.peek] ||= begin
consumer_config = Rdkafka::Config.new(rdkafka_config(current_subscription))
consumer_config.consumer_rebalance_listener = @config.rebalance_listener

listener = RebalanceListener.new(@config.consumer_class, @instrumenter)
consumer_config.consumer_rebalance_listener = listener
consumer = consumer_config.consumer
listener.rdkafka_consumer = consumer

@instrumenter.instrument('join_group') do
consumer.subscribe current_subscription.topic
end
Expand Down Expand Up @@ -233,7 +235,8 @@ def rdkafka_config(subscription)
"queued.min.messages" => @config.min_message_queue_size,
"session.timeout.ms" => @config.session_timeout * 1000,
"socket.timeout.ms" => @config.socket_timeout * 1000,
"statistics.interval.ms" => @config.statistics_interval_ms
"statistics.interval.ms" => @config.statistics_interval_ms,
"partition.assignment.strategy" => @config.partition_assignment_strategy,
}
config.merge! @config.rdkafka_consumer
config.merge! subscription.additional_config
Expand Down
38 changes: 26 additions & 12 deletions lib/racecar/rebalance_listener.rb
Original file line number Diff line number Diff line change
@@ -1,22 +1,36 @@
module Racecar
class RebalanceListener
def initialize(config)
@config = config
@consumer_class = config.consumer_class
def initialize(consumer_class, instrumenter)
@consumer_class = consumer_class
@instrumenter = instrumenter
@rdkafka_consumer = nil
end

attr_reader :config, :consumer_class
attr_writer :rdkafka_consumer

def on_partitions_assigned(_consumer, topic_partition_list)
consumer_class.respond_to?(:on_partitions_assigned) &&
consumer_class.on_partitions_assigned(topic_partition_list.to_h)
rescue
attr_reader :consumer_class, :instrumenter, :rdkafka_consumer
private :consumer_class, :instrumenter, :rdkafka_consumer

def on_partitions_assigned(rdkafka_topic_partition_list)
partitions_by_topic = rdkafka_topic_partition_list.to_h

instrument("partitions_assigned", partitions: partitions_by_topic ) do
consumer_class.on_partitions_assigned(partitions_by_topic, rdkafka_consumer)
end
end

def on_partitions_revoked(rdkafka_topic_partition_list)
partitions_by_topic = rdkafka_topic_partition_list.to_h

instrument("partitions_revoked", partitions: partitions_by_topic ) do
consumer_class.on_partitions_revoked(partitions_by_topic, rdkafka_consumer)
end
end

def on_partitions_revoked(_consumer, topic_partition_list)
consumer_class.respond_to?(:on_partitions_revoked) &&
consumer_class.on_partitions_revoked(topic_partition_list.to_h)
rescue
private

def instrument(event, payload, &block)
instrumenter.instrument(event, payload, &block)
end
end
end
2 changes: 1 addition & 1 deletion racecar.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Gem::Specification.new do |spec|
spec.required_ruby_version = '>= 2.6'

spec.add_runtime_dependency "king_konf", "~> 1.0.0"
spec.add_runtime_dependency "rdkafka", "~> 0.12.0"
spec.add_runtime_dependency "rdkafka", "~> 0.13.0"

spec.add_development_dependency "bundler", [">= 1.13", "< 3"]
spec.add_development_dependency "pry-byebug"
Expand Down
38 changes: 4 additions & 34 deletions spec/integration/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class NoProcessConsumer < Racecar::Consumer
consumer_class.subscribes_to(input_topic)
consumer_class.output_topic = output_topic
consumer_class.parallel_workers = parallelism
consumer_class.group_id = group_id
consumer_class.pipe_to_test = consumer_message_pipe
end

context "for a single threaded consumer" do
Expand Down Expand Up @@ -109,7 +111,7 @@ class NoProcessConsumer < Racecar::Consumer
publish_messages
wait_for_messages

message_count_by_worker = incoming_messages.group_by { |m| m.headers.fetch(:processed_by_pid) }.transform_values(&:count)
message_count_by_worker = incoming_messages.group_by { |m| m.headers.fetch("processed_by") }.transform_values(&:count)

expect(incoming_messages.map(&:topic).uniq).to eq([output_topic])
expect(incoming_messages.map(&:payload))
Expand All @@ -129,7 +131,7 @@ class NoProcessConsumer < Racecar::Consumer
publish_messages
wait_for_messages

message_count_by_worker = incoming_messages.group_by { |m| m.headers.fetch(:processed_by_pid) }.transform_values(&:count)
message_count_by_worker = incoming_messages.group_by { |m| m.headers.fetch("processed_by") }.transform_values(&:count)

expect(incoming_messages.count).to eq(6)
expect(incoming_messages.map(&:topic).uniq).to eq([output_topic])
Expand All @@ -144,36 +146,4 @@ class NoProcessConsumer < Racecar::Consumer
after do
Object.send(:remove_const, :IntegrationTestConsumer) if defined?(IntegrationTestConsumer)
end

def echo_consumer_class
test_instance = self

Class.new(Racecar::Consumer) do
class << self
attr_accessor :output_topic
attr_accessor :pipe_to_test
end
self.group_id = test_instance.group_id
self.pipe_to_test = test_instance.consumer_message_pipe.write_end

def self.on_partitions_assigned(topic_partition_list)
Racecar.logger.info("on_partitions_assigned #{topic_partition_list.to_h}")

pipe_to_test.puts(JSON.dump({group_id: self.group_id, pid: Process.pid}))
end

def process(message)
produce(message.value, key: message.key, topic: self.class.output_topic, headers: headers)
deliver!
end

private

def headers
{
processed_by_pid: Process.pid,
}
end
end
end
end
119 changes: 119 additions & 0 deletions spec/integration/cooperative_sticky_assignment_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# frozen_string_literal: true

require "racecar/cli"

RSpec.describe "cooperative-sticky assignment", type: :integration do
before do
create_topic(topic: input_topic, partitions: topic_partitions)
create_topic(topic: output_topic, partitions: topic_partitions)

set_config

consumer_class.group_id = group_id
consumer_class.output_topic = output_topic
consumer_class.pipe_to_test = consumer_message_pipe
consumer_class.subscribes_to(input_topic)
end

let(:input_topic) { generate_input_topic_name }
let(:output_topic) { generate_output_topic_name }
let(:group_id) { generate_group_id }
let(:topic_partitions) { 2 }
let(:consumer_class) { CoopStickyConsumer ||= echo_consumer_class }
let(:input_messages) do
message_count.times.map { |n|
{ payload: "message-#{n}", partition: n % topic_partitions }
}
end
let(:message_count) { 20 }

context "during a rebalance" do
let!(:consumers) { [] }
let(:consumer_index_by_id) { {} }

after { terminate_all_consumers }

it "allows healthy consumers to keep processing their paritions" do
start_consumer
start_consumer

wait_for_assignments(2)
publish_messages
wait_for_a_few_messages

terminate_consumer1

wait_for_all_messages

aggregate_failures do
expect_consumer0_did_not_have_partitions_revoked_but_consumer1_did
expect_consumer0_took_over_processing_from_consumer1
end
end

def expect_consumer0_took_over_processing_from_consumer1
consumer0_partitions = messages_by_consumer[0].map(&:partition).uniq
consumer1_partitions = messages_by_consumer[1].map(&:partition).uniq

raise "consumer1 got assigned more than 1 parition" if consumer1_partitions.count > 1
consumer1_partition = consumer1_partitions.fetch(0)

expect(consumer0_partitions).to include(consumer1_partition)
end

def expect_consumer0_did_not_have_partitions_revoked_but_consumer1_did
revocations_by_consumer_thread_id = revocation_events.group_by { |e| e.fetch("consumer_id") }

revocations_by_consumer_index = revocations_by_consumer_thread_id
.transform_keys { |consumer_id| consumer_index_by_id.fetch(consumer_id) }

expect(revocations_by_consumer_index.keys).to eq([1])
end

def messages_by_consumer
incoming_messages.group_by { |m| m.headers["processed_by"] }
.transform_keys { |consumer_id| consumer_index_by_id.fetch(consumer_id) }
end

def start_consumer
runner = Racecar.runner(consumer_class.new)

thread = Thread.new do
Thread.current.name = "Racecar runner #{consumers.size}"
runner.run
end

consumers << runner
consumer_index_by_id["#{Process.pid}-#{thread.object_id}"] = consumers.index(runner)
end

def terminate_consumer1
consumers[1].stop
end

def terminate_all_consumers
consumers.each(&:stop)
end

def wait_for_a_few_messages
wait_for_messages(expected_message_count: 5)
end

def wait_for_all_messages
wait_for_messages(expected_message_count: message_count)
end
end

def set_config
Racecar.config.fetch_messages = 1
Racecar.config.max_wait_time = 0.1
Racecar.config.session_timeout = 6 # minimum allowed by default broker config
Racecar.config.heartbeat_interval = 1.5
Racecar.config.partition_assignment_strategy = "cooperative-sticky"
Racecar.config.load_consumer_class(consumer_class)
end

after do |test|
Object.send(:remove_const, :CoopStickyConsumer) if defined?(CoopStickyConsumer)
end
end
1 change: 1 addition & 0 deletions spec/integration/kubernetes_probes_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def sleep_with_timeout(max_sleep = 8)

def set_config
Racecar.config = Racecar::Config.new
Racecar.config.load_consumer_class(consumer_class)
Racecar.config.max_wait_time = 0.05
Racecar.config.liveness_probe_enabled = true
Racecar.config.liveness_probe_file_path = file_path
Expand Down
8 changes: 4 additions & 4 deletions spec/message_delivery_error_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
RSpec.describe Racecar::MessageDeliveryError do
let(:rdkafka_msg_timed_out) { Rdkafka::RdkafkaError.new(-192) }
let(:rdkafka_unknown_topic_or_part) { Rdkafka::RdkafkaError.new(3) }

let(:rdkafka_delivery_handle) do
Rdkafka::Producer::DeliveryHandle.new.tap do |dh|
dh[:partition] = 37
dh[:offset] = 42
end
instance_double(Rdkafka::Producer::DeliveryHandle).tap { |mock|
allow(mock).to receive_message_chain(:create_result, :partition).and_return(37)
}
end

it "passes through error code" do
Expand Down
Loading

0 comments on commit 4255529

Please sign in to comment.