From 29a587d40a012b61ec91dfd72be920cf69a05b56 Mon Sep 17 00:00:00 2001 From: Fabio Pinheiro Date: Thu, 30 Nov 2023 09:47:00 +0000 Subject: [PATCH] Pickup Protocol - Support for Live Mode #145 (#184) Pickup Protocol - Support for Live Mode #145 Update scala-did to 0.1.0-M16 Signed-off-by: Fabio --- Mediator-Error_Handling.md | 32 ++--- build.sbt | 2 +- .../mediator/AgentExecutorMediator.scala | 23 +++- .../iohk/atala/mediator/DIDCommRoutes.scala | 2 + .../atala/mediator/MediatorStandalone.scala | 6 +- .../mediator/MediatorTransportManager.scala | 127 ++++++++++++++++++ .../io/iohk/atala/mediator/OperatorImp.scala | 3 +- .../protocols/ForwardMessageExecuter.scala | 25 +++- .../mediator/protocols/PickupExecuter.scala | 79 +++++++++-- .../MediatorTransportManagerUtil.scala | 14 ++ .../iohk/atala/mediator/TransportUtil.scala | 61 +++++++++ .../ForwardMessageExecutorSpec.scala | 7 +- .../mediator/protocols/MessageSetup.scala | 22 +++ .../protocols/PickupExecuterSpec.scala | 69 ++++++++-- 14 files changed, 414 insertions(+), 58 deletions(-) create mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/MediatorTransportManager.scala create mode 100644 mediator/src/test/scala/io/iohk/atala/mediator/MediatorTransportManagerUtil.scala create mode 100644 mediator/src/test/scala/io/iohk/atala/mediator/TransportUtil.scala diff --git a/Mediator-Error_Handling.md b/Mediator-Error_Handling.md index 8be52319..035c150e 100644 --- a/Mediator-Error_Handling.md +++ b/Mediator-Error_Handling.md @@ -12,22 +12,22 @@ https://identity.foundation/didcomm-messaging/spec/#problem-reports This table defines the expected behavior of the mediator in different scenarios not covered by the specifications. -| Mediators | Atala Mediator | Roadmap Atala Mediator | │ | RootsId | Blocktrust | -|-------------|----------------|------------------------|---|---------|------------| -| Scenario G1 | G1C | - | │ | ? | ? | -| Scenario G2 | G2B [ATL-5840] | - | │ | | ? | -| Scenario G3 | Fallback G2B | [TODO] G3B | │ | | ? | -| Scenario G4 | G4B | - | │ | | ? | -| Scenario G5 | Fallback G4B | [TODO] G5B | │ | | ? | -| Scenario G6 | Fallback G4B | [WIP] G6B | │ | | ? | -| Scenario G7 | Fallback G4B | [TODO] G7B | │ | | ? | -| Scenario G8 | G8C | - | │ | | ? | -| | | | | | | -| Scenario M1 | M1B | - | │ | | ? | -| Scenario M2 | M2B | - | │ | | ? | -| Scenario M3 | Fallback G4 | M3B | │ | | ? | -| Scenario M4 | M4B | - | │ | | ? | -| Scenario M5 | M5A | [TODO] M5B | │ | | ? | +| Mediators | Atala Mediator | Roadmap Atala Mediator | +|-------------|----------------|------------------------| +| Scenario G1 | G1C | - | +| Scenario G2 | G2B [ATL-5840] | - | +| Scenario G3 | Fallback G2B | [TODO] G3B | +| Scenario G4 | G4B | - | +| Scenario G5 | Fallback G4B | [TODO] G5B | +| Scenario G6 | Fallback G4B | G6B | +| Scenario G7 | Fallback G4B | [TODO] G7B | +| Scenario G8 | G8C | - | +| | | | +| Scenario M1 | M1B | - | +| Scenario M2 | M2B | - | +| Scenario M3 | Fallback G4 | M3B | +| Scenario M4 | M4B | - | +| Scenario M5 | M5A | M5B [#145] | ### Scenarios Description diff --git a/build.sbt b/build.sbt index 2ec320c9..da4264ec 100644 --- a/build.sbt +++ b/build.sbt @@ -9,7 +9,7 @@ inThisBuild( /** Versions */ lazy val V = new { - val scalaDID = "0.1.0-M15" + val scalaDID = "0.1.0-M16" // FIXME another bug in the test framework https://github.com/scalameta/munit/issues/554 val munit = "1.0.0-M10" // "0.7.29" diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala b/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala index c842b1c0..93be459a 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala @@ -16,7 +16,7 @@ import fmgp.did.comm.protocol.reportproblem2.ProblemReport case class AgentExecutorMediator( agent: Agent, - transportManager: Ref[TransportManager], + transportManager: Ref[MediatorTransportManager], protocolHandler: ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError], userAccountRepo: UserAccountRepo, messageItemRepo: MessageItemRepo, @@ -54,14 +54,18 @@ case class AgentExecutorMediator( .tapError(ex => ZIO.logError(ex.toString)) .provideSomeLayer(this.indentityLayer) .provideSomeLayer(userAccountRepoLayer ++ messageItemRepoLayer) - .provideSomeEnvironment((e: ZEnvironment[Resolver & Operations]) => e ++ ZEnvironment(protocolHandler)) + .provideSomeEnvironment((e: ZEnvironment[Resolver & Operations]) => + e ++ ZEnvironment(protocolHandler) ++ ZEnvironment(transportManager) + ) .orDieWith(ex => new RuntimeException(ex.toString)) def receiveMessage( msg: SignedMessage | EncryptedMessage, transport: TransportDIDComm[Any] ): ZIO[ - OperatorImp.Services & ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError], + Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] & + // instead of OperatorImp.Services + ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError], MediatorError | StorageError, Unit ] = ZIO.logAnnotate("msg_sha256", msg.sha256) { @@ -112,7 +116,9 @@ case class AgentExecutorMediator( pMsgOrProblemReport: Either[ProblemReport, PlaintextMessage], transport: TransportDIDComm[Any] ): ZIO[ - ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError] & OperatorImp.Services, + Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] & + // instead of OperatorImp.Services + ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError], MediatorError | StorageError, Unit ] = @@ -175,7 +181,12 @@ case class AgentExecutorMediator( case Some(problemReport) => ZIO.succeed(Reply(problemReport.toPlaintextMessage)) case None => protocolHandler - .program(plaintextMessage) + .program(plaintextMessage) // should we change the signature of the method or use the ZEnvironment + .provideSomeEnvironment( + (e: ZEnvironment[ + Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] + ]) => e ++ ZEnvironment(transport) + ) .catchSome { case ProtocolExecutionFailToParse(failToParse) => for { _ <- ZIO.logWarning(s"Error ProtocolExecutionFailToParse: $failToParse") @@ -246,7 +257,7 @@ object AgentExecutorMediator { messageItemRepo: MessageItemRepo, ): ZIO[TransportFactory, Nothing, AgentExecutar] = for { - transportManager <- TransportManager.make + transportManager <- MediatorTransportManager.make mediator = AgentExecutorMediator(agent, transportManager, protocolHandler, userAccountRepo, messageItemRepo) } yield mediator diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/DIDCommRoutes.scala b/mediator/src/main/scala/io/iohk/atala/mediator/DIDCommRoutes.scala index 79830b68..e6b74a6c 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/DIDCommRoutes.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/DIDCommRoutes.scala @@ -46,6 +46,8 @@ object DIDCommRoutes { inboundQueue <- Queue.bounded[SignedMessage | EncryptedMessage](1) outboundQueue <- Queue.bounded[SignedMessage | EncryptedMessage](1) transport = new TransportDIDComm[Any] { + def transmissionFlow = Transport.TransmissionFlow.BothWays + def transmissionType = Transport.TransmissionType.SingleTransmission def id: TransportID = TransportID.http(req.headers.get("request_id")) def inbound: ZStream[Any, Transport.InErr, SignedMessage | EncryptedMessage] = ZStream.fromQueue(inboundQueue) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/MediatorStandalone.scala b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorStandalone.scala index 2e59d326..c764f7e8 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/MediatorStandalone.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorStandalone.scala @@ -6,22 +6,22 @@ import fmgp.did.* import fmgp.did.comm.* import fmgp.did.comm.protocol.* import fmgp.did.method.peer.* +import fmgp.did.framework.TransportFactoryImp import io.iohk.atala.mediator.db.* import io.iohk.atala.mediator.protocols.* import zio.* +import zio.stream.* import zio.config.* import zio.config.magnolia.* import zio.config.typesafe.* import zio.http.* import zio.json.* +import zio.logging.* import zio.logging.LogFormat.* import zio.logging.backend.SLF4J -import zio.logging.* -import zio.stream.* import java.time.format.DateTimeFormatter import scala.io.Source -import fmgp.did.framework.TransportFactoryImp case class MediatorConfig(endpoints: String, keyAgreement: OKPPrivateKey, keyAuthentication: OKPPrivateKey) { val did = DIDPeer2.makeAgent( Seq(keyAgreement, keyAuthentication), diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/MediatorTransportManager.scala b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorTransportManager.scala new file mode 100644 index 00000000..2f4d7874 --- /dev/null +++ b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorTransportManager.scala @@ -0,0 +1,127 @@ +package io.iohk.atala.mediator + +import zio._ +import zio.json._ +import zio.stream._ + +import fmgp.did._ +import fmgp.did.comm._ +import fmgp.did.framework._ +import fmgp.crypto.error._ +import fmgp.util._ + +type TransportID = String + +/** Based on the [[fmgp.did.framework.TransportDispatcher]] */ +case class MediatorTransportManager( + transports: Set[TransportDIDComm[Any]] = Set.empty, + ids: Map[FROMTO, Set[TransportID]] = Map.empty, + kids: Map[VerificationMethodReferenced, Set[TransportID]] = Map.empty, + liveMode: Map[FROMTO, Set[TransportID]] = Map.empty, + transportFactory: TransportFactory +) extends TransportDispatcher { + + override def openTransport(uri: String): UIO[TransportDIDComm[Any]] = + transportFactory.openTransport(uri) // FIXME TODO register Transport + + def link(vmr: VerificationMethodReferenced, transportID: TransportID): MediatorTransportManager = + if (!transports.map(_.id).contains(transportID)) this // if transport is close + else + kids.get(vmr) match + case Some(seq) if seq.contains(transportID) => this + case Some(seq) => this.copy(kids = kids + (vmr -> (seq + transportID))).link(vmr.did.asFROMTO, transportID) + case None => this.copy(kids = kids + (vmr -> Set(transportID))).link(vmr.did.asFROMTO, transportID) + + def link(from: FROMTO, transport: TransportDIDComm[Any]): MediatorTransportManager = link(from, transport.id) + def link(from: FROMTO, transportID: TransportID): MediatorTransportManager = + if (!transports.map(_.id).contains(transportID)) this // if transport is close + else + ids.get(from) match + case Some(seq) if seq.contains(transportID) => this + case Some(seq) => this.copy(ids = ids + (from -> (seq + transportID))) + case None => this.copy(ids = ids + (from -> Set(transportID))) + + def registerTransport(transport: TransportDIDComm[Any]) = + this.copy(transports = transports + transport) + + def unregisterTransport(transportID: TransportID) = this.copy( + transports = transports.filter(_.id != transportID), + ids = ids.map { case (did, ids) => (did, ids.filter(_ != transportID)) }.filterNot(_._2.isEmpty), + kids = kids.map { case (kid, ids) => (kid, ids.filter(_ != transportID)) }.filterNot(_._2.isEmpty), + liveMode = liveMode.map { case (did, ids) => (did, ids.filter(_ != transportID)) }.filterNot(_._2.isEmpty), + ) + + def enableLiveMode(subject: FROMTO, transportID: TransportID): MediatorTransportManager = + this.copy( + liveMode = liveMode.updatedWith(subject) { + case Some(set) => Some(set - transportID).filter(_.isEmpty) + case None => None + } + ) + + def disableLiveMode(subject: FROMTO, transportID: TransportID): MediatorTransportManager = + this.copy( + liveMode = liveMode.updatedWith(subject) { + case Some(set) => Some(set + transportID) + case None => Some(Set(transportID)) + } + ) + + def getLiveModeEnableConnections(subject: FROMTO): Seq[TransportDIDComm[Any]] = + liveMode.get(subject).toSeq.flatMap(transportId => transports.filter(t => transportId.contains(t.id))) + + def sendForLiveMode( + next: TO, + msg: /*lazy*/ => SignedMessage | EncryptedMessage + ): ZIO[Any, DidFail, Iterable[Unit]] = { + val transportIDs = this.liveMode.getOrElse(next.asFROMTO, Seq.empty) + val myChannels = transportIDs.flatMap(id => this.transports.find(_.id == id)) + ZIO.foreach(myChannels) { _.send(msg) } + } + + // TODO maybe rename to send + def publish(to: TO, msg: SignedMessage | EncryptedMessage): ZIO[Any, Nothing, Iterable[Unit]] = { + val transportIDs = this.ids.getOrElse(to.asFROMTO, Seq.empty) + val myChannels = transportIDs.flatMap(id => this.transports.find(_.id == id)) + ZIO.foreach(myChannels) { _.send(msg) } + } + + override def send( + to: TO, + msg: SignedMessage | EncryptedMessage, + thid: Option[MsgID], // TODO use + pthid: Option[MsgID], // TODO use + ): ZIO[Resolver & Agent & Operations, DidFail, Unit] = + sendViaDIDCommMessagingService(to, msg).unit + + override def sendViaDIDCommMessagingService( + to: TO, + msg: SignedMessage | EncryptedMessage + ): ZIO[Resolver & Agent & Operations, DidFail, Either[String, TransportDIDComm[Any]]] = + super.sendViaDIDCommMessagingService(to, msg) + +} + +object MediatorTransportManager { + + def make: URIO[TransportFactory, Ref[MediatorTransportManager]] = + for { + transportFactory <- ZIO.service[TransportFactory] + ref <- Ref.make(MediatorTransportManager(transportFactory = transportFactory)) + } yield ref + + def registerTransport(transport: TransportDIDComm[Any]) = + for { + socketManager <- ZIO.service[Ref[MediatorTransportManager]] + _ <- socketManager.update { _.registerTransport(transport) } + _ <- ZIO.log(s"RegisterTransport concluded") + } yield () + + def unregisterTransport(transportId: String) = + for { + socketManager <- ZIO.service[Ref[MediatorTransportManager]] + _ <- socketManager.update { case sm: MediatorTransportManager => sm.unregisterTransport(transportId) } + _ <- ZIO.log(s"Channel unregisterSocket") + } yield () + +} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala b/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala index 806cdcf4..9c47e96b 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala @@ -12,7 +12,8 @@ import io.iohk.atala.mediator.protocols.* import io.iohk.atala.mediator.db.{UserAccountRepo, MessageItemRepo} object OperatorImp { - type Services = Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo + type Services = Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] & + TransportDIDComm[Any] val protocolHandlerLayer: ULayer[ ProtocolExecuter[Services, MediatorError | StorageError] diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala index 7ff5a554..f8c94937 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala @@ -10,9 +10,13 @@ import io.iohk.atala.mediator.* import io.iohk.atala.mediator.db.* import zio.* import zio.json.* +import fmgp.did.comm.protocol.pickup3.MessageDelivery object ForwardMessageExecuter - extends ProtocolExecuter[Agent & UserAccountRepo & MessageItemRepo, MediatorError | StorageError] { + extends ProtocolExecuter[ + Resolver & Operations & Agent & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager], + MediatorError | StorageError + ] { override def supportedPIURI: Seq[PIURI] = Seq(ForwardMessage.piuri) @@ -34,6 +38,25 @@ object ForwardMessageExecuter for { _ <- repoMessageItem.insert(m.msg) _ <- ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo") + + // For Live Mode + mediatorTransportManager <- ZIO.service[Ref[MediatorTransportManager]].flatMap(_.get) + agent <- ZIO.service[Agent] + messageDelivery = MessageDelivery( + thid = m.id, // FIXME what should I put here? + from = agent.id.asFROM, // Mediator agent + to = m.next.asTO, // Destination of the message that is being forward + recipient_did = None, + attachments = Map( + m.msg.sha256 -> m.msg + ) + ).toPlaintextMessage + eMsgDelivery <- Operations + .authEncrypt(messageDelivery) + .mapError(didFail => MediatorDidError(didFail)) + _ <- mediatorTransportManager + .sendForLiveMode(m.next.asTO, eMsgDelivery) + .mapError(didFail => MediatorDidError(didFail)) } yield NoReply } else { for { diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala index 6ade8c30..acce9484 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala @@ -10,8 +10,14 @@ import io.iohk.atala.mediator.* import io.iohk.atala.mediator.db.* import zio.* import zio.json.* +import fmgp.did.framework._ +import fmgp.did.framework.Transport.TransmissionType -object PickupExecuter extends ProtocolExecuter[UserAccountRepo & MessageItemRepo, MediatorError | StorageError] { +object PickupExecuter + extends ProtocolExecuter[ + UserAccountRepo & MessageItemRepo & TransportDIDComm[Any] & Ref[MediatorTransportManager], + MediatorError | StorageError + ] { override def supportedPIURI: Seq[PIURI] = Seq( StatusRequest.piuri, @@ -176,19 +182,64 @@ object PickupExecuter extends ProtocolExecuter[UserAccountRepo & MessageItemRepo ) } yield NoReply case m: LiveModeChange => - ZIO.logInfo("LiveModeChange Not Supported") *> - ZIO.succeed( - Reply( - Problems - .liveModeNotSupported( - from = m.to.asFROM, - to = m.from.asTO, - pthid = m.id, - piuri = m.piuri, - ) - .toPlaintextMessage - ) - ) + for { + _ <- ZIO.logInfo("LiveModeChange") + // For Live Mode + refMediatorTransportManager <- ZIO.service[Ref[MediatorTransportManager]] + transport <- ZIO.service[TransportDIDComm[Any]] + _ <- ZIO.log(s"The transport's transmissionType is of the type ${transport.transmissionType}") + ret <- + transport.transmissionType match // If sent with live_delivery set to true on a connection incapable of live delivery, a problem_report SHOULD be sent + case TransmissionType.SingleTransmission => // Like HTTP + ZIO + .log(s"Connection '${transport.id}' does not support Live Delivery") + .map(_ => + Problems + .liveModeNotSupported( + from = m.to.asFROM, + to = m.from.asTO, + pthid = m.id, + piuri = m.piuri, + ) + .toPlaintextMessage + ) + case TransmissionType.MultiTransmissions => // Like WS + for { + updateTask <- refMediatorTransportManager.update(tm => + if (m.live_delivery) tm.enableLiveMode(m.from.asFROMTO, transport.id) + else tm.disableLiveMode(m.from.asFROMTO, transport.id) + ) + // Make the status reply + repoDidAccount <- ZIO.service[UserAccountRepo] + didRequestingMessages = m.from.asFROMTO + mDidAccount <- repoDidAccount.getDidAccount(didRequestingMessages.toDID) + ret = mDidAccount match + case None => + Problems + .notEnroledError( + from = m.to.asFROM, + to = Some(m.from.asTO), + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + didNotEnrolled = didRequestingMessages.asFROM.toDIDSubject, + ) + .toPlaintextMessage + case Some(didAccount) => + val msgHash = didAccount.messagesRef.filter(_.state == false).map(_.hash) + Status( + thid = m.id, + from = m.to.asFROM, + to = m.from.asTO, + recipient_did = None, // m.recipient_did, + message_count = msgHash.size, + longest_waited_seconds = None, // TODO + newest_received_time = None, // TODO + oldest_received_time = None, // TODO + total_bytes = None, // TODO + live_delivery = None, // TODO + ).toPlaintextMessage + } yield ret + } yield Reply(ret) } match case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) case Right(program) => program diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/MediatorTransportManagerUtil.scala b/mediator/src/test/scala/io/iohk/atala/mediator/MediatorTransportManagerUtil.scala new file mode 100644 index 00000000..d8742014 --- /dev/null +++ b/mediator/src/test/scala/io/iohk/atala/mediator/MediatorTransportManagerUtil.scala @@ -0,0 +1,14 @@ +package io.iohk.atala.mediator + +import zio._ +import fmgp.did.framework.TransportFactory +import zio.http._ +import fmgp.did.framework.TransportFactoryImp + +object MediatorTransportManagerUtil { + + // utility + def layerTest: ZLayer[Any, Nothing, Ref[MediatorTransportManager]] = + Scope.default >>> (Client.default >>> TransportFactoryImp.layer).orDie >>> + ZLayer.fromZIO(MediatorTransportManager.make) +} diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/TransportUtil.scala b/mediator/src/test/scala/io/iohk/atala/mediator/TransportUtil.scala new file mode 100644 index 00000000..703a0158 --- /dev/null +++ b/mediator/src/test/scala/io/iohk/atala/mediator/TransportUtil.scala @@ -0,0 +1,61 @@ +package io.iohk.atala.mediator + +import fmgp.did._ +import fmgp.did.comm._ +import fmgp.did.framework._ +import zio.stream._ + +object TransportUtil { + def newTransportEmpty: TransportDIDComm[Any] = + new Transport[Any, SignedMessage | EncryptedMessage, SignedMessage | EncryptedMessage] { + def transmissionFlow = ??? + def transmissionType = ??? + def id: TransportID = "newTransportEmpty_Test" + def inbound: zio.stream.ZStream[Any, Transport.InErr, SignedMessage | EncryptedMessage] = ??? + def outbound: zio.stream.ZSink[Any, Transport.OutErr, SignedMessage | EncryptedMessage, Nothing, Unit] = ??? + } + + def newTransportEmptySingleTransmission: TransportDIDComm[Any] = + new Transport[Any, SignedMessage | EncryptedMessage, SignedMessage | EncryptedMessage] { + + def transmissionFlow = Transport.TransmissionFlow.BothWays + def transmissionType = Transport.TransmissionType.SingleTransmission + + def id: TransportID = "newTransportEmpty_Test_SingleTransmission" + def inbound: zio.stream.ZStream[ + Any, + Transport.InErr, + SignedMessage | EncryptedMessage + ] = ZStream.empty + def outbound: zio.stream.ZSink[ + Any, + Transport.OutErr, + SignedMessage | EncryptedMessage, + Nothing, + Unit + ] = ZSink.drain + + } + + def newTransportEmptyMultiTransmissions: TransportDIDComm[Any] = + new Transport[Any, SignedMessage | EncryptedMessage, SignedMessage | EncryptedMessage] { + + def transmissionFlow = Transport.TransmissionFlow.BothWays + def transmissionType = Transport.TransmissionType.MultiTransmissions + + def id: TransportID = "newTransportEmpty_Test_MultiTransmissions" + def inbound: zio.stream.ZStream[ + Any, + Transport.InErr, + SignedMessage | EncryptedMessage + ] = ZStream.empty + def outbound: zio.stream.ZSink[ + Any, + Transport.OutErr, + SignedMessage | EncryptedMessage, + Nothing, + Unit + ] = ZSink.drain + + } +} diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala index f123364b..ddff01f7 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala @@ -5,7 +5,9 @@ import fmgp.did.comm.protocol.* import fmgp.did.comm.{EncryptedMessage, Operations, PlaintextMessage, SignedMessage, layerDefault} import fmgp.did.method.peer.DidPeerResolver import fmgp.util.Base64 +import io.iohk.atala.mediator.MediatorTransportManager import io.iohk.atala.mediator.db.* +import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.* import io.iohk.atala.mediator.db.MessageItemRepoSpec.encryptedMessageAlice import io.iohk.atala.mediator.protocols.ForwardMessageExecuter import zio.* @@ -17,7 +19,6 @@ import zio.test.Assertion.* import fmgp.did.DIDSubject import scala.concurrent.ExecutionContext.Implicits.global -import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.* import reactivemongo.api.bson.BSONDocument /** mediator/testOnly io.iohk.atala.mediator.protocols.ForwardMessageExecutorSpec */ @@ -56,7 +57,9 @@ object ForwardMessageExecutorSpec extends ZIOSpecDefault with DidAccountStubSetu case NoReply => assertTrue(true) } @@ TestAspect.before(setupAndClean) - ).provideSomeLayer(DidPeerResolver.layerDidPeerResolver) + ) + .provideSomeLayer(io.iohk.atala.mediator.MediatorTransportManagerUtil.layerTest) + .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(Operations.layerDefault) .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(AgentStub.agentLayer) diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/MessageSetup.scala b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/MessageSetup.scala index 5f630579..9736ed81 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/MessageSetup.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/MessageSetup.scala @@ -312,4 +312,26 @@ trait MessageSetup { | "typ" : "application/didcomm-plain+json" |}""".stripMargin.fromJson[PlaintextMessage] + val plaintextLiveModeEnable = (didFrom: String, mediatorDid: String) => s"""{ + | "id" : "f0f6c406-c247-4842-8d37-c9a4f77226d8", + | "type" : "https://didcomm.org/messagepickup/3.0/live-delivery-change", + | "to" : ["$mediatorDid"], + | "from" : "$didFrom", + | "thid" : "maybe-thid-if-responding", + | "body" : {"live_delivery" : true}, + | "return_route" : "all", + | "typ" : "application/didcomm-plain+json" + |}""".stripMargin.fromJson[PlaintextMessage] + + val plaintextLiveModeDisable = (didFrom: String, mediatorDid: String) => s"""{ + | "id" : "f0f6c406-c247-4842-8d37-c9a4f77226d8", + | "type" : "https://didcomm.org/messagepickup/3.0/live-delivery-change", + | "to" : ["$mediatorDid"], + | "from" : "$didFrom", + | "thid" : "maybe-thid-if-responding", + | "body" : {"live_delivery" : false}, + | "return_route" : "all", + | "typ" : "application/didcomm-plain+json" + |}""".stripMargin.fromJson[PlaintextMessage] + } diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala index 2b8a5ff6..1c47c677 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala @@ -8,7 +8,7 @@ import fmgp.did.comm.protocol.pickup3.StatusRequest import fmgp.did.method.peer.DidPeerResolver import fmgp.did.{Agent, DIDSubject} import fmgp.util.Base64 -import io.iohk.atala.mediator.MediatorAgent +import io.iohk.atala.mediator.* import io.iohk.atala.mediator.db.* import io.iohk.atala.mediator.db.AgentStub.* import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.* @@ -49,7 +49,7 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M case reply: AnyReply => assertTrue(reply.msg.`type` == ProblemReport.piuri) case _ => assertTrue(false) } - } @@ TestAspect.before(setupAndClean), + }, test("Pickup StatusRequest message should return problem report for not enrolled did") { val executer = PickupExecuter for { @@ -62,7 +62,7 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M case reply: AnyReply => assertTrue(reply.msg.`type` == ProblemReport.piuri) case _ => assertTrue(false) } - } @@ TestAspect.before(setupAndClean), + }, test("Pickup StatusRequest message should return Status Message") { val executer = PickupExecuter for { @@ -80,8 +80,8 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M case reply: AnyReply => assertTrue(reply.msg.`type` == Status.piuri) case _ => assertTrue(false) } - } @@ TestAspect.before(setupAndClean), - test("Pickup DeliveryRequest message return MessageDelivery and attachment message") { + }, + test("Pickup DeliveryRequest message return MessageDelivery and attachment message") { val executer = PickupExecuter val forwardMessageExecuter = ForwardMessageExecuter for { @@ -110,7 +110,7 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M assertTrue(reply.msg.`type` == MessageDelivery.piuri) && assertTrue(reply.msg.attachments.nonEmpty) case _ => assertTrue(false) } - } @@ TestAspect.before(setupAndClean), + }, test("Delivery Request message for Pickup returns a Status Message when there are no messages available") { val pickupExecuter = PickupExecuter for { @@ -130,8 +130,8 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M case reply: AnyReply => assertTrue(reply.msg.`type` == Status.piuri) case _ => assertTrue(false) } - } @@ TestAspect.before(setupAndClean), - test("Messages Received message should clear the messages from the queue") { + }, + test("Messages Received message should clear the messages from the queue") { val executer = PickupExecuter val forwardMessageExecuter = ForwardMessageExecuter for { @@ -165,13 +165,54 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M case reply: AnyReply => assertTrue(false) case NoReply => assertTrue(true) } - } @@ TestAspect.before(setupAndClean) - ).provideSomeLayer(DidPeerResolver.layerDidPeerResolver) - .provideSomeLayer(Operations.layerDefault) - .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) - .provideSomeLayer(AgentStub.agentLayer) - .provideLayerShared(dataAccessLayer) @@ TestAspect.sequential + }, + test("Pickup LiveMode over WS message should return Status Message") { + val executer = PickupExecuter + for { + mediatorAgent <- ZIO.service[MediatorAgent] + userAccount <- ZIO.service[UserAccountRepo] + _ <- userAccount.createOrFindDidAccount(DIDSubject(aliceAgent.id.did)) + _ <- userAccount.addAlias( + owner = DIDSubject(aliceAgent.id.did), + newAlias = DIDSubject(aliceAgent.id.did) + ) + msg <- ZIO.fromEither(plaintextLiveModeEnable(aliceAgent.id.did, mediatorAgent.id.did)) + action <- executer.program(msg) + } yield { + action match + case reply: AnyReply => assertTrue(reply.msg.`type` == Status.piuri) + case _ => assertTrue(false) + } + } + .provideSomeLayer(ZLayer.succeed(TransportUtil.newTransportEmptyMultiTransmissions)), + test("Pickup LiveMode over SingleTransmission (HTTP) message should return ProblemReport") { + val executer = PickupExecuter + for { + mediatorAgent <- ZIO.service[MediatorAgent] + userAccount <- ZIO.service[UserAccountRepo] + _ <- userAccount.createOrFindDidAccount(DIDSubject(aliceAgent.id.did)) + _ <- userAccount.addAlias( + owner = DIDSubject(aliceAgent.id.did), + newAlias = DIDSubject(aliceAgent.id.did) + ) + msg <- ZIO.fromEither(plaintextLiveModeEnable(aliceAgent.id.did, mediatorAgent.id.did)) + action <- executer.program(msg) + } yield { + action match + case reply: AnyReply => assertTrue(reply.msg.`type` == ProblemReport.piuri) + case _ => assertTrue(false) + } + } + .provideSomeLayer(ZLayer.succeed(TransportUtil.newTransportEmptySingleTransmission)), + ) @@ TestAspect.sequential @@ TestAspect.before(setupAndClean) } + .provideSomeLayer(MediatorTransportManagerUtil.layerTest) + .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) + .provideSomeLayer(Operations.layerDefault) + .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) + .provideSomeLayer(AgentStub.agentLayer) + .provideSomeLayer(ZLayer.succeed(TransportUtil.newTransportEmpty)) + .provideLayerShared(dataAccessLayer) val dataAccessLayer = EmbeddedMongoDBInstance.layer(port, hostIp) >>> AsyncDriverResource.layer