MultiProcessConsumer disregards topic offsets between subsequent runs #173

Closed
macmarcin-zz opened this issue May 16, 2014 · 9 comments

Comments

@macmarcin-zz

Not sure if this works as designed, but every time I invoke MultiProcessConsumer on the same topic/partition (in a new execution thread), the consumer disregards the consumer offset in Zookeeper and reads all the messages that are available. SimpleConsumer only reads messages that have not been read. Shouldn't MultiProcessConsumer behave the same way?

Code snippet:


import logging
from kafka import KafkaClient, MultiProcessConsumer

brokers = ["localhost:9092", "localhost:9093", "localhost:9094"]
kafka = KafkaClient(brokers)
consumer = MultiProcessConsumer(kafka, "marcin-group", "test_1", num_procs=2)
# Offsets the consumer holds at startup (partition -> offset)
logging.debug('*** all offsets=%s' % consumer.offsets)
consumed_count = 0
for message in consumer:
    print(message)
    consumed_count += 1

# Offsets and message count after the iterator is exhausted
logging.debug('*** all offsets=%s ...after consuming all messages, number of messages consumed=%s' % (consumer.offsets, consumed_count))
----------------------------------------
ZK info:
get /kafka/consumers/marcin-group/offsets/test_1/0
200
cZxid = 0x1a2f0
--------
Debug snippets from the log file:
DEBUG:root:*** all offsets={0: 200}
.....
DEBUG:kafka:Commit offset 100 in SimpleConsumer: group=marcin-group, topic=test_1, partition=0
.....
DEBUG:kafka:Commit offset 200 in SimpleConsumer: group=marcin-group, topic=test_1, partition=0
.....
DEBUG:root:*** all offsets={0: 200} ...after consuming all messages, number of messages consumed=200
@wizzat
Collaborator

wizzat commented May 19, 2014

What Kafka-Python commit are you using?

@macmarcin-zz
Author

I tried several, and invoking MultiProcessConsumer the following way exhibits the behavior above. However, if I invoke SimpleConsumer the same way (minus num_procs), the SimpleConsumer won't consume any messages, since the offset is set at 200.

MultiProcessConsumer(kafka, "marcin-group", "test_1", num_procs=2, auto_commit=True, auto_commit_every_n=100)

@tenminben

I can confirm this behavior as well. I did confirm that setting auto_commit=True in the SimpleConsumers created within _mp_consume() corrects this behavior. It appears this was a design choice to manage auto_commit within the master process only. I haven't dug into the code enough yet to understand why.

@wizzat
Collaborator

wizzat commented May 22, 2014

The design choice makes sense because the master process is actually what "consumes" the messages. Fetching something from the queue and committing that without actually doing anything with the message is definitely an incorrect course of action. I don't see anything obviously wrong with the code. I don't know when I'll have more time to devote to this, but if someone could put together a failing test case it would be super helpful.
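To illustrate the point about committing only what the master has actually consumed, here is a minimal sketch. It is not kafka-python code: `OffsetStore`, `worker`, and `consume` are hypothetical names, threads stand in for the child processes, and an in-memory dict stands in for the offsets stored in Zookeeper.

```python
import queue
import threading


class OffsetStore:
    """Hypothetical stand-in for the committed offsets of one consumer group."""

    def __init__(self):
        self.committed = {}

    def commit(self, partition, offset):
        self.committed[partition] = offset


def worker(partition, messages, out_queue):
    # The worker only fetches. It never commits, because a message sitting
    # in the queue has not been processed by anyone yet.
    for offset, value in enumerate(messages):
        out_queue.put((partition, offset, value))
    out_queue.put((partition, None, None))  # sentinel: this worker is done


def consume(partitions, store):
    """Master loop: yield each message, then record the next offset to read."""
    out_queue = queue.Queue()
    threads = [threading.Thread(target=worker, args=(p, msgs, out_queue))
               for p, msgs in partitions.items()]
    for t in threads:
        t.start()
    remaining = len(threads)
    while remaining:
        partition, offset, value = out_queue.get()
        if offset is None:
            remaining -= 1
            continue
        yield partition, value
        # Runs only when the caller asks for the next message, that is,
        # after the caller has finished with this one.
        store.commit(partition, offset + 1)
    for t in threads:
        t.join()
```

If the caller stops iterating early, the last message handed out is not committed and would be delivered again on the next run, which is the at-least-once behavior the master-only commit is meant to preserve.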

@mahall

mahall commented Oct 2, 2014

I'm working on a fix for this. I see two options:

  1. Add a new parameter to the Consumer to indicate that it should load the offsets on init but not perform any of the other autocommit actions. Something like:
def __init__(self, client, group, topic, partitions=None, auto_commit=True,
             auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
             auto_commit_every_t=AUTO_COMMIT_INTERVAL,
             load_initial_offsets=False):
  2. Pass the offsets from the MultiProcessConsumer to each child SimpleConsumer along with the partitions and use them as the starting offsets.

I like option 1 the best but I know they wanted to keep all offset management in the MultiProcessConsumer so option 2 might fit their design choice better.
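The second option (handing the parent's offsets down to each child) can be sketched as follows. This is an illustration under assumptions, not the library's implementation: `ChildConsumer`, `split_partitions`, and `start_children` are hypothetical names, and the topic is modeled as an in-memory dict of partition to message list.

```python
class ChildConsumer:
    """Hypothetical stand-in for a child SimpleConsumer; not the library class."""

    def __init__(self, log, partitions, start_offsets=None):
        self.log = log  # {partition: [messages]}
        # Begin at the offsets handed down by the parent, falling back to 0
        # for any partition that has no committed offset.
        start_offsets = start_offsets or {}
        self.offsets = {p: start_offsets.get(p, 0) for p in partitions}

    def fetch(self):
        # Yield every message at or after the starting offset.
        for p in self.offsets:
            for msg in self.log[p][self.offsets[p]:]:
                self.offsets[p] += 1
                yield p, msg


def split_partitions(partitions, num_procs):
    """Distribute partitions round-robin across at most num_procs children."""
    chunks = [partitions[i::num_procs] for i in range(num_procs)]
    return [chunk for chunk in chunks if chunk]


def start_children(log, committed, num_procs):
    """Parent side: pass each child its partitions and their committed offsets."""
    children = []
    for chunk in split_partitions(sorted(log), num_procs):
        offsets = {p: committed[p] for p in chunk if p in committed}
        children.append(ChildConsumer(log, chunk, offsets))
    return children
```

With committed offsets of `{0: 4, 1: 1}` on a topic holding four messages in partition 0 and three in partition 1, the children would skip partition 0 entirely and resume partition 1 at its second message. All offset bookkeeping stays in the parent, which matches the design preference mentioned above.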

@dpkp
Owner

dpkp commented Mar 30, 2015

I think the best solution here is to fetch committed offsets in the base consumer on __init__. Currently it is only done when autocommit is configured, but any consumer that uses a pre-existing group should use those offsets by default.

@dpkp
Owner

dpkp commented Mar 30, 2015

See PR #356 -- I recall now that the reason SimpleConsumer only fetches commits when auto_commit=True was because everyone was using server v0.8.0 and it did not have support for offset commit/fetch. We should change to check whether client.group is None.
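A rough model of the proposed rule, for readers following along: load committed offsets whenever a group is set, independent of auto_commit. This is a sketch, not the code from the PR. `BaseConsumer` and `committed_store` are hypothetical, and the store is a plain dict keyed by (group, topic, partition).

```python
class BaseConsumer:
    """Hypothetical model of the proposed rule; not kafka-python's Consumer."""

    def __init__(self, committed_store, group, topic, partitions,
                 auto_commit=True):
        self.group = group
        self.topic = topic
        self.auto_commit = auto_commit
        self.offsets = {p: 0 for p in partitions}
        # Resume whenever a group is given, regardless of auto_commit.
        # group=None is taken to mean "no stored offsets to look up",
        # for example against a server without offset commit/fetch support.
        if group is not None:
            for p in partitions:
                key = (group, topic, p)
                self.offsets[p] = committed_store.get(key, 0)
```

Under this rule a consumer created with `auto_commit=False` for group "marcin-group" would still start at the committed offset of 200 from the original report, while a consumer created with `group=None` would start from 0.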

@vshlapakov
Contributor

Sounds good to me. It's simpler to reset offsets only when required (a minority of cases), and resuming offsets should be the default behavior.

@dpkp dpkp added this to the 0.9.4 Release milestone Mar 31, 2015
@dpkp
Owner

dpkp commented Mar 31, 2015

#356 merged -- this should be fixed now and will be available in the 0.9.4 release

@dpkp dpkp closed this as completed Mar 31, 2015
wbarnha added a commit to ax-ale/kafka-python that referenced this issue Mar 26, 2024
* Add typing

* define types as Struct for simplicity's sake
bradenneal1 pushed a commit to bradenneal1/kafka-python that referenced this issue May 16, 2024
* Add typing

* define types as Struct for simplicity's sake