Issue and question - Detached receivers / draining credits #412

Open
kbfirebreather opened this issue Oct 12, 2023 · 9 comments

Comments

@kbfirebreather

kbfirebreather commented Oct 12, 2023

Issue: Can't ack a message on a detached receiver.

I'm trying to allow messages that are already being consumed (delivered but not yet ack'd) to finish processing during server shutdown.
I attempted to detach the receiver while it's processing and wait for it to complete before closing the queues / connection to Artemis. It appears, though, that calling ack() in a detached state doesn't actually acknowledge the message, and the next consumer on the queue picks up the message that was being processed. Is acknowledging a message while in a detached state supported?

Question:

If not, I would like to configure the consumers to stop consuming new messages during shutdown while they wait for in-progress processing to complete. How can I set the credits for the receiver to 0 so Artemis doesn't attempt to send any messages during shutdown? I see on the receiver interface there is a drain_credit() function. Is that what I could call to prevent the consumer from picking up new messages?

@grs
Member

grs commented Oct 13, 2023

Can you attach a simple reproducer? Or else a protocol trace/log of the interaction? Acknowledgement should be allowed after the receiver is detached. Whether the broker handles that is another question though. It may consider messages that are unacked at the point the receiver is closed as unconsumed.

@kbfirebreather
Author

When you say protocol trace/log, do you mean from rhea, or my broker? (ActiveMQ in this case)

@grs
Member

grs commented Oct 16, 2023

Either should do. The key thing is to determine what the actual interaction is.

@kbfirebreather
Author

Okay, I turned on rhea debug logging and scaled my service to two instances so there were two consumers on the queue.

Here are the logs from the service that originally consumed the message, showing the message being consumed, a sleep statement being executed, and then the SIGTERM arriving in the middle of the sleep.

[vagrant@k3sdev (default) /opt/service/kubernetes ]> kubectl logs -f service-alerts-dbd9d8cc5-sgkx8 --since=10s
Mon, 16 Oct 2023 13:31:20 GMT rhea:io [connection-1] read 440 bytes
Mon, 16 Oct 2023 13:31:20 GMT rhea:io [connection-1] got frame of size 440
Mon, 16 Oct 2023 13:31:20 GMT rhea:raw [connection-1] RECV: 440 000001b802000000005314c00905520243a001004342005370c00402415000005372c12904a30e782d6f70742d6a6d732d646573745100a312782d6f70742d6a6d732d6d73672d747970655105005373d0000000280000000a40a00475736572a10c71756575652e616c65727473404040404040830000018b38af1647005374c11502a1114a4d535844656c6976657279436f756e7440005377b1000001197b2274797065223a22616c657274222c227365766572697479223a227761726e696e67222c2274696d657374616d70223a22323031312d31302d30355431343a34383a30302e3030305a222c227469746c65223a225465737420416c657274222c22636f6e74656e74223a2254686520636f6e74656e742e2e2e222c226461746154797065223a22747970654f6644617461222c2264617461223a7b227761726e696e67223a22736f6d657468696e67222c22696e666f223a22736f6d657468696e6720656c7365227d2c2265787465726e616c223a66616c73652c2275726c223a2268747470733a2f2f6d61737465722e62656173742e75732e6c6d636f2e636f6d2f222c2270726f6475636572223a226265617374227d
Mon, 16 Oct 2023 13:31:20 GMT rhea:frames [connection-1]:0 <- transfer#14 {"handle":2,"delivery_tag":{"type":"Buffer","data":[0]}} <Buffer 00 53 70 c0 04 02 41 50 00 00 53 72 c1 29 04 a3 0e 78 2d 6f 70 74 2d 6a 6d 73 2d 64 65 73 74 51 00 a3 12 78 2d 6f 70 74 2d 6a 6d 73 2d 6d 73 67 2d 74 ... 368 more bytes>
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List8', typecode: 192, width: 1, category: 3, create: [Function (anonymous)] { typecode: 192 } }, value: [ Typed { type: [TypeDesc], value: true }, Typed { type: [TypeDesc], value: 0 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 112 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 112 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'x-opt-jms-dest' }, Typed { type: [TypeDesc], value: 0 }, Typed { type: [TypeDesc], value: 'x-opt-jms-msg-type' }, Typed { type: [TypeDesc], value: 5 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 114 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 114 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List32', typecode: 208, width: 4, category: 3, create: [Function (anonymous)] { typecode: 208 } }, value: [ Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: <Buffer 75 73 65 72> }, Typed { type: [TypeDesc], value: 'queue.alerts' }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: 2023-10-16T13:31:20.519Z } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 115 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 115 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'JMSXDeliveryCount' }, Typed { type: [TypeDesc], value: null } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 116 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 116 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Str32', typecode: 177, width: 4, category: 2, encoding: 'utf8', create: [Function (anonymous)] { typecode: 177 } }, value: '{"type":"alert","severity":"warning","timestamp":"2011-10-05T14:48:00.000Z","title":"Test Alert","content":"The content...","dataType":"typeOfData","data":{"warning":"something","info":"something else"},"external":false,"producer":"service"}', descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 119 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 119 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:events [connection-1] Link got event: message
[2023-10-16T13:31:20Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Sleeping for 30 seconds to simulator long running amq consumer
Mon, 16 Oct 2023 13:31:21 GMT rhea:frames [connection-1]:0 -> empty
Mon, 16 Oct 2023 13:31:21 GMT rhea:raw [connection-1] SENT: 8 0000000802000000
[2023-10-16T13:31:26Z] INFO  (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Received SIGTERM event. Shutting down.
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): closing context
[2023-10-16T13:31:26Z] INFO  (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Shutting down Messenger
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): queue.alerts Detaching queue to wait for consuming messages while shutting down
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Waiting for consumers to complete, or timeout to be reached
Mon, 16 Oct 2023 13:31:26 GMT rhea:frames [connection-1]:0 -> detach#16 {"handle":2}
Mon, 16 Oct 2023 13:31:26 GMT rhea:raw [connection-1] SENT: 23 0000001702000000005316d00000000700000002520242
[2023-10-16T13:31:26Z] INFO  (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): server closed
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] read 16 bytes
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] got frame of size 16
Mon, 16 Oct 2023 13:31:26 GMT rhea:raw [connection-1] RECV: 16 0000001002000000005316c003015202
Mon, 16 Oct 2023 13:31:26 GMT rhea:frames [connection-1]:0 <- detach#16 {"handle":2}
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Link got event: receiver_close
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Session got event: receiver_close
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Connection got event: receiver_close
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [cd669231-d8cf-b04c-aa6e-0bd01fbc508f] Container got event: receiver_close
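For anyone reading the rhea:raw lines in these logs, here is a minimal sketch of how the hex can be decoded by hand. It assumes the standard AMQP 1.0 frame layout (4-byte size, 1-byte data offset, 1-byte type, 2-byte channel) and that the performative descriptor is encoded as a smallulong (0x53), which is what the frames in this thread use. The function name decodeFrameHeader is hypothetical and not part of rhea.

```javascript
// AMQP 1.0 performative descriptor codes from the transport specification.
const PERFORMATIVES = {
  0x10: 'open', 0x11: 'begin', 0x12: 'attach', 0x13: 'flow', 0x14: 'transfer',
  0x15: 'disposition', 0x16: 'detach', 0x17: 'end', 0x18: 'close',
};

// Decode the fixed 8-byte frame header and, if a body is present, the
// performative name. `hex` is the hex string printed after "SENT: n" or
// "RECV: n" in a rhea:raw log line.
function decodeFrameHeader(hex) {
  const buf = Buffer.from(hex, 'hex');
  const size = buf.readUInt32BE(0);   // total frame size in bytes
  const doff = buf.readUInt8(4);      // data offset, in 4-byte words
  const type = buf.readUInt8(5);      // 0 = AMQP frame, 1 = SASL frame
  const channel = buf.readUInt16BE(6);
  const bodyStart = doff * 4;

  let performative = null;            // null means an empty (heartbeat) frame
  if (size > bodyStart) {
    // Body starts with 0x00 (described type), then the descriptor. Only the
    // smallulong form (0x53 <code>) is handled in this sketch.
    const described = buf[bodyStart] === 0x00 && buf[bodyStart + 1] === 0x53;
    performative = (described && PERFORMATIVES[buf[bodyStart + 2]]) || 'unknown';
  }
  return { size, doff, type, channel, performative };
}

// The detach sent by the first service at 13:31:26:
console.log(decodeFrameHeader('0000001702000000005316d00000000700000002520242'));
// The 8-byte frames that show up as "empty" in rhea:frames:
console.log(decodeFrameHeader('0000000802000000'));
```

The first call reports a 23-byte detach frame on channel 0, and the second an 8-byte frame with no body, matching what rhea:frames prints for the same lines.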

Here is the other service that is consuming on the queue. It picks up the message as soon as the first service detaches from the queue, which occurs at 13:31:26.

[vagrant@k3sdev (default) ~ ]> kubectl logs -f service-alerts-dbd9d8cc5-dhrjk --since=10s
Mon, 16 Oct 2023 13:31:15 GMT rhea:io [connection-1] read 8 bytes
Mon, 16 Oct 2023 13:31:15 GMT rhea:io [connection-1] got frame of size 8
Mon, 16 Oct 2023 13:31:15 GMT rhea:raw [connection-1] RECV: 8 0000000802000000
Mon, 16 Oct 2023 13:31:15 GMT rhea:frames [connection-1]:0 <- empty
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] read 441 bytes
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] got frame of size 441
Mon, 16 Oct 2023 13:31:26 GMT rhea:raw [connection-1] RECV: 441 000001b902000000005314c00a0552025201a001004342005370c00402415000005372c12904a30e782d6f70742d6a6d732d646573745100a312782d6f70742d6a6d732d6d73672d747970655105005373d0000000280000000a40a00475736572a10c71756575652e616c65727473404040404040830000018b38af1647005374c11502a1114a4d535844656c6976657279436f756e7440005377b1000001197b2274797065223a22616c657274222c227365766572697479223a227761726e696e67222c2274696d657374616d70223a22323031312d31302d30355431343a34383a30302e3030305a222c227469746c65223a225465737420416c657274222c22636f6e74656e74223a2254686520636f6e74656e742e2e2e222c226461746154797065223a22747970654f6644617461222c2264617461223a7b227761726e696e67223a22736f6d657468696e67222c22696e666f223a22736f6d657468696e6720656c7365227d2c2265787465726e616c223a66616c73652c2275726c223a2268747470733a2f2f6d61737465722e62656173742e75732e6c6d636f2e636f6d2f222c2270726f6475636572223a226265617374227d
Mon, 16 Oct 2023 13:31:26 GMT rhea:frames [connection-1]:0 <- transfer#14 {"handle":2,"delivery_id":1,"delivery_tag":{"type":"Buffer","data":[0]}} <Buffer 00 53 70 c0 04 02 41 50 00 00 53 72 c1 29 04 a3 0e 78 2d 6f 70 74 2d 6a 6d 73 2d 64 65 73 74 51 00 a3 12 78 2d 6f 70 74 2d 6a 6d 73 2d 6d 73 67 2d 74 ... 368 more bytes>
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List8', typecode: 192, width: 1, category: 3, create: [Function (anonymous)] { typecode: 192 } }, value: [ Typed { type: [TypeDesc], value: true }, Typed { type: [TypeDesc], value: 0 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 112 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 112 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'x-opt-jms-dest' }, Typed { type: [TypeDesc], value: 0 }, Typed { type: [TypeDesc], value: 'x-opt-jms-msg-type' }, Typed { type: [TypeDesc], value: 5 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 114 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 114 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List32', typecode: 208, width: 4, category: 3, create: [Function (anonymous)] { typecode: 208 } }, value: [ Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: <Buffer 75 73 65 72> }, Typed { type: [TypeDesc], value: 'queue.alerts' }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: 2023-10-16T13:31:20.519Z } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 115 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 115 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'JMSXDeliveryCount' }, Typed { type: [TypeDesc], value: null } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 116 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 116 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Str32', typecode: 177, width: 4, category: 2, encoding: 'utf8', create: [Function (anonymous)] { typecode: 177 } }, value: '{"type":"alert","severity":"warning","timestamp":"2011-10-05T14:48:00.000Z","title":"Test Alert","content":"The content...","dataType":"typeOfData","data":{"warning":"something","info":"something else"},"external":false,"producer":"service"}', descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 119 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 119 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Link got event: message
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-dhrjk): Sleeping for 30 seconds to simulator long running amq consumer
Mon, 16 Oct 2023 13:31:27 GMT rhea:frames [connection-1]:0 -> empty
Mon, 16 Oct 2023 13:31:27 GMT rhea:raw [connection-1] SENT: 8 0000000802000000

@grs
Member

grs commented Oct 16, 2023

Does anything else happen on the first process after the receiver_close? My understanding of your issue is that you want to acknowledge, after that point, the message that was already received. Is that happening here? (There is no disposition in that first log, but neither is there a session close or connection close, so from an AMQP perspective it is incomplete.)

My suspicion is that the broker will consider any messages unacknowledged at the point the receiver is detached to be available for redelivery. In that case, sending an acknowledgement after the detach is not going to have any effect anyway. (I can't confirm this without knowing what, if anything, happens with the first process after the detach.)

Regarding the other question, at present you can only successfully drain the credit if you have automatic credit management disabled. There is an example of that here: https://github.com/amqp/rhea/blob/main/examples/drain.js.
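As a rough illustration of draining with automatic credit management disabled, here is a minimal sketch. It is not the linked example. It assumes rhea's documented credit_window and autoaccept receiver options, the add_credit() and drain_credit() receiver methods, and the receiver_drained event. The helper names manualCreditOptions and drainReceiver are hypothetical.

```javascript
// Sketch only. Helper names are hypothetical; `receiver` is expected to be a
// rhea receiver that was opened with credit_window: 0 (manual credit).

// Options for connection.open_receiver(). credit_window: 0 switches off rhea's
// automatic credit top-up; autoaccept: false leaves acknowledgement to the app.
function manualCreditOptions(address) {
  return { source: { address }, credit_window: 0, autoaccept: false };
}

// Ask the peer to use up or hand back outstanding credit, and resolve once
// rhea reports 'receiver_drained'. After that, no further credit is granted
// unless the application calls add_credit() again.
function drainReceiver(receiver) {
  return new Promise((resolve) => {
    receiver.once('receiver_drained', () => resolve());
    receiver.drain_credit();
  });
}

// Typical wiring (not executed here, since it needs a broker):
//   const receiver = connection.open_receiver(manualCreditOptions('queue.alerts'));
//   receiver.add_credit(1);                  // ask for one message
//   receiver.on('message', (context) => {
//     /* process the message */
//     context.delivery.accept();
//     receiver.add_credit(1);                // ask for the next one
//   });
//   process.on('SIGTERM', () => drainReceiver(receiver));
```

Granting one credit at a time keeps at most one unacknowledged message per consumer, which limits how much work is in flight when shutdown begins. A larger batch would work the same way at the cost of more in-flight messages.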

@kbfirebreather
Author

kbfirebreather commented Oct 16, 2023

Ah yeah, I accidentally cut off some logging at the end. I recreated it, and these were the rhea logs in the service that was shutting down.

The message is acked, then the queue is closed.

[2023-10-16T15:35:50Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-vzjn5): Closing queues
Mon, 16 Oct 2023 15:35:50 GMT rhea:frames [connection-1]:0 -> disposition#15 {"role":true,"settled":true,"state":[]}
Mon, 16 Oct 2023 15:35:50 GMT rhea:raw [connection-1] SENT: 28 0000001c02000000005315d00000000c000000054143434100532445

These are the logs from the second service that got the same message:

Mon, 16 Oct 2023 15:35:59 GMT rhea:frames [connection-1]:0 -> disposition#15 {"role":true,"settled":true,"state":[]}
Mon, 16 Oct 2023 15:35:59 GMT rhea:raw [connection-1] SENT: 28 0000001c02000000005315d00000000c000000054143434100532445
Mon, 16 Oct 2023 15:35:59 GMT rhea:frames [connection-1]:0 -> flow#13 {"next_incoming_id":1,"incoming_window":2048,"outgoing_window":4294967295,"handle":2,"delivery_count":1,"link_credit":15}
Mon, 16 Oct 2023 15:35:59 GMT rhea:raw [connection-1] SENT: 39 0000002702000000005313d00000001700000007520170000008004370ffffffff52025201520f
Mon, 16 Oct 2023 15:36:15 GMT rhea:io [connection-1] read 8 bytes
Mon, 16 Oct 2023 15:36:15 GMT rhea:io [connection-1] got frame of size 8
Mon, 16 Oct 2023 15:36:15 GMT rhea:raw [connection-1] RECV: 8 0000000802000000
Mon, 16 Oct 2023 15:36:15 GMT rhea:frames [connection-1]:0 <- empty
Mon, 16 Oct 2023 15:36:21 GMT rhea:frames [connection-1]:0 -> empty
Mon, 16 Oct 2023 15:36:21 GMT rhea:raw [connection-1] SENT: 8 0000000802000000

It does look like this particular consumer is disabling automatic credit management, but I'm not sure about the rest of the services. That is good to know!

Edit: Is it possible to mutate a consumer to disable automatic credit management, or is that only possible during consumer instantiation?

@grs
Member

grs commented Oct 16, 2023

Ok, the extra logs confirm that the broker is redelivering any unacknowledged message at the point that the receiver is closed. So acknowledgements sent after that are ignored.
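Given that behaviour, one possible shutdown order is to stop new deliveries first, wait for in-flight messages to be accepted, and only then close the receiver. The following is a minimal sketch of that ordering, not something rhea provides. InFlightTracker and gracefulClose are hypothetical names, and the sketch assumes the receiver is no longer being granted credit, so nothing new arrives while waiting.

```javascript
// Counts messages that have been delivered but not yet settled by the app.
class InFlightTracker {
  constructor() {
    this.count = 0;
    this.waiters = [];
  }

  start() {
    this.count += 1;
  }

  finish() {
    this.count = Math.max(0, this.count - 1);
    if (this.count === 0) {
      const waiters = this.waiters;
      this.waiters = [];
      waiters.forEach((notify) => notify());
    }
  }

  // Resolves true once nothing is in flight, or false if timeoutMs elapses first.
  whenIdle(timeoutMs) {
    if (this.count === 0) return Promise.resolve(true);
    return new Promise((resolve) => {
      const timer = setTimeout(() => {
        this.waiters = this.waiters.filter((w) => w !== onIdle);
        resolve(false);
      }, timeoutMs);
      const onIdle = () => {
        clearTimeout(timer);
        resolve(true);
      };
      this.waiters.push(onIdle);
    });
  }
}

// Close the receiver only after in-flight handlers have accepted their
// deliveries, because the broker redelivers anything unacked at close.
async function gracefulClose(receiver, tracker, timeoutMs) {
  const idle = await tracker.whenIdle(timeoutMs);
  receiver.close();
  return idle;
}

// Typical wiring in a rhea message handler (not executed here):
//   receiver.on('message', async (context) => {
//     tracker.start();
//     try { await handle(context.message); context.delivery.accept(); }
//     catch (err) { context.delivery.release(); }
//     finally { tracker.finish(); }
//   });
```

If the timeout is reached, gracefulClose still closes the receiver and returns false, in which case the unacknowledged message can be expected to be redelivered to another consumer, as seen in the logs above.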

@kbfirebreather
Author

Is it possible to mutate a consumer to disable automatic credit management, or is that only possible during consumer instantiation?

@grs
Member

grs commented Oct 16, 2023

At present you can only set that on creation of the receiver.
