Skip to content

Commit

Permalink
fix: rate limit notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
oae committed Oct 19, 2022
1 parent 355c1ae commit 20ada9b
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 38 deletions.
10 changes: 0 additions & 10 deletions src/components/chaptersTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { DataTable } from 'mantine-datatable';

import dayjs from 'dayjs';

import { showNotification } from '@mantine/notifications';
import prettyBytes from 'pretty-bytes';
import { useEffect, useState } from 'react';

Expand Down Expand Up @@ -51,15 +50,6 @@ export function ChaptersTable({ manga }: { manga: MangaWithMetadataAndChaptersLi
},
{ accessor: 'size', title: 'File Size', render: ({ size }) => prettyBytes(size) },
]}
rowContextMenu={{
items: () => [
{
key: 'download',
title: 'Download Again',
onClick: () => showNotification({ message: `Chapter queued for the download` }),
},
],
}}
/>
);
}
10 changes: 5 additions & 5 deletions src/components/navbar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -203,35 +203,35 @@ function Activity({ data }: { data: ActivityType }) {
name="Active"
color="teal"
count={data.active}
href="/admin/queues/queue/downloadQueue?status=active"
href="/bull/queues/queue/downloadQueue?status=active"
/>
<ActivityItem
icon={<IconClock size={20} strokeWidth={1.5} />}
name="Queued"
color="cyan"
count={data.queued}
href="/admin/queues/queue/downloadQueue?status=waiting"
href="/bull/queues/queue/downloadQueue?status=waiting"
/>
<ActivityItem
icon={<IconCalendarStats size={20} strokeWidth={1.5} />}
name="Scheduled"
color="yellow"
count={data.scheduled}
href="/admin/queues/queue/downloadQueue?status=delayed"
href="/bull/queues/queue/checkChaptersQueue?status=delayed"
/>
<ActivityItem
icon={<IconAlertTriangle size={20} strokeWidth={1.5} />}
name="Failed"
color="red"
count={data.failed}
href="/admin/queues/queue/downloadQueue?status=failed"
href="/bull/queues/queue/downloadQueue?status=failed"
/>
<ActivityItem
icon={<IconCircleCheck size={20} strokeWidth={1.5} />}
name="Completed"
color="dark"
count={data.completed}
href="/admin/queues/queue/downloadQueue?status=completed"
href="/bull/queues/queue/downloadQueue?status=completed"
/>
</>
);
Expand Down
15 changes: 8 additions & 7 deletions src/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
import express, { Request, Response } from 'express';
import next from 'next';
import { ExpressAdapter } from '@bull-board/express';
import { createBullBoard } from '@bull-board/api';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { ExpressAdapter } from '@bull-board/express';
import express, { Request, Response } from 'express';
import next from 'next';
import { logger } from '../utils/logging';
import { downloadQueue } from './queue/download';
import { checkChaptersQueue } from './queue/checkChapters';
import { downloadQueue } from './queue/download';
import { notificationQueue } from './queue/notify';

const dev = process.env.NODE_ENV !== 'production';
const app = next({ dev });
const handle = app.getRequestHandler();
const port = process.env.PORT || 3000;

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
serverAdapter.setBasePath('/bull/queues');

createBullBoard({
queues: [new BullAdapter(downloadQueue), new BullAdapter(checkChaptersQueue)],
queues: [new BullAdapter(downloadQueue), new BullAdapter(checkChaptersQueue), new BullAdapter(notificationQueue)],
serverAdapter,
});

(async () => {
try {
await app.prepare();
const server = express();
server.use('/admin/queues', serverAdapter.getRouter()).all('*', (req: Request, res: Response) => {
server.use('/bull/queues', serverAdapter.getRouter()).all('*', (req: Request, res: Response) => {
return handle(req, res);
});

Expand Down
12 changes: 6 additions & 6 deletions src/server/queue/checkChapters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ const cronMap = {
weekly: '0 * * * 7',
};

const mangaWithLibrary = Prisma.validator<Prisma.MangaArgs>()({
include: { library: true },
const mangaWithLibraryAndMetadata = Prisma.validator<Prisma.MangaArgs>()({
include: { library: true, metadata: true },
});

export type MangaWithLibrary = Prisma.MangaGetPayload<typeof mangaWithLibrary>;
export type MangaWithLibraryAndMetadata = Prisma.MangaGetPayload<typeof mangaWithLibraryAndMetadata>;

const checkChapters = async (manga: MangaWithLibrary) => {
const checkChapters = async (manga: MangaWithLibraryAndMetadata) => {
logger.info(`Checking for new chapters: ${manga.title}`);
const mangaDir = path.resolve(manga.library.path, sanitizer(manga.title));
const missingChapterFiles = await findMissingChapterFiles(mangaDir, manga.source, manga.title);
Expand Down Expand Up @@ -102,7 +102,7 @@ export const checkChaptersQueue = new Queue('checkChaptersQueue', {
export const checkChaptersWorker = new Worker(
'checkChaptersQueue',
async (job: Job) => {
const { manga }: { manga: MangaWithLibrary } = job.data;
const { manga }: { manga: MangaWithLibraryAndMetadata } = job.data;
await checkChapters(manga);
await job.updateProgress(100);
},
Expand Down Expand Up @@ -132,7 +132,7 @@ export const removeJob = async (title: string) => {
);
};

export const schedule = async (manga: MangaWithLibrary) => {
export const schedule = async (manga: MangaWithLibraryAndMetadata) => {
if (manga.interval === 'never') {
return;
}
Expand Down
22 changes: 17 additions & 5 deletions src/server/queue/download.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import { Prisma } from '@prisma/client';
import { Job, Queue, Worker } from 'bullmq';
import { sanitizer } from '../../utils/sanitize';
import { prisma } from '../db/client';
import { downloadChapter, getChapterFromLocal } from '../utils/mangal';
import { sendNotification } from '../utils/notification';
import type { MangaWithLibrary } from './checkChapters';
import { notificationQueue } from './notify';

const mangaWithLibraryAndMetadata = Prisma.validator<Prisma.MangaArgs>()({
include: { library: true, metadata: true },
});

export type MangaWithLibraryAndMetadata = Prisma.MangaGetPayload<typeof mangaWithLibraryAndMetadata>;
export interface IDownloadWorkerData {
manga: MangaWithLibrary;
manga: MangaWithLibraryAndMetadata;
chapterIndex: number;
}

Expand All @@ -24,13 +30,19 @@ export const downloadWorker = new Worker(
},
});

await prisma.chapter.create({
const chapterInDb = await prisma.chapter.create({
data: {
...chapter,
mangaId: manga.id,
},
});
await sendNotification(`Downloaded a new chapter #${chapterIndex + 1} for ${manga.title} from ${manga.source}`);
await notificationQueue.add(`notify_${sanitizer(manga.title)}_${chapterInDb.id}`, {
chapterIndex,
chapterFileName: chapter.fileName,
mangaTitle: manga.title,
source: manga.source,
url: manga.metadata.urls.find((url) => url.includes('anilist')),
});
await job.updateProgress(100);
} catch (err) {
await job.log(`${err}`);
Expand Down
49 changes: 49 additions & 0 deletions src/server/queue/notify.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { Job, Queue, Worker } from 'bullmq';
import { sendNotification } from '../utils/notification';

export interface IDownloadWorkerData {
chapterIndex: number;
chapterFileName: string;
mangaTitle: string;
source: string;
url?: string;
}

export const notificationWorker = new Worker(
'notificationQueue',
async (job: Job) => {
const { chapterIndex, chapterFileName, mangaTitle, source, url }: IDownloadWorkerData = job.data;
try {
await sendNotification(
'Chapter grabbed',
`Chapter #${chapterIndex + 1} downloaded as ${chapterFileName} for ${mangaTitle} from ${source}`,
url,
);
await job.updateProgress(100);
} catch (err) {
await job.log(`${err}`);
throw err;
}
},
{
concurrency: 30,
limiter: {
max: 30,
duration: 1000 * 2,
},
},
);

export const notificationQueue = new Queue('notificationQueue', {
connection: {
host: 'localhost',
port: 6379,
},
defaultJobOptions: {
attempts: 20,
backoff: {
type: 'fixed',
delay: 1000 * 60 * 2,
},
},
});
7 changes: 4 additions & 3 deletions src/server/trpc/router/manga.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import { TRPCError } from '@trpc/server';
import path from 'path';
import { z } from 'zod';
import { sanitizer } from '../../../utils/sanitize';
import { removeJob, schedule } from '../../queue/checkChapters';
import { checkChaptersQueue, removeJob, schedule } from '../../queue/checkChapters';
import { downloadQueue } from '../../queue/download';
import { getAvailableSources, getMangaDetail, Manga, removeManga, search } from '../../utils/mangal';
import { t } from '../trpc';

export const mangaRouter = t.router({
query: t.procedure.query(async ({ ctx }) => {
return ctx.prisma.manga.findMany({ include: { metadata: true } });
return ctx.prisma.manga.findMany({ include: { metadata: true }, orderBy: { title: 'asc' } });
}),
sources: t.procedure.query(async () => {
return getAvailableSources();
Expand Down Expand Up @@ -130,6 +130,7 @@ export const mangaRouter = t.router({
const manga = await ctx.prisma.manga.create({
include: {
library: true,
metadata: true,
},
data: {
source,
Expand Down Expand Up @@ -193,7 +194,7 @@ export const mangaRouter = t.router({
return {
active: await downloadQueue.getActiveCount(),
queued: await downloadQueue.getWaitingCount(),
scheduled: await downloadQueue.getDelayedCount(),
scheduled: await checkChaptersQueue.getDelayedCount(),
failed: await downloadQueue.getFailedCount(),
completed: await downloadQueue.getCompletedCount(),
};
Expand Down
11 changes: 9 additions & 2 deletions src/server/utils/notification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ const token = '***REMOVED***';
const chatId = '***REMOVED***';
const bot = new TelegramBot(token);

export const sendNotification = async (message: string) => {
await bot.sendMessage(chatId, message);
export const sendNotification = async (title: string, body: string, url?: string) => {
const message = `
<b>${title}</b>
${body}
${url || ''}
`;
await bot.sendMessage(chatId, message, { parse_mode: 'HTML' });
};

0 comments on commit 20ada9b

Please sign in to comment.