Skip to content

Commit

Permalink
#138 - kafka.read - kafka to redis
Browse files Browse the repository at this point in the history
  • Loading branch information
wheelly committed Feb 14, 2024
1 parent 0e2ee03 commit 814648d
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 0 deletions.
16 changes: 16 additions & 0 deletions integration-tests/resources/jobs/tests/kafka_to_redis.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
input:
uses: kafka.read
with:
bootstrap_servers: kafka
topic: "integration-tests"
group: "integration-tests"
snapshot: true
seek_to_beginning: true
steps:
- uses: redis.write
with:
connection: cache
command: HSET
key:
expression: id
language: jmespath
41 changes: 41 additions & 0 deletions integration-tests/test_kafka_to_redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import json
import logging

from common import kafka_utils, redis_utils
from common.utils import run_job

logger = logging.getLogger("dy")
message_one = b'{"id":"1","name":"Boris"}'
message_two = b'{"id":"2","name":"Ivan"}'

def test_kafka_to_redis():
kafka_container = kafka_utils.get_kafka_container()
try:
with kafka_container as kafka:
redis_container = redis_utils.get_redis_oss_container(redis_utils.REDIS_PORT)
redis_container.start()

bootstrap_servers = kafka.get_bootstrap_server()
producer = kafka_utils.get_kafka_producer(bootstrap_servers)
producer.produce("integration-tests", message_one)
producer.produce("integration-tests", message_two)
producer.flush()
run_job("tests.kafka_to_redis")

redis_client = redis_utils.get_redis_client("localhost", redis_utils.REDIS_PORT)

assert len(redis_client.keys()) == 2

boris = redis_client.hgetall("1")
ivan = redis_client.hgetall("2")

assert boris == json.loads(message_one.decode())
assert ivan == json.loads(message_two.decode())
finally:
redis_container.stop()






0 comments on commit 814648d

Please sign in to comment.