From e24b92f175cc711fb89bc8c48092b0974958ecf0 Mon Sep 17 00:00:00 2001 From: Pritam Singh Date: Thu, 9 Feb 2023 12:59:55 +0530 Subject: [PATCH] fix: add processMessagesConcurrently config for processing message concurrently Signed-off-by: Pritam Singh --- packages/core/src/agent/Agent.ts | 23 ++++++++++++----------- packages/core/src/agent/AgentConfig.ts | 4 ++++ packages/core/src/types.ts | 1 + 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/packages/core/src/agent/Agent.ts b/packages/core/src/agent/Agent.ts index 8179d7bef5..521528a00d 100644 --- a/packages/core/src/agent/Agent.ts +++ b/packages/core/src/agent/Agent.ts @@ -8,7 +8,7 @@ import type { InitConfig } from '../types' import type { Subscription } from 'rxjs' import { Subject } from 'rxjs' -import { concatMap, takeUntil } from 'rxjs/operators' +import { mergeMap, takeUntil } from 'rxjs/operators' import { InjectionSymbols } from '../constants' import { SigningProviderToken } from '../crypto' @@ -152,16 +152,17 @@ export class Agent extends BaseAge .observable(AgentEventTypes.AgentMessageReceived) .pipe( takeUntil(stop$), - concatMap((e) => - this.messageReceiver - .receiveMessage(e.payload.message, { - connection: e.payload.connection, - contextCorrelationId: e.payload.contextCorrelationId, - receivedAt: e.payload.receivedAt, - }) - .catch((error) => { - this.logger.error('Failed to process message', { error }) - }) + mergeMap( + (e) => + this.messageReceiver + .receiveMessage(e.payload.message, { + connection: e.payload.connection, + contextCorrelationId: e.payload.contextCorrelationId, + }) + .catch((error) => { + this.logger.error('Failed to process message', { error }) + }), + this.agentConfig.processMessagesConcurrently ? undefined : 1 ) ) .subscribe() diff --git a/packages/core/src/agent/AgentConfig.ts b/packages/core/src/agent/AgentConfig.ts index a6a4cb379f..8343f51e1d 100644 --- a/packages/core/src/agent/AgentConfig.ts +++ b/packages/core/src/agent/AgentConfig.ts @@ -75,6 +75,10 @@ export class AgentConfig { return this.initConfig.backupBeforeStorageUpdate ?? true } + public get processMessagesConcurrently() { + return this.initConfig.processMessagesConcurrently ?? false + } + public extend(config: Partial): AgentConfig { return new AgentConfig( { ...this.initConfig, logger: this.logger, label: this.label, ...config }, diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index cd00c8706b..60f16291af 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -83,6 +83,7 @@ export interface InitConfig { connectionImageUrl?: string autoUpdateStorageOnStartup?: boolean backupBeforeStorageUpdate?: boolean + processMessagesConcurrently?: boolean } export type ProtocolVersion = `${number}.${number}`