Skip to content

Commit

Permalink
fix(handlers): create burst handler for interaction callbacks (#8996)
Browse files Browse the repository at this point in the history
* fix(handlers): create burst handler for interaction callbacks

* docs: use remarks instead of info block

Co-Authored-By: Almeida <[email protected]>

* refactor: move code duplication to shared handler

Co-authored-by: Jiralite <[email protected]>

* Update packages/rest/src/lib/handlers/BurstHandler.ts

---------

Co-authored-by: Almeida <[email protected]>
Co-authored-by: Jiralite <[email protected]>
Co-authored-by: Vlad Frangu <[email protected]>
Co-authored-by: Aura Román <[email protected]>
  • Loading branch information
5 people authored Mar 30, 2023
1 parent 984bd55 commit db8df10
Show file tree
Hide file tree
Showing 9 changed files with 511 additions and 124 deletions.
139 changes: 139 additions & 0 deletions packages/rest/__tests__/BurstHandler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/* eslint-disable id-length */
/* eslint-disable promise/prefer-await-to-then */
import { performance } from 'node:perf_hooks';
import { MockAgent, setGlobalDispatcher } from 'undici';
import type { Interceptable, MockInterceptor } from 'undici/types/mock-interceptor';
import { beforeEach, afterEach, test, expect, vitest } from 'vitest';
import { DiscordAPIError, HTTPError, RateLimitError, REST, BurstHandlerMajorIdKey } from '../src/index.js';
import { BurstHandler } from '../src/lib/handlers/BurstHandler.js';
import { genPath } from './util.js';

const callbackKey = `Global(POST:/interactions/:id/:token/callback):${BurstHandlerMajorIdKey}`;
const callbackPath = new RegExp(genPath('/interactions/[0-9]{17,19}/.+/callback'));

const api = new REST();

let mockAgent: MockAgent;
let mockPool: Interceptable;

beforeEach(() => {
mockAgent = new MockAgent();
mockAgent.disableNetConnect();
setGlobalDispatcher(mockAgent);

mockPool = mockAgent.get('https://discord.com');
api.setAgent(mockAgent);
});

afterEach(async () => {
await mockAgent.close();
});

// @discordjs/rest uses the `content-type` header to detect whether to parse
// the response as JSON or as an ArrayBuffer.
const responseOptions: MockInterceptor.MockResponseOptions = {
headers: {
'content-type': 'application/json',
},
};

test('Interaction callback creates burst handler', async () => {
mockPool.intercept({ path: callbackPath, method: 'POST' }).reply(200);

expect(api.requestManager.handlers.get(callbackKey)).toBe(undefined);
expect(
await api.post('/interactions/1234567890123456789/totallyarealtoken/callback', {
auth: false,
body: { type: 4, data: { content: 'Reply' } },
}),
).toBeInstanceOf(Uint8Array);
expect(api.requestManager.handlers.get(callbackKey)).toBeInstanceOf(BurstHandler);
});

test('Requests are handled in bursts', async () => {
mockPool.intercept({ path: callbackPath, method: 'POST' }).reply(200).delay(100).times(3);

// Return the current time on these results as their response does not indicate anything
const [a, b, c] = await Promise.all([
api
.post('/interactions/1234567890123456789/totallyarealtoken/callback', {
auth: false,
body: { type: 4, data: { content: 'Reply1' } },
})
.then(() => performance.now()),
api
.post('/interactions/2345678901234567890/anotherveryrealtoken/callback', {
auth: false,
body: { type: 4, data: { content: 'Reply2' } },
})
.then(() => performance.now()),
api
.post('/interactions/3456789012345678901/nowaytheresanotherone/callback', {
auth: false,
body: { type: 4, data: { content: 'Reply3' } },
})
.then(() => performance.now()),
]);

expect(b - a).toBeLessThan(10);
expect(c - a).toBeLessThan(10);
});

test('Handle 404', async () => {
mockPool
.intercept({ path: callbackPath, method: 'POST' })
.reply(404, { message: 'Unknown interaction', code: 10_062 }, responseOptions);

const promise = api.post('/interactions/1234567890123456788/definitelynotarealinteraction/callback', {
auth: false,
body: { type: 4, data: { content: 'Malicious' } },
});
await expect(promise).rejects.toThrowError('Unknown interaction');
await expect(promise).rejects.toBeInstanceOf(DiscordAPIError);
});

let unexpected429 = true;
test('Handle unexpected 429', async () => {
mockPool
.intercept({
path: callbackPath,
method: 'POST',
})
.reply(() => {
if (unexpected429) {
unexpected429 = false;
return {
statusCode: 429,
data: '',
responseOptions: {
headers: {
'retry-after': '1',
via: '1.1 google',
},
},
};
}

return {
statusCode: 200,
data: { test: true },
responseOptions,
};
})
.times(2);

const previous = performance.now();
let firstResolvedTime: number;
const unexpectedLimit = api
.post('/interactions/1234567890123456789/totallyarealtoken/callback', {
auth: false,
body: { type: 4, data: { content: 'Reply' } },
})
.then((res) => {
firstResolvedTime = performance.now();
return res;
});

expect(await unexpectedLimit).toStrictEqual({ test: true });
expect(performance.now()).toBeGreaterThanOrEqual(previous + 1_000);
});
6 changes: 4 additions & 2 deletions packages/rest/__tests__/REST.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { Buffer } from 'node:buffer';
import { Buffer, File as NativeFile } from 'node:buffer';
import { URLSearchParams } from 'node:url';
import { DiscordSnowflake } from '@sapphire/snowflake';
import type { Snowflake } from 'discord-api-types/v10';
import { Routes } from 'discord-api-types/v10';
import type { FormData } from 'undici';
import { File, MockAgent, setGlobalDispatcher } from 'undici';
import { File as UndiciFile, MockAgent, setGlobalDispatcher } from 'undici';
import type { Interceptable, MockInterceptor } from 'undici/types/mock-interceptor';
import { beforeEach, afterEach, test, expect } from 'vitest';
import { REST } from '../src/index.js';
import { genPath } from './util.js';

const File = NativeFile ?? UndiciFile;

const newSnowflake: Snowflake = DiscordSnowflake.generate().toString();

const api = new REST().setToken('A-Very-Fake-Token');
Expand Down
22 changes: 20 additions & 2 deletions packages/rest/src/lib/RequestManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@ import { lazy } from '@discordjs/util';
import { DiscordSnowflake } from '@sapphire/snowflake';
import { FormData, type RequestInit, type BodyInit, type Dispatcher, type Agent } from 'undici';
import type { RESTOptions, RestEvents, RequestOptions } from './REST.js';
import { BurstHandler } from './handlers/BurstHandler.js';
import type { IHandler } from './handlers/IHandler.js';
import { SequentialHandler } from './handlers/SequentialHandler.js';
import { DefaultRestOptions, DefaultUserAgent, OverwrittenMimeTypes, RESTEvents } from './utils/constants.js';
import {
BurstHandlerMajorIdKey,
DefaultRestOptions,
DefaultUserAgent,
OverwrittenMimeTypes,
RESTEvents,
} from './utils/constants.js';
import { resolveBody } from './utils/utils.js';

// Make this a lazy dynamic import as file-type is a pure ESM package
Expand Down Expand Up @@ -351,7 +358,10 @@ export class RequestManager extends EventEmitter {
*/
private createHandler(hash: string, majorParameter: string) {
// Create the async request queue to handle requests
const queue = new SequentialHandler(this, hash, majorParameter);
const queue =
majorParameter === BurstHandlerMajorIdKey
? new BurstHandler(this, hash, majorParameter)
: new SequentialHandler(this, hash, majorParameter);
// Save the queue based on its id
this.handlers.set(queue.id, queue);

Expand Down Expand Up @@ -499,6 +509,14 @@ export class RequestManager extends EventEmitter {
* @internal
*/
private static generateRouteData(endpoint: RouteLike, method: RequestMethod): RouteData {
if (endpoint.startsWith('/interactions/') && endpoint.endsWith('/callback')) {
return {
majorParameter: BurstHandlerMajorIdKey,
bucketRoute: '/interactions/:id/:token/callback',
original: endpoint,
};
}

const majorIdMatch = /^\/(?:channels|guilds|webhooks)\/(\d{17,19})/.exec(endpoint);

// Get the major id for this route - global otherwise
Expand Down
146 changes: 146 additions & 0 deletions packages/rest/src/lib/handlers/BurstHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import { setTimeout as sleep } from 'node:timers/promises';
import type { Dispatcher } from 'undici';
import type { RequestOptions } from '../REST.js';
import type { HandlerRequestData, RequestManager, RouteData } from '../RequestManager.js';
import { RESTEvents } from '../utils/constants.js';
import { onRateLimit, parseHeader } from '../utils/utils.js';
import type { IHandler } from './IHandler.js';
import { handleErrors, incrementInvalidCount, makeNetworkRequest } from './Shared.js';

/**
* The structure used to handle burst requests for a given bucket.
* Burst requests have no ratelimit handling but allow for pre- and post-processing
* of data in the same manner as sequentially queued requests.
*
* @remarks
* This queue may still emit a rate limit error if an unexpected 429 is hit
*/
export class BurstHandler implements IHandler {
/**
* {@inheritdoc IHandler.id}
*/
public readonly id: string;

/**
* {@inheritDoc IHandler.inactive}
*/
public inactive = false;

/**
* @param manager - The request manager
* @param hash - The hash that this RequestHandler handles
* @param majorParameter - The major parameter for this handler
*/
public constructor(
private readonly manager: RequestManager,
private readonly hash: string,
private readonly majorParameter: string,
) {
this.id = `${hash}:${majorParameter}`;
}

/**
* Emits a debug message
*
* @param message - The message to debug
*/
private debug(message: string) {
this.manager.emit(RESTEvents.Debug, `[REST ${this.id}] ${message}`);
}

/**
* {@inheritDoc IHandler.queueRequest}
*/
public async queueRequest(
routeId: RouteData,
url: string,
options: RequestOptions,
requestData: HandlerRequestData,
): Promise<Dispatcher.ResponseData> {
return this.runRequest(routeId, url, options, requestData);
}

/**
* The method that actually makes the request to the API, and updates info about the bucket accordingly
*
* @param routeId - The generalized API route with literal ids for major parameters
* @param url - The fully resolved URL to make the request to
* @param options - The fetch options needed to make the request
* @param requestData - Extra data from the user's request needed for errors and additional processing
* @param retries - The number of retries this request has already attempted (recursion)
*/
private async runRequest(
routeId: RouteData,
url: string,
options: RequestOptions,
requestData: HandlerRequestData,
retries = 0,
): Promise<Dispatcher.ResponseData> {
const method = options.method ?? 'get';

const res = await makeNetworkRequest(this.manager, routeId, url, options, requestData, retries);

// Retry requested
if (res === null) {
// eslint-disable-next-line no-param-reassign
return this.runRequest(routeId, url, options, requestData, ++retries);
}

const status = res.statusCode;
let retryAfter = 0;
const retry = parseHeader(res.headers['retry-after']);

// Amount of time in milliseconds until we should retry if rate limited (globally or otherwise)
if (retry) retryAfter = Number(retry) * 1_000 + this.manager.options.offset;

// Count the invalid requests
if (status === 401 || status === 403 || status === 429) {
incrementInvalidCount(this.manager);
}

if (status >= 200 && status < 300) {
return res;
} else if (status === 429) {
// Unexpected ratelimit
const isGlobal = res.headers['x-ratelimit-global'] !== undefined;
await onRateLimit(this.manager, {
timeToReset: retryAfter,
limit: Number.POSITIVE_INFINITY,
method,
hash: this.hash,
url,
route: routeId.bucketRoute,
majorParameter: this.majorParameter,
global: isGlobal,
});
this.debug(
[
'Encountered unexpected 429 rate limit',
` Global : ${isGlobal}`,
` Method : ${method}`,
` URL : ${url}`,
` Bucket : ${routeId.bucketRoute}`,
` Major parameter: ${routeId.majorParameter}`,
` Hash : ${this.hash}`,
` Limit : ${Number.POSITIVE_INFINITY}`,
` Retry After : ${retryAfter}ms`,
` Sublimit : None`,
].join('\n'),
);

// We are bypassing all other limits, but an encountered limit should be respected (it's probably a non-punished rate limit anyways)
await sleep(retryAfter);

// Since this is not a server side issue, the next request should pass, so we don't bump the retries counter
return this.runRequest(routeId, url, options, requestData, retries);
} else {
const handled = await handleErrors(this.manager, res, method, url, requestData, retries);
if (handled === null) {
// eslint-disable-next-line no-param-reassign
return this.runRequest(routeId, url, options, requestData, ++retries);
}

return handled;
}
}
}
6 changes: 6 additions & 0 deletions packages/rest/src/lib/handlers/IHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,9 @@ export interface IHandler {
requestData: HandlerRequestData,
): Promise<Dispatcher.ResponseData>;
}

export interface PolyFillAbortSignal {
readonly aborted: boolean;
addEventListener(type: 'abort', listener: () => void): void;
removeEventListener(type: 'abort', listener: () => void): void;
}
Loading

0 comments on commit db8df10

Please sign in to comment.