-
Notifications
You must be signed in to change notification settings - Fork 200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat!: add AgentMessageSentEvent and associate records to outbound messages #1099
feat!: add AgentMessageSentEvent and associate records to outbound messages #1099
Conversation
Signed-off-by: Ariel Gentile <[email protected]>
Signed-off-by: Ariel Gentile <[email protected]>
Codecov Report
@@ Coverage Diff @@
## main #1099 +/- ##
==========================================
- Coverage 88.28% 88.25% -0.04%
==========================================
Files 705 706 +1
Lines 16399 16450 +51
Branches 2657 2666 +9
==========================================
+ Hits 14478 14518 +40
- Misses 1914 1925 +11
Partials 7 7
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the overall changes in this PR. It should be marked as breaking as custom modules also use the createOutboundMessage method (but also see my comment on making a class).
I'm thikning whether we could broaden the scope of the AgentMessageSent event to also be emitted when sending failed, but include a status of the send status. E.g. in ACA-Py when you call send it returns an enum with the following states:
class OutboundSendStatus(Enum):
"""Send status of outbound messages."""
# Could directly send the message to the connection over active session
SENT_TO_SESSION = "sent_to_session"
# Message is sent to external queue. We don't know how it will process the queue
SENT_TO_EXTERNAL_QUEUE = "sent_to_external_queue"
# Message is queued for delivery using outbound transport (recipient has endpoint)
QUEUED_FOR_DELIVERY = "queued_for_delivery"
# No endpoint available.
# Need to wait for the recipient to connect with return routing for delivery
WAITING_FOR_PICKUP = "waiting_for_pickup"
# No endpoint available, and no internal queue for messages.
UNDELIVERABLE = "undeliverable"
@property
def topic(self):
"""Return an event topic associated with a given status."""
return f"{OUTBOUND_STATUS_PREFIX}{self.value}"
Needs to be adapted for our states, but I think it could help, also if we add other types of sending (e.g. to an external queue) in the future
packages/core/src/agent/helpers.ts
Outdated
export function createOutboundMessage<T extends AgentMessage = AgentMessage>( | ||
connection: ConnectionRecord, | ||
payload: T, | ||
export function createOutboundMessage<T extends AgentMessage = AgentMessage>(options: { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's been on my todo list for a really long time to make the outbound message a class instead of an object so we can add getters / helper methods etc.. to it. It's feel more in line with the rest of the framework, and we also use a class for the InboundMessageContext. What if we make this an OutboundMessageContext
class instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think this OutboundMessage is so widely used that it deserves its own class. I'm not completely sure how it would look like if using it as a Context. Would you say that:
const outboundMessage = createOutboundMessage({ connection, payload: message, associatedRecord: record })
await this.messageSender.sendMessage(this.agentContext, outboundMessage)
Would become something like the following?:
const outboundMessageContext = new OutboundMessageContext({ agentContext, connection, payload: message, associatedRecord: record })
await this.messageSender.sendMessage(outboundMessageContext)
Seems good to me.
Thanks for the comment @TimoGlastra !
I think this is a good idea. Actually, I have put the message as a dedicated attribute of the event payload considering that in the near future we would add other attributes such as the status that you are describing. I was expecting to handle this later, but making this status attribute mandatory from the start will avoid confusions about the meaning of this event, because otherwise one would think that every time |
Signed-off-by: Ariel Gentile <[email protected]>
Signed-off-by: Ariel Gentile <[email protected]>
Signed-off-by: Ariel Gentile <[email protected]>
…ssages (openwallet-foundation#1099) Signed-off-by: Ariel Gentile <[email protected]>
Following the strategy of #1045, this PR adds associatedRecord to each outbound message generated by most relevant modules such as proofs, credentials, action-menu and question-answer. This allows the calling application to immediately determine and take actions when a message could not be sent (for instance, can retry it afterwards or delete the record).
Also, as a complement to this a new kind of AgentEvent is added: AgentMessageSent, emitted by MessageSender when it handles an outbound message, being it passed to a session, a transporter or queued for pickup, or even failed. This can be used to keep track of messages in a central event handler, and inform the UI/application about their status (also can be used to retry messages). In a near future we can use this event to bind an outbound message to the encrypted package that is actually being sent or stored in the session/transport/message repository (e.g. the assigned queue ID), so it can be completely tracked.
You will note that there are plenty of files changed in this PR. That's because utility functions
createOutboundMessage
andcreateOutboundServiceMessage
were refactored to the OutboundMessageContext class. As these functions were exposed and used widely, this represents a breaking change.