Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reader stuck after reading all the messages on a topic that were each published via a separate transaction. #11877

Closed
YannPerthuis opened this issue Sep 1, 2021 · 3 comments
Labels
lifecycle/stale type/bug The PR fixed a bug or issue reported a bug

Comments

@YannPerthuis
Copy link

YannPerthuis commented Sep 1, 2021

Describe the bug
It seems to me that when a message is posted to a topic via a transaction, two entries are inserted into the topic (as seen with pulsar manager ui, there is probably a system message and the actual posted message) as opposed to only one entry when the message is posted without a transaction.

When I use a reader to read all the messages in the same topic, I use the 'hasMessageAvailable' method and then read the message if there is one. The problem is, after reading the last message in the topic, the 'hasMessageAvailable' method still returns 'true' instead of 'false'.

Logically my reader tries to read the next message but gets stuck.

I think that the 'hasMessageAvailable' method returns 'true' because the broker API sees that my reader's cursor is not at the end of the topic (there is still a system message after the current position of my cursor) and does not differentiate between real and system messages. However, when the reader tries to read the last message it gets stuck because the broker API has no real message to return. This is just a guess.

Ps: I tried to use the 'hasReachedEndOfTopic' method, but it is of no use to me because it returns false all the time.

To Reproduce

  1. Run pulsar standalone (with transactionCoordinatorEnabled=true in conf)
  2. Run the following piece of code :
public class Application {

    public static void main(String[] args) throws IOException {

        String[] messagesToPublish = new String[]{ "message-1", "message-2", "message-3", "message-4", "message-5" };

        PulsarClient pulsarClient = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .statsInterval(0, TimeUnit.SECONDS)
            .enableTransaction(true)
            .build();

        String sourceTopic = "public/default/test-topic";

        Producer<String> producer = pulsarClient
            .newProducer(Schema.STRING)
            .topic(sourceTopic)
            .sendTimeout(0, TimeUnit.SECONDS)
            .create();

        Stream.of(messagesToPublish)
            .forEach(m -> {

                try {

                    Transaction txn = pulsarClient
                        .newTransaction()
                        .build()
                        .get();


                    producer.newMessage(txn).value(m).send();

                    txn.commit().get();
                } catch (PulsarClientException | ExecutionException | InterruptedException e) {

                    e.printStackTrace();
                }
            });

        Reader<String> reader = pulsarClient
            .newReader(Schema.STRING)
            .topic(sourceTopic)
            .subscriptionName("test")
            .startMessageId(MessageId.earliest)
            .create();

        while (reader.hasMessageAvailable()) {

            Message<String> message = reader.readNext(); // Stuck here

            System.out.println("New message received: " + message.getValue());
        }

        System.out.println("Should logically be written but is not");
    }
}
  1. Note that the program never exits and that the last print is logically not done.

Expected behavior
The hasMessageAvailable method should return 'false' when the reader has finished reading all the messages in a topic that were all originally published via a separate transaction.

Desktop

  • OS: macOs v11.2.3
  • Java: v8
  • Pulsar (broker and client): v2.8.0
@YannPerthuis
Copy link
Author

YannPerthuis commented Sep 22, 2021

@315157973 @sijie @codelipenghui @lhotari @wolfstudy, I'm sorry to tag you but I just want to know if I'm doing something wrong with the transactions or with the reader or if this problem is already known. In this pr I saw that the problem had been fixed but not in the context of messages published via a transaction.

Thanks for the help in advance

@codelipenghui
Copy link
Contributor

The issue had no activity for 30 days, mark with Stale label.

@dao-jun
Copy link
Member

dao-jun commented May 19, 2024

this issue looks fixed by #22572, closing

@dao-jun dao-jun closed this as completed May 19, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
lifecycle/stale type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

No branches or pull requests

3 participants