Improve Tests, fix connection error timeout, other issues #158

Merged
merged 51 commits, May 7, 2014
6ee1514
Remove test support for py26 since it's broken
wizzat Apr 8, 2014
80b3335
Split test files, modify test_protocol
wizzat Apr 8, 2014
d59cbf6
Comment out all of test_integration because it currently does not work
wizzat Apr 8, 2014
f0def43
Explicit testing of protocol errors. Make tests more explicit, and s…
wizzat Apr 8, 2014
d7c5bbf
Reinstate test_integrate, make test_protocol more explicit, create te…
wizzat Apr 8, 2014
ac9cf9e
Convert several tests to struct.pack
wizzat Apr 9, 2014
3bde6d6
Convert more tests to struct.pack
wizzat Apr 9, 2014
a3c781f
Merge branch 'master' into add_tests
wizzat Apr 9, 2014
853d452
Update more tests, fix intermittent failure
wizzat Apr 9, 2014
115c20c
Convert test_encode_fetch_request to struct.pack format, improve test…
wizzat Apr 9, 2014
5c58151
Add python-snappy to tox dependencies. Fix snappy protocol test
wizzat Apr 9, 2014
1cb27f9
Add tests for encode_offset_request
wizzat Apr 9, 2014
58b4d0f
Add commit offset request test
wizzat Apr 9, 2014
8f179d8
Add encode_offset_fetch_request test
wizzat Apr 9, 2014
12fae12
Add final tests for 100% coverage of protocol.py from test/test_proto…
wizzat Apr 9, 2014
385f2d8
Refactor away _get_conn_for_broker. Fix bug in _get_conn
wizzat Apr 9, 2014
7eaca8e
Split out and speed up producer tests
wizzat Apr 17, 2014
8983e73
Split up and speed up producer based integration tests
wizzat Apr 17, 2014
1984dab
Finish breaking out integration tests
wizzat Apr 18, 2014
a7cbfd3
Fix bug in socket timeout per PR #161 by maciejkula, add test
wizzat Apr 19, 2014
d35b8fd
Merge branch 'master' into add_tests
wizzat Apr 22, 2014
b6262e4
Update fixtures to eliminate extraneous logging on non-errors, split …
wizzat Apr 23, 2014
6628c10
Move kafka-src to servers/0.8.0/kafka-src, move test/resources to ser…
wizzat Apr 23, 2014
7e5c847
Add support for kafka 0.8.1
wizzat Apr 23, 2014
7c21dfe
Update README
wizzat Apr 23, 2014
86e1ac7
Add test support for multiple versions of kafka. Uncomment first 0.8…
wizzat Apr 23, 2014
8a1f2e6
Split out kafka version environments, default tox no longer runs any …
wizzat Apr 23, 2014
764f205
Update consumer_integration to flip the autocommit switch when testin…
wizzat Apr 23, 2014
583d3ae
Fix Python 2.6 support
wizzat Apr 23, 2014
0e50f33
Fix last remaining test by making autocommit more intuitive
wizzat Apr 24, 2014
57913f9
Various fixes
wizzat Apr 25, 2014
0d57c27
Make BrokerRequestError a base class, make subclasses for each broker…
wizzat May 1, 2014
66152b2
Fix log directory as per documentation: http://kafka.apache.org/08/do…
mgilbir Apr 30, 2014
be06b30
Build kafka 0.8.1 for only one version of scala
mgilbir Apr 30, 2014
7a7a818
Merge branch 'add_tests' into exception_refactor
wizzat May 1, 2014
636778a
Make commit() check for errors instead of simply assert no error
wizzat May 1, 2014
f6f298f
Update .travis.yml to support new test suite
wizzat May 1, 2014
4d9236b
Merge branch 'exception_refactor' into add_tests
wizzat May 1, 2014
169c196
Update travis.yml
wizzat May 1, 2014
7108200
Attempt to install libsnappy in the travis environment
wizzat May 1, 2014
87675cc
Remove libsnappy-java
wizzat May 1, 2014
06ab5d1
Update travis.yml based on http://alexgaynor.net/2014/jan/06/why-trav…
wizzat May 1, 2014
26ae502
Temporarily remove pypy support
wizzat May 1, 2014
7a1e227
Skip flaky test between osx/linux
wizzat May 1, 2014
c307a3a
Add missing import
wizzat May 1, 2014
83d9571
Temporarily remove Python26 support from travis.yml
wizzat May 1, 2014
b120ca5
Attempt to reenable py26 and pypy builds
wizzat May 1, 2014
99320fb
Add pypy back to tox.ini
wizzat May 1, 2014
efcf58b
Attempt to fix travis build. Decrease complexity of service.py in fav…
wizzat May 7, 2014
b053da2
Merge branch 'master' into add_tests
wizzat May 7, 2014
b81bf5f
Make test suite more robust against very slow test suites
wizzat May 7, 2014
9 changes: 6 additions & 3 deletions .gitmodules
@@ -1,3 +1,6 @@
-[submodule "kafka-src"]
-	path = kafka-src
-	url = git://github.com/apache/kafka.git
+[submodule "servers/0.8.0/kafka-src"]
+	path = servers/0.8.0/kafka-src
+	url = https://github.com/apache/kafka.git
+[submodule "servers/0.8.1/kafka-src"]
+	path = servers/0.8.1/kafka-src
+	url = https://github.com/apache/kafka.git
25 changes: 14 additions & 11 deletions .travis.yml
@@ -1,20 +1,23 @@
 language: python

 python:
-- 2.7
+- 2.6
+- 2.7
+- pypy

 before_install:
-- git submodule update --init --recursive
-- cd kafka-src
-- ./sbt clean update package assembly-package-dependency
-- cd -
+- git submodule update --init --recursive
+- sudo apt-get install libsnappy-dev
+- ./build_integration.sh

 install:
-- pip install .
-# Deal with issue on Travis builders re: multiprocessing.Queue :(
-# See https://github.com/travis-ci/travis-cookbooks/issues/155
-- sudo rm -rf /dev/shm && sudo ln -s /run/shm /dev/shm
+- pip install tox
+- pip install .
+# Deal with issue on Travis builders re: multiprocessing.Queue :(
+# See https://github.com/travis-ci/travis-cookbooks/issues/155
+- sudo rm -rf /dev/shm && sudo ln -s /run/shm /dev/shm

 script:
-- python -m test.test_unit
-- python -m test.test_integration
+- tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`
+- KAFKA_VERSION=0.8.0 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`
+- KAFKA_VERSION=0.8.1 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`
63 changes: 36 additions & 27 deletions README.md
@@ -7,8 +7,6 @@ high-level consumer and producer classes. Request batching is supported by the
 protocol as well as broker-aware request routing. Gzip and Snappy compression
 is also supported for message sets.

-Compatible with Apache Kafka 0.8.1
-
 http://kafka.apache.org/

 # License
@@ -17,8 +15,17 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE`

 # Status

-The current version of this package is **0.9.0** and is compatible with
-Kafka brokers running version **0.8.1**.
+The current version of this package is **0.9.1** and is compatible with
+
+Kafka broker versions
+- 0.8.0
+- 0.8.1
+- 0.8.1.1
+
+Python versions
+- 2.6.9
+- 2.7.6
+- pypy 2.2.1

 # Usage

@@ -155,6 +162,7 @@ python setup.py install

 Download and build Snappy from http://code.google.com/p/snappy/downloads/list

+Linux:
 ```shell
 wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz
 tar xzvf snappy-1.0.5.tar.gz
@@ -164,6 +172,11 @@ make
 sudo make install
 ```

+OSX:
+```shell
+brew install snappy
+```
+
 Install the `python-snappy` module
 ```shell
 pip install python-snappy
@@ -173,40 +186,36 @@ pip install python-snappy

 ## Run the unit tests

-_These are broken at the moment_
-
-```shell
-tox ./test/test_unit.py
-```
-
-or
-
 ```shell
-python -m test.test_unit
+tox
 ```

 ## Run the integration tests

-First, checkout the Kafka source
+The integration tests will actually start up real local Zookeeper
+instance and Kafka brokers, and send messages in using the client.
+
+Note that you may want to add this to your global gitignore:
 ```shell
-git submodule init
-git submodule update
-cd kafka-src
-./sbt update
-./sbt package
-./sbt assembly-package-dependency
+.gradle/
+clients/build/
+contrib/build/
+contrib/hadoop-consumer/build/
+contrib/hadoop-producer/build/
+core/build/
+core/data/
+examples/build/
+perf/build/
 ```

-And then run the tests. This will actually start up real local Zookeeper
-instance and Kafka brokers, and send messages in using the client.
-
+First, check out and build the Kafka source:
 ```shell
-tox ./test/test_integration.py
+git submodule update --init
+./build_integration.sh
 ```

-or
-
+Then run the tests against supported Kafka versions:
 ```shell
-python -m test.test_integration
+KAFKA_VERSION=0.8.0 tox
+KAFKA_VERSION=0.8.1 tox
 ```
5 changes: 5 additions & 0 deletions build_integration.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+git submodule update --init
+(cd servers/0.8.0/kafka-src && ./sbt update package assembly-package-dependency)
+(cd servers/0.8.1/kafka-src && ./gradlew jar)
55 changes: 22 additions & 33 deletions kafka/client.py
@@ -1,15 +1,16 @@
 import copy
 import logging
+import collections
+
+import kafka.common

-from collections import defaultdict
 from functools import partial
 from itertools import count

-from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
+from kafka.common import (TopicAndPartition,
                           ConnectionError, FailedPayloadsError,
-                          BrokerResponseError, PartitionUnavailableError,
-                          LeaderUnavailableError,
-                          KafkaUnavailableError)
+                          PartitionUnavailableError,
+                          LeaderUnavailableError, KafkaUnavailableError,
+                          UnknownTopicOrPartitionError, NotLeaderForPartitionError)

 from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
 from kafka.protocol import KafkaProtocol
@@ -39,29 +40,23 @@ def __init__(self, hosts, client_id=CLIENT_ID,
         self.topic_partitions = {}  # topic_id -> [0, 1, 2, ...]
         self.load_metadata_for_topics()  # bootstrap with all metadata

-
     ##################
     #  Private API   #
     ##################

     def _get_conn(self, host, port):
         "Get or create a connection to a broker using host and port"

         host_key = (host, port)
         if host_key not in self.conns:
-            self.conns[host_key] = KafkaConnection(host, port, timeout=self.timeout)
+            self.conns[host_key] = KafkaConnection(
+                host,
+                port,
+                timeout=self.timeout
+            )

         return self.conns[host_key]

-    def _get_conn_for_broker(self, broker):
-        """
-        Get or create a connection to a broker
-        """
-        if (broker.host, broker.port) not in self.conns:
-            self.conns[(broker.host, broker.port)] = \
-                KafkaConnection(broker.host, broker.port, timeout=self.timeout)
-
-        return self._get_conn(broker.host, broker.port)

     def _get_leader_for_partition(self, topic, partition):
         """
         Returns the leader for a partition or None if the partition exists
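The hunk above refactors away `_get_conn_for_broker`: connections are cached in one dict keyed by `(host, port)`, and callers pass `broker.host, broker.port` to `_get_conn` directly, which removes the duplicated (and subtly inconsistent) caching logic. A standalone sketch of that caching idiom, with a stand-in class instead of the real `KafkaConnection`:

```python
# Minimal sketch of the (host, port)-keyed connection cache used by
# _get_conn above. FakeConnection is a stand-in for KafkaConnection;
# the real class opens a socket with the given timeout.

class FakeConnection:
    def __init__(self, host, port, timeout=None):
        self.host, self.port, self.timeout = host, port, timeout

class ConnectionPool:
    def __init__(self, timeout=None):
        self.timeout = timeout
        self.conns = {}  # (host, port) -> connection

    def get_conn(self, host, port):
        """Get or create a connection to a broker using host and port."""
        host_key = (host, port)
        if host_key not in self.conns:
            self.conns[host_key] = FakeConnection(host, port,
                                                  timeout=self.timeout)
        return self.conns[host_key]
```

Keying the cache on the address tuple means a broker object and a plain host/port pair always resolve to the same cached connection, which was the bug the refactor fixed.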
@@ -99,10 +94,9 @@ def _send_broker_unaware_request(self, requestId, request):
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
                 return response
-            except Exception, e:
+            except Exception as e:
                 log.warning("Could not send request [%r] to server %s:%i, "
                             "trying next server: %s" % (request, host, port, e))
-                continue

         raise KafkaUnavailableError("All servers failed to process request")
@@ -130,7 +124,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):

         # Group the requests by topic+partition
         original_keys = []
-        payloads_by_broker = defaultdict(list)
+        payloads_by_broker = collections.defaultdict(list)

         for payload in payloads:
             leader = self._get_leader_for_partition(payload.topic,
@@ -151,7 +145,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):

         # For each broker, send the list of request payloads
         for broker, payloads in payloads_by_broker.items():
-            conn = self._get_conn_for_broker(broker)
+            conn = self._get_conn(broker.host, broker.port)
             requestId = self._next_id()
             request = encoder_fn(client_id=self.client_id,
                                  correlation_id=requestId, payloads=payloads)
continue
try:
response = conn.recv(requestId)
except ConnectionError, e:
except ConnectionError as e:
log.warning("Could not receive response to request [%s] "
"from server %s: %s", request, conn, e)
failed = True
except ConnectionError, e:
except ConnectionError as e:
log.warning("Could not send request [%s] to server %s: %s",
request, conn, e)
failed = True
@@ -191,16 +185,11 @@ def __repr__(self):
         return '<KafkaClient client_id=%s>' % (self.client_id)

     def _raise_on_response_error(self, resp):
-        if resp.error == ErrorMapping.NO_ERROR:
-            return
-
-        if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
-                          ErrorMapping.NOT_LEADER_FOR_PARTITION):
+        try:
+            kafka.common.check_error(resp)
+        except (UnknownTopicOrPartitionError, NotLeaderForPartitionError) as e:
             self.reset_topic_metadata(resp.topic)
-
-        raise BrokerResponseError(
-            "Request for %s failed with errorcode=%d (%s)" %
-            (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error]))
+            raise

     #################
     #  Public API   #
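The rewritten `_raise_on_response_error` above delegates to `kafka.common.check_error`, reflecting the commit that makes `BrokerResponseError` a base class with one subclass per broker error code. A self-contained sketch of that registry pattern (the class names match the imports above, but the bodies here are illustrative, not kafka-python's exact code):

```python
# Sketch of the error-class registry pattern introduced in this PR:
# BrokerResponseError is the base class, each broker error code gets a
# subclass, and check_error() raises the matching subclass. The errno
# values follow the Kafka protocol error codes.

class BrokerResponseError(Exception):
    errno = -1
    message = "UNKNOWN"

class UnknownTopicOrPartitionError(BrokerResponseError):
    errno = 3
    message = "UNKNOWN_TOPIC_OR_PARTITION"

class NotLeaderForPartitionError(BrokerResponseError):
    errno = 6
    message = "NOT_LEADER_FOR_PARTITION"

# Registry: broker error code -> exception class
kafka_errors = {cls.errno: cls for cls in (
    UnknownTopicOrPartitionError,
    NotLeaderForPartitionError,
)}

def check_error(response):
    """Raise the mapped exception if the response carries an error code."""
    if response.error == 0:
        return  # NO_ERROR
    error_class = kafka_errors.get(response.error, BrokerResponseError)
    raise error_class(response)
```

A caller can then catch specific failures, as `_raise_on_response_error` does for the two metadata-related errors, while any other broker error still propagates as a plain `BrokerResponseError`.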