Skip to content

Commit

Permalink
Group id is mandatory configurtion option for confluent_kafka 2.4.0+ (#…
Browse files Browse the repository at this point in the history
…39559)

The group.id parameter has been optional before 2.4.0 version of
confluent_kafka relased on 7th of May. This started to fail our
integration tests (cool).

This PR adds "group.id" as extra field and sets the defaults for
integration testing.
  • Loading branch information
potiuk authored May 11, 2024
1 parent ca058a6 commit e637367
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 3 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/apache/kafka/hooks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]:
"hidden_fields": ["schema", "login", "password", "port", "host"],
"relabeling": {"extra": "Config Dict"},
"placeholders": {
"extra": '{"bootstrap.servers": "localhost:9092"}',
"extra": '{"bootstrap.servers": "localhost:9092", "group.id": "my-group"}',
},
}

Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ def create_default_connections(session: Session = NEW_SESSION):
Connection(
conn_id="kafka_default",
conn_type="kafka",
extra=json.dumps({"bootstrap.servers": "broker:29092"}),
extra=json.dumps({"bootstrap.servers": "broker:29092", "group.id": "my-group"}),
),
session,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
from airflow.utils import db

client_config = {"socket.timeout.ms": 1000, "bootstrap.servers": "broker:29092"}
client_config = {"socket.timeout.ms": 1000, "bootstrap.servers": "broker:29092", "group.id": "my-group"}


@pytest.mark.integration("kafka")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class TestProduceToTopic:
"""

def setup_method(self):
GROUP = "operator.producer.test.integration.test_1"
db.merge_conn(
Connection(
conn_id="kafka_default",
Expand All @@ -50,6 +51,7 @@ def setup_method(self):
"socket.timeout.ms": 10,
"message.timeout.ms": 10,
"bootstrap.servers": "broker:29092",
"group.id": GROUP,
}
),
)
Expand Down

0 comments on commit e637367

Please sign in to comment.