-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KIP 951 - All phases - cherry picked #4756
Conversation
89ef329
to
98315bc
Compare
Please add this fix too to the PR. if (rd_kafka_Uuid_cmp(mdit->topic_id, RD_KAFKA_UUID_ZERO) &&
rd_kafka_Uuid_cmp(mdit->topic_id, rkt->rkt_topic_id)) {
/* FIXME: an offset reset must be triggered.
* when rkt_topic_id wasn't zero.
* There are no problems
* in test 0107_topic_recreate if offsets in new
* topic are lower than in previous one,
* causing an out of range and an offset reset,
* but the rarer case where they're higher needs
* to be checked. */
rd_kafka_dbg(
rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA",
"Topic %s changed id from %s to %s",
rkt->rkt_topic->str,
rd_kafka_Uuid_base64str(&rkt->rkt_topic_id),
rd_kafka_Uuid_base64str(&mdit->topic_id));
rkt->rkt_topic_id = mdit->topic_id;
} |
Please rebase with dependency upgrade commit merged |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First comments about the code structure and naming
1902d0c
to
680af32
Compare
680af32
to
0dfafc7
Compare
src/rdkafka.c
Outdated
@@ -4078,6 +4078,10 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk, | |||
rd_kafka_purge(rk, rko->rko_u.purge.flags); | |||
break; | |||
|
|||
case RD_KAFKA_OP_METADATA_UPDATE: | |||
rd_kafka_metadata_update_op(rk, rko->rko_u.metadata.mdi); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RD_KAFKA_OP_RES_HANDLED must be returned otherwise it's not destroyed. The decision on the fields to use should be done by the handling code
rd_kafka_metadata_update_op(rk, rko->rko_u.metadata.mdi); | |
res = rd_kafka_metadata_update_op(rk, rko); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm returning RD_KAFKA_OP_RES_HANDLED
from rd_kafka_metadata_update_op
not sure if that's correct
src/rdkafka_request.h
Outdated
* | ||
*/ | ||
|
||
typedef struct rd_kafkap_Fetch_reply_Partition_s { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typedef struct rd_kafkap_Fetch_reply_Partition_s { | |
typedef struct rd_kafkap_Fetch_reply_Partition_s { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to rd_kafkap_Fetch_reply_tags_Partition_s
rkmce->rkmce_metadata_internal_topic.partitions[mdp->id] | ||
.leader_epoch = mdp_int->leader_epoch; | ||
rkmce->rkmce_mtopic.partitions[mdp->id].leader = | ||
mdp->leader; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mdp->leader; | |
mdp->leader; | |
rd_kafka_wrunlock(rk); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a
rd_kafka_broker_destroy(rkb);
here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated and added rd_kafka_broker_destroy(rkb);
after unlock
&rkmce->rkmce_metadata_internal_topic, rd_true, | ||
rd_true, mdi->brokers, mdi->metadata.broker_cnt, | ||
rd_true); | ||
rd_kafka_topic_metadata_update2( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be done once per topic, so you can put it outside the for loop you can have a variable
partition_cache_changes
that is incremented for each partition and use it in an if outside the partition loop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There the rkb is necessary just for logging purposes, you can use rd_kafka_broker_t *rkb = rk->rk_internal_rkb;
in the if after the partition loop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a counter partition_cache_changes
which is incremented for each partition cache change, and if the counter> 0 then the rd_kafka_topic_metadata_update2
is called.
rkmce->rkmce_metadata_internal_topic.partitions[mdp->id] | ||
.leader_epoch = mdp_int->leader_epoch; | ||
rkmce->rkmce_mtopic.partitions[mdp->id].leader = | ||
mdp->leader; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a
rd_kafka_broker_destroy(rkb);
here
src/rdkafka_request.c
Outdated
rd_kafka_buf_skip_tags(rkbuf); | ||
/* Topic tags */ | ||
rd_kafka_buf_skip_tags(rkbuf); | ||
if (request->rkbuf_reqhdr.ApiVersion >= 10 && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here it should just populate the topic tags, then the message isn't sent depending on the error code but whenever we have these tags
if (request->rkbuf_reqhdr.ApiVersion >= 10 && | |
if (request->rkbuf_reqhdr.ApiVersion >= 10) { | |
rd_kafkap_Produce_reply_tags_Topic_t *TopicTags = | |
&ProduceTags.Topic; | |
rd_kafkap_Produce_reply_tags_Partition_t *PartitionTags = | |
&TopicTags->Partition; | |
/* Partition tags count */ | |
TopicTags->TopicName = RD_KAFKAP_STR_DUP(&TopicName); | |
PartitionTags->Partition = hdr.Partition; | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All that comes after must be simpler and without these conditions, this way:
/* Partition tags */
rd_kafka_buf_read_tags(rkbuf,
rd_kafkap_Produce_reply_tags_partition_parse,
&ProduceTags, &ProduceTags.Topic.Partition);
/* Topic tags */
rd_kafka_buf_skip_tags(rkbuf);
if (request->rkbuf_reqhdr.ApiVersion >= 1) {
int32_t Throttle_Time;
rd_kafka_buf_read_i32(rkbuf, &Throttle_Time);
rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep,
Throttle_Time);
}
/* ProduceResponse tags */
rd_kafka_buf_read_tags(rkbuf, rd_kafkap_Produce_reply_tags_parse,
&ProduceTags);
rd_kafka_handle_Produce_metadata_update(rkb, &ProduceTags);
rd_kafkap_Produce_reply_tags_destroy(&ProduceTags);
return hdr.ErrorCode;
err_parse:
rd_kafkap_Produce_reply_tags_destroy(&ProduceTags);
return rkbuf->rkbuf_err;
err:
rd_kafkap_Produce_reply_tags_destroy(&ProduceTags);
return RD_KAFKA_RESP_ERR__BAD_MSG;
It's the rd_kafka_handle_Produce_metadata_update
that sends an update op if the ProduceTags->leader_change_cnt
is non zero.
rd_kafkap_Produce_reply_tags_destroy
instead frees the TopicName
and NodeEndpoints.NodeEndpoints
if not NULL
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated. Would do the similar for fetch
40faf87
to
d907d5d
Compare
7f0b55e
to
f201885
Compare
c138f96
to
1018042
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comments about creation of the metadata update operation
89fddc4
to
2a5d9eb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes about the fetch tags
dfc6b36
to
ab12eab
Compare
ab12eab
to
0c72d60
Compare
ef5c5ba
to
3f5aa0c
Compare
525b9d5
to
40e2510
Compare
ae5378d
to
f17e85a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks for the changes
No description provided.