diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala b/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala index 93be459a..49a69da6 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala @@ -30,11 +30,14 @@ case class AgentExecutorMediator( override def acceptTransport( transport: TransportDIDComm[Any] ): URIO[Operations & Resolver, Unit] = - transport.inbound - .mapZIO(msg => jobExecuterProtocol(msg, transport)) - .runDrain - .forkIn(scope) - .unit // From Fiber.Runtime[fmgp.util.Transport.InErr, Unit] to Unit + for { + _ <- transportManager.update { _.registerTransport(transport) } + _ <- transport.inbound + .mapZIO(msg => jobExecuterProtocol(msg, transport)) + .runDrain + .forkIn(scope) + .unit // From Fiber.Runtime[fmgp.util.Transport.InErr, Unit] to Unit + } yield () override def receiveMsg( msg: SignedMessage | EncryptedMessage, @@ -80,6 +83,7 @@ case class AgentExecutorMediator( .map(_.to.toSet.flatten.map(_.toDIDSubject)) .mapError(didFail => MediatorDidError(didFail)) _ <- transportManager.get.flatMap { m => + // TODO REVIEW what is this code for? ZIO.foreach(recipientsSubject)(subject => m.publish(subject.asTO, msg)) } _ <- @@ -91,8 +95,10 @@ case class AgentExecutorMediator( .decrypt(msg) .tap { pMsg => pMsg.from match - case None => ZIO.unit - case Some(from) => transportManager.update { _.link(from.asFROMTO, transport) } + case None => ZIO.unit + case Some(from) => + ZIO.logInfo(s"Link ${transport.id} to agent ${from.asFROMTO}") *> + transportManager.update { _.link(from.asFROMTO, transport) } } .map(Right(_)) .catchAll { didFail => @@ -257,6 +263,7 @@ object AgentExecutorMediator { messageItemRepo: MessageItemRepo, ): ZIO[TransportFactory, Nothing, AgentExecutar] = for { + _ <- ZIO.logInfo(s"Make Madiator AgentExecutor for ${agent.id}") transportManager <- MediatorTransportManager.make mediator = AgentExecutorMediator(agent, transportManager, protocolHandler, userAccountRepo, messageItemRepo) } yield mediator diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/MediatorTransportManager.scala b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorTransportManager.scala index 2f4d7874..4e664ad3 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/MediatorTransportManager.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorTransportManager.scala @@ -54,16 +54,16 @@ case class MediatorTransportManager( def enableLiveMode(subject: FROMTO, transportID: TransportID): MediatorTransportManager = this.copy( liveMode = liveMode.updatedWith(subject) { - case Some(set) => Some(set - transportID).filter(_.isEmpty) - case None => None + case Some(set) => Some(set + transportID) + case None => Some(Set(transportID)) } ) def disableLiveMode(subject: FROMTO, transportID: TransportID): MediatorTransportManager = this.copy( liveMode = liveMode.updatedWith(subject) { - case Some(set) => Some(set + transportID) - case None => Some(Set(transportID)) + case Some(set) => Some(set - transportID).filter(_.isEmpty) + case None => None } ) @@ -73,11 +73,12 @@ case class MediatorTransportManager( def sendForLiveMode( next: TO, msg: /*lazy*/ => SignedMessage | EncryptedMessage - ): ZIO[Any, DidFail, Iterable[Unit]] = { - val transportIDs = this.liveMode.getOrElse(next.asFROMTO, Seq.empty) - val myChannels = transportIDs.flatMap(id => this.transports.find(_.id == id)) - ZIO.foreach(myChannels) { _.send(msg) } - } + ): ZIO[Any, DidFail, Unit] = + for { + transportIDs <- ZIO.succeed(this.liveMode.getOrElse(next.asFROMTO, Set.empty)) + myChannels <- ZIO.succeed(transportIDs.flatMap(id => this.transports.find(_.id == id))) + _ <- ZIO.foreach(myChannels) { _.send(msg) } + } yield () // TODO maybe rename to send def publish(to: TO, msg: SignedMessage | EncryptedMessage): ZIO[Any, Nothing, Iterable[Unit]] = { diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala b/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala index 9c47e96b..17478c82 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala @@ -38,7 +38,7 @@ object OperatorImp { userAccountRepo <- ZIO.service[UserAccountRepo] messageItemRepo <- ZIO.service[MessageItemRepo] self <- AgentExecutorMediator.make(mediator, protocolHandlerAux, userAccountRepo, messageItemRepo) - _ <- ZIO.log("Operator: " + self.subject.toString) + _ <- ZIO.log("Layer Operator: " + self.subject.toString) operator = Operator( selfOperator = self, contacts = Seq(self)