Skip to content

Commit

Permalink
Pickup Protocol - Support for Live Mode #145 (#184)
Browse files Browse the repository at this point in the history
Pickup Protocol - Support for Live Mode #145
Update scala-did to 0.1.0-M16

Signed-off-by: Fabio <Pinheiro>
  • Loading branch information
FabioPinheiro authored and Fabio committed Apr 30, 2024
1 parent cfd55e4 commit 29a587d
Show file tree
Hide file tree
Showing 14 changed files with 414 additions and 58 deletions.
32 changes: 16 additions & 16 deletions Mediator-Error_Handling.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ https://identity.foundation/didcomm-messaging/spec/#problem-reports

This table defines the expected behavior of the mediator in different scenarios not covered by the specifications.

| Mediators | Atala Mediator | Roadmap Atala Mediator || RootsId | Blocktrust |
|-------------|----------------|------------------------|---|---------|------------|
| Scenario G1 | G1C | - || ? | ? |
| Scenario G2 | G2B [ATL-5840] | - || | ? |
| Scenario G3 | Fallback G2B | [TODO] G3B || | ? |
| Scenario G4 | G4B | - || | ? |
| Scenario G5 | Fallback G4B | [TODO] G5B || | ? |
| Scenario G6 | Fallback G4B | [WIP] G6B || | ? |
| Scenario G7 | Fallback G4B | [TODO] G7B || | ? |
| Scenario G8 | G8C | - || | ? |
| | | | | | |
| Scenario M1 | M1B | - || | ? |
| Scenario M2 | M2B | - || | ? |
| Scenario M3 | Fallback G4 | M3B || | ? |
| Scenario M4 | M4B | - || | ? |
| Scenario M5 | M5A | [TODO] M5B || | ? |
| Mediators | Atala Mediator | Roadmap Atala Mediator |
|-------------|----------------|------------------------|
| Scenario G1 | G1C | - |
| Scenario G2 | G2B [ATL-5840] | - |
| Scenario G3 | Fallback G2B | [TODO] G3B |
| Scenario G4 | G4B | - |
| Scenario G5 | Fallback G4B | [TODO] G5B |
| Scenario G6 | Fallback G4B | G6B |
| Scenario G7 | Fallback G4B | [TODO] G7B |
| Scenario G8 | G8C | - |
| | | |
| Scenario M1 | M1B | - |
| Scenario M2 | M2B | - |
| Scenario M3 | Fallback G4 | M3B |
| Scenario M4 | M4B | - |
| Scenario M5 | M5A | M5B [#145] |

### Scenarios Description

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ inThisBuild(

/** Versions */
lazy val V = new {
val scalaDID = "0.1.0-M15"
val scalaDID = "0.1.0-M16"

// FIXME another bug in the test framework https://github.com/scalameta/munit/issues/554
val munit = "1.0.0-M10" // "0.7.29"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import fmgp.did.comm.protocol.reportproblem2.ProblemReport

case class AgentExecutorMediator(
agent: Agent,
transportManager: Ref[TransportManager],
transportManager: Ref[MediatorTransportManager],
protocolHandler: ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError],
userAccountRepo: UserAccountRepo,
messageItemRepo: MessageItemRepo,
Expand Down Expand Up @@ -54,14 +54,18 @@ case class AgentExecutorMediator(
.tapError(ex => ZIO.logError(ex.toString))
.provideSomeLayer(this.indentityLayer)
.provideSomeLayer(userAccountRepoLayer ++ messageItemRepoLayer)
.provideSomeEnvironment((e: ZEnvironment[Resolver & Operations]) => e ++ ZEnvironment(protocolHandler))
.provideSomeEnvironment((e: ZEnvironment[Resolver & Operations]) =>
e ++ ZEnvironment(protocolHandler) ++ ZEnvironment(transportManager)
)
.orDieWith(ex => new RuntimeException(ex.toString))

def receiveMessage(
msg: SignedMessage | EncryptedMessage,
transport: TransportDIDComm[Any]
): ZIO[
OperatorImp.Services & ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError],
Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] &
// instead of OperatorImp.Services
ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError],
MediatorError | StorageError,
Unit
] = ZIO.logAnnotate("msg_sha256", msg.sha256) {
Expand Down Expand Up @@ -112,7 +116,9 @@ case class AgentExecutorMediator(
pMsgOrProblemReport: Either[ProblemReport, PlaintextMessage],
transport: TransportDIDComm[Any]
): ZIO[
ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError] & OperatorImp.Services,
Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] &
// instead of OperatorImp.Services
ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError],
MediatorError | StorageError,
Unit
] =
Expand Down Expand Up @@ -175,7 +181,12 @@ case class AgentExecutorMediator(
case Some(problemReport) => ZIO.succeed(Reply(problemReport.toPlaintextMessage))
case None =>
protocolHandler
.program(plaintextMessage)
.program(plaintextMessage) // should we change the signature of the method or use the ZEnvironment
.provideSomeEnvironment(
(e: ZEnvironment[
Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager]
]) => e ++ ZEnvironment(transport)
)
.catchSome { case ProtocolExecutionFailToParse(failToParse) =>
for {
_ <- ZIO.logWarning(s"Error ProtocolExecutionFailToParse: $failToParse")
Expand Down Expand Up @@ -246,7 +257,7 @@ object AgentExecutorMediator {
messageItemRepo: MessageItemRepo,
): ZIO[TransportFactory, Nothing, AgentExecutar] =
for {
transportManager <- TransportManager.make
transportManager <- MediatorTransportManager.make
mediator = AgentExecutorMediator(agent, transportManager, protocolHandler, userAccountRepo, messageItemRepo)
} yield mediator

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ object DIDCommRoutes {
inboundQueue <- Queue.bounded[SignedMessage | EncryptedMessage](1)
outboundQueue <- Queue.bounded[SignedMessage | EncryptedMessage](1)
transport = new TransportDIDComm[Any] {
def transmissionFlow = Transport.TransmissionFlow.BothWays
def transmissionType = Transport.TransmissionType.SingleTransmission
def id: TransportID = TransportID.http(req.headers.get("request_id"))
def inbound: ZStream[Any, Transport.InErr, SignedMessage | EncryptedMessage] =
ZStream.fromQueue(inboundQueue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ import fmgp.did.*
import fmgp.did.comm.*
import fmgp.did.comm.protocol.*
import fmgp.did.method.peer.*
import fmgp.did.framework.TransportFactoryImp
import io.iohk.atala.mediator.db.*
import io.iohk.atala.mediator.protocols.*
import zio.*
import zio.stream.*
import zio.config.*
import zio.config.magnolia.*
import zio.config.typesafe.*
import zio.http.*
import zio.json.*
import zio.logging.*
import zio.logging.LogFormat.*
import zio.logging.backend.SLF4J
import zio.logging.*
import zio.stream.*

import java.time.format.DateTimeFormatter
import scala.io.Source
import fmgp.did.framework.TransportFactoryImp
case class MediatorConfig(endpoints: String, keyAgreement: OKPPrivateKey, keyAuthentication: OKPPrivateKey) {
val did = DIDPeer2.makeAgent(
Seq(keyAgreement, keyAuthentication),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package io.iohk.atala.mediator

import zio._
import zio.json._
import zio.stream._

import fmgp.did._
import fmgp.did.comm._
import fmgp.did.framework._
import fmgp.crypto.error._
import fmgp.util._

type TransportID = String

/** Based on the [[fmgp.did.framework.TransportDispatcher]] */
case class MediatorTransportManager(
transports: Set[TransportDIDComm[Any]] = Set.empty,
ids: Map[FROMTO, Set[TransportID]] = Map.empty,
kids: Map[VerificationMethodReferenced, Set[TransportID]] = Map.empty,
liveMode: Map[FROMTO, Set[TransportID]] = Map.empty,
transportFactory: TransportFactory
) extends TransportDispatcher {

override def openTransport(uri: String): UIO[TransportDIDComm[Any]] =
transportFactory.openTransport(uri) // FIXME TODO register Transport

def link(vmr: VerificationMethodReferenced, transportID: TransportID): MediatorTransportManager =
if (!transports.map(_.id).contains(transportID)) this // if transport is close
else
kids.get(vmr) match
case Some(seq) if seq.contains(transportID) => this
case Some(seq) => this.copy(kids = kids + (vmr -> (seq + transportID))).link(vmr.did.asFROMTO, transportID)
case None => this.copy(kids = kids + (vmr -> Set(transportID))).link(vmr.did.asFROMTO, transportID)

def link(from: FROMTO, transport: TransportDIDComm[Any]): MediatorTransportManager = link(from, transport.id)
def link(from: FROMTO, transportID: TransportID): MediatorTransportManager =
if (!transports.map(_.id).contains(transportID)) this // if transport is close
else
ids.get(from) match
case Some(seq) if seq.contains(transportID) => this
case Some(seq) => this.copy(ids = ids + (from -> (seq + transportID)))
case None => this.copy(ids = ids + (from -> Set(transportID)))

def registerTransport(transport: TransportDIDComm[Any]) =
this.copy(transports = transports + transport)

def unregisterTransport(transportID: TransportID) = this.copy(
transports = transports.filter(_.id != transportID),
ids = ids.map { case (did, ids) => (did, ids.filter(_ != transportID)) }.filterNot(_._2.isEmpty),
kids = kids.map { case (kid, ids) => (kid, ids.filter(_ != transportID)) }.filterNot(_._2.isEmpty),
liveMode = liveMode.map { case (did, ids) => (did, ids.filter(_ != transportID)) }.filterNot(_._2.isEmpty),
)

def enableLiveMode(subject: FROMTO, transportID: TransportID): MediatorTransportManager =
this.copy(
liveMode = liveMode.updatedWith(subject) {
case Some(set) => Some(set - transportID).filter(_.isEmpty)
case None => None
}
)

def disableLiveMode(subject: FROMTO, transportID: TransportID): MediatorTransportManager =
this.copy(
liveMode = liveMode.updatedWith(subject) {
case Some(set) => Some(set + transportID)
case None => Some(Set(transportID))
}
)

def getLiveModeEnableConnections(subject: FROMTO): Seq[TransportDIDComm[Any]] =
liveMode.get(subject).toSeq.flatMap(transportId => transports.filter(t => transportId.contains(t.id)))

def sendForLiveMode(
next: TO,
msg: /*lazy*/ => SignedMessage | EncryptedMessage
): ZIO[Any, DidFail, Iterable[Unit]] = {
val transportIDs = this.liveMode.getOrElse(next.asFROMTO, Seq.empty)
val myChannels = transportIDs.flatMap(id => this.transports.find(_.id == id))
ZIO.foreach(myChannels) { _.send(msg) }
}

// TODO maybe rename to send
def publish(to: TO, msg: SignedMessage | EncryptedMessage): ZIO[Any, Nothing, Iterable[Unit]] = {
val transportIDs = this.ids.getOrElse(to.asFROMTO, Seq.empty)
val myChannels = transportIDs.flatMap(id => this.transports.find(_.id == id))
ZIO.foreach(myChannels) { _.send(msg) }
}

override def send(
to: TO,
msg: SignedMessage | EncryptedMessage,
thid: Option[MsgID], // TODO use
pthid: Option[MsgID], // TODO use
): ZIO[Resolver & Agent & Operations, DidFail, Unit] =
sendViaDIDCommMessagingService(to, msg).unit

override def sendViaDIDCommMessagingService(
to: TO,
msg: SignedMessage | EncryptedMessage
): ZIO[Resolver & Agent & Operations, DidFail, Either[String, TransportDIDComm[Any]]] =
super.sendViaDIDCommMessagingService(to, msg)

}

object MediatorTransportManager {

def make: URIO[TransportFactory, Ref[MediatorTransportManager]] =
for {
transportFactory <- ZIO.service[TransportFactory]
ref <- Ref.make(MediatorTransportManager(transportFactory = transportFactory))
} yield ref

def registerTransport(transport: TransportDIDComm[Any]) =
for {
socketManager <- ZIO.service[Ref[MediatorTransportManager]]
_ <- socketManager.update { _.registerTransport(transport) }
_ <- ZIO.log(s"RegisterTransport concluded")
} yield ()

def unregisterTransport(transportId: String) =
for {
socketManager <- ZIO.service[Ref[MediatorTransportManager]]
_ <- socketManager.update { case sm: MediatorTransportManager => sm.unregisterTransport(transportId) }
_ <- ZIO.log(s"Channel unregisterSocket")
} yield ()

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import io.iohk.atala.mediator.protocols.*
import io.iohk.atala.mediator.db.{UserAccountRepo, MessageItemRepo}

object OperatorImp {
type Services = Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo
type Services = Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] &
TransportDIDComm[Any]

val protocolHandlerLayer: ULayer[
ProtocolExecuter[Services, MediatorError | StorageError]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ import io.iohk.atala.mediator.*
import io.iohk.atala.mediator.db.*
import zio.*
import zio.json.*
import fmgp.did.comm.protocol.pickup3.MessageDelivery

object ForwardMessageExecuter
extends ProtocolExecuter[Agent & UserAccountRepo & MessageItemRepo, MediatorError | StorageError] {
extends ProtocolExecuter[
Resolver & Operations & Agent & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager],
MediatorError | StorageError
] {

override def supportedPIURI: Seq[PIURI] = Seq(ForwardMessage.piuri)

Expand All @@ -34,6 +38,25 @@ object ForwardMessageExecuter
for {
_ <- repoMessageItem.insert(m.msg)
_ <- ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo")

// For Live Mode
mediatorTransportManager <- ZIO.service[Ref[MediatorTransportManager]].flatMap(_.get)
agent <- ZIO.service[Agent]
messageDelivery = MessageDelivery(
thid = m.id, // FIXME what should I put here?
from = agent.id.asFROM, // Mediator agent
to = m.next.asTO, // Destination of the message that is being forward
recipient_did = None,
attachments = Map(
m.msg.sha256 -> m.msg
)
).toPlaintextMessage
eMsgDelivery <- Operations
.authEncrypt(messageDelivery)
.mapError(didFail => MediatorDidError(didFail))
_ <- mediatorTransportManager
.sendForLiveMode(m.next.asTO, eMsgDelivery)
.mapError(didFail => MediatorDidError(didFail))
} yield NoReply
} else {
for {
Expand Down
Loading

0 comments on commit 29a587d

Please sign in to comment.