[AIRFLOW-6786] Added Kafka components, 3rd time's the charm #12388

Closed · wants to merge 14 commits
20 changes: 10 additions & 10 deletions CONTRIBUTING.rst
@@ -625,16 +625,16 @@ This is the full list of those extras:
.. START EXTRAS HERE

all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid, apache.hdfs,
apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop,
apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes,
crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google,
google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira, kerberos, kubernetes,
ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, openfaas,
opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole,
rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack,
snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv, webhdfs, winrm,
yandex, zendesk
apache.hive, apache.kafka, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark,
apache.sqoop, apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
cncf.kubernetes, crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop,
dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api,
github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins,
jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql,
mysql, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres,
presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp,
singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv,
webhdfs, winrm, yandex, zendesk

.. END EXTRAS HERE

20 changes: 10 additions & 10 deletions INSTALL
@@ -72,16 +72,16 @@ pip install -e . \
# START EXTRAS HERE

all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid, apache.hdfs,
apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop,
apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes,
crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google,
google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira, kerberos, kubernetes,
ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, openfaas,
opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole,
rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack,
snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv, webhdfs, winrm,
yandex, zendesk
apache.hive, apache.kafka, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark,
apache.sqoop, apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
cncf.kubernetes, crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop,
dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api,
github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins,
jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql,
mysql, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres,
presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp,
singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv,
webhdfs, winrm, yandex, zendesk

# END EXTRAS HERE

62 changes: 62 additions & 0 deletions airflow/providers/apache/kafka/README.md
@@ -0,0 +1,62 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->


# Package apache-airflow-providers-apache-kafka

Release: 2.0.0b2

**Table of contents**

- [Provider package](#provider-package)
- [Installation](#installation)
- [PIP requirements](#pip-requirements)
- [Provider class summary](#provider-classes-summary)
- [Sensors](#sensors)
- [Hooks](#hooks)
- [Releases](#releases)

## Provider package

This is a provider package for the `apache.kafka` provider. All classes for this provider package
are in the `airflow.providers.apache.kafka` Python package.



## Installation

You can install this package on top of an existing Airflow 2.* installation via
`pip install apache-airflow-providers-apache-kafka`.

## PIP requirements

| PIP package | Version required |
|:-----------------|:-------------------|
| kafka             | >=1.3.5            |


## Sensors



## Hooks



## Releases
17 changes: 17 additions & 0 deletions airflow/providers/apache/kafka/__init__.py
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
17 changes: 17 additions & 0 deletions airflow/providers/apache/kafka/example_dags/__init__.py
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
44 changes: 44 additions & 0 deletions airflow/providers/apache/kafka/example_dags/example_kafka_dag.py
@@ -0,0 +1,44 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from datetime import timedelta

from airflow import DAG
from airflow.providers.apache.kafka.sensors.kafka import KafkaSensor
from airflow.utils.timezone import datetime

DAG_ID = "example_kafka_dag"
dag_start_date = datetime(2015, 6, 1, hour=20, tzinfo=None)
default_args = {
    'depends_on_past': False,
    'provide_context': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    start_date=dag_start_date,
    dag_id=DAG_ID,
    default_args=default_args,
    schedule_interval=None,
    max_active_runs=1,
    concurrency=4,
    catchup=False,
) as dag:

    sensor = KafkaSensor(task_id='trigger', topic='', host='', port='')
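
The `KafkaSensor` used above is not shown in this excerpt. Purely as an illustration of how it might sit on top of `KafkaConsumerHook` (defined later in this PR), here is a sketch using the standard `BaseSensorOperator` contract; the class body and the 2.0-era import path are assumptions, not the PR's actual implementation:

```python
from airflow.providers.apache.kafka.hooks.kafka_consumer import KafkaConsumerHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator


class KafkaSensorSketch(BaseSensorOperator):
    """Illustrative only: succeeds once at least one message is available."""

    def __init__(self, topic, host=None, port=None, **kwargs):
        super().__init__(**kwargs)
        self.topic = topic
        self.host = host
        self.port = port

    def poke(self, context):
        hook = KafkaConsumerHook(self.topic, host=self.host, port=self.port)
        # get_messages() returns an empty dict when nothing arrived in time,
        # so this poke returns False until the topic has at least one message.
        return bool(hook.get_messages(timeout_ms=1000))
```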
17 changes: 17 additions & 0 deletions airflow/providers/apache/kafka/hooks/__init__.py
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
88 changes: 88 additions & 0 deletions airflow/providers/apache/kafka/hooks/kafka_consumer.py
@@ -0,0 +1,88 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


from kafka import KafkaConsumer

from airflow.hooks.base_hook import BaseHook


class KafkaConsumerHook(BaseHook):
    """Wrap a kafka-python KafkaConsumer behind an Airflow connection."""

    DEFAULT_HOST = 'kafka1'
    DEFAULT_PORT = 9092

    def __init__(self, topic, host=DEFAULT_HOST, port=DEFAULT_PORT, kafka_conn_id='kafka_default'):
        super().__init__(None)
        self.conn_id = kafka_conn_id
        self._conn = None
        self.server = None
        self.consumer = None
        self.extra_dejson = {}
        self.topic = topic
        self.host = host
        self.port = port

    def get_conn(self) -> KafkaConsumer:
        """
        Create (or reuse) the underlying Kafka consumer.

        :return:
            A KafkaConsumer connected to the configured bootstrap server.
        """
        if not self.consumer:
            conn = self.get_connection(self.conn_id)
            service_options = conn.extra_dejson
            host = conn.host or self.DEFAULT_HOST
            port = conn.port or self.DEFAULT_PORT

            self.server = f"{host}:{port}"
            self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server, **service_options)
        return self.consumer
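
Note that `get_conn()` forwards the connection's `extra_dejson` verbatim to `KafkaConsumer`, so consumer options can live in the Airflow connection rather than in code. A minimal sketch, assuming the `kafka_default` connection is registered programmatically (the Extra keys are standard kafka-python consumer options):

```python
from airflow.models import Connection

# Hypothetical setup: everything in `extra` is passed through to
# KafkaConsumer(**service_options) by KafkaConsumerHook.get_conn().
kafka_conn = Connection(
    conn_id="kafka_default",
    conn_type="kafka",
    host="kafka1",
    port=9092,
    extra='{"auto_offset_reset": "earliest", "group_id": "airflow-consumer"}',
)
```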

    def get_messages(self, timeout_ms=5000) -> dict:
        """
        Get the messages that haven't been consumed yet; polling doesn't
"Get all the messages haven't been consumed,"

If we use poll() without max_records, the behavior is returns at most "max_poll_records" #records. "max_poll_records" is setted to 500 by default at Consumer Init config.

So, we're not going to consume "all" message except we put a very high number as max_poll_records(could be a memory bomb) or we have a low number of message in the topic.

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html#kafka.KafkaConsumer.poll

        block by default; note the offset commit below is currently commented out.

        :param timeout_ms: how long poll() may block waiting for records.
        :return:
            A dict mapping each TopicPartition to a list of consumed records.
        """
        consumer = self.get_conn()
        try:
            messages = consumer.poll(timeout_ms)
            # consumer.commit()
        finally:
            consumer.close()
        return messages
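
As the review comment above points out, a single `poll()` returns at most `max_poll_records` messages (500 by default), so `get_messages()` does not literally drain the topic. A hedged sketch of consuming everything in bounded chunks, not part of this PR (`max_records` is kafka-python's per-call cap on `poll()`):

```python
def drain_topic(consumer, timeout_ms=5000, batch_size=100):
    """Poll repeatedly until a poll comes back empty, bounding memory per call."""
    collected = {}
    while True:
        batch = consumer.poll(timeout_ms=timeout_ms, max_records=batch_size)
        if not batch:
            break  # nothing arrived within timeout_ms; treat the topic as drained
        for partition, records in batch.items():
            collected.setdefault(partition, []).extend(records)
    return collected
```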

    def __repr__(self):
        """Return a pretty version of the connection string."""
        connected = self.consumer is not None
        return f'<KafkaConsumerHook connected?={connected} server={self.server} topic={self.topic}>'
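
For context, a minimal usage sketch of the hook on its own (the topic name and a reachable broker are assumptions, not part of the PR):

```python
from airflow.providers.apache.kafka.hooks.kafka_consumer import KafkaConsumerHook

hook = KafkaConsumerHook(topic="my_topic")  # hypothetical topic
# poll() returns {TopicPartition: [ConsumerRecord, ...]}
for partition, records in hook.get_messages(timeout_ms=1000).items():
    for record in records:
        print(record.value)
```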