Skip to content

Commit

Permalink
Merge pull request #7 from Indicio-tech/feature/message-wrapper
Browse files Browse the repository at this point in the history
Feature/message wrapper
  • Loading branch information
dbluhm authored Dec 16, 2021
2 parents 55393f5 + e86a060 commit e5c2d24
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 932 deletions.
4 changes: 2 additions & 2 deletions acapy_plugin_pickup/acapy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Tools for writing plugins for ACA-Py"""

from .message import AgentMessage, Thread
from .message import AgentMessage, Thread, Attach, AttachData

__all__ = ["AgentMessage", "Thread"]
__all__ = ["AgentMessage", "Thread", "Attach", "AttachData"]
89 changes: 85 additions & 4 deletions acapy_plugin_pickup/acapy/message.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,105 @@
"""Simple Agent Message class."""

from abc import ABC
from datetime import datetime
import json
import logging
from typing import Any, ClassVar, Dict, Mapping, Optional
from typing import Any, ClassVar, Dict, Mapping, Optional, Union
from typing_extensions import Annotated, Literal
from uuid import uuid4
import uuid

from aries_cloudagent.messaging.base_message import BaseMessage
from aries_cloudagent.messaging.base_handler import BaseHandler
from aries_cloudagent.messaging.base_message import BaseMessage
from aries_cloudagent.messaging.request_context import RequestContext
from aries_cloudagent.messaging.responder import BaseResponder
from aries_cloudagent.wallet.util import bytes_to_b64
from pydantic import BaseModel, Field, parse_obj_as
from pydantic.class_validators import validator
from typing_extensions import Annotated, Literal
from pydantic.class_validators import validator, root_validator
from pydantic.types import StrictInt

from acapy_plugin_pickup.valid import ISODateTime


LOGGER = logging.getLogger(__name__)


class AttachData(BaseModel):
class Config:
allow_population_by_field_name = True

base64: Annotated[Optional[str], Field(description="Base64-encoded data")] = None
json_: Annotated[
Optional[dict], Field(description="JSON-serialized data", alias="json")
] = None
links: Annotated[
Optional[str], Field(description="List of hypertext links to data")
] = None

@root_validator(pre=True)
@classmethod
def _mutual_exclusion_validate(cls, values):
if len(set(values.keys()) & {"base64", "json", "links", "json_"}) != 1:
raise ValueError("AttachData: choose exactly one of base64, json, or links")
return values


class Attach(BaseModel):
ident: Annotated[str, Field(alias="@id")]
mime_type: Annotated[str, Field(alias="mime-type")]
filename: Optional[str] = None
byte_count: Optional[StrictInt] = None
lastmod_time: Optional[datetime]
description: Optional[str]
data: AttachData

class Config:
allow_population_by_field_name = True

@classmethod
def data_base64(
cls,
value: Union[Mapping, bytes, str],
*,
ident: str = None,
description: str = None,
filename: str = None,
lastmod_time: str = None,
byte_count: int = None,
):
"""
Create `AttachDecorator` instance on base64-encoded data from input mapping.
Given mapping, JSON dump, base64-encode, and embed
it as data; mark `application/json` MIME type.
Args:
mapping: (dict) data structure; e.g., indy production
ident: optional attachment identifier (default random UUID4)
description: optional attachment description
filename: optional attachment filename
lastmod_time: optional attachment last modification time
byte_count: optional attachment byte count
"""
if isinstance(value, Mapping):
value = json.dumps(value).encode()
if isinstance(value, str):
value = value.encode()

attach_data = AttachData(base64=bytes_to_b64(value))

return cls(
ident=ident or str(uuid.uuid4()),
description=description,
filename=filename,
mime_type="application/json",
lastmod_time=lastmod_time,
byte_count=byte_count,
data=attach_data,
)


class Thread(BaseModel):
thid: Annotated[
Optional[str],
Expand Down
57 changes: 38 additions & 19 deletions acapy_plugin_pickup/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import logging
import json
from typing import Optional, List, Set, cast
from typing import Optional, List, Sequence, Set, cast
from typing_extensions import Annotated
from pydantic import Field

from aries_cloudagent.messaging.request_context import RequestContext
from aries_cloudagent.messaging.responder import BaseResponder
Expand All @@ -14,7 +16,7 @@
from aries_cloudagent.transport.outbound.message import OutboundMessage
from aries_cloudagent.transport.wire_format import BaseWireFormat

from .acapy import AgentMessage
from .acapy import AgentMessage, Attach
from .acapy.error import HandlerException
from .valid import ISODateTime

Expand Down Expand Up @@ -90,6 +92,7 @@ async def handle(self, context: RequestContext, responder: BaseResponder):
assert manager
queue = manager.undelivered_queue
key = context.message_receipt.sender_verkey
message_attachments = []

if queue.has_message_for_key(key):
session = self.determine_session(manager, key)
Expand All @@ -110,11 +113,12 @@ async def handle(self, context: RequestContext, responder: BaseResponder):
sender_key = msg.target_list[0].sender_key or key

# Depending on send_outbound() implementation, there is a
# race condition with the timestamp When ACA-Py is under
# race condition with the timestamp. When ACA-Py is under
# load, there is a potential for this encryption to not
# match the actual encryption
# TODO: update ACA-Py to store all messages with an
# encrypted payload

msg.enc_payload = await wire_format.encode_message(
profile_session,
msg.payload,
Expand All @@ -123,23 +127,37 @@ async def handle(self, context: RequestContext, responder: BaseResponder):
sender_key,
)

if session.accept_response(msg):
returned_count += 1
else:
LOGGER.warning(
"Failed to return message to session when we were "
"expecting it would work"
)
attached_msg = Attach.data_base64(
ident=json.loads(msg.enc_payload)["tag"], value=msg.enc_payload
)
message_attachments.append(attached_msg)
returned_count += 1

if returned_count >= self.limit:
break
return

count = manager.undelivered_queue.message_count_for_key(
context.message_receipt.sender_verkey
)
response = Status(message_count=count)
response.assign_thread_from(self)
await responder.send_reply(response)
response = Delivery(message_attachments=message_attachments)
response.assign_thread_from(self)
await responder.send_reply(response)

else:
response = Status(recipient_key=self.recipient_key, message_count=0)
response.assign_thread_from(self)
await responder.send_reply(response)


class Delivery(AgentMessage):
"""Message wrapper for delivering messages to a recipient."""

class Config:
allow_population_by_field_name = True

message_type = f"{PROTOCOL}/delivery"

recipient_key: Optional[str] = None
message_attachments: Annotated[
Sequence[Attach], Field(description="Attached messages", alias="~attach")
]


# This is the start of a message updating the Live Delivery status
Expand All @@ -148,6 +166,7 @@ class LiveDeliveryChange(AgentMessage):
"""Live Delivery Change message."""

message_type = f"{PROTOCOL}/live-delivery-change"

live_delivery: bool = False

async def handle(self, context: RequestContext, responder: BaseResponder):
Expand All @@ -159,7 +178,7 @@ class MessagesReceived(AgentMessage):
"""MessageReceived acknowledgement message."""

message_type = f"{PROTOCOL}/messages-received"
message_tag_list: Set[str]
message_id_list: Set[str]

@staticmethod
def determine_session(manager: InboundTransportManager, key: str):
Expand All @@ -184,7 +203,7 @@ async def handle(self, context: RequestContext, responder: BaseResponder):
key = context.message_receipt.sender_verkey

if queue.has_message_for_key(key):
remove_message_by_tag_list(queue, key, self.message_tag_list)
remove_message_by_tag_list(queue, key, self.message_id_list)

response = Status(message_count=queue.message_count_for_key(key))
response.assign_thread_from(self)
Expand Down
Loading

0 comments on commit e5c2d24

Please sign in to comment.