Skip to content

Commit

Permalink
feat: bullmq (#90)
Browse files Browse the repository at this point in the history
Co-authored-by: coji <[email protected]>
  • Loading branch information
coji and coji authored Jul 26, 2024
1 parent 79ea1f8 commit 221f770
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 49 deletions.
32 changes: 32 additions & 0 deletions web/app/feature/translate/jobs/translate-job.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { translate } from "../libs/translation";
import { addNumbersToContent } from "../utils/addNumbersToContent";
import { extractArticle } from "../utils/extractArticle";
import { extractNumberedElements } from "../utils/extractNumberedElements";
import { fetchWithRetry } from "../utils/fetchWithRetry";

interface TranslateJobParams {
url: string;
targetLanguage: string;
apiKey: string;
userId: number;
}
export const translateJob = async (params: TranslateJobParams) => {
const html = await fetchWithRetry(params.url);
const { content, title } = extractArticle(html, params.url);
const numberedContent = addNumbersToContent(content);
const extractedNumberedElements = extractNumberedElements(
numberedContent,
title,
);
const targetLanguage = params.targetLanguage;

await translate(
params.apiKey,
params.userId,
targetLanguage,
title,
numberedContent,
extractedNumberedElements,
params.url,
);
};
47 changes: 0 additions & 47 deletions web/app/feature/translate/libs/userTranslationqueueService.ts

This file was deleted.

38 changes: 38 additions & 0 deletions web/app/feature/translate/translate-user-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { Queue } from "~/utils/queue.server";
import { translateJob } from "./jobs/translate-job.server";

type TranslateJobData = {
url: string;
targetLanguage: string;
apiKey: string;
userId: number;
};

export const getTranslateUserQueue = (userId: number) => {
return Queue<TranslateJobData>(`translation-user-${userId}`, {
processor: async (job) => {
console.log(`Starting job ${job.id} for user ${userId}`);
try {
await translateJob(job.data);
console.log(`Job ${job.id} completed successfully for user ${userId}`);
} catch (error) {
console.error(
`Error processing job ${job.id} for user ${userId}:`,
error,
);
throw error;
}
},
onComplete: async (job, queue) => {
const waitingCount = await queue.getWaitingCount();
const activeCount = await queue.getActiveCount();
const delayedCount = await queue.getDelayedCount();
const totalCount = waitingCount + activeCount + delayedCount;
if (totalCount === 0) {
console.log(
`All translation jobs for user ${userId} have been completed.`,
);
}
},
});
};
6 changes: 4 additions & 2 deletions web/app/routes/translate/route.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { useRevalidator } from "@remix-run/react";
import { useEffect } from "react";
import { typedjson, useTypedLoaderData } from "remix-typedjson";
import { Header } from "~/components/Header";
import { getTranslateUserQueue } from "~/feature/translate/translate-user-queue";
import { validateGeminiApiKey } from "~/feature/translate/utils/gemini";
import { authenticator } from "~/utils/auth.server";
import { getTargetLanguage } from "~/utils/target-language.server";
Expand All @@ -15,7 +16,6 @@ import {
getDbUser,
listUserAiTranslationInfo,
} from "./functions/queries.server";
import { translateJob } from "./functions/translate-job.server";
import { schema } from "./types";

export async function loader({ request }: LoaderFunctionArgs) {
Expand Down Expand Up @@ -76,12 +76,14 @@ export async function action({ request }: ActionFunctionArgs) {
}
const targetLanguage = await getTargetLanguage(request);
// Start the translation job in background
translateJob({
const queue = getTranslateUserQueue(safeUser.id);
const job = await queue.add(`translate-${safeUser.id}`, {
url: submission.value.url,
targetLanguage,
apiKey: dbUser.geminiApiKey,
userId: safeUser.id,
});
console.log(job.toJSON());

return {
intent,
Expand Down
35 changes: 35 additions & 0 deletions web/app/utils/queue.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Queue as BullQueue, type Job, type Processor, Worker } from "bullmq";
import { RedisConfig } from "./redis-config";

type RegisteredQueue = {
queue: BullQueue;
worker: Worker;
};

declare global {
var __registeredQueues: Record<string, RegisteredQueue> | undefined;
}

if (!global.__registeredQueues) {
global.__registeredQueues = {};
}
const registeredQueues = global.__registeredQueues;

export function Queue<Payload>(
name: string,
handlers: {
processor: Processor<Payload>;
onComplete: (job: Job<Payload>, queue: BullQueue<Payload>) => void;
},
): BullQueue<Payload> {
if (registeredQueues[name]) {
return registeredQueues[name].queue;
}
const queue = new BullQueue<Payload>(name, { connection: RedisConfig });
const worker = new Worker<Payload>(name, handlers.processor, {
connection: RedisConfig,
});
worker.on("completed", (job) => handlers.onComplete(job, queue));
registeredQueues[name] = { queue, worker };
return queue;
}
4 changes: 4 additions & 0 deletions web/app/utils/redis-config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export const RedisConfig = {
host: process.env.REDIS_HOST,
port: process.env.REDIS_PORT ? Number(process.env.REDIS_PORT) : 6379,
};

0 comments on commit 221f770

Please sign in to comment.