Skip to content

Commit

Permalink
refactor: improve subscriber error handling (#440)
Browse files Browse the repository at this point in the history
  • Loading branch information
callmehiphop authored and JustinBeckwith committed Jan 24, 2019
1 parent 9371425 commit 1885d61
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 17 deletions.
37 changes: 34 additions & 3 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import {CallOptions} from 'google-gax';
import {Metadata, ServiceError, status} from 'grpc';
import * as defer from 'p-defer';

import {Message, Subscriber} from './subscriber';
Expand All @@ -37,6 +38,28 @@ export interface BatchOptions {
maxMilliseconds?: number;
}

/**
* Error class used to signal a batch failure.
*
* @class
*
* @param {string} message The error message.
* @param {ServiceError} err The grpc service error.
*/
export class BatchError extends Error implements ServiceError {
ackIds: string[];
code?: status;
metadata?: Metadata;
constructor(err: ServiceError, ackIds: string[], rpc: string) {
super(`Failed to "${rpc}" for ${ackIds.length} message(s). Reason: ${
err.message}`);

this.ackIds = ackIds;
this.code = err.code;
this.metadata = err.metadata;
}
}

/**
* Class for buffering ack/modAck requests.
*
Expand Down Expand Up @@ -162,7 +185,11 @@ export class AckQueue extends MessageQueue {
const ackIds = batch.map(([ackId]) => ackId);
const reqOpts = {subscription: this._subscriber.name, ackIds};

await client.acknowledge(reqOpts, this._options.callOptions!);
try {
await client.acknowledge(reqOpts, this._options.callOptions!);
} catch (e) {
throw new BatchError(e, ackIds, 'acknowledge');
}
}
}

Expand Down Expand Up @@ -194,12 +221,16 @@ export class ModAckQueue extends MessageQueue {
return table;
}, {});

const modAckRequests = Object.keys(modAckTable).map(deadline => {
const modAckRequests = Object.keys(modAckTable).map(async (deadline) => {
const ackIds = modAckTable[deadline];
const ackDeadlineSeconds = Number(deadline);
const reqOpts = {subscription, ackIds, ackDeadlineSeconds};

return client.modifyAckDeadline(reqOpts, this._options.callOptions!);
try {
await client.modifyAckDeadline(reqOpts, this._options.callOptions!);
} catch (e) {
throw new BatchError(e, ackIds, 'modifyAckDeadline');
}
});

await Promise.all(modAckRequests);
Expand Down
56 changes: 44 additions & 12 deletions src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import {promisify} from '@google-cloud/promisify';
import {ClientStub} from 'google-gax';
import {ClientDuplexStream, Metadata, StatusObject} from 'grpc';
import {ClientDuplexStream, Metadata, ServiceError, status, StatusObject} from 'grpc';
import * as isStreamEnded from 'is-stream-ended';
import {Duplex, PassThrough} from 'stream';

Expand All @@ -27,10 +27,20 @@ import {PullResponse, Subscriber} from './subscriber';
*/
const KEEP_ALIVE_INTERVAL = 30000;

/*!
* Deadline Exceeded status code
*/
const DEADLINE: status = 4;

/*!
* Unknown status code
*/
const UNKNOWN: status = 2;

/*!
* codes to retry streams
*/
const RETRY_CODES: number[] = [
const RETRY_CODES: status[] = [
0, // ok
1, // canceled
2, // unknown
Expand All @@ -45,7 +55,7 @@ const RETRY_CODES: number[] = [
/*!
* default stream options
*/
const DEFAULT_OPTIONS = {
const DEFAULT_OPTIONS: MessageStreamOptions = {
highWaterMark: 0,
maxStreams: 5,
timeout: 300000,
Expand Down Expand Up @@ -73,16 +83,31 @@ type PullStream = ClientDuplexStream<StreamingPullRequest, PullResponse>&
*
* @param {object} status The gRPC status object.
*/
export class StatusError extends Error {
code: number;
metadata: Metadata;
export class StatusError extends Error implements ServiceError {
code?: status;
metadata?: Metadata;
constructor(status: StatusObject) {
super(status.details);
this.code = status.code;
this.metadata = status.metadata;
}
}

/**
* Error thrown when we fail to open a channel for the message stream.
*
* @class
*
* @param {Error} err The original error.
*/
export class ChannelError extends Error implements ServiceError {
code: status;
constructor(err: Error) {
super(`Failed to connect to channel. Reason: ${err.message}`);
this.code = err.message.includes('deadline') ? DEADLINE : UNKNOWN;
}
}

/**
* Ponyfill for destroying streams.
*
Expand Down Expand Up @@ -272,16 +297,18 @@ export class MessageStream extends PassThrough {
}
}
/**
* Sometimes a gRPC status will be emitted as both a status event and an
* error event. In order to cut back on emitted errors, we'll ignore any
* error events that come in AFTER the status has been received.
* gRPC will usually emit a status as a ServiceError via `error` event before
* it emits the status itself. In order to cut back on emitted errors, we'll
* wait a tick on error and ignore it if the status has been received.
*
* @private
*
* @param {stream} stream The stream that errored.
* @param {Error} err The error.
*/
private _onError(stream: PullStream, err: Error): void {
private async _onError(stream: PullStream, err: Error): Promise<void> {
await promisify(setImmediate)();

const code = (err as StatusError).code;
const receivedStatus = this._streams.get(stream) !== false;

Expand Down Expand Up @@ -349,8 +376,13 @@ export class MessageStream extends PassThrough {
* @param {object} client The gRPC client to wait for.
* @returns {Promise}
*/
private _waitForClientReady(client: ClientStub): Promise<void> {
private async _waitForClientReady(client: ClientStub): Promise<void> {
const deadline = Date.now() + this._options.timeout!;
return promisify(client.waitForReady).call(client, deadline);

try {
await promisify(client.waitForReady).call(client, deadline);
} catch (e) {
throw new ChannelError(e);
}
}
}
63 changes: 63 additions & 0 deletions test/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

import * as assert from 'assert';
import {EventEmitter} from 'events';
import {Metadata, ServiceError} from 'grpc';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';
import * as uuid from 'uuid';

import {BatchError} from '../src/message-queues';

class FakeClient {
async acknowledge(reqOpts, callOptions): Promise<void> {}
async modifyAckDeadline(reqOpts, callOptions): Promise<void> {}
Expand Down Expand Up @@ -296,6 +299,36 @@ describe('MessageQueues', () => {
const [, callOptions] = stub.lastCall.args;
assert.strictEqual(callOptions, fakeCallOptions);
});

it('should throw a BatchError if unable to ack', done => {
const messages = [
new FakeMessage(),
new FakeMessage(),
new FakeMessage(),
];

const ackIds = messages.map(message => message.ackId);

const fakeError: ServiceError = new Error('Err.');
fakeError.code = 2;
fakeError.metadata = new Metadata();

const expectedMessage =
`Failed to "acknowledge" for 3 message(s). Reason: Err.`;

sandbox.stub(subscriber.client, 'acknowledge').rejects(fakeError);

subscriber.on('error', (err: BatchError) => {
assert.strictEqual(err.message, expectedMessage);
assert.deepStrictEqual(err.ackIds, ackIds);
assert.strictEqual(err.code, fakeError.code);
assert.strictEqual(err.metadata, fakeError.metadata);
done();
});

messages.forEach(message => ackQueue.add(message));
ackQueue.flush();
});
});

describe('ModAckQueue', () => {
Expand Down Expand Up @@ -376,5 +409,35 @@ describe('MessageQueues', () => {
const [, callOptions] = stub.lastCall.args;
assert.strictEqual(callOptions, fakeCallOptions);
});

it('should throw a BatchError if unable to modAck', done => {
const messages = [
new FakeMessage(),
new FakeMessage(),
new FakeMessage(),
];

const ackIds = messages.map(message => message.ackId);

const fakeError: ServiceError = new Error('Err.');
fakeError.code = 2;
fakeError.metadata = new Metadata();

const expectedMessage =
`Failed to "modifyAckDeadline" for 3 message(s). Reason: Err.`;

sandbox.stub(subscriber.client, 'modifyAckDeadline').rejects(fakeError);

subscriber.on('error', (err: BatchError) => {
assert.strictEqual(err.message, expectedMessage);
assert.deepStrictEqual(err.ackIds, ackIds);
assert.strictEqual(err.code, fakeError.code);
assert.strictEqual(err.metadata, fakeError.metadata);
done();
});

messages.forEach(message => modAckQueue.add(message));
modAckQueue.flush();
});
});
});
33 changes: 31 additions & 2 deletions test/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,15 @@ describe('MessageStream', () => {
});
});

it('should destroy the stream if unable to verify channel', done => {
it('should destroy the stream if unable to connect to channel', done => {
const stub = sandbox.stub(client, 'waitForReady');
const ms = new MessageStream(subscriber);
const fakeError = new Error('err');
const expectedMessage = `Failed to connect to channel. Reason: err`;

ms.on('error', err => {
assert.strictEqual(err, fakeError);
assert.strictEqual(err.code, 2);
assert.strictEqual(err.message, expectedMessage);
assert.strictEqual(ms.destroyed, true);
done();
});
Expand All @@ -394,6 +396,22 @@ describe('MessageStream', () => {
});
});

it('should give a deadline error if waitForReady times out', done => {
const stub = sandbox.stub(client, 'waitForReady');
const ms = new MessageStream(subscriber);
const fakeError = new Error('Failed to connect before the deadline');

ms.on('error', err => {
assert.strictEqual(err.code, 4);
done();
});

setImmediate(() => {
const [, callback] = stub.lastCall.args;
callback(fakeError);
});
});

it('should emit non-status errors', done => {
const fakeError = new Error('err');

Expand All @@ -405,6 +423,17 @@ describe('MessageStream', () => {
client.streams[0].emit('error', fakeError);
});

it('should ignore status errors', done => {
const [stream] = client.streams;
const status = {code: 0};

messageStream.on('error', done);
stream.emit('error', status);
stream.emit('status', status);

setImmediate(done);
});

it('should ignore errors that come in after the status', done => {
const [stream] = client.streams;

Expand Down

0 comments on commit 1885d61

Please sign in to comment.