diff --git a/package.json b/package.json index d78b01a6..035bd35a 100644 --- a/package.json +++ b/package.json @@ -51,6 +51,7 @@ }, "dependencies": { "@google-cloud/paginator": "^0.1.0", + "@google-cloud/precise-date": "^0.1.0", "@google-cloud/projectify": "^0.3.0", "@google-cloud/promisify": "^0.4.0", "arrify": "^1.0.0", diff --git a/src/subscriber.ts b/src/subscriber.ts index d32191ea..2cf542af 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -14,6 +14,7 @@ * limitations under the License. */ +import {DateStruct, PreciseDate} from '@google-cloud/precise-date'; import {replaceProjectIdToken} from '@google-cloud/projectify'; import {promisify} from '@google-cloud/promisify'; import {EventEmitter} from 'events'; @@ -46,33 +47,91 @@ export interface PullResponse { receivedMessages: ReceivedMessage[]; } +/** + * Date object with nanosecond precision. Supports all standard Date arguments + * in addition to several custom types. + * + * @external PreciseDate + * @see {@link https://github.com/googleapis/nodejs-precise-date|PreciseDate} + */ /** * Message objects provide a simple interface for users to get message data and * acknowledge the message. * - * @private - * @class - * - * @param {Subscriber} sub The parent subscriber. - * @param {object} message The raw message response. + * @example + * subscription.on('message', message => { + * // { + * // ackId: 'RUFeQBJMJAxESVMrQwsqWBFOBCEhPjA', + * // attributes: {key: 'value'}, + * // data: Buffer.from('Hello, world!), + * // id: '1551297743043', + * // publishTime: new PreciseDate('2019-02-27T20:02:19.029534186Z'), + * // received: 1551297743043, + * // length: 13 + * // } + * }); */ export class Message { ackId: string; attributes: {}; data: Buffer; id: string; - publishTime: Date; + publishTime: PreciseDate; received: number; private _handled: boolean; private _length: number; private _subscriber: Subscriber; + /** + * @hideconstructor + * + * @param {Subscriber} sub The parent subscriber. + * @param {object} message The raw message response. + */ constructor(sub: Subscriber, {ackId, message}: ReceivedMessage) { + /** + * This ID is used to acknowledge the message. + * + * @name Message#ackId + * @type {string} + */ this.ackId = ackId; + /** + * Optional attributes for this message. + * + * @name Message#attributes + * @type {object} + */ this.attributes = message.attributes || {}; + /** + * The message data as a Buffer. + * + * @name Message#data + * @type {Buffer} + */ this.data = message.data; + /** + * ID of the message, assigned by the server when the message is published. + * Guaranteed to be unique within the topic. + * + * @name Message#id + * @type {string} + */ this.id = message.messageId; - this.publishTime = Message.formatTimestamp(message.publishTime); + /** + * The time at which the message was published. + * + * @name Message#publishTime + * @type {external:PreciseDate} + */ + this.publishTime = new PreciseDate(message.publishTime as DateStruct); + /** + * The time at which the message was recieved by the subscription. + * + * @name Message#recieved + * @type {number} + */ this.received = Date.now(); + this._handled = false; this._length = this.data.length; this._subscriber = sub; @@ -81,14 +140,17 @@ export class Message { * The length of the message data. * * @type {number} - * @private */ get length() { return this._length; } /** * Acknowledges the message. - * @private + * + * @example + * subscription.on('message', message => { + * message.ack(); + * }); */ ack(): void { if (!this._handled) { @@ -113,7 +175,16 @@ export class Message { * * @param {number} [delay=0] The desired time to wait before the * redelivery occurs. - * @private + * + * @example + * subscription.on('message', message => { + * message.nack(); + * }); + * + * @example Specify a delay to redeliver the message + * subscription.on('message', message => { + * message.nack(60); // redeliver in 1 minute + * }); */ nack(delay?: number): void { if (!this._handled) { @@ -121,19 +192,6 @@ export class Message { this._subscriber.nack(this, delay); } } - /** - * Formats the protobuf timestamp into a JavaScript date. - * - * @private - * - * @param {object} timestamp The protobuf timestamp. - * @return {date} - */ - static formatTimestamp({nanos = 0, seconds = 0}: protobuf.ITimestamp): Date { - const ms: number = Number(nanos) / 1e6; - const s: number = Number(seconds) * 1000; - return new Date(ms + s); - } } /** diff --git a/test/subscriber.ts b/test/subscriber.ts index 78fd8e5b..4204d127 100644 --- a/test/subscriber.ts +++ b/test/subscriber.ts @@ -16,6 +16,7 @@ import * as assert from 'assert'; import {EventEmitter} from 'events'; +import {common as protobuf} from 'protobufjs'; import * as proxyquire from 'proxyquire'; import * as sinon from 'sinon'; import {PassThrough} from 'stream'; @@ -115,6 +116,13 @@ class FakeMessageStream extends PassThrough { destroy(error?: Error): void {} } +class FakePreciseDate { + value: protobuf.ITimestamp; + constructor(date: protobuf.ITimestamp) { + this.value = date; + } +} + const RECEIVED_MESSAGE = { ackId: uuid.v4(), message: { @@ -142,6 +150,7 @@ describe('Subscriber', () => { before(() => { const s = proxyquire('../src/subscriber.js', { + '@google-cloud/precise-date': {PreciseDate: FakePreciseDate}, '@google-cloud/projectify': fakeProjectify, './histogram': {Histogram: FakeHistogram}, './lease-manager': {LeaseManager: FakeLeaseManager}, @@ -599,15 +608,12 @@ describe('Subscriber', () => { }); it('should localize publishTime', () => { - const fakeDate = new Date(); - - sandbox.stub(Message, 'formatTimestamp') - .withArgs(RECEIVED_MESSAGE.message.publishTime) - .returns(fakeDate); - const m = new Message(subscriber, RECEIVED_MESSAGE); + const timestamp = m.publishTime as unknown as FakePreciseDate; - assert.strictEqual(m.publishTime, fakeDate); + assert(timestamp instanceof FakePreciseDate); + assert.strictEqual( + timestamp.value, RECEIVED_MESSAGE.message.publishTime); }); it('should localize recieved time', () => { @@ -700,18 +706,5 @@ describe('Subscriber', () => { assert.strictEqual(stub.callCount, 0); }); }); - - describe('formatTimestamp', () => { - it('should format the timestamp object', () => { - const publishTime = RECEIVED_MESSAGE.message.publishTime; - const actual = Message.formatTimestamp(publishTime); - - const ms = publishTime.nanos / 1e6; - const s = publishTime.seconds * 1000; - const expectedDate = new Date(ms + s); - - assert.deepStrictEqual(actual, expectedDate); - }); - }); }); });