Add support for cooperative-sticky assignment strategy #339

Merged 4 commits on Oct 9, 2023
4 changes: 2 additions & 2 deletions Gemfile.lock
@@ -3,7 +3,7 @@ PATH
specs:
racecar (2.9.0.beta1)
king_konf (~> 1.0.0)
rdkafka (~> 0.12.0)
rdkafka (~> 0.13.0)

GEM
remote: https://rubygems.org/
@@ -33,7 +33,7 @@ GEM
byebug (~> 11.0)
pry (>= 0.13, < 0.15)
rake (13.0.6)
rdkafka (0.12.0)
rdkafka (0.13.0)
ffi (~> 1.15)
mini_portile2 (~> 2.6)
rake (> 12)
36 changes: 31 additions & 5 deletions README.md
@@ -476,15 +476,17 @@ With Foreman, you can easily run these processes locally by executing `foreman r

If you run your applications in Kubernetes, use the following [Deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) spec as a starting point:

##### Recreate Strategy

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-racecar-deployment
  labels:
    app: my-racecar
spec:
  replicas: 3 # <-- this will give us three consumers in the group.
  replicas: 4 # <-- this is a good value if you have a multiple of 4 partitions
  selector:
    matchLabels:
      app: my-racecar
@@ -506,9 +508,33 @@
value: 5
```

The important part is the `strategy.type` value, which tells Kubernetes how to upgrade from one version of your Deployment to another. Many services use so-called _rolling updates_, where some but not all containers are replaced with the new version. This is done so that, if the new version doesn't work, the old version is still there to serve most of the requests. For Kafka consumers, this doesn't work well. The reason is that every time a consumer joins or leaves a group, every other consumer in the group needs to stop and synchronize the list of partitions assigned to each group member. So if the group is updated in a rolling fashion, this synchronization would occur over and over again, causing undesirable double-processing of messages as consumers would start only to be synchronized shortly after.
This configuration uses the `Recreate` strategy, which completely terminates all consumers before starting new ones.
It's simple and easy to understand, but it can result in significant 'downtime' during which no messages are processed.

##### Rolling Updates and 'cooperative-sticky' Assignment

A newer alternative is the consumer's "cooperative-sticky" assignment strategy, which allows healthy consumers to keep processing their partitions while others are terminated.
This can be combined with a restricted rolling update to minimize processing downtime.

Add to your Racecar config:
```ruby
Racecar.configure do |c|
  c.partition_assignment_strategy = "cooperative-sticky"
end
```

Replace the Kubernetes deployment strategy with:
```yaml
strategy:
  type: RollingUpdate
  rollingUpdate:
    maxSurge: 0 # <- Never boot an excess consumer
    maxUnavailable: 1 # <- The deploy 'rolls' one consumer at a time
```

These two configurations should be deployed together.

Instead, the `Recreate` update strategy should be used. It completely tears down the existing containers before starting all of the new containers simultaneously, allowing for a single synchronization stage and a much faster, more stable deployment update.
While `maxSurge` should always be 0, `maxUnavailable` can be increased to reduce deployment times in exchange for longer pauses in message processing.

#### Liveness Probe

@@ -663,7 +689,7 @@ In order to introspect the configuration of a consumer process, send it the `SIG

### Upgrading from v1 to v2

In order to safely upgrade from Racecar v1 to v2, you need to completely shut down your consumer group before starting it up again with the v2 Racecar dependency. In general, you should avoid rolling deploys for consumers groups, so it is likely the case that this will just work for you, but it's a good idea to check first.
In order to safely upgrade from Racecar v1 to v2, you need to completely shut down your consumer group before starting it up again with the v2 Racecar dependency.

### Compression

7 changes: 3 additions & 4 deletions lib/racecar/config.rb
@@ -29,6 +29,9 @@ class Config < KingKonf::Config
desc "The minimum number of messages in the local consumer queue"
integer :min_message_queue_size, default: 2000

desc "Which partition assignment strategy to use, range, roundrobin or cooperative-sticky. -- https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md"
string :partition_assignment_strategy, default: "range,roundrobin"

desc "Kafka consumer configuration options, separated with '=' -- https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md"
list :consumer, default: []

@@ -296,10 +299,6 @@ def liveness_probe
)
end

def rebalance_listener
RebalanceListener.new(self)
end

private

def rdkafka_security_config
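A minimal sketch of setting the new option; `Racecar.configure` and `partition_assignment_strategy` come from this diff, while the `RACECAR_PARTITION_ASSIGNMENT_STRATEGY` environment variable is an assumption based on how Racecar's other KingKonf-backed options are exposed:

```ruby
# Sketch: select the cooperative assignor for every consumer in this app.
# Assumption: because the option is declared via KingKonf, it should also be
# settable from the environment, e.g.
#   RACECAR_PARTITION_ASSIGNMENT_STRATEGY=cooperative-sticky
Racecar.configure do |config|
  # librdkafka accepts a comma-separated list; the group leader picks a
  # strategy that every member supports.
  config.partition_assignment_strategy = "cooperative-sticky"
end
```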
4 changes: 4 additions & 0 deletions lib/racecar/consumer.rb
@@ -37,6 +37,10 @@ def subscribes_to(
subscriptions << Subscription.new(topic, start_from_beginning, max_bytes_per_partition, additional_config)
end
end

# Rebalance hooks for subclasses to override
def on_partitions_assigned(rebalance_event); end
def on_partitions_revoked(rebalance_event); end
end

def configure(producer:, consumer:, instrumenter: NullInstrumenter, config: Racecar.config)
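A minimal sketch of a consumer overriding these hooks; the class name and topic are illustrative, and the `event` accessors (`topic_name`, `partition_numbers`) are provided by the `RebalanceListener::Event` added in this PR:

```ruby
class OrdersConsumer < Racecar::Consumer
  subscribes_to "orders" # hypothetical topic

  # Class-level hooks, invoked by RebalanceListener on every rebalance.
  def self.on_partitions_assigned(event)
    Racecar.logger.info("Assigned #{event.topic_name} partitions #{event.partition_numbers}")
  end

  def self.on_partitions_revoked(event)
    Racecar.logger.info("Revoked #{event.topic_name} partitions #{event.partition_numbers}")
  end

  def process(message)
    # ... handle the message ...
  end
end
```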
9 changes: 6 additions & 3 deletions lib/racecar/consumer_set.rb
@@ -70,9 +70,11 @@ def close
def current
@consumers[@consumer_id_iterator.peek] ||= begin
consumer_config = Rdkafka::Config.new(rdkafka_config(current_subscription))
consumer_config.consumer_rebalance_listener = @config.rebalance_listener

listener = RebalanceListener.new(@config.consumer_class, @instrumenter)
consumer_config.consumer_rebalance_listener = listener
consumer = consumer_config.consumer
listener.rdkafka_consumer = consumer

@instrumenter.instrument('join_group') do
consumer.subscribe current_subscription.topic
end
@@ -233,7 +235,8 @@ def rdkafka_config(subscription)
"queued.min.messages" => @config.min_message_queue_size,
"session.timeout.ms" => @config.session_timeout * 1000,
"socket.timeout.ms" => @config.socket_timeout * 1000,
"statistics.interval.ms" => @config.statistics_interval_ms
"statistics.interval.ms" => @config.statistics_interval_ms,
"partition.assignment.strategy" => @config.partition_assignment_strategy,
}
config.merge! @config.rdkafka_consumer
config.merge! subscription.additional_config
60 changes: 48 additions & 12 deletions lib/racecar/rebalance_listener.rb
@@ -1,22 +1,58 @@
module Racecar
class RebalanceListener
def initialize(config)
@config = config
@consumer_class = config.consumer_class
def initialize(consumer_class, instrumenter)
@consumer_class = consumer_class
@instrumenter = instrumenter
@rdkafka_consumer = nil
end

attr_reader :config, :consumer_class
attr_writer :rdkafka_consumer

def on_partitions_assigned(_consumer, topic_partition_list)
consumer_class.respond_to?(:on_partitions_assigned) &&
consumer_class.on_partitions_assigned(topic_partition_list.to_h)
rescue
attr_reader :consumer_class, :instrumenter, :rdkafka_consumer
private :consumer_class, :instrumenter, :rdkafka_consumer

def on_partitions_assigned(rdkafka_topic_partition_list)
event = Event.new(rdkafka_consumer: rdkafka_consumer, rdkafka_topic_partition_list: rdkafka_topic_partition_list)

instrument("partitions_assigned", partitions: event.partition_numbers) do
consumer_class.on_partitions_assigned(event)
end
end

def on_partitions_revoked(rdkafka_topic_partition_list)
event = Event.new(rdkafka_consumer: rdkafka_consumer, rdkafka_topic_partition_list: rdkafka_topic_partition_list)

instrument("partitions_revoked", partitions: event.partition_numbers) do
consumer_class.on_partitions_revoked(event)
end
end

private

def instrument(event, payload, &block)
instrumenter.instrument(event, payload, &block)
end

def on_partitions_revoked(_consumer, topic_partition_list)
consumer_class.respond_to?(:on_partitions_revoked) &&
consumer_class.on_partitions_revoked(topic_partition_list.to_h)
rescue
class Event
def initialize(rdkafka_topic_partition_list:, rdkafka_consumer:)
@__rdkafka_topic_partition_list = rdkafka_topic_partition_list
@__rdkafka_consumer = rdkafka_consumer
end

def topic_name
__rdkafka_topic_partition_list.to_h.keys.first
end

def partition_numbers
__rdkafka_topic_partition_list.to_h.values.flatten.map(&:partition)
end

def empty?
__rdkafka_topic_partition_list.empty?
end

# API private and not guaranteed stable
attr_reader :__rdkafka_topic_partition_list, :__rdkafka_consumer
end
end
end
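Because both callbacks are now instrumented with a `partitions` payload, rebalances can also be observed without touching consumer code. A sketch, assuming Racecar's instrumenter publishes through ActiveSupport::Notifications under its usual "racecar" namespace:

```ruby
require "active_support/notifications"

# Log every handoff; payload[:partitions] is the array of partition numbers
# passed by RebalanceListener above.
%w[partitions_assigned.racecar partitions_revoked.racecar].each do |event_name|
  ActiveSupport::Notifications.subscribe(event_name) do |name, _start, _finish, _id, payload|
    puts "#{name}: partitions #{payload[:partitions].inspect}"
  end
end
```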
2 changes: 1 addition & 1 deletion racecar.gemspec
@@ -23,7 +23,7 @@ Gem::Specification.new do |spec|
spec.required_ruby_version = '>= 2.6'

spec.add_runtime_dependency "king_konf", "~> 1.0.0"
spec.add_runtime_dependency "rdkafka", "~> 0.12.0"
spec.add_runtime_dependency "rdkafka", "~> 0.13.0"

spec.add_development_dependency "bundler", [">= 1.13", "< 3"]
spec.add_development_dependency "pry-byebug"
38 changes: 4 additions & 34 deletions spec/integration/consumer_spec.rb
@@ -69,6 +69,8 @@ class NoProcessConsumer < Racecar::Consumer
consumer_class.subscribes_to(input_topic)
consumer_class.output_topic = output_topic
consumer_class.parallel_workers = parallelism
consumer_class.group_id = group_id
consumer_class.pipe_to_test = consumer_message_pipe
end

context "for a single threaded consumer" do
@@ -109,7 +111,7 @@ class NoProcessConsumer < Racecar::Consumer
publish_messages
wait_for_messages

message_count_by_worker = incoming_messages.group_by { |m| m.headers.fetch(:processed_by_pid) }.transform_values(&:count)
message_count_by_worker = incoming_messages.group_by { |m| m.headers.fetch("processed_by") }.transform_values(&:count)

expect(incoming_messages.map(&:topic).uniq).to eq([output_topic])
expect(incoming_messages.map(&:payload))
@@ -129,7 +131,7 @@
publish_messages
wait_for_messages

message_count_by_worker = incoming_messages.group_by { |m| m.headers.fetch(:processed_by_pid) }.transform_values(&:count)
message_count_by_worker = incoming_messages.group_by { |m| m.headers.fetch("processed_by") }.transform_values(&:count)

expect(incoming_messages.count).to eq(6)
expect(incoming_messages.map(&:topic).uniq).to eq([output_topic])
@@ -144,36 +146,4 @@
after do
Object.send(:remove_const, :IntegrationTestConsumer) if defined?(IntegrationTestConsumer)
end

def echo_consumer_class
test_instance = self

Class.new(Racecar::Consumer) do
class << self
attr_accessor :output_topic
attr_accessor :pipe_to_test
end
self.group_id = test_instance.group_id
self.pipe_to_test = test_instance.consumer_message_pipe.write_end

def self.on_partitions_assigned(topic_partition_list)
Racecar.logger.info("on_partitions_assigned #{topic_partition_list.to_h}")

pipe_to_test.puts(JSON.dump({group_id: self.group_id, pid: Process.pid}))
end

def process(message)
produce(message.value, key: message.key, topic: self.class.output_topic, headers: headers)
deliver!
end

private

def headers
{
processed_by_pid: Process.pid,
}
end
end
end
end