From a54e19222975b1d6ef31d98578595c9d28d06bc6 Mon Sep 17 00:00:00 2001 From: tipusinghaw Date: Mon, 3 Jun 2024 18:32:51 +0530 Subject: [PATCH 1/3] feat: added implementation for retry Signed-off-by: tipusinghaw --- .../interfaces/issuance.interfaces.ts | 18 ++++ apps/issuance/src/issuance.processor.ts | 3 +- apps/issuance/src/issuance.service.ts | 87 +++++++++++-------- libs/common/src/response-messages/index.ts | 3 +- 4 files changed, 71 insertions(+), 40 deletions(-) diff --git a/apps/issuance/interfaces/issuance.interfaces.ts b/apps/issuance/interfaces/issuance.interfaces.ts index a3ee14968..239087921 100644 --- a/apps/issuance/interfaces/issuance.interfaces.ts +++ b/apps/issuance/interfaces/issuance.interfaces.ts @@ -297,4 +297,22 @@ export interface IJobDetails { credential_data: CredentialData orgId: string; credentialType: string; +} + +export interface IQueuePayload{ + id: string; + jobId: string; + cacheId?: string; + clientId: string; + referenceId: string; + fileUploadId: string; + schemaLedgerId: string; + credentialDefinitionId: string; + status: string; + credential_data: CredentialData; + orgId: string; + credentialType: string; + totalJobs: number; + isRetry: boolean; + isLastData: boolean; } \ No newline at end of file diff --git a/apps/issuance/src/issuance.processor.ts b/apps/issuance/src/issuance.processor.ts index bbe1e1b0a..8b87724d0 100644 --- a/apps/issuance/src/issuance.processor.ts +++ b/apps/issuance/src/issuance.processor.ts @@ -2,6 +2,7 @@ import { OnQueueActive, Process, Processor } from '@nestjs/bull'; import { Job } from 'bull'; import { IssuanceService } from './issuance.service'; import { Logger } from '@nestjs/common'; +import { IQueuePayload } from '../interfaces/issuance.interfaces'; @Processor('bulk-issuance') export class BulkIssuanceProcessor { @@ -16,7 +17,7 @@ export class BulkIssuanceProcessor { } @Process() - async issueCredential(job: Job):Promise { + async issueCredential(job: Job):Promise { this.logger.log( `Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}...` ); diff --git a/apps/issuance/src/issuance.service.ts b/apps/issuance/src/issuance.service.ts index 0c929c40d..80459013a 100644 --- a/apps/issuance/src/issuance.service.ts +++ b/apps/issuance/src/issuance.service.ts @@ -8,7 +8,7 @@ import { CommonConstants } from '@credebl/common/common.constant'; import { ResponseMessages } from '@credebl/common/response-messages'; import { ClientProxy, RpcException } from '@nestjs/microservices'; import { map } from 'rxjs'; -import { CredentialOffer, FileUpload, FileUploadData, IAttributes, IClientDetails, ICreateOfferResponse, ICredentialPayload, IIssuance, IIssueData, IPattern, ISendOfferNatsPayload, ImportFileDetails, IssueCredentialWebhookPayload, OutOfBandCredentialOfferPayload, PreviewRequest, SchemaDetails, SendEmailCredentialOffer, TemplateDetailsInterface } from '../interfaces/issuance.interfaces'; +import { CredentialOffer, FileUpload, FileUploadData, IAttributes, IClientDetails, ICreateOfferResponse, ICredentialPayload, IIssuance, IIssueData, IPattern, IQueuePayload, ISendOfferNatsPayload, ImportFileDetails, IssueCredentialWebhookPayload, OutOfBandCredentialOfferPayload, PreviewRequest, SchemaDetails, SendEmailCredentialOffer, TemplateDetailsInterface } from '../interfaces/issuance.interfaces'; import { OrgAgentType, SchemaType, TemplateIdentifier } from '@credebl/enum/enum'; import * as QRCode from 'qrcode'; import { OutOfBandIssuance } from '../templates/out-of-band-issuance.template'; @@ -37,6 +37,7 @@ import { sendEmail } from '@credebl/common/send-grid-helper-file'; @Injectable() export class IssuanceService { private readonly logger = new Logger('IssueCredentialService'); + private processedJobsCounters: Record = {}; constructor( @Inject('NATS_CLIENT') private readonly issuanceServiceProxy: ClientProxy, private readonly commonService: CommonService, @@ -1106,15 +1107,15 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO // Wait for all saveFileDetails operations to complete await Promise.all(saveFileDetailsPromises); - // Now fetch the file details const bulkpayload = await this.issuanceRepository.getFileDetails(csvFileDetail.id); if (!bulkpayload) { throw new BadRequestException(ResponseMessages.issuance.error.fileData); } - + const uniqueJobId = uuidv4(); const queueJobsArrayPromises = bulkpayload.map(async (item) => ({ data: { id: item.id, + jobId: uniqueJobId, cacheId: requestId, clientId: clientDetails.clientId, referenceId: item.referenceId, @@ -1124,11 +1125,13 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO status: item.status, credential_data: item.credential_data, orgId, - credentialType: item.credential_type + credentialType: item.credential_type, + totalJobs: bulkpayload.length, + isRetry: false, + isLastData: false } })); - - // Await all promises to complete + const queueJobsArray = await Promise.all(queueJobsArrayPromises); try { await this.bulkIssuanceQueue.addBulk(queueJobsArray); @@ -1150,7 +1153,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO } async retryBulkCredential(fileId: string, orgId: string, clientId: string): Promise { - let respFile; + let bulkpayloadRetry; try { @@ -1158,47 +1161,55 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO if (!fileDetails) { throw new BadRequestException(ResponseMessages.issuance.error.retry); } - respFile = await this.issuanceRepository.getFailedCredentials(fileId); - - if (0 === respFile.length) { + bulkpayloadRetry = await this.issuanceRepository.getFailedCredentials(fileId); + if (0 === bulkpayloadRetry.length) { const errorMessage = ResponseMessages.bulkIssuance.error.fileDetailsNotFound; throw new BadRequestException(`${errorMessage}`); } - - for (const element of respFile) { - try { - const payload = { - data: element.credential_data, - fileUploadId: element.fileUploadId, - clientId, - credentialDefinitionId: element.credDefId, - schemaLedgerId: element.schemaId, - orgId, - id: element.id, - isRetry: true, - isLastData: respFile.indexOf(element) === respFile.length - 1 - }; - - await this.delay(500); // Wait for 0.5 secends - this.processIssuanceData(payload); - if (0 === respFile.length) { - return FileUploadStatus.completed; - } - } catch (error) { - // Handle errors if needed - this.logger.error(`Error processing issuance data: ${error}`); + const uniqueJobId = uuidv4(); + const queueJobsArrayPromises = bulkpayloadRetry.map(async (item) => ({ + data: { + id: item.id, + jobId: uniqueJobId, + clientId, + referenceId: item.referenceId, + fileUploadId: item.fileUploadId, + schemaLedgerId: item.schemaId, + credentialDefinitionId: item.credDefId, + status: item.status, + credential_data: item.credential_data, + orgId, + credentialType: item.credential_type, + totalJobs: bulkpayloadRetry.length, + isRetry: true, + isLastData: false } - } - - return 'Process reinitiated for bulk issuance'; + })); + const queueJobsArray = await Promise.all(queueJobsArrayPromises); + try { + await this.bulkIssuanceQueue.addBulk(queueJobsArray); + } catch (error) { + this.logger.error(`Error processing issuance data: ${error}`); + } + + return ResponseMessages.bulkIssuance.success.reinitiated; } catch (error) { throw new RpcException(error.response ? error.response : error); } } - // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types - async processIssuanceData(jobDetails): Promise { + async processIssuanceData(jobDetails: IQueuePayload): Promise { + const {jobId, totalJobs} = jobDetails; + if (!this.processedJobsCounters[jobId]) { + this.processedJobsCounters[jobId] = 0; + } + this.processedJobsCounters[jobId] += 1; + if (this.processedJobsCounters[jobId] === totalJobs) { + jobDetails.isLastData = true; + delete this.processedJobsCounters[jobId]; + } + const socket = await io(`${process.env.SOCKET_HOST}`, { reconnection: true, reconnectionDelay: 5000, diff --git a/libs/common/src/response-messages/index.ts b/libs/common/src/response-messages/index.ts index 02a141ba4..df0fab285 100644 --- a/libs/common/src/response-messages/index.ts +++ b/libs/common/src/response-messages/index.ts @@ -411,7 +411,8 @@ export const ResponseMessages = { }, bulkIssuance: { success: { - create: 'Issuance process successfully' + create: 'Issuance process successfully', + reinitiated: 'Process reinitiated for bulk issuance' }, error: { PathNotFound: 'Path to export data not found.', From 0a64aaa586b90a56fe9b9a8e4d3bb541f934f407 Mon Sep 17 00:00:00 2001 From: tipusinghaw Date: Wed, 5 Jun 2024 12:09:51 +0530 Subject: [PATCH 2/3] refactor: changed message for issunace Signed-off-by: tipusinghaw --- libs/common/src/response-messages/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/common/src/response-messages/index.ts b/libs/common/src/response-messages/index.ts index b4a476720..b0d787259 100644 --- a/libs/common/src/response-messages/index.ts +++ b/libs/common/src/response-messages/index.ts @@ -414,7 +414,7 @@ export const ResponseMessages = { }, bulkIssuance: { success: { - create: 'Issuance process successfully', + create: 'Issuance process initiated successfully', reinitiated: 'Process reinitiated for bulk issuance' }, error: { From 973277f6deb0b65945ee62dd448b9656602c5b39 Mon Sep 17 00:00:00 2001 From: tipusinghaw Date: Fri, 7 Jun 2024 12:17:52 +0530 Subject: [PATCH 3/3] feat: added support for async queue calling the processer function Signed-off-by: tipusinghaw --- apps/issuance/src/issuance.processor.ts | 2 +- apps/issuance/src/issuance.service.ts | 6 ++++-- libs/common/src/response-messages/index.ts | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/apps/issuance/src/issuance.processor.ts b/apps/issuance/src/issuance.processor.ts index 8b87724d0..be45d2a36 100644 --- a/apps/issuance/src/issuance.processor.ts +++ b/apps/issuance/src/issuance.processor.ts @@ -22,6 +22,6 @@ export class BulkIssuanceProcessor { `Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}...` ); - await this.issuanceService.processIssuanceData(job.data); + this.issuanceService.processIssuanceData(job.data); } } diff --git a/apps/issuance/src/issuance.service.ts b/apps/issuance/src/issuance.service.ts index 80459013a..200371fda 100644 --- a/apps/issuance/src/issuance.service.ts +++ b/apps/issuance/src/issuance.service.ts @@ -37,6 +37,7 @@ import { sendEmail } from '@credebl/common/send-grid-helper-file'; @Injectable() export class IssuanceService { private readonly logger = new Logger('IssueCredentialService'); + private counter = 0; private processedJobsCounters: Record = {}; constructor( @Inject('NATS_CLIENT') private readonly issuanceServiceProxy: ClientProxy, @@ -694,8 +695,9 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO disposition: 'attachment' } ]; - const isEmailSent = await sendEmail(this.emailData); - this.logger.log(`isEmailSent ::: ${JSON.stringify(isEmailSent)}`); + const isEmailSent = await sendEmail(this.emailData); + this.logger.log(`isEmailSent ::: ${JSON.stringify(isEmailSent)}-${this.counter}`); + this.counter++; if (!isEmailSent) { errors.push(new InternalServerErrorException(ResponseMessages.issuance.error.emailSend)); return false; diff --git a/libs/common/src/response-messages/index.ts b/libs/common/src/response-messages/index.ts index b0d787259..f58d66eb5 100644 --- a/libs/common/src/response-messages/index.ts +++ b/libs/common/src/response-messages/index.ts @@ -422,7 +422,7 @@ export const ResponseMessages = { invalidtemplateId: 'Invalid template id.', invalidIdentifier: 'Invalid Identifier', exportFile: 'An error occurred during CSV export.', - emailColumn: '1st column of the file should always be email.', + emailColumn: '1st column of the file should always be email_identifier.', attributeNumber: 'Number of supplied values is different from the number of schema attributes.', mismatchedAttributes: 'Schema attributes are mismatched in the file header.', fileDetailsNotFound: 'File details not found.',