From f47815bcdd3aab6c6d8432434e26874911a4d0c1 Mon Sep 17 00:00:00 2001 From: Anchit Jain <112778471+anchitj@users.noreply.github.com> Date: Mon, 17 Jun 2024 22:20:01 +0530 Subject: [PATCH] [KIP-951] Leader discovery optimizations for the client (#4756) contains the code changes to trigger a metadata update when the corresponding tags are received by the broker for partitions that have changed leadership, during Fetch or Produce requests. --- src/rdkafka.c | 4 + src/rdkafka_fetcher.c | 217 ++++++++++++++++++++++++-- src/rdkafka_metadata.c | 151 ++++++++++++++++++ src/rdkafka_metadata.h | 2 + src/rdkafka_msgset_writer.c | 3 +- src/rdkafka_op.c | 12 +- src/rdkafka_op.h | 4 +- src/rdkafka_request.c | 296 +++++++++++++++++++++++++++++++++++- src/rdkafka_request.h | 88 +++++++++++ src/rdkafka_topic.c | 15 +- 10 files changed, 762 insertions(+), 30 deletions(-) diff --git a/src/rdkafka.c b/src/rdkafka.c index a23bad4693..9c2cf3ac89 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -4078,6 +4078,10 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk, rd_kafka_purge(rk, rko->rko_u.purge.flags); break; + case RD_KAFKA_OP_METADATA_UPDATE: + res = rd_kafka_metadata_update_op(rk, rko->rko_u.metadata.mdi); + break; + default: /* If op has a callback set (e.g., OAUTHBEARER_REFRESH), * call it. */ diff --git a/src/rdkafka_fetcher.c b/src/rdkafka_fetcher.c index b38b107464..98f5e72f92 100644 --- a/src/rdkafka_fetcher.c +++ b/src/rdkafka_fetcher.c @@ -37,6 +37,7 @@ #include "rdkafka_offset.h" #include "rdkafka_msgset.h" #include "rdkafka_fetcher.h" +#include "rdkafka_request.h" /** @@ -341,20 +342,173 @@ static void rd_kafka_fetch_reply_handle_partition_error( rd_kafka_toppar_fetch_backoff(rkb, rktp, err); } +static void rd_kafkap_Fetch_reply_tags_set_topic_cnt( + rd_kafkap_Fetch_reply_tags_t *reply_tags, + int32_t TopicCnt) { + reply_tags->TopicCnt = TopicCnt; + rd_dassert(!reply_tags->Topics); + reply_tags->Topics = rd_calloc(TopicCnt, sizeof(*reply_tags->Topics)); +} + +static void +rd_kafkap_Fetch_reply_tags_set_topic(rd_kafkap_Fetch_reply_tags_t *reply_tags, + int TopicIdx, + rd_kafka_Uuid_t TopicId, + int32_t PartitionCnt) { + reply_tags->Topics[TopicIdx].TopicId = TopicId; + reply_tags->Topics[TopicIdx].PartitionCnt = PartitionCnt; + rd_dassert(!reply_tags->Topics[TopicIdx].Partitions); + reply_tags->Topics[TopicIdx].Partitions = rd_calloc( + PartitionCnt, sizeof(*reply_tags->Topics[TopicIdx].Partitions)); +} + + +static void +rd_kafkap_Fetch_reply_tags_destroy(rd_kafkap_Fetch_reply_tags_t *reply_tags) { + int i; + for (i = 0; i < reply_tags->TopicCnt; i++) { + RD_IF_FREE(reply_tags->Topics[i].Partitions, rd_free); + } + RD_IF_FREE(reply_tags->Topics, rd_free); + RD_IF_FREE(reply_tags->NodeEndpoints.NodeEndpoints, rd_free); +} + +static int rd_kafkap_Fetch_reply_tags_partition_parse( + rd_kafka_buf_t *rkbuf, + uint64_t tagtype, + uint64_t taglen, + rd_kafkap_Fetch_reply_tags_Topic_t *TopicTags, + rd_kafkap_Fetch_reply_tags_Partition_t *PartitionTags) { + switch (tagtype) { + case 1: /* CurrentLeader */ + if (rd_kafka_buf_read_CurrentLeader( + rkbuf, &PartitionTags->CurrentLeader) == -1) + goto err_parse; + TopicTags->partitions_with_leader_change_cnt++; + return 1; + default: + return 0; + } +err_parse: + return -1; +} + +static int +rd_kafkap_Fetch_reply_tags_parse(rd_kafka_buf_t *rkbuf, + uint64_t tagtype, + uint64_t taglen, + rd_kafkap_Fetch_reply_tags_t *tags) { + switch (tagtype) { + case 0: /* NodeEndpoints */ + if (rd_kafka_buf_read_NodeEndpoints(rkbuf, + &tags->NodeEndpoints) == -1) + goto 
err_parse; + return 1; + default: + return 0; + } +err_parse: + return -1; +} +static void +rd_kafka_handle_Fetch_metadata_update(rd_kafka_broker_t *rkb, + rd_kafkap_Fetch_reply_tags_t *FetchTags) { + if (FetchTags->topics_with_leader_change_cnt && + FetchTags->NodeEndpoints.NodeEndpoints) { + rd_kafka_metadata_t *md = NULL; + rd_kafka_metadata_internal_t *mdi = NULL; + rd_tmpabuf_t tbuf; + int32_t nodeid; + rd_kafka_op_t *rko; + int i, changed_topic, changed_partition; + + rd_kafka_broker_lock(rkb); + nodeid = rkb->rkb_nodeid; + rd_kafka_broker_unlock(rkb); + + rd_tmpabuf_new(&tbuf, 0, rd_true /*assert on fail*/); + rd_tmpabuf_add_alloc(&tbuf, sizeof(*mdi)); + rd_kafkap_leader_discovery_tmpabuf_add_alloc_brokers( + &tbuf, &FetchTags->NodeEndpoints); + rd_kafkap_leader_discovery_tmpabuf_add_alloc_topics( + &tbuf, FetchTags->topics_with_leader_change_cnt); + for (i = 0; i < FetchTags->TopicCnt; i++) { + if (!FetchTags->Topics[i] + .partitions_with_leader_change_cnt) + continue; + rd_kafkap_leader_discovery_tmpabuf_add_alloc_topic( + &tbuf, NULL, + FetchTags->Topics[i] + .partitions_with_leader_change_cnt); + } + rd_tmpabuf_finalize(&tbuf); + + mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi)); + md = &mdi->metadata; + + rd_kafkap_leader_discovery_metadata_init(mdi, nodeid); + + rd_kafkap_leader_discovery_set_brokers( + &tbuf, mdi, &FetchTags->NodeEndpoints); + + rd_kafkap_leader_discovery_set_topic_cnt( + &tbuf, mdi, FetchTags->topics_with_leader_change_cnt); + + changed_topic = 0; + for (i = 0; i < FetchTags->TopicCnt; i++) { + int j; + if (!FetchTags->Topics[i] + .partitions_with_leader_change_cnt) + continue; + + rd_kafkap_leader_discovery_set_topic( + &tbuf, mdi, changed_topic, + FetchTags->Topics[i].TopicId, NULL, + FetchTags->Topics[i] + .partitions_with_leader_change_cnt); + + changed_partition = 0; + for (j = 0; j < FetchTags->Topics[i].PartitionCnt; + j++) { + if (FetchTags->Topics[i] + .Partitions[j] + .CurrentLeader.LeaderId < 0) + continue; + + rd_kafkap_Fetch_reply_tags_Partition_t + *Partition = + &FetchTags->Topics[i].Partitions[j]; + rd_kafkap_leader_discovery_set_CurrentLeader( + &tbuf, mdi, changed_topic, + changed_partition, Partition->Partition, + &Partition->CurrentLeader); + changed_partition++; + } + changed_topic++; + } + + rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA_UPDATE); + rko->rko_u.metadata.md = md; + rko->rko_u.metadata.mdi = mdi; + rd_kafka_q_enq(rkb->rkb_rk->rk_ops, rko); + } +} /** * @brief Per-partition FetchResponse parsing and handling. * * @returns an error on buffer parse failure, else RD_KAFKA_RESP_ERR_NO_ERROR. 
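+ *
+ * @param TopicTags Topic-level tag accumulator (KIP-951), or NULL when the
+ *                  request is not flexver (ApiVersion < 12).
+ * @param PartitionTags Destination for this partition's tags (CurrentLeader),
+ *                      or NULL when the request is not flexver.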
*/ -static rd_kafka_resp_err_t -rd_kafka_fetch_reply_handle_partition(rd_kafka_broker_t *rkb, - const rd_kafkap_str_t *topic, - rd_kafka_topic_t *rkt /*possibly NULL*/, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - int16_t ErrorCode) { +static rd_kafka_resp_err_t rd_kafka_fetch_reply_handle_partition( + rd_kafka_broker_t *rkb, + const rd_kafkap_str_t *topic, + rd_kafka_topic_t *rkt /*possibly NULL*/, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + int16_t ErrorCode, + rd_kafkap_Fetch_reply_tags_Topic_t *TopicTags, + rd_kafkap_Fetch_reply_tags_Partition_t *PartitionTags) { const int log_decode_errors = LOG_ERR; struct rd_kafka_toppar_ver *tver, tver_skel; rd_kafka_toppar_t *rktp = NULL; @@ -375,6 +529,8 @@ rd_kafka_fetch_reply_handle_partition(rd_kafka_broker_t *rkb, rd_kafka_buf_read_i32(rkbuf, &hdr.Partition); rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode); + if (PartitionTags) + PartitionTags->Partition = hdr.Partition; if (ErrorCode) hdr.ErrorCode = ErrorCode; rd_kafka_buf_read_i64(rkbuf, &hdr.HighwaterMarkOffset); @@ -599,7 +755,16 @@ rd_kafka_fetch_reply_handle_partition(rd_kafka_broker_t *rkb, rd_kafka_aborted_txns_destroy(aborted_txns); if (likely(rktp != NULL)) rd_kafka_toppar_destroy(rktp); /*from get()*/ - rd_kafka_buf_skip_tags(rkbuf); + + if (PartitionTags) { + /* Set default LeaderId and LeaderEpoch */ + PartitionTags->CurrentLeader.LeaderId = -1; + PartitionTags->CurrentLeader.LeaderEpoch = -1; + } + rd_kafka_buf_read_tags(rkbuf, + rd_kafkap_Fetch_reply_tags_partition_parse, + TopicTags, PartitionTags); + return RD_KAFKA_RESP_ERR_NO_ERROR; } @@ -613,9 +778,11 @@ rd_kafka_fetch_reply_handle(rd_kafka_broker_t *rkb, rd_kafka_buf_t *request) { int32_t TopicArrayCnt; int i; - const int log_decode_errors = LOG_ERR; - rd_kafka_topic_t *rkt = NULL; - int16_t ErrorCode = RD_KAFKA_RESP_ERR_NO_ERROR; + const int log_decode_errors = LOG_ERR; + rd_kafka_topic_t *rkt = NULL; + int16_t ErrorCode = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_kafkap_Fetch_reply_tags_t FetchTags = RD_ZERO_INIT; + rd_bool_t has_fetch_tags = rd_false; if (rd_kafka_buf_ApiVersion(request) >= 1) { int32_t Throttle_Time; @@ -638,6 +805,12 @@ rd_kafka_fetch_reply_handle(rd_kafka_broker_t *rkb, 4 /*PartitionArrayCnt*/ + 4 + 2 + 8 + 4 /*inner header*/)); + if (rd_kafka_buf_ApiVersion(request) >= 12) { + has_fetch_tags = rd_true; + rd_kafkap_Fetch_reply_tags_set_topic_cnt(&FetchTags, + TopicArrayCnt); + } + for (i = 0; i < TopicArrayCnt; i++) { rd_kafkap_str_t topic = RD_ZERO_INIT; rd_kafka_Uuid_t topic_id = RD_KAFKA_UUID_ZERO; @@ -657,12 +830,24 @@ rd_kafka_fetch_reply_handle(rd_kafka_broker_t *rkb, rd_kafka_buf_read_arraycnt(rkbuf, &PartitionArrayCnt, RD_KAFKAP_PARTITIONS_MAX); + if (rd_kafka_buf_ApiVersion(request) >= 12) { + rd_kafkap_Fetch_reply_tags_set_topic( + &FetchTags, i, topic_id, PartitionArrayCnt); + } for (j = 0; j < PartitionArrayCnt; j++) { if (rd_kafka_fetch_reply_handle_partition( - rkb, &topic, rkt, rkbuf, request, ErrorCode)) + rkb, &topic, rkt, rkbuf, request, ErrorCode, + has_fetch_tags ? &FetchTags.Topics[i] : NULL, + has_fetch_tags + ? 
&FetchTags.Topics[i].Partitions[j] + : NULL)) goto err_parse; } + if (has_fetch_tags && + FetchTags.Topics[i].partitions_with_leader_change_cnt) { + FetchTags.topics_with_leader_change_cnt++; + } if (rkt) { rd_kafka_topic_destroy0(rkt); @@ -673,7 +858,8 @@ rd_kafka_fetch_reply_handle(rd_kafka_broker_t *rkb, } /* Top level tags */ - rd_kafka_buf_skip_tags(rkbuf); + rd_kafka_buf_read_tags(rkbuf, rd_kafkap_Fetch_reply_tags_parse, + &FetchTags); if (rd_kafka_buf_read_remain(rkbuf) != 0) { rd_kafka_buf_parse_fail(rkbuf, @@ -682,12 +868,15 @@ rd_kafka_fetch_reply_handle(rd_kafka_broker_t *rkb, rd_kafka_buf_read_remain(rkbuf)); RD_NOTREACHED(); } + rd_kafka_handle_Fetch_metadata_update(rkb, &FetchTags); + rd_kafkap_Fetch_reply_tags_destroy(&FetchTags); return 0; err_parse: if (rkt) rd_kafka_topic_destroy0(rkt); + rd_kafkap_Fetch_reply_tags_destroy(&FetchTags); rd_rkb_dbg(rkb, MSG, "BADMSG", "Bad message (Fetch v%d): " "is broker.version.fallback incorrectly set?", @@ -789,7 +978,7 @@ int rd_kafka_broker_fetch_toppars(rd_kafka_broker_t *rkb, rd_ts_t now) { return 0; ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb, RD_KAFKAP_Fetch, - 0, 15, NULL); + 0, 16, NULL); rkbuf = rd_kafka_buf_new_flexver_request( rkb, RD_KAFKAP_Fetch, 1, /* MaxWaitTime+MinBytes+MaxBytes+IsolationLevel+ diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index 7e9c90376d..f6419fe97d 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -1971,3 +1971,154 @@ rd_kafka_metadata_new_topic_with_partition_replicas_mock(int replication_factor, return rd_kafka_metadata_new_topic_mock( topics, topic_cnt, replication_factor, num_brokers); } + +/** + * @brief Handle update of metadata received in the produce or fetch tags. + * + * @param rk Client instance. + * @param rko Metadata update operation. 
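+ *            (the caller passes rko->rko_u.metadata.mdi as \p mdi, holding
+ *            the changed brokers, topics and partition leaders).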
+ * + * @locality main thread + * @locks none + * + * @return always RD_KAFKA_OP_RES_HANDLED + */ +rd_kafka_op_res_t +rd_kafka_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t *mdi) { + int i, j; + rd_kafka_metadata_t *md = &mdi->metadata; + rd_bool_t cache_updated = rd_false; + rd_kafka_secproto_t rkb_proto = rk->rk_conf.security_protocol; + + + for (i = 0; i < md->broker_cnt; i++) { + rd_kafka_broker_update(rk, rkb_proto, &md->brokers[i], NULL); + } + + for (i = 0; i < md->topic_cnt; i++) { + struct rd_kafka_metadata_cache_entry *rkmce; + int32_t partition_cache_changes = 0; + rd_bool_t by_id = + !RD_KAFKA_UUID_IS_ZERO(mdi->topics[i].topic_id); + rd_kafka_Uuid_t topic_id = RD_KAFKA_UUID_ZERO; + char *topic = NULL; + + if (by_id) { + rkmce = rd_kafka_metadata_cache_find_by_id( + rk, mdi->topics[i].topic_id, 1); + topic_id = mdi->topics[i].topic_id; + } else { + rkmce = rd_kafka_metadata_cache_find( + rk, md->topics[i].topic, 1); + topic = md->topics[i].topic; + } + + if (!rkmce) { + if (by_id) { + rd_kafka_log( + rk, LOG_WARNING, "METADATAUPDATE", + "Topic id %s not found in cache", + rd_kafka_Uuid_base64str(&topic_id)); + } else { + rd_kafka_log(rk, LOG_WARNING, "METADATAUPDATE", + "Topic %s not found in cache", + topic); + } + continue; + } + topic = rkmce->rkmce_mtopic.topic; + topic_id = rkmce->rkmce_metadata_internal_topic.topic_id; + + for (j = 0; j < md->topics[i].partition_cnt; j++) { + rd_kafka_broker_t *rkb; + rd_kafka_metadata_partition_t *mdp = + &md->topics[i].partitions[j]; + ; + rd_kafka_metadata_partition_internal_t *mdpi = + &mdi->topics[i].partitions[j]; + int32_t part = mdp->id, current_leader_epoch; + + if (part >= rkmce->rkmce_mtopic.partition_cnt) { + rd_kafka_log(rk, LOG_WARNING, "METADATAUPDATE", + "Partition %s(%s)[%" PRId32 + "]: not found " + "in cache", + topic, + rd_kafka_Uuid_base64str(&topic_id), + part); + + continue; + } + + rkb = rd_kafka_broker_find_by_nodeid(rk, mdp->leader); + if (!rkb) { + rd_kafka_log(rk, LOG_WARNING, "METADATAUPDATE", + "Partition %s(%s)[%" PRId32 + "]: new leader" + "%" PRId32 " not found in cache", + topic, + rd_kafka_Uuid_base64str(&topic_id), + part, mdp->leader); + continue; + } + + current_leader_epoch = + rkmce->rkmce_metadata_internal_topic + .partitions[part] + .leader_epoch; + + if (current_leader_epoch >= mdpi->leader_epoch) { + rd_kafka_broker_destroy(rkb); + rd_kafka_dbg( + rk, METADATA, "METADATAUPDATE", + "Partition %s(%s)[%" PRId32 + "]: leader epoch " + "is " + "not newer %" PRId32 " >= %" PRId32, + topic, rd_kafka_Uuid_base64str(&topic_id), + part, current_leader_epoch, + mdpi->leader_epoch); + continue; + } + partition_cache_changes++; + + /* Need to acquire the write lock to avoid dirty reads + * from other threads acquiring read locks. 
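+ * Only the cached leader id and leader epoch are touched here; the
+ * full toppar state is refreshed after the partition loop through
+ * rd_kafka_topic_metadata_update2() when any partition changed.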
*/ + rd_kafka_wrlock(rk); + rkmce->rkmce_metadata_internal_topic.partitions[part] + .leader_epoch = mdpi->leader_epoch; + rkmce->rkmce_mtopic.partitions[part].leader = + mdp->leader; + rd_kafka_wrunlock(rk); + rd_kafka_broker_destroy(rkb); + + rd_kafka_dbg(rk, METADATA, "METADATAUPDATE", + "Partition %s(%s)[%" PRId32 + "]: " + " updated with leader %" PRId32 + " and epoch %" PRId32, + topic, rd_kafka_Uuid_base64str(&topic_id), + part, mdp->leader, mdpi->leader_epoch); + } + + if (partition_cache_changes > 0) { + cache_updated = rd_true; + rd_kafka_topic_metadata_update2( + rk->rk_internal_rkb, &rkmce->rkmce_mtopic, + &rkmce->rkmce_metadata_internal_topic); + } + } + + if (!cache_updated) { + rd_kafka_dbg(rk, METADATA, "METADATAUPDATE", + "Cache was not updated"); + return RD_KAFKA_OP_RES_HANDLED; + } + + rd_kafka_dbg(rk, METADATA, "METADATAUPDATE", + "Metadata cache updated, propagating changes"); + rd_kafka_metadata_cache_propagate_changes(rk); + rd_kafka_metadata_cache_expiry_start(rk); + + return RD_KAFKA_OP_RES_HANDLED; +} diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index b0926845ef..9486a0050a 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -335,5 +335,7 @@ void rd_kafka_metadata_cache_wait_state_change_async( rd_kafka_t *rk, rd_kafka_enq_once_t *eonce); +rd_kafka_op_res_t +rd_kafka_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t *mdi); /**@}*/ #endif /* _RDKAFKA_METADATA_H_ */ diff --git a/src/rdkafka_msgset_writer.c b/src/rdkafka_msgset_writer.c index b2243ed3c2..fbe16a3240 100644 --- a/src/rdkafka_msgset_writer.c +++ b/src/rdkafka_msgset_writer.c @@ -45,7 +45,7 @@ /** @brief The maxium ProduceRequestion ApiVersion supported by librdkafka */ -static const int16_t rd_kafka_ProduceRequest_max_version = 9; +static const int16_t rd_kafka_ProduceRequest_max_version = 10; typedef struct rd_kafka_msgset_writer_s { @@ -267,6 +267,7 @@ static void rd_kafka_msgset_writer_alloc_buf(rd_kafka_msgset_writer_t *msetw) { * ProduceRequest header sizes */ switch (msetw->msetw_ApiVersion) { + case 10: case 9: case 8: case 7: diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index 2fe3a4ac51..0955f9175c 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -116,7 +116,8 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) { "REPLY:ALTERUSERSCRAMCREDENTIALS", [RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] = "REPLY:DESCRIBEUSERSCRAMCREDENTIALS", - [RD_KAFKA_OP_LISTOFFSETS] = "REPLY:LISTOFFSETS", + [RD_KAFKA_OP_LISTOFFSETS] = "REPLY:LISTOFFSETS", + [RD_KAFKA_OP_METADATA_UPDATE] = "REPLY:METADATA_UPDATE", }; if (type & RD_KAFKA_OP_REPLY) @@ -275,7 +276,8 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) { sizeof(rko->rko_u.admin_request), [RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] = sizeof(rko->rko_u.admin_request), - [RD_KAFKA_OP_LISTOFFSETS] = sizeof(rko->rko_u.admin_request), + [RD_KAFKA_OP_LISTOFFSETS] = sizeof(rko->rko_u.admin_request), + [RD_KAFKA_OP_METADATA_UPDATE] = sizeof(rko->rko_u.metadata), }; size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; @@ -473,6 +475,12 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { rd_kafka_topic_partition_list_destroy); break; + case RD_KAFKA_OP_METADATA_UPDATE: + RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy); + /* It's not needed to free metadata.mdi because they + are the in the same memory allocation. 
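+ * metadata is the first member of the internal struct and both
+ * live in the same tmpabuf-backed allocation, so freeing md
+ * releases everything.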
*/ + break; + default: break; } diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 8337586d58..135c77e058 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -179,7 +179,8 @@ typedef enum { RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS, /* < Admin: AlterUserScramCredentials u.admin_request >*/ - RD_KAFKA_OP_LISTOFFSETS, /**< Admin: ListOffsets u.admin_request >*/ + RD_KAFKA_OP_LISTOFFSETS, /**< Admin: ListOffsets u.admin_request >*/ + RD_KAFKA_OP_METADATA_UPDATE, /**< Metadata update (KIP 951) **/ RD_KAFKA_OP__END } rd_kafka_op_type_t; @@ -272,6 +273,7 @@ struct rd_kafka_admin_fanout_worker_cbs; #define RD_KAFKA_OP_TYPE_ASSERT(rko, type) \ rd_assert(((rko)->rko_type & ~RD_KAFKA_OP_FLAGMASK) == (type)) + struct rd_kafka_op_s { TAILQ_ENTRY(rd_kafka_op_s) rko_link; diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 319ebe9aaa..6642017827 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -3328,6 +3328,264 @@ void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb, rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque); } +/** + * @name Leader discovery (KIP-951) + * @{ + */ + +void rd_kafkap_leader_discovery_tmpabuf_add_alloc_brokers( + rd_tmpabuf_t *tbuf, + rd_kafkap_NodeEndpoints_t *NodeEndpoints) { + int i; + size_t md_brokers_size = + NodeEndpoints->NodeEndpointCnt * sizeof(rd_kafka_metadata_broker_t); + size_t mdi_brokers_size = NodeEndpoints->NodeEndpointCnt * + sizeof(rd_kafka_metadata_broker_internal_t); + rd_tmpabuf_add_alloc_times(tbuf, md_brokers_size, 2); + rd_tmpabuf_add_alloc(tbuf, mdi_brokers_size); + for (i = 0; i < NodeEndpoints->NodeEndpointCnt; i++) { + size_t HostSize = + RD_KAFKAP_STR_LEN(&NodeEndpoints->NodeEndpoints[i].Host) + + 1; + rd_tmpabuf_add_alloc(tbuf, HostSize); + } +} + +void rd_kafkap_leader_discovery_tmpabuf_add_alloc_topics(rd_tmpabuf_t *tbuf, + int topic_cnt) { + rd_tmpabuf_add_alloc(tbuf, + sizeof(rd_kafka_metadata_topic_t) * topic_cnt); + rd_tmpabuf_add_alloc(tbuf, sizeof(rd_kafka_metadata_topic_internal_t) * + topic_cnt); +} + +void rd_kafkap_leader_discovery_tmpabuf_add_alloc_topic(rd_tmpabuf_t *tbuf, + char *topic_name, + int32_t partition_cnt) { + if (topic_name) { + rd_tmpabuf_add_alloc(tbuf, strlen(topic_name) + 1); + } + rd_tmpabuf_add_alloc(tbuf, sizeof(rd_kafka_metadata_partition_t) * + partition_cnt); + rd_tmpabuf_add_alloc(tbuf, + sizeof(rd_kafka_metadata_partition_internal_t) * + partition_cnt); +} + +void rd_kafkap_leader_discovery_metadata_init(rd_kafka_metadata_internal_t *mdi, + int32_t broker_id) { + memset(mdi, 0, sizeof(*mdi)); + mdi->metadata.orig_broker_id = broker_id; + mdi->controller_id = -1; + mdi->cluster_authorized_operations = -1; +} + +void rd_kafkap_leader_discovery_set_brokers( + rd_tmpabuf_t *tbuf, + rd_kafka_metadata_internal_t *mdi, + rd_kafkap_NodeEndpoints_t *NodeEndpoints) { + int i; + rd_kafka_metadata_t *md = &mdi->metadata; + + size_t md_brokers_size = + NodeEndpoints->NodeEndpointCnt * sizeof(rd_kafka_metadata_broker_t); + size_t mdi_brokers_size = NodeEndpoints->NodeEndpointCnt * + sizeof(rd_kafka_metadata_broker_internal_t); + + md->broker_cnt = NodeEndpoints->NodeEndpointCnt; + md->brokers = rd_tmpabuf_alloc(tbuf, md_brokers_size); + mdi->brokers_sorted = rd_tmpabuf_alloc(tbuf, md_brokers_size); + mdi->brokers = rd_tmpabuf_alloc(tbuf, mdi_brokers_size); + + for (i = 0; i < NodeEndpoints->NodeEndpointCnt; i++) { + rd_kafkap_NodeEndpoint_t *NodeEndpoint = + &NodeEndpoints->NodeEndpoints[i]; + rd_kafka_metadata_broker_t *mdb = &md->brokers[i]; + rd_kafka_metadata_broker_internal_t 
*mdbi = &mdi->brokers[i]; + mdb->id = NodeEndpoint->NodeId; + mdb->host = NULL; + if (!RD_KAFKAP_STR_IS_NULL(&NodeEndpoint->Host)) { + mdb->host = rd_tmpabuf_alloc( + tbuf, RD_KAFKAP_STR_LEN(&NodeEndpoint->Host) + 1); + rd_snprintf(mdb->host, + RD_KAFKAP_STR_LEN(&NodeEndpoint->Host) + 1, + "%.*s", + RD_KAFKAP_STR_PR(&NodeEndpoint->Host)); + } + mdb->port = NodeEndpoints->NodeEndpoints[i].Port; + + /* Metadata internal fields */ + mdbi->id = mdb->id; + mdbi->rack_id = NULL; + } + + qsort(mdi->brokers, md->broker_cnt, sizeof(mdi->brokers[0]), + rd_kafka_metadata_broker_internal_cmp); + memcpy(mdi->brokers_sorted, md->brokers, + sizeof(*mdi->brokers_sorted) * md->broker_cnt); + qsort(mdi->brokers_sorted, md->broker_cnt, sizeof(*mdi->brokers_sorted), + rd_kafka_metadata_broker_cmp); +} + +void rd_kafkap_leader_discovery_set_topic_cnt(rd_tmpabuf_t *tbuf, + rd_kafka_metadata_internal_t *mdi, + int topic_cnt) { + + rd_kafka_metadata_t *md = &mdi->metadata; + + md->topic_cnt = topic_cnt; + md->topics = rd_tmpabuf_alloc(tbuf, sizeof(*md->topics) * topic_cnt); + mdi->topics = rd_tmpabuf_alloc(tbuf, sizeof(*mdi->topics) * topic_cnt); +} + +void rd_kafkap_leader_discovery_set_topic(rd_tmpabuf_t *tbuf, + rd_kafka_metadata_internal_t *mdi, + int topic_idx, + rd_kafka_Uuid_t topic_id, + char *topic_name, + int partition_cnt) { + + rd_kafka_metadata_t *md = &mdi->metadata; + rd_kafka_metadata_topic_t *mdt = &md->topics[topic_idx]; + rd_kafka_metadata_topic_internal_t *mdti = &mdi->topics[topic_idx]; + + memset(mdt, 0, sizeof(*mdt)); + mdt->topic = + topic_name ? rd_tmpabuf_alloc(tbuf, strlen(topic_name) + 1) : NULL; + mdt->partition_cnt = partition_cnt; + mdt->partitions = + rd_tmpabuf_alloc(tbuf, sizeof(*mdt->partitions) * partition_cnt); + + if (topic_name) + rd_snprintf(mdt->topic, strlen(topic_name) + 1, "%s", + topic_name); + + memset(mdti, 0, sizeof(*mdti)); + mdti->partitions = + rd_tmpabuf_alloc(tbuf, sizeof(*mdti->partitions) * partition_cnt); + mdti->topic_id = topic_id; + mdti->topic_authorized_operations = -1; +} + +void rd_kafkap_leader_discovery_set_CurrentLeader( + rd_tmpabuf_t *tbuf, + rd_kafka_metadata_internal_t *mdi, + int topic_idx, + int partition_idx, + int32_t partition_id, + rd_kafkap_CurrentLeader_t *CurrentLeader) { + + rd_kafka_metadata_t *md = &mdi->metadata; + rd_kafka_metadata_partition_t *mdp = + &md->topics[topic_idx].partitions[partition_idx]; + rd_kafka_metadata_partition_internal_t *mdpi = + &mdi->topics[topic_idx].partitions[partition_idx]; + + memset(mdp, 0, sizeof(*mdp)); + mdp->id = partition_id; + mdp->leader = CurrentLeader->LeaderId, + + memset(mdpi, 0, sizeof(*mdpi)); + mdpi->id = partition_id; + mdpi->leader_epoch = CurrentLeader->LeaderEpoch; +} +/**@}*/ + +static int rd_kafkap_Produce_reply_tags_partition_parse( + rd_kafka_buf_t *rkbuf, + uint64_t tagtype, + uint64_t taglen, + rd_kafkap_Produce_reply_tags_t *ProduceTags, + rd_kafkap_Produce_reply_tags_Partition_t *PartitionTags) { + switch (tagtype) { + case 0: /* CurrentLeader */ + if (rd_kafka_buf_read_CurrentLeader( + rkbuf, &PartitionTags->CurrentLeader) == -1) + goto err_parse; + ProduceTags->leader_change_cnt++; + return 1; + default: + return 0; + } +err_parse: + return -1; +} + +static int +rd_kafkap_Produce_reply_tags_parse(rd_kafka_buf_t *rkbuf, + uint64_t tagtype, + uint64_t taglen, + rd_kafkap_Produce_reply_tags_t *tags) { + switch (tagtype) { + case 0: /* NodeEndpoints */ + if (rd_kafka_buf_read_NodeEndpoints(rkbuf, + &tags->NodeEndpoints) == -1) + goto err_parse; + return 1; + default: + return 
0; + } +err_parse: + return -1; +} + +static void rd_kafka_handle_Produce_metadata_update( + rd_kafka_broker_t *rkb, + rd_kafkap_Produce_reply_tags_t *ProduceTags) { + if (ProduceTags->leader_change_cnt) { + rd_kafka_metadata_t *md = NULL; + rd_kafka_metadata_internal_t *mdi = NULL; + rd_kafkap_Produce_reply_tags_Partition_t *Partition; + rd_tmpabuf_t tbuf; + int32_t nodeid; + rd_kafka_op_t *rko; + + rd_kafka_broker_lock(rkb); + nodeid = rkb->rkb_nodeid; + rd_kafka_broker_unlock(rkb); + + rd_tmpabuf_new(&tbuf, 0, rd_true /*assert on fail*/); + rd_tmpabuf_add_alloc(&tbuf, sizeof(*mdi)); + rd_kafkap_leader_discovery_tmpabuf_add_alloc_brokers( + &tbuf, &ProduceTags->NodeEndpoints); + rd_kafkap_leader_discovery_tmpabuf_add_alloc_topics(&tbuf, 1); + rd_kafkap_leader_discovery_tmpabuf_add_alloc_topic( + &tbuf, ProduceTags->Topic.TopicName, 1); + rd_tmpabuf_finalize(&tbuf); + + mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi)); + md = &mdi->metadata; + + rd_kafkap_leader_discovery_metadata_init(mdi, nodeid); + + rd_kafkap_leader_discovery_set_brokers( + &tbuf, mdi, &ProduceTags->NodeEndpoints); + + rd_kafkap_leader_discovery_set_topic_cnt(&tbuf, mdi, 1); + + rd_kafkap_leader_discovery_set_topic( + &tbuf, mdi, 0, RD_KAFKA_UUID_ZERO, + ProduceTags->Topic.TopicName, 1); + + Partition = &ProduceTags->Topic.Partition; + rd_kafkap_leader_discovery_set_CurrentLeader( + &tbuf, mdi, 0, 0, Partition->Partition, + &Partition->CurrentLeader); + + rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA_UPDATE); + rko->rko_u.metadata.md = md; + rko->rko_u.metadata.mdi = mdi; + rd_kafka_q_enq(rkb->rkb_rk->rk_ops, rko); + } +} + +static void rd_kafkap_Produce_reply_tags_destroy( + rd_kafkap_Produce_reply_tags_t *reply_tags) { + RD_IF_FREE(reply_tags->Topic.TopicName, rd_free); + RD_IF_FREE(reply_tags->NodeEndpoints.NodeEndpoints, rd_free); +} + + /** * @brief Parses a Produce reply. * @returns 0 on success or an error code on failure. @@ -3346,8 +3604,10 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, int16_t ErrorCode; int64_t Offset; } hdr; - const int log_decode_errors = LOG_ERR; - int64_t log_start_offset = -1; + const int log_decode_errors = LOG_ERR; + int64_t log_start_offset = -1; + rd_kafkap_str_t TopicName = RD_ZERO_INIT; + rd_kafkap_Produce_reply_tags_t ProduceTags = RD_ZERO_INIT; rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); if (TopicArrayCnt != 1) @@ -3357,7 +3617,10 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, * request we assume that the reply only contains one topic+partition * and that it is the same that we requested. * If not the broker is buggy. 
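+ * For ApiVersion >= 10 the topic name is read instead of skipped so it
+ * can be attached to the KIP-951 Produce reply tags below.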
*/ - rd_kafka_buf_skip_str(rkbuf); + if (request->rkbuf_reqhdr.ApiVersion >= 10) + rd_kafka_buf_read_str(rkbuf, &TopicName); + else + rd_kafka_buf_skip_str(rkbuf); rd_kafka_buf_read_arraycnt(rkbuf, &PartitionArrayCnt, RD_KAFKAP_PARTITIONS_MAX); @@ -3409,8 +3672,24 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, result->errstr = RD_KAFKAP_STR_DUP(&ErrorMessage); } + if (request->rkbuf_reqhdr.ApiVersion >= 10) { + rd_kafkap_Produce_reply_tags_Topic_t *TopicTags = + &ProduceTags.Topic; + ; + rd_kafkap_Produce_reply_tags_Partition_t *PartitionTags = + &TopicTags->Partition; + ; + + /* Partition tags count */ + TopicTags->TopicName = RD_KAFKAP_STR_DUP(&TopicName); + PartitionTags->Partition = hdr.Partition; + } + /* Partition tags */ - rd_kafka_buf_skip_tags(rkbuf); + rd_kafka_buf_read_tags(rkbuf, + rd_kafkap_Produce_reply_tags_partition_parse, + &ProduceTags, &ProduceTags.Topic.Partition); + /* Topic tags */ rd_kafka_buf_skip_tags(rkbuf); @@ -3422,12 +3701,19 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, Throttle_Time); } + /* ProduceResponse tags */ + rd_kafka_buf_read_tags(rkbuf, rd_kafkap_Produce_reply_tags_parse, + &ProduceTags); - return hdr.ErrorCode; + rd_kafka_handle_Produce_metadata_update(rkb, &ProduceTags); + rd_kafkap_Produce_reply_tags_destroy(&ProduceTags); + return hdr.ErrorCode; err_parse: + rd_kafkap_Produce_reply_tags_destroy(&ProduceTags); return rkbuf->rkbuf_err; err: + rd_kafkap_Produce_reply_tags_destroy(&ProduceTags); return RD_KAFKA_RESP_ERR__BAD_MSG; } diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index 5747900075..4da4979816 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -109,6 +109,57 @@ typedef struct rd_kafkap_NodeEndpoints_s { /**@}*/ +/** + * @name Produce tags + * @{ + * + */ + +typedef struct rd_kafkap_Produce_reply_tags_Partition_s { + int32_t Partition; + rd_kafkap_CurrentLeader_t CurrentLeader; +} rd_kafkap_Produce_reply_tags_Partition_t; + +typedef struct rd_kafkap_Produce_reply_tags_Topic_s { + char *TopicName; + rd_kafkap_Produce_reply_tags_Partition_t Partition; +} rd_kafkap_Produce_reply_tags_Topic_t; + +typedef struct rd_kafkap_Produce_reply_tags_s { + int32_t leader_change_cnt; + rd_kafkap_NodeEndpoints_t NodeEndpoints; + rd_kafkap_Produce_reply_tags_Topic_t Topic; +} rd_kafkap_Produce_reply_tags_t; + +/**@}*/ + +/** + * @name Fetch tags + * @{ + * + */ + +typedef struct rd_kafkap_Fetch_reply_tags_Partition_s { + int32_t Partition; + rd_kafkap_CurrentLeader_t CurrentLeader; +} rd_kafkap_Fetch_reply_tags_Partition_t; + +typedef struct rd_kafkap_Fetch_reply_tags_Topic_s { + rd_kafka_Uuid_t TopicId; + int32_t PartitionCnt; + rd_kafkap_Fetch_reply_tags_Partition_t *Partitions; + int32_t partitions_with_leader_change_cnt; +} rd_kafkap_Fetch_reply_tags_Topic_t; + +typedef struct rd_kafkap_Fetch_reply_tags_s { + rd_kafkap_NodeEndpoints_t NodeEndpoints; + int32_t TopicCnt; + rd_kafkap_Fetch_reply_tags_Topic_t *Topics; + int32_t topics_with_leader_change_cnt; +} rd_kafkap_Fetch_reply_tags_t; + +/**@}*/ + rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions( rd_kafka_buf_t *rkbuf, rd_bool_t use_topic_id, @@ -568,5 +619,42 @@ rd_kafka_DeleteAclsRequest(rd_kafka_broker_t *rkb, rd_kafka_resp_cb_t *resp_cb, void *opaque); +void rd_kafkap_leader_discovery_tmpabuf_add_alloc_brokers( + rd_tmpabuf_t *tbuf, + rd_kafkap_NodeEndpoints_t *NodeEndpoints); + +void rd_kafkap_leader_discovery_tmpabuf_add_alloc_topics(rd_tmpabuf_t *tbuf, + int topic_cnt); + +void 
rd_kafkap_leader_discovery_tmpabuf_add_alloc_topic(rd_tmpabuf_t *tbuf, + char *topic_name, + int32_t partition_cnt); + +void rd_kafkap_leader_discovery_metadata_init(rd_kafka_metadata_internal_t *mdi, + int32_t broker_id); + +void rd_kafkap_leader_discovery_set_brokers( + rd_tmpabuf_t *tbuf, + rd_kafka_metadata_internal_t *mdi, + rd_kafkap_NodeEndpoints_t *NodeEndpoints); + +void rd_kafkap_leader_discovery_set_topic_cnt(rd_tmpabuf_t *tbuf, + rd_kafka_metadata_internal_t *mdi, + int topic_cnt); + +void rd_kafkap_leader_discovery_set_topic(rd_tmpabuf_t *tbuf, + rd_kafka_metadata_internal_t *mdi, + int topic_idx, + rd_kafka_Uuid_t topic_id, + char *topic_name, + int partition_cnt); + +void rd_kafkap_leader_discovery_set_CurrentLeader( + rd_tmpabuf_t *tbuf, + rd_kafka_metadata_internal_t *mdi, + int topic_idx, + int partition_idx, + int32_t partition_id, + rd_kafkap_CurrentLeader_t *CurrentLeader); #endif /* _RDKAFKA_REQUEST_H_ */ diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index 38e6c7ee8c..db1ca2d390 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -1337,7 +1337,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR) { upd += rd_kafka_topic_partition_cnt_update(rkt, mdt->partition_cnt); - if (rd_kafka_Uuid_cmp(mdit->topic_id, RD_KAFKA_UUID_ZERO)) { + if (rd_kafka_Uuid_cmp(mdit->topic_id, RD_KAFKA_UUID_ZERO) && + rd_kafka_Uuid_cmp(mdit->topic_id, rkt->rkt_topic_id)) { /* FIXME: an offset reset must be triggered. * when rkt_topic_id wasn't zero. * There are no problems @@ -1346,12 +1347,12 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, * causing an out of range and an offset reset, * but the rarer case where they're higher needs * to be checked. */ - rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, - "METADATA", - "Topic %s changed id from %s to %s", - rkt->rkt_topic->str, - rd_kafka_Uuid_str(&rkt->rkt_topic_id), - rd_kafka_Uuid_str(&mdit->topic_id)); + rd_kafka_dbg( + rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA", + "Topic %s changed id from %s to %s", + rkt->rkt_topic->str, + rd_kafka_Uuid_base64str(&rkt->rkt_topic_id), + rd_kafka_Uuid_base64str(&mdit->topic_id)); rkt->rkt_topic_id = mdit->topic_id; } /* If the metadata times out for a topic (because all brokers