diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 91a2911f5436e5..01b69633a3da44 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -625,16 +625,16 @@ This is the full list of those extras: .. START EXTRAS HERE all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid, apache.hdfs, -apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, -apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes, -crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, -docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google, -google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira, kerberos, kubernetes, -ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, openfaas, -opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole, -rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, -snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv, webhdfs, winrm, -yandex, zendesk +apache.hive, apacke.kafka, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, +apache.sqoop, apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, +cncf.kubernetes, crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, +dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, +github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, +jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, +mysql, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, +presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, +singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv, +webhdfs, winrm, yandex, zendesk .. END EXTRAS HERE diff --git a/INSTALL b/INSTALL index 87c093eca71e0c..5dd5a45f7acc83 100644 --- a/INSTALL +++ b/INSTALL @@ -72,16 +72,16 @@ pip install -e . \ # START EXTRAS HERE all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid, apache.hdfs, -apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, -apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes, -crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, -docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google, -google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira, kerberos, kubernetes, -ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, openfaas, -opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole, -rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, -snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv, webhdfs, winrm, -yandex, zendesk +apache.hive, apacke.kafka, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, +apache.sqoop, apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, +cncf.kubernetes, crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, +dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, +github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, +jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, +mysql, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, +presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, +singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv, +webhdfs, winrm, yandex, zendesk # END EXTRAS HERE diff --git a/airflow/providers/apache/kafka/README.md b/airflow/providers/apache/kafka/README.md new file mode 100644 index 00000000000000..d4fd559dd9cb4c --- /dev/null +++ b/airflow/providers/apache/kafka/README.md @@ -0,0 +1,62 @@ + + + +# Package apache-airflow-providers-apache-kafka + +Release: 2.0.0b2 + +**Table of contents** + +- [Provider package](#provider-package) +- [Installation](#installation) +- [PIP requirements](#pip-requirements) +- [Provider class summary](#provider-classes-summary) + - [Sensors](#sensors) + - [Hooks](#hooks) +- [Releases](#releases) + +## Provider package + +This is a provider package for `apache.kafka` provider. All classes for this provider package +are in `airflow.providers.apache.kafka` python package. + + + +## Installation + +You can install this package on top of an existing airflow 2.* installation via +`pip install apache-airflow-providers-apache-kafka` + +## PIP requirements + +| PIP package | Version required | +|:-----------------|:-------------------| +| kafka | >=1.3.5 | + + +## Sensors + + + +## Hooks + + + +## Releases diff --git a/airflow/providers/apache/kafka/__init__.py b/airflow/providers/apache/kafka/__init__.py new file mode 100644 index 00000000000000..217e5db9607827 --- /dev/null +++ b/airflow/providers/apache/kafka/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/apache/kafka/example_dags/__init__.py b/airflow/providers/apache/kafka/example_dags/__init__.py new file mode 100644 index 00000000000000..217e5db9607827 --- /dev/null +++ b/airflow/providers/apache/kafka/example_dags/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/apache/kafka/example_dags/example_kafka_dag.py b/airflow/providers/apache/kafka/example_dags/example_kafka_dag.py new file mode 100644 index 00000000000000..a6832693dbd236 --- /dev/null +++ b/airflow/providers/apache/kafka/example_dags/example_kafka_dag.py @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datetime import timedelta + +from airflow import DAG +from airflow.providers.apache.kafka.sensors.kafka import KafkaSensor +from airflow.utils.timezone import datetime + +DAG_ID = "example_kafka_dag" +dag_start_date = datetime(2015, 6, 1, hour=20, tzinfo=None) +default_args = { + 'depends_on_past': False, + 'provide_context': True, + 'retries': 3, + 'retry_delay': timedelta(minutes=5), +} + +with DAG( + start_date=dag_start_date, + dag_id=DAG_ID, + default_args=default_args, + schedule_interval=None, + max_active_runs=1, + concurrency=4, + catchup=False, +) as dag: + + sensor = KafkaSensor(task_id='trigger', topic='', host='', port='') diff --git a/airflow/providers/apache/kafka/hooks/__init__.py b/airflow/providers/apache/kafka/hooks/__init__.py new file mode 100644 index 00000000000000..217e5db9607827 --- /dev/null +++ b/airflow/providers/apache/kafka/hooks/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/apache/kafka/hooks/kafka_consumer.py b/airflow/providers/apache/kafka/hooks/kafka_consumer.py new file mode 100644 index 00000000000000..6914070a8be0ce --- /dev/null +++ b/airflow/providers/apache/kafka/hooks/kafka_consumer.py @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +from kafka import KafkaConsumer + +from airflow.hooks.base_hook import BaseHook + + +class KafkaConsumerHook(BaseHook): + """KafkaConsumerHook Class.""" + + DEFAULT_HOST = 'kafka1' + DEFAULT_PORT = 9092 + + def __init__(self, topic, host=DEFAULT_HOST, port=DEFAULT_PORT, kafka_conn_id='kafka_default'): + super().__init__(None) + self.conn_id = kafka_conn_id + self._conn = None + self.server = None + self.consumer = None + self.extra_dejson = {} + self.topic = topic + self.host = host + self.port = port + + def get_conn(self) -> KafkaConsumer: + """ + A Kafka Consumer object. + + :return: + A Kafka Consumer object. + """ + if not self.consumer: + conn = self.get_connection(self.conn_id) + service_options = conn.extra_dejson + host = conn.host or self.DEFAULT_HOST + port = conn.port or self.DEFAULT_PORT + + self.server = f"""{host}:{port}""" + self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server, **service_options) + return self.consumer + + def get_messages(self, timeout_ms=5000) -> dict: + """ + Get all the messages haven't been consumed, it doesn't + block by default, then commit the offset. + + :param timeout_ms: + :return: + A list of messages + """ + consumer = self.get_conn() + try: + messages = consumer.poll(timeout_ms) + # consumer.commit() + finally: + consumer.close() + return messages + + def __repr__(self): + """ + A pretty version of the connection string. + + :return: + A pretty version of the connection string. + """ + connected = self.consumer is not None + return '' % ( + connected, + self.server, + self.topic, + ) diff --git a/airflow/providers/apache/kafka/hooks/kafka_producer.py b/airflow/providers/apache/kafka/hooks/kafka_producer.py new file mode 100644 index 00000000000000..eb2524a5fb63fd --- /dev/null +++ b/airflow/providers/apache/kafka/hooks/kafka_producer.py @@ -0,0 +1,96 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from kafka import KafkaProducer + +from airflow.hooks.base_hook import BaseHook + + +class KafkaProducerHook(BaseHook): + """KafkaProducerHook Class.""" + + DEFAULT_HOST = 'localhost' + DEFAULT_PORT = 9092 + + def __init__(self, conn_id): + """ + Initializes an instance of the Kafka Producer Hook class. + + :param conn_id: + The airflow connection ID to use. + """ + super().__init__(None) + self.conn_id = conn_id + self._conn = None + self.server = None + self.producer = None + + def get_conn(self) -> KafkaProducer: + """ + Returns a Kafka Producer + + :return: + A Kafka Producer object. + """ + if not self.producer: + _conn = self.get_connection(self.conn_id) + service_options = _conn.extra_dejson + host = _conn.host or self.DEFAULT_HOST + port = _conn.port or self.DEFAULT_PORT + + self.server = f"{host}:{port}" + self.producer = KafkaProducer(bootstrap_servers=self.server, **service_options) + return self.producer + + def send_message(self, topic, value=None, key=None, partition=None, timestamp_ms=None): + """ + Sends a message on the specified topic and partition. Keyed messages will be sent in order. + + :param topic: + The Kafka topic you wish to publish to. + :param value: + The value of the message you are publishing. + :param key: + The key to publish on. + :param partition: + The partition to publish onto. + :param timestamp_ms: + :return: + """ + producer = self.get_conn() + try: + future_record_metadata = producer.send( + topic, value=value, key=key, partition=partition, timestamp_ms=timestamp_ms + ) + finally: + producer.close() + return future_record_metadata + + def __repr__(self): + """ + A pretty version of the connection string. + + :return: + A pretty version of the connection string. + """ + connected = self.producer is not None + return ''.format( + connected, + self.server, + self.topic, + ) diff --git a/airflow/providers/apache/kafka/provider.yaml b/airflow/providers/apache/kafka/provider.yaml new file mode 100644 index 00000000000000..5553993ff7fc6c --- /dev/null +++ b/airflow/providers/apache/kafka/provider.yaml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +--- +package-name: apache-airflow-providers-apache-kafka +description: | + `Apache Kafka `__. + +versions: + - 1.0.0 + +integrations: + - integration-name: Apache Kafka + external-doc-url: http://kafka.apache.org/ + tags: [apache] + +sensors: + - integration-name: Apache Kafka + python-modules: + - airflow.providers.apache.kafka.sensors.kafka + +hooks: + - integration-name: Apache Kafka + python-modules: + - airflow.providers.apache.kafka.hooks.kafka_consumer + - airflow.providers.apache.kafka.hooks.kafka_producer diff --git a/airflow/providers/apache/kafka/sensors/__init__.py b/airflow/providers/apache/kafka/sensors/__init__.py new file mode 100644 index 00000000000000..217e5db9607827 --- /dev/null +++ b/airflow/providers/apache/kafka/sensors/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/apache/kafka/sensors/kafka.py b/airflow/providers/apache/kafka/sensors/kafka.py new file mode 100644 index 00000000000000..f768371ca8faec --- /dev/null +++ b/airflow/providers/apache/kafka/sensors/kafka.py @@ -0,0 +1,70 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from cached_property import cached_property + +from airflow.providers.apache.kafka.hooks.kafka_consumer import KafkaConsumerHook +from airflow.sensors.base_sensor_operator import BaseSensorOperator +from airflow.utils.decorators import apply_defaults + + +class KafkaSensor(BaseSensorOperator): + """Consumes the Kafka message with the specific topic""" + + DEFAULT_HOST = 'kafka1' + DEFAULT_PORT = 9092 + templated_fields = ('topic', 'host', 'port', ß) + + @apply_defaults + def __init__(self, topic, host=DEFAULT_HOST, port=DEFAULT_PORT, *args, **kwargs): + """ + Initialize the sensor, the connection establish + is put off to it's first time usage. + + :param topic: + :param host: + :param port: + :param args: + :param kwargs: + """ + self.topic = topic + self.host = host + self.port = port + super().__init__(*args, **kwargs) + + @cached_property + def hook(self): + """Returns a Kafka Consumer Hook""" + return KafkaConsumerHook(self.topic, self.host, self.port) + + def poke(self, context): + """ + Checks to see if messages exist on this topic/partition. + + :param context: + :return: + """ + self.log.info('Poking topic: %s, using hook: %s', str(self.topic), str(self.hook)) + + messages = self.hook.get_messages() + + if messages: + self.log.info('Got messages during poking: %s', str(messages)) + return messages + else: + return False diff --git a/setup.py b/setup.py index a918718baa5090..afa59c79d1a39b 100644 --- a/setup.py +++ b/setup.py @@ -302,6 +302,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version jira = [ 'JIRA>1.0.7', ] +kafka = ['kafka>=1.3.5'] kerberos = [ 'pykerberos>=1.1.13', 'requests_kerberos>=0.10.0', @@ -519,6 +520,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version 'apache.druid': druid, 'apache.hdfs': hdfs, 'apache.hive': hive, + 'apache.kafka': kafka, 'apache.kylin': kylin, 'apache.livy': [], 'apache.pig': [], diff --git a/tests/providers/apache/kafka/__init__.py b/tests/providers/apache/kafka/__init__.py new file mode 100644 index 00000000000000..217e5db9607827 --- /dev/null +++ b/tests/providers/apache/kafka/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/apache/kafka/hooks/__init__.py b/tests/providers/apache/kafka/hooks/__init__.py new file mode 100644 index 00000000000000..217e5db9607827 --- /dev/null +++ b/tests/providers/apache/kafka/hooks/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/apache/kafka/hooks/test_kafka_consumer.py b/tests/providers/apache/kafka/hooks/test_kafka_consumer.py new file mode 100644 index 00000000000000..0f2e321f895174 --- /dev/null +++ b/tests/providers/apache/kafka/hooks/test_kafka_consumer.py @@ -0,0 +1,88 @@ +# # +# # Licensed to the Apache Software Foundation (ASF) under one +# # or more contributor license agreements. See the NOTICE file +# # distributed with this work for additional information +# # regarding copyright ownership. The ASF licenses this file +# # to you under the Apache License, Version 2.0 (the +# # "License"); you may not use this file except in compliance +# # with the License. You may obtain a copy of the License at +# # +# # http://www.apache.org/licenses/LICENSE-2.0 +# # +# # Unless required by applicable law or agreed to in writing, +# # software distributed under the License is distributed on an +# # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# # KIND, either express or implied. See the License for the +# # specific language governing permissions and limitations +# # under the License. +# +# +# from kafka import KafkaConsumer +# +# from airflow.hooks.base import BaseHook +# +# +# class KafkaConsumerHook(BaseHook): +# """KafkaConsumerHook Class.""" +# +# DEFAULT_HOST = 'kafka1' +# DEFAULT_PORT = 9092 +# +# def __init__(self, topic, host=DEFAULT_HOST, port=DEFAULT_PORT, kafka_conn_id='kafka_default'): +# super().__init__(None) +# self.conn_id = kafka_conn_id +# self._conn = None +# self.server = None +# self.consumer = None +# self.extra_dejson = {} +# self.topic = topic +# self.host = host +# self.port = port +# +# def get_conn(self) -> KafkaConsumer: +# """ +# A Kafka Consumer object. +# +# :return: +# A Kafka Consumer object. +# """ +# if not self._conn: +# conn = self.get_connection(self.conn_id) +# service_options = conn.extra_dejson +# host = conn.host or self.DEFAULT_HOST +# port = conn.port or self.DEFAULT_PORT +# +# self.server = f"""{host}:{port}""" +# self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server, **service_options) +# return self.consumer +# +# def get_messages(self, timeout_ms=5000) -> dict: +# """ +# Get all the messages haven't been consumed, it doesn't +# block by default, then commit the offset. +# +# :param timeout_ms: +# :return: +# A list of messages +# """ +# consumer = self.get_conn() +# try: +# messages = consumer.poll(timeout_ms) +# # consumer.commit() +# finally: +# consumer.close() +# return messages +# +# def __repr__(self): +# """ +# A pretty version of the connection string. +# +# :return: +# A pretty version of the connection string. +# """ +# connected = self.consumer is not None +# return '' % ( +# connected, +# self.server, +# self.topic, +# ) diff --git a/tests/providers/apache/kafka/hooks/test_kafka_producer.py b/tests/providers/apache/kafka/hooks/test_kafka_producer.py new file mode 100644 index 00000000000000..cb29fc47b57155 --- /dev/null +++ b/tests/providers/apache/kafka/hooks/test_kafka_producer.py @@ -0,0 +1,95 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from kafka import KafkaProducer + +from airflow.hooks.base_hook import BaseHook + + +class KafkaProducerHook(BaseHook): + """KafkaProducerHook Class.""" + + DEFAULT_HOST = 'localhost' + DEFAULT_PORT = 9092 + + def __init__(self, conn_id): + """ + Initializes an instance of the Kafka Producer Hook class. + :param conn_id + The airflow connection ID to use. + """ + super().__init__(None) + self.conn_id = conn_id + self._conn = None + self.server = None + self.producer = None + + def get_conn(self) -> KafkaProducer: + """ + Returns a Kafka Producer + + :return: + A Kafka Producer object. + """ + if not self._conn: + _conn = self.get_connection(self.conn_id) + service_options = _conn.extra_dejson + host = _conn.host or self.DEFAULT_HOST + port = _conn.port or self.DEFAULT_PORT + + self.server = f"""{host}:{port}""" + self.producer = KafkaProducer(bootstrap_servers=self.server, **service_options) + return self.producer + + def send_message(self, topic, value=None, key=None, partition=None, timestamp_ms=None): + """ + Sends a message on the specified topic and partition. Keyed messages will be sent in order. + + :param topic: + The Kafka topic you wish to publish to. + :param value: + The value of the message you are publishing. + :param key: + The key to publish on. + :param partition: + The partition to publish onto. + :param timestamp_ms: + :return: + """ + producer = self.get_conn() + try: + future_record_metadata = producer.send( + topic, value=value, key=key, partition=partition, timestamp_ms=timestamp_ms + ) + finally: + producer.close() + return future_record_metadata + + def __repr__(self): + """ + A pretty version of the connection string. + + :return: + A pretty version of the connection string. + """ + connected = self.producer is not None + return ''.format( + connected, + self.server, + self.topic, + ) diff --git a/tests/providers/apache/kafka/sensors/__init__.py b/tests/providers/apache/kafka/sensors/__init__.py new file mode 100644 index 00000000000000..217e5db9607827 --- /dev/null +++ b/tests/providers/apache/kafka/sensors/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/apache/kafka/sensors/test_kafka.py b/tests/providers/apache/kafka/sensors/test_kafka.py new file mode 100644 index 00000000000000..d0b41faa8b206f --- /dev/null +++ b/tests/providers/apache/kafka/sensors/test_kafka.py @@ -0,0 +1,70 @@ +# # +# # Licensed to the Apache Software Foundation (ASF) under one +# # or more contributor license agreements. See the NOTICE file +# # distributed with this work for additional information +# # regarding copyright ownership. The ASF licenses this file +# # to you under the Apache License, Version 2.0 (the +# # "License"); you may not use this file except in compliance +# # with the License. You may obtain a copy of the License at +# # +# # http://www.apache.org/licenses/LICENSE-2.0 +# # +# # Unless required by applicable law or agreed to in writing, +# # software distributed under the License is distributed on an +# # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# # KIND, either express or implied. See the License for the +# # specific language governing permissions and limitations +# # under the License. +# +# from cached_property import cached_property +# +# from airflow.providers.apache.kafka.hooks.kafka_consumer import KafkaConsumerHook +# from airflow.sensors.base_sensor_operator import BaseSensorOperator +# from airflow.utils.decorators import apply_defaults +# +# +# class KafkaSensor(BaseSensorOperator): +# """Consumes the Kafka message with the specific topic""" +# +# DEFAULT_HOST = 'kafka1' +# DEFAULT_PORT = 9092 +# templated_fields = ('topic', 'host', 'port', ß) +# +# @apply_defaults +# def __init__(self, topic, host=DEFAULT_HOST, port=DEFAULT_PORT, *args, **kwargs): +# """ +# Initialize the sensor, the connection establish +# is put off to it's first time usage. +# +# :param topic: +# :param host: +# :param port: +# :param args: +# :param kwargs: +# """ +# self.topic = topic +# self.host = host +# self.port = port +# super().__init__(*args, **kwargs) +# +# @cached_property +# def hook(self): +# """Returns a Kafka Consumer Hook""" +# return KafkaConsumerHook(self.topic, self.host, self.port) +# +# def poke(self, context): +# """ +# Checks to see if messages exist on this topic/partition. +# +# :param context: +# :return: +# """ +# self.log.info('Poking topic: %s, using hook: %s', str(self.topic), str(self.hook)) +# +# messages = self.hook.get_messages() +# +# if messages: +# self.log.info('Got messages during poking: %s', str(messages)) +# return messages +# else: +# return False