Skip to content

Commit

Permalink
Merge pull request #21 from input-output-hk/bwbush/peras-iosim
Browse files Browse the repository at this point in the history
Refactored to use `MonadRandom`.
  • Loading branch information
bwbush authored Feb 14, 2024
2 parents c594bd9 + 00938c7 commit f457cb8
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 223 deletions.
1 change: 1 addition & 0 deletions Logbook.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### BB - ISSim enhancements

- Improved faithfulness of slot-leader selection.
- Refactored to use `MonadRandom` throughout.

### AB - Model-Based Testing a node

Expand Down
3 changes: 3 additions & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ optimization: False

flags: +defer-plugin-errors

allow-older:
io-classes-mtl

write-ghc-environment-files: never
234 changes: 116 additions & 118 deletions peras-iosim/src/Peras/IOSim/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,24 @@ module Peras.IOSim.Network (

import Control.Concurrent.Class.MonadSTM (MonadSTM, atomically)
import Control.Concurrent.Class.MonadSTM.TQueue (flushTQueue, newTQueueIO, tryReadTQueue, writeTQueue)
import Control.Lens (use, uses, (%=), (^.))
import Control.Monad (unless)
import Control.Lens (
Field1 (_1),
Field2 (_2),
use,
uses,
(%=),
(.=),
(^.),
)
import Control.Monad (unless, void, when)
import Control.Monad.Class.MonadFork (MonadFork (forkIO))
import Control.Monad.Class.MonadSay (MonadSay (say))
import Control.Monad.Class.MonadTime (MonadTime)
import Control.Monad.Class.MonadTimer (MonadDelay (..))
import Control.Monad.Random (Rand, getRandomR)
import Control.Monad.State (StateT, evalStateT, get, lift)
import Control.Monad.Random (MonadRandom, RandT, getRandomR, liftRandT)
import Control.Monad.State (StateT, execStateT, gets, lift)
import Data.Default (Default (def))
import Data.Foldable (foldlM, foldrM)
import Data.Foldable (foldrM)
import Data.List (delete)
import Peras.Block (Slot)
import Peras.IOSim.Message.Types (InEnvelope (..), OutEnvelope (..), OutMessage (..))
Expand Down Expand Up @@ -54,9 +62,9 @@ emptyTopology ::
emptyTopology = Topology . M.fromList . fmap (,S.empty)

randomTopology ::
RandomGen g =>
MonadRandom m =>
Parameters ->
Rand g Topology
m Topology
randomTopology Parameters{..} =
let nodeIds = MkNodeId . ("N" <>) . show <$> [1 .. peerCount]
choose 0 _ = pure mempty
Expand Down Expand Up @@ -99,123 +107,113 @@ runNetwork ::
MonadSTM m =>
MonadTime m =>
RandomGen g =>
g ->
Parameters ->
Protocol ->
M.Map NodeId (NodeState v) ->
Network v m ->
Slot ->
m (NetworkState v)
runNetwork gen0 parameters protocol states Network{..} endSlot =
do
let
-- Find the total stake.
total = sum $ (^. stake) <$> states
-- Split the random number generator.
(gen1, gen2) = split gen0
-- FIXME: Needless to say, the random number generation here is messy. We need to add `MonadRandom` to a transformer stack.
gens = M.fromList $ zip (M.keys states) (splitGen gen2 $ M.size states + 1)
-- Start a node process.
forkNode nodeId nodeIn =
forkIO
. runNode (gens M.! nodeId) parameters protocol total (states M.! nodeId)
$ NodeProcess nodeIn nodesOut
-- Send a message and mark the destination as active.
output destination inChannel inEnvelope =
do
lift . atomically . writeTQueue inChannel $ inEnvelope
activeNodes %= S.insert destination
-- Notify a node of the next slot.
notifySlot destination nodeIn = output destination nodeIn . InEnvelope Nothing . NextSlot =<< use lastSlot
-- Notify a node to stop.
notifyStop destination nodeIn = output destination nodeIn Stop
-- Route one message.
route gen out@OutEnvelope{..} =
do
lastTime %= max timestamp
pendings <- use pending
let (r, gen') = uniformR (0, 1) gen
-- Send the message if it was already pending or if it was received in the current slot.
-- FIXME: This is an approximation.
if out `elem` pendings || r > messageDelay parameters
then case outMessage of
-- FIXME: Implement this.
FetchBlock _ -> lift $ say "Fetching blocks is not yet implemented."
-- Forward the message to the appropriate node.
SendMessage message ->
do
-- FIXME: Awkwardly peek at the chain.
case message of
NewChain chain -> chainsSeen %= S.insert chain
_ -> pure ()
-- Forward the message.
output destination (nodesIn M.! destination) $ InEnvelope (pure source) message
else pending %= (out :)
pure gen'
route gen Idle{..} =
do
lastTime %= max timestamp
activeNodes %= S.delete source
pure gen
route gen Exit{..} =
do
lastTime %= max timestamp
activeNodes %= S.delete source
exitStates %= M.insert source nodeState
pure gen
-- Read all of the pending messages.
flush =
if False -- FIXME: Is it safe to use `flushTQueue`?
then flushTQueue nodesOut -- As of `io-classes-1.3.1.0`, the queue isn't empty after this call!
else tryReadTQueue nodesOut >>= maybe (pure mempty) ((<$> flush) . (:))
-- Wait for all nodes to exit.
waitForExits gen5 =
do
allIdle <- activeNodes `uses` null
received <- lift $ atomically flush
gen' <- foldlM route gen5 received
unless allIdle $ waitForExits gen'
-- Receive and send messages.
loop :: MonadDelay m => MonadSay m => g -> StateT (NetworkState v) m (NetworkState v)
loop gen3 =
do
-- Advance the slot counter and notify the nodes, if all nodes are idle.
allIdle <- activeNodes `uses` null
-- FIXME: This is unsafe because a node crashing or becoming unresponsive will block the slot advancement.
gen4 <-
if allIdle
then do
RandT g m (NetworkState v)
runNetwork parameters protocol states Network{..} endSlot =
liftRandT . (. (def,)) . execStateT $
do
let
-- Find the total stake.
total = sum $ (^. stake) <$> states
-- Start a node process.
forkNode (nodeId, nodeIn) =
do
gen <- use _2
let (gen', gen'') = split gen
_2 .= gen'
void
. lift
. forkIO
. runNode gen'' protocol total (states M.! nodeId)
$ NodeProcess nodeIn nodesOut
-- Send a message and mark the destination as active.
output destination inChannel inEnvelope =
do
lift . atomically . writeTQueue inChannel $ inEnvelope
_1 . activeNodes %= S.insert destination
-- Notify a node of the next slot.
notifySlot destination nodeIn =
output destination nodeIn . InEnvelope Nothing . NextSlot
=<< use (_1 . lastSlot)
-- Notify a node to stop.
notifyStop destination nodeIn = output destination nodeIn Stop
-- Route one message.
route out@OutEnvelope{..} =
do
_1 . lastTime %= max timestamp
pendings <- use $ _1 . pending
(r, gen') <- uniformR (0, 1) <$> use _2
_2 .= gen'
-- Send the message if it was already pending or if it was received in the current slot.
-- FIXME: This is an approximation.
if out `elem` pendings || r > messageDelay parameters
then case outMessage of
-- FIXME: Implement this.
FetchBlock _ -> lift $ say "Fetching blocks is not yet implemented."
-- Forward the message to the appropriate node.
SendMessage message ->
do
-- FIXME: Awkwardly peek at the chain.
case message of
NewChain chain -> _1 . chainsSeen %= S.insert chain
_ -> pure ()
-- Forward the message.
output destination (nodesIn M.! destination) $ InEnvelope (pure source) message
else _1 . pending %= (out :)
route Idle{..} =
do
_1 . lastTime %= max timestamp
_1 . activeNodes %= S.delete source
route Exit{..} =
do
_1 . lastTime %= max timestamp
_1 . activeNodes %= S.delete source
_1 . exitStates %= M.insert source nodeState
-- Read all of the pending messages.
flush =
if False -- FIXME: Is it safe to use `flushTQueue`?
then flushTQueue nodesOut -- As of `io-classes-1.3.1.0`, the queue isn't empty after this call!
else tryReadTQueue nodesOut >>= maybe (pure mempty) ((<$> flush) . (:))
-- Wait for all nodes to exit.
waitForExits :: StateT (NetworkState v, g) m ()
waitForExits =
do
allIdle <- (_1 . activeNodes) `uses` null
received <- lift $ atomically flush
mapM_ route received
unless allIdle waitForExits
-- Receive and send messages.
loop :: MonadDelay m => MonadSay m => StateT (NetworkState v, g) m (NetworkState v)
loop =
do
-- Advance the slot counter and notify the nodes, if all nodes are idle.
allIdle <- (_1 . activeNodes) `uses` null
-- FIXME: This is unsafe because a node crashing or becoming unresponsive will block the slot advancement.
when allIdle $
do
-- FIXME: This is unsafe because a node might take more than one slot to do its computations.
lastSlot %= (+ 1)
_1 . lastSlot %= (+ 1)
uncurry notifySlot `mapM_` M.toList nodesIn
lift $ threadDelay 1000000
-- FIXME: Assume that pending messages are received in the next slot.
gen' <- foldlM route gen3 =<< use pending
pending %= mempty
pure gen'
else pure gen3
-- Receive and route messages.
received <- lift $ atomically flush
gen5 <- foldlM route gen4 received
-- Check on whether the simulation is ending.
doExit <- lastSlot `uses` (>= endSlot)
if doExit
then do
uncurry notifyStop `mapM_` M.toList nodesIn
waitForExits gen5
get
else loop gen5
-- Start the node processes.
uncurry forkNode `mapM_` M.toList nodesIn
-- Run the network.
loop gen1 `evalStateT` def

splitGen ::
RandomGen g =>
g ->
Int ->
[g]
splitGen gen 0 = pure gen
splitGen gen n =
let (gen', gen'') = split gen
in gen' : splitGen gen'' (n - 1)
mapM_ route =<< use (_1 . pending)
_1 . pending .= mempty
-- Receive and route messages.
received <- lift $ atomically flush
mapM_ route received
-- Check on whether the simulation is ending.
doExit <- (_1 . lastSlot) `uses` (>= endSlot)
if doExit
then do
uncurry notifyStop `mapM_` M.toList nodesIn
waitForExits
gets fst
else loop
-- Start the node processes.
mapM_ forkNode $ M.toList nodesIn
-- Run the network.
loop
Loading

0 comments on commit f457cb8

Please sign in to comment.