diff --git a/.vscode/launch.json b/.vscode/launch.json index a3a25278d..a38ac5ac5 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -19,6 +19,13 @@ ], "type": "node" }, + { + "command": "yarn test:sdk", + "name": "Run test:sdk", + "request": "launch", + "type": "node-terminal", + "cwd": "${workspaceFolder}/integration-tests/e2e-tests" + }, { "name": "TESTS", "type": "node", diff --git a/integration-tests/e2e-tests/src/steps/LifecycleSteps.ts b/integration-tests/e2e-tests/src/steps/LifecycleSteps.ts index 277545549..151b06741 100644 --- a/integration-tests/e2e-tests/src/steps/LifecycleSteps.ts +++ b/integration-tests/e2e-tests/src/steps/LifecycleSteps.ts @@ -1,9 +1,9 @@ -import {Before, BeforeAll} from "@cucumber/cucumber" -import {Actor, actorCalled, Cast, engage, TakeNotes} from "@serenity-js/core" -import {CallAnApi} from "@serenity-js/rest" -import {Utils} from "../Utils" -import {WalletSdk} from "../abilities/WalletSdk" -import {axiosInstance, CloudAgentConfiguration} from "../configuration/CloudAgentConfiguration" +import { Before, BeforeAll } from "@cucumber/cucumber" +import { Actor, actorCalled, Cast, engage, TakeNotes } from "@serenity-js/core" +import { CallAnApi } from "@serenity-js/rest" +import { Utils } from "../Utils" +import { WalletSdk } from "../abilities/WalletSdk" +import { axiosInstance, CloudAgentConfiguration } from "../configuration/CloudAgentConfiguration" BeforeAll(async () => { Utils.prepareNotes() diff --git a/src/prism-agent/connectionsManager/ConnectionsManager.ts b/src/prism-agent/connectionsManager/ConnectionsManager.ts index fafb6d91b..26cba2a07 100644 --- a/src/prism-agent/connectionsManager/ConnectionsManager.ts +++ b/src/prism-agent/connectionsManager/ConnectionsManager.ts @@ -131,7 +131,7 @@ export class ConnectionsManager implements ConnectionsManagerClass { async processMessages(unreadMessages: { attachmentId: string; message: Message; - }[]): Promise { + }[] = []): Promise { if (!this.mediationHandler.mediator) { throw new AgentError.NoMediatorAvailableError(); } @@ -268,45 +268,25 @@ export class ConnectionsManager implements ConnectionsManagerClass { const currentMediator = this.mediationHandler.mediator.mediatorDID; const resolvedMediator = await this.castor.resolveDID(currentMediator.toString()); const hasWebsocket = resolvedMediator.services.find(({ serviceEndpoint: { uri } }) => - ( - uri.startsWith("ws://") || - uri.startsWith("wss://") - ) && this.withWebsocketsExperiment + ( + uri.startsWith("ws://") || + uri.startsWith("wss://") + ) ); - if (!hasWebsocket) { - const timeInterval = Math.max(iterationPeriod, 5) * 1000; - this.cancellable = new CancellableTask(async () => { - const unreadMessages = await this.mediationHandler.pickupUnreadMessages(10); - await this.processMessages(unreadMessages); - }, timeInterval); - } else { - //Connecting to websockets, do not repeat the task + if (hasWebsocket && this.withWebsocketsExperiment) { this.cancellable = new CancellableTask(async (signal) => { this.mediationHandler.listenUnreadMessages( signal, hasWebsocket.serviceEndpoint.uri, - async (messages) => { - const unreadMessages = messages.reduce<{ - attachmentId: string; - message: Message; - }[]>((unreads, message) => { - const attachment = message.attachments.at(0); - if (!attachment) { - return unreads; - } - return [ - ...unreads, - { - message: message, - attachmentId: attachment.id - } - ]; - }, []); - - await this.processMessages(unreadMessages); - } + (messages) => this.processMessages(messages) ); }); + } else { + const timeInterval = Math.max(iterationPeriod, 5) * 1000; + this.cancellable = new CancellableTask(async () => { + const unreadMessages = await this.mediationHandler.pickupUnreadMessages(10); + await this.processMessages(unreadMessages); + }, timeInterval); } this.cancellable.then().catch((err) => { diff --git a/src/prism-agent/helpers/Task.ts b/src/prism-agent/helpers/Task.ts index f273c136f..834e61703 100644 --- a/src/prism-agent/helpers/Task.ts +++ b/src/prism-agent/helpers/Task.ts @@ -31,14 +31,16 @@ export class CancellableTask { } private loopOnTaskEvery(task: Task, reject: (reason?: Error) => void, signal: AbortSignal) { - task(signal) - .then(() => { - this.clearTimer(); - this.timer = setTimeout(() => { - this.loopOnTaskEvery(task, reject, signal); - }, this.period); - }) - .catch(reject); + if (!this.controller.signal.aborted) { + task(signal) + .then(() => { + this.clearTimer(); + this.timer = setTimeout(() => { + this.loopOnTaskEvery(task, reject, signal); + }, this.period); + }) + .catch(reject); + } } cancel() { diff --git a/src/prism-agent/mediator/BasicMediatorHandler.ts b/src/prism-agent/mediator/BasicMediatorHandler.ts index f6066f4ee..79d7de9c6 100644 --- a/src/prism-agent/mediator/BasicMediatorHandler.ts +++ b/src/prism-agent/mediator/BasicMediatorHandler.ts @@ -196,13 +196,18 @@ export class BasicMediatorHandler implements MediatorHandler { listenUnreadMessages( signal: AbortSignal, serviceEndpointUri: string, - onMessage: EventCallback + onMessage: (messages: { + attachmentId: string; + message: Message; + }[]) => void | Promise ) { //Todo: we may want to abstract this to allow users to use their own native implementations for websockets //Or potentially be TCP sockets directly, this can be used in electron and nodejs can establish tcp connections directly. const socket = new WebSocket(serviceEndpointUri); signal.addEventListener("abort", () => { - socket.close() + if (socket.readyState === socket.OPEN) { + socket.close() + } }); socket.addEventListener("open", async () => { @@ -229,8 +234,7 @@ export class BasicMediatorHandler implements MediatorHandler { decryptMessage.piuri === ProtocolType.PickupStatus || decryptMessage.piuri === ProtocolType.PickupDelivery) { const delivered = await new PickupRunner(decryptMessage, this.mercury).run() - const deliveredMessages = delivered.map(({ message }) => message); - onMessage(deliveredMessages) + await onMessage(delivered) } }) diff --git a/src/prism-agent/types/index.ts b/src/prism-agent/types/index.ts index e65dd1c45..ed2bc4df1 100644 --- a/src/prism-agent/types/index.ts +++ b/src/prism-agent/types/index.ts @@ -166,6 +166,9 @@ export abstract class MediatorHandler { abstract listenUnreadMessages( signal: AbortSignal, serviceEndpointUri: string, - onMessage: EventCallback + onMessage: (messages: { + attachmentId: string; + message: Message; + }[]) => void | Promise ): void } diff --git a/tests/agent/Agent.ConnectionsManager.test.ts b/tests/agent/Agent.ConnectionsManager.test.ts index e766c82d9..2165efdd9 100644 --- a/tests/agent/Agent.ConnectionsManager.test.ts +++ b/tests/agent/Agent.ConnectionsManager.test.ts @@ -33,7 +33,6 @@ async function createBasicMediationHandler( handler: BasicMediatorHandler } > { - const seed = apollo.createRandomSeed().seed; const keypair = apollo.createPrivateKey({ type: KeyTypes.EC, @@ -66,24 +65,21 @@ async function createBasicMediationHandler( } } +jest.mock('isows', () => ({ + WebSocket: jest.fn(() => ({ + addEventListener: jest.fn(), + send: jest.fn(), + close: jest.fn(), + })), +})); describe("ConnectionsManager tests", () => { - beforeEach(() => { - jest.mock('isows', () => ({ - WebSocket: jest.fn(() => ({ - addEventListener: jest.fn(), - send: jest.fn(), - close: jest.fn(), - })), - })); - }) - afterEach(() => { jest.restoreAllMocks(); }); - it("Should use websockets if the mediator's did endpoint uri contains ws or wss and agent options have the opt in", async () => { + it.only("Should use websockets if the mediator's did endpoint uri contains ws or wss and agent options have the opt in", async () => { const services = [ new Service( "#didcomm-1", @@ -92,7 +88,15 @@ describe("ConnectionsManager tests", () => { ) ]; const ConnectionsManager = jest.requireActual('../../src/prism-agent/connectionsManager/ConnectionsManager').ConnectionsManager; - const BasicMediatorHandler = jest.requireMock('../../src/prism-agent/mediator/BasicMediatorHandler').BasicMediatorHandler; + const BasicMediatorHandler = jest.requireActual('../../src/prism-agent/mediator/BasicMediatorHandler').BasicMediatorHandler; + jest.mock('isows', () => ({ + WebSocket: jest.fn(() => ({ + addEventListener: jest.fn(), + send: jest.fn(), + close: jest.fn(), + })), + })); + const { manager, handler } = await createBasicMediationHandler( ConnectionsManager, BasicMediatorHandler, @@ -121,6 +125,13 @@ describe("ConnectionsManager tests", () => { ]; const ConnectionsManager = jest.requireActual('../../src/prism-agent/connectionsManager/ConnectionsManager').ConnectionsManager; const BasicMediatorHandler = jest.requireMock('../../src/prism-agent/mediator/BasicMediatorHandler').BasicMediatorHandler; + jest.mock('isows', () => ({ + WebSocket: jest.fn(() => ({ + addEventListener: jest.fn(), + send: jest.fn(), + close: jest.fn(), + })), + })); const { manager, handler } = await createBasicMediationHandler( ConnectionsManager, BasicMediatorHandler, @@ -145,6 +156,13 @@ describe("ConnectionsManager tests", () => { ]; const ConnectionsManager = jest.requireActual('../../src/prism-agent/connectionsManager/ConnectionsManager').ConnectionsManager; const BasicMediatorHandler = jest.requireMock('../../src/prism-agent/mediator/BasicMediatorHandler').BasicMediatorHandler; + jest.mock('isows', () => ({ + WebSocket: jest.fn(() => ({ + addEventListener: jest.fn(), + send: jest.fn(), + close: jest.fn(), + })), + })); const { manager, handler } = await createBasicMediationHandler( ConnectionsManager, BasicMediatorHandler, @@ -174,6 +192,13 @@ describe("ConnectionsManager tests", () => { ]; const ConnectionsManager = jest.requireActual('../../src/prism-agent/connectionsManager/ConnectionsManager').ConnectionsManager; const BasicMediatorHandler = jest.requireMock('../../src/prism-agent/mediator/BasicMediatorHandler').BasicMediatorHandler; + jest.mock('isows', () => ({ + WebSocket: jest.fn(() => ({ + addEventListener: jest.fn(), + send: jest.fn(), + close: jest.fn(), + })), + })); const { manager, handler } = await createBasicMediationHandler( ConnectionsManager, BasicMediatorHandler, @@ -203,6 +228,13 @@ describe("ConnectionsManager tests", () => { ]; const ConnectionsManager = jest.requireActual('../../src/prism-agent/connectionsManager/ConnectionsManager').ConnectionsManager; const BasicMediatorHandler = jest.requireMock('../../src/prism-agent/mediator/BasicMediatorHandler').BasicMediatorHandler; + jest.mock('isows', () => ({ + WebSocket: jest.fn(() => ({ + addEventListener: jest.fn(), + send: jest.fn(), + close: jest.fn(), + })), + })); const { manager, handler } = await createBasicMediationHandler( ConnectionsManager, BasicMediatorHandler, diff --git a/tests/agent/mocks/ConnectionManagerMock.ts b/tests/agent/mocks/ConnectionManagerMock.ts index 5bc3b5306..ccef8a27c 100644 --- a/tests/agent/mocks/ConnectionManagerMock.ts +++ b/tests/agent/mocks/ConnectionManagerMock.ts @@ -91,8 +91,8 @@ export class ConnectionsManagerMock implements ConnectionsManagerClass { pickupUnreadMessages: function (limit: number): Promise<{ attachmentId: string; message: Message; }[]> { throw new Error("Mock pickupUnreadMessages Function not implemented."); }, - listenUnreadMessages: function (signal: AbortSignal, serviceEndpointUri: string, onMessage: EventCallback): void { - throw new Error("Mock listenUnreadMessages Function not implemented."); + listenUnreadMessages: function (signal: AbortSignal, serviceEndpointUri: string, onMessage: (messages: { attachmentId: string; message: Message; }[]) => void | Promise): void { + throw new Error("Function not implemented."); } };