Reader.has_message_available() returns True even if no messages to consume, when start_message_id is earliest #199

Closed
adamdelezuch89 opened this issue Feb 16, 2024 · 2 comments · Fixed by #202


adamdelezuch89 commented Feb 16, 2024

Hi,
`Reader.has_message_available()` returns `True` on the first call even when no message is available on the topic. This only happens when the reader is created with `start_message_id=pulsar.MessageId.earliest`.

Environment:
Python 3.11.5
pulsar-client 3.4.0

Reproduction

  1. Create a reader with start_message_id=pulsar.MessageId.earliest:
from my_schemas import Schema
import pulsar

pulsar_url = "pulsar://localhost:6650"  # service URL needs a scheme; 6650 is the default broker port
topic = "tenant/namespace/topic"
name = "name"
schema = pulsar.schema.AvroSchema(Schema)

client = pulsar.Client(pulsar_url)

reader = client.create_reader(
    topic=topic,
    start_message_id=pulsar.MessageId.earliest,
    schema=schema,
    reader_name=name,
)
  2. Seek to MessageId.latest, or to a timestamp after which there are no more messages:
reader.seek(pulsar.MessageId.latest)
  3. Read messages until no more are available:
while reader.has_message_available():  # True on the first iteration, but should be False
    msg = reader.read_next(timeout_millis=5000)

print("No more messages")

At step 3 I expect "No more messages" to be printed, but I get a Timeout exception instead.
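
Until a fixed client is available, one possible workaround is to treat a read timeout as the end of the topic instead of trusting `has_message_available()` alone. The sketch below is an illustration, not part of the original report: `drain` and `FakeReader` are hypothetical names, and `FakeReader` only imitates the spurious `True` described above so that the example runs without a broker. With the real client, the exception to pass as `timeout_exc` would presumably be `pulsar.Timeout`.

```python
class FakeReader:
    """Hypothetical stand-in that mimics the reported behaviour."""

    def __init__(self, messages, spurious=True):
        self._messages = list(messages)
        self._spurious = spurious  # report one false positive when empty

    def has_message_available(self):
        if self._messages:
            return True
        # Return True exactly once on an empty topic, then False.
        spurious, self._spurious = self._spurious, False
        return spurious

    def read_next(self, timeout_millis=None):
        if not self._messages:
            raise TimeoutError("no message within timeout")
        return self._messages.pop(0)


def drain(reader, timeout_millis=1000, timeout_exc=TimeoutError):
    """Read until the reader reports no more messages or a read times out."""
    messages = []
    while reader.has_message_available():
        try:
            messages.append(reader.read_next(timeout_millis=timeout_millis))
        except timeout_exc:
            # A spurious "available" answer ends here instead of raising.
            break
    return messages
```

The cost of this approach is one extra `timeout_millis` wait whenever the spurious `True` occurs, so a short timeout is preferable.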

@BewareMyPower
Contributor

It's a bug in the C++ client. I will fix it ASAP.

BewareMyPower added a commit to BewareMyPower/pulsar-client-cpp that referenced this issue Mar 4, 2024
…ageId each time

### Motivation

See apache/pulsar-client-python#199

There is a race condition when `hasMessageAvailable` is called after
`seek` if the start message ID of the `Reader` is `earliest`.

In `ConsumerImpl::hasMessageAvailableAsync`, if the connection is not
yet established, `lastDequedMessageId_` is `earliest` because no
message has been received. Since `lastMessageIdInBroker_` is also
`earliest`, `getLastMessageIdAsync` is called and execution reaches

https://github.com/apache/pulsar-client-cpp/blob/e2cacb7dfb57b6d059b49fead2e1611548ff89b0/lib/ConsumerImpl.cc#L1554

Before `getLastMessageIdAsync` is called, `messageId` is `earliest`
because `lastDequedMessageId_` and `startMessageId_` are both
`earliest`. By the time the callback runs, however, `startMessageId_`
has already been updated to `latest` in `connectionOpened`, so the
comparison should be made against `latest`.

### Modifications

In the callback of `getLastMessageIdAsync`, read the current value of
`startMessageId_` for the comparison rather than reusing the stale one.
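
The stale-value problem described above can be illustrated with a small, deterministic model. This is only a sketch under simplifying assumptions, not the C++ client's code: `ReaderModel`, the `reread` flag, and the numeric stand-ins for `earliest` and `latest` are all hypothetical, and the real availability check involves more state than a single comparison.

```python
EARLIEST = -1            # stand-in for MessageId::earliest
LATEST = float("inf")    # stand-in for MessageId::latest


class ReaderModel:
    """Toy model of a reader whose start position changes on reconnect."""

    def __init__(self, last_id_in_broker):
        self.start_message_id = EARLIEST
        self.last_id_in_broker = last_id_in_broker

    def connection_opened(self):
        # After a seek to latest, the reconnect updates the start position.
        self.start_message_id = LATEST

    def has_message_available_async(self, reread):
        # Value captured before the broker round trip (the buggy path).
        captured = self.start_message_id

        def on_last_message_id():
            # The fix: read the current start position inside the callback.
            position = self.start_message_id if reread else captured
            return self.last_id_in_broker > position

        return on_last_message_id


model = ReaderModel(last_id_in_broker=5)
stale_callback = model.has_message_available_async(reread=False)
fresh_callback = model.has_message_available_async(reread=True)
model.connection_opened()  # reconnect lands before the callbacks run
```

In this model `stale_callback()` still compares against `earliest` and reports a message as available, while `fresh_callback()` compares against `latest` and reports none.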
BewareMyPower added a commit to BewareMyPower/pulsar-client-cpp that referenced this issue Mar 4, 2024

Refactor the seek flow to reset the seek states and trigger the callback
after updating the `startMessageId_`.

`ReaderTest.testHasMessageAvailableAfterSeekToEnd` is added to cover the
changes.
BewareMyPower added a commit to BewareMyPower/pulsar-client-cpp that referenced this issue Mar 5, 2024
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this issue Mar 5, 2024
…seeking to latest

### Motivation

The Java client has the same issue as
apache/pulsar-client-python#199

After a seek operation is done, the `startMessageId` is not updated
until the reconnection triggered by the seek completes in
`connectionOpened`. Before that update, `hasMessageAvailable` could
compare against an outdated `startMessageId` and return a wrong value.

### Modifications

Replace `duringSeek` with a `SeekStatus` field:
- `NOT_STARTED`: the initial state, or a seek operation is done. `seek`
  can only succeed in this status.
- `IN_PROGRESS`: a seek operation has started but the client has not
  yet received the response from the broker.
- `COMPLETED`: the client has received the seek response but the seek
  future has not completed yet.

After the status becomes `COMPLETED`, the next time the connection is
established the status changes from `COMPLETED` to `NOT_STARTED`
and the seek future is then completed in the internal executor.

Add `testHasMessageAvailableAfterSeek` to cover this change.
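
The three-state flow described above can be sketched as a small state machine. This is an illustrative Python model, not the Java client's implementation: the `SeekStatus` names come from the commit message, while `SeekTracker` and its methods are hypothetical, and the real client completes a future on an executor rather than setting a flag.

```python
from enum import Enum, auto


class SeekStatus(Enum):
    NOT_STARTED = auto()   # initial, or the previous seek is fully done
    IN_PROGRESS = auto()   # seek sent, broker response not yet received
    COMPLETED = auto()     # response received, seek future not yet done


class SeekTracker:
    """Hypothetical tracker mirroring the described transitions."""

    def __init__(self, start_message_id):
        self.status = SeekStatus.NOT_STARTED
        self.start_message_id = start_message_id
        self.seek_future_done = False
        self._target = None

    def start_seek(self, target):
        # A seek can only begin when no other seek is pending.
        if self.status is not SeekStatus.NOT_STARTED:
            return False
        self.status = SeekStatus.IN_PROGRESS
        self.seek_future_done = False
        self._target = target
        return True

    def on_seek_response(self):
        if self.status is SeekStatus.IN_PROGRESS:
            self.status = SeekStatus.COMPLETED

    def on_connection_opened(self):
        # Update the start position first, then complete the seek, so a
        # caller never observes a finished seek with a stale position.
        if self.status is SeekStatus.COMPLETED:
            self.start_message_id = self._target
            self.status = SeekStatus.NOT_STARTED
            self.seek_future_done = True
```

Completing the seek only after the start position is updated is what prevents a caller from observing a finished seek together with an outdated `startMessageId`.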
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this issue Mar 5, 2024

After the status becomes `COMPLETED`, if the connection is not ready,
the status changes from `COMPLETED` to `NOT_STARTED` the next time the
connection is established, and the seek future is then completed in
the internal executor.

Add `testHasMessageAvailableAfterSeek` to cover this change.
@BewareMyPower
Contributor

The fix apache/pulsar-client-cpp#409 is included in the C++ client 3.5.0.

BewareMyPower added a commit to BewareMyPower/pulsar-client-python that referenced this issue Mar 14, 2024
BewareMyPower added a commit to BewareMyPower/pulsar-client-python that referenced this issue Mar 16, 2024
Fixes apache#199
Fixes apache#193

Upgraded the pulsar-client-cpp dependency to 3.5.0, which includes the fixes for both issues, and added relevant unit tests.