From 06ec064937e230436741c8646220cbd7407fb909 Mon Sep 17 00:00:00 2001 From: wheelly Date: Mon, 5 Feb 2024 14:14:09 +0200 Subject: [PATCH 1/9] #138 - kafka.read first implementation --- .../blocks/kafka/read/__init__.py | 0 .../datayoga_core/blocks/kafka/read/block.py | 37 +++++++++++++++ .../blocks/kafka/read/block.schema.json | 4 ++ .../resources/schemas/connections.schema.json | 3 +- .../schemas/connections/kafka.schema.json | 47 +++++++++++++++++++ integration-tests/common/kafka_utils.py | 9 ++++ integration-tests/resources/connections.yaml | 6 +++ .../resources/jobs/tests/kafka_to_stdout.yaml | 4 ++ integration-tests/test_kafka_to_stdout.py | 15 ++++++ 9 files changed, 124 insertions(+), 1 deletion(-) create mode 100644 core/src/datayoga_core/blocks/kafka/read/__init__.py create mode 100644 core/src/datayoga_core/blocks/kafka/read/block.py create mode 100644 core/src/datayoga_core/blocks/kafka/read/block.schema.json create mode 100644 core/src/datayoga_core/resources/schemas/connections/kafka.schema.json create mode 100644 integration-tests/common/kafka_utils.py create mode 100644 integration-tests/resources/jobs/tests/kafka_to_stdout.yaml create mode 100644 integration-tests/test_kafka_to_stdout.py diff --git a/core/src/datayoga_core/blocks/kafka/read/__init__.py b/core/src/datayoga_core/blocks/kafka/read/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/core/src/datayoga_core/blocks/kafka/read/block.py b/core/src/datayoga_core/blocks/kafka/read/block.py new file mode 100644 index 00000000..8b447b22 --- /dev/null +++ b/core/src/datayoga_core/blocks/kafka/read/block.py @@ -0,0 +1,37 @@ +import logging +from abc import ABCMeta +from itertools import count + +from datayoga_core.context import Context +from typing import AsyncGenerator, List, Optional +from datayoga_core.producer import Producer as DyProducer, Message +from kafka import KafkaConsumer +from kafka.errors import KafkaError + +logger = logging.getLogger("dy") + + +class Block(DyProducer, metaclass=ABCMeta): + port: int + bootstrap_servers: List[str] + group: str + topics: List[str] + INTERNAL_FIELD_PREFIX = "__$$" + MSG_ID_FIELD = f"{INTERNAL_FIELD_PREFIX}msg_id" + + def init(self, context: Optional[Context] = None): + logger.debug(f"Initializing {self.get_block_name()}") + self.port = int(self.properties.get("port", 9092)) + self.bootstrap_servers = self.properties.get("bootstrap_servers", ["0.0.0.0"]) + self.group = self.properties.get("group", "integration-tests") + self.topics = self.properties.get("topics", ["integration-tests"]) + + async def produce(self) -> AsyncGenerator[List[Message], None]: + logger.debug(f"Producing {self.get_block_name()}") + consumer = KafkaConsumer(group_id=self.group, bootstrap_servers=self.bootstrap_servers) + if not consumer.bootstrap_connected(): + raise KafkaError("Unable to connect with kafka container!") + consumer.subscribe(self.topics) + for message in consumer: + counter = iter(count()) + yield [{self.MSG_ID_FIELD: f"{next(counter)}", **message}] \ No newline at end of file diff --git a/core/src/datayoga_core/blocks/kafka/read/block.schema.json b/core/src/datayoga_core/blocks/kafka/read/block.schema.json new file mode 100644 index 00000000..02f08eba --- /dev/null +++ b/core/src/datayoga_core/blocks/kafka/read/block.schema.json @@ -0,0 +1,4 @@ +{ + "title": "kafka.read", + "description": "Read from the kafka consumer" +} diff --git a/core/src/datayoga_core/resources/schemas/connections.schema.json b/core/src/datayoga_core/resources/schemas/connections.schema.json 
index b307621a..117ad881 100644 --- a/core/src/datayoga_core/resources/schemas/connections.schema.json +++ b/core/src/datayoga_core/resources/schemas/connections.schema.json @@ -16,7 +16,8 @@ "mysql", "postgresql", "redis", - "sqlserver" + "sqlserver", + "kafka" ] }, "if": { diff --git a/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json b/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json new file mode 100644 index 00000000..cab20d0a --- /dev/null +++ b/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json @@ -0,0 +1,47 @@ +{ + "title": "Kafka", + "description": "Schema for configuring Kafka connection parameters", + "type": "object", + "properties": { + "type": { + "description": "Connection type", + "type": "string", + "const": "kafka" + }, + "bootstrap_servers": { + "description": "Kafka Hosts", + "type": "array", + "items": { + "type": "string" + } + }, + "port": { + "description": "Kafka port", + "type": "integer", + "minimum": 1, + "maximum": 65535 + }, + "topics": { + "description": "Kafka Topics", + "type": "array", + "items": { + "type": "string" + } + }, + "group": "Kafka Group", + "type": "string" + }, + "additionalProperties": false, + "required": ["type", "bootstrap_servers", "port", "topics", "group"], + "examples": [ + { + "kafka": { + "type": "kafka", + "host": ["localhost"], + "port": 9093, + "group":"test", + "topics": ["test"] + } + } + ] +} diff --git a/integration-tests/common/kafka_utils.py b/integration-tests/common/kafka_utils.py new file mode 100644 index 00000000..1dd1aaad --- /dev/null +++ b/integration-tests/common/kafka_utils.py @@ -0,0 +1,9 @@ +from typing import List + +from kafka import KafkaProducer +from testcontainers.kafka import KafkaContainer +def get_kafka_container(port: int) -> KafkaContainer: + return KafkaContainer().with_bind_ports(9092, port) + +def get_kafka_producer(topic: str) -> KafkaProducer: + return KafkaProducer() diff --git a/integration-tests/resources/connections.yaml b/integration-tests/resources/connections.yaml index a4cba484..07e578d5 100644 --- a/integration-tests/resources/connections.yaml +++ b/integration-tests/resources/connections.yaml @@ -44,3 +44,9 @@ cassandra: type: cassandra hosts: ["localhost"] port: 9042 +kafka: + type: kafka + bootstrap_servers: ["localhost"] + port: 9093 + group-id: "test" + topics: ["test"] diff --git a/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml b/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml new file mode 100644 index 00000000..6ab8e9b1 --- /dev/null +++ b/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml @@ -0,0 +1,4 @@ +input: + uses: kafka.read +steps: + - uses: std.write diff --git a/integration-tests/test_kafka_to_stdout.py b/integration-tests/test_kafka_to_stdout.py new file mode 100644 index 00000000..474fc0d7 --- /dev/null +++ b/integration-tests/test_kafka_to_stdout.py @@ -0,0 +1,15 @@ +import logging +from common import kafka_utils +from common.utils import run_job + +logger = logging.getLogger("dy") + +def test_kafka_to_stdout(): + kafka_container = kafka_utils.get_kafka_container(9093) + with kafka_container as kafka: + connection = kafka.get_bootstrap_server() + logger.info(f"Connecting to Kafka: {connection}") + producer = kafka_utils.get_kafka_producer("integration-tests") + producer.send("integration-tests", b"test", b"test") + run_job("tests.kafka_to_stdout") + From edbec52c2f83aa46baae3d40ab8d81d581cdbf8c Mon Sep 17 00:00:00 2001 From: wheelly Date: Tue, 6 Feb 2024 17:25:34 +0200 
Subject: [PATCH 2/9] #138 - kafka.read first working exchange producer/consumer --- .../datayoga_core/blocks/kafka/read/block.py | 34 ++++++++++++++----- .../blocks/kafka/read/block.schema.json | 16 ++++++++- .../schemas/connections/kafka.schema.json | 27 +++++---------- integration-tests/common/kafka_utils.py | 8 ++--- integration-tests/resources/connections.yaml | 7 ++-- .../resources/jobs/tests/kafka_to_stdout.yaml | 3 ++ integration-tests/test_kafka_to_stdout.py | 14 +++++--- 7 files changed, 68 insertions(+), 41 deletions(-) diff --git a/core/src/datayoga_core/blocks/kafka/read/block.py b/core/src/datayoga_core/blocks/kafka/read/block.py index 8b447b22..88d0c9c9 100644 --- a/core/src/datayoga_core/blocks/kafka/read/block.py +++ b/core/src/datayoga_core/blocks/kafka/read/block.py @@ -5,33 +5,49 @@ from datayoga_core.context import Context from typing import AsyncGenerator, List, Optional from datayoga_core.producer import Producer as DyProducer, Message -from kafka import KafkaConsumer +from kafka import KafkaConsumer, TopicPartition from kafka.errors import KafkaError +from datayoga_core import utils logger = logging.getLogger("dy") +import json + class Block(DyProducer, metaclass=ABCMeta): - port: int bootstrap_servers: List[str] group: str - topics: List[str] + topic: str + seek_to_beginning: bool = False INTERNAL_FIELD_PREFIX = "__$$" MSG_ID_FIELD = f"{INTERNAL_FIELD_PREFIX}msg_id" def init(self, context: Optional[Context] = None): logger.debug(f"Initializing {self.get_block_name()}") - self.port = int(self.properties.get("port", 9092)) - self.bootstrap_servers = self.properties.get("bootstrap_servers", ["0.0.0.0"]) - self.group = self.properties.get("group", "integration-tests") - self.topics = self.properties.get("topics", ["integration-tests"]) + connection_details = utils.get_connection_details(self.properties["bootstrap_servers"], context) + + self.port = int(connection_details.get("port", 9092)) + logger.debug(f"Connection details: {json.dumps(connection_details)}") + self.bootstrap_servers = connection_details.get("bootstrap_servers", ["0.0.0.0"]) + self.group = connection_details.get("group") + self.topic = connection_details.get("topic", "integration-tests") + self.seek_to_beginning = self.properties.get("seek_to_beginning", False) async def produce(self) -> AsyncGenerator[List[Message], None]: logger.debug(f"Producing {self.get_block_name()}") - consumer = KafkaConsumer(group_id=self.group, bootstrap_servers=self.bootstrap_servers) + + if self.seek_to_beginning: + logger.debug(f"Seeking to beginning...") + consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers, group_id=self.group) + tp = TopicPartition(self.topic, 0) + consumer.assign([tp]) + consumer.seek_to_beginning() + else: + consumer = KafkaConsumer(self.topic, bootstrap_servers=self.bootstrap_servers, group_id=self.group) + if not consumer.bootstrap_connected(): raise KafkaError("Unable to connect with kafka container!") - consumer.subscribe(self.topics) for message in consumer: counter = iter(count()) + logger.debug(f"Received message: {message}") yield [{self.MSG_ID_FIELD: f"{next(counter)}", **message}] \ No newline at end of file diff --git a/core/src/datayoga_core/blocks/kafka/read/block.schema.json b/core/src/datayoga_core/blocks/kafka/read/block.schema.json index 02f08eba..a58e2883 100644 --- a/core/src/datayoga_core/blocks/kafka/read/block.schema.json +++ b/core/src/datayoga_core/blocks/kafka/read/block.schema.json @@ -1,4 +1,18 @@ { "title": "kafka.read", - "description": "Read from the kafka 
consumer" + "description": "Read from the kafka consumer", + "type": "object", + "properties": { + "bootstrap_servers": { + "description": "host name", + "type": "string" + }, + "seek_to_beginning": { + "description": "Consumer seek to beginning", + "type": "boolean" + } + }, + "required": [ + "bootstrap_servers" + ] } diff --git a/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json b/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json index cab20d0a..a40e4874 100644 --- a/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json +++ b/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json @@ -15,32 +15,23 @@ "type": "string" } }, - "port": { - "description": "Kafka port", - "type": "integer", - "minimum": 1, - "maximum": 65535 - }, - "topics": { + "topic": { "description": "Kafka Topics", - "type": "array", - "items": { - "type": "string" - } + "type": "string" }, - "group": "Kafka Group", - "type": "string" + "group": { + "description": "Kafka Group", + "type": "string" + } }, "additionalProperties": false, - "required": ["type", "bootstrap_servers", "port", "topics", "group"], + "required": ["type", "bootstrap_servers", "topic"], "examples": [ { "kafka": { "type": "kafka", - "host": ["localhost"], - "port": 9093, - "group":"test", - "topics": ["test"] + "bootstrap_servers": ["localhost"], + "topic": "test" } } ] diff --git a/integration-tests/common/kafka_utils.py b/integration-tests/common/kafka_utils.py index 1dd1aaad..907b3ac8 100644 --- a/integration-tests/common/kafka_utils.py +++ b/integration-tests/common/kafka_utils.py @@ -2,8 +2,8 @@ from kafka import KafkaProducer from testcontainers.kafka import KafkaContainer -def get_kafka_container(port: int) -> KafkaContainer: - return KafkaContainer().with_bind_ports(9092, port) +def get_kafka_container() -> KafkaContainer: + return KafkaContainer().with_bind_ports(KafkaContainer.KAFKA_PORT, KafkaContainer.KAFKA_PORT) -def get_kafka_producer(topic: str) -> KafkaProducer: - return KafkaProducer() +def get_kafka_producer(bootstrap_servers: str) -> KafkaProducer: + return KafkaProducer(bootstrap_servers=[bootstrap_servers]) diff --git a/integration-tests/resources/connections.yaml b/integration-tests/resources/connections.yaml index 07e578d5..00fdbbc3 100644 --- a/integration-tests/resources/connections.yaml +++ b/integration-tests/resources/connections.yaml @@ -46,7 +46,6 @@ cassandra: port: 9042 kafka: type: kafka - bootstrap_servers: ["localhost"] - port: 9093 - group-id: "test" - topics: ["test"] + bootstrap_servers: ["localhost:9093"] + group: "test" + topic: "integration-tests" diff --git a/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml b/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml index 6ab8e9b1..88d281a8 100644 --- a/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml +++ b/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml @@ -1,4 +1,7 @@ input: uses: kafka.read + with: + bootstrap_servers: kafka + seek_to_beginning: Yes steps: - uses: std.write diff --git a/integration-tests/test_kafka_to_stdout.py b/integration-tests/test_kafka_to_stdout.py index 474fc0d7..ec605e11 100644 --- a/integration-tests/test_kafka_to_stdout.py +++ b/integration-tests/test_kafka_to_stdout.py @@ -5,11 +5,15 @@ logger = logging.getLogger("dy") def test_kafka_to_stdout(): - kafka_container = kafka_utils.get_kafka_container(9093) + kafka_container = kafka_utils.get_kafka_container() with kafka_container as kafka: - connection = 
kafka.get_bootstrap_server() - logger.info(f"Connecting to Kafka: {connection}") - producer = kafka_utils.get_kafka_producer("integration-tests") - producer.send("integration-tests", b"test", b"test") + bootstrap_servers = kafka.get_bootstrap_server() + logger.info(f"Connecting to Kafka: {bootstrap_servers}") + producer = kafka_utils.get_kafka_producer(bootstrap_servers) + producer.send("integration-tests", b"test") + producer.flush() + producer.close() run_job("tests.kafka_to_stdout") + + From da42af46ab12342d77386ac7d0a368fa4f9ee4f6 Mon Sep 17 00:00:00 2001 From: wheelly Date: Wed, 7 Feb 2024 16:22:29 +0200 Subject: [PATCH 3/9] #138 - kafka.read getting to confluent-kafka and snapshot --- .../datayoga_core/blocks/kafka/read/block.py | 73 +++++++++++++------ .../blocks/kafka/read/block.schema.json | 6 ++ .../schemas/connections/kafka.schema.json | 5 +- integration-tests/common/kafka_utils.py | 11 +-- integration-tests/resources/connections.yaml | 4 +- .../resources/jobs/tests/kafka_to_stdout.yaml | 3 +- integration-tests/test_kafka_to_stdout.py | 14 +++- 7 files changed, 79 insertions(+), 37 deletions(-) diff --git a/core/src/datayoga_core/blocks/kafka/read/block.py b/core/src/datayoga_core/blocks/kafka/read/block.py index 88d0c9c9..b31e2251 100644 --- a/core/src/datayoga_core/blocks/kafka/read/block.py +++ b/core/src/datayoga_core/blocks/kafka/read/block.py @@ -1,26 +1,27 @@ import logging +import json from abc import ABCMeta -from itertools import count from datayoga_core.context import Context from typing import AsyncGenerator, List, Optional from datayoga_core.producer import Producer as DyProducer, Message -from kafka import KafkaConsumer, TopicPartition -from kafka.errors import KafkaError +from confluent_kafka import Consumer, KafkaException, KafkaError, TopicPartition, OFFSET_BEGINNING + +from itertools import count from datayoga_core import utils logger = logging.getLogger("dy") -import json - class Block(DyProducer, metaclass=ABCMeta): - bootstrap_servers: List[str] + bootstrap_servers: str group: str topic: str - seek_to_beginning: bool = False + seek_to_beginning: bool + snapshot: bool INTERNAL_FIELD_PREFIX = "__$$" MSG_ID_FIELD = f"{INTERNAL_FIELD_PREFIX}msg_id" + MIN_COMMIT_COUNT = 10 def init(self, context: Optional[Context] = None): logger.debug(f"Initializing {self.get_block_name()}") @@ -28,26 +29,56 @@ def init(self, context: Optional[Context] = None): self.port = int(connection_details.get("port", 9092)) logger.debug(f"Connection details: {json.dumps(connection_details)}") - self.bootstrap_servers = connection_details.get("bootstrap_servers", ["0.0.0.0"]) + self.bootstrap_servers = connection_details.get("bootstrap_servers") self.group = connection_details.get("group") self.topic = connection_details.get("topic", "integration-tests") self.seek_to_beginning = self.properties.get("seek_to_beginning", False) + self.snapshot = self.properties.get("snapshot", False) async def produce(self) -> AsyncGenerator[List[Message], None]: logger.debug(f"Producing {self.get_block_name()}") + consumer = Consumer({ + 'bootstrap.servers': self.bootstrap_servers, + 'group.id': self.group, + 'enable.auto.commit': 'false' + }) + if self.seek_to_beginning: - logger.debug(f"Seeking to beginning...") - consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers, group_id=self.group) - tp = TopicPartition(self.topic, 0) - consumer.assign([tp]) - consumer.seek_to_beginning() + def on_assign(c, ps): + for p in ps: + p.offset = -2 + c.assign(ps) + consumer.subscribe([self.topic], on_assign) 
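+ # the on_assign callback above pins each assigned partition to offset -2, confluent-kafka's OFFSET_BEGINNING sentinel (imported above), so reads restart from the earliest retained message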
else: - consumer = KafkaConsumer(self.topic, bootstrap_servers=self.bootstrap_servers, group_id=self.group) - - if not consumer.bootstrap_connected(): - raise KafkaError("Unable to connect with kafka container!") - for message in consumer: - counter = iter(count()) - logger.debug(f"Received message: {message}") - yield [{self.MSG_ID_FIELD: f"{next(counter)}", **message}] \ No newline at end of file + consumer.subscribe([self.topic]) + + try: + while True: + # Poll for messages + msg = consumer.poll(1.0) + counter = next(count()) + + if msg is None: + if self.snapshot: + break + continue + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + # End of partition event + logger.info("Reached end of partition") + else: + # Handle other errors + logger.error("Error: {}".format(msg.error())) + + else: + # Process the message + yield [{self.MSG_ID_FIELD: msg.value()}] + # Commit the message offset + consumer.commit(msg) + if counter % self.MIN_COMMIT_COUNT == 0: + consumer.commit(asynchronous=False) + finally: + consumer.close() + + diff --git a/core/src/datayoga_core/blocks/kafka/read/block.schema.json b/core/src/datayoga_core/blocks/kafka/read/block.schema.json index a58e2883..d8364259 100644 --- a/core/src/datayoga_core/blocks/kafka/read/block.schema.json +++ b/core/src/datayoga_core/blocks/kafka/read/block.schema.json @@ -10,6 +10,12 @@ "seek_to_beginning": { "description": "Consumer seek to beginning", "type": "boolean" + }, + "snapshot": { + "type": "boolean", + "title": "Snapshot current entries and quit", + "description": "Snapshot current entries and quit", + "default": false } }, "required": [ diff --git a/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json b/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json index a40e4874..7bb5f08e 100644 --- a/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json +++ b/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json @@ -10,10 +10,7 @@ }, "bootstrap_servers": { "description": "Kafka Hosts", - "type": "array", - "items": { - "type": "string" - } + "type": "string" }, "topic": { "description": "Kafka Topics", diff --git a/integration-tests/common/kafka_utils.py b/integration-tests/common/kafka_utils.py index 907b3ac8..42a789c3 100644 --- a/integration-tests/common/kafka_utils.py +++ b/integration-tests/common/kafka_utils.py @@ -1,9 +1,10 @@ -from typing import List - -from kafka import KafkaProducer +from confluent_kafka import Producer from testcontainers.kafka import KafkaContainer def get_kafka_container() -> KafkaContainer: return KafkaContainer().with_bind_ports(KafkaContainer.KAFKA_PORT, KafkaContainer.KAFKA_PORT) -def get_kafka_producer(bootstrap_servers: str) -> KafkaProducer: - return KafkaProducer(bootstrap_servers=[bootstrap_servers]) +def get_kafka_producer(bootstrap_servers: str) -> Producer: + return Producer({ + "bootstrap.servers": bootstrap_servers, + "group.id": "integration-tests" + }) diff --git a/integration-tests/resources/connections.yaml b/integration-tests/resources/connections.yaml index 00fdbbc3..1b469a27 100644 --- a/integration-tests/resources/connections.yaml +++ b/integration-tests/resources/connections.yaml @@ -46,6 +46,6 @@ cassandra: port: 9042 kafka: type: kafka - bootstrap_servers: ["localhost:9093"] - group: "test" + bootstrap_servers: "localhost:9093" + group: "integration-tests" topic: "integration-tests" diff --git a/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml 
b/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml index 88d281a8..36f6feb5 100644 --- a/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml +++ b/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml @@ -2,6 +2,7 @@ input: uses: kafka.read with: bootstrap_servers: kafka - seek_to_beginning: Yes + snapshot: true + seek_to_beginning: true steps: - uses: std.write diff --git a/integration-tests/test_kafka_to_stdout.py b/integration-tests/test_kafka_to_stdout.py index ec605e11..552fa4d6 100644 --- a/integration-tests/test_kafka_to_stdout.py +++ b/integration-tests/test_kafka_to_stdout.py @@ -1,19 +1,25 @@ import logging + from common import kafka_utils from common.utils import run_job logger = logging.getLogger("dy") -def test_kafka_to_stdout(): +def test_kafka_to_stdout(tmpdir): kafka_container = kafka_utils.get_kafka_container() with kafka_container as kafka: bootstrap_servers = kafka.get_bootstrap_server() logger.info(f"Connecting to Kafka: {bootstrap_servers}") producer = kafka_utils.get_kafka_producer(bootstrap_servers) - producer.send("integration-tests", b"test") + producer.produce("integration-tests", b'{"hello": "Earth"}') + producer.produce("integration-tests", b'{"bye": "Mars"}') producer.flush() - producer.close() - run_job("tests.kafka_to_stdout") + output_file = tmpdir.join("tests_kafka_to_stdout.txt") + run_job("tests.kafka_to_stdout", None, output_file) + logger.error(output_file.read()) + # result = json.loads(output_file.read()) + # assert result[0]["hello"] == "Earth" + # assert result[1]["bye"] == "Mars" From 831a7e6a788d280e8ba7c7620834fc653c5e116b Mon Sep 17 00:00:00 2001 From: wheelly Date: Thu, 8 Feb 2024 11:33:52 +0200 Subject: [PATCH 4/9] #138 - kafka.read test finally works --- .../datayoga_core/blocks/kafka/read/block.py | 42 ++++++++++--------- integration-tests/test_kafka_to_stdout.py | 30 +++++++------ 2 files changed, 39 insertions(+), 33 deletions(-) diff --git a/core/src/datayoga_core/blocks/kafka/read/block.py b/core/src/datayoga_core/blocks/kafka/read/block.py index b31e2251..8711d0b5 100644 --- a/core/src/datayoga_core/blocks/kafka/read/block.py +++ b/core/src/datayoga_core/blocks/kafka/read/block.py @@ -5,10 +5,11 @@ from datayoga_core.context import Context from typing import AsyncGenerator, List, Optional from datayoga_core.producer import Producer as DyProducer, Message -from confluent_kafka import Consumer, KafkaException, KafkaError, TopicPartition, OFFSET_BEGINNING +from confluent_kafka import Consumer, KafkaError from itertools import count from datayoga_core import utils +import orjson logger = logging.getLogger("dy") @@ -34,35 +35,32 @@ def init(self, context: Optional[Context] = None): self.topic = connection_details.get("topic", "integration-tests") self.seek_to_beginning = self.properties.get("seek_to_beginning", False) self.snapshot = self.properties.get("snapshot", False) - - async def produce(self) -> AsyncGenerator[List[Message], None]: - logger.debug(f"Producing {self.get_block_name()}") - - consumer = Consumer({ + self.consumer = Consumer({ 'bootstrap.servers': self.bootstrap_servers, 'group.id': self.group, 'enable.auto.commit': 'false' }) + async def produce(self) -> AsyncGenerator[List[Message], None]: + logger.debug(f"Producing {self.get_block_name()}") + if self.seek_to_beginning: def on_assign(c, ps): for p in ps: p.offset = -2 c.assign(ps) - consumer.subscribe([self.topic], on_assign) + self.consumer.subscribe([self.topic], on_assign) else: - consumer.subscribe([self.topic]) + 
self.consumer.subscribe([self.topic]) try: while True: # Poll for messages - msg = consumer.poll(1.0) - counter = next(count()) - + msg = self.consumer.poll(3.0 if self.snapshot else None) if msg is None: - if self.snapshot: - break - continue + assert self.snapshot + logger.warning(f"Snapshot defined, quitting on topic {self.topic}") + break if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: # End of partition event @@ -73,12 +71,16 @@ def on_assign(c, ps): else: # Process the message - yield [{self.MSG_ID_FIELD: msg.value()}] - # Commit the message offset - consumer.commit(msg) - if counter % self.MIN_COMMIT_COUNT == 0: - consumer.commit(asynchronous=False) + message = orjson.loads(msg.value()) + yield [{self.MSG_ID_FIELD: msg.offset(), **message}] + finally: - consumer.close() + self.consumer.close() + + def ack(self, msg_ids: List[str]): + try: + self.consumer.commit(asynchronous=False) + except Exception as e: + logger.error(f"Cannot commit: {e}") diff --git a/integration-tests/test_kafka_to_stdout.py index 552fa4d6..e8e7cc39 100644 --- a/integration-tests/test_kafka_to_stdout.py +++ b/integration-tests/test_kafka_to_stdout.py @@ -1,25 +1,29 @@ import logging +import os from common import kafka_utils from common.utils import run_job logger = logging.getLogger("dy") +message_one = b'{"id":1,"name":"Boris"}' +message_two = b'{"id":2,"name":"Ivan"}' def test_kafka_to_stdout(tmpdir): kafka_container = kafka_utils.get_kafka_container() - with kafka_container as kafka: - bootstrap_servers = kafka.get_bootstrap_server() - logger.info(f"Connecting to Kafka: {bootstrap_servers}") - producer = kafka_utils.get_kafka_producer(bootstrap_servers) - producer.produce("integration-tests", b'{"hello": "Earth"}') - producer.produce("integration-tests", b'{"bye": "Mars"}') - producer.flush() - output_file = tmpdir.join("tests_kafka_to_stdout.txt") - run_job("tests.kafka_to_stdout", None, output_file) - logger.error(output_file.read()) - # result = json.loads(output_file.read()) - # assert result[0]["hello"] == "Earth" - # assert result[1]["bye"] == "Mars" + try: + with kafka_container as kafka: + bootstrap_servers = kafka.get_bootstrap_server() + producer = kafka_utils.get_kafka_producer(bootstrap_servers) + producer.produce("integration-tests", message_one) + producer.produce("integration-tests", message_two) + producer.flush() + output_file = tmpdir.join("tests_kafka_to_stdout.txt") + run_job("tests.kafka_to_stdout", None, output_file) + result = output_file.readlines() + assert result[0].strip().encode() == message_one + assert result[1].strip().encode() == message_two + finally: + os.remove(output_file) From 7b501e190263a343fc9e461180be6eb173297d9f Mon Sep 17 00:00:00 2001 From: wheelly Date: Thu, 8 Feb 2024 11:47:26 +0200 Subject: [PATCH 5/9] #138 - kafka.read confluent-kafka to extras --- core/pyproject.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/pyproject.toml b/core/pyproject.toml index 6e378583..5ed422e1 100644 --- a/core/pyproject.toml +++ b/core/pyproject.toml @@ -50,6 +50,7 @@ pymssql = { version = "^2.2.7", optional = true } PyMySQL = { version = "^1.0.2", optional = true } redis = { version = "^4.3.5", optional = true } SQLAlchemy = { version = "^2.0.4", optional = true } +confluent-kafka = { version = "^2.3.0", optional = true } [tool.poetry.extras] azure = ["azure-eventhub", "azure-eventhub-checkpointstoreblob-aio"] @@ -62,6 +63,7 @@ parquet = ["fastparquet"] pg = ["psycopg2-binary", "SQLAlchemy"] redis = 
["redis"] sqlserver = ["pymssql", "SQLAlchemy"] +kafka = ["confluent-kafka"] test = [ "aiohttp", From 69f1235d046abb30b467c941dbe05ba344d2d142 Mon Sep 17 00:00:00 2001 From: wheelly Date: Sun, 11 Feb 2024 10:02:16 +0200 Subject: [PATCH 6/9] #138 - kafka.read merge fixes - topic and group to block --- core/src/datayoga_core/blocks/kafka/read/block.py | 4 ++-- .../blocks/kafka/read/block.schema.json | 11 ++++++++++- .../resources/schemas/connections/kafka.schema.json | 13 ++----------- integration-tests/resources/connections.yaml | 2 -- .../resources/jobs/tests/kafka_to_stdout.yaml | 2 ++ 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/core/src/datayoga_core/blocks/kafka/read/block.py b/core/src/datayoga_core/blocks/kafka/read/block.py index 8711d0b5..01504367 100644 --- a/core/src/datayoga_core/blocks/kafka/read/block.py +++ b/core/src/datayoga_core/blocks/kafka/read/block.py @@ -31,8 +31,8 @@ def init(self, context: Optional[Context] = None): self.port = int(connection_details.get("port", 9092)) logger.debug(f"Connection details: {json.dumps(connection_details)}") self.bootstrap_servers = connection_details.get("bootstrap_servers") - self.group = connection_details.get("group") - self.topic = connection_details.get("topic", "integration-tests") + self.group = self.properties.get("group") + self.topic = self.properties["topic"] self.seek_to_beginning = self.properties.get("seek_to_beginning", False) self.snapshot = self.properties.get("snapshot", False) self.consumer = Consumer({ diff --git a/core/src/datayoga_core/blocks/kafka/read/block.schema.json b/core/src/datayoga_core/blocks/kafka/read/block.schema.json index d8364259..b9a2a59c 100644 --- a/core/src/datayoga_core/blocks/kafka/read/block.schema.json +++ b/core/src/datayoga_core/blocks/kafka/read/block.schema.json @@ -7,6 +7,14 @@ "description": "host name", "type": "string" }, + "topic": { + "description": "Kafka topic", + "type": "string" + }, + "group": { + "description": "Kafka group", + "type": "string" + }, "seek_to_beginning": { "description": "Consumer seek to beginning", "type": "boolean" @@ -19,6 +27,7 @@ } }, "required": [ - "bootstrap_servers" + "bootstrap_servers", + "topic" ] } diff --git a/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json b/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json index 7bb5f08e..f56c5122 100644 --- a/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json +++ b/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json @@ -11,24 +11,15 @@ "bootstrap_servers": { "description": "Kafka Hosts", "type": "string" - }, - "topic": { - "description": "Kafka Topics", - "type": "string" - }, - "group": { - "description": "Kafka Group", - "type": "string" } }, "additionalProperties": false, - "required": ["type", "bootstrap_servers", "topic"], + "required": ["type", "bootstrap_servers"], "examples": [ { "kafka": { "type": "kafka", - "bootstrap_servers": ["localhost"], - "topic": "test" + "bootstrap_servers": ["localhost:9092"] } } ] diff --git a/integration-tests/resources/connections.yaml b/integration-tests/resources/connections.yaml index 1b469a27..e35e48e0 100644 --- a/integration-tests/resources/connections.yaml +++ b/integration-tests/resources/connections.yaml @@ -47,5 +47,3 @@ cassandra: kafka: type: kafka bootstrap_servers: "localhost:9093" - group: "integration-tests" - topic: "integration-tests" diff --git a/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml 
b/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml index 36f6feb5..b49a688d 100644 --- a/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml +++ b/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml @@ -2,6 +2,8 @@ input: uses: kafka.read with: bootstrap_servers: kafka + topic: "integration-tests" + group: "integration-tests" snapshot: true seek_to_beginning: true steps: From 0e2ee03e0f64d6f55824e7f05c70cada55f5b283 Mon Sep 17 00:00:00 2001 From: wheelly Date: Tue, 13 Feb 2024 14:29:17 +0200 Subject: [PATCH 7/9] #138 - kafka.read watermark commit --- .../datayoga_core/blocks/kafka/read/block.py | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/core/src/datayoga_core/blocks/kafka/read/block.py b/core/src/datayoga_core/blocks/kafka/read/block.py index 01504367..b4e4b1b3 100644 --- a/core/src/datayoga_core/blocks/kafka/read/block.py +++ b/core/src/datayoga_core/blocks/kafka/read/block.py @@ -27,21 +27,20 @@ class Block(DyProducer, metaclass=ABCMeta): def init(self, context: Optional[Context] = None): logger.debug(f"Initializing {self.get_block_name()}") connection_details = utils.get_connection_details(self.properties["bootstrap_servers"], context) - - self.port = int(connection_details.get("port", 9092)) logger.debug(f"Connection details: {json.dumps(connection_details)}") self.bootstrap_servers = connection_details.get("bootstrap_servers") self.group = self.properties.get("group") self.topic = self.properties["topic"] self.seek_to_beginning = self.properties.get("seek_to_beginning", False) self.snapshot = self.properties.get("snapshot", False) - self.consumer = Consumer({ + + + async def produce(self) -> AsyncGenerator[List[Message], None]: + consumer = Consumer({ 'bootstrap.servers': self.bootstrap_servers, 'group.id': self.group, 'enable.auto.commit': 'false' }) - - async def produce(self) -> AsyncGenerator[List[Message], None]: logger.debug(f"Producing {self.get_block_name()}") if self.seek_to_beginning: @@ -49,14 +48,15 @@ def on_assign(c, ps): for p in ps: p.offset = -2 c.assign(ps) - self.consumer.subscribe([self.topic], on_assign) + consumer.subscribe([self.topic], on_assign) else: - self.consumer.subscribe([self.topic]) + consumer.subscribe([self.topic]) try: while True: # Poll for messages - msg = self.consumer.poll(3.0 if self.snapshot else None) + msg = consumer.poll(3.0 if self.snapshot else None) if msg is None: assert self.snapshot logger.warning(f"Snapshot defined, quitting on topic {self.topic}") break @@ -73,14 +73,12 @@ def on_assign(c, ps): # Process the message message = orjson.loads(msg.value()) yield [{self.MSG_ID_FIELD: msg.offset(), **message}] + if msg.offset() % self.MIN_COMMIT_COUNT == 0: + consumer.commit(asynchronous=False) finally: consumer.close() + - def ack(self, msg_ids: List[str]): - try: - self.consumer.commit(asynchronous=False) - except Exception as e: - logger.error(f"Cannot commit: {e}") From c62dbf64bd8e3159370581fcc1443d51ea68b678 Mon Sep 17 00:00:00 2001 From: wheelly Date: Wed, 14 Feb 2024 11:56:38 +0200 Subject: [PATCH 8/9] #353 - kafka.auth - elaborations --- .../datayoga_core/blocks/kafka/read/block.py | 31 ++++++++++---- .../schemas/connections/kafka.schema.json | 13 ++++++ integration-tests/resources/connections.yaml | 7 +++- .../resources/kafka.docker/basic/dockerfile | 0 .../kafka.docker/basic/dockerfile.bak | 6 +++ .../resources/kafka.docker/basic/rest.env | 23 ++++++++++ .../kafka.docker/sasl/docker-compose.yml | 33 +++++++++++++++ 
.../resources/kafka.docker/sasl/dockerfile | 42 +++++++++++++++++++ .../resources/kafka.docker/sasl/entry.sh | 10 +++++ .../kafka.docker/sasl/kafka_server_jaas.conf | 8 ++++ 10 files changed, 163 insertions(+), 10 deletions(-) create mode 100644 integration-tests/resources/kafka.docker/basic/dockerfile create mode 100644 integration-tests/resources/kafka.docker/basic/dockerfile.bak create mode 100644 integration-tests/resources/kafka.docker/basic/rest.env create mode 100644 integration-tests/resources/kafka.docker/sasl/docker-compose.yml create mode 100644 integration-tests/resources/kafka.docker/sasl/dockerfile create mode 100755 integration-tests/resources/kafka.docker/sasl/entry.sh create mode 100644 integration-tests/resources/kafka.docker/sasl/kafka_server_jaas.conf diff --git a/core/src/datayoga_core/blocks/kafka/read/block.py b/core/src/datayoga_core/blocks/kafka/read/block.py index b4e4b1b3..1408a735 100644 --- a/core/src/datayoga_core/blocks/kafka/read/block.py +++ b/core/src/datayoga_core/blocks/kafka/read/block.py @@ -1,6 +1,6 @@ import logging -import json from abc import ABCMeta +from typing import Dict from datayoga_core.context import Context from typing import AsyncGenerator, List, Optional @@ -23,12 +23,13 @@ class Block(DyProducer, metaclass=ABCMeta): INTERNAL_FIELD_PREFIX = "__$$" MSG_ID_FIELD = f"{INTERNAL_FIELD_PREFIX}msg_id" MIN_COMMIT_COUNT = 10 + connection_details: Dict def init(self, context: Optional[Context] = None): logger.debug(f"Initializing {self.get_block_name()}") - connection_details = utils.get_connection_details(self.properties["bootstrap_servers"], context) - logger.debug(f"Connection details: {json.dumps(connection_details)}") - self.bootstrap_servers = connection_details.get("bootstrap_servers") + self.connection_details = utils.get_connection_details(self.properties["bootstrap_servers"], context) + + self.bootstrap_servers = self.connection_details.get("bootstrap_servers") self.group = self.properties.get("group") self.topic = self.properties["topic"] self.seek_to_beginning = self.properties.get("seek_to_beginning", False) @@ -36,11 +37,7 @@ def init(self, context: Optional[Context] = None): async def produce(self) -> AsyncGenerator[List[Message], None]: - consumer = Consumer({ - 'bootstrap.servers': self.bootstrap_servers, - 'group.id': self.group, - 'enable.auto.commit': 'false' - }) + consumer = Consumer(self._get_config()) logger.debug(f"Producing {self.get_block_name()}") if self.seek_to_beginning: @@ -79,6 +76,22 @@ def on_assign(c, ps): finally: consumer.close() + def _get_config(self): + conf = { + 'bootstrap.servers': self.bootstrap_servers, + 'group.id': self.group, + 'enable.auto.commit': 'false' + } + proto = self.connection_details.get("security.protocol") + if proto: + conf['security.protocol'] = proto + # TODO: how to distinguish here different auth protocols in kafka? 
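+ # librdkafka accepts plaintext, ssl, sasl_plaintext and sasl_ssl here; only the SASL_* variants need the sasl.* credentials below, while an ssl setup would carry ssl.* certificate options instead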
+ if proto.startswith("SASL_"): + conf['sasl.mechanisms'] = self.connection_details["sasl.mechanisms"] + conf['sasl.username'] = self.connection_details["sasl.username"] + conf['sasl.password'] = self.connection_details["sasl.password"] + return conf + diff --git a/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json b/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json index f56c5122..a500f1b5 100644 --- a/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json +++ b/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json @@ -11,6 +11,19 @@ "bootstrap_servers": { "description": "Kafka Hosts", "type": "string" + }, + "security.protocol":{ + "description": "Auth protocols", + "type": "string" + }, + "sasl.mechanisms": { + "type": "string" + }, + "sasl.username": { + "type": "string" + }, + "sasl.password": { + "type": "string" } }, "additionalProperties": false, diff --git a/integration-tests/resources/connections.yaml b/integration-tests/resources/connections.yaml index e35e48e0..b9d4f00a 100644 --- a/integration-tests/resources/connections.yaml +++ b/integration-tests/resources/connections.yaml @@ -46,4 +46,9 @@ cassandra: port: 9042 kafka: type: kafka - bootstrap_servers: "localhost:9093" + bootstrap_servers: "localhost:9092" + security.protocol: SASL_PLAINTEXT + sasl.mechanisms: PLAIN + sasl.username: admin + sasl.password: admin-secret + diff --git a/integration-tests/resources/kafka.docker/basic/dockerfile b/integration-tests/resources/kafka.docker/basic/dockerfile new file mode 100644 index 00000000..e69de29b diff --git a/integration-tests/resources/kafka.docker/basic/dockerfile.bak b/integration-tests/resources/kafka.docker/basic/dockerfile.bak new file mode 100644 index 00000000..3bf87cc4 --- /dev/null +++ b/integration-tests/resources/kafka.docker/basic/dockerfile.bak @@ -0,0 +1,6 @@ +FROM confluentinc/cp-kafka-rest:latest + +COPY password.properties /etc/kafka-rest/ +COPY rest-jaas.properties /etc/kafka-rest/ + +CMD ["/etc/confluent/docker/run"] \ No newline at end of file diff --git a/integration-tests/resources/kafka.docker/basic/rest.env b/integration-tests/resources/kafka.docker/basic/rest.env new file mode 100644 index 00000000..ebfd54a8 --- /dev/null +++ b/integration-tests/resources/kafka.docker/basic/rest.env @@ -0,0 +1,23 @@ +KAFKA_REST_HOST_NAME=rest-proxy +KAFKA_REST_LISTENERS=http://0.0.0.0:8082 +KAFKA_REST_SCHEMA_REGISTRY_URL= +KAFKA_REST_CLIENT_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO= +KAFKA_REST_BOOTSTRAP_SERVERS= +KAFKA_REST_SECURITY_PROTOCOL=SASL_SSL +KAFKA_REST_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret'; +KAFKA_REST_SASL_MECHANISM=PLAIN +KAFKA_REST_CLIENT_BOOTSTRAP_SERVERS= +KAFKA_REST_CLIENT_SECURITY_PROTOCOL=SASL_SSL +KAFKA_REST_CLIENT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret'; + +KAFKA_REST_CLIENT_SASL_MECHANISM=PLAIN +KAFKA_REST_AUTHENTICATION_METHOD=BASIC +KAFKA_REST_AUTHENTICATION_REALM=KafkaRest +KAFKA_REST_AUTHENTICATION_ROLES=thisismyrole +KAFKAREST_OPTS=-Djava.security.auth.login.config=/etc/kafka-rest/rest-jaas.properties + +docker run -p 9093:8081 -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \ +-e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS="1.kafka.broker:9092,2.kafka.broker:9092,3.kafka.broker:9092" \ +-e SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL=SASL_SSL \ +-e SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM=PLAIN \ +-e 
SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";' confluentinc/cp-schema-registry:5.5.3 \ No newline at end of file diff --git a/integration-tests/resources/kafka.docker/sasl/docker-compose.yml b/integration-tests/resources/kafka.docker/sasl/docker-compose.yml new file mode 100644 index 00000000..fc16759d --- /dev/null +++ b/integration-tests/resources/kafka.docker/sasl/docker-compose.yml @@ -0,0 +1,33 @@ +version: '3' +services: + zookeeper: + image: confluentinc/cp-zookeeper:latest + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + kafka: + image: confluentinc/cp-kafka:latest + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://kafka:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN + KAFKA_SASL_ENABLED_MECHANISMS: PLAIN + KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf" + +# KAFKA_SASL_JAAS_CONFIG: | +# org.apache.kafka.common.security.plain.PlainLoginModule required \ +# username="admin" \ +# password="admin-secret" \ +# user_admin="admin-secret" \ +# user_alice="alice-secret"; + + volumes: + - ./kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf + ports: + - 9093:9092 + #command: ["sleep","3600"] \ No newline at end of file diff --git a/integration-tests/resources/kafka.docker/sasl/dockerfile b/integration-tests/resources/kafka.docker/sasl/dockerfile new file mode 100644 index 00000000..517465fe --- /dev/null +++ b/integration-tests/resources/kafka.docker/sasl/dockerfile @@ -0,0 +1,42 @@ +# FROM confluentinc/cp-kafka:5.4.3 +# ENV KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf" +# ENV KAFKA_INTER_BROKER_LISTENER_NAME=SASL_PLAINTEXT +# ENV KAFKA_SASL_ENABLED_MECHANISMS=PLAIN +# ENV KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN +# ENV KAFKA_LISTENERS="SASL_PLAINTEXT://:9093,BROKER://:9092" +# ENV KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT +# ENV KAFKA_INTER_BROKER_LISTENER_NAME=BROKER +# ENV KAFKA_BROKER_ID=1 +# ENV KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 +# ENV KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1 +# ENV KAFKA_LOG_FLUSH_INTERVAL_MESSAGES=10000000 +# ENV KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 +# COPY kafka_server_jaas.conf /etc/kafka +# COPY entry.sh / +# ENTRYPOINT ["/entry.sh", "SASL_PLAINTEXT://:9093,BROKER://:9092"] + + +FROM confluentinc/cp-kafka:latest +ENV KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf" +#ENV KAFKA_ADVERTISED_LISTENERS=SASL_PLAINTEXT://:9093,BROKER://:9092 +ENV KAFKA_LISTENERS=SASL_PLAINTEXT://:9093,BROKER://:9092 +ENV KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SASL_PLAINTEXT:SASL_PLAINTEXT,BROKER:PLAINTEXT +ENV KAFKA_INTER_BROKER_LISTENER_NAME=BROKER +ENV KAFKA_SECURITY_INTER_BROKER_PROTOCOL=PLAINTEXT +ENV KAFKA_SASL_ENABLED_MECHANISMS=PLAIN +ENV KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN + +ENV KAFKA_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';" +ENV KAFKA_NUM_PARTITIONS=1 +ENV KAFKA_DEFAULT_REPLICATION_FACTOR=1 +ENV KAFKA_BROKER_ID=1 +COPY kafka_server_jaas.conf /etc/kafka +COPY entry.sh / +ENTRYPOINT ["/entry.sh", "SASL_PLAINTEXT://:9093,BROKER://:9092"] + +#FROM confluentinc/cp-schema-registry:5.5.3 +#ENV SCHEMA_REGISTRY_HOST_NAME=schema-registry +#ENV 
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS="localhost:9092;:9091" +#ENV SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL=SASL_SSL +#ENV SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM=PLAIN +#ENV SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";' diff --git a/integration-tests/resources/kafka.docker/sasl/entry.sh b/integration-tests/resources/kafka.docker/sasl/entry.sh new file mode 100755 index 00000000..a10ffbe6 --- /dev/null +++ b/integration-tests/resources/kafka.docker/sasl/entry.sh @@ -0,0 +1,10 @@ +#!/bin/bash +echo 'clientPort=2181' > zookeeper.properties +echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties +echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties +zookeeper-server-start zookeeper.properties & +export KAFKA_ZOOKEEPER_CONNECT="localhost:2181" +export KAFKA_ADVERTISED_LISTENERS="$1" +. /etc/confluent/docker/bash-config +/etc/confluent/docker/configure +/etc/confluent/docker/launch \ No newline at end of file diff --git a/integration-tests/resources/kafka.docker/sasl/kafka_server_jaas.conf b/integration-tests/resources/kafka.docker/sasl/kafka_server_jaas.conf new file mode 100644 index 00000000..98ba2b11 --- /dev/null +++ b/integration-tests/resources/kafka.docker/sasl/kafka_server_jaas.conf @@ -0,0 +1,8 @@ +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="admin" + password="admin-secret" + user_admin="test" + user_alice="test"; +}; +Client {}; \ No newline at end of file From e0e7446100e746101dc8192b79340fc55dac3bfe Mon Sep 17 00:00:00 2001 From: github-actions Date: Wed, 14 Feb 2024 10:05:33 +0000 Subject: [PATCH 9/9] update autogenerated docs --- docs/reference/blocks/kafka_read.md | 28 ++++++++++++++++++++ docs/reference/connection_types/kafka.md | 33 ++++++++++++++++++++++++ docs/reference/connections.md | 2 +- 3 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 docs/reference/blocks/kafka_read.md create mode 100644 docs/reference/connection_types/kafka.md diff --git a/docs/reference/blocks/kafka_read.md b/docs/reference/blocks/kafka_read.md new file mode 100644 index 00000000..f6dab825 --- /dev/null +++ b/docs/reference/blocks/kafka_read.md @@ -0,0 +1,28 @@ +--- +parent: Blocks +grand_parent: Reference +--- + +# kafka\.read + +Read from the kafka consumer + + +**Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|**bootstrap\_servers**|`string`|host name
|yes| +|**topic**|`string`|Kafka topic
|yes| +|**group**|`string`|Kafka group
|no| +|**seek\_to\_beginning**|`boolean`|Consumer seek to beginning
|no| +|**snapshot**
Default: `false`&#10;
|no|
|no| + +**Example** + +```yaml +snapshot: false + +``` + + diff --git a/docs/reference/connection_types/kafka.md b/docs/reference/connection_types/kafka.md new file mode 100644 index 00000000..3bf2bfd7 --- /dev/null +++ b/docs/reference/connection_types/kafka.md @@ -0,0 +1,33 @@ +--- +parent: Connection Types +grand_parent: Reference +--- + +# Kafka + +Schema for configuring Kafka connection parameters + + +**Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|**type**|`string`|Connection type
Constant Value: `"kafka"`
|yes| +|**bootstrap\_servers**|`string`|Kafka Hosts
|yes| +|**security\.protocol**|`string`|Auth protocols
|no| +|**sasl\.mechanisms**|`string`||no| +|**sasl\.username**|`string`||no| +|**sasl\.password**|`string`||no| + +**Additional Properties:** not allowed +**Example** + +```yaml +kafka: + type: kafka + bootstrap_servers: + - localhost:9092 + +``` + + diff --git a/docs/reference/connections.md b/docs/reference/connections.md index 44021972..a8a153e1 100644 --- a/docs/reference/connections.md +++ b/docs/reference/connections.md @@ -21,7 +21,7 @@ Connection catalog |Name|Type|Description|Required| |----|----|-----------|--------| -|**type**|`string`|Connection type
Enum: `"cassandra"`, `"db2"`, `"http"`, `"mysql"`, `"postgresql"`, `"redis"`, `"sqlserver"`
|yes| +|**type**|`string`|Connection type
Enum: `"cassandra"`, `"db2"`, `"http"`, `"mysql"`, `"postgresql"`, `"redis"`, `"sqlserver"`, `"kafka"`
|yes| |**if**|||no| |**then**|||no|
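
Taken together, the series converges on one consumer pattern that is easy to lose across the diffs: rewind to the beginning of the topic via an `on_assign` callback, use a finite poll timeout as the snapshot cut-off, and batch synchronous offset commits every `MIN_COMMIT_COUNT` messages. The sketch below restates that final shape as a standalone script using the same `confluent-kafka` and `orjson` libraries. It is a minimal illustration, not the DataYoga block itself: the broker address, topic, and group are placeholder values, and the block's `Producer`/`Message` plumbing and connection-catalog lookup are omitted.

```python
import orjson
from confluent_kafka import Consumer, KafkaError, KafkaException, OFFSET_BEGINNING

MIN_COMMIT_COUNT = 10  # commit once per this many processed messages


def snapshot_read(bootstrap_servers: str, topic: str, group: str):
    """Yield messages from the beginning of a topic, stopping once it is drained."""
    consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,
        "group.id": group,
        "enable.auto.commit": "false",
    })

    def on_assign(c, partitions):
        # rewind every assigned partition to the earliest retained offset
        for p in partitions:
            p.offset = OFFSET_BEGINNING
        c.assign(partitions)

    consumer.subscribe([topic], on_assign=on_assign)
    processed = 0
    try:
        while True:
            # a finite timeout turns "no message" into the snapshot cut-off;
            # a real implementation would wait for partition assignment first
            msg = consumer.poll(3.0)
            if msg is None:
                break
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue  # end of partition is informational, not fatal
                raise KafkaException(msg.error())
            yield {"__$$msg_id": msg.offset(), **orjson.loads(msg.value())}
            processed += 1
            if processed % MIN_COMMIT_COUNT == 0:
                consumer.commit(asynchronous=False)  # watermark commit per batch
    finally:
        if processed % MIN_COMMIT_COUNT:
            consumer.commit(asynchronous=False)  # flush the final partial batch
        consumer.close()


if __name__ == "__main__":
    for record in snapshot_read("localhost:9092", "integration-tests", "integration-tests"):
        print(record)
```

Counting with a plain `processed` variable rather than re-creating an `itertools.count` on every poll keeps the commit cadence at one synchronous commit per `MIN_COMMIT_COUNT` messages, which is the stated intent of the watermark-commit patch.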