From 3ec8c3c2daee3d7cb661139d0e3afac30a38bb87 Mon Sep 17 00:00:00 2001 From: FabioPinheiro Date: Mon, 27 Nov 2023 18:24:44 +0000 Subject: [PATCH] Pickup Protocol - Support for Live Mode #145 --- Mediator-Error_Handling.md | 32 ++++---- .../mediator/AgentExecutorMediator.scala | 15 +++- .../atala/mediator/MediatorStandalone.scala | 6 +- .../io/iohk/atala/mediator/OperatorImp.scala | 3 +- .../protocols/ForwardMessageExecuter.scala | 25 +++++- .../mediator/protocols/PickupExecuter.scala | 76 +++++++++++++++---- .../ForwardMessageExecutorSpec.scala | 7 +- .../protocols/PickupExecuterSpec.scala | 11 ++- 8 files changed, 132 insertions(+), 43 deletions(-) diff --git a/Mediator-Error_Handling.md b/Mediator-Error_Handling.md index 8be52319..035c150e 100644 --- a/Mediator-Error_Handling.md +++ b/Mediator-Error_Handling.md @@ -12,22 +12,22 @@ https://identity.foundation/didcomm-messaging/spec/#problem-reports This table defines the expected behavior of the mediator in different scenarios not covered by the specifications. -| Mediators | Atala Mediator | Roadmap Atala Mediator | │ | RootsId | Blocktrust | -|-------------|----------------|------------------------|---|---------|------------| -| Scenario G1 | G1C | - | │ | ? | ? | -| Scenario G2 | G2B [ATL-5840] | - | │ | | ? | -| Scenario G3 | Fallback G2B | [TODO] G3B | │ | | ? | -| Scenario G4 | G4B | - | │ | | ? | -| Scenario G5 | Fallback G4B | [TODO] G5B | │ | | ? | -| Scenario G6 | Fallback G4B | [WIP] G6B | │ | | ? | -| Scenario G7 | Fallback G4B | [TODO] G7B | │ | | ? | -| Scenario G8 | G8C | - | │ | | ? | -| | | | | | | -| Scenario M1 | M1B | - | │ | | ? | -| Scenario M2 | M2B | - | │ | | ? | -| Scenario M3 | Fallback G4 | M3B | │ | | ? | -| Scenario M4 | M4B | - | │ | | ? | -| Scenario M5 | M5A | [TODO] M5B | │ | | ? | +| Mediators | Atala Mediator | Roadmap Atala Mediator | +|-------------|----------------|------------------------| +| Scenario G1 | G1C | - | +| Scenario G2 | G2B [ATL-5840] | - | +| Scenario G3 | Fallback G2B | [TODO] G3B | +| Scenario G4 | G4B | - | +| Scenario G5 | Fallback G4B | [TODO] G5B | +| Scenario G6 | Fallback G4B | G6B | +| Scenario G7 | Fallback G4B | [TODO] G7B | +| Scenario G8 | G8C | - | +| | | | +| Scenario M1 | M1B | - | +| Scenario M2 | M2B | - | +| Scenario M3 | Fallback G4 | M3B | +| Scenario M4 | M4B | - | +| Scenario M5 | M5A | M5B [#145] | ### Scenarios Description diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala b/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala index c842b1c0..2e4976cc 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala @@ -16,7 +16,7 @@ import fmgp.did.comm.protocol.reportproblem2.ProblemReport case class AgentExecutorMediator( agent: Agent, - transportManager: Ref[TransportManager], + transportManager: Ref[MediatorTransportManager], protocolHandler: ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError], userAccountRepo: UserAccountRepo, messageItemRepo: MessageItemRepo, @@ -54,7 +54,9 @@ case class AgentExecutorMediator( .tapError(ex => ZIO.logError(ex.toString)) .provideSomeLayer(this.indentityLayer) .provideSomeLayer(userAccountRepoLayer ++ messageItemRepoLayer) - .provideSomeEnvironment((e: ZEnvironment[Resolver & Operations]) => e ++ ZEnvironment(protocolHandler)) + .provideSomeEnvironment((e: ZEnvironment[Resolver & Operations]) => + e ++ ZEnvironment(protocolHandler) ++ ZEnvironment(transportManager) ++ ZEnvironment(transport) + ) .orDieWith(ex => new RuntimeException(ex.toString)) def receiveMessage( @@ -175,7 +177,12 @@ case class AgentExecutorMediator( case Some(problemReport) => ZIO.succeed(Reply(problemReport.toPlaintextMessage)) case None => protocolHandler - .program(plaintextMessage) + .program(plaintextMessage) // should we change the signature of the method or use the ZEnvironment + .provideSomeEnvironment( + (e: ZEnvironment[ + Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] + ]) => e ++ ZEnvironment(transport) + ) .catchSome { case ProtocolExecutionFailToParse(failToParse) => for { _ <- ZIO.logWarning(s"Error ProtocolExecutionFailToParse: $failToParse") @@ -246,7 +253,7 @@ object AgentExecutorMediator { messageItemRepo: MessageItemRepo, ): ZIO[TransportFactory, Nothing, AgentExecutar] = for { - transportManager <- TransportManager.make + transportManager <- MediatorTransportManager.make mediator = AgentExecutorMediator(agent, transportManager, protocolHandler, userAccountRepo, messageItemRepo) } yield mediator diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/MediatorStandalone.scala b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorStandalone.scala index 2e59d326..c764f7e8 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/MediatorStandalone.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorStandalone.scala @@ -6,22 +6,22 @@ import fmgp.did.* import fmgp.did.comm.* import fmgp.did.comm.protocol.* import fmgp.did.method.peer.* +import fmgp.did.framework.TransportFactoryImp import io.iohk.atala.mediator.db.* import io.iohk.atala.mediator.protocols.* import zio.* +import zio.stream.* import zio.config.* import zio.config.magnolia.* import zio.config.typesafe.* import zio.http.* import zio.json.* +import zio.logging.* import zio.logging.LogFormat.* import zio.logging.backend.SLF4J -import zio.logging.* -import zio.stream.* import java.time.format.DateTimeFormatter import scala.io.Source -import fmgp.did.framework.TransportFactoryImp case class MediatorConfig(endpoints: String, keyAgreement: OKPPrivateKey, keyAuthentication: OKPPrivateKey) { val did = DIDPeer2.makeAgent( Seq(keyAgreement, keyAuthentication), diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala b/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala index 806cdcf4..9c47e96b 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala @@ -12,7 +12,8 @@ import io.iohk.atala.mediator.protocols.* import io.iohk.atala.mediator.db.{UserAccountRepo, MessageItemRepo} object OperatorImp { - type Services = Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo + type Services = Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] & + TransportDIDComm[Any] val protocolHandlerLayer: ULayer[ ProtocolExecuter[Services, MediatorError | StorageError] diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala index 7ff5a554..f8c94937 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala @@ -10,9 +10,13 @@ import io.iohk.atala.mediator.* import io.iohk.atala.mediator.db.* import zio.* import zio.json.* +import fmgp.did.comm.protocol.pickup3.MessageDelivery object ForwardMessageExecuter - extends ProtocolExecuter[Agent & UserAccountRepo & MessageItemRepo, MediatorError | StorageError] { + extends ProtocolExecuter[ + Resolver & Operations & Agent & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager], + MediatorError | StorageError + ] { override def supportedPIURI: Seq[PIURI] = Seq(ForwardMessage.piuri) @@ -34,6 +38,25 @@ object ForwardMessageExecuter for { _ <- repoMessageItem.insert(m.msg) _ <- ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo") + + // For Live Mode + mediatorTransportManager <- ZIO.service[Ref[MediatorTransportManager]].flatMap(_.get) + agent <- ZIO.service[Agent] + messageDelivery = MessageDelivery( + thid = m.id, // FIXME what should I put here? + from = agent.id.asFROM, // Mediator agent + to = m.next.asTO, // Destination of the message that is being forward + recipient_did = None, + attachments = Map( + m.msg.sha256 -> m.msg + ) + ).toPlaintextMessage + eMsgDelivery <- Operations + .authEncrypt(messageDelivery) + .mapError(didFail => MediatorDidError(didFail)) + _ <- mediatorTransportManager + .sendForLiveMode(m.next.asTO, eMsgDelivery) + .mapError(didFail => MediatorDidError(didFail)) } yield NoReply } else { for { diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala index 6ade8c30..5c8b2e0b 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala @@ -10,8 +10,13 @@ import io.iohk.atala.mediator.* import io.iohk.atala.mediator.db.* import zio.* import zio.json.* +import fmgp.did.framework._ -object PickupExecuter extends ProtocolExecuter[UserAccountRepo & MessageItemRepo, MediatorError | StorageError] { +object PickupExecuter + extends ProtocolExecuter[ + UserAccountRepo & MessageItemRepo & TransportDIDComm[Any] & Ref[MediatorTransportManager], + MediatorError | StorageError + ] { override def supportedPIURI: Seq[PIURI] = Seq( StatusRequest.piuri, @@ -176,19 +181,64 @@ object PickupExecuter extends ProtocolExecuter[UserAccountRepo & MessageItemRepo ) } yield NoReply case m: LiveModeChange => - ZIO.logInfo("LiveModeChange Not Supported") *> - ZIO.succeed( - Reply( - Problems - .liveModeNotSupported( - from = m.to.asFROM, - to = m.from.asTO, - pthid = m.id, - piuri = m.piuri, + for { + _ <- ZIO.logInfo("LiveModeChange") + // For Live Mode + refMediatorTransportManager <- ZIO.service[Ref[MediatorTransportManager]] + transport <- ZIO.service[TransportDIDComm[Any]] + ret <- + if (false) { // FIXME TODO transport type support Live Mode ? + // If sent with live_delivery set to true on a connection incapable of live delivery, a problem_report SHOULD be sent + ZIO + .log(s"Connection '${transport.id}' does not support Live Delivery") + .map(_ => + Problems + .liveModeNotSupported( + from = m.to.asFROM, + to = m.from.asTO, + pthid = m.id, + piuri = m.piuri, + ) + .toPlaintextMessage ) - .toPlaintextMessage - ) - ) + } else + for { + updateTask <- refMediatorTransportManager.update(tm => + if (m.live_delivery) tm.enableLiveMode(m.from.asFROMTO, transport.id) + else tm.disableLiveMode(m.from.asFROMTO, transport.id) + ) + + // Make the status reply + repoDidAccount <- ZIO.service[UserAccountRepo] + didRequestingMessages = m.from.asFROMTO + mDidAccount <- repoDidAccount.getDidAccount(didRequestingMessages.toDID) + ret = mDidAccount match + case None => + Problems + .notEnroledError( + from = m.to.asFROM, + to = Some(m.from.asTO), + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + didNotEnrolled = didRequestingMessages.asFROM.toDIDSubject, + ) + .toPlaintextMessage + case Some(didAccount) => + val msgHash = didAccount.messagesRef.filter(_.state == false).map(_.hash) + Status( + thid = m.id, + from = m.to.asFROM, + to = m.from.asTO, + recipient_did = None, // m.recipient_did, + message_count = msgHash.size, + longest_waited_seconds = None, // TODO + newest_received_time = None, // TODO + oldest_received_time = None, // TODO + total_bytes = None, // TODO + live_delivery = None, // TODO + ).toPlaintextMessage + } yield ret + } yield Reply(ret) } match case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) case Right(program) => program diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala index f123364b..ddff01f7 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala @@ -5,7 +5,9 @@ import fmgp.did.comm.protocol.* import fmgp.did.comm.{EncryptedMessage, Operations, PlaintextMessage, SignedMessage, layerDefault} import fmgp.did.method.peer.DidPeerResolver import fmgp.util.Base64 +import io.iohk.atala.mediator.MediatorTransportManager import io.iohk.atala.mediator.db.* +import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.* import io.iohk.atala.mediator.db.MessageItemRepoSpec.encryptedMessageAlice import io.iohk.atala.mediator.protocols.ForwardMessageExecuter import zio.* @@ -17,7 +19,6 @@ import zio.test.Assertion.* import fmgp.did.DIDSubject import scala.concurrent.ExecutionContext.Implicits.global -import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.* import reactivemongo.api.bson.BSONDocument /** mediator/testOnly io.iohk.atala.mediator.protocols.ForwardMessageExecutorSpec */ @@ -56,7 +57,9 @@ object ForwardMessageExecutorSpec extends ZIOSpecDefault with DidAccountStubSetu case NoReply => assertTrue(true) } @@ TestAspect.before(setupAndClean) - ).provideSomeLayer(DidPeerResolver.layerDidPeerResolver) + ) + .provideSomeLayer(io.iohk.atala.mediator.MediatorTransportManagerUtil.layerTest) + .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(Operations.layerDefault) .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(AgentStub.agentLayer) diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala index 2b8a5ff6..edb21640 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala @@ -8,7 +8,7 @@ import fmgp.did.comm.protocol.pickup3.StatusRequest import fmgp.did.method.peer.DidPeerResolver import fmgp.did.{Agent, DIDSubject} import fmgp.util.Base64 -import io.iohk.atala.mediator.MediatorAgent +import io.iohk.atala.mediator.* import io.iohk.atala.mediator.db.* import io.iohk.atala.mediator.db.AgentStub.* import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.* @@ -166,11 +166,16 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M case NoReply => assertTrue(true) } } @@ TestAspect.before(setupAndClean) - ).provideSomeLayer(DidPeerResolver.layerDidPeerResolver) + ) + .provideSomeLayer(MediatorTransportManagerUtil.layerTest) + .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(Operations.layerDefault) .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(AgentStub.agentLayer) - .provideLayerShared(dataAccessLayer) @@ TestAspect.sequential + .provideSomeEnvironment((e: ZEnvironment[UserAccountRepo & MessageItemRepo]) => + e ++ ZEnvironment(TransportUtil.newTransportEmpty) + ) + .provideLayerShared(dataAccessLayer) } val dataAccessLayer = EmbeddedMongoDBInstance.layer(port, hostIp)