Skip to content

Commit

Permalink
Pickup Protocol - Support for Live Mode #145
Browse files Browse the repository at this point in the history
  • Loading branch information
FabioPinheiro committed Nov 27, 2023
1 parent da018b5 commit 3ec8c3c
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 43 deletions.
32 changes: 16 additions & 16 deletions Mediator-Error_Handling.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ https://identity.foundation/didcomm-messaging/spec/#problem-reports

This table defines the expected behavior of the mediator in different scenarios not covered by the specifications.

| Mediators | Atala Mediator | Roadmap Atala Mediator || RootsId | Blocktrust |
|-------------|----------------|------------------------|---|---------|------------|
| Scenario G1 | G1C | - || ? | ? |
| Scenario G2 | G2B [ATL-5840] | - || | ? |
| Scenario G3 | Fallback G2B | [TODO] G3B || | ? |
| Scenario G4 | G4B | - || | ? |
| Scenario G5 | Fallback G4B | [TODO] G5B || | ? |
| Scenario G6 | Fallback G4B | [WIP] G6B || | ? |
| Scenario G7 | Fallback G4B | [TODO] G7B || | ? |
| Scenario G8 | G8C | - || | ? |
| | | | | | |
| Scenario M1 | M1B | - || | ? |
| Scenario M2 | M2B | - || | ? |
| Scenario M3 | Fallback G4 | M3B || | ? |
| Scenario M4 | M4B | - || | ? |
| Scenario M5 | M5A | [TODO] M5B || | ? |
| Mediators | Atala Mediator | Roadmap Atala Mediator |
|-------------|----------------|------------------------|
| Scenario G1 | G1C | - |
| Scenario G2 | G2B [ATL-5840] | - |
| Scenario G3 | Fallback G2B | [TODO] G3B |
| Scenario G4 | G4B | - |
| Scenario G5 | Fallback G4B | [TODO] G5B |
| Scenario G6 | Fallback G4B | G6B |
| Scenario G7 | Fallback G4B | [TODO] G7B |
| Scenario G8 | G8C | - |
| | | |
| Scenario M1 | M1B | - |
| Scenario M2 | M2B | - |
| Scenario M3 | Fallback G4 | M3B |
| Scenario M4 | M4B | - |
| Scenario M5 | M5A | M5B [#145] |

### Scenarios Description

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import fmgp.did.comm.protocol.reportproblem2.ProblemReport

case class AgentExecutorMediator(
agent: Agent,
transportManager: Ref[TransportManager],
transportManager: Ref[MediatorTransportManager],
protocolHandler: ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError],
userAccountRepo: UserAccountRepo,
messageItemRepo: MessageItemRepo,
Expand Down Expand Up @@ -54,7 +54,9 @@ case class AgentExecutorMediator(
.tapError(ex => ZIO.logError(ex.toString))
.provideSomeLayer(this.indentityLayer)
.provideSomeLayer(userAccountRepoLayer ++ messageItemRepoLayer)
.provideSomeEnvironment((e: ZEnvironment[Resolver & Operations]) => e ++ ZEnvironment(protocolHandler))
.provideSomeEnvironment((e: ZEnvironment[Resolver & Operations]) =>
e ++ ZEnvironment(protocolHandler) ++ ZEnvironment(transportManager) ++ ZEnvironment(transport)
)
.orDieWith(ex => new RuntimeException(ex.toString))

def receiveMessage(
Expand Down Expand Up @@ -175,7 +177,12 @@ case class AgentExecutorMediator(
case Some(problemReport) => ZIO.succeed(Reply(problemReport.toPlaintextMessage))
case None =>
protocolHandler
.program(plaintextMessage)
.program(plaintextMessage) // should we change the signature of the method or use the ZEnvironment
.provideSomeEnvironment(
(e: ZEnvironment[
Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager]
]) => e ++ ZEnvironment(transport)
)
.catchSome { case ProtocolExecutionFailToParse(failToParse) =>
for {
_ <- ZIO.logWarning(s"Error ProtocolExecutionFailToParse: $failToParse")
Expand Down Expand Up @@ -246,7 +253,7 @@ object AgentExecutorMediator {
messageItemRepo: MessageItemRepo,
): ZIO[TransportFactory, Nothing, AgentExecutar] =
for {
transportManager <- TransportManager.make
transportManager <- MediatorTransportManager.make
mediator = AgentExecutorMediator(agent, transportManager, protocolHandler, userAccountRepo, messageItemRepo)
} yield mediator

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ import fmgp.did.*
import fmgp.did.comm.*
import fmgp.did.comm.protocol.*
import fmgp.did.method.peer.*
import fmgp.did.framework.TransportFactoryImp
import io.iohk.atala.mediator.db.*
import io.iohk.atala.mediator.protocols.*
import zio.*
import zio.stream.*
import zio.config.*
import zio.config.magnolia.*
import zio.config.typesafe.*
import zio.http.*
import zio.json.*
import zio.logging.*
import zio.logging.LogFormat.*
import zio.logging.backend.SLF4J
import zio.logging.*
import zio.stream.*

import java.time.format.DateTimeFormatter
import scala.io.Source
import fmgp.did.framework.TransportFactoryImp
case class MediatorConfig(endpoints: String, keyAgreement: OKPPrivateKey, keyAuthentication: OKPPrivateKey) {
val did = DIDPeer2.makeAgent(
Seq(keyAgreement, keyAuthentication),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import io.iohk.atala.mediator.protocols.*
import io.iohk.atala.mediator.db.{UserAccountRepo, MessageItemRepo}

object OperatorImp {
type Services = Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo
type Services = Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] &
TransportDIDComm[Any]

val protocolHandlerLayer: ULayer[
ProtocolExecuter[Services, MediatorError | StorageError]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ import io.iohk.atala.mediator.*
import io.iohk.atala.mediator.db.*
import zio.*
import zio.json.*
import fmgp.did.comm.protocol.pickup3.MessageDelivery

object ForwardMessageExecuter
extends ProtocolExecuter[Agent & UserAccountRepo & MessageItemRepo, MediatorError | StorageError] {
extends ProtocolExecuter[
Resolver & Operations & Agent & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager],
MediatorError | StorageError
] {

override def supportedPIURI: Seq[PIURI] = Seq(ForwardMessage.piuri)

Expand All @@ -34,6 +38,25 @@ object ForwardMessageExecuter
for {
_ <- repoMessageItem.insert(m.msg)
_ <- ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo")

// For Live Mode
mediatorTransportManager <- ZIO.service[Ref[MediatorTransportManager]].flatMap(_.get)
agent <- ZIO.service[Agent]
messageDelivery = MessageDelivery(
thid = m.id, // FIXME what should I put here?
from = agent.id.asFROM, // Mediator agent
to = m.next.asTO, // Destination of the message that is being forward
recipient_did = None,
attachments = Map(
m.msg.sha256 -> m.msg
)
).toPlaintextMessage
eMsgDelivery <- Operations
.authEncrypt(messageDelivery)
.mapError(didFail => MediatorDidError(didFail))
_ <- mediatorTransportManager
.sendForLiveMode(m.next.asTO, eMsgDelivery)
.mapError(didFail => MediatorDidError(didFail))
} yield NoReply
} else {
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ import io.iohk.atala.mediator.*
import io.iohk.atala.mediator.db.*
import zio.*
import zio.json.*
import fmgp.did.framework._

object PickupExecuter extends ProtocolExecuter[UserAccountRepo & MessageItemRepo, MediatorError | StorageError] {
object PickupExecuter
extends ProtocolExecuter[
UserAccountRepo & MessageItemRepo & TransportDIDComm[Any] & Ref[MediatorTransportManager],
MediatorError | StorageError
] {

override def supportedPIURI: Seq[PIURI] = Seq(
StatusRequest.piuri,
Expand Down Expand Up @@ -176,19 +181,64 @@ object PickupExecuter extends ProtocolExecuter[UserAccountRepo & MessageItemRepo
)
} yield NoReply
case m: LiveModeChange =>
ZIO.logInfo("LiveModeChange Not Supported") *>
ZIO.succeed(
Reply(
Problems
.liveModeNotSupported(
from = m.to.asFROM,
to = m.from.asTO,
pthid = m.id,
piuri = m.piuri,
for {
_ <- ZIO.logInfo("LiveModeChange")
// For Live Mode
refMediatorTransportManager <- ZIO.service[Ref[MediatorTransportManager]]
transport <- ZIO.service[TransportDIDComm[Any]]
ret <-
if (false) { // FIXME TODO transport type support Live Mode ?
// If sent with live_delivery set to true on a connection incapable of live delivery, a problem_report SHOULD be sent
ZIO
.log(s"Connection '${transport.id}' does not support Live Delivery")
.map(_ =>
Problems
.liveModeNotSupported(
from = m.to.asFROM,
to = m.from.asTO,
pthid = m.id,
piuri = m.piuri,
)
.toPlaintextMessage
)
.toPlaintextMessage
)
)
} else
for {
updateTask <- refMediatorTransportManager.update(tm =>
if (m.live_delivery) tm.enableLiveMode(m.from.asFROMTO, transport.id)
else tm.disableLiveMode(m.from.asFROMTO, transport.id)
)

// Make the status reply
repoDidAccount <- ZIO.service[UserAccountRepo]
didRequestingMessages = m.from.asFROMTO
mDidAccount <- repoDidAccount.getDidAccount(didRequestingMessages.toDID)
ret = mDidAccount match
case None =>
Problems
.notEnroledError(
from = m.to.asFROM,
to = Some(m.from.asTO),
pthid = m.id, // TODO CHECK pthid
piuri = m.piuri,
didNotEnrolled = didRequestingMessages.asFROM.toDIDSubject,
)
.toPlaintextMessage
case Some(didAccount) =>
val msgHash = didAccount.messagesRef.filter(_.state == false).map(_.hash)
Status(
thid = m.id,
from = m.to.asFROM,
to = m.from.asTO,
recipient_did = None, // m.recipient_did,
message_count = msgHash.size,
longest_waited_seconds = None, // TODO
newest_received_time = None, // TODO
oldest_received_time = None, // TODO
total_bytes = None, // TODO
live_delivery = None, // TODO
).toPlaintextMessage
} yield ret
} yield Reply(ret)
} match
case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply)
case Right(program) => program
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import fmgp.did.comm.protocol.*
import fmgp.did.comm.{EncryptedMessage, Operations, PlaintextMessage, SignedMessage, layerDefault}
import fmgp.did.method.peer.DidPeerResolver
import fmgp.util.Base64
import io.iohk.atala.mediator.MediatorTransportManager
import io.iohk.atala.mediator.db.*
import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.*
import io.iohk.atala.mediator.db.MessageItemRepoSpec.encryptedMessageAlice
import io.iohk.atala.mediator.protocols.ForwardMessageExecuter
import zio.*
Expand All @@ -17,7 +19,6 @@ import zio.test.Assertion.*
import fmgp.did.DIDSubject

import scala.concurrent.ExecutionContext.Implicits.global
import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.*
import reactivemongo.api.bson.BSONDocument

/** mediator/testOnly io.iohk.atala.mediator.protocols.ForwardMessageExecutorSpec */
Expand Down Expand Up @@ -56,7 +57,9 @@ object ForwardMessageExecutorSpec extends ZIOSpecDefault with DidAccountStubSetu
case NoReply => assertTrue(true)

} @@ TestAspect.before(setupAndClean)
).provideSomeLayer(DidPeerResolver.layerDidPeerResolver)
)
.provideSomeLayer(io.iohk.atala.mediator.MediatorTransportManagerUtil.layerTest)
.provideSomeLayer(DidPeerResolver.layerDidPeerResolver)
.provideSomeLayer(Operations.layerDefault)
.provideSomeLayer(DidPeerResolver.layerDidPeerResolver)
.provideSomeLayer(AgentStub.agentLayer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import fmgp.did.comm.protocol.pickup3.StatusRequest
import fmgp.did.method.peer.DidPeerResolver
import fmgp.did.{Agent, DIDSubject}
import fmgp.util.Base64
import io.iohk.atala.mediator.MediatorAgent
import io.iohk.atala.mediator.*
import io.iohk.atala.mediator.db.*
import io.iohk.atala.mediator.db.AgentStub.*
import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.*
Expand Down Expand Up @@ -166,11 +166,16 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M
case NoReply => assertTrue(true)
}
} @@ TestAspect.before(setupAndClean)
).provideSomeLayer(DidPeerResolver.layerDidPeerResolver)
)
.provideSomeLayer(MediatorTransportManagerUtil.layerTest)
.provideSomeLayer(DidPeerResolver.layerDidPeerResolver)
.provideSomeLayer(Operations.layerDefault)
.provideSomeLayer(DidPeerResolver.layerDidPeerResolver)
.provideSomeLayer(AgentStub.agentLayer)
.provideLayerShared(dataAccessLayer) @@ TestAspect.sequential
.provideSomeEnvironment((e: ZEnvironment[UserAccountRepo & MessageItemRepo]) =>
e ++ ZEnvironment(TransportUtil.newTransportEmpty)
)
.provideLayerShared(dataAccessLayer)
}

val dataAccessLayer = EmbeddedMongoDBInstance.layer(port, hostIp)
Expand Down

0 comments on commit 3ec8c3c

Please sign in to comment.