Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing copy() methods to work with gevent library #155

Closed
wants to merge 12 commits into from
5 changes: 2 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,5 @@ install:
- sudo rm -rf /dev/shm && sudo ln -s /run/shm /dev/shm

script:
- tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`
- KAFKA_VERSION=0.8.0 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`
- KAFKA_VERSION=0.8.1 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`
- ./travis_run_tests.sh $TRAVIS_PYTHON_VERSION

13 changes: 9 additions & 4 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class KafkaClient(object):
# one passed to SimpleConsumer.get_message(), otherwise you can get a
# socket timeout.
def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
activate=True):
# We need one connection to bootstrap
self.client_id = client_id
self.timeout = timeout
Expand All @@ -38,7 +39,8 @@ def __init__(self, hosts, client_id=CLIENT_ID,
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
self.load_metadata_for_topics() # bootstrap with all metadata
if activate is True:
self.load_metadata_for_topics() # bootstrap with all metadata


##################
Expand Down Expand Up @@ -222,8 +224,11 @@ def copy(self):
Create an inactive copy of the client object
A reinit() has to be done on the copy before it can be used again
"""
c = copy.deepcopy(self)
for k, v in c.conns.items():
c = KafkaClient(hosts=['{0}:{1}'.format(entry[0], entry[1]) for entry in self.hosts],
client_id=self.client_id,
timeout=self.timeout,
activate=False)
for k, v in self.conns.iteritems():
c.conns[k] = v.copy()
return c

Expand Down
8 changes: 4 additions & 4 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ class KafkaConnection(local):
timeout: default 120. The socket timeout for sending and receiving data
in seconds. None means no timeout, so a request can block forever.
"""
def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS, activate=True):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
self.timeout = timeout
self._sock = None

self.reinit()
if activate is True:
self.reinit()

def __repr__(self):
return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
Expand Down Expand Up @@ -133,8 +134,7 @@ def copy(self):
Create an inactive copy of the connection object
A reinit() has to be done on the copy before it can be used again
"""
c = copy.deepcopy(self)
c._sock = None
c = KafkaConnection(host=self.host, port=self.port, timeout=self.timeout, activate=False)
return c

def close(self):
Expand Down
1 change: 0 additions & 1 deletion servers/0.8.0/kafka-src
Submodule kafka-src deleted from 15bb39
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def run(self):
version="0.9.1",

install_requires=["distribute"],
tests_require=["tox", "mock"],
tests_require=["tox", "mock", "gevent"],
cmdclass={"test": Tox},

packages=["kafka"],
Expand Down
5 changes: 5 additions & 0 deletions test/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import os

if os.environ.get('USE_GEVENT') == '1':
import gevent.monkey
gevent.monkey.patch_all(Event=True)
3 changes: 3 additions & 0 deletions test/test_consumer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def test_simple_consumer_pending(self):

consumer.stop()

@skip_gevent()
@kafka_versions("all")
def test_multi_process_consumer(self):
# Produce 100 messages to partitions 0 and 1
Expand All @@ -124,6 +125,7 @@ def test_multi_process_consumer(self):

consumer.stop()

@skip_gevent()
@kafka_versions("all")
def test_multi_process_consumer_blocking(self):
consumer = self.consumer(consumer = MultiProcessConsumer)
Expand Down Expand Up @@ -152,6 +154,7 @@ def test_multi_process_consumer_blocking(self):

consumer.stop()

@skip_gevent()
@kafka_versions("all")
def test_multi_proc_pending(self):
self.send_messages(0, range(0, 10))
Expand Down
1 change: 1 addition & 0 deletions test/test_failover_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def test_switch_leader(self):

producer.stop()

@skip_gevent()
@kafka_versions("all")
def test_switch_leader_async(self):
key, topic, partition = random_string(5), self.topic, 0
Expand Down
4 changes: 4 additions & 0 deletions test/test_producer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ def test_acks_cluster_commit(self):

producer.stop()

@skip_gevent()
@kafka_versions("all")
def test_batched_simple_producer__triggers_by_message(self):
start_offset0 = self.current_offset(self.topic, 0)
Expand Down Expand Up @@ -296,6 +297,7 @@ def test_batched_simple_producer__triggers_by_message(self):

producer.stop()

@skip_gevent()
@kafka_versions("all")
def test_batched_simple_producer__triggers_by_time(self):
start_offset0 = self.current_offset(self.topic, 0)
Expand Down Expand Up @@ -348,6 +350,7 @@ def test_batched_simple_producer__triggers_by_time(self):

producer.stop()

@skip_gevent()
@kafka_versions("all")
def test_async_simple_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
Expand All @@ -361,6 +364,7 @@ def test_async_simple_producer(self):

producer.stop()

@skip_gevent()
@kafka_versions("all")
def test_async_keyed_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
Expand Down
14 changes: 14 additions & 0 deletions test/testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
'random_string',
'ensure_topic_creation',
'get_open_port',
'skip_gevent',
'kafka_versions',
'KafkaIntegrationTestCase',
'Timer',
Expand All @@ -24,6 +25,19 @@ def random_string(l):
s = "".join(random.choice(string.letters) for i in xrange(l))
return s

def skip_gevent():
def skip_gevent(func):
@functools.wraps(func)
def wrapper(self):
use_gevent = os.environ.get('USE_GEVENT')
if use_gevent is not None and \
use_gevent == '1':
self.skipTest('test not support for gevent')

return func(self)
return wrapper
return skip_gevent

def kafka_versions(*versions):
def kafka_versions(func):
@functools.wraps(func)
Expand Down
4 changes: 4 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
[tox]
envlist = py26, py27, pypy

[testenv]
deps =
unittest2
nose
coverage
mock
python-snappy
cython
gevent
commands =
nosetests --with-coverage --cover-erase --cover-package kafka []
setenv =
PROJECT_ROOT = {toxinidir}

16 changes: 16 additions & 0 deletions travis_run_tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

TOX_ENV=$1
if [ $1 == "2.7" ]; then
$TOX_ENV = "py27"
elif [ $1 == "2.6" ]; then
$TOX_ENV = "py26"
fi;

tox -e $TOX_ENV
KAFKA_VERSION=0.8.0 tox -e $TOX_ENV
KAFKA_VERSION=0.8.1 tox -e $TOX_ENV
if [ $TOX_ENV != "pypy" ]; then
USE_GEVENT=1 KAFKA_VERSION=0.8.0 tox -e $TOX_ENV
USE_GEVENT=1 KAFKA_VERSION=0.8.1 tox -e $TOX_ENV
fi;
12 changes: 0 additions & 12 deletions travis_selector.sh

This file was deleted.