-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: ConnectionManager emit Messages (#190)
Signed-off-by: Francisco Javier Ribó Labrador <[email protected]> Signed-off-by: Francisco Javier Ribo Labrador <[email protected]>
- Loading branch information
1 parent
c26f014
commit 776e55a
Showing
2 changed files
with
83 additions
and
133 deletions.
There are no files selected for viewing
163 changes: 62 additions & 101 deletions
163
integration-tests/e2e-tests/src/abilities/WalletSdk.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,196 +1,157 @@ | ||
import { Ability, Discardable, Initialisable, Interaction, Question, QuestionAdapter } from "@serenity-js/core" | ||
import SDK from "@atala/prism-wallet-sdk" | ||
import { Message } from "@atala/prism-wallet-sdk/build/typings/domain" | ||
import axios from "axios" | ||
import { CloudAgentConfiguration } from "../configuration/CloudAgentConfiguration" | ||
import { Utils } from "../Utils" | ||
import { InMemoryStore } from "../configuration/InMemoryStore" | ||
const { | ||
Agent, | ||
ApiImpl, | ||
Apollo, | ||
BasicMediatorHandler, | ||
Castor, | ||
ConnectionsManager, | ||
DIDCommWrapper, | ||
Domain, ListenerKey, | ||
Mercury, | ||
PublicMediatorStore | ||
} = SDK; | ||
import { Ability, Discardable, Initialisable, Interaction, Question, QuestionAdapter } from "@serenity-js/core"; | ||
import SDK from "@atala/prism-wallet-sdk"; | ||
import { Message } from "@atala/prism-wallet-sdk/build/typings/domain"; | ||
import axios from "axios"; | ||
import { CloudAgentConfiguration } from "../configuration/CloudAgentConfiguration"; | ||
import { Utils } from "../Utils"; | ||
import { InMemoryStore } from "../configuration/InMemoryStore"; | ||
|
||
const { Agent, Apollo, Domain, ListenerKey, } = SDK; | ||
|
||
export class WalletSdk extends Ability implements Initialisable, Discardable { | ||
sdk!: SDK.Agent | ||
messages: MessageQueue = new MessageQueue() | ||
sdk!: SDK.Agent; | ||
messages: MessageQueue = new MessageQueue(); | ||
|
||
static async withANewInstance(): Promise<Ability> { | ||
const instance: SDK.Agent = await Utils.retry(2, async () => { | ||
return await WalletSdkBuilder.createInstance() | ||
}) | ||
return new WalletSdk(instance) | ||
return await WalletSdkBuilder.createInstance(); | ||
}); | ||
return new WalletSdk(instance); | ||
} | ||
|
||
constructor(sdk: SDK.Agent) { | ||
super() | ||
this.sdk = sdk | ||
super(); | ||
this.sdk = sdk; | ||
} | ||
|
||
static credentialOfferStackSize(): QuestionAdapter<number> { | ||
return Question.about("credential offer stack", actor => { | ||
return WalletSdk.as(actor).messages.credentialOfferStack.length | ||
}) | ||
return WalletSdk.as(actor).messages.credentialOfferStack.length; | ||
}); | ||
} | ||
|
||
static issuedCredentialStackSize(): QuestionAdapter<number> { | ||
return Question.about("issued credential stack", actor => { | ||
return WalletSdk.as(actor).messages.issuedCredentialStack.length | ||
}) | ||
return WalletSdk.as(actor).messages.issuedCredentialStack.length; | ||
}); | ||
} | ||
|
||
static proofOfRequestStackSize(): QuestionAdapter<number> { | ||
return Question.about("proof of request stack", actor => { | ||
return WalletSdk.as(actor).messages.proofRequestStack.length | ||
}) | ||
return WalletSdk.as(actor).messages.proofRequestStack.length; | ||
}); | ||
} | ||
|
||
static execute(callback: (sdk: SDK.Agent, messages: { | ||
credentialOfferStack: Message[], | ||
issuedCredentialStack: Message[], | ||
proofRequestStack: Message[] | ||
credentialOfferStack: Message[]; | ||
issuedCredentialStack: Message[]; | ||
proofRequestStack: Message[]; | ||
}) => Promise<void>): Interaction { | ||
return Interaction.where("#actor uses wallet sdk", async actor => { | ||
await callback(WalletSdk.as(actor).sdk, { | ||
credentialOfferStack: WalletSdk.as(actor).messages.credentialOfferStack, | ||
issuedCredentialStack: WalletSdk.as(actor).messages.issuedCredentialStack, | ||
proofRequestStack: WalletSdk.as(actor).messages.proofRequestStack | ||
}) | ||
}) | ||
}); | ||
}); | ||
} | ||
|
||
async discard(): Promise<void> { | ||
await this.sdk.stop() | ||
await this.sdk.stop(); | ||
} | ||
|
||
async initialise(): Promise<void> { | ||
this.sdk.addListener( | ||
ListenerKey.MESSAGE, (messages: SDK.Domain.Message[]) => { | ||
for (const message of messages) { | ||
this.messages.enqueue(message) | ||
this.messages.enqueue(message); | ||
} | ||
} | ||
) | ||
); | ||
|
||
await this.sdk.start() | ||
await this.sdk.start(); | ||
} | ||
|
||
isInitialised(): boolean { | ||
return this.sdk.state != "stopped" | ||
return this.sdk.state != "stopped"; | ||
} | ||
} | ||
|
||
class WalletSdkBuilder { | ||
private static async getMediatorDidThroughOob(): Promise<string> { | ||
const response = await axios.get(CloudAgentConfiguration.mediatorOobUrl) | ||
const encodedData = response.data.split("?_oob=")[1] | ||
const oobData = JSON.parse(Buffer.from(encodedData, "base64").toString()) | ||
return oobData.from | ||
const response = await axios.get(CloudAgentConfiguration.mediatorOobUrl); | ||
const encodedData = response.data.split("?_oob=")[1]; | ||
const oobData = JSON.parse(Buffer.from(encodedData, "base64").toString()); | ||
return oobData.from; | ||
} | ||
|
||
static async createInstance() { | ||
const apollo = new Apollo() | ||
const castor = new Castor(apollo); | ||
const store = new InMemoryStore() | ||
const apollo = new Apollo(); | ||
const store = new InMemoryStore(); | ||
const pluto = new SDK.Pluto(store, apollo); | ||
await pluto.start() | ||
|
||
const api = new ApiImpl() | ||
const didcomm = new DIDCommWrapper(apollo, castor, pluto) | ||
const mercury = new Mercury(castor, didcomm, api) | ||
|
||
const mediatorDID = Domain.DID.fromString(await WalletSdkBuilder.getMediatorDidThroughOob()) | ||
const mediatorStore = new PublicMediatorStore(pluto) | ||
|
||
const mediatorHandler = new BasicMediatorHandler( | ||
mediatorDID, | ||
mercury, | ||
mediatorStore, | ||
) | ||
|
||
const connectionsManager = new ConnectionsManager( | ||
castor, | ||
mercury, | ||
pluto, | ||
mediatorHandler, | ||
) | ||
|
||
const seed = apollo.createRandomSeed().seed | ||
return new Agent( | ||
apollo, | ||
castor, | ||
pluto, | ||
mercury, | ||
mediatorHandler, | ||
connectionsManager, | ||
seed, | ||
) | ||
const mediatorDID = Domain.DID.fromString(await WalletSdkBuilder.getMediatorDidThroughOob()); | ||
|
||
return Agent.initialize({ apollo, pluto, mediatorDID }); | ||
} | ||
} | ||
|
||
/** | ||
* Helper class for message queueing processor | ||
*/ | ||
class MessageQueue { | ||
private processingId: NodeJS.Timeout | null = null | ||
private queue: Message[] = [] | ||
private processingId: NodeJS.Timeout | null = null; | ||
private queue: Message[] = []; | ||
|
||
credentialOfferStack: Message[] = [] | ||
proofRequestStack: Message[] = [] | ||
issuedCredentialStack: Message[] = [] | ||
receivedMessages: string[] = [] | ||
credentialOfferStack: Message[] = []; | ||
proofRequestStack: Message[] = []; | ||
issuedCredentialStack: Message[] = []; | ||
receivedMessages: string[] = []; | ||
|
||
enqueue(message: Message) { | ||
this.queue.push(message) | ||
this.queue.push(message); | ||
|
||
// auto start processing messages | ||
if (!this.processingId) { | ||
this.processMessages() | ||
this.processMessages(); | ||
} | ||
} | ||
|
||
dequeue(): Message { | ||
return this.queue.shift()! | ||
return this.queue.shift()!; | ||
} | ||
|
||
// Check if the queue is empty | ||
isEmpty(): boolean { | ||
return this.queue.length === 0 | ||
return this.queue.length === 0; | ||
} | ||
|
||
// Get the number of messages in the queue | ||
size(): number { | ||
return this.queue.length | ||
return this.queue.length; | ||
} | ||
|
||
processMessages() { | ||
this.processingId = setInterval(() => { | ||
if (!this.isEmpty()) { | ||
const message: Message = this.dequeue() | ||
const message: Message = this.dequeue(); | ||
// checks if sdk already received message | ||
if (this.receivedMessages.includes(message.id)) { | ||
return | ||
return; | ||
} | ||
|
||
this.receivedMessages.push(message.id) | ||
this.receivedMessages.push(message.id); | ||
|
||
if (message.piuri.includes("/offer-credential")) { | ||
this.credentialOfferStack.push(message) | ||
this.credentialOfferStack.push(message); | ||
} else if (message.piuri.includes("/present-proof")) { | ||
this.proofRequestStack.push(message) | ||
this.proofRequestStack.push(message); | ||
} else if (message.piuri.includes("/issue-credential")) { | ||
this.issuedCredentialStack.push(message) | ||
this.issuedCredentialStack.push(message); | ||
} | ||
} else { | ||
clearInterval(this.processingId!) | ||
this.processingId = null | ||
clearInterval(this.processingId!); | ||
this.processingId = null; | ||
} | ||
}, 50) | ||
}, 50); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters