-
Notifications
You must be signed in to change notification settings - Fork 7
5. Turn Process
Let's complete the last piece of the server implementation: turns processing. At this point, our server can process login and join game events. It also adds the functionality through kafka-streams to transform player events in the input topic into new game events.
Now, we will see how to use one functionality in kafka-streams to join multiple streams and split the messages into different branches based on conditions.
5-1-process-turn-in-server-exercise-setup
|
---|
We have provided the kafka-streams functionality in this exercise's commit, so let's review it.
Open the file modules/server/src/main/scala/scaladays/kafka/stream/GameStream.scala
.
def turnStream(): Unit =
val VALID_TURN = "Valid-Turn"
val INVALID_TURN = "Invalid-Turn"
val NAME_BRANCH = "Turn-Branch-"
val matchTable = builder.table[GameId, Game](kfg.topics.gameTopic)
val branches = builder
.stream[EventId, TTTEvent](kfg.topics.inputTopic)
.flatMap[GameId, AggMessage[EventId, TurnGame]] {
case (eventId, TTTEvent(_, TurnGame(gameId, playerId, position, piece))) =>
Some((gameId, AggMessage[EventId, TurnGame](eventId, TurnGame(gameId, playerId, position, piece))))
case _ => None
}
.leftJoin(matchTable)(TurnLogic.processTurn)
.split(Named.as(NAME_BRANCH))
.branch((_, eitherMatch) => eitherMatch.isRight, Branched.as(VALID_TURN))
.branch((_, eitherMatch) => eitherMatch.isLeft, Branched.as(INVALID_TURN))
.noDefaultBranch()
branches(s"$NAME_BRANCH$VALID_TURN").flatMapValues(_.toOption).to(kfg.topics.gameTopic)
branches(s"$NAME_BRANCH$INVALID_TURN")
.flatMapValues(_.swap.toOption)
.map((_, re) => (EventId.unsafe(), TTTEvent(Instant.now(), re)))
.to(kfg.topics.inputTopic)
This code performs the following logic:
- Define a Kafka table using the
gameTopic
- Read
TurnGame
events from theinputTopic
and transform them intoAggMessage
- Join this stream with the game topic table
- Apply a logic defined in
TurnLogic.processTurn
. This method returns anEither
validating the turn - Valid turns are sent to the game topic, for further processing
- Invalid turns are sent to the input topic as a
RejectEvent
s
Now, open TurnLogic
(modules/server/src/main/scala/scaladays/kafka/topology/TurnLogic.scala
) and implement the processTurn
method. This needs to read the game state and the event and generate a new game state (or an error if it is not processable).
Tip: Branch Game state
Start by matching the
game.state
field and deciding what to do depending on the different values
Since the client will start sending turn events, your second task is to update TTTServer
to process turn events. Open modules/server/src/main/scala/scaladays/server/TTTServer.scala
to add a new clientAction
handler for the type Turn
, and use eventStorage
to send the pertinent event.
Finally, as we have faced previously, there can be some missing Vulcan serdes. Add the needed ones at modules/server/src/main/scala/scaladays/kafka/codecs/VulcanSerdes.scala
.
In previous exercises, we prepared the WebSocket to listen to Game events, and in that way, the app reacted accordingly.
But as it's shown in the next diagram, we are not propagating the Msg.RequestNewMovement
to the server via the
WebSocket.
5-3-process-turn-in-client-exercise-setup
|
---|
As usual, first, let's review the current code state after the new commit.
- We added a new type of message in
Messages.scala
(modules/client/src/main/scala/scaladays/models/Messages.scala
) calledRequestNewMovement
. This new class will store the information about the current game and the movement the client want to perform. - In
GameView.scala
(modules/client/src/main/scala/scaladays/views/GameView.scala
) we send the movement inCellView
when the user clicks on a cell (onClick(Msg.RequestNewMovement(game, newMovement))
) - Finally, we added the signature of the new method to implement in the
ScalaDaysClient.scala
.
In the file ScalaDaysClient.scala
, implement the method publishWs
taking into account the following tips:
Tip 1: WS.publish
Tyrian.WebSocket
implements the functiondef publish[Msg](message: String): Cmd[F, Msg]
Tip 2:
ClientAction
The
ClientAction
that the server expects isTurn
.
Tip 3:
Turn
encoder
turn.asJson.noSpacesSortKeys
encodes aTurn
into aString
In the file Update.scala
, please react to the event Msg.RequestNewMovement(game, newMovement)
.
Tip 1: New Model
The Model evolves with a new Game, which has the state
GameState.Processing
and the list of movements includes thenewMovement
Tip 2: New Cmd
We have to publish the new movement via WebSocket by calling
scalaDaysClient.publishWs(playerId, game.gameId, newMovement, ws)
So we can create new WebSocket connections, we can subscribe to new events and we can publish messages. However we didn't put a solution to disconnect the WS when a game is over or when we want to play another game.
In this commit, we have
- Added a new
Msg
type for the restart (Restart
) inMessages.scala
(modules/client/src/main/scala/scaladays/models/Messages.scala
) - In
GameView.scala
(modules/client/src/main/scala/scaladays/views/GameView.scala
) we send the messageRestart
when the user clicks on the restart button. - Finally, we added the signature of the new method to implement in the
ScalaDaysClient.scala
.
In the file ScalaDaysClient.scala
, implement the method disconnectWebSocket
taking into account the following tips:
Tip 1: WS.disconnect
Tyrian.WebSocket
implements the functiondef disconnect[Msg]: Cmd[F, Msg]
When we implemented the GameView
, we included a button to restart game when it's over:
button(tpe := "button", cls := "btn btn-primary btn-lg px-4 gap-3", onClick(Msg.Restart))("Restart")
In the file Update.scala
, please react to the event Msg.Restart
.
Tip 1: New Model
contest
should adopt theContest.Empty
value, thews
should beNone
, the errors should beNil
Tip 2: New Cmd
If the
Model.ws
is defined then we have to disconnect it byScalaDaysClient.disconnectWebSocket
, otherwiseCmd.None
.