Cancelling a consume call does not work as expected #506

Open
robertlangenberg opened this issue Aug 9, 2023 · 2 comments

Comments

@robertlangenberg

robertlangenberg commented Aug 9, 2023

Consuming from a queue works, as does publishing. I don't use exchanges or anything, just one consumer on one queue. When I cancel the consumer, I still receive messages after the DeferredCancel::onSuccess callback has been executed. Also, the std::string passed to that callback is empty; should this be the consumerTag again? I'm using the Qt 5 event loop and the most recent RabbitMQ release.

Here's what I'm observing:

// first publish many messages to "queueName"

m_channel->consume("queueName", "hardcodedConsumerTag").onReceived([](const AMQP::Message &m, uint64_t deliveryTag, bool redelivered)
        {std::cout << m.body() << std::endl;});

m_channel->cancel("hardcodedConsumerTag").onSuccess([](const std::string &consumer)
        {std::cout << "should have stopped consuming for: " << consumer << std::endl;});

output:

message 1
should have stopped consuming for: (here is just an empty string)
message 2
message 3
... until ALL messages have been delivered

I would have expected the messages to stop after the output "should have stopped consuming" is printed.

When attempting to start consuming under hardcodedConsumerTag again several seconds later, RabbitMQ returns "error - consumerTag already in use", which I interpret to mean the previous consumer is still registered. RabbitMQ also continues to deliver messages to the original consumer until I manually exit the application.

@EmielBruijntjes
Member

Yes, that is strange. What consumer tag does the application report if you also install an onSuccess callback?

m_channel->consume("queueName", "hardcodedConsumerTag").onSuccess([](const std::string &tag) {
    // print the tag that the consumer was actually started with
    std::cout << "started consumer " << tag << std::endl;
}).onReceived([](const AMQP::Message &m, uint64_t deliveryTag, bool redelivered)
        {std::cout.write(m.body(), m.bodySize()) << std::endl;});

Also watch out: m.body() might not be null-terminated, which is why the snippet writes it together with m.bodySize().
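
To illustrate the null-termination caveat, here is a minimal sketch with a plain buffer standing in for the pointer and length you would get from m.body() and m.bodySize(). The helper name `bodyToString` is hypothetical and not part of AMQP-CPP.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper (not part of AMQP-CPP): copy exactly `size` bytes
// from a buffer that may lack a terminating '\0' into a std::string.
// Streaming the raw pointer instead would read past the end of the body.
std::string bodyToString(const char *data, std::size_t size)
{
    return std::string(data, size);
}
```

Inside an onReceived callback this would presumably be called as `bodyToString(m.body(), m.bodySize())`.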

@robertlangenberg
Author

robertlangenberg commented Aug 9, 2023

Oh wow, that was already a good question. It turns out that was a bug on my part, introduced while trying to solve the problem with automatically assigned tags. Long story short: the DeferredConsume::onSuccess() and DeferredCancel::onSuccess() callbacks do get the correct consumerTag passed.

Unfortunately, consumption still continues after the cancel, but printing in the callbacks has shown something else: I'm cancelling the consumer before I get the DeferredConsume::onSuccess() callback. AMQP-CPP reports success for the cancellation of the (potentially not yet existing?) consumer. Could this be what's happening? If so, should DeferredCancel::onSuccess() be executed when a non-existing consumer is cancelled? And is there a way to make the consume and cancel calls blocking with AMQP-CPP? Nesting everything within the DeferredConsume::onSuccess() callback would not be ideal, as the nesting is already pretty deep and the consume call is made elsewhere in the code.
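
One way to rule out the "cancel before consume is confirmed" ordering without nesting the calls is to route both events through a small state holder. This is only a sketch under assumptions: `ConsumerGate` is a hypothetical helper, not part of AMQP-CPP, and it does not make anything blocking. It just remembers a cancel request that arrives too early and replays it once the consumer has been confirmed.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical helper, not part of AMQP-CPP: defers a cancel request
// until the consumer has been confirmed by the broker.
class ConsumerGate
{
public:
    using CancelFn = std::function<void(const std::string &tag)>;

    // `doCancel` performs the real cancel, e.g. a lambda that calls
    // m_channel->cancel(tag) (an assumption about the calling code).
    explicit ConsumerGate(CancelFn doCancel) : m_doCancel(std::move(doCancel)) {}

    // Call this from the consume call's onSuccess callback.
    void onConsumeStarted(const std::string &tag)
    {
        m_tag = tag;
        m_started = true;
        if (m_cancelPending) requestCancel(); // replay the early request
    }

    // Call this wherever the application wants to stop consuming.
    void requestCancel()
    {
        if (!m_started) { m_cancelPending = true; return; } // too early: remember it
        m_started = false;
        m_cancelPending = false;
        m_doCancel(m_tag);
    }

private:
    CancelFn m_doCancel;
    std::string m_tag;
    bool m_started = false;
    bool m_cancelPending = false;
};
```

The consume call can then stay where it is and only forward its onSuccess tag to `onConsumeStarted()`, while the code that wants to stop consuming calls `requestCancel()` at any time.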

I might have found a(nother?) bug while trying to fix this: rejecting a message doesn't seem to requeue it in its original position. Another consumer (here, a get() call) receives the subsequent messages first, even after calling m_channel->setQos(1,true):

m_channel->setQos(1, true);
m_queueCancelled = false;
m_channel->consume("queueName", "hardcodedConsumerTag").onReceived([&](const AMQP::Message &message,
                                   uint64_t deliveryTag, bool redelivered)
        {if (m_queueCancelled) {
            // cancel already confirmed: hand the message back to the broker
            std::cout << "queue already cancelled, rejecting: "
                      << std::string(message.body(), message.bodySize()) << std::endl;
            m_channel->reject(deliveryTag, AMQP::requeue);
        } else {
            // process message
            std::cout << "consuming normally: "
                      << std::string(message.body(), message.bodySize()) << std::endl;
            m_channel->ack(deliveryTag);
        }
        }).onSuccess([](){std::cout << "consuming started" << std::endl;});

// everything below is nested in the cancel().onSuccess()
m_channel->cancel("hardcodedConsumerTag").onSuccess([&](const std::string &consumer)
        {m_queueCancelled = true;
        std::cout << "should have stopped consuming for: " << consumer << std::endl;

        m_channel->get("queueName").onSuccess([&](const AMQP::Message &message, uint64_t deliveryTag,
            bool redelivered) {
                std::cout << "printing in get: "
                          << std::string(message.body(), message.bodySize()) << std::endl;
                m_channel->ack(deliveryTag);
                callGetUntilEmpty(); // emits a Qt signal that calls m_channel->get with the same arguments recursively until DeferredGet::onEmpty() is called
            }).onEmpty([&](){
                std::cout << "queue empty" << std::endl;
                m_queueCancelled = false;
                m_channel->consume("queueName", "hardcodedConsumerTag");});
});

and the output is


should have stopped consuming for: hardcodedConsumerTag
consuming started
queue already cancelled, rejecting: message1
printing in get: message2
printing in get: message1
queue already cancelled, rejecting: message3
queue empty
consuming normally: message3
consume error:  NOT_ALLOWED - attempt to reuse consumer tag 'hardcodedConsumerTag' // I installed DeferredConsume::onError for this to be printed

I need to preserve the message order, so having rejected messages requeued out of place is really inconvenient... I guess this might be RabbitMQ config, but I'm all ears if you happen to know how to achieve that :)

Note that when I neither acknowledge nor reject the message in the supposedly cancelled consume call, m_channel->get() will still get message2 and message3. I thought this would not happen with setQos(1,true).
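
On the ordering question: according to the RabbitMQ documentation, a requeued message is put back in its original position only if possible, and the prefetch limit set through QoS has no effect on messages fetched with basic.get, which may explain why get() still returns message2 and message3. One possible workaround, sketched below under assumptions, is to not reject late deliveries at all but to hold them locally in arrival order and process them before the messages fetched with get(). `HoldBuffer` and `HeldDelivery` are hypothetical names, not part of AMQP-CPP.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <utility>

// Hypothetical types, not part of AMQP-CPP: a delivery that arrived after
// the cancel was requested and has been neither acked nor rejected yet.
struct HeldDelivery
{
    std::uint64_t deliveryTag;
    std::string body;
};

class HoldBuffer
{
public:
    // Store a delivery in arrival order; the body is copied with its size
    // because it may not be null-terminated.
    void hold(std::uint64_t deliveryTag, const char *body, std::size_t size)
    {
        m_held.push_back(HeldDelivery{deliveryTag, std::string(body, size)});
    }

    bool empty() const { return m_held.empty(); }

    // Remove and return the oldest held delivery. Precondition: !empty().
    HeldDelivery pop()
    {
        HeldDelivery front = std::move(m_held.front());
        m_held.pop_front();
        return front;
    }

private:
    std::deque<HeldDelivery> m_held;
};
```

In the code above, the rejecting branch of onReceived would call `hold(deliveryTag, message.body(), message.bodySize())` instead of reject(), and the cancel's onSuccess callback would process and ack everything in the buffer before issuing the first get(). Since held deliveries left the queue before the remaining messages, this should keep the original order.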
