Merge pull request #158 from wizzat/add_tests

Improve Tests, fix connection error timeout, other issues

dpkp committed May 7, 2014
2 parents 3b18043 + b81bf5f commit b47bf78
Showing 37 changed files with 2,730 additions and 2,001 deletions.
9 changes: 6 additions & 3 deletions .gitmodules
@@ -1,3 +1,6 @@
-[submodule "kafka-src"]
-	path = kafka-src
-	url = git://github.com/apache/kafka.git
+[submodule "servers/0.8.0/kafka-src"]
+	path = servers/0.8.0/kafka-src
+	url = https://github.com/apache/kafka.git
+[submodule "servers/0.8.1/kafka-src"]
+	path = servers/0.8.1/kafka-src
+	url = https://github.com/apache/kafka.git
25 changes: 14 additions & 11 deletions .travis.yml
@@ -1,20 +1,23 @@
 language: python
 
 python:
-  - 2.7
+  - 2.6
+  - 2.7
+  - pypy
 
 before_install:
-  - git submodule update --init --recursive
-  - cd kafka-src
-  - ./sbt clean update package assembly-package-dependency
-  - cd -
+  - git submodule update --init --recursive
+  - sudo apt-get install libsnappy-dev
+  - ./build_integration.sh
 
 install:
-  - pip install .
-  # Deal with issue on Travis builders re: multiprocessing.Queue :(
-  # See https://github.com/travis-ci/travis-cookbooks/issues/155
-  - sudo rm -rf /dev/shm && sudo ln -s /run/shm /dev/shm
+  - pip install tox
+  - pip install .
+  # Deal with issue on Travis builders re: multiprocessing.Queue :(
+  # See https://github.com/travis-ci/travis-cookbooks/issues/155
+  - sudo rm -rf /dev/shm && sudo ln -s /run/shm /dev/shm
 
 script:
-  - python -m test.test_unit
-  - python -m test.test_integration
+  - tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`
+  - KAFKA_VERSION=0.8.0 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`
+  - KAFKA_VERSION=0.8.1 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`
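The new CI matrix runs the same tox environment once per supported broker release, selected through the KAFKA_VERSION variable. As a rough sketch of how a test fixture can map that variable onto the per-version submodules added in this commit — the function name and default below are illustrative assumptions, not code from the repo:

```python
# Illustrative sketch only: resolve the Kafka build directory for the
# requested broker version, mirroring the servers/<version>/kafka-src
# layout introduced by this commit. Name and default are assumptions.
import os

def kafka_src_dir(default_version="0.8.0"):
    version = os.environ.get("KAFKA_VERSION", default_version)
    return os.path.join("servers", version, "kafka-src")
```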
63 changes: 36 additions & 27 deletions README.md
@@ -7,8 +7,6 @@ high-level consumer and producer classes. Request batching is supported by the
 protocol as well as broker-aware request routing. Gzip and Snappy compression
 is also supported for message sets.
 
-Compatible with Apache Kafka 0.8.1
-
 http://kafka.apache.org/
 
 # License
@@ -17,8 +15,17 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE`
 
 # Status
 
-The current version of this package is **0.9.0** and is compatible with
-Kafka brokers running version **0.8.1**.
+The current version of this package is **0.9.1** and is compatible with
+
+Kafka broker versions
+- 0.8.0
+- 0.8.1
+- 0.8.1.1
+
+Python versions
+- 2.6.9
+- 2.7.6
+- pypy 2.2.1
 
 # Usage
 
@@ -155,6 +162,7 @@ python setup.py install
 
 Download and build Snappy from http://code.google.com/p/snappy/downloads/list
 
+Linux:
 ```shell
 wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz
 tar xzvf snappy-1.0.5.tar.gz
@@ -164,6 +172,11 @@ make
 sudo make install
 ```
 
+OSX:
+```shell
+brew install snappy
+```
+
 Install the `python-snappy` module
 ```shell
 pip install python-snappy
@@ -173,40 +186,36 @@ pip install python-snappy
 
 ## Run the unit tests
 
-_These are broken at the moment_
-
 ```shell
-tox ./test/test_unit.py
-```
-
-or
-
-```shell
-python -m test.test_unit
+tox
 ```
 
 ## Run the integration tests
 
-First, checkout the Kafka source
+The integration tests will actually start up a real local Zookeeper
+instance and Kafka brokers, and send messages in using the client.
 
+Note that you may want to add this to your global gitignore:
 ```shell
-git submodule init
-git submodule update
-cd kafka-src
-./sbt update
-./sbt package
-./sbt assembly-package-dependency
+.gradle/
+clients/build/
+contrib/build/
+contrib/hadoop-consumer/build/
+contrib/hadoop-producer/build/
+core/build/
+core/data/
+examples/build/
+perf/build/
 ```
 
-And then run the tests. This will actually start up real local Zookeeper
-instance and Kafka brokers, and send messages in using the client.
-
+First, check out the Kafka source:
 ```shell
-tox ./test/test_integration.py
+git submodule update --init
+./build_integration.sh
 ```
 
-or
-
+Then run the tests against supported Kafka versions:
 ```shell
-python -m test.test_integration
+KAFKA_VERSION=0.8.0 tox
+KAFKA_VERSION=0.8.1 tox
 ```
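The integration tests exercise the client end to end against those brokers. For orientation, here is a minimal hedged sketch of the kind of round trip they perform, assuming a broker at localhost:9092 (for example one launched by the test harness) and the 0.9.x-era high-level API; treat the topic and group names as placeholders, not an excerpt from the test suite.

```python
# Hedged sketch, not from the test suite: one produce/consume round trip
# against a locally running broker (assumed at localhost:9092).
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
from kafka.consumer import SimpleConsumer

kafka = KafkaClient("localhost:9092")

# Send one message to a placeholder topic.
producer = SimpleProducer(kafka)
producer.send_messages("test-topic", "hello from the integration harness")

# Read it back with a simple consumer.
consumer = SimpleConsumer(kafka, "test-group", "test-topic")
for message in consumer:
    print(message)
    break

kafka.close()
```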
5 changes: 5 additions & 0 deletions build_integration.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+git submodule update --init
+(cd servers/0.8.0/kafka-src && ./sbt update package assembly-package-dependency)
+(cd servers/0.8.1/kafka-src && ./gradlew jar)
55 changes: 22 additions & 33 deletions kafka/client.py
@@ -1,15 +1,16 @@
 import copy
 import logging
+import collections
+
+import kafka.common
 
-from collections import defaultdict
 from functools import partial
 from itertools import count
-
-from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
+from kafka.common import (TopicAndPartition,
                           ConnectionError, FailedPayloadsError,
-                          BrokerResponseError, PartitionUnavailableError,
-                          LeaderUnavailableError,
-                          KafkaUnavailableError)
+                          PartitionUnavailableError,
+                          LeaderUnavailableError, KafkaUnavailableError,
+                          UnknownTopicOrPartitionError, NotLeaderForPartitionError)
 
 from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
 from kafka.protocol import KafkaProtocol
@@ -39,29 +40,23 @@ def __init__(self, hosts, client_id=CLIENT_ID,
         self.topic_partitions = {}  # topic_id -> [0, 1, 2, ...]
         self.load_metadata_for_topics()  # bootstrap with all metadata
 
+
     ##################
     #   Private API  #
     ##################
 
     def _get_conn(self, host, port):
         "Get or create a connection to a broker using host and port"
-
         host_key = (host, port)
         if host_key not in self.conns:
-            self.conns[host_key] = KafkaConnection(host, port, timeout=self.timeout)
+            self.conns[host_key] = KafkaConnection(
+                host,
+                port,
+                timeout=self.timeout
+            )
 
         return self.conns[host_key]
 
-    def _get_conn_for_broker(self, broker):
-        """
-        Get or create a connection to a broker
-        """
-        if (broker.host, broker.port) not in self.conns:
-            self.conns[(broker.host, broker.port)] = \
-                KafkaConnection(broker.host, broker.port, timeout=self.timeout)
-
-        return self._get_conn(broker.host, broker.port)
-
     def _get_leader_for_partition(self, topic, partition):
         """
         Returns the leader for a partition or None if the partition exists
@@ -99,10 +94,9 @@ def _send_broker_unaware_request(self, requestId, request):
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
                 return response
-            except Exception, e:
+            except Exception as e:
                 log.warning("Could not send request [%r] to server %s:%i, "
                             "trying next server: %s" % (request, host, port, e))
-                continue
 
         raise KafkaUnavailableError("All servers failed to process request")
 
@@ -130,7 +124,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
 
         # Group the requests by topic+partition
         original_keys = []
-        payloads_by_broker = defaultdict(list)
+        payloads_by_broker = collections.defaultdict(list)
 
         for payload in payloads:
             leader = self._get_leader_for_partition(payload.topic,
@@ -151,7 +145,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
 
         # For each broker, send the list of request payloads
         for broker, payloads in payloads_by_broker.items():
-            conn = self._get_conn_for_broker(broker)
+            conn = self._get_conn(broker.host, broker.port)
             requestId = self._next_id()
             request = encoder_fn(client_id=self.client_id,
                                  correlation_id=requestId, payloads=payloads)
@@ -164,11 +158,11 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
                     continue
                 try:
                     response = conn.recv(requestId)
-                except ConnectionError, e:
+                except ConnectionError as e:
                     log.warning("Could not receive response to request [%s] "
                                 "from server %s: %s", request, conn, e)
                     failed = True
-            except ConnectionError, e:
+            except ConnectionError as e:
                 log.warning("Could not send request [%s] to server %s: %s",
                             request, conn, e)
                 failed = True
@@ -191,16 +185,11 @@ def __repr__(self):
         return '<KafkaClient client_id=%s>' % (self.client_id)
 
     def _raise_on_response_error(self, resp):
-        if resp.error == ErrorMapping.NO_ERROR:
-            return
-
-        if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
-                          ErrorMapping.NOT_LEADER_FOR_PARTITION):
+        try:
+            kafka.common.check_error(resp)
+        except (UnknownTopicOrPartitionError, NotLeaderForPartitionError) as e:
             self.reset_topic_metadata(resp.topic)
-
-        raise BrokerResponseError(
-            "Request for %s failed with errorcode=%d (%s)" %
-            (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error]))
+            raise
 
     #################
     #   Public API  #
     #################
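The `_raise_on_response_error` rewrite above delegates error handling to `kafka.common.check_error`, which converts a response's numeric error code into a typed exception; the client then only catches the two metadata-related errors, refreshes its cached topic metadata, and re-raises for the caller. `check_error` itself is outside this excerpt, so here is a hedged sketch of the pattern, with a hypothetical two-entry table standing in for the full map in `kafka.common`:

```python
# Hedged sketch of the check_error pattern; the real table lives in
# kafka.common and covers all Kafka protocol error codes, not just two.
class KafkaError(Exception):
    pass

class UnknownTopicOrPartitionError(KafkaError):
    errno = 3  # Kafka protocol error code

class NotLeaderForPartitionError(KafkaError):
    errno = 6  # Kafka protocol error code

# Hypothetical, abbreviated code -> exception map.
kafka_errors = {
    3: UnknownTopicOrPartitionError,
    6: NotLeaderForPartitionError,
}

def check_error(response):
    """Raise the mapped exception if response.error is a non-zero code."""
    if response.error:
        error_class = kafka_errors.get(response.error, KafkaError)
        raise error_class(response)
```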