diff --git a/web/app/feature/translate/jobs/translate-job.server.ts b/web/app/feature/translate/jobs/translate-job.server.ts new file mode 100644 index 0000000..0cbd471 --- /dev/null +++ b/web/app/feature/translate/jobs/translate-job.server.ts @@ -0,0 +1,32 @@ +import { translate } from "../libs/translation"; +import { addNumbersToContent } from "../utils/addNumbersToContent"; +import { extractArticle } from "../utils/extractArticle"; +import { extractNumberedElements } from "../utils/extractNumberedElements"; +import { fetchWithRetry } from "../utils/fetchWithRetry"; + +interface TranslateJobParams { + url: string; + targetLanguage: string; + apiKey: string; + userId: number; +} +export const translateJob = async (params: TranslateJobParams) => { + const html = await fetchWithRetry(params.url); + const { content, title } = extractArticle(html, params.url); + const numberedContent = addNumbersToContent(content); + const extractedNumberedElements = extractNumberedElements( + numberedContent, + title, + ); + const targetLanguage = params.targetLanguage; + + await translate( + params.apiKey, + params.userId, + targetLanguage, + title, + numberedContent, + extractedNumberedElements, + params.url, + ); +}; diff --git a/web/app/feature/translate/libs/userTranslationqueueService.ts b/web/app/feature/translate/libs/userTranslationqueueService.ts deleted file mode 100644 index 7cb8c34..0000000 --- a/web/app/feature/translate/libs/userTranslationqueueService.ts +++ /dev/null @@ -1,47 +0,0 @@ -import Queue, { type Queue as QueueType } from "bull"; -import { REDIS_URL } from "../../../routes/translate/constants"; -import { processTranslationJob } from "./translation"; - -const createUserTranslationQueue = (userId: number) => - new Queue(`translation-user-${userId}`, REDIS_URL, { - defaultJobOptions: { - removeOnComplete: true, - removeOnFail: true, - }, - }); - -const userTranslationQueues: { [userId: number]: QueueType } = {}; - -export function setupUserQueue(userId: number, geminiApiKey: string) { - if (userTranslationQueues[userId]) { - return userTranslationQueues[userId]; - } - const userTranslationQueue = createUserTranslationQueue(userId); - userTranslationQueue.process(async (job) => { - console.log(`Starting job ${job.id} for user ${userId}`); - try { - await processTranslationJob(job, geminiApiKey, userId); - console.log(`Job ${job.id} completed successfully for user ${userId}`); - } catch (error) { - console.error( - `Error processing job ${job.id} for user ${userId}:`, - error, - ); - throw error; - } - }); - - userTranslationQueue.on("completed", async (job) => { - const activeCount = await userTranslationQueue.getActiveCount(); - const waitingCount = await userTranslationQueue.getWaitingCount(); - - if (activeCount === 0 && waitingCount === 0) { - console.log( - `All translation jobs for user ${userId} have been completed.`, - ); - } - }); - - userTranslationQueues[userId] = userTranslationQueue; - return userTranslationQueue; -} diff --git a/web/app/feature/translate/translate-user-queue.ts b/web/app/feature/translate/translate-user-queue.ts new file mode 100644 index 0000000..88cb68b --- /dev/null +++ b/web/app/feature/translate/translate-user-queue.ts @@ -0,0 +1,38 @@ +import { Queue } from "~/utils/queue.server"; +import { translateJob } from "./jobs/translate-job.server"; + +type TranslateJobData = { + url: string; + targetLanguage: string; + apiKey: string; + userId: number; +}; + +export const getTranslateUserQueue = (userId: number) => { + return Queue(`translation-user-${userId}`, { + processor: async (job) => { + console.log(`Starting job ${job.id} for user ${userId}`); + try { + await translateJob(job.data); + console.log(`Job ${job.id} completed successfully for user ${userId}`); + } catch (error) { + console.error( + `Error processing job ${job.id} for user ${userId}:`, + error, + ); + throw error; + } + }, + onComplete: async (job, queue) => { + const waitingCount = await queue.getWaitingCount(); + const activeCount = await queue.getActiveCount(); + const delayedCount = await queue.getDelayedCount(); + const totalCount = waitingCount + activeCount + delayedCount; + if (totalCount === 0) { + console.log( + `All translation jobs for user ${userId} have been completed.`, + ); + } + }, + }); +}; diff --git a/web/app/routes/translate/route.tsx b/web/app/routes/translate/route.tsx index 9468f74..c861286 100644 --- a/web/app/routes/translate/route.tsx +++ b/web/app/routes/translate/route.tsx @@ -4,6 +4,7 @@ import { useRevalidator } from "@remix-run/react"; import { useEffect } from "react"; import { typedjson, useTypedLoaderData } from "remix-typedjson"; import { Header } from "~/components/Header"; +import { getTranslateUserQueue } from "~/feature/translate/translate-user-queue"; import { validateGeminiApiKey } from "~/feature/translate/utils/gemini"; import { authenticator } from "~/utils/auth.server"; import { getTargetLanguage } from "~/utils/target-language.server"; @@ -15,7 +16,6 @@ import { getDbUser, listUserAiTranslationInfo, } from "./functions/queries.server"; -import { translateJob } from "./functions/translate-job.server"; import { schema } from "./types"; export async function loader({ request }: LoaderFunctionArgs) { @@ -76,12 +76,14 @@ export async function action({ request }: ActionFunctionArgs) { } const targetLanguage = await getTargetLanguage(request); // Start the translation job in background - translateJob({ + const queue = getTranslateUserQueue(safeUser.id); + const job = await queue.add(`translate-${safeUser.id}`, { url: submission.value.url, targetLanguage, apiKey: dbUser.geminiApiKey, userId: safeUser.id, }); + console.log(job.toJSON()); return { intent, diff --git a/web/app/utils/queue.server.ts b/web/app/utils/queue.server.ts new file mode 100644 index 0000000..31ef8dd --- /dev/null +++ b/web/app/utils/queue.server.ts @@ -0,0 +1,35 @@ +import { Queue as BullQueue, type Job, type Processor, Worker } from "bullmq"; +import { RedisConfig } from "./redis-config"; + +type RegisteredQueue = { + queue: BullQueue; + worker: Worker; +}; + +declare global { + var __registeredQueues: Record | undefined; +} + +if (!global.__registeredQueues) { + global.__registeredQueues = {}; +} +const registeredQueues = global.__registeredQueues; + +export function Queue( + name: string, + handlers: { + processor: Processor; + onComplete: (job: Job, queue: BullQueue) => void; + }, +): BullQueue { + if (registeredQueues[name]) { + return registeredQueues[name].queue; + } + const queue = new BullQueue(name, { connection: RedisConfig }); + const worker = new Worker(name, handlers.processor, { + connection: RedisConfig, + }); + worker.on("completed", (job) => handlers.onComplete(job, queue)); + registeredQueues[name] = { queue, worker }; + return queue; +} diff --git a/web/app/utils/redis-config.ts b/web/app/utils/redis-config.ts new file mode 100644 index 0000000..49c0041 --- /dev/null +++ b/web/app/utils/redis-config.ts @@ -0,0 +1,4 @@ +export const RedisConfig = { + host: process.env.REDIS_HOST, + port: process.env.REDIS_PORT ? Number(process.env.REDIS_PORT) : 6379, +};