-
I am using the following code:

function get_file_hash(filepath: string) {
const file_buffer = fs.readFileSync(filepath);
const hash_sum = createHash('sha256');
hash_sum.update(file_buffer);
return hash_sum.digest('hex');
}
/**
 * Queues a file for metadata extraction unless a record with the same
 * content hash already exists.
 *
 * The file is hashed with `get_file_hash`, looked up via
 * `get_processed_file_by_hash`, and, when no record is found, an
 * "extract_metadata" job is added to `metadata_extractor_queue` before the
 * file is recorded through `insert_processed_file`.
 *
 * @param file Path of the file to process.
 */
async function process_and_add_file_to_queue(file: string) {
  const file_hash = get_file_hash(file);

  // An existing record for this hash means there is nothing left to do.
  const known_record = await get_processed_file_by_hash(file_hash);
  if (known_record) {
    logger.info(`File ${file} is already processed, skipping`);
    return;
  }

  // Enqueue first, then record the file together with its hash.
  await metadata_extractor_queue.add("extract_metadata", { file });
  const inserted_record = await insert_processed_file({ filepath: file, file_hash });

  if (!inserted_record) {
    return;
  }
  logger.info(`File ${file} added to queue and marked as processed`);
}
// File extensions (with leading dot) that are recognised as audio files.
const audio_file_extensions = [
  '.mp3',
  '.wav',
  '.flac',
  '.ogg',
  '.aac',
  '.m4a',
];
export function start_fs_watcher() {
const watcher = chokidar.watch(DATA_DIRECTORY, {
persistent: true,
ignoreInitial: false,
awaitWriteFinish: true,
ignored: (path, stats) => stats?.isFile() && !audio_file_extensions.some((ext) => path.endsWith(ext)),
});
watcher
.on('ready', async () => {
logger.info("Initial filesystem scan complete. Ready for changes")
})
.on('error', async (error) => {
logger.error(`Watcher error: ${error}`)
})
.on('add', async (file) => {
try {
await process_and_add_file_to_queue(file);
} catch (error) {
logger.error(error.message);
}
})
.on('change', async (file) => {
try {
await process_and_add_file_to_queue(file);
} catch (error) {
logger.error(error.message);
}
})
.on('unlink', async (file) => {
logger.warn(`File ${file} has been removed`);
});
}

Here is my queue and worker setup:

const queue_opts = {
connection,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000
}
}
};
// Queue that receives the metadata extraction jobs.
const metadata_extractor_queue_name = "metadata_extractor_queue";
export const metadata_extractor_queue = new Queue(metadata_extractor_queue_name, queue_opts);

// Queue that receives the jobs which save extracted metadata.
const save_metadata_to_db_queue_name = "save_metadata_to_db_queue";
export const save_metadata_to_db_queue = new Queue(save_metadata_to_db_queue_name, queue_opts);
/**
 * Worker for the metadata extractor queue.
 *
 * Reads `file` from the job data, runs `extract_metadata` on it and, when a
 * truthy result comes back, adds a 'save_metadata' job carrying that result
 * to `save_metadata_to_db_queue`.
 */
const metadata_extractor_worker = new Worker(
  metadata_extractor_queue_name,
  async (job) => {
    const { file } = job.data;
    const metadata = await extract_metadata(file);

    // Nothing extracted: there is nothing to hand over to the save queue.
    if (!metadata) {
      return;
    }
    await save_metadata_to_db_queue.add('save_metadata', { metadata });
  },
  { connection },
);
/**
 * Worker for the save-metadata queue.
 *
 * Reads `metadata` from the job data and, when it is truthy, passes it to
 * `save_metadata` wrapped in a single-element array.
 */
const save_metadata_to_db_worker = new Worker(
  save_metadata_to_db_queue_name,
  async (job) => {
    const { metadata } = job.data;

    // Jobs without metadata are completed without saving anything.
    if (!metadata) {
      return;
    }
    await save_metadata([metadata]);
  },
  { connection },
);
// Every queue declared in this setup, in declaration order.
const queues = [metadata_extractor_queue, save_metadata_to_db_queue];

// Every worker declared in this setup, in declaration order.
const workers = [metadata_extractor_worker, save_metadata_to_db_worker];
// Attach the same lifecycle logging to every worker.
for (const worker of workers) {
  // A job has been picked up by this worker.
  worker.on('active', (job) => {
    logger.info(`⚙️ Job ${job.name} with id ${job.id} is being processed with ${worker.name} worker`);
  });

  // A job reported progress.
  worker.on('progress', (job) => {
    logger.info(`⏳ Job ${job.name}: ${job.progress}% done`);
  });

  // A job finished successfully.
  worker.on('completed', (job) => {
    logger.info(`✅ Job ${job.name} with id ${job.id} completed via ${worker.name} worker`);
  });

  // A job failed; `job` may be undefined here, hence the optional chaining.
  worker.on('failed', (job, error) => {
    logger.error(`❌ Job ${job?.name} with id ${job?.id} failed from ${worker.name} worker: ${error.message}`);
  });

  // Worker-level errors (as opposed to job failures). Without a listener,
  // Node.js treats an emitted 'error' event as an unhandled exception, so
  // always log it instead of letting it take the process down silently.
  worker.on('error', (error) => {
    logger.error(`Worker ${worker.name} error: ${error.message}`);
  });
}
export const connection = new IORedis(
REDIS_PORT,
REDIS_HOST,
{
maxRetriesPerRequest: null,
}
);

To all this, I call this.

Now I have a separate temp project for Bull Board:

import { createBullBoard } from "@bull-board/api";
import { BullAdapter } from '@bull-board/api/bullAdapter.js';
import { ExpressAdapter } from "@bull-board/express";
import { Queue } from "bullmq";
import express from "express";
import IORedis from 'ioredis';
(async () => {
const connection = new IORedis(
6379,
"localhost",
{
maxRetriesPerRequest: null,
}
);
const queue_opts = {
connection,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000
}
}
};
const queues_list = [
"metadata_extractor_queue",
"save_metadata_to_db_queue"
];
const server_adapter = new ExpressAdapter();
server_adapter.setBasePath("/");
const queues = queues_list
.map((qs) => new Queue(qs, queue_opts))
.map((q) => new BullAdapter(q));
const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
queues,
serverAdapter: server_adapter
});
const app = express();
app.use("/", server_adapter.getRouter());
app.listen(8090, () => {
console.log("App is running on http://localhost:8090");
});
})();

By visiting this route, I see that all jobs are successfully added to the queue …
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 2 replies
-
My recommendation here is that you start with a minimal test project with just one queue and one worker; once you get that working, you can work your way up to find where the issue is. Most likely the workers are either not connecting properly or they are listening to the wrong queue, or something like that. You will figure it out if you start simple and build from there.
Beta Was this translation helpful? Give feedback.
The issue was `prefix`: I had it set for the `queue` but not for the `worker`.