Skip to content

Commit

Permalink
ensure delivery count is set correctly on drain
Browse files Browse the repository at this point in the history
  • Loading branch information
grs committed Mar 6, 2019
1 parent 416706b commit 8619046
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 1 deletion.
1 change: 1 addition & 0 deletions lib/link.js
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ Receiver.prototype.on_flow = function (frame) {
this.dispatch('receiver_flow', this._context());
if (frame.performative.drain) {
this.credit = frame.performative.link_credit;
this.delivery_count = frame.performative.delivery_count;
if (frame.performative.link_credit > 0) console.error('ERROR: received flow with drain set, but non zero credit');
else this.dispatch('receiver_drained', this._context());
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "rhea",
"version": "0.3.10",
"version": "0.3.11",
"description": "reactive AMQP 1.0 library",
"homepage": "http://github.com/amqp/rhea",
"license": "Apache-2.0",
Expand Down
50 changes: 50 additions & 0 deletions test/links.ts
Original file line number Diff line number Diff line change
Expand Up @@ -791,4 +791,54 @@ describe('miscellaneous', function() {
done();
});
});

it('handles drain correctly', function(done: Function) {
var queue = [] as any;
var sender: any;
server.on('sender_open', function (context: rhea.EventContext) {
sender = context.sender!;
});
server.on('sender_draining', function (context: rhea.EventContext) {
context.sender!.set_drained(queue.length === 0);
});
server.on('receiver_open', function (context: rhea.EventContext) {
context.receiver!.add_credit(100);
});
server.on('message', function (context: rhea.EventContext) {
queue.push(context.message!.body);
while (sender.sendable && queue.length) {
sender.send({body:queue.shift()});
}
});
server.on('sendable', function (context: rhea.EventContext) {
while (context.sender!.sendable && queue.length) {
context.sender!.send({body:queue.shift()});
}
});
var conn = client.connect(listener.address() as any);
var received: string;
var r = conn.open_receiver({credit_window:0}) as any;
var s = conn.open_sender() as any;
r.on('message', function (context: rhea.EventContext) {
received = context.message!.body;
r.close();
});
r.once('receiver_drained', function () {
s.send({body:'post-drain'});
});
r.drain = true;
r.flow(1);
s.on('accepted', function () {
r.drain = true;
r.flow(1);
});
client.on('receiver_close', function (context: rhea.EventContext) {
context.connection.close();
});
client.on('connection_close', function (context: rhea.EventContext) {
assert.deepEqual(received, 'post-drain');
done();
});
});

});

0 comments on commit 8619046

Please sign in to comment.