Skip to content

Commit

Permalink
PR feedback 5th round
Browse files Browse the repository at this point in the history
  • Loading branch information
anchitj committed Jun 17, 2024
1 parent 1018042 commit 89fddc4
Show file tree
Hide file tree
Showing 3 changed files with 303 additions and 192 deletions.
162 changes: 54 additions & 108 deletions src/rdkafka_fetcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,117 +380,63 @@ static int rd_kafka_fetch_reply_handle_read_tag(rd_kafka_buf_t *rkbuf,
return -1;
}

/**
 * @brief Process leader-change metadata carried in Fetch response
 *        tags (KIP-951) and enqueue a metadata update op.
 *
 * When the Fetch response reported partitions whose leader changed,
 * a partial metadata object is built from the tagged node endpoints
 * and per-partition current-leader fields, and an
 * RD_KAFKA_OP_METADATA_UPDATE op carrying it is enqueued on the main
 * thread's op queue. If no leader changes were reported this is a
 * no-op.
 *
 * @param rkb Broker the Fetch response was received from.
 * @param FetchTags Parsed Fetch response tags (node endpoints,
 *                  topics, and current-leader info).
 *
 * @remark Ownership of the built \p md / \p mdi is transferred to the
 *         enqueued op.
 */
static void rd_kafka_handle_Fetch_metadata_update(
    rd_kafka_broker_t *rkb,
    rd_kafkap_Fetch_reply_tags_t *FetchTags) {

        if (FetchTags->topics_with_leader_change_cnt) {
                rd_kafka_metadata_t *md           = NULL;
                rd_kafka_metadata_internal_t *mdi = NULL;
                rd_kafkap_Fetch_reply_tags_Partition_t *Partition;
                rd_tmpabuf_t tbuf;
                int32_t nodeid, i, j;
                rd_kafka_op_t *rko;

                /* Snapshot the broker id under the broker lock. */
                rd_kafka_broker_lock(rkb);
                nodeid = rkb->rkb_nodeid;
                rd_kafka_broker_unlock(rkb);

                /* Pre-size a single tmpabuf allocation covering the
                 * metadata struct, brokers and all topics/partitions,
                 * then finalize before any rd_tmpabuf_alloc(). */
                rd_tmpabuf_new(&tbuf, 0, rd_true /*assert on fail*/);
                rd_tmpabuf_add_alloc(&tbuf, sizeof(*mdi));
                rd_kafkap_leader_discovery_tmpabuf_add_alloc_brokers(
                    &tbuf, &FetchTags->node_endpoints);
                rd_kafkap_leader_discovery_tmpabuf_add_alloc_topics(
                    &tbuf, FetchTags->TopicCnt);
                for (i = 0; i < FetchTags->TopicCnt; i++) {
                        rd_kafkap_leader_discovery_tmpabuf_add_alloc_topic(
                            &tbuf, NULL, FetchTags->Topics[i].PartitionCnt);
                }
                rd_tmpabuf_finalize(&tbuf);

                mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi));
                md  = &mdi->metadata;

                rd_kafkap_leader_discovery_metadata_init(mdi, nodeid);

                rd_kafkap_leader_discovery_set_brokers(
                    &tbuf, mdi, &FetchTags->node_endpoints);

                rd_kafkap_leader_discovery_set_topic_cnt(&tbuf, mdi,
                                                         FetchTags->TopicCnt);

                for (i = 0; i < FetchTags->TopicCnt; i++) {
                        /* Topics without partition leader changes are
                         * skipped. */
                        if (FetchTags->Topics[i].PartitionCnt == 0)
                                continue;
                        /* Topic name is unknown here (NULL): only the
                         * TopicId is available in the Fetch tags. */
                        rd_kafkap_leader_discovery_set_topic(
                            &tbuf, mdi, i, FetchTags->Topics[i].TopicId, NULL,
                            FetchTags->Topics[i].PartitionCnt);

                        for (j = 0; j < FetchTags->Topics[i].PartitionCnt;
                             j++) {
                                Partition =
                                    &FetchTags->Topics[i].Partitions[j];
                                rd_kafkap_leader_discovery_set_CurrentLeader(
                                    &tbuf, mdi, i, j, Partition->Partition,
                                    &Partition->CurrentLeader);
                        }
                }

                rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA_UPDATE);
                rko->rko_u.metadata.md  = md;
                rko->rko_u.metadata.mdi = mdi;
                rd_kafka_q_enq(rkb->rkb_rk->rk_ops, rko);
        }
}


Expand Down Expand Up @@ -877,7 +823,7 @@ rd_kafka_fetch_reply_handle(rd_kafka_broker_t *rkb,
rkbuf, _tag, _taglen, &fetch_tags) == -1)
goto err_parse;
}
enqueue_new_metadata_op(rkb, rkbuf, &fetch_tags);
rd_kafka_handle_Fetch_metadata_update(rkb, &fetch_tags);
} else if (rd_kafka_buf_ApiVersion(request) >= 12)
rd_kafka_buf_skip_tags(rkbuf);

Expand Down
Loading

0 comments on commit 89fddc4

Please sign in to comment.