Skip to content

Commit

Permalink
[KIP-951] Leader discovery optimizations for the client (#4756)
Browse files Browse the repository at this point in the history
contains the code changes to trigger a metadata update when the corresponding tags are received by the broker for partitions that have changed leadership, during Fetch or Produce requests.
  • Loading branch information
anchitj authored Jun 17, 2024
1 parent c526073 commit f47815b
Show file tree
Hide file tree
Showing 10 changed files with 762 additions and 30 deletions.
4 changes: 4 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -4078,6 +4078,10 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
rd_kafka_purge(rk, rko->rko_u.purge.flags);
break;

case RD_KAFKA_OP_METADATA_UPDATE:
res = rd_kafka_metadata_update_op(rk, rko->rko_u.metadata.mdi);
break;

default:
/* If op has a callback set (e.g., OAUTHBEARER_REFRESH),
* call it. */
Expand Down
217 changes: 203 additions & 14 deletions src/rdkafka_fetcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "rdkafka_offset.h"
#include "rdkafka_msgset.h"
#include "rdkafka_fetcher.h"
#include "rdkafka_request.h"


/**
Expand Down Expand Up @@ -341,20 +342,173 @@ static void rd_kafka_fetch_reply_handle_partition_error(
rd_kafka_toppar_fetch_backoff(rkb, rktp, err);
}

static void rd_kafkap_Fetch_reply_tags_set_topic_cnt(
rd_kafkap_Fetch_reply_tags_t *reply_tags,
int32_t TopicCnt) {
reply_tags->TopicCnt = TopicCnt;
rd_dassert(!reply_tags->Topics);
reply_tags->Topics = rd_calloc(TopicCnt, sizeof(*reply_tags->Topics));
}

static void
rd_kafkap_Fetch_reply_tags_set_topic(rd_kafkap_Fetch_reply_tags_t *reply_tags,
int TopicIdx,
rd_kafka_Uuid_t TopicId,
int32_t PartitionCnt) {
reply_tags->Topics[TopicIdx].TopicId = TopicId;
reply_tags->Topics[TopicIdx].PartitionCnt = PartitionCnt;
rd_dassert(!reply_tags->Topics[TopicIdx].Partitions);
reply_tags->Topics[TopicIdx].Partitions = rd_calloc(
PartitionCnt, sizeof(*reply_tags->Topics[TopicIdx].Partitions));
}


static void
rd_kafkap_Fetch_reply_tags_destroy(rd_kafkap_Fetch_reply_tags_t *reply_tags) {
int i;
for (i = 0; i < reply_tags->TopicCnt; i++) {
RD_IF_FREE(reply_tags->Topics[i].Partitions, rd_free);
}
RD_IF_FREE(reply_tags->Topics, rd_free);
RD_IF_FREE(reply_tags->NodeEndpoints.NodeEndpoints, rd_free);
}

static int rd_kafkap_Fetch_reply_tags_partition_parse(
rd_kafka_buf_t *rkbuf,
uint64_t tagtype,
uint64_t taglen,
rd_kafkap_Fetch_reply_tags_Topic_t *TopicTags,
rd_kafkap_Fetch_reply_tags_Partition_t *PartitionTags) {
switch (tagtype) {
case 1: /* CurrentLeader */
if (rd_kafka_buf_read_CurrentLeader(
rkbuf, &PartitionTags->CurrentLeader) == -1)
goto err_parse;
TopicTags->partitions_with_leader_change_cnt++;
return 1;
default:
return 0;
}
err_parse:
return -1;
}

static int
rd_kafkap_Fetch_reply_tags_parse(rd_kafka_buf_t *rkbuf,
uint64_t tagtype,
uint64_t taglen,
rd_kafkap_Fetch_reply_tags_t *tags) {
switch (tagtype) {
case 0: /* NodeEndpoints */
if (rd_kafka_buf_read_NodeEndpoints(rkbuf,
&tags->NodeEndpoints) == -1)
goto err_parse;
return 1;
default:
return 0;
}
err_parse:
return -1;
}

static void
rd_kafka_handle_Fetch_metadata_update(rd_kafka_broker_t *rkb,
rd_kafkap_Fetch_reply_tags_t *FetchTags) {
if (FetchTags->topics_with_leader_change_cnt &&
FetchTags->NodeEndpoints.NodeEndpoints) {
rd_kafka_metadata_t *md = NULL;
rd_kafka_metadata_internal_t *mdi = NULL;
rd_tmpabuf_t tbuf;
int32_t nodeid;
rd_kafka_op_t *rko;
int i, changed_topic, changed_partition;

rd_kafka_broker_lock(rkb);
nodeid = rkb->rkb_nodeid;
rd_kafka_broker_unlock(rkb);

rd_tmpabuf_new(&tbuf, 0, rd_true /*assert on fail*/);
rd_tmpabuf_add_alloc(&tbuf, sizeof(*mdi));
rd_kafkap_leader_discovery_tmpabuf_add_alloc_brokers(
&tbuf, &FetchTags->NodeEndpoints);
rd_kafkap_leader_discovery_tmpabuf_add_alloc_topics(
&tbuf, FetchTags->topics_with_leader_change_cnt);
for (i = 0; i < FetchTags->TopicCnt; i++) {
if (!FetchTags->Topics[i]
.partitions_with_leader_change_cnt)
continue;
rd_kafkap_leader_discovery_tmpabuf_add_alloc_topic(
&tbuf, NULL,
FetchTags->Topics[i]
.partitions_with_leader_change_cnt);
}
rd_tmpabuf_finalize(&tbuf);

mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi));
md = &mdi->metadata;

rd_kafkap_leader_discovery_metadata_init(mdi, nodeid);

rd_kafkap_leader_discovery_set_brokers(
&tbuf, mdi, &FetchTags->NodeEndpoints);

rd_kafkap_leader_discovery_set_topic_cnt(
&tbuf, mdi, FetchTags->topics_with_leader_change_cnt);

changed_topic = 0;
for (i = 0; i < FetchTags->TopicCnt; i++) {
int j;
if (!FetchTags->Topics[i]
.partitions_with_leader_change_cnt)
continue;

rd_kafkap_leader_discovery_set_topic(
&tbuf, mdi, changed_topic,
FetchTags->Topics[i].TopicId, NULL,
FetchTags->Topics[i]
.partitions_with_leader_change_cnt);

changed_partition = 0;
for (j = 0; j < FetchTags->Topics[i].PartitionCnt;
j++) {
if (FetchTags->Topics[i]
.Partitions[j]
.CurrentLeader.LeaderId < 0)
continue;

rd_kafkap_Fetch_reply_tags_Partition_t
*Partition =
&FetchTags->Topics[i].Partitions[j];
rd_kafkap_leader_discovery_set_CurrentLeader(
&tbuf, mdi, changed_topic,
changed_partition, Partition->Partition,
&Partition->CurrentLeader);
changed_partition++;
}
changed_topic++;
}

rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA_UPDATE);
rko->rko_u.metadata.md = md;
rko->rko_u.metadata.mdi = mdi;
rd_kafka_q_enq(rkb->rkb_rk->rk_ops, rko);
}
}

/**
* @brief Per-partition FetchResponse parsing and handling.
*
* @returns an error on buffer parse failure, else RD_KAFKA_RESP_ERR_NO_ERROR.
*/
static rd_kafka_resp_err_t
rd_kafka_fetch_reply_handle_partition(rd_kafka_broker_t *rkb,
const rd_kafkap_str_t *topic,
rd_kafka_topic_t *rkt /*possibly NULL*/,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
int16_t ErrorCode) {
static rd_kafka_resp_err_t rd_kafka_fetch_reply_handle_partition(
rd_kafka_broker_t *rkb,
const rd_kafkap_str_t *topic,
rd_kafka_topic_t *rkt /*possibly NULL*/,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
int16_t ErrorCode,
rd_kafkap_Fetch_reply_tags_Topic_t *TopicTags,
rd_kafkap_Fetch_reply_tags_Partition_t *PartitionTags) {
const int log_decode_errors = LOG_ERR;
struct rd_kafka_toppar_ver *tver, tver_skel;
rd_kafka_toppar_t *rktp = NULL;
Expand All @@ -375,6 +529,8 @@ rd_kafka_fetch_reply_handle_partition(rd_kafka_broker_t *rkb,

rd_kafka_buf_read_i32(rkbuf, &hdr.Partition);
rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode);
if (PartitionTags)
PartitionTags->Partition = hdr.Partition;
if (ErrorCode)
hdr.ErrorCode = ErrorCode;
rd_kafka_buf_read_i64(rkbuf, &hdr.HighwaterMarkOffset);
Expand Down Expand Up @@ -599,7 +755,16 @@ rd_kafka_fetch_reply_handle_partition(rd_kafka_broker_t *rkb,
rd_kafka_aborted_txns_destroy(aborted_txns);
if (likely(rktp != NULL))
rd_kafka_toppar_destroy(rktp); /*from get()*/
rd_kafka_buf_skip_tags(rkbuf);

if (PartitionTags) {
/* Set default LeaderId and LeaderEpoch */
PartitionTags->CurrentLeader.LeaderId = -1;
PartitionTags->CurrentLeader.LeaderEpoch = -1;
}
rd_kafka_buf_read_tags(rkbuf,
rd_kafkap_Fetch_reply_tags_partition_parse,
TopicTags, PartitionTags);

return RD_KAFKA_RESP_ERR_NO_ERROR;
}

Expand All @@ -613,9 +778,11 @@ rd_kafka_fetch_reply_handle(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *request) {
int32_t TopicArrayCnt;
int i;
const int log_decode_errors = LOG_ERR;
rd_kafka_topic_t *rkt = NULL;
int16_t ErrorCode = RD_KAFKA_RESP_ERR_NO_ERROR;
const int log_decode_errors = LOG_ERR;
rd_kafka_topic_t *rkt = NULL;
int16_t ErrorCode = RD_KAFKA_RESP_ERR_NO_ERROR;
rd_kafkap_Fetch_reply_tags_t FetchTags = RD_ZERO_INIT;
rd_bool_t has_fetch_tags = rd_false;

if (rd_kafka_buf_ApiVersion(request) >= 1) {
int32_t Throttle_Time;
Expand All @@ -638,6 +805,12 @@ rd_kafka_fetch_reply_handle(rd_kafka_broker_t *rkb,
4 /*PartitionArrayCnt*/ + 4 +
2 + 8 + 4 /*inner header*/));

if (rd_kafka_buf_ApiVersion(request) >= 12) {
has_fetch_tags = rd_true;
rd_kafkap_Fetch_reply_tags_set_topic_cnt(&FetchTags,
TopicArrayCnt);
}

for (i = 0; i < TopicArrayCnt; i++) {
rd_kafkap_str_t topic = RD_ZERO_INIT;
rd_kafka_Uuid_t topic_id = RD_KAFKA_UUID_ZERO;
Expand All @@ -657,12 +830,24 @@ rd_kafka_fetch_reply_handle(rd_kafka_broker_t *rkb,

rd_kafka_buf_read_arraycnt(rkbuf, &PartitionArrayCnt,
RD_KAFKAP_PARTITIONS_MAX);
if (rd_kafka_buf_ApiVersion(request) >= 12) {
rd_kafkap_Fetch_reply_tags_set_topic(
&FetchTags, i, topic_id, PartitionArrayCnt);
}

for (j = 0; j < PartitionArrayCnt; j++) {
if (rd_kafka_fetch_reply_handle_partition(
rkb, &topic, rkt, rkbuf, request, ErrorCode))
rkb, &topic, rkt, rkbuf, request, ErrorCode,
has_fetch_tags ? &FetchTags.Topics[i] : NULL,
has_fetch_tags
? &FetchTags.Topics[i].Partitions[j]
: NULL))
goto err_parse;
}
if (has_fetch_tags &&
FetchTags.Topics[i].partitions_with_leader_change_cnt) {
FetchTags.topics_with_leader_change_cnt++;
}

if (rkt) {
rd_kafka_topic_destroy0(rkt);
Expand All @@ -673,7 +858,8 @@ rd_kafka_fetch_reply_handle(rd_kafka_broker_t *rkb,
}

/* Top level tags */
rd_kafka_buf_skip_tags(rkbuf);
rd_kafka_buf_read_tags(rkbuf, rd_kafkap_Fetch_reply_tags_parse,
&FetchTags);

if (rd_kafka_buf_read_remain(rkbuf) != 0) {
rd_kafka_buf_parse_fail(rkbuf,
Expand All @@ -682,12 +868,15 @@ rd_kafka_fetch_reply_handle(rd_kafka_broker_t *rkb,
rd_kafka_buf_read_remain(rkbuf));
RD_NOTREACHED();
}
rd_kafka_handle_Fetch_metadata_update(rkb, &FetchTags);
rd_kafkap_Fetch_reply_tags_destroy(&FetchTags);

return 0;

err_parse:
if (rkt)
rd_kafka_topic_destroy0(rkt);
rd_kafkap_Fetch_reply_tags_destroy(&FetchTags);
rd_rkb_dbg(rkb, MSG, "BADMSG",
"Bad message (Fetch v%d): "
"is broker.version.fallback incorrectly set?",
Expand Down Expand Up @@ -789,7 +978,7 @@ int rd_kafka_broker_fetch_toppars(rd_kafka_broker_t *rkb, rd_ts_t now) {
return 0;

ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb, RD_KAFKAP_Fetch,
0, 15, NULL);
0, 16, NULL);
rkbuf = rd_kafka_buf_new_flexver_request(
rkb, RD_KAFKAP_Fetch, 1,
/* MaxWaitTime+MinBytes+MaxBytes+IsolationLevel+
Expand Down
Loading

0 comments on commit f47815b

Please sign in to comment.