Skip to content

Commit

Permalink
Close KafkaConsumer instances during tests (#1410)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpkp authored Mar 8, 2018
1 parent a6130d2 commit 4c383da
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 1 deletion.
3 changes: 3 additions & 0 deletions test/test_consumer_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def test_consumer(kafka_broker, version):
assert len(consumer._client._conns) > 0
node_id = list(consumer._client._conns.keys())[0]
assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED
consumer.close()


@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
Expand Down Expand Up @@ -153,6 +154,7 @@ def test_paused(kafka_broker, topic):

consumer.unsubscribe()
assert set() == consumer.paused()
consumer.close()


@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
Expand Down Expand Up @@ -183,3 +185,4 @@ def test_heartbeat_thread(kafka_broker, topic):
assert consumer._coordinator.heartbeat.last_poll == last_poll
consumer.poll(timeout_ms=100)
assert consumer._coordinator.heartbeat.last_poll > last_poll
consumer.close()
7 changes: 7 additions & 0 deletions test/test_consumer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):

assert len(messages[0]) == 100
assert len(messages[1]) == 100
kafka_consumer.close()


class TestConsumerIntegration(KafkaIntegrationTestCase):
Expand Down Expand Up @@ -558,6 +559,7 @@ def test_kafka_consumer__blocking(self):
messages.add((msg.partition, msg.offset))
self.assertEqual(len(messages), 5)
self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
consumer.close()

@kafka_versions('>=0.8.1')
def test_kafka_consumer__offset_commit_resume(self):
Expand Down Expand Up @@ -597,6 +599,7 @@ def test_kafka_consumer__offset_commit_resume(self):
output_msgs2.append(m)
self.assert_message_count(output_msgs2, 20)
self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200)
consumer2.close()

@kafka_versions('>=0.10.1')
def test_kafka_consumer_max_bytes_simple(self):
Expand All @@ -617,6 +620,7 @@ def test_kafka_consumer_max_bytes_simple(self):
self.assertEqual(
seen_partitions, set([
TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)]))
consumer.close()

@kafka_versions('>=0.10.1')
def test_kafka_consumer_max_bytes_one_msg(self):
Expand All @@ -642,6 +646,7 @@ def test_kafka_consumer_max_bytes_one_msg(self):

fetched_msgs = [next(consumer) for i in range(10)]
self.assertEqual(len(fetched_msgs), 10)
consumer.close()

@kafka_versions('>=0.10.1')
def test_kafka_consumer_offsets_for_time(self):
Expand Down Expand Up @@ -695,6 +700,7 @@ def test_kafka_consumer_offsets_for_time(self):
self.assertEqual(offsets, {
tp: late_msg.offset + 1
})
consumer.close()

@kafka_versions('>=0.10.1')
def test_kafka_consumer_offsets_search_many_partitions(self):
Expand Down Expand Up @@ -733,6 +739,7 @@ def test_kafka_consumer_offsets_search_many_partitions(self):
tp0: p0msg.offset + 1,
tp1: p1msg.offset + 1
})
consumer.close()

@kafka_versions('<0.10.1')
def test_kafka_consumer_offsets_for_time_old(self):
Expand Down
2 changes: 1 addition & 1 deletion test/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def test_end_to_end(kafka_broker, compression):
futures.append(producer.send(topic, 'msg %d' % i))
ret = [f.get(timeout=30) for f in futures]
assert len(ret) == messages

producer.close()

consumer.subscribe([topic])
Expand All @@ -67,6 +66,7 @@ def test_end_to_end(kafka_broker, compression):
break

assert msgs == set(['msg %d' % i for i in range(messages)])
consumer.close()


@pytest.mark.skipif(platform.python_implementation() != 'CPython',
Expand Down

0 comments on commit 4c383da

Please sign in to comment.