Skip to content

Commit

Permalink
refactor: inbound transport to use AgentMessageReceivedEvent
Browse files Browse the repository at this point in the history
Signed-off-by: Sai Ranjit Tummalapalli <[email protected]>
  • Loading branch information
sairanjit committed Sep 13, 2024
1 parent e24b92f commit cc6d1ab
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 11 deletions.
1 change: 1 addition & 0 deletions packages/core/src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
.receiveMessage(e.payload.message, {
connection: e.payload.connection,
contextCorrelationId: e.payload.contextCorrelationId,
session: e.payload.session,
})
.catch((error) => {
this.logger.error('Failed to process message', { error })
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/agent/Events.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { AgentMessage } from './AgentMessage'
import type { TransportSession } from './TransportService'
import type { OutboundMessageContext, OutboundMessageSendStatus } from './models'
import type { ConnectionRecord } from '../modules/connections'
import type { Observable } from 'rxjs'
Expand Down Expand Up @@ -34,6 +35,7 @@ export interface AgentMessageReceivedEvent extends BaseEvent {
connection?: ConnectionRecord
contextCorrelationId?: string
receivedAt?: Date
session?: TransportSession
}
}

Expand Down
20 changes: 15 additions & 5 deletions packages/node/src/transport/HttpInboundTransport.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import type { InboundTransport, Agent, TransportSession, EncryptedMessage, AgentContext } from '@credo-ts/core'
import type {
InboundTransport,
Agent,
TransportSession,
EncryptedMessage,
AgentContext,
AgentMessageReceivedEvent,
} from '@credo-ts/core'
import type { Express, Request, Response } from 'express'
import type { Server } from 'http'

import { DidCommMimeType, CredoError, TransportService, utils, MessageReceiver } from '@credo-ts/core'
import { DidCommMimeType, CredoError, TransportService, utils, AgentEventTypes } from '@credo-ts/core'
import express, { text } from 'express'

const supportedContentTypes: string[] = [DidCommMimeType.V0, DidCommMimeType.V1]
Expand All @@ -29,7 +36,6 @@ export class HttpInboundTransport implements InboundTransport {

public async start(agent: Agent) {
const transportService = agent.dependencyManager.resolve(TransportService)
const messageReceiver = agent.dependencyManager.resolve(MessageReceiver)

agent.config.logger.debug(`Starting HTTP inbound transport`, {
port: this.port,
Expand All @@ -52,8 +58,12 @@ export class HttpInboundTransport implements InboundTransport {
try {
const message = req.body
const encryptedMessage = JSON.parse(message)
await messageReceiver.receiveMessage(encryptedMessage, {
session,
agent.events.emit<AgentMessageReceivedEvent>(agent.context, {
type: AgentEventTypes.AgentMessageReceived,
payload: {
message: encryptedMessage,
session: session,
},
})

// If agent did not use session when processing message we need to send response here.
Expand Down
24 changes: 18 additions & 6 deletions packages/node/src/transport/WsInboundTransport.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import type { Agent, InboundTransport, Logger, TransportSession, EncryptedMessage, AgentContext } from '@credo-ts/core'

import { CredoError, TransportService, utils, MessageReceiver } from '@credo-ts/core'
import type {
Agent,
InboundTransport,
Logger,
TransportSession,
EncryptedMessage,
AgentContext,
AgentMessageReceivedEvent,
} from '@credo-ts/core'

import { CredoError, TransportService, utils, AgentEventTypes } from '@credo-ts/core'
// eslint-disable-next-line import/no-named-as-default
import WebSocket, { Server } from 'ws'

Expand Down Expand Up @@ -58,13 +66,17 @@ export class WsInboundTransport implements InboundTransport {
}

private listenOnWebSocketMessages(agent: Agent, socket: WebSocket, session: TransportSession) {
const messageReceiver = agent.dependencyManager.resolve(MessageReceiver)

// eslint-disable-next-line @typescript-eslint/no-explicit-any
socket.addEventListener('message', async (event: any) => {
this.logger.debug('WebSocket message event received.', { url: event.target.url })
try {
await messageReceiver.receiveMessage(JSON.parse(event.data), { session })
agent.events.emit<AgentMessageReceivedEvent>(agent.context, {
type: AgentEventTypes.AgentMessageReceived,
payload: {
message: JSON.parse(event.data),
session: session,
},
})
} catch (error) {
this.logger.error(`Error processing message: ${error}`)
}
Expand Down

0 comments on commit cc6d1ab

Please sign in to comment.