Skip to content

Commit

Permalink
fix: register the transport (#192)
Browse files Browse the repository at this point in the history
  • Loading branch information
FabioPinheiro authored Nov 30, 2023
1 parent 11a4437 commit e73a2ab
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ case class AgentExecutorMediator(
override def acceptTransport(
transport: TransportDIDComm[Any]
): URIO[Operations & Resolver, Unit] =
transport.inbound
.mapZIO(msg => jobExecuterProtocol(msg, transport))
.runDrain
.forkIn(scope)
.unit // From Fiber.Runtime[fmgp.util.Transport.InErr, Unit] to Unit
for {
_ <- transportManager.update { _.registerTransport(transport) }
_ <- transport.inbound
.mapZIO(msg => jobExecuterProtocol(msg, transport))
.runDrain
.forkIn(scope)
.unit // From Fiber.Runtime[fmgp.util.Transport.InErr, Unit] to Unit
} yield ()

override def receiveMsg(
msg: SignedMessage | EncryptedMessage,
Expand Down Expand Up @@ -80,6 +83,7 @@ case class AgentExecutorMediator(
.map(_.to.toSet.flatten.map(_.toDIDSubject))
.mapError(didFail => MediatorDidError(didFail))
_ <- transportManager.get.flatMap { m =>
// TODO REVIEW what is this code for?
ZIO.foreach(recipientsSubject)(subject => m.publish(subject.asTO, msg))
}
_ <-
Expand All @@ -91,8 +95,10 @@ case class AgentExecutorMediator(
.decrypt(msg)
.tap { pMsg =>
pMsg.from match
case None => ZIO.unit
case Some(from) => transportManager.update { _.link(from.asFROMTO, transport) }
case None => ZIO.unit
case Some(from) =>
ZIO.logInfo(s"Link ${transport.id} to agent ${from.asFROMTO}") *>
transportManager.update { _.link(from.asFROMTO, transport) }
}
.map(Right(_))
.catchAll { didFail =>
Expand Down Expand Up @@ -257,6 +263,7 @@ object AgentExecutorMediator {
messageItemRepo: MessageItemRepo,
): ZIO[TransportFactory, Nothing, AgentExecutar] =
for {
_ <- ZIO.logInfo(s"Make Madiator AgentExecutor for ${agent.id}")
transportManager <- MediatorTransportManager.make
mediator = AgentExecutorMediator(agent, transportManager, protocolHandler, userAccountRepo, messageItemRepo)
} yield mediator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ case class MediatorTransportManager(
def enableLiveMode(subject: FROMTO, transportID: TransportID): MediatorTransportManager =
this.copy(
liveMode = liveMode.updatedWith(subject) {
case Some(set) => Some(set - transportID).filter(_.isEmpty)
case None => None
case Some(set) => Some(set + transportID)
case None => Some(Set(transportID))
}
)

def disableLiveMode(subject: FROMTO, transportID: TransportID): MediatorTransportManager =
this.copy(
liveMode = liveMode.updatedWith(subject) {
case Some(set) => Some(set + transportID)
case None => Some(Set(transportID))
case Some(set) => Some(set - transportID).filter(_.isEmpty)
case None => None
}
)

Expand All @@ -73,11 +73,12 @@ case class MediatorTransportManager(
def sendForLiveMode(
next: TO,
msg: /*lazy*/ => SignedMessage | EncryptedMessage
): ZIO[Any, DidFail, Iterable[Unit]] = {
val transportIDs = this.liveMode.getOrElse(next.asFROMTO, Seq.empty)
val myChannels = transportIDs.flatMap(id => this.transports.find(_.id == id))
ZIO.foreach(myChannels) { _.send(msg) }
}
): ZIO[Any, DidFail, Unit] =
for {
transportIDs <- ZIO.succeed(this.liveMode.getOrElse(next.asFROMTO, Set.empty))
myChannels <- ZIO.succeed(transportIDs.flatMap(id => this.transports.find(_.id == id)))
_ <- ZIO.foreach(myChannels) { _.send(msg) }
} yield ()

// TODO maybe rename to send
def publish(to: TO, msg: SignedMessage | EncryptedMessage): ZIO[Any, Nothing, Iterable[Unit]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object OperatorImp {
userAccountRepo <- ZIO.service[UserAccountRepo]
messageItemRepo <- ZIO.service[MessageItemRepo]
self <- AgentExecutorMediator.make(mediator, protocolHandlerAux, userAccountRepo, messageItemRepo)
_ <- ZIO.log("Operator: " + self.subject.toString)
_ <- ZIO.log("Layer Operator: " + self.subject.toString)
operator = Operator(
selfOperator = self,
contacts = Seq(self)
Expand Down

0 comments on commit e73a2ab

Please sign in to comment.