Skip to content

Commit

Permalink
PR feedback 8th round
Browse files Browse the repository at this point in the history
  • Loading branch information
anchitj committed Jun 17, 2024
1 parent 9a1a8d2 commit ef5c5ba
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 59 deletions.
47 changes: 34 additions & 13 deletions src/rdkafka_fetcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -414,15 +414,14 @@ rd_kafkap_Fetch_reply_tags_parse(rd_kafka_buf_t *rkbuf,
static void
rd_kafka_handle_Fetch_metadata_update(rd_kafka_broker_t *rkb,
rd_kafkap_Fetch_reply_tags_t *FetchTags) {

if (FetchTags->topics_with_leader_change_cnt &&
FetchTags->NodeEndpoints.NodeEndpoints) {
rd_kafka_metadata_t *md = NULL;
rd_kafka_metadata_internal_t *mdi = NULL;
rd_kafkap_Fetch_reply_tags_Partition_t *Partition;
rd_tmpabuf_t tbuf;
int32_t nodeid, i, j;
int32_t nodeid;
rd_kafka_op_t *rko;
int i, changed_topic, changed_partition;

rd_kafka_broker_lock(rkb);
nodeid = rkb->rkb_nodeid;
Expand All @@ -433,10 +432,15 @@ rd_kafka_handle_Fetch_metadata_update(rd_kafka_broker_t *rkb,
rd_kafkap_leader_discovery_tmpabuf_add_alloc_brokers(
&tbuf, &FetchTags->NodeEndpoints);
rd_kafkap_leader_discovery_tmpabuf_add_alloc_topics(
&tbuf, FetchTags->TopicCnt);
&tbuf, FetchTags->topics_with_leader_change_cnt);
for (i = 0; i < FetchTags->TopicCnt; i++) {
if (!FetchTags->Topics[i]
.partitions_with_leader_change_cnt)
continue;
rd_kafkap_leader_discovery_tmpabuf_add_alloc_topic(
&tbuf, NULL, FetchTags->Topics[i].PartitionCnt);
&tbuf, NULL,
FetchTags->Topics[i]
.partitions_with_leader_change_cnt);
}
rd_tmpabuf_finalize(&tbuf);

Expand All @@ -448,32 +452,49 @@ rd_kafka_handle_Fetch_metadata_update(rd_kafka_broker_t *rkb,
rd_kafkap_leader_discovery_set_brokers(
&tbuf, mdi, &FetchTags->NodeEndpoints);

rd_kafkap_leader_discovery_set_topic_cnt(&tbuf, mdi,
FetchTags->TopicCnt);
rd_kafkap_leader_discovery_set_topic_cnt(
&tbuf, mdi, FetchTags->topics_with_leader_change_cnt);

changed_topic = 0;
for (i = 0; i < FetchTags->TopicCnt; i++) {
if (FetchTags->Topics[i].PartitionCnt == 0)
int j;
if (!FetchTags->Topics[i]
.partitions_with_leader_change_cnt)
continue;

rd_kafkap_leader_discovery_set_topic(
&tbuf, mdi, i, FetchTags->Topics[i].TopicId, NULL,
FetchTags->Topics[i].PartitionCnt);
&tbuf, mdi, changed_topic,
FetchTags->Topics[i].TopicId, NULL,
FetchTags->Topics[i]
.partitions_with_leader_change_cnt);

changed_partition = 0;
for (j = 0; j < FetchTags->Topics[i].PartitionCnt;
j++) {
Partition = &FetchTags->Topics[i].Partitions[j];
if (FetchTags->Topics[i]
.Partitions[j]
.CurrentLeader.LeaderId < 0)
continue;

rd_kafkap_Fetch_reply_tags_Partition_t
*Partition =
&FetchTags->Topics[i].Partitions[j];
rd_kafkap_leader_discovery_set_CurrentLeader(
&tbuf, mdi, i, j, Partition->Partition,
&tbuf, mdi, changed_topic,
changed_partition, Partition->Partition,
&Partition->CurrentLeader);
changed_partition++;
}
changed_topic++;
}

rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA_UPDATE);
rko->rko_u.metadata.md = md;
rko->rko_u.metadata.mdi = mdi;
rd_kafka_q_enq(rkb->rkb_rk->rk_ops, rko);
}
}


/**
* @brief Per-partition FetchResponse parsing and handling.
*
Expand Down
43 changes: 23 additions & 20 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -1989,22 +1989,22 @@ rd_kafka_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t *mdi) {
rd_kafka_metadata_t *md = &mdi->metadata;
rd_kafka_broker_t *rkb;
rd_bool_t cache_updated = rd_false;
rd_kafka_metadata_partition_t *mdp;
rd_kafka_metadata_partition_internal_t *mdpi;
rd_kafka_secproto_t rkb_proto = rk->rk_conf.security_protocol;


for (i = 0; i < md->broker_cnt; i++) {
rd_kafka_broker_update(rk, rkb_proto, &md->brokers[i], NULL);
}

struct rd_kafka_metadata_cache_entry *rkmce;
int32_t partition_cache_changes = 0, part, current_leader_epoch;
rd_bool_t by_id = rd_false;
rd_kafka_Uuid_t topic_id;
char *topic;
for (i = 0; i < md->topic_cnt; i++) {
by_id = !RD_KAFKA_UUID_IS_ZERO(mdi->topics[i].topic_id);
struct rd_kafka_metadata_cache_entry *rkmce;
int32_t partition_cache_changes = 0, part, current_leader_epoch;
rd_bool_t by_id = !RD_KAFKA_UUID_IS_ZERO(mdi->topics[i].topic_id);
rd_kafka_Uuid_t topic_id;
char *topic;
rd_kafka_metadata_partition_t *mdp;
rd_kafka_metadata_partition_internal_t *mdpi;

if (by_id) {
rkmce = rd_kafka_metadata_cache_find_by_id(
rk, mdi->topics[i].topic_id, 1);
Expand Down Expand Up @@ -2036,18 +2036,18 @@ rd_kafka_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t *mdi) {

part = mdp->id;

if (mdp->id >= rkmce->rkmce_mtopic.partition_cnt) {
if (part >= rkmce->rkmce_mtopic.partition_cnt) {
if (by_id) {
rd_kafka_log(
rk, LOG_WARNING, "METADATAUPDATE",
"Partition %s [%d]: not found "
"Partition %s [%" PRId32 "]: not found "
"in cache",
rd_kafka_Uuid_base64str(&topic_id),
part);
} else {
rd_kafka_log(
rk, LOG_WARNING, "METADATAUPDATE",
"Partition %s [%d]: not found in "
"Partition %s [%" PRId32 "]: not found in "
"cache",
topic, part);
}
Expand All @@ -2059,14 +2059,14 @@ rd_kafka_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t *mdi) {
if (by_id) {
rd_kafka_log(
rk, LOG_WARNING, "METADATAUPDATE",
"Partition %s [%d]: new leader"
"Partition %s [%" PRId32 "]: new leader"
"%" PRId32 " not found in cache",
rd_kafka_Uuid_base64str(&topic_id),
part, mdp->leader);
} else {
rd_kafka_log(
rk, LOG_WARNING, "METADATAUPDATE",
"Partition %s [%d]: new leader "
"Partition %s [%" PRId32 "]: new leader "
"%" PRId32 " not found in cache",
topic, part, mdp->leader);
}
Expand All @@ -2083,7 +2083,7 @@ rd_kafka_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t *mdi) {
if (by_id) {
rd_kafka_dbg(
rk, METADATA, "METADATAUPDATE",
"Partition %s [%d]: leader epoch "
"Partition %s [%" PRId32 "]: leader epoch "
"is "
"not newer %" PRId32 " >= %" PRId32,
rd_kafka_Uuid_base64str(&topic_id),
Expand All @@ -2092,7 +2092,7 @@ rd_kafka_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t *mdi) {
} else {
rd_kafka_dbg(
rk, METADATA, "METADATAUPDATE",
"Partition %s [%d]: leader epoch "
"Partition %s [%" PRId32 "]: leader epoch "
"is "
"not newer %" PRId32 " >= %" PRId32,
topic, part, current_leader_epoch,
Expand All @@ -2106,24 +2106,24 @@ rd_kafka_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t *mdi) {
/* Need to acquire the write lock to avoid dirty reads
* from other threads acquiring read locks. */
rd_kafka_wrlock(rk);
rkmce->rkmce_metadata_internal_topic.partitions[mdp->id]
rkmce->rkmce_metadata_internal_topic.partitions[part]
.leader_epoch = mdpi->leader_epoch;
rkmce->rkmce_mtopic.partitions[mdp->id].leader =
rkmce->rkmce_mtopic.partitions[part].leader =
mdp->leader;
rd_kafka_wrunlock(rk);
rd_kafka_broker_destroy(rkb);

if (by_id) {
rd_kafka_dbg(rk, METADATA, "METADATAUPDATE",
"Partition %s [%d] "
"Partition %s [%" PRId32 "]: "
" updated with leader %" PRId32
" and epoch %" PRId32,
rd_kafka_Uuid_base64str(&topic_id),
part, mdp->leader,
mdpi->leader_epoch);
} else {
rd_kafka_dbg(rk, METADATA, "METADATAUPDATE",
"Partition %s [%d] "
"Partition %s [%" PRId32 "]: "
" updated with leader %" PRId32
" and epoch %" PRId32,
topic, part, mdp->leader,
Expand All @@ -2137,8 +2137,11 @@ rd_kafka_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t *mdi) {
&rkmce->rkmce_metadata_internal_topic);
}

if (!cache_updated)
if (!cache_updated) {
rd_kafka_dbg(rk, METADATA, "METADATAUPDATE",
"Cache was not updated");
return RD_KAFKA_OP_RES_HANDLED;
}

rd_kafka_dbg(rk, METADATA, "METADATAUPDATE",
"Metadata cache updated, propagating changes");
Expand Down
26 changes: 3 additions & 23 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -3419,7 +3419,7 @@ void rd_kafkap_leader_discovery_metadata_init(rd_kafka_metadata_internal_t *mdi,
mdi->cluster_authorized_operations = -1;
}

int rd_kafkap_leader_discovery_set_brokers(
void rd_kafkap_leader_discovery_set_brokers(
rd_tmpabuf_t *tbuf,
rd_kafka_metadata_internal_t *mdi,
rd_kafkap_NodeEndpoints_t *NodeEndpoints) {
Expand All @@ -3436,9 +3436,6 @@ int rd_kafkap_leader_discovery_set_brokers(
mdi->brokers_sorted = rd_tmpabuf_alloc(tbuf, md_brokers_size);
mdi->brokers = rd_tmpabuf_alloc(tbuf, mdi_brokers_size);

if (!md->brokers || !mdi->brokers_sorted || !mdi->brokers)
return -1;

for (i = 0; i < NodeEndpoints->NodeEndpointCnt; i++) {
rd_kafkap_NodeEndpoint_t *NodeEndpoint =
&NodeEndpoints->NodeEndpoints[i];
Expand All @@ -3453,8 +3450,6 @@ int rd_kafkap_leader_discovery_set_brokers(
RD_KAFKAP_STR_LEN(&NodeEndpoint->Host) + 1,
"%.*s",
RD_KAFKAP_STR_PR(&NodeEndpoint->Host));
if (!mdb->host)
return -1;
}
mdb->port = NodeEndpoints->NodeEndpoints[i].Port;

Expand All @@ -3469,11 +3464,9 @@ int rd_kafkap_leader_discovery_set_brokers(
sizeof(*mdi->brokers_sorted) * md->broker_cnt);
qsort(mdi->brokers_sorted, md->broker_cnt, sizeof(*mdi->brokers_sorted),
rd_kafka_metadata_broker_cmp);

return 0;
}

int rd_kafkap_leader_discovery_set_topic_cnt(rd_tmpabuf_t *tbuf,
void rd_kafkap_leader_discovery_set_topic_cnt(rd_tmpabuf_t *tbuf,
rd_kafka_metadata_internal_t *mdi,
int topic_cnt) {

Expand All @@ -3482,14 +3475,9 @@ int rd_kafkap_leader_discovery_set_topic_cnt(rd_tmpabuf_t *tbuf,
md->topic_cnt = topic_cnt;
md->topics = rd_tmpabuf_alloc(tbuf, sizeof(*md->topics) * topic_cnt);
mdi->topics = rd_tmpabuf_alloc(tbuf, sizeof(*mdi->topics) * topic_cnt);

if (!md->topics || !mdi->topics)
return -1;

return 0;
}

int rd_kafkap_leader_discovery_set_topic(rd_tmpabuf_t *tbuf,
void rd_kafkap_leader_discovery_set_topic(rd_tmpabuf_t *tbuf,
rd_kafka_metadata_internal_t *mdi,
int topic_idx,
rd_kafka_Uuid_t topic_id,
Expand All @@ -3511,19 +3499,11 @@ int rd_kafkap_leader_discovery_set_topic(rd_tmpabuf_t *tbuf,
rd_snprintf(mdt->topic, strlen(topic_name) + 1, "%s",
topic_name);

if ((topic_name && !mdt->topic) || !mdt->partitions)
return -1;

memset(mdti, 0, sizeof(*mdti));
mdti->partitions =
rd_tmpabuf_alloc(tbuf, sizeof(*mdti->partitions) * partition_cnt);
mdti->topic_id = topic_id;
mdti->topic_authorized_operations = -1;

if (!mdti->partitions)
return -1;

return 0;
}

void rd_kafkap_leader_discovery_set_CurrentLeader(
Expand Down
6 changes: 3 additions & 3 deletions src/rdkafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -631,16 +631,16 @@ void rd_kafkap_leader_discovery_tmpabuf_add_alloc_topic(rd_tmpabuf_t *tbuf,
void rd_kafkap_leader_discovery_metadata_init(rd_kafka_metadata_internal_t *mdi,
int32_t broker_id);

int rd_kafkap_leader_discovery_set_brokers(
void rd_kafkap_leader_discovery_set_brokers(
rd_tmpabuf_t *tbuf,
rd_kafka_metadata_internal_t *mdi,
rd_kafkap_NodeEndpoints_t *NodeEndpoints);

int rd_kafkap_leader_discovery_set_topic_cnt(rd_tmpabuf_t *tbuf,
void rd_kafkap_leader_discovery_set_topic_cnt(rd_tmpabuf_t *tbuf,
rd_kafka_metadata_internal_t *mdi,
int topic_cnt);

int rd_kafkap_leader_discovery_set_topic(rd_tmpabuf_t *tbuf,
void rd_kafkap_leader_discovery_set_topic(rd_tmpabuf_t *tbuf,
rd_kafka_metadata_internal_t *mdi,
int topic_idx,
rd_kafka_Uuid_t topic_id,
Expand Down

0 comments on commit ef5c5ba

Please sign in to comment.