diff --git a/src/histogram.js b/src/histogram.js index 115069eb9..61dfbe8d0 100644 --- a/src/histogram.js +++ b/src/histogram.js @@ -16,8 +16,7 @@ 'use strict'; -var MIN_VALUE = 10000; -var MAX_VALUE = 600000; +var extend = require('extend'); /*! * The Histogram class is used to capture the lifespan of messages within the @@ -27,7 +26,15 @@ var MAX_VALUE = 600000; * @private * @class */ -function Histogram() { +function Histogram(options) { + this.options = extend( + { + min: 10000, + max: 600000, + }, + options + ); + this.data = new Map(); this.length = 0; } @@ -39,8 +46,8 @@ function Histogram() { * @param {numnber} value - The value in milliseconds. */ Histogram.prototype.add = function(value) { - value = Math.max(value, MIN_VALUE); - value = Math.min(value, MAX_VALUE); + value = Math.max(value, this.options.min); + value = Math.min(value, this.options.max); value = Math.ceil(value / 1000) * 1000; if (!this.data.has(value)) { @@ -75,7 +82,7 @@ Histogram.prototype.percentile = function(percent) { } } - return MIN_VALUE; + return this.options.min; }; module.exports = Histogram; diff --git a/src/subscription.js b/src/subscription.js index 5c0018d35..d316f4c5c 100644 --- a/src/subscription.js +++ b/src/subscription.js @@ -153,6 +153,7 @@ function Subscription(pubsub, name, options) { this.projectId = pubsub.projectId; this.request = pubsub.request.bind(pubsub); this.histogram = new Histogram(); + this.latency_ = new Histogram({min: 0}); this.name = Subscription.formatName_(pubsub.projectId, name); @@ -312,31 +313,17 @@ Subscription.prototype.acknowledge_ = function(ackIds, connId) { var promises = chunk(ackIds, MAX_ACK_IDS_PER_REQUEST).map(function( ackIdChunk ) { - if (!self.isConnected_()) { - return common.util.promisify(self.request).call(self, { - client: 'SubscriberClient', - method: 'acknowledge', - reqOpts: { - subscription: self.name, - ackIds: ackIdChunk, - }, - }); + if (self.isConnected_()) { + return self.writeTo_(connId, {ackIds: ackIdChunk}); } - return new Promise(function(resolve, reject) { - self.connectionPool.acquire(connId, function(err, connection) { - if (err) { - reject(err); - return; - } - - connection.write( - { - ackIds: ackIdChunk, - }, - resolve - ); - }); + return common.util.promisify(self.request).call(self, { + client: 'SubscriberClient', + method: 'acknowledge', + reqOpts: { + subscription: self.name, + ackIds: ackIdChunk, + }, }); }); @@ -902,33 +889,21 @@ Subscription.prototype.modifyAckDeadline_ = function(ackIds, deadline, connId) { var promises = chunk(ackIds, MAX_ACK_IDS_PER_REQUEST).map(function( ackIdChunk ) { - if (!self.isConnected_()) { - return common.util.promisify(self.request).call(self, { - client: 'SubscriberClient', - method: 'modifyAckDeadline', - reqOpts: { - subscription: self.name, - ackDeadlineSeconds: deadline, - ackIds: ackIdChunk, - }, + if (self.isConnected_()) { + return self.writeTo_(connId, { + modifyDeadlineAckIds: ackIdChunk, + modifyDeadlineSeconds: Array(ackIdChunk.length).fill(deadline), }); } - return new Promise(function(resolve, reject) { - self.connectionPool.acquire(connId, function(err, connection) { - if (err) { - reject(err); - return; - } - - connection.write( - { - modifyDeadlineAckIds: ackIdChunk, - modifyDeadlineSeconds: Array(ackIdChunk.length).fill(deadline), - }, - resolve - ); - }); + return common.util.promisify(self.request).call(self, { + client: 'SubscriberClient', + method: 'modifyAckDeadline', + reqOpts: { + subscription: self.name, + ackDeadlineSeconds: deadline, + ackIds: ackIdChunk, + }, }); }); @@ -1180,7 +1155,9 @@ Subscription.prototype.setLeaseTimeout_ = function() { return; } - var timeout = Math.random() * this.ackDeadline * 0.9; + var latency = this.latency_.percentile(99); + var timeout = Math.random() * this.ackDeadline * 0.9 - latency; + this.leaseTimeoutHandle_ = setTimeout(this.renewLeases_.bind(this), timeout); }; @@ -1265,6 +1242,40 @@ Subscription.prototype.snapshot = function(name) { return this.pubsub.snapshot.call(this, name); }; +/** + * Writes to specified duplex stream. This is useful for capturing write + * latencies that can later be used to adjust the auto lease timeout. + * + * @private + * + * @param {string} connId The ID of the connection to write to. + * @param {object} data The data to be written to the stream. + * @returns {Promise} + */ +Subscription.prototype.writeTo_ = function(connId, data) { + var self = this; + var startTime = Date.now(); + + return new Promise(function(resolve, reject) { + self.connectionPool.acquire(connId, function(err, connection) { + if (err) { + reject(err); + return; + } + + connection.write(data, function(err) { + if (err) { + reject(err); + return; + } + + self.latency_.add(Date.now() - startTime); + resolve(); + }); + }); + }); +}; + /*! Developer Documentation * * All async methods (except for streams) will return a Promise in the event diff --git a/test/histogram.js b/test/histogram.js index eee2693c1..607589540 100644 --- a/test/histogram.js +++ b/test/histogram.js @@ -31,6 +31,18 @@ describe('Histogram', function() { }); describe('initialization', function() { + it('should set default min/max values', function() { + assert.strictEqual(histogram.options.min, 10000); + assert.strictEqual(histogram.options.max, 600000); + }); + + it('should accept user defined min/max values', function() { + histogram = new Histogram({min: 5, max: 10}); + + assert.strictEqual(histogram.options.min, 5); + assert.strictEqual(histogram.options.max, 10); + }); + it('should create a data map', function() { assert(histogram.data instanceof Map); }); diff --git a/test/subscription.js b/test/subscription.js index d540e4680..4709f8e51 100644 --- a/test/subscription.js +++ b/test/subscription.js @@ -132,6 +132,10 @@ describe('Subscription', function() { assert(subscription.histogram instanceof FakeHistogram); }); + it('should create a latency histogram', function() { + assert(subscription.latency_ instanceof FakeHistogram); + }); + it('should format the sub name', function() { var formattedName = 'a/b/c/d'; var formatName = Subscription.formatName_; @@ -469,28 +473,19 @@ describe('Subscription', function() { }); describe('with streaming connection', function() { - var pool; - beforeEach(function() { subscription.isConnected_ = function() { return true; }; - - pool = subscription.connectionPool = {}; }); it('should send the correct request', function(done) { var fakeConnectionId = 'abc'; - var fakeConnection = { - write: function(data) { - assert.deepEqual(data, {ackIds: fakeAckIds}); - done(); - }, - }; - pool.acquire = function(connectionId, callback) { + subscription.writeTo_ = function(connectionId, data) { assert.strictEqual(connectionId, fakeConnectionId); - callback(null, fakeConnection); + assert.deepEqual(data, {ackIds: fakeAckIds}); + done(); }; subscription.acknowledge_(fakeAckIds, fakeConnectionId); @@ -498,27 +493,22 @@ describe('Subscription', function() { it('should batch requests if there are too many ackIds', function(done) { var receivedCalls = 0; - var fakeConnectionId = 'abc'; - var fakeConnection = { - write: function(data) { - var offset = receivedCalls * batchSize; - var expectedAckIds = tooManyFakeAckIds.slice( - offset, - offset + batchSize - ); - assert.deepEqual(data, {ackIds: expectedAckIds}); + subscription.writeTo_ = function(connectionId, data) { + assert.strictEqual(connectionId, fakeConnectionId); - receivedCalls += 1; - if (receivedCalls === expectedCalls) { - done(); - } - }, - }; + var offset = receivedCalls * batchSize; + var expectedAckIds = tooManyFakeAckIds.slice( + offset, + offset + batchSize + ); + + assert.deepEqual(data, {ackIds: expectedAckIds}); - pool.acquire = function(connectionId, callback) { - callback(null, fakeConnection); + if (++receivedCalls === expectedCalls) { + done(); + } }; subscription.acknowledge_(tooManyFakeAckIds, fakeConnectionId); @@ -527,8 +517,8 @@ describe('Subscription', function() { it('should emit an error when unable to get a conn', function(done) { var error = new Error('err'); - pool.acquire = function(connectionId, callback) { - callback(error); + subscription.writeTo_ = function() { + return Promise.reject(error); }; subscription.on('error', function(err) { @@ -1471,30 +1461,21 @@ describe('Subscription', function() { }); describe('with streaming connection', function() { - var pool; - beforeEach(function() { subscription.isConnected_ = function() { return true; }; - - pool = subscription.connectionPool = {}; }); it('should send the correct request', function(done) { var expectedDeadlines = Array(fakeAckIds.length).fill(fakeDeadline); var fakeConnId = 'abc'; - var fakeConnection = { - write: function(data) { - assert.deepEqual(data.modifyDeadlineAckIds, fakeAckIds); - assert.deepEqual(data.modifyDeadlineSeconds, expectedDeadlines); - done(); - }, - }; - pool.acquire = function(connectionId, callback) { + subscription.writeTo_ = function(connectionId, data) { assert.strictEqual(connectionId, fakeConnId); - callback(null, fakeConnection); + assert.deepEqual(data.modifyDeadlineAckIds, fakeAckIds); + assert.deepEqual(data.modifyDeadlineSeconds, expectedDeadlines); + done(); }; subscription.modifyAckDeadline_(fakeAckIds, fakeDeadline, fakeConnId); @@ -1502,31 +1483,26 @@ describe('Subscription', function() { it('should batch requests if there are too many ackIds', function(done) { var receivedCalls = 0; - var fakeConnId = 'abc'; - var fakeConnection = { - write: function(data) { - var offset = receivedCalls * batchSize; - var expectedAckIds = tooManyFakeAckIds.slice( - offset, - offset + batchSize - ); - var expectedDeadlines = Array(expectedAckIds.length).fill( - fakeDeadline - ); - assert.deepEqual(data.modifyDeadlineAckIds, expectedAckIds); - assert.deepEqual(data.modifyDeadlineSeconds, expectedDeadlines); + subscription.writeTo_ = function(connectionId, data) { + assert.strictEqual(connectionId, fakeConnId); - receivedCalls += 1; - if (receivedCalls === expectedCalls) { - done(); - } - }, - }; + var offset = receivedCalls * batchSize; + var expectedAckIds = tooManyFakeAckIds.slice( + offset, + offset + batchSize + ); + var expectedDeadlines = Array(expectedAckIds.length).fill( + fakeDeadline + ); + + assert.deepEqual(data.modifyDeadlineAckIds, expectedAckIds); + assert.deepEqual(data.modifyDeadlineSeconds, expectedDeadlines); - pool.acquire = function(connectionId, callback) { - callback(null, fakeConnection); + if (++receivedCalls === expectedCalls) { + done(); + } }; subscription.modifyAckDeadline_( @@ -1538,10 +1514,9 @@ describe('Subscription', function() { it('should emit an error when unable to get a conn', function(done) { var error = new Error('err'); - var fakeConnId = 'abc'; - pool.acquire = function(connectionId, callback) { - callback(error); + subscription.writeTo_ = function() { + return Promise.reject(error); }; subscription.on('error', function(err) { @@ -1549,7 +1524,7 @@ describe('Subscription', function() { done(); }); - subscription.modifyAckDeadline_(fakeAckIds, fakeDeadline, fakeConnId); + subscription.modifyAckDeadline_(fakeAckIds, fakeDeadline); }); }); }); @@ -1953,6 +1928,11 @@ describe('Subscription', function() { beforeEach(function() { subscription.isOpen = true; + subscription.latency_ = { + percentile: function() { + return 0; + }, + }; }); after(function() { @@ -1978,6 +1958,28 @@ describe('Subscription', function() { assert.strictEqual(subscription.leaseTimeoutHandle_, fakeTimeoutHandle); }); + it('should subtract the estimated latency', function(done) { + var latency = 1; + + subscription.latency_.percentile = function(percentile) { + assert.strictEqual(percentile, 99); + return latency; + }; + + var ackDeadline = (subscription.ackDeadline = 1000); + + global.Math.random = function() { + return fakeRandom; + }; + + global.setTimeout = function(callback, duration) { + assert.strictEqual(duration, fakeRandom * ackDeadline * 0.9 - latency); + done(); + }; + + subscription.setLeaseTimeout_(); + }); + it('should not set a timeout if one already exists', function() { subscription.renewLeases_ = function() { throw new Error('Should not be called.'); @@ -2079,4 +2081,90 @@ describe('Subscription', function() { subscription.snapshot(SNAPSHOT_NAME); }); }); + + describe('writeTo_', function() { + var CONNECTION_ID = 'abc'; + var CONNECTION = {}; + + beforeEach(function() { + subscription.connectionPool = { + acquire: function(connId, cb) { + cb(null, CONNECTION); + }, + }; + }); + + it('should return a promise', function() { + subscription.connectionPool.acquire = function() {}; + + var returnValue = subscription.writeTo_(); + assert(returnValue instanceof Promise); + }); + + it('should reject the promise if unable to acquire stream', function() { + var fakeError = new Error('err'); + + subscription.connectionPool.acquire = function(connId, cb) { + assert.strictEqual(connId, CONNECTION_ID); + cb(fakeError); + }; + + return subscription.writeTo_(CONNECTION_ID, {}).then( + function() { + throw new Error('Should not resolve.'); + }, + function(err) { + assert.strictEqual(err, fakeError); + } + ); + }); + + it('should write to the stream', function(done) { + var fakeData = {a: 'b'}; + + CONNECTION.write = function(data) { + assert.strictEqual(data, fakeData); + done(); + }; + + subscription.writeTo_(CONNECTION_ID, fakeData); + }); + + it('should reject the promise if unable to write the data', function() { + var fakeError = new Error('err'); + + CONNECTION.write = function(data, cb) { + cb(fakeError); + }; + + return subscription.writeTo_(CONNECTION_ID, {}).then( + function() { + throw new Error('Should not have resolved.'); + }, + function(err) { + assert.strictEqual(err, fakeError); + } + ); + }); + + it('should capture the write latency when successful', function() { + var fakeLatency = 500; + var capturedLatency; + + CONNECTION.write = function(data, cb) { + setTimeout(cb, 500, null); + }; + + subscription.latency_.add = function(value) { + capturedLatency = value; + }; + + return subscription.writeTo_(CONNECTION_ID, {}).then(function() { + var upper = fakeLatency + 50; + var lower = fakeLatency - 50; + + assert(capturedLatency > lower && capturedLatency < upper); + }); + }); + }); });