Skip to content

Commit

Permalink
fix: ignore messages that come in after close (#485)
Browse files Browse the repository at this point in the history
  • Loading branch information
callmehiphop authored and JustinBeckwith committed Feb 18, 2019
1 parent 58ea4b4 commit e59f8ec
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 29 deletions.
4 changes: 3 additions & 1 deletion src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ export class LeaseManager extends EventEmitter {
* @param {Message} message The message to emit.
*/
private _dispense(message: Message): void {
process.nextTick(() => this._subscriber.emit('message', message));
if (this._subscriber.isOpen) {
process.nextTick(() => this._subscriber.emit('message', message));
}
}
/**
* Loops through inventory and extends the deadlines for any messages that
Expand Down
14 changes: 9 additions & 5 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -365,13 +365,17 @@ export class Subscriber extends EventEmitter {
*
* @private
*/
private _onData(response: PullResponse): void {
response.receivedMessages.forEach((data: ReceivedMessage) => {
private _onData({receivedMessages}: PullResponse): void {
for (const data of receivedMessages) {
const message = new Message(this, data);

message.modAck(this.ackDeadline);
this._inventory.add(message);
});
if (this.isOpen) {
message.modAck(this.ackDeadline);
this._inventory.add(message);
} else {
message.nack();
}
}
}

/**
Expand Down
34 changes: 12 additions & 22 deletions system-test/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,11 +415,8 @@ describe('pubsub', () => {
subscription.on('message', ack);

function ack(message) {
// remove listener to we only ack first message
subscription.removeListener('message', ack);

message.ack();
setTimeout(() => subscription.close(done), 2500);
subscription.close(done);
}
});

Expand All @@ -430,11 +427,8 @@ describe('pubsub', () => {
subscription.on('message', nack);

function nack(message) {
// remove listener to we only ack first message
subscription.removeListener('message', nack);

message.nack();
setTimeout(() => subscription.close(done), 2500);
subscription.close(done);
}
});

Expand Down Expand Up @@ -588,36 +582,32 @@ describe('pubsub', () => {
let subscription;
let snapshot;

function deleteAllSnapshots() {
// tslint:disable-next-line no-any
return (pubsub.getSnapshots() as any).then(data => {
return Promise.all(data[0].map(snapshot => {
return snapshot.delete();
}));
});
function getSnapshotName({name}) {
return name.split('/').pop();
}

before(async () => {
topic = pubsub.topic(generateTopicName());
subscription = topic.subscription(generateSubName());
snapshot = subscription.snapshot(SNAPSHOT_NAME);

await deleteAllSnapshots();
await topic.create();
await subscription.create();
await snapshot.create();
});

after(async () => {
await deleteAllSnapshots();
await snapshot.delete();
await subscription.delete();
await topic.delete();
});

it('should get a list of snapshots', done => {
pubsub.getSnapshots((err, snapshots) => {
assert.ifError(err);
assert.strictEqual(snapshots.length, 1);
assert.strictEqual(snapshots[0].name.split('/').pop(), SNAPSHOT_NAME);
assert(snapshots.length > 0);
const names = snapshots.map(getSnapshotName);
assert(names.includes(SNAPSHOT_NAME));
done();
});
});
Expand All @@ -633,9 +623,9 @@ describe('pubsub', () => {
snapshots.push(snapshot);
})
.on('end', () => {
assert.strictEqual(snapshots.length, 1);
assert.strictEqual(
snapshots[0].name.split('/').pop(), SNAPSHOT_NAME);
assert(snapshots.length > 0);
const names = snapshots.map(getSnapshotName);
assert(names.includes(SNAPSHOT_NAME));
done();
});
});
Expand Down
18 changes: 17 additions & 1 deletion test/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const fakeos = {

class FakeSubscriber extends EventEmitter {
ackDeadline = 10;
isOpen = true;
modAckLatency = 2000;
async modAck(message: FakeMessage, deadline: number): Promise<void> {}
}
Expand Down Expand Up @@ -140,7 +141,7 @@ describe('LeaseManager', () => {
});

it('should not dispatch the message if the inventory is full', done => {
const message = new FakeMessage();
const fakeMessage = new FakeMessage();

leaseManager.isFull = () => true;
leaseManager.setOptions({allowExcessMessages: false});
Expand All @@ -149,6 +150,21 @@ describe('LeaseManager', () => {
done(new Error('Test should not have dispatched message.'));
});

leaseManager.add(fakeMessage);
setImmediate(done);
});

it('should not dispatch the message if the sub closes', done => {
const fakeMessage = new FakeMessage();

leaseManager.isFull = () => false;

subscriber.isOpen = false;
subscriber.on('message', () => {
done(new Error('Test should not have dispatched message.'));
});

leaseManager.add(fakeMessage);
setImmediate(done);
});

Expand Down
12 changes: 12 additions & 0 deletions test/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,18 @@ describe('Subscriber', () => {
subscriber.close();
});

it('should nack any messages that come in after', () => {
const stream: FakeMessageStream = stubs.get('messageStream');
const stub = sandbox.stub(subscriber, 'nack');
const pullResponse = {receivedMessages: [RECEIVED_MESSAGE]};

subscriber.close();
stream.emit('data', pullResponse);

const [{ackId}] = stub.lastCall.args;
assert.strictEqual(ackId, RECEIVED_MESSAGE.ackId);
});

describe('flushing the queues', () => {
it('should wait for any pending acks', async () => {
const ackQueue: FakeAckQueue = stubs.get('ackQueue');
Expand Down

0 comments on commit e59f8ec

Please sign in to comment.