Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

353 kafkaauth sasl plaintext basic auth etc #354

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pymssql = { version = "^2.2.7", optional = true }
PyMySQL = { version = "^1.0.2", optional = true }
redis = { version = "^4.3.5", optional = true }
SQLAlchemy = { version = "^2.0.4", optional = true }
confluent-kafka = { version = "^2.3.0", optional = true }

[tool.poetry.extras]
azure = ["azure-eventhub", "azure-eventhub-checkpointstoreblob-aio"]
Expand All @@ -62,6 +63,7 @@ parquet = ["fastparquet"]
pg = ["psycopg2-binary", "SQLAlchemy"]
redis = ["redis"]
sqlserver = ["pymssql", "SQLAlchemy"]
kafka = ["confluent-kafka"]

test = [
"aiohttp",
Expand Down
Empty file.
97 changes: 97 additions & 0 deletions core/src/datayoga_core/blocks/kafka/read/block.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import logging
from abc import ABCMeta
from typing import Dict

from datayoga_core.context import Context
from typing import AsyncGenerator, List, Optional
from datayoga_core.producer import Producer as DyProducer, Message
from confluent_kafka import Consumer, KafkaError

from itertools import count
from datayoga_core import utils
import orjson

logger = logging.getLogger("dy")


class Block(DyProducer, metaclass=ABCMeta):
    """Producer block that consumes JSON messages from a Kafka topic.

    Each consumed record is decoded with orjson and emitted downstream as a
    single-message batch, tagged with the record's offset under MSG_ID_FIELD.
    Offsets are committed synchronously every MIN_COMMIT_COUNT processed
    messages, and once more on shutdown.
    """
    bootstrap_servers: str
    group: str
    topic: str
    seek_to_beginning: bool
    snapshot: bool
    INTERNAL_FIELD_PREFIX = "__$$"
    MSG_ID_FIELD = f"{INTERNAL_FIELD_PREFIX}msg_id"
    # Commit offsets after this many processed messages.
    MIN_COMMIT_COUNT = 10
    connection_details: Dict

    def init(self, context: Optional[Context] = None):
        """Resolve connection details and read block properties.

        Args:
            context: Optional runtime context used to look up the named
                Kafka connection referenced by the `bootstrap_servers` property.
        """
        logger.debug(f"Initializing {self.get_block_name()}")
        self.connection_details = utils.get_connection_details(self.properties["bootstrap_servers"], context)

        self.bootstrap_servers = self.connection_details.get("bootstrap_servers")
        self.group = self.properties.get("group")
        self.topic = self.properties["topic"]
        self.seek_to_beginning = self.properties.get("seek_to_beginning", False)
        self.snapshot = self.properties.get("snapshot", False)

    async def produce(self) -> AsyncGenerator[List[Message], None]:
        """Yield single-message batches read from the configured topic.

        In snapshot mode, polling times out after 3 seconds of silence and the
        generator stops; otherwise poll blocks indefinitely for new messages.

        NOTE(review): consumer.poll is a blocking call inside an async
        generator — it will stall the event loop while waiting; confirm this
        is acceptable for the runtime.
        """
        consumer = Consumer(self._get_config())
        logger.debug(f"Producing {self.get_block_name()}")

        if self.seek_to_beginning:
            def on_assign(c, ps):
                # -2 is OFFSET_BEGINNING: rewind every assigned partition.
                for p in ps:
                    p.offset = -2
                c.assign(ps)
            consumer.subscribe([self.topic], on_assign)
        else:
            consumer.subscribe([self.topic])

        processed = 0
        try:
            while True:
                # Only snapshot mode uses a finite timeout, so a None result
                # means the topic is drained and we can stop.
                msg = consumer.poll(3.0 if self.snapshot else None)
                if msg is None:
                    logger.warning(f"Snapshot defined quitting on topic {self.topic}")
                    break
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event - informational, keep polling.
                        logger.info("Reached end of partition")
                    else:
                        # Handle other errors
                        logger.error("Error: {}".format(msg.error()))
                else:
                    # Process the message
                    message = orjson.loads(msg.value())
                    yield [{self.MSG_ID_FIELD: msg.offset(), **message}]
                    processed += 1
                    # Commit in batches rather than per message. The original
                    # code rebuilt itertools.count() each iteration, so the
                    # counter was always 0 and every message was committed.
                    if processed % self.MIN_COMMIT_COUNT == 0:
                        consumer.commit(asynchronous=False)

        finally:
            if processed % self.MIN_COMMIT_COUNT != 0:
                # Flush offsets stored since the last periodic commit so a
                # clean shutdown does not replay already-processed messages.
                try:
                    consumer.commit(asynchronous=False)
                except Exception:
                    logger.exception("Failed to commit offsets on shutdown")
            consumer.close()

    def _get_config(self) -> Dict:
        """Build the confluent-kafka consumer configuration.

        Auto-commit is disabled; offsets are committed explicitly in produce().
        SASL credentials are forwarded when the connection declares a
        SASL-based security protocol.
        """
        conf = {
            'bootstrap.servers': self.bootstrap_servers,
            'group.id': self.group,
            'enable.auto.commit': 'false'
        }
        proto = self.connection_details.get("security.protocol")
        if proto:
            conf['security.protocol'] = proto
            # TODO: how to distinguish here different auth protocols in kafka?
            if proto.startswith("SASL_"):
                conf['sasl.mechanisms'] = self.connection_details["sasl.mechanisms"]
                conf['sasl.username'] = self.connection_details["sasl.username"]
                conf['sasl.password'] = self.connection_details["sasl.password"]
        return conf




33 changes: 33 additions & 0 deletions core/src/datayoga_core/blocks/kafka/read/block.schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"title": "kafka.read",
"description": "Read from the kafka consumer",
"type": "object",
"properties": {
"bootstrap_servers": {
"description": "host name",
"type": "string"
},
"topic": {
"description": "Kafka topic",
"type": "string"
},
"group": {
"description": "Kafka group",
"type": "string"
},
"seek_to_beginning": {
"description": "Consumer seek to beginning",
"type": "boolean"
},
"snapshot": {
"type": "boolean",
"title": "Snapshot current entries and quit",
"description": "Snapshot current entries and quit",
"default": false
}
},
"required": [
"bootstrap_servers",
"topic"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"mysql",
"postgresql",
"redis",
"sqlserver"
"sqlserver",
"kafka"
]
},
"if": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"title": "Kafka",
"description": "Schema for configuring Kafka connection parameters",
"type": "object",
"properties": {
"type": {
"description": "Connection type",
"type": "string",
"const": "kafka"
},
"bootstrap_servers": {
"description": "Kafka Hosts",
"type": "string"
},
"security.protocol":{
"description": "Auth protocols",
"type": "string"
},
"sasl.mechanisms": {
"type": "string"
},
"sasl.username": {
"type": "string"
},
"sasl.password": {
"type": "string"
}
},
"additionalProperties": false,
"required": ["type", "bootstrap_servers"],
"examples": [
{
"kafka": {
"type": "kafka",
"bootstrap_servers": ["localhost:9092"]
}
}
]
}
28 changes: 28 additions & 0 deletions docs/reference/blocks/kafka_read.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
parent: Blocks
grand_parent: Reference
---

# kafka\.read

Read from the kafka consumer


**Properties**

|Name|Type|Description|Required|
|----|----|-----------|--------|
|**bootstrap\_servers**|`string`|Kafka bootstrap servers<br/>|yes|
|**topic**|`string`|Kafka topic<br/>|yes|
|**group**|`string`|Kafka group<br/>|no|
|**seek\_to\_beginning**|`boolean`|Consumer seek to beginning<br/>|no|
|**snapshot**<br/>(Snapshot current entries and quit)|`boolean`|Snapshot current entries and quit<br/>Default: `false`<br/>|no|

**Example**

```yaml
snapshot: false

```


33 changes: 33 additions & 0 deletions docs/reference/connection_types/kafka.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
---
parent: Connection Types
grand_parent: Reference
---

# Kafka

Schema for configuring Kafka connection parameters


**Properties**

|Name|Type|Description|Required|
|----|----|-----------|--------|
|**type**|`string`|Connection type<br/>Constant Value: `"kafka"`<br/>|yes|
|**bootstrap\_servers**|`string`|Kafka Hosts<br/>|yes|
|**security\.protocol**|`string`|Auth protocols<br/>|no|
|**sasl\.mechanisms**|`string`||no|
|**sasl\.username**|`string`||no|
|**sasl\.password**|`string`||no|

**Additional Properties:** not allowed
**Example**

```yaml
kafka:
type: kafka
  bootstrap_servers: localhost:9092

```


2 changes: 1 addition & 1 deletion docs/reference/connections.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Connection catalog

|Name|Type|Description|Required|
|----|----|-----------|--------|
|**type**|`string`|Connection type<br/>Enum: `"cassandra"`, `"db2"`, `"http"`, `"mysql"`, `"postgresql"`, `"redis"`, `"sqlserver"`<br/>|yes|
|**type**|`string`|Connection type<br/>Enum: `"cassandra"`, `"db2"`, `"http"`, `"mysql"`, `"postgresql"`, `"redis"`, `"sqlserver"`, `"kafka"`<br/>|yes|
|**if**|||no|
|**then**|||no|

Expand Down
10 changes: 10 additions & 0 deletions integration-tests/common/kafka_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from confluent_kafka import Producer
from testcontainers.kafka import KafkaContainer
def get_kafka_container() -> KafkaContainer:
    """Return a Kafka test container whose broker port is bound 1:1 on the host."""
    container = KafkaContainer()
    return container.with_bind_ports(KafkaContainer.KAFKA_PORT, KafkaContainer.KAFKA_PORT)

def get_kafka_producer(bootstrap_servers: str) -> Producer:
    """Create a Kafka producer for the integration tests.

    Note: "group.id" is a consumer-only configuration property; librdkafka
    rejects unknown producer properties ("No such configuration property"),
    so it must not be passed here.
    """
    return Producer({
        "bootstrap.servers": bootstrap_servers
    })
8 changes: 8 additions & 0 deletions integration-tests/resources/connections.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,11 @@ cassandra:
type: cassandra
hosts: ["localhost"]
port: 9042
kafka:
type: kafka
bootstrap_servers: "localhost:9092"
security.protocol: SASL_PLAINTEXT
sasl.mechanisms: PLAIN
sasl.username: admin
sasl.password: admin-secret

10 changes: 10 additions & 0 deletions integration-tests/resources/jobs/tests/kafka_to_stdout.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
input:
uses: kafka.read
with:
bootstrap_servers: kafka
topic: "integration-tests"
group: "integration-tests"
snapshot: true
seek_to_beginning: true
steps:
- uses: std.write
Empty file.
6 changes: 6 additions & 0 deletions integration-tests/resources/kafka.docker/basic/dockerfile.bak
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
FROM confluentinc/cp-kafka-rest:latest

COPY password.properties /etc/kafka-rest/
COPY rest-jaas.properties /etc/kafka-rest/

CMD ["/etc/confluent/docker/run"]
23 changes: 23 additions & 0 deletions integration-tests/resources/kafka.docker/basic/rest.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
KAFKA_REST_HOST_NAME=rest-proxy
KAFKA_REST_LISTENERS=http://0.0.0.0:8082
KAFKA_REST_SCHEMA_REGISTRY_URL=<HIDDEN>
KAFKA_REST_CLIENT_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=<HIDDEN>
KAFKA_REST_BOOTSTRAP_SERVERS=<HIDDEN>
KAFKA_REST_SECURITY_PROTOCOL=SASL_SSL
KAFKA_REST_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';
KAFKA_REST_SASL_MECHANISM=PLAIN
KAFKA_REST_CLIENT_BOOTSTRAP_SERVERS=<HIDDEN>
KAFKA_REST_CLIENT_SECURITY_PROTOCOL=SASL_SSL
KAFKA_REST_CLIENT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';

KAFKA_REST_CLIENT_SASL_MECHANISM=PLAIN
KAFKA_REST_AUTHENTICATION_METHOD=BASIC
KAFKA_REST_AUTHENTICATION_REALM=KafkaRest
KAFKA_REST_AUTHENTICATION_ROLES=thisismyrole
KAFKAREST_OPTS=-Djava.security.auth.login.config=/etc/kafka-rest/rest-jaas.properties

# NOTE: the following is a shell command, not an environment variable — it is
# kept here for reference only and must not be parsed as part of this env file.
# docker run -p 9093:8081 -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
#   -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS="1.kafka.broker:9092,2.kafka.broker:9092,3.kafka.broker:9092" \
#   -e SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL=SASL_SSL \
#   -e SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM=PLAIN \
#   -e SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";' confluentinc/cp-schema-registry:5.5.3
33 changes: 33 additions & 0 deletions integration-tests/resources/kafka.docker/sasl/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"

# KAFKA_SASL_JAAS_CONFIG: |
# org.apache.kafka.common.security.plain.PlainLoginModule required \
# username="admin" \
# password="admin-secret" \
# user_admin="admin-secret" \
# user_alice="alice-secret";

volumes:
- ./kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
ports:
- 9093:9092
#command: ["sleep","3600"]
Loading