Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ssl_options to consumers #413

Merged
merged 8 commits into from
Oct 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/kafka_ex/consumer_group/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do
]
)

worker_opts = Keyword.take(opts, [:uris])
worker_opts = Keyword.take(opts, [:uris, :use_ssl, :ssl_options])

{:ok, worker_name} =
KafkaEx.create_worker(
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ defmodule KafkaEx.GenConsumer do
{:ok, consumer_state} =
consumer_module.init(topic, partition, extra_consumer_args)

worker_opts = Keyword.take(opts, [:uris])
worker_opts = Keyword.take(opts, [:uris, :use_ssl, :ssl_options])

{:ok, worker_name} =
KafkaEx.create_worker(
Expand Down
3 changes: 2 additions & 1 deletion test/integration/consumer_group_implementation_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do

setup do
ports_before = num_open_ports()
{:ok, _} = TestPartitioner.start_link()
{:ok, test_partitioner_pid} = TestPartitioner.start_link()

{:ok, consumer_group_pid1} =
ConsumerGroup.start_link(
Expand Down Expand Up @@ -163,6 +163,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do
on_exit(fn ->
sync_stop(consumer_group_pid1)
sync_stop(consumer_group_pid2)
sync_stop(test_partitioner_pid)
end)

{
Expand Down
63 changes: 63 additions & 0 deletions test/integration/consumer_group_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,69 @@ defmodule KafkaEx.ConsumerGroup.Test do
assert consumer_group == :no_consumer_group
end

describe "custom ssl options" do
setup do
# reset application env after each test
env_before = Application.get_all_env(:kafka_ex)

ssl_option_filenames = ["ca-cert", "cert.pem", "key.pem"]
{:ok, cwd} = File.cwd()

original_filenames =
ssl_option_filenames
|> Enum.map(fn filename -> Path.join([cwd, "ssl", filename]) end)

target_filenames =
ssl_option_filenames
|> Enum.map(fn filename ->
Path.rootname(filename) <> "-custom" <> Path.extname(filename)
end)
|> Enum.map(fn filename -> Path.join([cwd, "ssl", filename]) end)

List.zip([original_filenames, target_filenames])
|> Enum.map(fn {original, target} -> File.copy(original, target) end)

on_exit(fn ->
# this is basically Application.put_all_env
for {k, v} <- env_before do
Application.put_env(:kafka_ex, k, v)
end

target_filenames
|> Enum.map(fn filename -> File.rm(filename) end)

:ok
end)

:ok
end

test "create_worker allows us to pass in use_ssl and ssl_options options" do
Application.put_env(:kafka_ex, :use_ssl, true)
ssl_options = Application.get_env(:kafka_ex, :ssl_options)
assert ssl_options == Config.ssl_options()

## These reference symbolic links to the original files in order to validate
## that custom SSL filepaths can specified
custom_ssl_options = [
cacertfile: File.cwd!() <> "/ssl/ca-cert-custom",
certfile: File.cwd!() <> "/ssl/cert-custom.pem",
keyfile: File.cwd!() <> "/ssl/key-custom.pem"
]

{:ok, pid} =
KafkaEx.create_worker(:real,
use_ssl: true,
ssl_options: custom_ssl_options
)

consumer_group = :sys.get_state(pid)
assert consumer_group.ssl_options == custom_ssl_options
refute consumer_group.ssl_options == ssl_options
assert consumer_group.use_ssl == true
end
end

test "create_worker allows us to provide a consumer group" do
{:ok, pid} =
KafkaEx.create_worker(:bah, consumer_group: "my_consumer_group")
Expand Down
14 changes: 8 additions & 6 deletions test/integration/server0_p_10_and_later_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do
Enum.member?(existing_topics(), name)
end)

assert @num_partitions ==
KafkaEx.Protocol.Metadata.Response.partitions_for_topic(
KafkaEx.metadata(),
name
)
|> Enum.count()
wait_for(fn ->
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will explicitly raise an error if it the comparison does not return true which fails the test

  1) test can create a topic (KafkaEx.Server0P10P1AndLater.Test)
     test/integration/server0_p_10_and_later_test.exs:9
     ** (RuntimeError) too many tries waiting for condition
     code: wait_for(fn ->
     stacktrace:
       test/test_helper.exs:165: TestHelper.wait_for_value/5
       test/integration/server0_p_10_and_later_test.exs:27: (test)

@num_partitions ==
KafkaEx.Protocol.Metadata.Response.partitions_for_topic(
KafkaEx.metadata(),
name
)
|> Enum.count()
end)
end

@tag :delete_topic
Expand Down