Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Unacked message counter is wrong #21568

Closed
1 of 2 tasks
psisoyev opened this issue Nov 13, 2023 · 0 comments · Fixed by #21592
Closed
1 of 2 tasks

[Bug] Unacked message counter is wrong #21568

psisoyev opened this issue Nov 13, 2023 · 0 comments · Fixed by #21592
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@psisoyev
Copy link

Search before asking

  • I searched in the issues and found nothing similar.

Version

Both 2.11.2 and 3.1.1 images

Minimal reproduce step

  1. Start Pulsar standalone docker image
  2. Create a Producer publishing to a non-persistent topic
  3. Create a Shared Consumer that listens to the topic, consumes messages, ack messages

Code example:

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;

public class PulsarProducer {

    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("non-persistent://public/default/my-topic")
                .create();

        while (true) {
            System.out.println("Sending message...");
            String msg = """
                    { "foo": "bar" }""";
            producer.send(msg);
            Thread.sleep(5000); // Sleep for 5 seconds
        }

        // Never reaches here in this example
        // producer.close();
        // client.close();
    }
}
import org.apache.pulsar.client.api.*;

public class PulsarConsumer {

    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("non-persistent://public/default/my-topic")
                .subscriptionName("my-shared-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        while (true) {
            // Wait for a message
            Message<String> msg = consumer.receive();
            try {
                System.out.printf("Message received: %s\n", new String(msg.getData()));
                // Acknowledge the message so that it can be deleted by the message broker
                consumer.acknowledge(msg);
            } catch (Exception e) {
                consumer.negativeAcknowledge(msg);
            }
        }

        // Never reaches here in this example
        // consumer.close();
        // client.close();
    }
}

What did you expect to see?

When I run

pulsar-admin topics stats "non-persistent://public/default/my-topic"

I expected to see unackedMessages equal to zero

What did you see instead?

Instead, I'm seeing unackedMessages not equal to zero but it's growing with every processed message

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@psisoyev psisoyev added the type/bug The PR fixed a bug or issue reported a bug label Nov 13, 2023
poorbarcode pushed a commit that referenced this issue Nov 30, 2023
…n on non-persistent topic (#21592)

Fixes #21568

Motivation
Fix incorrect unack count when using shared subscription on non-persistent topic

Modifications
In the case of a non-persistent topic, the consumer does not send an ack to the broker (see org.apache.pulsar.client.impl.NonPersistentAcknowledgmentGroupingTracker# addAcknowledgment)

To work around this, we can update unackedMessages when the broker sends a message to the consumer successfully.
Technoboy- pushed a commit that referenced this issue Dec 3, 2023
…n on non-persistent topic (#21592)

Fixes #21568

Motivation
Fix incorrect unack count when using shared subscription on non-persistent topic

Modifications
In the case of a non-persistent topic, the consumer does not send an ack to the broker (see org.apache.pulsar.client.impl.NonPersistentAcknowledgmentGroupingTracker# addAcknowledgment)

To work around this, we can update unackedMessages when the broker sends a message to the consumer successfully.
Technoboy- pushed a commit that referenced this issue Dec 4, 2023
…n on non-persistent topic (#21592)

Fixes #21568

Motivation
Fix incorrect unack count when using shared subscription on non-persistent topic

Modifications
In the case of a non-persistent topic, the consumer does not send an ack to the broker (see org.apache.pulsar.client.impl.NonPersistentAcknowledgmentGroupingTracker# addAcknowledgment)

To work around this, we can update unackedMessages when the broker sends a message to the consumer successfully.
Technoboy- pushed a commit that referenced this issue Dec 4, 2023
…n on non-persistent topic (#21592)

Fixes #21568

Motivation
Fix incorrect unack count when using shared subscription on non-persistent topic

Modifications
In the case of a non-persistent topic, the consumer does not send an ack to the broker (see org.apache.pulsar.client.impl.NonPersistentAcknowledgmentGroupingTracker# addAcknowledgment)

To work around this, we can update unackedMessages when the broker sends a message to the consumer successfully.
nikhil-ctds pushed a commit to datastax/pulsar that referenced this issue Dec 20, 2023
…n on non-persistent topic (apache#21592)

Fixes apache#21568

Motivation
Fix incorrect unack count when using shared subscription on non-persistent topic

Modifications
In the case of a non-persistent topic, the consumer does not send an ack to the broker (see org.apache.pulsar.client.impl.NonPersistentAcknowledgmentGroupingTracker# addAcknowledgment)

To work around this, we can update unackedMessages when the broker sends a message to the consumer successfully.
srinath-ctds pushed a commit to datastax/pulsar that referenced this issue Dec 20, 2023
…n on non-persistent topic (apache#21592)

Fixes apache#21568

Motivation
Fix incorrect unack count when using shared subscription on non-persistent topic

Modifications
In the case of a non-persistent topic, the consumer does not send an ack to the broker (see org.apache.pulsar.client.impl.NonPersistentAcknowledgmentGroupingTracker# addAcknowledgment)

To work around this, we can update unackedMessages when the broker sends a message to the consumer successfully.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
1 participant