From 4a614a408ad586a66f832b979bc037b4670453d6 Mon Sep 17 00:00:00 2001 From: vijay-qlogic <36055624+vijay-qlogic@users.noreply.github.com> Date: Thu, 22 Nov 2018 23:52:05 +0530 Subject: [PATCH] refactor(ts): added ts style fix for src/subscriber.ts (#359) --- src/subscriber.ts | 97 ++++++++++++++++++++++------------------------- 1 file changed, 45 insertions(+), 52 deletions(-) diff --git a/src/subscriber.ts b/src/subscriber.ts index ec1e7b7ed..56a845b31 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -25,13 +25,14 @@ import * as os from 'os'; import {ConnectionPool} from './connection-pool'; import {Histogram} from './histogram'; -import { Subscription } from '.'; +import {Subscription} from '.'; /** * @type {number} - The maximum number of ackIds to be sent in acknowledge/modifyAckDeadline - * requests. There is an API limit of 524288 bytes (512KiB) per acknowledge/modifyAckDeadline - * request. ackIds have a maximum size of 164 bytes, so 524288/164 ~= 3197. Accounting for some - * overhead, a maximum of 3000 ackIds per request should be safe. + * requests. There is an API limit of 524288 bytes (512KiB) per + * acknowledge/modifyAckDeadline request. ackIds have a maximum size of 164 + * bytes, so 524288/164 ~= 3197. Accounting for some overhead, a maximum of 3000 + * ackIds per request should be safe. * @private */ const MAX_ACK_IDS_PER_REQUEST = 3000; @@ -75,18 +76,16 @@ export class Subscriber extends EventEmitter { bytes: 0, }; this.flowControl = Object.assign( - { - maxBytes: os.freemem() * 0.2, - maxMessages: 100, - }, - options.flowControl - ); + { + maxBytes: os.freemem() * 0.2, + maxMessages: 100, + }, + options.flowControl); this.batching = Object.assign( - { - maxMilliseconds: 100, - }, - options.batching - ); + { + maxMilliseconds: 100, + }, + options.batching); this.flushTimeoutHandle_ = null; this.leaseTimeoutHandle_ = null; this.userClosed_ = false; @@ -94,7 +93,8 @@ export class Subscriber extends EventEmitter { this.messageListeners = 0; // As of right now we do not write any acks/modacks to the pull streams. // But with allowing users to opt out of using streaming pulls altogether on - // the horizon, we may need to support this feature again in the near future. + // the horizon, we may need to support this feature again in the near + // future. this.writeToStreams_ = false; this.listenForEvents_(); } @@ -126,7 +126,7 @@ export class Subscriber extends EventEmitter { * @param {string} [connId] Connection ID to send request on. * @return {Promise} */ - acknowledge_(ackIds: string|string[], connId?: string): Promise { + acknowledge_(ackIds: string|string[], connId?: string) { ackIds = arrify(ackIds); const promises = chunk(ackIds, MAX_ACK_IDS_PER_REQUEST).map(ackIdChunk => { if (this.writeToStreams_ && this.isConnected_()) { @@ -146,8 +146,8 @@ export class Subscriber extends EventEmitter { }); } /*! - * Breaks the lease on a message. Essentially this means we no longer treat the - * message as being un-acked and count it towards the flow control limits. + * Breaks the lease on a message. Essentially this means we no longer treat + * the message as being un-acked and count it towards the flow control limits. * * If the pool was previously paused and we freed up space, we'll continue to * recieve messages. @@ -230,7 +230,7 @@ export class Subscriber extends EventEmitter { * * @private */ - flushQueues_(): Promise { + flushQueues_(): Promise { if (this.flushTimeoutHandle_) { this.flushTimeoutHandle_.clear(); this.flushTimeoutHandle_ = null; @@ -242,14 +242,12 @@ export class Subscriber extends EventEmitter { return Promise.resolve(); } - const requests: Promise[] = []; + const requests: Array> = []; if (acks.length) { - requests.push( - this.acknowledge_(acks).then(() => { - this.inventory_.ack = []; - }) - ); + requests.push(this.acknowledge_(acks).then(() => { + this.inventory_.ack = []; + })); } if (nacks.length) { @@ -262,8 +260,9 @@ export class Subscriber extends EventEmitter { return table; }, {}); - const modAckRequests = Object.keys(modAcks).map(deadline => - this.modifyAckDeadline_(modAcks[deadline], Number(deadline))); + const modAckRequests = Object.keys(modAcks).map( + deadline => + this.modifyAckDeadline_(modAcks[deadline], Number(deadline))); requests.push.apply(requests, modAckRequests); @@ -294,9 +293,8 @@ export class Subscriber extends EventEmitter { */ hasMaxMessages_() { return ( - this.inventory_.lease.length >= this.flowControl.maxMessages || - this.inventory_.bytes >= this.flowControl.maxBytes - ); + this.inventory_.lease.length >= this.flowControl.maxMessages || + this.inventory_.bytes >= this.flowControl.maxBytes); } /*! * Leases a message. This will add the message to our inventory list and then @@ -309,10 +307,7 @@ export class Subscriber extends EventEmitter { */ leaseMessage_(message) { this.modifyAckDeadline_( - message.ackId, - this.ackDeadline / 1000, - message.connectionId - ); + message.ackId, this.ackDeadline / 1000, message.connectionId); this.inventory_.lease.push(message.ackId); this.inventory_.bytes += message.length; this.setLeaseTimeout_(); @@ -357,13 +352,14 @@ export class Subscriber extends EventEmitter { * @param {string=} connId Connection ID to send request on. * @return {Promise} */ - modifyAckDeadline_(ackIds: string|string[], deadline: number, connId?: string) { + modifyAckDeadline_( + ackIds: string|string[], deadline: number, connId?: string) { ackIds = arrify(ackIds); const promises = chunk(ackIds, MAX_ACK_IDS_PER_REQUEST).map(ackIdChunk => { if (this.writeToStreams_ && this.isConnected_()) { return this.writeTo_(connId, { modifyDeadlineAckIds: ackIdChunk, - modifyDeadlineSeconds: Array(ackIdChunk.length).fill(deadline), + modifyDeadlineSeconds: new Array(ackIdChunk.length).fill(deadline), }); } return promisify(this.request).call(this, { @@ -394,9 +390,8 @@ export class Subscriber extends EventEmitter { const breakLease = this.breakLease_.bind(this, message); if (this.isConnected_()) { - this.modifyAckDeadline_(message.ackId, delay, message.connectionId).then( - breakLease - ); + this.modifyAckDeadline_(message.ackId, delay, message.connectionId) + .then(breakLease); return; } @@ -410,7 +405,8 @@ export class Subscriber extends EventEmitter { */ openConnection_() { // TODO: fixup this cast - const pool = (this.connectionPool = new ConnectionPool(this as {} as Subscription)); + const pool = + (this.connectionPool = new ConnectionPool(this as {} as Subscription)); this.isOpen = true; pool.on('error', err => { this.emit('error', err); @@ -445,25 +441,24 @@ export class Subscriber extends EventEmitter { }); } /*! - * Sets a timeout to flush any acks/nacks that have been made since the pool has - * closed. + * Sets a timeout to flush any acks/nacks that have been made since the pool + * has closed. * * @private */ setFlushTimeout_() { if (!this.flushTimeoutHandle_) { const timeout = delay(this.batching.maxMilliseconds); - const promise = timeout - .then(this.flushQueues_.bind(this)) - .catch(util.noop); + const promise = + timeout.then(this.flushQueues_.bind(this)).catch(util.noop); promise.clear = timeout.clear.bind(timeout); this.flushTimeoutHandle_ = promise; } return this.flushTimeoutHandle_; } /*! - * Sets a timeout to modify the ack deadlines for any unacked/unnacked messages, - * renewing their lease. + * Sets a timeout to modify the ack deadlines for any unacked/unnacked + * messages, renewing their lease. * * @private */ @@ -473,10 +468,8 @@ export class Subscriber extends EventEmitter { } const latency = this.latency_.percentile(99); const timeout = Math.random() * this.ackDeadline * 0.9 - latency; - this.leaseTimeoutHandle_ = setTimeout( - this.renewLeases_.bind(this), - timeout - ); + this.leaseTimeoutHandle_ = + setTimeout(this.renewLeases_.bind(this), timeout); } /** * Writes to specified duplex stream. This is useful for capturing write