Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(message): use precise-date for message publish time #503

Merged
merged 6 commits into from
Feb 28, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
},
"dependencies": {
"@google-cloud/paginator": "^0.1.0",
"@google-cloud/precise-date": "^0.1.0",
"@google-cloud/projectify": "^0.3.0",
"@google-cloud/promisify": "^0.4.0",
"arrify": "^1.0.0",
Expand Down
104 changes: 81 additions & 23 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

import {DateStruct, PreciseDate} from '@google-cloud/precise-date';
import {replaceProjectIdToken} from '@google-cloud/projectify';
import {promisify} from '@google-cloud/promisify';
import {EventEmitter} from 'events';
Expand Down Expand Up @@ -46,33 +47,91 @@ export interface PullResponse {
receivedMessages: ReceivedMessage[];
}

/**
* Date object with nanosecond precision. Supports all standard Date arguments
* in addition to several custom types.
*
* @external PreciseDate
* @see {@link https://github.com/googleapis/nodejs-precise-date|PreciseDate}
*/
/**
* Message objects provide a simple interface for users to get message data and
* acknowledge the message.
*
* @private
* @class
*
* @param {Subscriber} sub The parent subscriber.
* @param {object} message The raw message response.
* @example
* subscription.on('message', message => {
* // {
* // ackId: 'RUFeQBJMJAxESVMrQwsqWBFOBCEhPjA',
* // attributes: {key: 'value'},
* // data: Buffer.from('Hello, world!),
* // id: '1551297743043',
* // publishTime: new PreciseDate('2019-02-27T20:02:19.029534186Z'),
* // received: 1551297743043
callmehiphop marked this conversation as resolved.
Show resolved Hide resolved
* // length: 13
* // }
* });
*/
export class Message {
ackId: string;
attributes: {};
data: Buffer;
id: string;
publishTime: Date;
publishTime: PreciseDate;
received: number;
private _handled: boolean;
private _length: number;
private _subscriber: Subscriber;
/**
* @hideconstructor
*
* @param {Subscriber} sub The parent subscriber.
* @param {object} message The raw message response.
*/
constructor(sub: Subscriber, {ackId, message}: ReceivedMessage) {
/**
* This ID is used to acknowledge the message.
*
* @name Message#ackId
* @type {string}
*/
this.ackId = ackId;
/**
* Optional attributes for this message.
*
* @name Message#attributes
* @type {object}
*/
this.attributes = message.attributes || {};
/**
* The message data as a Buffer.
*
* @name Message#data
* @type {Buffer}
*/
this.data = message.data;
/**
* ID of the message, assigned by the server when the message is published.
* Guaranteed to be unique within the topic.
*
* @name Message#id
* @type {string}
*/
this.id = message.messageId;
this.publishTime = Message.formatTimestamp(message.publishTime);
/**
* The time at which the message was published.
*
* @name Message#publishTime
* @type {external:PreciseDate}
*/
this.publishTime = new PreciseDate(message.publishTime as DateStruct);
/**
* The time at which the message was recieved by the subscription.
*
* @name Message#recieved
* @type {number}
*/
this.received = Date.now();

this._handled = false;
this._length = this.data.length;
this._subscriber = sub;
Expand All @@ -81,14 +140,17 @@ export class Message {
* The length of the message data.
*
* @type {number}
* @private
*/
get length() {
return this._length;
}
/**
* Acknowledges the message.
* @private
*
* @example
* subscription.on('message', message => {
* message.ack();
* });
*/
ack(): void {
if (!this._handled) {
Expand All @@ -113,27 +175,23 @@ export class Message {
*
* @param {number} [delay=0] The desired time to wait before the
* redelivery occurs.
* @private
*
* @example
* subscription.on('message', message => {
* message.nack();
* });
*
* @example <caption>Specify a delay to redeliver the message</caption>
* subscription.on('message', message => {
* message.nack(60); // redeliver in 1 minute
* });
*/
nack(delay?: number): void {
if (!this._handled) {
this._handled = true;
this._subscriber.nack(this, delay);
}
}
/**
* Formats the protobuf timestamp into a JavaScript date.
*
* @private
*
* @param {object} timestamp The protobuf timestamp.
* @return {date}
*/
static formatTimestamp({nanos = 0, seconds = 0}: protobuf.ITimestamp): Date {
const ms: number = Number(nanos) / 1e6;
const s: number = Number(seconds) * 1000;
return new Date(ms + s);
}
}

/**
Expand Down
33 changes: 13 additions & 20 deletions test/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import * as assert from 'assert';
import {EventEmitter} from 'events';
import {common as protobuf} from 'protobufjs';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';
import {PassThrough} from 'stream';
Expand Down Expand Up @@ -115,6 +116,13 @@ class FakeMessageStream extends PassThrough {
destroy(error?: Error): void {}
}

class FakePreciseDate {
value: protobuf.ITimestamp;
constructor(date: protobuf.ITimestamp) {
this.value = date;
}
}

const RECEIVED_MESSAGE = {
ackId: uuid.v4(),
message: {
Expand Down Expand Up @@ -142,6 +150,7 @@ describe('Subscriber', () => {

before(() => {
const s = proxyquire('../src/subscriber.js', {
'@google-cloud/precise-date': {PreciseDate: FakePreciseDate},
'@google-cloud/projectify': fakeProjectify,
'./histogram': {Histogram: FakeHistogram},
'./lease-manager': {LeaseManager: FakeLeaseManager},
Expand Down Expand Up @@ -599,15 +608,12 @@ describe('Subscriber', () => {
});

it('should localize publishTime', () => {
const fakeDate = new Date();

sandbox.stub(Message, 'formatTimestamp')
.withArgs(RECEIVED_MESSAGE.message.publishTime)
.returns(fakeDate);

const m = new Message(subscriber, RECEIVED_MESSAGE);
const timestamp = m.publishTime as unknown as FakePreciseDate;

assert.strictEqual(m.publishTime, fakeDate);
assert(timestamp instanceof FakePreciseDate);
assert.strictEqual(
timestamp.value, RECEIVED_MESSAGE.message.publishTime);
});

it('should localize recieved time', () => {
Expand Down Expand Up @@ -700,18 +706,5 @@ describe('Subscriber', () => {
assert.strictEqual(stub.callCount, 0);
});
});

describe('formatTimestamp', () => {
it('should format the timestamp object', () => {
const publishTime = RECEIVED_MESSAGE.message.publishTime;
const actual = Message.formatTimestamp(publishTime);

const ms = publishTime.nanos / 1e6;
const s = publishTime.seconds * 1000;
const expectedDate = new Date(ms + s);

assert.deepStrictEqual(actual, expectedDate);
});
});
});
});