Skip to content

Commit

Permalink
Merge pull request #86 from MisskeyIO/revert-bullmq
Browse files Browse the repository at this point in the history
Revert "enhance(backend): migrate bull to bullmq (misskey-dev#10910)"
  • Loading branch information
riku6460 authored Jun 15, 2023
2 parents cc87ef9 + 9b3a92e commit e8cf53a
Show file tree
Hide file tree
Showing 42 changed files with 491 additions and 524 deletions.
2 changes: 1 addition & 1 deletion packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
"autwh": "0.1.0",
"bcryptjs": "2.4.3",
"blurhash": "2.0.5",
"bullmq": "3.15.0",
"bull": "4.10.4",
"cacheable-lookup": "6.1.0",
"cbor": "9.0.0",
"chalk": "5.2.0",
Expand Down
1 change: 0 additions & 1 deletion packages/backend/src/boot/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ export async function jobQueue() {
});
jobQueue.enableShutdownHooks();

jobQueue.get(QueueProcessorService).start();
jobQueue.get(ChartManagementService).start();

return jobQueue;
Expand Down
2 changes: 1 addition & 1 deletion packages/backend/src/core/NoteCreateService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ export class NoteCreateService implements OnApplicationShutdown {

if (data.poll && data.poll.expiresAt) {
const delay = data.poll.expiresAt.getTime() - Date.now();
this.queueService.endedPollNotificationQueue.add(note.id, {
this.queueService.endedPollNotificationQueue.add({
noteId: note.id,
}, {
delay,
Expand Down
53 changes: 42 additions & 11 deletions packages/backend/src/core/QueueModule.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,42 @@
import { setTimeout } from 'node:timers/promises';
import { Inject, Module, OnApplicationShutdown } from '@nestjs/common';
import * as Bull from 'bullmq';
import Bull from 'bull';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
import { QUEUE, baseQueueOptions } from '@/queue/const.js';
import type { Provider } from '@nestjs/common';
import type { DeliverJobData, InboxJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData } from '../queue/types.js';
import type { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData, DbJobMap } from '../queue/types.js';

function q<T>(config: Config, name: string, limitPerSec = -1) {
return new Bull<T>(name, {
redis: {
port: config.redisForJobQueue.port,
host: config.redisForJobQueue.host,
family: config.redisForJobQueue.family == null ? 0 : config.redisForJobQueue.family,
password: config.redisForJobQueue.pass,
db: config.redisForJobQueue.db ?? 0,
},
prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue` : 'queue',
limiter: limitPerSec > 0 ? {
max: limitPerSec,
duration: 1000,
} : undefined,
settings: {
backoffStrategies: {
apBackoff,
},
},
});
}

// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
function apBackoff(attemptsMade: number, err: Error) {
const baseDelay = 60 * 1000; // 1min
const maxBackoff = 8 * 60 * 60 * 1000; // 8hours
let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay;
backoff = Math.min(backoff, maxBackoff);
backoff += Math.round(backoff * Math.random() * 0.2);
return backoff;
}

export type SystemQueue = Bull.Queue<Record<string, unknown>>;
export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>;
Expand All @@ -18,49 +49,49 @@ export type WebhookDeliverQueue = Bull.Queue<WebhookDeliverJobData>;

const $system: Provider = {
provide: 'queue:system',
useFactory: (config: Config) => new Bull.Queue(QUEUE.SYSTEM, baseQueueOptions(config, QUEUE.SYSTEM)),
useFactory: (config: Config) => q(config, 'system'),
inject: [DI.config],
};

const $endedPollNotification: Provider = {
provide: 'queue:endedPollNotification',
useFactory: (config: Config) => new Bull.Queue(QUEUE.ENDED_POLL_NOTIFICATION, baseQueueOptions(config, QUEUE.ENDED_POLL_NOTIFICATION)),
useFactory: (config: Config) => q(config, 'endedPollNotification'),
inject: [DI.config],
};

const $deliver: Provider = {
provide: 'queue:deliver',
useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config, QUEUE.DELIVER)),
useFactory: (config: Config) => q(config, 'deliver', config.deliverJobPerSec ?? 128),
inject: [DI.config],
};

const $inbox: Provider = {
provide: 'queue:inbox',
useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config, QUEUE.INBOX)),
useFactory: (config: Config) => q(config, 'inbox', config.inboxJobPerSec ?? 16),
inject: [DI.config],
};

const $db: Provider = {
provide: 'queue:db',
useFactory: (config: Config) => new Bull.Queue(QUEUE.DB, baseQueueOptions(config, QUEUE.DB)),
useFactory: (config: Config) => q(config, 'db'),
inject: [DI.config],
};

const $relationship: Provider = {
provide: 'queue:relationship',
useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config, QUEUE.RELATIONSHIP)),
useFactory: (config: Config) => q(config, 'relationship', config.relashionshipJobPerSec ?? 64),
inject: [DI.config],
};

const $objectStorage: Provider = {
provide: 'queue:objectStorage',
useFactory: (config: Config) => new Bull.Queue(QUEUE.OBJECT_STORAGE, baseQueueOptions(config, QUEUE.OBJECT_STORAGE)),
useFactory: (config: Config) => q(config, 'objectStorage'),
inject: [DI.config],
};

const $webhookDeliver: Provider = {
provide: 'queue:webhookDeliver',
useFactory: (config: Config) => new Bull.Queue(QUEUE.WEBHOOK_DELIVER, baseQueueOptions(config, QUEUE.WEBHOOK_DELIVER)),
useFactory: (config: Config) => q(config, 'webhookDeliver', 64),
inject: [DI.config],
};

Expand Down
65 changes: 16 additions & 49 deletions packages/backend/src/core/QueueService.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Inject, Injectable } from '@nestjs/common';
import { v4 as uuid } from 'uuid';
import Bull from 'bull';
import type { IActivity } from '@/core/activitypub/type.js';
import type { DriveFile } from '@/models/entities/DriveFile.js';
import type { Webhook, webhookEventTypes } from '@/models/entities/Webhook.js';
Expand All @@ -10,7 +11,6 @@ import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js';
import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js';
import type { DbJobData, RelationshipJobData, ThinUser } from '../queue/types.js';
import type httpSignature from '@peertube/http-signature';
import type * as Bull from 'bullmq';

@Injectable()
export class QueueService {
Expand All @@ -26,43 +26,7 @@ export class QueueService {
@Inject('queue:relationship') public relationshipQueue: RelationshipQueue,
@Inject('queue:objectStorage') public objectStorageQueue: ObjectStorageQueue,
@Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue,
) {
this.systemQueue.add('tickCharts', {
}, {
repeat: { pattern: '55 * * * *' },
removeOnComplete: true,
});

this.systemQueue.add('resyncCharts', {
}, {
repeat: { pattern: '0 0 * * *' },
removeOnComplete: true,
});

this.systemQueue.add('cleanCharts', {
}, {
repeat: { pattern: '0 0 * * *' },
removeOnComplete: true,
});

this.systemQueue.add('aggregateRetention', {
}, {
repeat: { pattern: '0 0 * * *' },
removeOnComplete: true,
});

this.systemQueue.add('clean', {
}, {
repeat: { pattern: '0 0 * * *' },
removeOnComplete: true,
});

this.systemQueue.add('checkExpiredMutings', {
}, {
repeat: { pattern: '*/5 * * * *' },
removeOnComplete: true,
});
}
) {}

@bindThis
public deliver(user: ThinUser, content: IActivity | null, to: string | null, isSharedInbox: boolean) {
Expand All @@ -78,10 +42,11 @@ export class QueueService {
isSharedInbox,
};

return this.deliverQueue.add(to, data, {
return this.deliverQueue.add(data, {
attempts: this.config.deliverJobMaxAttempts ?? 12,
timeout: 1 * 60 * 1000, // 1min
backoff: {
type: 'custom',
type: 'apBackoff',
},
removeOnComplete: true,
removeOnFail: true,
Expand All @@ -95,10 +60,11 @@ export class QueueService {
signature,
};

return this.inboxQueue.add('', data, {
return this.inboxQueue.add(data, {
attempts: this.config.inboxJobMaxAttempts ?? 8,
timeout: 5 * 60 * 1000, // 5min
backoff: {
type: 'custom',
type: 'apBackoff',
},
removeOnComplete: true,
removeOnFail: true,
Expand Down Expand Up @@ -246,7 +212,7 @@ export class QueueService {
private generateToDbJobData<T extends 'importFollowingToDb' | 'importBlockingToDb', D extends DbJobData<T>>(name: T, data: D): {
name: string,
data: D,
opts: Bull.JobsOptions,
opts: Bull.JobOptions,
} {
return {
name,
Expand Down Expand Up @@ -333,10 +299,10 @@ export class QueueService {
}

@bindThis
private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData, opts: Bull.JobsOptions = {}): {
private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData, opts: Bull.JobOptions = {}): {
name: string,
data: RelationshipJobData,
opts: Bull.JobsOptions,
opts: Bull.JobOptions,
} {
return {
name,
Expand Down Expand Up @@ -385,10 +351,11 @@ export class QueueService {
eventId: uuid(),
};

return this.webhookDeliverQueue.add(webhook.id, data, {
return this.webhookDeliverQueue.add(data, {
attempts: 4,
timeout: 1 * 60 * 1000, // 1min
backoff: {
type: 'custom',
type: 'apBackoff',
},
removeOnComplete: true,
removeOnFail: true,
Expand All @@ -400,11 +367,11 @@ export class QueueService {
this.deliverQueue.once('cleaned', (jobs, status) => {
//deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
this.deliverQueue.clean(0, Infinity, 'delayed');
this.deliverQueue.clean(0, 'delayed');

this.inboxQueue.once('cleaned', (jobs, status) => {
//inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
this.inboxQueue.clean(0, Infinity, 'delayed');
this.inboxQueue.clean(0, 'delayed');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import { PollService } from '@/core/PollService.js';
import { StatusError } from '@/misc/status-error.js';
import { UtilityService } from '@/core/UtilityService.js';
import { bindThis } from '@/decorators.js';
import { checkHttps } from '@/misc/check-https.js';
import { getOneApId, getApId, getOneApHrefNullable, validPost, isEmoji, getApType } from '../type.js';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports
import { ApLoggerService } from '../ApLoggerService.js';
Expand All @@ -33,6 +32,7 @@ import { ApQuestionService } from './ApQuestionService.js';
import { ApImageService } from './ApImageService.js';
import type { Resolver } from '../ApResolverService.js';
import type { IObject, IPost } from '../type.js';
import { checkHttps } from '@/misc/check-https.js';

@Injectable()
export class ApNoteService {
Expand Down
16 changes: 3 additions & 13 deletions packages/backend/src/daemons/QueueStatsService.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import { Inject, Injectable } from '@nestjs/common';
import { Injectable } from '@nestjs/common';
import Xev from 'xev';
import * as Bull from 'bullmq';
import { QueueService } from '@/core/QueueService.js';
import { bindThis } from '@/decorators.js';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
import { QUEUE, baseQueueOptions } from '@/queue/const.js';
import type { OnApplicationShutdown } from '@nestjs/common';

const ev = new Xev();
Expand All @@ -17,9 +13,6 @@ export class QueueStatsService implements OnApplicationShutdown {
private intervalId: NodeJS.Timer;

constructor(
@Inject(DI.config)
private config: Config,

private queueService: QueueService,
) {
}
Expand All @@ -38,14 +31,11 @@ export class QueueStatsService implements OnApplicationShutdown {
let activeDeliverJobs = 0;
let activeInboxJobs = 0;

const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER));
const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX));

deliverQueueEvents.on('active', () => {
this.queueService.deliverQueue.on('global:active', () => {
activeDeliverJobs++;
});

inboxQueueEvents.on('active', () => {
this.queueService.inboxQueue.on('global:active', () => {
activeInboxJobs++;
});

Expand Down
15 changes: 7 additions & 8 deletions packages/backend/src/misc/prelude/time.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ const dateTimeIntervals = {
};

export function dateUTC(time: number[]): Date {
const d =
time.length === 2 ? Date.UTC(time[0], time[1])
: time.length === 3 ? Date.UTC(time[0], time[1], time[2])
: time.length === 4 ? Date.UTC(time[0], time[1], time[2], time[3])
: time.length === 5 ? Date.UTC(time[0], time[1], time[2], time[3], time[4])
: time.length === 6 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5])
: time.length === 7 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5], time[6])
: null;
const d = time.length === 2 ? Date.UTC(time[0], time[1])
: time.length === 3 ? Date.UTC(time[0], time[1], time[2])
: time.length === 4 ? Date.UTC(time[0], time[1], time[2], time[3])
: time.length === 5 ? Date.UTC(time[0], time[1], time[2], time[3], time[4])
: time.length === 6 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5])
: time.length === 7 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5], time[6])
: null;

if (!d) throw new Error('wrong number of arguments');

Expand Down
Loading

0 comments on commit e8cf53a

Please sign in to comment.