diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index 505f6b1e..f80e096b 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -124,7 +124,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do ] ) - worker_opts = Keyword.take(opts, [:uris]) + worker_opts = Keyword.take(opts, [:uris, :use_ssl, :ssl_options]) {:ok, worker_name} = KafkaEx.create_worker( diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index 651a3593..d29febf2 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -544,7 +544,7 @@ defmodule KafkaEx.GenConsumer do {:ok, consumer_state} = consumer_module.init(topic, partition, extra_consumer_args) - worker_opts = Keyword.take(opts, [:uris]) + worker_opts = Keyword.take(opts, [:uris, :use_ssl, :ssl_options]) {:ok, worker_name} = KafkaEx.create_worker( diff --git a/test/integration/consumer_group_implementation_test.exs b/test/integration/consumer_group_implementation_test.exs index 2db377e0..6e2cb26e 100644 --- a/test/integration/consumer_group_implementation_test.exs +++ b/test/integration/consumer_group_implementation_test.exs @@ -132,7 +132,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do setup do ports_before = num_open_ports() - {:ok, _} = TestPartitioner.start_link() + {:ok, test_partitioner_pid} = TestPartitioner.start_link() {:ok, consumer_group_pid1} = ConsumerGroup.start_link( @@ -163,6 +163,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do on_exit(fn -> sync_stop(consumer_group_pid1) sync_stop(consumer_group_pid2) + sync_stop(test_partitioner_pid) end) { diff --git a/test/integration/consumer_group_test.exs b/test/integration/consumer_group_test.exs index e608857c..cde96195 100644 --- a/test/integration/consumer_group_test.exs +++ b/test/integration/consumer_group_test.exs @@ -24,6 +24,69 @@ defmodule KafkaEx.ConsumerGroup.Test do assert consumer_group == :no_consumer_group end + describe "custom ssl options" do + setup do + # reset application env after each test + env_before = Application.get_all_env(:kafka_ex) + + ssl_option_filenames = ["ca-cert", "cert.pem", "key.pem"] + {:ok, cwd} = File.cwd() + + original_filenames = + ssl_option_filenames + |> Enum.map(fn filename -> Path.join([cwd, "ssl", filename]) end) + + target_filenames = + ssl_option_filenames + |> Enum.map(fn filename -> + Path.rootname(filename) <> "-custom" <> Path.extname(filename) + end) + |> Enum.map(fn filename -> Path.join([cwd, "ssl", filename]) end) + + List.zip([original_filenames, target_filenames]) + |> Enum.map(fn {original, target} -> File.copy(original, target) end) + + on_exit(fn -> + # this is basically Application.put_all_env + for {k, v} <- env_before do + Application.put_env(:kafka_ex, k, v) + end + + target_filenames + |> Enum.map(fn filename -> File.rm(filename) end) + + :ok + end) + + :ok + end + + test "create_worker allows us to pass in use_ssl and ssl_options options" do + Application.put_env(:kafka_ex, :use_ssl, true) + ssl_options = Application.get_env(:kafka_ex, :ssl_options) + assert ssl_options == Config.ssl_options() + + ## These reference symbolic links to the original files in order to validate + ## that custom SSL filepaths can specified + custom_ssl_options = [ + cacertfile: File.cwd!() <> "/ssl/ca-cert-custom", + certfile: File.cwd!() <> "/ssl/cert-custom.pem", + keyfile: File.cwd!() <> "/ssl/key-custom.pem" + ] + + {:ok, pid} = + KafkaEx.create_worker(:real, + use_ssl: true, + ssl_options: custom_ssl_options + ) + + consumer_group = :sys.get_state(pid) + assert consumer_group.ssl_options == custom_ssl_options + refute consumer_group.ssl_options == ssl_options + assert consumer_group.use_ssl == true + end + end + test "create_worker allows us to provide a consumer group" do {:ok, pid} = KafkaEx.create_worker(:bah, consumer_group: "my_consumer_group") diff --git a/test/integration/server0_p_10_and_later_test.exs b/test/integration/server0_p_10_and_later_test.exs index 8ee52953..bc30266d 100644 --- a/test/integration/server0_p_10_and_later_test.exs +++ b/test/integration/server0_p_10_and_later_test.exs @@ -24,12 +24,14 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do Enum.member?(existing_topics(), name) end) - assert @num_partitions == - KafkaEx.Protocol.Metadata.Response.partitions_for_topic( - KafkaEx.metadata(), - name - ) - |> Enum.count() + wait_for(fn -> + @num_partitions == + KafkaEx.Protocol.Metadata.Response.partitions_for_topic( + KafkaEx.metadata(), + name + ) + |> Enum.count() + end) end @tag :delete_topic