Commit 89ef329

Add updated tests

anchitj committed Jun 14, 2024
1 parent 194ee92 commit 89ef329

Showing 10 changed files with 340 additions and 75 deletions.
78 changes: 50 additions & 28 deletions src/rdkafka.c
@@ -3789,7 +3789,8 @@ static void rd_kafka_get_offsets_for_times_resp_cb(rd_kafka_t *rk,
state->wait_reply--;
}

-void rd_kafka_handle_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t* mdi) {
+void rd_kafka_handle_metadata_update_op(rd_kafka_t *rk,
+rd_kafka_metadata_internal_t *mdi) {
int i, j;
struct rd_kafka_metadata_cache_entry *rkmce;
rd_kafka_broker_t *rkb;
@@ -3800,60 +3801,83 @@ void rd_kafka_handle_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_intern

for (i = 0; i < mdi->metadata.broker_cnt; i++) {
/* TODO: How to fetch Proto? */
-rd_kafka_broker_update(rk, RD_KAFKA_PROTO_PLAINTEXT, &mdi->metadata.brokers[i], NULL);
+rd_kafka_broker_update(rk, RD_KAFKA_PROTO_PLAINTEXT,
+&mdi->metadata.brokers[i], NULL);
}

for (i = 0; i < mdi->metadata.topic_cnt; i++) {
if (!RD_KAFKA_UUID_IS_ZERO(mdi->topics[i].topic_id)) {
-rkmce = rd_kafka_metadata_cache_find_by_id(rk, mdi->topics[i].topic_id, 1);
+rkmce = rd_kafka_metadata_cache_find_by_id(
+rk, mdi->topics[i].topic_id, 1);
} else {
-rkmce = rd_kafka_metadata_cache_find(rk, mdi->metadata.topics[i].topic, 1);
+rkmce = rd_kafka_metadata_cache_find(
+rk, mdi->metadata.topics[i].topic, 1);
}

if (!rkmce) {
-rd_kafka_log(rk, LOG_WARNING, "METADATA",
-"Metadata update: Topic %s not found in cache",
-mdi->metadata.topics[i].topic);
+rd_kafka_log(
+rk, LOG_WARNING, "METADATA",
+"Metadata update: Topic %s not found in cache",
+mdi->metadata.topics[i].topic);
continue;
}

for (j = 0; j < mdi->metadata.topics[i].partition_cnt; j++) {
mdp = &mdi->metadata.topics[i].partitions[j];
mdp_int = &mdi->topics[i].partitions[j];

if (mdp->id >= rkmce->rkmce_mtopic.partition_cnt) {
-rd_kafka_log(rk, LOG_WARNING, "METADATA",
-"Metadata update: Topic %s partition %d "
-"not found in cache",
-rkmce->rkmce_mtopic.topic, mdp->id);
+rd_kafka_log(
+rk, LOG_WARNING, "METADATA",
+"Metadata update: Topic %s partition %d "
+"not found in cache",
+rkmce->rkmce_mtopic.topic, mdp->id);
continue;
}

rkb = rd_kafka_broker_find_by_nodeid(rk, mdp->leader);
if (!rkb) {
-rd_kafka_log(rk, LOG_WARNING, "METADATA",
-"Metadata update: Topic %s partition %d "
-"leader %"PRId32" not found in cache",
-rkmce->rkmce_mtopic.topic, mdp->id, mdp->leader);
+rd_kafka_log(
+rk, LOG_WARNING, "METADATA",
+"Metadata update: Topic %s partition %d "
+"leader %" PRId32 " not found in cache",
+rkmce->rkmce_mtopic.topic, mdp->id,
+mdp->leader);
continue;
}

-if (rkmce->rkmce_metadata_internal_topic.partitions[mdp->id].leader_epoch >= mdp_int->leader_epoch) {
-rd_kafka_dbg(rk, METADATA, "METADATA",
-"Cache entry already has newer leader epoch for topic %s partition %d: ", rkmce->rkmce_mtopic.topic, mdp->id);
+if (rkmce->rkmce_metadata_internal_topic
+.partitions[mdp->id]
+.leader_epoch >= mdp_int->leader_epoch) {
+rd_kafka_dbg(
+rk, METADATA, "METADATA",
+"Cache entry already has newer leader "
+"epoch for topic %s partition %d: ",
+rkmce->rkmce_mtopic.topic, mdp->id);
continue;
}
cache_updated = rd_true;

-rkmce->rkmce_metadata_internal_topic.partitions[mdp->id].leader_epoch = mdp_int->leader_epoch;
-rkmce->rkmce_mtopic.partitions[mdp->id].leader = mdp->leader;
+rkmce->rkmce_metadata_internal_topic.partitions[mdp->id]
+.leader_epoch = mdp_int->leader_epoch;
+rkmce->rkmce_mtopic.partitions[mdp->id].leader =
+mdp->leader;

rd_kafka_dbg(rk, METADATA, "METADATA",
"Metadata updating: Topic %s partition %d with leader %"PRId32" and epoch %"PRId32,
rkmce->rkmce_mtopic.topic, mdp->id, mdp->leader, mdp_int->leader_epoch);

rd_kafka_metadata_cache_topic_update(rk, &rkmce->rkmce_mtopic, &rkmce->rkmce_metadata_internal_topic, rd_true, rd_true, mdi->brokers, mdi->metadata.broker_cnt, rd_true);
rd_kafka_topic_metadata_update2(rkb, &rkmce->rkmce_mtopic, &rkmce->rkmce_metadata_internal_topic);
"Metadata updating: Topic %s partition %d "
"with leader %" PRId32
" and epoch %" PRId32,
rkmce->rkmce_mtopic.topic, mdp->id,
mdp->leader, mdp_int->leader_epoch);

rd_kafka_metadata_cache_topic_update(
rk, &rkmce->rkmce_mtopic,
&rkmce->rkmce_metadata_internal_topic, rd_true,
rd_true, mdi->brokers, mdi->metadata.broker_cnt,
rd_true);
rd_kafka_topic_metadata_update2(
rkb, &rkmce->rkmce_mtopic,
&rkmce->rkmce_metadata_internal_topic);
}
}

@@ -4156,8 +4180,6 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
break;

case RD_KAFKA_OP_METADATA_951:
-/* TODO: Callback to merge metadata rko_u.metadata.mdi and
- * update cache. Phase 5 */
rd_kafka_handle_metadata_update_op(rk, rko->rko_u.metadata.mdi);
break;

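Note: the RD_KAFKA_OP_METADATA_951 case above hands rko->rko_u.metadata.mdi to rd_kafka_handle_metadata_update_op(). Below is a minimal sketch of how a broker thread could enqueue such an op for the main thread; the enqueue path is not part of this diff, so the helper name and the use of rd_kafka_op_new()/rd_kafka_q_enq() on rk->rk_ops are assumptions based on librdkafka's usual op pattern.

#include "rdkafka_int.h" /* rd_kafka_t, rd_kafka_op_t, rd_kafka_q_enq() */

/* Hypothetical helper (assumption, not part of this commit): queue a parsed
 * KIP-951 metadata update so rd_kafka_poll_cb() on the main thread routes it
 * to rd_kafka_handle_metadata_update_op(). */
static void enqueue_metadata_update_951(rd_kafka_t *rk,
                                        rd_kafka_metadata_internal_t *mdi) {
        rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA_951);
        rko->rko_u.metadata.mdi = mdi; /* assumed: the op takes ownership of mdi */
        rd_kafka_q_enq(rk->rk_ops, rko);
}
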
20 changes: 1 addition & 19 deletions src/rdkafka_fetcher.c
@@ -342,24 +342,6 @@ static void rd_kafka_fetch_reply_handle_partition_error(
rd_kafka_toppar_fetch_backoff(rkb, rktp, err);
}

-typedef struct rd_kafkap_fetch_reply_PartitionTags_s {
-int32_t PartitionId;
-rd_kafkap_CurrentLeader_t CurrentLeader;
-} rd_kafkap_fetch_reply_PartitionTags_t;

-typedef struct rd_kafkap_fetch_reply_TopicTags_s {
-int32_t PartitionCnt;
-char *TopicName;
-rd_kafka_Uuid_t TopicId;
-rd_kafkap_fetch_reply_PartitionTags_t *PartitionTags;
-} rd_kafkap_fetch_reply_TopicTags_t;

-typedef struct rd_kafkap_fetch_reply_tags_s {
-rd_kafkap_NodeEndpoints_t NodeEndpoints;
-int32_t TopicCnt;
-rd_kafkap_fetch_reply_TopicTags_t *TopicTags;
-} rd_kafkap_fetch_reply_tags_t;

static int rd_kafka_fetch_reply_handle_partition_read_tag(rd_kafka_buf_t *rkbuf,
uint64_t tagtype,
uint64_t taglen,
@@ -373,7 +355,6 @@ static int rd_kafka_fetch_reply_handle_partition_read_tag(rd_kafka_buf_t *rkbuf,
goto err_parse;
return 1;
default:
-// TODO: Create enum for tags
rd_kafka_buf_skip(rkbuf, taglen);
return 0;
}
@@ -733,6 +714,7 @@ static rd_kafka_resp_err_t rd_kafka_fetch_reply_handle_partition(
(hdr.ErrorCode ==
RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION ||
hdr.ErrorCode == RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH)) {
+// TODO: Remove
fprintf(stderr,
"asdasd in fetch partition reading tags\n");
*leader_tags_present = rd_true;
5 changes: 0 additions & 5 deletions src/rdkafka_metadata.h
@@ -48,11 +48,6 @@ typedef struct rd_kafka_metadata_partition_internal_s {

/**
* @brief Metadata topic internal container
-* // FETCH -> []
-* // PRODUCE -> push the op with metadata
-*
-* In main thread, find the topic id from cache, then merge metadata
-* Update the cache
*/
typedef struct rd_kafka_metadata_topic_internal_s {
/** Internal metadata partition structs.
12 changes: 5 additions & 7 deletions src/rdkafka_mock_handlers.c
@@ -176,19 +176,17 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,

/* Partition tags */
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 10 &&
-err ==
-RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION) {
+err == RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION) {
/* Tag type */
rd_kafka_buf_write_uvarint(resp, 0);
/* Tag len = 4 (leader_id) + 4
* (leader_epoch) + 1 (tags) */
rd_kafka_buf_write_uvarint(resp, 9);
/* Leader id */
-rd_kafka_buf_write_i32(
-resp, mpart->leader->id);
+rd_kafka_buf_write_i32(resp, mpart->leader->id);
/* Leader epoch */
-rd_kafka_buf_write_i32(
-resp, mpart->leader_epoch);
+rd_kafka_buf_write_i32(resp,
+mpart->leader_epoch);
/* Remaining tags */
rd_kafka_buf_write_tags_empty(resp);
}
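
For reference, the mock Produce handler above emits the KIP-951 leader change as a single tagged field: uvarint tag id 0, uvarint tag length 9 (4-byte leader id + 4-byte leader epoch + 1 byte for the empty nested tag count), then the payload. The sketch below restates that layout with the same buffer helpers used in the diff; the wrapper function itself is hypothetical.

/* Hypothetical wrapper (assumption): write a KIP-951 CurrentLeader tagged
 * field in the layout used by the mock Produce handler above. */
static void write_current_leader_tag(rd_kafka_buf_t *resp,
                                     int32_t leader_id,
                                     int32_t leader_epoch) {
        rd_kafka_buf_write_uvarint(resp, 0);        /* tag type 0 */
        rd_kafka_buf_write_uvarint(resp, 9);        /* tag length: 4 + 4 + 1 */
        rd_kafka_buf_write_i32(resp, leader_id);    /* leader id */
        rd_kafka_buf_write_i32(resp, leader_epoch); /* leader epoch */
        rd_kafka_buf_write_tags_empty(resp);        /* no nested tags (1 byte) */
}
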
18 changes: 3 additions & 15 deletions src/rdkafka_request.c
@@ -510,6 +510,7 @@ int rd_kafka_buf_read_CurrentLeader(rd_kafka_buf_t *rkbuf,
const int log_decode_errors = LOG_ERR;
rd_kafka_buf_read_i32(rkbuf, &CurrentLeader->LeaderId);
rd_kafka_buf_read_i32(rkbuf, &CurrentLeader->LeaderEpoch);
+// TODO: Remove
fprintf(stderr, "asdasd read tags LeaderId: %d LeaderEpoch: %d\n",
CurrentLeader->LeaderId, CurrentLeader->LeaderEpoch);
rd_kafka_buf_skip_tags(rkbuf);
@@ -533,6 +534,7 @@ int rd_kafka_buf_read_NodeEndpoints(rd_kafka_buf_t *rkbuf,
rd_kafka_buf_read_arraycnt(rkbuf, &NodeEndpoints->NodeEndpointCnt,
RD_KAFKAP_BROKERS_MAX);
rd_dassert(!NodeEndpoints->NodeEndpoints);
+// TODO: Remove
fprintf(stderr, "asdasd read tags NodeEndpointCnt: %d\n",
NodeEndpoints->NodeEndpointCnt);
NodeEndpoints->NodeEndpoints =
@@ -548,6 +550,7 @@
&NodeEndpoints->NodeEndpoints[i].Port);
rd_kafka_buf_read_str(rkbuf,
&NodeEndpoints->NodeEndpoints[i].Rack);
+// TODO: Remove
fprintf(stderr,
"asdasd read tags NodeId: %d Host: %s Port: %d "
"RackLen: %d\n",
@@ -3339,21 +3342,6 @@ void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb,
rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
}


-// static rd_kafkap_produce_reply_tags_t *
-// rd_kafka_produce_reply_tags_new(int32_t TopicArrayCnt) {
-// return rd_calloc(1, sizeof(rd_kafkap_produce_reply_tags_t));
-//}

-// void rd_kafka_produce_reply_tags_set_TopicCnt(
-// rd_kafkap_produce_reply_tags_t *reply_tags,
-// int32_t TopicCnt) {
-// reply_tags->TopicCnt = TopicCnt;
-// reply_tags->Topics = rd_calloc(TopicCnt,
-// sizeof(*reply_tags->Topics));
-//}


static int
rd_kafka_produce_reply_handle_partition_read_tag(rd_kafka_buf_t *rkbuf,
uint64_t tagtype,
18 changes: 18 additions & 0 deletions src/rdkafka_request.h
@@ -124,6 +124,24 @@ typedef struct rd_kafkap_produce_reply_tags_s {
rd_kafkap_produce_reply_tags_Topic_t *TopicTags;
} rd_kafkap_produce_reply_tags_t;

+typedef struct rd_kafkap_fetch_reply_PartitionTags_s {
+int32_t PartitionId;
+rd_kafkap_CurrentLeader_t CurrentLeader;
+} rd_kafkap_fetch_reply_PartitionTags_t;

+typedef struct rd_kafkap_fetch_reply_TopicTags_s {
+int32_t PartitionCnt;
+char *TopicName;
+rd_kafka_Uuid_t TopicId;
+rd_kafkap_fetch_reply_PartitionTags_t *PartitionTags;
+} rd_kafkap_fetch_reply_TopicTags_t;

+typedef struct rd_kafkap_fetch_reply_tags_s {
+rd_kafkap_NodeEndpoints_t NodeEndpoints;
+int32_t TopicCnt;
+rd_kafkap_fetch_reply_TopicTags_t *TopicTags;
+} rd_kafkap_fetch_reply_tags_t;

/**@}*/

rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions(
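
The structs added above nest as reply → topic → partition tags, mirroring the FetchResponse tagged fields (NodeEndpoints plus per-partition CurrentLeader). Below is a sketch of how a parser might size the arrays before filling them; the init helpers are hypothetical and assume rd_calloc() as used elsewhere in librdkafka.

#include <string.h>
#include "rd.h" /* rd_calloc() */

/* Hypothetical init helpers (assumption): allocate the per-topic and
 * per-partition tag arrays before parsing FetchResponse tagged fields. */
static void fetch_reply_tags_init(rd_kafkap_fetch_reply_tags_t *tags,
                                  int32_t TopicCnt) {
        memset(tags, 0, sizeof(*tags));
        tags->TopicCnt  = TopicCnt;
        tags->TopicTags = rd_calloc(TopicCnt, sizeof(*tags->TopicTags));
}

static void
fetch_reply_TopicTags_init(rd_kafkap_fetch_reply_TopicTags_t *topic,
                           int32_t PartitionCnt) {
        topic->PartitionCnt  = PartitionCnt;
        topic->PartitionTags =
            rd_calloc(PartitionCnt, sizeof(*topic->PartitionTags));
}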