diff --git a/airflow/providers/apache/kafka/hooks/base.py b/airflow/providers/apache/kafka/hooks/base.py index 189c4cd2aafde3..2f99cb21eaa4d3 100644 --- a/airflow/providers/apache/kafka/hooks/base.py +++ b/airflow/providers/apache/kafka/hooks/base.py @@ -49,7 +49,7 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]: "hidden_fields": ["schema", "login", "password", "port", "host"], "relabeling": {"extra": "Config Dict"}, "placeholders": { - "extra": '{"bootstrap.servers": "localhost:9092"}', + "extra": '{"bootstrap.servers": "localhost:9092", "group.id": "my-group"}', }, } diff --git a/airflow/utils/db.py b/airflow/utils/db.py index b5a5c46345114a..f25594ef688385 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -424,7 +424,7 @@ def create_default_connections(session: Session = NEW_SESSION): Connection( conn_id="kafka_default", conn_type="kafka", - extra=json.dumps({"bootstrap.servers": "broker:29092"}), + extra=json.dumps({"bootstrap.servers": "broker:29092", "group.id": "my-group"}), ), session, ) diff --git a/tests/integration/providers/apache/kafka/hooks/test_admin_client.py b/tests/integration/providers/apache/kafka/hooks/test_admin_client.py index 1200a96565be56..9597456c6f704c 100644 --- a/tests/integration/providers/apache/kafka/hooks/test_admin_client.py +++ b/tests/integration/providers/apache/kafka/hooks/test_admin_client.py @@ -25,7 +25,7 @@ from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook from airflow.utils import db -client_config = {"socket.timeout.ms": 1000, "bootstrap.servers": "broker:29092"} +client_config = {"socket.timeout.ms": 1000, "bootstrap.servers": "broker:29092", "group.id": "my-group"} @pytest.mark.integration("kafka") diff --git a/tests/integration/providers/apache/kafka/operators/test_produce.py b/tests/integration/providers/apache/kafka/operators/test_produce.py index be76fe5f4591fb..f2fa9358e99c4c 100644 --- a/tests/integration/providers/apache/kafka/operators/test_produce.py +++ b/tests/integration/providers/apache/kafka/operators/test_produce.py @@ -41,6 +41,7 @@ class TestProduceToTopic: """ def setup_method(self): + GROUP = "operator.producer.test.integration.test_1" db.merge_conn( Connection( conn_id="kafka_default", @@ -50,6 +51,7 @@ def setup_method(self): "socket.timeout.ms": 10, "message.timeout.ms": 10, "bootstrap.servers": "broker:29092", + "group.id": GROUP, } ), )