Consumer Group not working on Kafka 0.8.1 #147

Closed
tavoadolfoleon opened this issue Mar 22, 2014 · 25 comments

Comments

@tavoadolfoleon

I am using Kafka 0.8.1 with the kafka-python tool, and I have noticed that the consumer group has no effect on the offset. The consumer keeps reading all the messages from the beginning, and when I launch two instances of the same Python script, both consumers get the same messages, from the beginning to

I found a portion of the code to uncomment when using 0.8.1, but that causes an error when the library is called:

"...
consumer = SimpleConsumer(kafka, "my-group", topic, auto_commit=False)
File "build/bdist.linux-x86_64/egg/kafka/consumer.py", line 242, in __init__
File "build/bdist.linux-x86_64/egg/kafka/consumer.py", line 111, in __init__
NameError: global name 'OffsetFetchRequest' is not defined
..."

This is the Python script:

    # localconsumer.py
    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer
    from kafka.producer import SimpleProducer, KeyedProducer

    kafka = KafkaClient("localhost:9092")
    topic = "test"

    # To consume messages
    consumer = SimpleConsumer(kafka, "my-group", topic, auto_commit=False)
    for message in consumer:
        print(message)

    kafka.close()

@wizzat
Collaborator

wizzat commented Mar 22, 2014

There are two open pull requests which address this problem. The debate is over how kafka-python should support multiple versions of the Kafka server, given that the server doesn't identify its version, and how to keep the code base clean while supporting them.


@dpkp
Owner

dpkp commented Mar 22, 2014

also -- if you are on the master branch and feel like hacking a bit, you can try including the experimental code in consumer.py:

https://github.com/mumrah/kafka-python/blob/master/kafka/consumer.py#L108-L115

wizzat is correct -- this seems to be blocked on agreeing how to handle multiple server versions

@tavoadolfoleon
Author

Is there anything that can be done to make the consumer group work on Kafka 0.8.1 while the debate is going on? Any shortcut or temporary fix to make the consumer group work?

Would the simplest thing be to indicate the Kafka version manually via a parameter when setting up the Kafka connection?
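
To make the "pass the version manually" idea concrete, here is a minimal sketch of a client-side switch that decides whether to look up stored offsets. It is only an illustration: `broker_version`, `supports_offset_fetch`, and `initial_offsets` are hypothetical names and are not part of kafka-python, and the 0.8.1 cutoff is an assumption based on the note in this thread that the experimental code is meant for 0.8.1.

```python
def supports_offset_fetch(broker_version):
    """Return True if the broker is assumed to serve offset fetch requests.

    Assumption for this sketch: the offset APIs are usable from 0.8.1 on.
    """
    return tuple(broker_version) >= (0, 8, 1)


def initial_offsets(partitions, broker_version, fetch_offset):
    """Pick a starting offset for each partition.

    fetch_offset is any callable(partition) -> int that returns the stored
    offset for the group. It is only called when the version allows it.
    """
    if supports_offset_fetch(broker_version):
        return {p: fetch_offset(p) for p in partitions}
    # Older brokers: no stored offsets, so start from the beginning.
    return {p: 0 for p in partitions}


# Hypothetical stored offsets, keyed by partition.
stored = {0: 42, 1: 7}
new_broker = initial_offsets([0, 1], (0, 8, 1), stored.get)
old_broker = initial_offsets([0, 1], (0, 8, 0), stored.get)
print(new_broker)
print(old_broker)
```

The caller states the version once, and everything version-specific stays behind one predicate, which is one possible answer to the "keep the code base clean" concern.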

@tavoadolfoleon
Author

dpkp, I did uncomment the lines you pointed to, but this error appears when using it:

"...
consumer = SimpleConsumer(kafka, "my-group", topic, auto_commit=False)
File "build/bdist.linux-x86_64/egg/kafka/consumer.py", line 242, in __init__
File "build/bdist.linux-x86_64/egg/kafka/consumer.py", line 111, in __init__
NameError: global name 'OffsetFetchRequest' is not defined
..."

@dpkp
Owner

dpkp commented Mar 22, 2014

try adding OffsetFetchRequest to the import list on line 11/12 of consumer.py. It's all mostly untested though, so quite possible that it doesn't work.

you might also try rdiomar's fork that implements offsets directly in zookeeper w/o using kafka server apis: https://github.com/rdiomar/kafka-python/tree/zk_offsets

@dpkp
Owner

dpkp commented Mar 22, 2014

also you probably want to set auto_commit=True in SimpleConsumer to tell your consumers to store offsets

@wizzat
Collaborator

wizzat commented Mar 22, 2014

I would recommend using one of the branches which have been offered as a pull request. The easy_kafka branch is what we have in production at Kixeye right now. Before the recent spree of merges I was going to permanently fork the project on 3/29 to get traction back and get it uploaded to PyPI.


@geoffrey-young

is this meant to solve #112 ?

@tavoadolfoleon
Author

Geoffrey,

I am actually in the process of testing the options. I will post the results once I have checked whether anything worked.


@tavoadolfoleon
Author

I uncommented the lines for Kafka 0.8.1 and added OffsetFetchRequest to the import at lines 11 and 12, which makes the errors go away so far.

But the offset is still not set: every time I read, I get all the messages (from the beginning), and if I launch two instances of the consumer, both show the existing and the new messages.

Setting auto_commit to True or False doesn't change the behavior.

@tavoadolfoleon
Author

if this is true:

"
... That contract of one message per consumer group only works for the coordinated consumers which are implemented for the JVM only (i.e., Scala and Java clients).

Kafka will eventually have better support for non-JVM clients to handle coordinated consumption at which point we'll implement this functionality.

HTH ...
"

then there is nothing that can be done about it

@wizzat
Collaborator

wizzat commented Mar 25, 2014

I've had good luck setting up a consumer per partition. This emulates that behavior fairly well.

-Mark
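
As a rough illustration of the consumer-per-partition approach, the sketch below splits a topic's partitions across a fixed number of worker processes so that each partition has exactly one reader. `assign_partitions` is a hypothetical helper, not something kafka-python provides, and the partition count of 6 is made up; each worker would pass its own list to whatever consumer it starts.

```python
def assign_partitions(partitions, num_workers):
    """Split partitions round-robin so each one has exactly one owner."""
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")
    ordered = sorted(partitions)
    # Worker i takes every num_workers-th partition, starting at index i.
    return [ordered[i::num_workers] for i in range(num_workers)]


# Hypothetical topic with 6 partitions, read by 3 worker processes.
assignment = assign_partitions(range(6), 3)
for worker, owned in enumerate(assignment):
    print("worker %d reads partitions %s" % (worker, owned))
```

Unlike a coordinated consumer group, this assignment is static: if a worker dies, nothing takes over its partitions until it is restarted.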


@tavoadolfoleon
Author

When the consumer stops and starts again, does it continue where it was or from the beginning? If from the beginning, how do you do it?

@tavoadolfoleon
Author

sorry, I mean, from where it was, how do you do it?

@wizzat
Collaborator

wizzat commented Mar 25, 2014

I'm using this branch: https://github.com/wizzat/kafka-python/tree/easykafka

In particular, each process starts up with a single topic/partition that it listens to. I don't appear to be getting the same data more than once unless I spin up another process listening on the same set of topics.

As to resuming from where it starts, I think it all boils down to this bit (it required some monkeying around).

consumer.py:109:

    if auto_commit:
        for partition in partitions:
            req = OffsetFetchRequest(topic, partition)
            (offset,) = self.client.send_offset_fetch_request(group, [req],
                          callback=get_or_init_offset_callback,
                          fail_on_error=False)
            self.offsets[partition] = offset
    else:
        for partition in partitions:
            self.offsets[partition] = 0
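
The snippet above relies on a `get_or_init_offset_callback` that is not shown in this thread. A minimal sketch of what such a callback could do follows, assuming it should fall back to offset 0 when the broker has no stored offset for the group. The `OffsetFetchResponse` tuple and its field names are stand-ins defined here for illustration, not the library's actual types; error code 3 is Kafka's "unknown topic or partition" code.

```python
from collections import namedtuple

# Stand-in for the response object; the field names are assumptions.
OffsetFetchResponse = namedtuple(
    "OffsetFetchResponse", ["topic", "partition", "offset", "error"])

NO_ERROR = 0
UNKNOWN_TOPIC_OR_PARTITION = 3  # Kafka protocol error code


def get_or_init_offset(resp):
    """Return the stored offset, or 0 if nothing has been committed yet."""
    if resp.error == NO_ERROR:
        return resp.offset
    if resp.error == UNKNOWN_TOPIC_OR_PARTITION:
        # No offset stored for this group/partition: start at the beginning.
        return 0
    raise RuntimeError("offset fetch failed with error code %d" % resp.error)


stored = get_or_init_offset(OffsetFetchResponse("test", 0, 17, NO_ERROR))
missing = get_or_init_offset(
    OffsetFetchResponse("test", 1, -1, UNKNOWN_TOPIC_OR_PARTITION))
print(stored, missing)
```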

@tavoadolfoleon
Author

Wizzat, are you able to get messages from where the consumer left off? For example:

you launch a consumer in "my-group" and get messages 1, 2, 3, then you stop it;
you launch it again and it gets 4, 5, ...

@wizzat
Collaborator

wizzat commented Mar 27, 2014

Yes
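
The resume behaviour being asked about (read 1, 2, 3, stop, then continue with 4, 5) can be modelled without a broker. The sketch below is a toy, in-memory simulation: `OffsetStore` and `consume` are hypothetical names, the list stands in for a single partition's log, and nothing here calls kafka-python.

```python
class OffsetStore(object):
    """In-memory stand-in for wherever committed offsets are kept."""

    def __init__(self):
        self._offsets = {}

    def fetch(self, group, topic, partition):
        # A group with no committed offset starts at the beginning.
        return self._offsets.get((group, topic, partition), 0)

    def commit(self, group, topic, partition, offset):
        self._offsets[(group, topic, partition)] = offset


def consume(log, store, group, topic, partition, max_messages):
    """Read up to max_messages from the group's last committed position."""
    start = store.fetch(group, topic, partition)
    batch = log[start:start + max_messages]
    # Commit the position of the next unread message.
    store.commit(group, topic, partition, start + len(batch))
    return batch


log = [1, 2, 3, 4, 5]
store = OffsetStore()
first_run = consume(log, store, "my-group", "test", 0, 3)      # [1, 2, 3]
second_run = consume(log, store, "my-group", "test", 0, 3)     # [4, 5]
other_group = consume(log, store, "other-group", "test", 0, 3) # [1, 2, 3]
print(first_run, second_run, other_group)
```

The second run resumes only because the first run committed its position; a different group name has its own position and starts over.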

@tavoadolfoleon
Author

I am getting this error using that branch (https://github.com/wizzat/kafka-python/tree/easykafka):

    ubuntu@ubuntu:~$ python localconsumer.py
    Traceback (most recent call last):
      File "localconsumer.py", line 5, in <module>
        kafka = KafkaClient("localhost:9092")
    TypeError: __init__() takes at least 3 arguments (2 given)

The localconsumer.py script is the same one described at the top.

@tavoadolfoleon
Author

I think I solved the issue of maintaining the offset; the client now starts reading messages from where it left off.

I am using https://github.com/wizzat/kafka-python/tree/easykafka

and modified consumer.py around line 109:

    #Uncomment for 0.8.1
    #
    for partition in partitions:
        req = OffsetFetchRequest(topic, partition)
        (offset,) = self.client.send_offset_fetch_request(group, [req],
                      callback=get_or_init_offset_callback,
                      fail_on_error=False)
        self.offsets[partition] = offset

    #for partition in partitions:
    #    self.offsets[partition] = 0

importing "OffsetFetchRequest" around line 10 on "consumer.py"

    from kafka.common import (
           ErrorMapping, FetchRequest,
           OffsetRequest, OffsetCommitRequest,
           ConsumerFetchSizeTooSmall, ConsumerNoMoreData, OffsetFetchRequest
    )

and using this client (localconsumer.py):

    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer
    from kafka.producer import SimpleProducer, KeyedProducer

    # On this branch KafkaClient takes the host and port as separate arguments
    kafka = KafkaClient("localhost", 9092)
    topic = "test"

    # To consume messages
    consumer = SimpleConsumer(kafka, "my-group", topic, auto_commit=True)
    for message in consumer:
        print(message)

    kafka.close()

@tavoadolfoleon
Author

Thanks to all of you, what a great job you guys have done creating this library!!

girlprogrammer pushed a commit to girlprogrammer/kafka-python that referenced this issue Apr 9, 2014
@wizzat
Collaborator

wizzat commented May 7, 2014

This should be resolved as of #158.

@dpkp
Owner

dpkp commented May 9, 2014

Yes - offset commits are fixed in master by #158

@dpkp dpkp closed this as completed May 9, 2014
@tejas07

tejas07 commented May 24, 2018

I want to create a consumer group whose consumers share the same groupId. In my case the consumers will be dynamic, because I am using Kafka as an event producer and consumer to list activity between applications. Kafka is installed on a centralised server, and events are produced from different VMs.
How can I achieve this in Java?
@wizzat

@wizzat
Collaborator

wizzat commented May 24, 2018

In Java you should just use the java client?

@tejas07

tejas07 commented May 27, 2018

Kindly suggest any sample projects for the same use case.
