Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ts): added ts style fix for src/subscriber.ts #359

Merged
merged 2 commits into from
Nov 22, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 45 additions & 52 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ import * as os from 'os';

import {ConnectionPool} from './connection-pool';
import {Histogram} from './histogram';
import { Subscription } from '.';
import {Subscription} from '.';

/**
* @type {number} - The maximum number of ackIds to be sent in acknowledge/modifyAckDeadline
* requests. There is an API limit of 524288 bytes (512KiB) per acknowledge/modifyAckDeadline
* request. ackIds have a maximum size of 164 bytes, so 524288/164 ~= 3197. Accounting for some
* overhead, a maximum of 3000 ackIds per request should be safe.
* requests. There is an API limit of 524288 bytes (512KiB) per
* acknowledge/modifyAckDeadline request. ackIds have a maximum size of 164
* bytes, so 524288/164 ~= 3197. Accounting for some overhead, a maximum of 3000
* ackIds per request should be safe.
* @private
*/
const MAX_ACK_IDS_PER_REQUEST = 3000;
Expand Down Expand Up @@ -75,26 +76,25 @@ export class Subscriber extends EventEmitter {
bytes: 0,
};
this.flowControl = Object.assign(
{
maxBytes: os.freemem() * 0.2,
maxMessages: 100,
},
options.flowControl
);
{
maxBytes: os.freemem() * 0.2,
maxMessages: 100,
},
options.flowControl);
this.batching = Object.assign(
{
maxMilliseconds: 100,
},
options.batching
);
{
maxMilliseconds: 100,
},
options.batching);
this.flushTimeoutHandle_ = null;
this.leaseTimeoutHandle_ = null;
this.userClosed_ = false;
this.isOpen = false;
this.messageListeners = 0;
// As of right now we do not write any acks/modacks to the pull streams.
// But with allowing users to opt out of using streaming pulls altogether on
// the horizon, we may need to support this feature again in the near future.
// the horizon, we may need to support this feature again in the near
// future.
this.writeToStreams_ = false;
this.listenForEvents_();
}
Expand Down Expand Up @@ -126,7 +126,7 @@ export class Subscriber extends EventEmitter {
* @param {string} [connId] Connection ID to send request on.
* @return {Promise}
*/
acknowledge_(ackIds: string|string[], connId?: string): Promise<any> {
acknowledge_(ackIds: string|string[], connId?: string) {
ackIds = arrify(ackIds);
const promises = chunk(ackIds, MAX_ACK_IDS_PER_REQUEST).map(ackIdChunk => {
if (this.writeToStreams_ && this.isConnected_()) {
Expand All @@ -146,8 +146,8 @@ export class Subscriber extends EventEmitter {
});
}
/*!
* Breaks the lease on a message. Essentially this means we no longer treat the
* message as being un-acked and count it towards the flow control limits.
* Breaks the lease on a message. Essentially this means we no longer treat
* the message as being un-acked and count it towards the flow control limits.
*
* If the pool was previously paused and we freed up space, we'll continue to
* recieve messages.
Expand Down Expand Up @@ -230,7 +230,7 @@ export class Subscriber extends EventEmitter {
*
* @private
*/
flushQueues_(): Promise<any> {
flushQueues_(): Promise<void|void[]> {
if (this.flushTimeoutHandle_) {
this.flushTimeoutHandle_.clear();
this.flushTimeoutHandle_ = null;
Expand All @@ -242,14 +242,12 @@ export class Subscriber extends EventEmitter {
return Promise.resolve();
}

const requests: Promise<void>[] = [];
const requests: Array<Promise<void>> = [];

if (acks.length) {
requests.push(
this.acknowledge_(acks).then(() => {
this.inventory_.ack = [];
})
);
requests.push(this.acknowledge_(acks).then(() => {
this.inventory_.ack = [];
}));
}

if (nacks.length) {
Expand All @@ -262,8 +260,9 @@ export class Subscriber extends EventEmitter {
return table;
}, {});

const modAckRequests = Object.keys(modAcks).map(deadline =>
this.modifyAckDeadline_(modAcks[deadline], Number(deadline)));
const modAckRequests = Object.keys(modAcks).map(
deadline =>
this.modifyAckDeadline_(modAcks[deadline], Number(deadline)));

requests.push.apply(requests, modAckRequests);

Expand Down Expand Up @@ -294,9 +293,8 @@ export class Subscriber extends EventEmitter {
*/
hasMaxMessages_() {
return (
this.inventory_.lease.length >= this.flowControl.maxMessages ||
this.inventory_.bytes >= this.flowControl.maxBytes
);
this.inventory_.lease.length >= this.flowControl.maxMessages ||
this.inventory_.bytes >= this.flowControl.maxBytes);
}
/*!
* Leases a message. This will add the message to our inventory list and then
Expand All @@ -309,10 +307,7 @@ export class Subscriber extends EventEmitter {
*/
leaseMessage_(message) {
this.modifyAckDeadline_(
message.ackId,
this.ackDeadline / 1000,
message.connectionId
);
message.ackId, this.ackDeadline / 1000, message.connectionId);
this.inventory_.lease.push(message.ackId);
this.inventory_.bytes += message.length;
this.setLeaseTimeout_();
Expand Down Expand Up @@ -357,13 +352,14 @@ export class Subscriber extends EventEmitter {
* @param {string=} connId Connection ID to send request on.
* @return {Promise}
*/
modifyAckDeadline_(ackIds: string|string[], deadline: number, connId?: string) {
modifyAckDeadline_(
ackIds: string|string[], deadline: number, connId?: string) {
ackIds = arrify(ackIds);
const promises = chunk(ackIds, MAX_ACK_IDS_PER_REQUEST).map(ackIdChunk => {
if (this.writeToStreams_ && this.isConnected_()) {
return this.writeTo_(connId, {
modifyDeadlineAckIds: ackIdChunk,
modifyDeadlineSeconds: Array(ackIdChunk.length).fill(deadline),
modifyDeadlineSeconds: new Array(ackIdChunk.length).fill(deadline),
});
}
return promisify(this.request).call(this, {
Expand Down Expand Up @@ -394,9 +390,8 @@ export class Subscriber extends EventEmitter {
const breakLease = this.breakLease_.bind(this, message);

if (this.isConnected_()) {
this.modifyAckDeadline_(message.ackId, delay, message.connectionId).then(
breakLease
);
this.modifyAckDeadline_(message.ackId, delay, message.connectionId)
.then(breakLease);
return;
}

Expand All @@ -410,7 +405,8 @@ export class Subscriber extends EventEmitter {
*/
openConnection_() {
// TODO: fixup this cast
const pool = (this.connectionPool = new ConnectionPool(this as {} as Subscription));
const pool =
(this.connectionPool = new ConnectionPool(this as {} as Subscription));
this.isOpen = true;
pool.on('error', err => {
this.emit('error', err);
Expand Down Expand Up @@ -445,25 +441,24 @@ export class Subscriber extends EventEmitter {
});
}
/*!
* Sets a timeout to flush any acks/nacks that have been made since the pool has
* closed.
* Sets a timeout to flush any acks/nacks that have been made since the pool
* has closed.
*
* @private
*/
setFlushTimeout_() {
if (!this.flushTimeoutHandle_) {
const timeout = delay(this.batching.maxMilliseconds);
const promise = timeout
.then(this.flushQueues_.bind(this))
.catch(util.noop);
const promise =
timeout.then(this.flushQueues_.bind(this)).catch(util.noop);
promise.clear = timeout.clear.bind(timeout);
this.flushTimeoutHandle_ = promise;
}
return this.flushTimeoutHandle_;
}
/*!
* Sets a timeout to modify the ack deadlines for any unacked/unnacked messages,
* renewing their lease.
* Sets a timeout to modify the ack deadlines for any unacked/unnacked
* messages, renewing their lease.
*
* @private
*/
Expand All @@ -473,10 +468,8 @@ export class Subscriber extends EventEmitter {
}
const latency = this.latency_.percentile(99);
const timeout = Math.random() * this.ackDeadline * 0.9 - latency;
this.leaseTimeoutHandle_ = setTimeout(
this.renewLeases_.bind(this),
timeout
);
this.leaseTimeoutHandle_ =
setTimeout(this.renewLeases_.bind(this), timeout);
}
/**
* Writes to specified duplex stream. This is useful for capturing write
Expand Down