Skip to content

Commit

Permalink
refactor(ts): added ts style fix for src/subscriber.ts (#359)
Browse files Browse the repository at this point in the history
  • Loading branch information
vijay-qlogic authored and JustinBeckwith committed Nov 22, 2018
1 parent dafd67c commit 4a614a4
Showing 1 changed file with 45 additions and 52 deletions.
97 changes: 45 additions & 52 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ import * as os from 'os';

import {ConnectionPool} from './connection-pool';
import {Histogram} from './histogram';
import { Subscription } from '.';
import {Subscription} from '.';

/**
* @type {number} - The maximum number of ackIds to be sent in acknowledge/modifyAckDeadline
* requests. There is an API limit of 524288 bytes (512KiB) per acknowledge/modifyAckDeadline
* request. ackIds have a maximum size of 164 bytes, so 524288/164 ~= 3197. Accounting for some
* overhead, a maximum of 3000 ackIds per request should be safe.
* requests. There is an API limit of 524288 bytes (512KiB) per
* acknowledge/modifyAckDeadline request. ackIds have a maximum size of 164
* bytes, so 524288/164 ~= 3197. Accounting for some overhead, a maximum of 3000
* ackIds per request should be safe.
* @private
*/
const MAX_ACK_IDS_PER_REQUEST = 3000;
Expand Down Expand Up @@ -75,26 +76,25 @@ export class Subscriber extends EventEmitter {
bytes: 0,
};
this.flowControl = Object.assign(
{
maxBytes: os.freemem() * 0.2,
maxMessages: 100,
},
options.flowControl
);
{
maxBytes: os.freemem() * 0.2,
maxMessages: 100,
},
options.flowControl);
this.batching = Object.assign(
{
maxMilliseconds: 100,
},
options.batching
);
{
maxMilliseconds: 100,
},
options.batching);
this.flushTimeoutHandle_ = null;
this.leaseTimeoutHandle_ = null;
this.userClosed_ = false;
this.isOpen = false;
this.messageListeners = 0;
// As of right now we do not write any acks/modacks to the pull streams.
// But with allowing users to opt out of using streaming pulls altogether on
// the horizon, we may need to support this feature again in the near future.
// the horizon, we may need to support this feature again in the near
// future.
this.writeToStreams_ = false;
this.listenForEvents_();
}
Expand Down Expand Up @@ -126,7 +126,7 @@ export class Subscriber extends EventEmitter {
* @param {string} [connId] Connection ID to send request on.
* @return {Promise}
*/
acknowledge_(ackIds: string|string[], connId?: string): Promise<any> {
acknowledge_(ackIds: string|string[], connId?: string) {
ackIds = arrify(ackIds);
const promises = chunk(ackIds, MAX_ACK_IDS_PER_REQUEST).map(ackIdChunk => {
if (this.writeToStreams_ && this.isConnected_()) {
Expand All @@ -146,8 +146,8 @@ export class Subscriber extends EventEmitter {
});
}
/*!
* Breaks the lease on a message. Essentially this means we no longer treat the
* message as being un-acked and count it towards the flow control limits.
* Breaks the lease on a message. Essentially this means we no longer treat
* the message as being un-acked and count it towards the flow control limits.
*
* If the pool was previously paused and we freed up space, we'll continue to
* recieve messages.
Expand Down Expand Up @@ -230,7 +230,7 @@ export class Subscriber extends EventEmitter {
*
* @private
*/
flushQueues_(): Promise<any> {
flushQueues_(): Promise<void|void[]> {
if (this.flushTimeoutHandle_) {
this.flushTimeoutHandle_.clear();
this.flushTimeoutHandle_ = null;
Expand All @@ -242,14 +242,12 @@ export class Subscriber extends EventEmitter {
return Promise.resolve();
}

const requests: Promise<void>[] = [];
const requests: Array<Promise<void>> = [];

if (acks.length) {
requests.push(
this.acknowledge_(acks).then(() => {
this.inventory_.ack = [];
})
);
requests.push(this.acknowledge_(acks).then(() => {
this.inventory_.ack = [];
}));
}

if (nacks.length) {
Expand All @@ -262,8 +260,9 @@ export class Subscriber extends EventEmitter {
return table;
}, {});

const modAckRequests = Object.keys(modAcks).map(deadline =>
this.modifyAckDeadline_(modAcks[deadline], Number(deadline)));
const modAckRequests = Object.keys(modAcks).map(
deadline =>
this.modifyAckDeadline_(modAcks[deadline], Number(deadline)));

requests.push.apply(requests, modAckRequests);

Expand Down Expand Up @@ -294,9 +293,8 @@ export class Subscriber extends EventEmitter {
*/
hasMaxMessages_() {
return (
this.inventory_.lease.length >= this.flowControl.maxMessages ||
this.inventory_.bytes >= this.flowControl.maxBytes
);
this.inventory_.lease.length >= this.flowControl.maxMessages ||
this.inventory_.bytes >= this.flowControl.maxBytes);
}
/*!
* Leases a message. This will add the message to our inventory list and then
Expand All @@ -309,10 +307,7 @@ export class Subscriber extends EventEmitter {
*/
leaseMessage_(message) {
this.modifyAckDeadline_(
message.ackId,
this.ackDeadline / 1000,
message.connectionId
);
message.ackId, this.ackDeadline / 1000, message.connectionId);
this.inventory_.lease.push(message.ackId);
this.inventory_.bytes += message.length;
this.setLeaseTimeout_();
Expand Down Expand Up @@ -357,13 +352,14 @@ export class Subscriber extends EventEmitter {
* @param {string=} connId Connection ID to send request on.
* @return {Promise}
*/
modifyAckDeadline_(ackIds: string|string[], deadline: number, connId?: string) {
modifyAckDeadline_(
ackIds: string|string[], deadline: number, connId?: string) {
ackIds = arrify(ackIds);
const promises = chunk(ackIds, MAX_ACK_IDS_PER_REQUEST).map(ackIdChunk => {
if (this.writeToStreams_ && this.isConnected_()) {
return this.writeTo_(connId, {
modifyDeadlineAckIds: ackIdChunk,
modifyDeadlineSeconds: Array(ackIdChunk.length).fill(deadline),
modifyDeadlineSeconds: new Array(ackIdChunk.length).fill(deadline),
});
}
return promisify(this.request).call(this, {
Expand Down Expand Up @@ -394,9 +390,8 @@ export class Subscriber extends EventEmitter {
const breakLease = this.breakLease_.bind(this, message);

if (this.isConnected_()) {
this.modifyAckDeadline_(message.ackId, delay, message.connectionId).then(
breakLease
);
this.modifyAckDeadline_(message.ackId, delay, message.connectionId)
.then(breakLease);
return;
}

Expand All @@ -410,7 +405,8 @@ export class Subscriber extends EventEmitter {
*/
openConnection_() {
// TODO: fixup this cast
const pool = (this.connectionPool = new ConnectionPool(this as {} as Subscription));
const pool =
(this.connectionPool = new ConnectionPool(this as {} as Subscription));
this.isOpen = true;
pool.on('error', err => {
this.emit('error', err);
Expand Down Expand Up @@ -445,25 +441,24 @@ export class Subscriber extends EventEmitter {
});
}
/*!
* Sets a timeout to flush any acks/nacks that have been made since the pool has
* closed.
* Sets a timeout to flush any acks/nacks that have been made since the pool
* has closed.
*
* @private
*/
setFlushTimeout_() {
if (!this.flushTimeoutHandle_) {
const timeout = delay(this.batching.maxMilliseconds);
const promise = timeout
.then(this.flushQueues_.bind(this))
.catch(util.noop);
const promise =
timeout.then(this.flushQueues_.bind(this)).catch(util.noop);
promise.clear = timeout.clear.bind(timeout);
this.flushTimeoutHandle_ = promise;
}
return this.flushTimeoutHandle_;
}
/*!
* Sets a timeout to modify the ack deadlines for any unacked/unnacked messages,
* renewing their lease.
* Sets a timeout to modify the ack deadlines for any unacked/unnacked
* messages, renewing their lease.
*
* @private
*/
Expand All @@ -473,10 +468,8 @@ export class Subscriber extends EventEmitter {
}
const latency = this.latency_.percentile(99);
const timeout = Math.random() * this.ackDeadline * 0.9 - latency;
this.leaseTimeoutHandle_ = setTimeout(
this.renewLeases_.bind(this),
timeout
);
this.leaseTimeoutHandle_ =
setTimeout(this.renewLeases_.bind(this), timeout);
}
/**
* Writes to specified duplex stream. This is useful for capturing write
Expand Down

0 comments on commit 4a614a4

Please sign in to comment.