diff --git a/.travis.yml b/.travis.yml
index bd5f63aef..6119f601d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -18,6 +18,5 @@ install:
   - sudo rm -rf /dev/shm && sudo ln -s /run/shm /dev/shm

 script:
-  - tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`
-  - KAFKA_VERSION=0.8.0 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`
-  - KAFKA_VERSION=0.8.1 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`
+  - ./travis_run_tests.sh $TRAVIS_PYTHON_VERSION
+
diff --git a/kafka/client.py b/kafka/client.py
index d0e07d072..281c54e3d 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -27,7 +27,8 @@ class KafkaClient(object):
     # one passed to SimpleConsumer.get_message(), otherwise you can get a
     # socket timeout.
     def __init__(self, hosts, client_id=CLIENT_ID,
-                 timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
+                 timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
+                 activate=True):
         # We need one connection to bootstrap
         self.client_id = client_id
         self.timeout = timeout
@@ -38,7 +39,8 @@ def __init__(self, hosts, client_id=CLIENT_ID,
         self.brokers = {}            # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
         self.topic_partitions = {}   # topic_id -> [0, 1, 2, ...]
-        self.load_metadata_for_topics()  # bootstrap with all metadata
+        if activate is True:
+            self.load_metadata_for_topics()  # bootstrap with all metadata


     ##################
@@ -222,8 +224,11 @@ def copy(self):
         Create an inactive copy of the client object
         A reinit() has to be done on the copy before it can be used again
         """
-        c = copy.deepcopy(self)
-        for k, v in c.conns.items():
+        c = KafkaClient(hosts=['{0}:{1}'.format(entry[0], entry[1]) for entry in self.hosts],
+                        client_id=self.client_id,
+                        timeout=self.timeout,
+                        activate=False)
+        for k, v in self.conns.iteritems():
             c.conns[k] = v.copy()
         return c

diff --git a/kafka/conn.py b/kafka/conn.py
index 5dc3d5acb..aaa4e0edd 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -50,14 +50,15 @@ class KafkaConnection(local):
     timeout: default 120. The socket timeout for sending and receiving data
         in seconds. None means no timeout, so a request can block forever.
     """
-    def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
+    def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS, activate=True):
         super(KafkaConnection, self).__init__()
         self.host = host
         self.port = port
         self.timeout = timeout
         self._sock = None
-        self.reinit()
+        if activate is True:
+            self.reinit()

     def __repr__(self):
         return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)

@@ -133,8 +134,7 @@ def copy(self):
         Create an inactive copy of the connection object
         A reinit() has to be done on the copy before it can be used again
         """
-        c = copy.deepcopy(self)
-        c._sock = None
+        c = KafkaConnection(host=self.host, port=self.port, timeout=self.timeout, activate=False)
         return c

     def close(self):
diff --git a/servers/0.8.0/kafka-src b/servers/0.8.0/kafka-src
deleted file mode 160000
index 15bb3961d..000000000
--- a/servers/0.8.0/kafka-src
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 15bb3961d9171c1c54c4c840a554ce2c76168163
diff --git a/setup.py b/setup.py
index 86d1d9f23..67352609b 100644
--- a/setup.py
+++ b/setup.py
@@ -23,7 +23,7 @@ def run(self):

     version="0.9.1",

     install_requires=["distribute"],
-    tests_require=["tox", "mock"],
+    tests_require=["tox", "mock", "gevent"],
     cmdclass={"test": Tox},
     packages=["kafka"],
diff --git a/test/__init__.py b/test/__init__.py
index e69de29bb..b1616cb86 100644
--- a/test/__init__.py
+++ b/test/__init__.py
@@ -0,0 +1,5 @@
+import os
+
+if os.environ.get('USE_GEVENT') == '1':
+    import gevent.monkey
+    gevent.monkey.patch_all(Event=True)
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index a6589b360..c6ba8f2c8 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -112,6 +112,7 @@ def test_simple_consumer_pending(self):

         consumer.stop()

+    @skip_gevent()
     @kafka_versions("all")
     def test_multi_process_consumer(self):
         # Produce 100 messages to partitions 0 and 1
@@ -124,6 +125,7 @@ def test_multi_process_consumer(self):

         consumer.stop()

+    @skip_gevent()
     @kafka_versions("all")
     def test_multi_process_consumer_blocking(self):
         consumer = self.consumer(consumer = MultiProcessConsumer)
@@ -152,6 +154,7 @@ def test_multi_process_consumer_blocking(self):

         consumer.stop()

+    @skip_gevent()
     @kafka_versions("all")
     def test_multi_proc_pending(self):
         self.send_messages(0, range(0, 10))
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 6298f62f6..31bc98081 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -68,6 +68,7 @@ def test_switch_leader(self):

         producer.stop()

+    @skip_gevent()
     @kafka_versions("all")
     def test_switch_leader_async(self):
         key, topic, partition = random_string(5), self.topic, 0
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index c69e1178b..94283bafb 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -247,6 +247,7 @@ def test_acks_cluster_commit(self):

         producer.stop()

+    @skip_gevent()
     @kafka_versions("all")
     def test_batched_simple_producer__triggers_by_message(self):
         start_offset0 = self.current_offset(self.topic, 0)
@@ -296,6 +297,7 @@ def test_batched_simple_producer__triggers_by_message(self):

         producer.stop()

+    @skip_gevent()
     @kafka_versions("all")
     def test_batched_simple_producer__triggers_by_time(self):
         start_offset0 = self.current_offset(self.topic, 0)
@@ -348,6 +350,7 @@ def test_batched_simple_producer__triggers_by_time(self):

         producer.stop()

+    @skip_gevent()
     @kafka_versions("all")
     def test_async_simple_producer(self):
         start_offset0 = self.current_offset(self.topic, 0)
@@ -361,6 +364,7 @@ def test_async_simple_producer(self):

         producer.stop()

+    @skip_gevent()
     @kafka_versions("all")
     def test_async_keyed_producer(self):
         start_offset0 = self.current_offset(self.topic, 0)
diff --git a/test/testutil.py b/test/testutil.py
index 78e6f7d93..2a8ae5e8f 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -15,6 +15,7 @@
     'random_string',
     'ensure_topic_creation',
     'get_open_port',
+    'skip_gevent',
     'kafka_versions',
     'KafkaIntegrationTestCase',
     'Timer',
@@ -24,6 +25,19 @@ def random_string(l):
     s = "".join(random.choice(string.letters) for i in xrange(l))
     return s

+def skip_gevent():
+    def skip_gevent(func):
+        @functools.wraps(func)
+        def wrapper(self):
+            use_gevent = os.environ.get('USE_GEVENT')
+            if use_gevent is not None and \
+               use_gevent == '1':
+                self.skipTest('test not supported under gevent')
+
+            return func(self)
+        return wrapper
+    return skip_gevent
+
 def kafka_versions(*versions):
     def kafka_versions(func):
         @functools.wraps(func)
diff --git a/tox.ini b/tox.ini
index 3c5fd17dd..82d1a5f33 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,6 @@
 [tox]
 envlist = py26, py27, pypy
+
 [testenv]
 deps =
     unittest2
@@ -7,7 +8,10 @@ deps =
     coverage
     mock
     python-snappy
+    cython
+    gevent
 commands =
     nosetests --with-coverage --cover-erase --cover-package kafka []
 setenv =
     PROJECT_ROOT = {toxinidir}
+
diff --git a/travis_run_tests.sh b/travis_run_tests.sh
new file mode 100755
index 000000000..736edf7d5
--- /dev/null
+++ b/travis_run_tests.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+
+TOX_ENV=$1
+if [ $1 == "2.7" ]; then
+  TOX_ENV="py27"
+elif [ $1 == "2.6" ]; then
+  TOX_ENV="py26"
+fi;
+
+tox -e $TOX_ENV
+KAFKA_VERSION=0.8.0 tox -e $TOX_ENV
+KAFKA_VERSION=0.8.1 tox -e $TOX_ENV
+if [ $TOX_ENV != "pypy" ]; then
+  USE_GEVENT=1 KAFKA_VERSION=0.8.0 tox -e $TOX_ENV
+  USE_GEVENT=1 KAFKA_VERSION=0.8.1 tox -e $TOX_ENV
+fi;
diff --git a/travis_selector.sh b/travis_selector.sh
deleted file mode 100755
index 21fba7e45..000000000
--- a/travis_selector.sh
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/bin/bash
-# This works with the .travis.yml file to select a python version for testing
-
-if [ $1 == "pypy" ]; then
-  echo "pypy"
-elif [ $1 == "2.7" ]; then
-  echo "py27"
-elif [ $1 == "2.6" ]; then
-  echo "py26"
-else
-  echo $1
-fi;
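The behavioural core of the patch is that KafkaClient.copy() and KafkaConnection.copy() now construct inactive objects via the new activate=False flag instead of deepcopy-ing live sockets. The sketch below illustrates the intended calling pattern; it is not part of the patch, and the 'localhost:9092' host string and the import path are assumptions about a typical kafka-python setup at this revision.

# Illustrative sketch only -- not part of the diff above.
# Assumes kafka-python at this revision is importable and that a broker is
# reachable at the placeholder address 'localhost:9092'.
from kafka.client import KafkaClient

client = KafkaClient(hosts='localhost:9092')  # activate defaults to True, so metadata is bootstrapped

# copy() now builds a second client with activate=False rather than
# deepcopy-ing live sockets: the copy opens no connections and fetches no
# metadata at construction time.
clone = client.copy()

# As the copy() docstring states, the inactive copy must be reinit()'d
# before it can be used again, e.g. after a fork or when handing it to
# another greenlet.
clone.reinit()

On the CI side, travis_run_tests.sh runs the suite once normally and once with USE_GEVENT=1, which makes test/__init__.py apply gevent.monkey.patch_all(Event=True) before the test modules load; the new skip_gevent() decorator then skips the multiprocess and async producer cases in that mode.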