From 8619046cba9bc44481d3611d2f297784590ff6f4 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 6 Mar 2019 10:07:18 +0000 Subject: [PATCH] ensure delivery count is set correctly on drain --- lib/link.js | 1 + package.json | 2 +- test/links.ts | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 1 deletion(-) diff --git a/lib/link.js b/lib/link.js index f15d390..65a933f 100644 --- a/lib/link.js +++ b/lib/link.js @@ -354,6 +354,7 @@ Receiver.prototype.on_flow = function (frame) { this.dispatch('receiver_flow', this._context()); if (frame.performative.drain) { this.credit = frame.performative.link_credit; + this.delivery_count = frame.performative.delivery_count; if (frame.performative.link_credit > 0) console.error('ERROR: received flow with drain set, but non zero credit'); else this.dispatch('receiver_drained', this._context()); } diff --git a/package.json b/package.json index fcb3f9f..5a9405b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "rhea", - "version": "0.3.10", + "version": "0.3.11", "description": "reactive AMQP 1.0 library", "homepage": "http://github.com/amqp/rhea", "license": "Apache-2.0", diff --git a/test/links.ts b/test/links.ts index 63ad7e1..885ad81 100644 --- a/test/links.ts +++ b/test/links.ts @@ -791,4 +791,54 @@ describe('miscellaneous', function() { done(); }); }); + + it('handles drain correctly', function(done: Function) { + var queue = [] as any; + var sender: any; + server.on('sender_open', function (context: rhea.EventContext) { + sender = context.sender!; + }); + server.on('sender_draining', function (context: rhea.EventContext) { + context.sender!.set_drained(queue.length === 0); + }); + server.on('receiver_open', function (context: rhea.EventContext) { + context.receiver!.add_credit(100); + }); + server.on('message', function (context: rhea.EventContext) { + queue.push(context.message!.body); + while (sender.sendable && queue.length) { + sender.send({body:queue.shift()}); + } + }); + server.on('sendable', function (context: rhea.EventContext) { + while (context.sender!.sendable && queue.length) { + context.sender!.send({body:queue.shift()}); + } + }); + var conn = client.connect(listener.address() as any); + var received: string; + var r = conn.open_receiver({credit_window:0}) as any; + var s = conn.open_sender() as any; + r.on('message', function (context: rhea.EventContext) { + received = context.message!.body; + r.close(); + }); + r.once('receiver_drained', function () { + s.send({body:'post-drain'}); + }); + r.drain = true; + r.flow(1); + s.on('accepted', function () { + r.drain = true; + r.flow(1); + }); + client.on('receiver_close', function (context: rhea.EventContext) { + context.connection.close(); + }); + client.on('connection_close', function (context: rhea.EventContext) { + assert.deepEqual(received, 'post-drain'); + done(); + }); + }); + });