Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Jun 8, 2023
1 parent 3d8d9f9 commit 8a31959
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 11 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ librdkafka v2.2.0 is a feature release:
* [KIP-881](https://cwiki.apache.org/confluence/display/KAFKA/KIP-881%3A+Rack-aware+Partition+Assignment+for+Kafka+Consumers):
Add support for rack-aware partition assignment for consumers
(#4184, #4291, #4252).
* Fix several bugs with sticky assignor in case of partition ownership
changing between members of the consumer group (#4252).


## Fixes
Expand All @@ -23,6 +25,19 @@ librdkafka v2.2.0 is a feature release:
Fixed by skipping the correct amount of bytes when tags are received.


### Consumer fixes

* In case of multiple owners of a partition with different generations, the
sticky assignor would pick the earliest (lowest generation) member as the
current owner, which would lead to stickiness violations. Fixed by
choosing the latest (highest generation) member.
* In case where the same partition is owned by two members with the same
generation, it indicates an issue. The sticky assignor had some code to
handle this, but it was non-functional, and did not have parity with the
Java assignor. Fixed by invalidating any such partition from the current
assignment completely.


# librdkafka v2.1.1

librdkafka v2.1.1 is a maintenance release:
Expand Down
21 changes: 10 additions & 11 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -766,14 +766,13 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_metadata_cache_topic_update(
rk, &md->topics[i], &mdi->topics[i],
rd_false /*propagate later*/,
has_client_rack, /* use has_client_rack
rather than
compute_racks: it's
possible that we only
need the rack in the
outptr mdip, rather than
in the cache. */
mdi->brokers, md->broker_cnt);
/* use has_client_rack rather than
compute_racks. We need cached rack ids
only in case we need to rejoin the group
if they change and client.rack is set
(KIP-881). */
has_client_rack, mdi->brokers,
md->broker_cnt);
cache_changes++;
rd_kafka_wrunlock(rk);
}
Expand Down Expand Up @@ -888,9 +887,9 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_metadata_destroy(
&rkb->rkb_rk->rk_full_metadata->metadata);

/* use has_client_rack rather than compute_racks: it's possible
* that we only need the rack in the outptr mdip, rather than in
* the cache. */
/* use has_client_rack rather than compute_racks. We need cached
* rack ids only in case we need to rejoin the group if they
* change and client.rack is set (KIP-881). */
if (has_client_rack)
rkb->rkb_rk->rk_full_metadata =
rd_kafka_metadata_copy_add_racks(mdi, tbuf.of);
Expand Down

0 comments on commit 8a31959

Please sign in to comment.