Skip to content

Commit

Permalink
PR feedback 10th round
Browse files Browse the repository at this point in the history
  • Loading branch information
anchitj committed Jun 17, 2024
1 parent 40e2510 commit f17e85a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 15 deletions.
8 changes: 1 addition & 7 deletions src/rdkafka_fetcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -709,16 +709,10 @@ static rd_kafka_resp_err_t rd_kafka_fetch_reply_handle_partition(

if (unlikely(hdr.ErrorCode != RD_KAFKA_RESP_ERR_NO_ERROR)) {
/* Handle partition-level errors. */
rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize);

if (rd_kafka_buf_ApiVersion(request) >= 16)
rd_kafka_buf_read_tags(
rkbuf, rd_kafkap_Fetch_reply_tags_partition_parse,
TopicTags, PartitionTags);

rd_kafka_fetch_reply_handle_partition_error(
rkb, rktp, tver, hdr.ErrorCode, hdr.HighwaterMarkOffset);

rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize);
goto done;
}

Expand Down
23 changes: 15 additions & 8 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -3604,12 +3604,10 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb,
int16_t ErrorCode;
int64_t Offset;
} hdr;
const int log_decode_errors = LOG_ERR;
int64_t log_start_offset = -1;
rd_kafkap_str_t TopicName = RD_ZERO_INIT;
rd_kafkap_Produce_reply_tags_Partition_t *PartitionTags = NULL;
rd_kafkap_Produce_reply_tags_Topic_t *TopicTags = NULL;
rd_kafkap_Produce_reply_tags_t ProduceTags = RD_ZERO_INIT;
const int log_decode_errors = LOG_ERR;
int64_t log_start_offset = -1;
rd_kafkap_str_t TopicName = RD_ZERO_INIT;
rd_kafkap_Produce_reply_tags_t ProduceTags = RD_ZERO_INIT;

rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX);
if (TopicArrayCnt != 1)
Expand Down Expand Up @@ -3675,13 +3673,22 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb,
}

if (request->rkbuf_reqhdr.ApiVersion >= 10) {
TopicTags = &ProduceTags.Topic;
PartitionTags = &TopicTags->Partition;
rd_kafkap_Produce_reply_tags_Topic_t *TopicTags =
&ProduceTags.Topic;
;
rd_kafkap_Produce_reply_tags_Partition_t *PartitionTags =
&TopicTags->Partition;
;

/* Partition tags count */
TopicTags->TopicName = RD_KAFKAP_STR_DUP(&TopicName);
PartitionTags->Partition = hdr.Partition;
}

/* Partition tags */
rd_kafka_buf_read_tags(rkbuf,
rd_kafkap_Produce_reply_tags_partition_parse,
&ProduceTags, &ProduceTags.Topic.Partition);

/* Topic tags */
rd_kafka_buf_skip_tags(rkbuf);
Expand Down

0 comments on commit f17e85a

Please sign in to comment.