diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 9a7790eac..9f76f7f3d 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -25,20 +25,21 @@ from test.conftest import version from test.fixtures import ZookeeperFixture, KafkaFixture, random_string -from test.testutil import ( - KafkaIntegrationTestCase, kafka_versions, Timer, - send_messages -) +from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer @pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") -def test_kafka_consumer(simple_client, topic, kafka_consumer_factory): - """Test KafkaConsumer - """ +def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory): + """Test KafkaConsumer""" kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest') - send_messages(simple_client, topic, 0, range(0, 100)) - send_messages(simple_client, topic, 1, range(100, 200)) + # TODO replace this with a `send_messages()` pytest fixture + # as we will likely need this elsewhere + for i in range(0, 100): + kafka_producer.send(topic, partition=0, value=str(i).encode()) + for i in range(100, 200): + kafka_producer.send(topic, partition=1, value=str(i).encode()) + kafka_producer.flush() cnt = 0 messages = {0: set(), 1: set()} diff --git a/test/testutil.py b/test/testutil.py index feb6f6d5f..6f6cafb5e 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -3,20 +3,19 @@ import functools import operator import os -import socket import time import uuid import pytest from . import unittest -from kafka import SimpleClient, create_message +from kafka import SimpleClient from kafka.errors import ( LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError, NotLeaderForPartitionError, UnknownTopicOrPartitionError, FailedPayloadsError ) -from kafka.structs import OffsetRequestPayload, ProduceRequestPayload +from kafka.structs import OffsetRequestPayload from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order @@ -67,26 +66,6 @@ def wrapper(func, *args, **kwargs): return real_kafka_versions -_MESSAGES = {} -def msg(message): - """Format, encode and deduplicate a message - """ - global _MESSAGES #pylint: disable=global-statement - if message not in _MESSAGES: - _MESSAGES[message] = '%s-%s' % (message, str(uuid.uuid4())) - - return _MESSAGES[message].encode('utf-8') - -def send_messages(client, topic, partition, messages): - """Send messages to a topic's partition - """ - messages = [create_message(msg(str(m))) for m in messages] - produce = ProduceRequestPayload(topic, partition, messages=messages) - resp, = client.send_produce_request([produce]) - assert resp.error == 0 - - return [x.value for x in messages] - def current_offset(client, topic, partition, kafka_broker=None): """Get the current offset of a topic's partition """ @@ -101,6 +80,7 @@ def current_offset(client, topic, partition, kafka_broker=None): else: return offsets.offsets[0] + class KafkaIntegrationTestCase(unittest.TestCase): create_client = True topic = None