From d629c21515da5e1abe208715b9452385ddc9760a Mon Sep 17 00:00:00 2001 From: tipusinghaw Date: Wed, 22 May 2024 15:41:49 +0530 Subject: [PATCH 1/2] feat: implemented bulk issuance with nest queue Signed-off-by: tipusinghaw --- apps/api-gateway/src/app.module.ts | 9 +----- apps/issuance/src/issuance.module.ts | 6 ++++ apps/issuance/src/issuance.processor.ts | 4 +-- apps/issuance/src/issuance.service.ts | 38 +++++++++++++++---------- 4 files changed, 32 insertions(+), 25 deletions(-) diff --git a/apps/api-gateway/src/app.module.ts b/apps/api-gateway/src/app.module.ts index 8b853a00e..167b19ae9 100644 --- a/apps/api-gateway/src/app.module.ts +++ b/apps/api-gateway/src/app.module.ts @@ -21,7 +21,6 @@ import { UserModule } from './user/user.module'; import { ConnectionModule } from './connection/connection.module'; import { EcosystemModule } from './ecosystem/ecosystem.module'; import { getNatsOptions } from '@credebl/common/nats.config'; -import { BullModule } from '@nestjs/bull'; import { CacheModule } from '@nestjs/cache-manager'; import * as redisStore from 'cache-manager-redis-store'; import { WebhookModule } from './webhook/webhook.module'; @@ -54,13 +53,7 @@ import { NotificationModule } from './notification/notification.module'; UtilitiesModule, WebhookModule, NotificationModule, - CacheModule.register({ store: redisStore, host: process.env.REDIS_HOST, port: process.env.REDIS_PORT }), - BullModule.forRoot({ - redis: { - host: process.env.REDIS_HOST, - port: parseInt(process.env.REDIS_PORT) - } - }) + CacheModule.register({ store: redisStore, host: process.env.REDIS_HOST, port: process.env.REDIS_PORT }) ], controllers: [AppController], providers: [AppService] diff --git a/apps/issuance/src/issuance.module.ts b/apps/issuance/src/issuance.module.ts index 0f52a44f5..b69696c53 100644 --- a/apps/issuance/src/issuance.module.ts +++ b/apps/issuance/src/issuance.module.ts @@ -27,6 +27,12 @@ import { AwsService } from '@credebl/aws'; ]), CommonModule, CacheModule.register({ store: redisStore, host: process.env.REDIS_HOST, port: process.env.REDIS_PORT }), + BullModule.forRoot({ + redis: { + host: process.env.REDIS_HOST, + port: parseInt(process.env.REDIS_PORT) + } + }), BullModule.registerQueue({ name: 'bulk-issuance' }) diff --git a/apps/issuance/src/issuance.processor.ts b/apps/issuance/src/issuance.processor.ts index 93c7487cf..bbe1e1b0a 100644 --- a/apps/issuance/src/issuance.processor.ts +++ b/apps/issuance/src/issuance.processor.ts @@ -11,11 +11,11 @@ export class BulkIssuanceProcessor { @OnQueueActive() onActive(job: Job): void { this.logger.log( - `Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}...` + `Emitting job status${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}...` ); } - @Process('issue-credential') + @Process() async issueCredential(job: Job):Promise { this.logger.log( `Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}...` diff --git a/apps/issuance/src/issuance.service.ts b/apps/issuance/src/issuance.service.ts index 5ac7e811e..abf40a381 100644 --- a/apps/issuance/src/issuance.service.ts +++ b/apps/issuance/src/issuance.service.ts @@ -1085,23 +1085,31 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO if (!respFile) { throw new BadRequestException(ResponseMessages.issuance.error.fileData); } + // ------------------------ Remove after user --------------------------- + const queueRunningStatus = await this.bulkIssuanceQueue.isReady(); + this.logger.log(`respFile::::::`, respFile); + // eslint-disable-next-line no-console + console.log('queueRunningStatus:::::::::', queueRunningStatus); + // ------------------------ Remove after user --------------------------- + for (const element of respFile) { try { - const payload = { - data: element.credential_data, - fileUploadId: element.fileUploadId, - clientId: clientDetails.clientId, - cacheId: requestId, - credentialDefinitionId: element.credDefId, - schemaLedgerId: element.schemaId, - isRetry: false, - orgId, - id: element.id, - isLastData: respFile.indexOf(element) === respFile.length - 1 - }; - - await this.delay(500); // Wait for 0.5 secends - this.processIssuanceData(payload); + this.logger.log(`element log::::::`, element); //Remove after debugging + this.bulkIssuanceQueue.add( + { + data: element.credential_data, + fileUploadId: element.fileUploadId, + clientId: clientDetails.clientId, + cacheId: requestId, + credentialDefinitionId: element.credDefId, + schemaLedgerId: element.schemaId, + isRetry: false, + orgId, + id: element.id, + isLastData: respFile.indexOf(element) === respFile.length - 1 + }, + { delay: 5000 } + ); } catch (error) { this.logger.error(`Error processing issuance data: ${error}`); } From d848625c9b112c66085994a9c4f2be017aedb6a9 Mon Sep 17 00:00:00 2001 From: tipusinghaw Date: Wed, 22 May 2024 15:47:04 +0530 Subject: [PATCH 2/2] refactor: changed comment message Signed-off-by: tipusinghaw --- apps/issuance/src/issuance.service.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/issuance/src/issuance.service.ts b/apps/issuance/src/issuance.service.ts index abf40a381..4e0062b24 100644 --- a/apps/issuance/src/issuance.service.ts +++ b/apps/issuance/src/issuance.service.ts @@ -1085,12 +1085,12 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO if (!respFile) { throw new BadRequestException(ResponseMessages.issuance.error.fileData); } - // ------------------------ Remove after user --------------------------- + // ------------------------ Remove after debugging --------------------------- const queueRunningStatus = await this.bulkIssuanceQueue.isReady(); this.logger.log(`respFile::::::`, respFile); // eslint-disable-next-line no-console console.log('queueRunningStatus:::::::::', queueRunningStatus); - // ------------------------ Remove after user --------------------------- + // ------------------------ Remove after debugging --------------------------- for (const element of respFile) { try {