Skip to content

Commit

Permalink
Merge pull request #379 from kafkaex/kayrock_producer_with_partitioner
Browse files Browse the repository at this point in the history
Fix default partitioner for kayrock
  • Loading branch information
dantswain authored Nov 4, 2019
2 parents 9c93293 + 7120fa2 commit c7ba6a5
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 11 deletions.
29 changes: 18 additions & 11 deletions lib/kafka_ex/new/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,10 @@ defmodule KafkaEx.New.Client do
end

def handle_call({:topic_metadata, topics, allow_topic_creation}, _from, state) do
allow_auto_topic_creation = state.allow_auto_topic_creation

updated_state =
update_metadata(
%{state | allow_auto_topic_creation: allow_topic_creation},
topics
)

topic_metadata = State.topics_metadata(updated_state, topics)
{topic_metadata, updated_state} =
fetch_topics_metadata(state, topics, allow_topic_creation)

{:reply, {:ok, topic_metadata},
%{updated_state | allow_auto_topic_creation: allow_auto_topic_creation}}
{:reply, {:ok, topic_metadata}, updated_state}
end

def handle_call({:kayrock_request, request, node_selector}, _from, state) do
Expand Down Expand Up @@ -679,4 +671,19 @@ defmodule KafkaEx.New.Client do
)
end
end

defp fetch_topics_metadata(state, topics, allow_topic_creation) do
allow_auto_topic_creation = state.allow_auto_topic_creation

updated_state =
update_metadata(
%{state | allow_auto_topic_creation: allow_topic_creation},
topics
)

topic_metadata = State.topics_metadata(updated_state, topics)

{topic_metadata,
%{updated_state | allow_auto_topic_creation: allow_auto_topic_creation}}
end
end
20 changes: 20 additions & 0 deletions lib/kafka_ex/new/client_compatibility.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ defmodule KafkaEx.New.ClientCompatibility do
end

def handle_call({:produce, produce_request}, _from, state) do
# the partitioner will need to know the topic's metadata
# note we also try to create the topic if it does not exist
state = ensure_topics_metadata(state, [produce_request.topic], true)

produce_request =
default_partitioner().assign_partition(
produce_request,
Expand Down Expand Up @@ -326,6 +330,22 @@ defmodule KafkaEx.New.ClientCompatibility do
{:reply, response, updated_state}
end

######################################################################
# helper functions only used for compatibility

defp ensure_topics_metadata(state, topics, allow_topic_creation) do
case State.topics_metadata(state, topics) do
metadata when length(metadata) == length(topics) ->
state

_ ->
{_, updated_state} =
fetch_topics_metadata(state, topics, allow_topic_creation)

updated_state
end
end

######################################################################
end
end
Expand Down
5 changes: 5 additions & 0 deletions test/integration/kayrock/compatibility_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -617,4 +617,9 @@ defmodule KafkaEx.KayrockCompatibilityTest do
KafkaEx.stream(random_string, 0, offset: 0, worker_name: client)
KafkaEx.stream(random_string, 0, offset: 0, worker_name: client)
end

test "produce with the default partitioner works", %{client: client} do
topic = TestHelper.generate_random_string()
:ok = KafkaEx.produce(topic, nil, "hello", worker_name: client)
end
end

0 comments on commit c7ba6a5

Please sign in to comment.