Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: set MaxBytes for AckQueue #1963

Merged
merged 9 commits into from
Sep 13, 2024
19 changes: 17 additions & 2 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ export interface BatchOptions {
maxMilliseconds?: number;
}

// This is the maximum number of bytes we will send for a batch of
// ack/modack messages. The server itself has a maximum of 512KiB, so
// we just pull back a little from that in case of unknown fenceposts.
export const MAX_BATCH_BYTES = 510 * 1024 * 1024;
hongalex marked this conversation as resolved.
Show resolved Hide resolved

/**
* Error class used to signal a batch failure.
*
Expand Down Expand Up @@ -113,6 +118,7 @@ export abstract class MessageQueue {
numPendingRequests: number;
numInFlightRequests: number;
numInRetryRequests: number;
bytes: number;
protected _onFlush?: defer.DeferredPromise<void>;
protected _onDrain?: defer.DeferredPromise<void>;
protected _options!: BatchOptions;
Expand All @@ -127,6 +133,7 @@ export abstract class MessageQueue {
this.numPendingRequests = 0;
this.numInFlightRequests = 0;
this.numInRetryRequests = 0;
this.bytes = 0;
this._requests = [];
this._subscriber = sub;
this._retrier = new ExponentialRetry<QueuedMessage>(
Expand Down Expand Up @@ -208,8 +215,12 @@ export abstract class MessageQueue {
});
this.numPendingRequests++;
this.numInFlightRequests++;
this.bytes += Buffer.byteLength(message.ackId, 'utf8');

if (this._requests.length >= maxMessages!) {
if (
this._requests.length >= maxMessages! ||
this.bytes >= MAX_BATCH_BYTES
feywind marked this conversation as resolved.
Show resolved Hide resolved
) {
this.flush();
} else if (!this._timer) {
this._timer = setTimeout(() => this.flush(), maxMilliseconds!);
Expand Down Expand Up @@ -273,6 +284,7 @@ export abstract class MessageQueue {
const deferred = this._onFlush;

this._requests = [];
this.bytes = 0;
this.numPendingRequests -= batchSize;
delete this._onFlush;

Expand Down Expand Up @@ -339,7 +351,10 @@ export abstract class MessageQueue {
* @private
*/
setOptions(options: BatchOptions): void {
const defaults: BatchOptions = {maxMessages: 3000, maxMilliseconds: 100};
const defaults: BatchOptions = {
maxMessages: 3000,
maxMilliseconds: 100,
};

this._options = Object.assign(defaults, options);
}
Expand Down
60 changes: 23 additions & 37 deletions test/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import defer = require('p-defer');

import * as messageTypes from '../src/message-queues';
import {BatchError} from '../src/message-queues';
import {AckError, Message, Subscriber} from '../src/subscriber';
import {Message, Subscriber} from '../src/subscriber';
import {DebugMessage} from '../src/debug';

class FakeClient {
Expand Down Expand Up @@ -99,36 +99,6 @@ class ModAckQueue extends messageTypes.ModAckQueue {
}
}

// This discount polyfill for Promise.allSettled can be removed after we drop Node 12.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and the related changes below are just cleanup I did while in here.

type AllSettledResult<T, U> = {
status: 'fulfilled' | 'rejected';
value?: T;
reason?: U;
};
function allSettled<T, U>(
proms: Promise<T>[]
): Promise<AllSettledResult<T, U>[]> {
const checkedProms = proms.map((r: Promise<T>) =>
r
.then(
(value: T) =>
({
status: 'fulfilled',
value,
}) as AllSettledResult<T, U>
)
.catch(
(error: U) =>
({
status: 'rejected',
reason: error,
}) as AllSettledResult<T, U>
)
);

return Promise.all(checkedProms);
}

describe('MessageQueues', () => {
const sandbox = sinon.createSandbox();

Expand Down Expand Up @@ -190,6 +160,15 @@ describe('MessageQueues', () => {
assert.strictEqual(stub.callCount, 1);
});

it('should flush the queue if at byte capacity', () => {
const stub = sandbox.stub(messageQueue, 'flush');

messageQueue.bytes = messageTypes.MAX_BATCH_BYTES - 10;
messageQueue.add(new FakeMessage() as Message);

assert.strictEqual(stub.callCount, 1);
});

it('should schedule a flush if needed', () => {
const clock = sandbox.useFakeTimers();
const stub = sandbox.stub(messageQueue, 'flush');
Expand Down Expand Up @@ -244,6 +223,13 @@ describe('MessageQueues', () => {
assert.strictEqual(messageQueue.numPendingRequests, 0);
});

it('should remove the bytes of messages from the queue', () => {
messageQueue.add(new FakeMessage() as Message);
messageQueue.flush();

assert.strictEqual(messageQueue.bytes, 0);
});

it('should send the batch', () => {
const message = new FakeMessage();
const deadline = 10;
Expand Down Expand Up @@ -498,7 +484,7 @@ describe('MessageQueues', () => {
(r: messageTypes.QueuedMessage) => r.responsePromise!.promise
);
await ackQueue.flush();
const results = await allSettled(proms);
const results = await Promise.allSettled(proms);
const oneSuccess = {status: 'fulfilled', value: undefined};
assert.deepStrictEqual(results, [oneSuccess, oneSuccess, oneSuccess]);
});
Expand All @@ -522,7 +508,7 @@ describe('MessageQueues', () => {
proms.shift();
await ackQueue.flush();

const results = await allSettled<void, AckError>(proms);
const results = await Promise.allSettled<void>(proms);
assert.strictEqual(results[0].status, 'rejected');
assert.strictEqual(results[0].reason?.errorCode, 'OTHER');
assert.strictEqual(results[1].status, 'rejected');
Expand Down Expand Up @@ -552,7 +538,7 @@ describe('MessageQueues', () => {
];
await ackQueue.flush();

const results = await allSettled<void, AckError>(proms);
const results = await Promise.allSettled<void>(proms);
assert.strictEqual(results[0].status, 'rejected');
assert.strictEqual(results[0].reason?.errorCode, 'INVALID');

Expand Down Expand Up @@ -789,7 +775,7 @@ describe('MessageQueues', () => {
(r: messageTypes.QueuedMessage) => r.responsePromise!.promise
);
await modAckQueue.flush();
const results = await allSettled(proms);
const results = await Promise.allSettled(proms);
const oneSuccess = {status: 'fulfilled', value: undefined};
assert.deepStrictEqual(results, [oneSuccess, oneSuccess, oneSuccess]);
});
Expand All @@ -815,7 +801,7 @@ describe('MessageQueues', () => {
proms.shift();
await modAckQueue.flush();

const results = await allSettled<void, AckError>(proms);
const results = await Promise.allSettled<void>(proms);
assert.strictEqual(results[0].status, 'rejected');
assert.strictEqual(results[0].reason?.errorCode, 'OTHER');
assert.strictEqual(results[1].status, 'rejected');
Expand Down Expand Up @@ -847,7 +833,7 @@ describe('MessageQueues', () => {
];
await modAckQueue.flush();

const results = await allSettled<void, AckError>(proms);
const results = await Promise.allSettled<void>(proms);
assert.strictEqual(results[0].status, 'rejected');
assert.strictEqual(results[0].reason?.errorCode, 'INVALID');

Expand Down
Loading