Skip to content

Commit

Permalink
feat(routing): add settings to control back off strategy on mediator …
Browse files Browse the repository at this point in the history
…reconnection (#1017)

Signed-off-by: Sergi Garreta <[email protected]>
  • Loading branch information
garretaserra authored Sep 8, 2022
1 parent 856f40d commit 543437c
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 8 deletions.
8 changes: 8 additions & 0 deletions packages/core/src/agent/AgentConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ export class AgentConfig {
return this.initConfig.maximumMessagePickup ?? 10
}

public get baseMediatorReconnectionIntervalMs() {
return this.initConfig.baseMediatorReconnectionIntervalMs ?? 100
}

public get maximumMediatorReconnectionIntervalMs() {
return this.initConfig.maximumMediatorReconnectionIntervalMs ?? Number.POSITIVE_INFINITY
}

public get endpoints(): [string, ...string[]] {
// if endpoints is not set, return queue endpoint
// https://github.com/hyperledger/aries-rfcs/issues/405#issuecomment-582612875
Expand Down
29 changes: 22 additions & 7 deletions packages/core/src/modules/routing/RecipientModule.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Logger } from '../../logger'
import type { DependencyManager } from '../../plugins'
import type { OutboundWebSocketClosedEvent } from '../../transport'
import type { OutboundWebSocketClosedEvent, OutboundWebSocketOpenedEvent } from '../../transport'
import type { OutboundMessage } from '../../types'
import type { ConnectionRecord } from '../connections'
import type { MediationStateChangedEvent } from './RoutingEvents'
Expand Down Expand Up @@ -141,14 +141,27 @@ export class RecipientModule {
}

private async openWebSocketAndPickUp(mediator: MediationRecord, pickupStrategy: MediatorPickupStrategy) {
let interval = 50
const { baseMediatorReconnectionIntervalMs, maximumMediatorReconnectionIntervalMs } = this.agentConfig
let interval = baseMediatorReconnectionIntervalMs

const stopConditions$ = merge(this.agentConfig.stop$, this.stopMessagePickup$).pipe()

// Reset back off interval when the websocket is successfully opened again
this.eventEmitter
.observable<OutboundWebSocketOpenedEvent>(TransportEventTypes.OutboundWebSocketOpenedEvent)
.pipe(
// Stop when the agent shuts down or stop message pickup signal is received
takeUntil(stopConditions$),
filter((e) => e.payload.connectionId === mediator.connectionId)
)
.subscribe(() => {
interval = baseMediatorReconnectionIntervalMs
})

// Listens to Outbound websocket closed events and will reopen the websocket connection
// in a recursive back off strategy if it matches the following criteria:
// - Agent is not shutdown
// - Socket was for current mediator connection id

const stopConditions$ = merge(this.agentConfig.stop$, this.stopMessagePickup$).pipe()
this.eventEmitter
.observable<OutboundWebSocketClosedEvent>(TransportEventTypes.OutboundWebSocketClosedEvent)
.pipe(
Expand All @@ -157,10 +170,12 @@ export class RecipientModule {
filter((e) => e.payload.connectionId === mediator.connectionId),
// Make sure we're not reconnecting multiple times
throttleTime(interval),
// Increase the interval (recursive back-off)
tap(() => (interval *= 2)),
// Wait for interval time before reconnecting
delayWhen(() => timer(interval))
delayWhen(() => timer(interval)),
// Increase the interval (recursive back-off)
tap(() => {
interval = Math.min(interval * 2, maximumMediatorReconnectionIntervalMs)
})
)
.subscribe({
next: async () => {
Expand Down
9 changes: 9 additions & 0 deletions packages/core/src/transport/TransportEventTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { BaseEvent } from '../agent/Events'

export enum TransportEventTypes {
OutboundWebSocketClosedEvent = 'OutboundWebSocketClosedEvent',
OutboundWebSocketOpenedEvent = 'OutboundWebSocketOpenedEvent',
}

export interface OutboundWebSocketClosedEvent extends BaseEvent {
Expand All @@ -11,3 +12,11 @@ export interface OutboundWebSocketClosedEvent extends BaseEvent {
connectionId?: string
}
}

export interface OutboundWebSocketOpenedEvent extends BaseEvent {
type: TransportEventTypes.OutboundWebSocketOpenedEvent
payload: {
socketId: string
connectionId?: string
}
}
10 changes: 9 additions & 1 deletion packages/core/src/transport/WsOutboundTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { AgentMessageReceivedEvent } from '../agent/Events'
import type { Logger } from '../logger'
import type { OutboundPackage } from '../types'
import type { OutboundTransport } from './OutboundTransport'
import type { OutboundWebSocketClosedEvent } from './TransportEventTypes'
import type { OutboundWebSocketClosedEvent, OutboundWebSocketOpenedEvent } from './TransportEventTypes'
import type WebSocket from 'ws'

import { AgentConfig } from '../agent/AgentConfig'
Expand Down Expand Up @@ -139,6 +139,14 @@ export class WsOutboundTransport implements OutboundTransport {
socket.onopen = () => {
this.logger.debug(`Successfully connected to WebSocket ${endpoint}`)
resolve(socket)

this.eventEmitter.emit<OutboundWebSocketOpenedEvent>({
type: TransportEventTypes.OutboundWebSocketOpenedEvent,
payload: {
socketId,
connectionId: connectionId,
},
})
}

socket.onerror = (error) => {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ export interface InitConfig {
mediatorPollingInterval?: number
mediatorPickupStrategy?: MediatorPickupStrategy
maximumMessagePickup?: number
baseMediatorReconnectionIntervalMs?: number
maximumMediatorReconnectionIntervalMs?: number

useLegacyDidSovPrefix?: boolean
connectionImageUrl?: string
Expand Down

0 comments on commit 543437c

Please sign in to comment.