Skip to content

Commit

Permalink
test-this-too
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwidman committed Oct 29, 2018
1 parent 3e332e8 commit 9eb63e1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 32 deletions.
19 changes: 10 additions & 9 deletions test/test_consumer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,21 @@

from test.conftest import version
from test.fixtures import ZookeeperFixture, KafkaFixture, random_string
from test.testutil import (
KafkaIntegrationTestCase, kafka_versions, Timer,
send_messages
)
from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer


@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
"""Test KafkaConsumer
"""
def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory):
"""Test KafkaConsumer"""
kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')

send_messages(simple_client, topic, 0, range(0, 100))
send_messages(simple_client, topic, 1, range(100, 200))
# TODO replace this with a `send_messages()` pytest fixture
# as we will likely need this elsewhere
for i in range(0, 100):
kafka_producer.send(topic, partition=0, value=str(i).encode())
for i in range(100, 200):
kafka_producer.send(topic, partition=1, value=str(i).encode())
kafka_producer.flush()

cnt = 0
messages = {0: set(), 1: set()}
Expand Down
26 changes: 3 additions & 23 deletions test/testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,19 @@
import functools
import operator
import os
import socket
import time
import uuid

import pytest
from . import unittest

from kafka import SimpleClient, create_message
from kafka import SimpleClient
from kafka.errors import (
LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError,
NotLeaderForPartitionError, UnknownTopicOrPartitionError,
FailedPayloadsError
)
from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
from kafka.structs import OffsetRequestPayload
from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order


Expand Down Expand Up @@ -67,26 +66,6 @@ def wrapper(func, *args, **kwargs):
return real_kafka_versions


_MESSAGES = {}
def msg(message):
"""Format, encode and deduplicate a message
"""
global _MESSAGES #pylint: disable=global-statement
if message not in _MESSAGES:
_MESSAGES[message] = '%s-%s' % (message, str(uuid.uuid4()))

return _MESSAGES[message].encode('utf-8')

def send_messages(client, topic, partition, messages):
"""Send messages to a topic's partition
"""
messages = [create_message(msg(str(m))) for m in messages]
produce = ProduceRequestPayload(topic, partition, messages=messages)
resp, = client.send_produce_request([produce])
assert resp.error == 0

return [x.value for x in messages]

def current_offset(client, topic, partition, kafka_broker=None):
"""Get the current offset of a topic's partition
"""
Expand All @@ -101,6 +80,7 @@ def current_offset(client, topic, partition, kafka_broker=None):
else:
return offsets.offsets[0]


class KafkaIntegrationTestCase(unittest.TestCase):
create_client = True
topic = None
Expand Down

0 comments on commit 9eb63e1

Please sign in to comment.