From e63e32f94e40c88699b50d63fce65310ea5723c9 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Sat, 11 May 2024 09:15:13 +0200 Subject: [PATCH] Group id is mandatory configurtion option for confluent_kafka 2.4.0+ The group.id parameter has been optional before 2.4.0 version of confluent_kafka relased on 7th of May. This started to fail our integration tests (cool). This PR adds "group.id" as extra field and sets the defaults for integration testing. --- airflow/providers/apache/kafka/hooks/base.py | 2 +- airflow/utils/db.py | 2 +- .../providers/apache/kafka/hooks/test_admin_client.py | 2 +- .../providers/apache/kafka/operators/test_produce.py | 2 ++ 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/airflow/providers/apache/kafka/hooks/base.py b/airflow/providers/apache/kafka/hooks/base.py index 189c4cd2aafde..2f99cb21eaa4d 100644 --- a/airflow/providers/apache/kafka/hooks/base.py +++ b/airflow/providers/apache/kafka/hooks/base.py @@ -49,7 +49,7 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]: "hidden_fields": ["schema", "login", "password", "port", "host"], "relabeling": {"extra": "Config Dict"}, "placeholders": { - "extra": '{"bootstrap.servers": "localhost:9092"}', + "extra": '{"bootstrap.servers": "localhost:9092", "group.id": "my-group"}', }, } diff --git a/airflow/utils/db.py b/airflow/utils/db.py index b5a5c46345114..f25594ef68838 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -424,7 +424,7 @@ def create_default_connections(session: Session = NEW_SESSION): Connection( conn_id="kafka_default", conn_type="kafka", - extra=json.dumps({"bootstrap.servers": "broker:29092"}), + extra=json.dumps({"bootstrap.servers": "broker:29092", "group.id": "my-group"}), ), session, ) diff --git a/tests/integration/providers/apache/kafka/hooks/test_admin_client.py b/tests/integration/providers/apache/kafka/hooks/test_admin_client.py index 1200a96565be5..9597456c6f704 100644 --- a/tests/integration/providers/apache/kafka/hooks/test_admin_client.py +++ b/tests/integration/providers/apache/kafka/hooks/test_admin_client.py @@ -25,7 +25,7 @@ from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook from airflow.utils import db -client_config = {"socket.timeout.ms": 1000, "bootstrap.servers": "broker:29092"} +client_config = {"socket.timeout.ms": 1000, "bootstrap.servers": "broker:29092", "group.id": "my-group"} @pytest.mark.integration("kafka") diff --git a/tests/integration/providers/apache/kafka/operators/test_produce.py b/tests/integration/providers/apache/kafka/operators/test_produce.py index be76fe5f4591f..f2fa9358e99c4 100644 --- a/tests/integration/providers/apache/kafka/operators/test_produce.py +++ b/tests/integration/providers/apache/kafka/operators/test_produce.py @@ -41,6 +41,7 @@ class TestProduceToTopic: """ def setup_method(self): + GROUP = "operator.producer.test.integration.test_1" db.merge_conn( Connection( conn_id="kafka_default", @@ -50,6 +51,7 @@ def setup_method(self): "socket.timeout.ms": 10, "message.timeout.ms": 10, "bootstrap.servers": "broker:29092", + "group.id": GROUP, } ), )