Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add provider for Apache Kafka #30175

Merged
merged 14 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ body:
- apache-hdfs
- apache-hive
- apache-impala
- apache-kafka
- apache-kylin
- apache-livy
- apache-pig
Expand Down Expand Up @@ -66,6 +67,7 @@ body:
- influxdb
- jdbc
- jenkins
- apache-kafka
dylanbstorey marked this conversation as resolved.
Show resolved Hide resolved
- microsoft-azure
- microsoft-mssql
- microsoft-psrp
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,11 @@ jobs:
breeze testing integration-tests --integration trino --integration kerberos
breeze stop
if: needs.build-info.outputs.runs-on != 'self-hosted'
- name: "Integration Tests Postgres: Kafka"
run: |
breeze testing integration-tests --integration kafka
breeze stop
if: needs.build-info.outputs.runs-on != 'self-hosted'
- name: "Integration Tests Postgres: all-testable"
run: breeze testing integration-tests --integration all-testable
if: needs.build-info.outputs.runs-on == 'self-hosted'
Expand Down
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ repos:
^docs/README.rst$|
^docs/apache-airflow-providers-amazon/secrets-backends/aws-ssm-parameter-store.rst$|
^docs/apache-airflow-providers-apache-hdfs/connections.rst$|
^docs/apache-airflow-providers-apache-kafka/connections/kafka.rst$|
^docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst$|
^docs/apache-airflow-providers-microsoft-azure/connections/azure_cosmos.rst$|
^docs/conf.py$|
Expand Down
24 changes: 12 additions & 12 deletions CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -610,18 +610,18 @@ This is the full list of those extras:

.. START EXTRAS HERE
aiobotocore, airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra,
apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kylin,
apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs, arangodb, asana,
async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes,
common.sql, crypto, dask, databricks, datadog, dbt.cloud, deprecated_api, devel, devel_all,
devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker, druid, elasticsearch, exasol,
facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth, grpc, hashicorp, hdfs,
hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure,
microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus,
postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry,
sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram,
trino, vertica, virtualenv, webhdfs, winrm, zendesk
apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kafka,
apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs,
arangodb, asana, async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups, cloudant,
cncf.kubernetes, common.sql, crypto, dask, databricks, datadog, dbt.cloud, deprecated_api, devel,
devel_all, devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker, druid, elasticsearch,
exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth, grpc,
hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb,
microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc,
openfaas, openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, pinot,
plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid,
sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, tableau, tabular,
telegram, trino, vertica, virtualenv, webhdfs, winrm, zendesk
.. END EXTRAS HERE

Provider packages
Expand Down
24 changes: 12 additions & 12 deletions INSTALL
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,18 @@ The list of available extras:

# START EXTRAS HERE
aiobotocore, airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra,
apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kylin,
apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs, arangodb, asana,
async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes,
common.sql, crypto, dask, databricks, datadog, dbt.cloud, deprecated_api, devel, devel_all,
devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker, druid, elasticsearch, exasol,
facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth, grpc, hashicorp, hdfs,
hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure,
microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus,
postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry,
sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram,
trino, vertica, virtualenv, webhdfs, winrm, zendesk
apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kafka,
apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs,
arangodb, asana, async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups, cloudant,
cncf.kubernetes, common.sql, crypto, dask, databricks, datadog, dbt.cloud, deprecated_api, devel,
devel_all, devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker, druid, elasticsearch,
exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth, grpc,
hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb,
microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc,
openfaas, openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, pinot,
plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid,
sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, tableau, tabular,
telegram, trino, vertica, virtualenv, webhdfs, winrm, zendesk
# END EXTRAS HERE

# For installing Airflow in development environments - see CONTRIBUTING.rst
Expand Down
1 change: 1 addition & 0 deletions airflow/provider.yaml.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
"gcp",
"gmp",
"google",
"kafka",
"protocol",
"service",
"software",
Expand Down
25 changes: 25 additions & 0 deletions airflow/providers/apache/kafka/CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Changelog
---------

1.0.0
.....

Initial version of the provider.
17 changes: 17 additions & 0 deletions airflow/providers/apache/kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
16 changes: 16 additions & 0 deletions airflow/providers/apache/kafka/hooks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
79 changes: 79 additions & 0 deletions airflow/providers/apache/kafka/hooks/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Any

from confluent_kafka.admin import AdminClient

from airflow.compat.functools import cached_property
from airflow.hooks.base import BaseHook


class KafkaBaseHook(BaseHook):
"""
A base hook for interacting with Apache Kafka

:param kafka_config_id: The connection object to use, defaults to "kafka_default"
"""

conn_name_attr = "kafka_config_id"
default_conn_name = "kafka_default"
conn_type = "kafka"
hook_name = "Apache Kafka"

def __init__(self, kafka_config_id=default_conn_name, *args, **kwargs):
dylanbstorey marked this conversation as resolved.
Show resolved Hide resolved
"""Initialize our Base"""
super().__init__()
self.kafka_config_id = kafka_config_id
self.get_conn

@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
"""Returns custom field behaviour"""
return {
"hidden_fields": ["schema", "login", "password", "port", "host"],
"relabeling": {"extra": "Config Dict"},
"placeholders": {
"extra": '{"bootstrap.servers": "localhost:9092"}',
},
}

def _get_client(self, config):
raise NotImplementedError

@cached_property
def get_conn(self) -> Any:
"""get the configuration object"""
config = self.get_connection(self.kafka_config_id).extra_dejson

if not (config.get("bootstrap.servers", None)):
raise ValueError("config['bootstrap.servers'] must be provided.")

return self._get_client(config)

def test_connection(self) -> tuple[bool, str]:
"""Test Connectivity from the UI"""
try:
config = self.get_connection(self.kafka_config_id).extra_dejson
t = AdminClient(config, timeout=10).list_topics()
if t:
return True, "Connection successful."
except Exception as e:
False, str(e)

return False, "Failed to establish connection."
63 changes: 63 additions & 0 deletions airflow/providers/apache/kafka/hooks/client.py
dylanbstorey marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Any, Sequence

from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient, NewTopic

from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook


class KafkaAdminClientHook(KafkaBaseHook):
"""
A hook for interacting with the Kafka Cluster

:param kafka_config_id: The connection object to use, defaults to "kafka_default"
"""

def __init__(self, kafka_config_id=KafkaBaseHook.default_conn_name) -> None:
super().__init__(kafka_config_id=kafka_config_id)

def _get_client(self, config) -> AdminClient:
return AdminClient(config)

def create_topic(
self,
topics: Sequence[Sequence[Any]],
dylanbstorey marked this conversation as resolved.
Show resolved Hide resolved
) -> None:
"""creates a topic

:param topics: a list of topics to create including the number of partitions for the topic
and the replication factor. Format: [ ("topic_name", number of partitions, replication factor)]
"""
admin_client = self.get_conn

new_topics = [NewTopic(t[0], num_partitions=t[1], replication_factor=t[2]) for t in topics]

futures = admin_client.create_topics(new_topics)

for t, f in futures.items():
try:
f.result()
self.log.info("The topic %s has been created.", t)
except KafkaException as e:
if e.args[0].name == "TOPIC_ALREADY_EXISTS":
self.log.warning("The topic %s already exists.", t)
else:
raise
47 changes: 47 additions & 0 deletions airflow/providers/apache/kafka/hooks/consume.py
dylanbstorey marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Sequence

from confluent_kafka import Consumer

from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook


class KafkaConsumerHook(KafkaBaseHook):
"""
A hook for creating a Kafka Consumer

:param kafka_config_id: The connection object to use, defaults to "kafka_default"
:param topics: A list of topics to subscribe to.
"""

def __init__(self, topics: Sequence[str], kafka_config_id=KafkaBaseHook.default_conn_name) -> None:

super().__init__(kafka_config_id=kafka_config_id)
self.topics = topics

def _get_client(self, config) -> Consumer:
return Consumer(config)

def get_consumer(self) -> Consumer:
"""Returns a Consumer that has been subscribed to topics."""
consumer = self.get_conn
consumer.subscribe(self.topics)

return consumer
Loading