diff --git a/CHANGELOG.md b/CHANGELOG.md index b20ec57491..e0095c0998 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ librdkafka v2.2.0 is a feature release: + * Fix a segmentation fault when subscribing to non-existent topics and + using the consume batch functions (#4273). * Store offset commit metadata in `rd_kafka_offsets_store` (@mathispesch, #4084). * Fix a bug that happens when skipping tags, causing buffer underflow in MetadataResponse (#4278). diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c index f6bf1ed859..59a751abd9 100644 --- a/src/rdkafka_queue.c +++ b/src/rdkafka_queue.c @@ -724,7 +724,7 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq, rko = (rd_kafka_op_t *)rkmessages[i]->_private; rd_kafka_toppar_t *rktp = rko->rko_rktp; int64_t offset = rkmessages[i]->offset + 1; - if (unlikely(rktp->rktp_app_pos.offset < offset)) + if (unlikely(rktp && (rktp->rktp_app_pos.offset < offset))) rd_kafka_update_app_pos( rk, rktp, RD_KAFKA_FETCH_POS( @@ -748,7 +748,7 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq, next = TAILQ_NEXT(next, rko_link); rd_kafka_toppar_t *rktp = rko->rko_rktp; int64_t offset = rko->rko_u.fetch.rkm.rkm_rkmessage.offset + 1; - if (rktp->rktp_app_pos.offset < offset) + if (rktp && (rktp->rktp_app_pos.offset < offset)) rd_kafka_update_app_pos( rk, rktp, RD_KAFKA_FETCH_POS( diff --git a/tests/0022-consume_batch.c b/tests/0022-consume_batch.c index 64e826d035..ea7bdf1f2e 100644 --- a/tests/0022-consume_batch.c +++ b/tests/0022-consume_batch.c @@ -196,8 +196,70 @@ static void do_test_consume_batch_oauthbearer_cb(void) { #endif +/** + * @brief Subscribe to a non-existent topic with rd_kafka_consume_batch_queue. + * Verify that a rkmessage with error code ERR_UNKNOWN_TOPIC_OR_PART + * is received. + */ +static void do_test_consume_batch_non_existent_topic(void) { + + char *topic = "non-existent"; + rd_kafka_t *rk; + rd_kafka_topic_partition_list_t *rktpars; + rd_kafka_queue_t *rkq; + rd_kafka_message_t *rkms[1]; + rd_kafka_conf_t *conf; + ssize_t consumed = 0; + + SUB_TEST_QUICK(); + + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "allow.auto.create.topics", "false"); + test_conf_set(conf, "group.id", "test1"); + + /* Create simple consumer */ + rk = test_create_consumer(NULL, NULL, conf, NULL); + + /* Subscribe to the input topic */ + rktpars = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(rktpars, topic, + /* The partition is ignored in + * rd_kafka_subscribe() */ + RD_KAFKA_PARTITION_UA); + + rd_kafka_subscribe(rk, rktpars); + rd_kafka_topic_partition_list_destroy(rktpars); + + /* Create generic consume queue */ + rkq = rd_kafka_queue_get_consumer(rk); + + TEST_SAY("Consuming from non-existent topic\n"); + while ((consumed = rd_kafka_consume_batch_queue(rkq, 1000, rkms, 1)) != + 1) { + TEST_SAY("Consuming from non-existent topic\n"); + } + + TEST_ASSERT(rkms[0]->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, + "Expected ERR_UNKNOWN_TOPIC_OR_PART, not %s: %s", + rd_kafka_err2str(rkms[0]->err), + rd_kafka_message_errstr(rkms[0])); + TEST_SAY("Received ERR_UNKNOWN_TOPIC_OR_PART\n"); + + TEST_SAY("Stopping consumer\n"); + + rd_kafka_message_destroy(rkms[0]); + + rd_kafka_queue_destroy(rkq); + + rd_kafka_destroy(rk); + + SUB_TEST_PASS(); +} + + int main_0022_consume_batch(int argc, char **argv) { do_test_consume_batch(); + do_test_consume_batch_non_existent_topic(); return 0; }