Add rack-awareness (KIP-881) to sticky-assignor
milindl committed May 9, 2023
1 parent c349b5d commit 5ef3470
Showing 7 changed files with 2,419 additions and 531 deletions.
163 changes: 163 additions & 0 deletions src/rdkafka_assignor.c
@@ -676,6 +676,167 @@ void rd_kafka_assignors_term(rd_kafka_t *rk) {
rd_list_destroy(&rk->rk_conf.partition_assignors);
}

/**
* @brief Computes whether rack-aware assignment needs to be used.
*/
rd_bool_t rd_kafka_use_rack_aware_assignment(
rd_kafka_assignor_topic_t **topics,
size_t topic_cnt,
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t broker_rack_pair_cnt) {
/* Computing needs_rack_aware_assignment requires the evaluation of
three criteria:
1. At least one of the members has a non-null rack.
2. At least one common rack exists between members and partitions.
3. There is at least one partition which doesn't have replicas on all
possible racks; in other words, not every partition has replicas on
all racks. Note that 'all racks' here means racks across all replicas
of all partitions, not including consumer racks. Also note that 'all
racks' are computed per-topic for the range assignor, and across
topics for the sticky assignor.
*/
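/* Illustrative (hypothetical) example: with racks {A, B}, a consumer on
 * rack A, and a topic whose partition 0 has replicas only on rack A while
 * partition 1 has replicas on both A and B, all three criteria hold and
 * rack-aware assignment is used. If instead every partition had replicas
 * on both A and B, Criteria 3 would fail and the rack-agnostic path would
 * be taken. */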

int i;
size_t t;
int curr_partition_idx = 0; /* Running index into partition_racks across
all topics (used for Criteria 3). */
rd_kafka_group_member_t *member;
rd_list_t *all_consumer_racks; /* Contained Type: rd_kafkap_str_t* */
rd_list_t *all_partition_racks; /* Contained Type: rd_kafkap_str_t* */
rd_kafkap_str_t *rack_id = NULL;
rd_list_t *partition_racks; /* Contained Type: rd_list_t * containing
rd_kafkap_str_t* */
rd_bool_t needs_rack_aware_assignment = rd_true; /* assume true */

/* Criteria 1 */
/* We don't copy racks, so the free function is NULL. */
all_consumer_racks = rd_list_new(0, NULL);

for (t = 0; t < topic_cnt; t++) {
RD_LIST_FOREACH(member, &topics[t]->members, i) {
if (member->rkgm_rack_id &&
RD_KAFKAP_STR_LEN(member->rkgm_rack_id)) {
/* Repetitions are fine, we will dedup them later.
*/
rd_list_add(all_consumer_racks,
member->rkgm_rack_id);
}
}
}
if (rd_list_cnt(all_consumer_racks) == 0)
needs_rack_aware_assignment = rd_false;


/* Criteria 2 */
/* We don't copy racks, so the free function is NULL. */
all_partition_racks = rd_list_new(0, NULL);

/* Not needed for Criteria 2 itself, but populated in the loop below for
* use by Criteria 3. */
partition_racks = rd_list_new(0, rd_list_destroy_free);

for (t = 0; t < topic_cnt; t++) {
const int partition_cnt = topics[t]->metadata->partition_cnt;
for (i = 0; i < partition_cnt; i++) {
int j;
rd_list_t *curr_partition_racks = rd_list_new(0, NULL);
for (j = 0;
j < topics[t]->metadata->partitions[i].replica_cnt;
j++) {
int replica_id = topics[t]
->metadata->partitions[i]
.replicas[j];
rd_kafka_broker_id_rack_pair_t key = {
replica_id, NULL};
rd_kafka_broker_id_rack_pair_t *pair = bsearch(
&key, broker_rack_pair,
broker_rack_pair_cnt,
sizeof(rd_kafka_broker_id_rack_pair_t),
rd_kafka_broker_id_rack_pair_cmp);

if (pair && pair->rack &&
RD_KAFKAP_STR_LEN(pair->rack)) {
rd_list_add(all_partition_racks,
pair->rack);
rd_list_add(curr_partition_racks,
pair->rack);
}
}
rd_list_deduplicate(&curr_partition_racks,
rd_kafkap_str_cmp2);
rd_list_add(partition_racks, curr_partition_racks);
}
}

/* Sort and dedup the racks. */
rd_list_deduplicate(&all_consumer_racks, rd_kafkap_str_cmp2);
rd_list_deduplicate(&all_partition_racks, rd_kafkap_str_cmp2);

/* Check whether any consumer rack is also present among the partition
* racks. */
RD_LIST_FOREACH(rack_id, all_consumer_racks, i) {
/* Break if there's even a single match. */
if (rd_list_find(all_partition_racks, rack_id,
rd_kafkap_str_cmp2)) {
break;
}
}
if (i == rd_list_cnt(all_consumer_racks))
needs_rack_aware_assignment = rd_false;

/* Criteria 3 */
for (t = 0; t < topic_cnt && needs_rack_aware_assignment; t++) {
const int partition_cnt = topics[t]->metadata->partition_cnt;
for (i = 0; i < partition_cnt && needs_rack_aware_assignment;
i++, curr_partition_idx++) {
/* Since the rack list of each partition is a subset of
* all_partition_racks, and both lists are deduped,
* equal sizes imply that they're equal. */
if (rd_list_cnt(all_partition_racks) !=
rd_list_cnt(rd_list_elem(partition_racks,
curr_partition_idx))) {
break;
}
}
if (i < partition_cnt) {
/* Break outer loop if inner loop was broken. */
break;
}
}

/* Implies that all partitions have replicas on all racks. */
if (t == topic_cnt)
needs_rack_aware_assignment = rd_false;


rd_list_destroy(all_consumer_racks);
rd_list_destroy(all_partition_racks);
rd_list_destroy(partition_racks);

return needs_rack_aware_assignment;
}
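/* Hedged sketch (not part of this commit): how an assignor might gate its
 * rack-aware code path on this helper. The function name is illustrative;
 * note that broker_rack_pair must be sorted by broker id, since the helper
 * above looks up replica racks with bsearch(). */
static void sketch_rack_aware_gate(rd_kafka_assignor_topic_t **topics,
                                   size_t topic_cnt,
                                   rd_kafka_broker_id_rack_pair_t *pairs,
                                   size_t pair_cnt) {
        rd_bool_t rack_aware = rd_kafka_use_rack_aware_assignment(
            topics, topic_cnt, pairs, pair_cnt);
        if (rack_aware) {
                /* ... prefer placing partitions on consumers whose rack
                 * matches one of the partition's replica racks ... */
        } else {
                /* ... fall back to the existing, rack-agnostic logic ... */
        }
}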


/* Helper to compute rd_kafka_broker_id_rack_pair_t* to pass to the assignor
 * for unit tests. Passing num_broker_racks = 0 will return NULL racks. */
rd_kafka_broker_id_rack_pair_t *
ut_compute_broker_rack_pairs(int num_brokers,
int num_broker_racks,
rd_kafkap_str_t *all_racks[],
size_t all_racks_cnt) {
int i;
rd_kafka_broker_id_rack_pair_t *pairs = rd_calloc(
(size_t)num_brokers, sizeof(rd_kafka_broker_id_rack_pair_t));

rd_assert(num_broker_racks < (int)all_racks_cnt);

for (i = 0; i < num_brokers; i++) {
pairs[i].broker_id = i;
pairs[i].rack =
num_broker_racks ? all_racks[i % num_broker_racks] : NULL;
}

return pairs;
}
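/* Hedged usage sketch (not part of this commit): 6 mock brokers spread
 * round-robin over 3 racks, so broker i gets rack i % 3 and brokers 0 and 3
 * share "rack0". The rack strings are arbitrary fixtures; the 4-element
 * array satisfies the num_broker_racks < all_racks_cnt assertion above. */
static void sketch_broker_rack_pairs(void) {
        size_t i;
        rd_kafkap_str_t *racks[4] = {
            rd_kafkap_str_new("rack0", -1), rd_kafkap_str_new("rack1", -1),
            rd_kafkap_str_new("rack2", -1), rd_kafkap_str_new("rack3", -1)};
        rd_kafka_broker_id_rack_pair_t *pairs =
            ut_compute_broker_rack_pairs(6, 3, racks, RD_ARRAYSIZE(racks));

        /* ... pass `pairs` (cnt 6) to the assignor under test ... */

        rd_free(pairs);
        for (i = 0; i < RD_ARRAYSIZE(racks); i++)
                rd_kafkap_str_destroy(racks[i]);
}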


/**
* @brief Set a member's owned partitions based on its assignment.
@@ -725,6 +886,8 @@ static void ut_init_member_internal(rd_kafka_group_member_t *rkgm,

rkgm->rkgm_assignment =
rd_kafka_topic_partition_list_new(rkgm->rkgm_subscription->size);

rkgm->rkgm_generation = 1;
}

/**
92 changes: 91 additions & 1 deletion src/rdkafka_assignor.h
@@ -238,14 +238,34 @@ void rd_kafka_group_member_clear(rd_kafka_group_member_t *rkgm);
rd_kafka_resp_err_t rd_kafka_range_assignor_init(rd_kafka_t *rk);
rd_kafka_resp_err_t rd_kafka_roundrobin_assignor_init(rd_kafka_t *rk);
rd_kafka_resp_err_t rd_kafka_sticky_assignor_init(rd_kafka_t *rk);
rd_bool_t rd_kafka_use_rack_aware_assignment(
rd_kafka_assignor_topic_t **topics,
size_t topic_cnt,
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t broker_rack_pair_cnt);

/**
* @name Common unit test functions to use across assignors.
* @name Common unit test functions, macros, and enums to use across assignors.
*
*
*
*/

/* Tests can be parametrized to contain either only broker racks, only
 * consumer racks, or both. */
typedef enum {
RD_KAFKA_RANGE_ASSIGNOR_UT_NO_BROKER_RACK = 0,
RD_KAFKA_RANGE_ASSIGNOR_UT_NO_CONSUMER_RACK = 1,
RD_KAFKA_RANGE_ASSIGNOR_UT_BROKER_AND_CONSUMER_RACK = 2,
RD_KAFKA_RANGE_ASSIGNOR_UT_CONFIG_CNT = 3,
} rd_kafka_assignor_ut_rack_config_t;
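/* Hedged sketch (not part of this diff): a unit test could loop over all
 * parametrizations so the same test body runs with no broker racks, no
 * consumer racks, and both. The ut_testOneConsumerNoTopic() call shape and
 * the rk/rkas arguments are illustrative. */
static int sketch_run_all_parametrizations(rd_kafka_t *rk,
                                           const rd_kafka_assignor_t *rkas) {
        int fails = 0;
        rd_kafka_assignor_ut_rack_config_t parametrization;
        for (parametrization = RD_KAFKA_RANGE_ASSIGNOR_UT_NO_BROKER_RACK;
             parametrization != RD_KAFKA_RANGE_ASSIGNOR_UT_CONFIG_CNT;
             parametrization++)
                fails += ut_testOneConsumerNoTopic(rk, rkas, parametrization);
        return fails;
}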


rd_kafka_broker_id_rack_pair_t *
ut_compute_broker_rack_pairs(int num_brokers,
int num_broker_racks,
rd_kafkap_str_t *all_racks[],
size_t all_racks_cnt);

void ut_set_owned(rd_kafka_group_member_t *rkgm);

@@ -314,6 +334,76 @@ int isFullyBalanced0(const char *function,
return 1; \
} while (0)

/* Helper macro to initialize a consumer with or without a rack depending on the
* value of parametrization. */
#define ut_initMemberConditionalRack(member_ptr, member_id, rack, \
parametrization, ...) \
do { \
if (parametrization == \
RD_KAFKA_RANGE_ASSIGNOR_UT_NO_CONSUMER_RACK) { \
ut_init_member(member_ptr, member_id, __VA_ARGS__); \
} else { \
ut_init_member_with_rackv(member_ptr, member_id, rack, \
__VA_ARGS__); \
} \
} while (0)
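/* Hedged usage sketch (not part of this diff): the same call site
 * initializes a member with or without a rack depending on the
 * parametrization. The member id, rack string and topic names are
 * illustrative; the trailing NULL terminates the variadic topic list as in
 * the existing ut_init_member() tests. */
rd_kafkap_str_t *rack0 = rd_kafkap_str_new("rack0", -1);
rd_kafka_group_member_t member;
ut_initMemberConditionalRack(&member, "consumer1", rack0, parametrization,
                             "topic1", "topic2", NULL);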

/* Helper macro to initialize rd_kafka_metadata_t* with or without replicas
* depending on the value of parametrization. This accepts variadic arguments
* for topics. */
#define ut_initMetadataConditionalRack( \
metadataPtr, brokerPairPtr, brokerPairCntPtr, replication_factor, \
num_broker_racks, all_racks, all_racks_cnt, parametrization, ...) \
do { \
int num_brokers = num_broker_racks > 0 \
? replication_factor * num_broker_racks \
: replication_factor; \
if (parametrization == \
RD_KAFKA_RANGE_ASSIGNOR_UT_NO_BROKER_RACK) { \
*(metadataPtr) = \
rd_kafka_metadata_new_topic_mockv(__VA_ARGS__); \
*(brokerPairPtr) = NULL; \
*(brokerPairCntPtr) = 0; \
} else { \
*(metadataPtr) = \
rd_kafka_metadata_new_topic_with_partition_replicas_mockv( \
replication_factor, num_brokers, __VA_ARGS__); \
*(brokerPairPtr) = ut_compute_broker_rack_pairs( \
num_brokers, num_broker_racks, all_racks, \
all_racks_cnt); \
*(brokerPairCntPtr) = num_brokers; \
} \
} while (0)


/* Helper macro to initialize rd_kafka_metadata_t* with or without replicas
* depending on the value of parametrization. This accepts a list of topics,
* rather than being variadic.
*/
#define ut_initMetadataConditionalRack0( \
metadataPtr, brokerPairPtr, brokerPairCntPtr, replication_factor, \
num_broker_racks, all_racks, all_racks_cnt, parametrization, topics, \
topic_cnt) \
do { \
int num_brokers = num_broker_racks > 0 \
? replication_factor * num_broker_racks \
: replication_factor; \
if (parametrization == \
RD_KAFKA_RANGE_ASSIGNOR_UT_NO_BROKER_RACK) { \
*(metadataPtr) = rd_kafka_metadata_new_topic_mock( \
topics, topic_cnt, -1, 0); \
*(brokerPairPtr) = NULL; \
*(brokerPairCntPtr) = 0; \
} else { \
*(metadataPtr) = rd_kafka_metadata_new_topic_mock( \
topics, topic_cnt, replication_factor, \
num_brokers); \
*(brokerPairPtr) = ut_compute_broker_rack_pairs( \
num_brokers, num_broker_racks, all_racks, \
all_racks_cnt); \
*(brokerPairCntPtr) = num_brokers; \
} \
} while (0)
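/* Hedged usage sketch (not part of this diff) for the variadic form. The
 * ALL_RACKS fixture and `parametrization` variable are illustrative, and the
 * trailing arguments (1, "topic1", 3) are assumed to be forwarded to
 * rd_kafka_metadata_new_topic_mockv() as (topic count, topic name, partition
 * count), as in the existing assignor unit tests. With broker racks enabled
 * this builds replication_factor (3) * num_broker_racks (3) = 9 mock brokers
 * plus the matching broker->rack pairs. */
rd_kafka_metadata_t *metadata;
rd_kafka_broker_id_rack_pair_t *broker_pairs;
size_t broker_pair_cnt;
ut_initMetadataConditionalRack(&metadata, &broker_pairs, &broker_pair_cnt,
                               3 /*replication factor*/, 3 /*broker racks*/,
                               ALL_RACKS, RD_ARRAYSIZE(ALL_RACKS),
                               parametrization, 1, "topic1", 3);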


#endif /* _RDKAFKA_ASSIGNOR_H_ */
7 changes: 4 additions & 3 deletions src/rdkafka_metadata.c
@@ -1441,9 +1441,10 @@ rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics,
sizeof(*md) + (sizeof(*md->topics) * topic_cnt) + topic_names_size +
(64 /*topic name size..*/ * topic_cnt) +
(sizeof(*md->topics[0].partitions) * total_partition_cnt) +
(replication_factor > 0
? replication_factor * total_partition_cnt * sizeof(int)
: 0),
/* roundup since writes to tbuf are aligned in strides of 8. */
(replication_factor > 0 ? RD_ROUNDUP(replication_factor, 8) *
total_partition_cnt * sizeof(int)
: 0),
1 /*assert on fail*/);
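/* A worked example of the sizing change (hedged: assuming RD_ROUNDUP rounds
 * its first argument up to a multiple of the second, and sizeof(int) == 4):
 * with replication_factor = 3 and total_partition_cnt = 10, the old estimate
 * reserved 3 * 10 * 4 = 120 bytes for replica ids, while the new one reserves
 * RD_ROUNDUP(3, 8) * 10 * 4 = 8 * 10 * 4 = 320 bytes, an upper bound generous
 * enough to absorb the per-partition padding introduced by tbuf's 8-byte
 * write strides. */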

md = rd_tmpabuf_alloc(&tbuf, sizeof(*md));