Skip to content

Commit

Permalink
Merge pull request #25 from input-output-hk/abailly-iohk/connect-netw…
Browse files Browse the repository at this point in the history
…ork-model

Connect network model
  • Loading branch information
Arnaud Bailly authored Feb 15, 2024
2 parents 8038eb7 + fd9f51c commit 401210c
Show file tree
Hide file tree
Showing 12 changed files with 350 additions and 201 deletions.
14 changes: 11 additions & 3 deletions peras-hs/src/Peras/Chain.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ module Peras.Chain where

import Peras.Block (Block)

data Chain t
= Genesis
| Cons (Block t) (Chain t)
data Chain t = Genesis
| Cons (Block t) (Chain t)

asList :: Chain t -> [Block t]
asList Genesis = []
asList (Cons x c) = x : asList c

asChain :: [Block t] -> Chain t
asChain [] = Genesis
asChain (x : bs) = Cons x (asChain bs)

2 changes: 1 addition & 1 deletion peras-iosim/analyses/block-production/experiment.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ echo "$i: $SEED"

"$PERAS_IOSIM" --parameter-file tmp-network.yaml --protocol-file tmp-protocol.yaml --result-file tmp-result.json

jq -r '.exitStates.N1 | "'"$SEED","$ASC","$END_SLOT","$TOTAL_STAKE"',\(.stake),\(.preferredChain.blocks|length)"' tmp-result.json >> tmp-results.csv
jq -r '.currentStates.N1 | "'"$SEED","$ASC","$END_SLOT","$TOTAL_STAKE"',\(.stake),\(.preferredChain.blocks|length)"' tmp-result.json >> tmp-results.csv

done

Expand Down
4 changes: 2 additions & 2 deletions peras-iosim/src/Peras/IOSim/GraphViz.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Control.Lens ((^.))
import Data.List (nub)
import Peras.Block (Block (..))
import Peras.Chain (Chain (..))
import Peras.IOSim.Network.Types (NetworkState, chainsSeen, exitStates)
import Peras.IOSim.Network.Types (NetworkState, chainsSeen, currentStates)
import Peras.IOSim.Node.Types (committeeMember, downstreams, slotLeader, stake, vrfOutput)

import qualified Data.Map.Strict as M
Expand All @@ -28,7 +28,7 @@ peersGraph ::
NetworkState v ->
G.Graph
peersGraph networkState =
let nodeStates = networkState ^. exitStates
let nodeStates = networkState ^. currentStates
nodeIds = M.mapWithKey (\name _ -> G.NodeId (G.StringId $ show name) Nothing) nodeStates
mkNode name nodeState =
G.NodeStatement
Expand Down
4 changes: 3 additions & 1 deletion peras-iosim/src/Peras/IOSim/Message/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ data OutEnvelope v
| Idle
{ timestamp :: UTCTime
, source :: NodeId
, currentState :: NodeState v
}
| Exit
{ timestamp :: UTCTime
Expand All @@ -116,7 +117,7 @@ instance A.FromJSON v => A.FromJSON (OutEnvelope v) where
<*> o A..: "outMessage"
<*> o A..: "bytes"
<*> o A..: "destination"
parseIdle = Idle <$> o A..: "timestamp" <*> o A..: "source"
parseIdle = Idle <$> o A..: "timestamp" <*> o A..: "source" <*> o A..: "currentState"
parseExit = Exit <$> o A..: "timestamp" <*> o A..: "source" <*> o A..: "nodeState"
in parseMessage <|> parseIdle <|> parseExit

Expand All @@ -133,6 +134,7 @@ instance A.ToJSON v => A.ToJSON (OutEnvelope v) where
A.object
[ "timestamp" A..= timestamp
, "source" A..= source
, "currentState" A..= currentState
]
toJSON Exit{..} =
A.object
Expand Down
232 changes: 137 additions & 95 deletions peras-iosim/src/Peras/IOSim/Network.hs
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}

module Peras.IOSim.Network (
connectNode,
createNetwork,
emptyTopology,
randomTopology,
runNetwork,
) where
module Peras.IOSim.Network where

import Control.Concurrent.Class.MonadSTM (MonadSTM, atomically)
import Control.Concurrent.Class.MonadSTM (MonadSTM, STM, TQueue, atomically)
import Control.Concurrent.Class.MonadSTM.TQueue (flushTQueue, newTQueueIO, tryReadTQueue, writeTQueue)
import Control.Lens (
Field1 (_1),
Expand All @@ -23,7 +18,7 @@ import Control.Lens (
(.=),
(^.),
)
import Control.Monad (unless, void, when)
import Control.Monad (unless, void)
import Control.Monad.Class.MonadFork (MonadFork (forkIO))
import Control.Monad.Class.MonadSay (MonadSay (say))
import Control.Monad.Class.MonadTime (MonadTime)
Expand All @@ -42,7 +37,7 @@ import Peras.IOSim.Network.Types (
Topology (..),
activeNodes,
chainsSeen,
exitStates,
currentStates,
lastSlot,
lastTime,
pending,
Expand Down Expand Up @@ -114,107 +109,154 @@ runNetwork ::
Network v m ->
Slot ->
RandT g m (NetworkState v)
runNetwork parameters protocol states Network{..} endSlot =
runNetwork parameters protocol states network@Network{..} endSlot =
liftRandT . (. (def,)) . execStateT $
do
let
-- Find the total stake.
total = fromMaybe (sum $ (^. stake) <$> states) $ totalStake parameters
-- Start a node process.
forkNode (nodeId, nodeIn) =
do
gen <- use _2
let (gen', gen'') = split gen
_2 .= gen'
void
. lift
. forkIO
. runNode gen'' protocol total (states M.! nodeId)
$ NodeProcess nodeIn nodesOut
-- Send a message and mark the destination as active.
output destination inChannel inEnvelope =
do
lift . atomically . writeTQueue inChannel $ inEnvelope
_1 . activeNodes %= S.insert destination
-- Notify a node of the next slot.
notifySlot destination nodeIn =
output destination nodeIn . InEnvelope Nothing . NextSlot
=<< use (_1 . lastSlot)
-- Notify a node to stop.
notifyStop destination nodeIn = output destination nodeIn Stop
-- Route one message.
route out@OutEnvelope{..} =
do
_1 . lastTime %= max timestamp
pendings <- use $ _1 . pending
(r, gen') <- uniformR (0, 1) <$> use _2
_2 .= gen'
-- Send the message if it was already pending or if it was received in the current slot.
-- FIXME: This is an approximation.
if out `elem` pendings || r > messageDelay parameters
then case outMessage of
-- FIXME: Implement this.
FetchBlock _ -> lift $ say "Fetching blocks is not yet implemented."
-- Forward the message to the appropriate node.
SendMessage message ->
do
-- FIXME: Awkwardly peek at the chain.
case message of
NewChain chain -> _1 . chainsSeen %= S.insert chain
_ -> pure ()
-- Forward the message.
output destination (nodesIn M.! destination) $ InEnvelope (pure source) message
else _1 . pending %= (out :)
route Idle{..} =
do
_1 . lastTime %= max timestamp
_1 . activeNodes %= S.delete source
route Exit{..} =
do
_1 . lastTime %= max timestamp
_1 . activeNodes %= S.delete source
_1 . exitStates %= M.insert source nodeState
-- Read all of the pending messages.
flush =
if False -- FIXME: Is it safe to use `flushTQueue`?
then flushTQueue nodesOut -- As of `io-classes-1.3.1.0`, the queue isn't empty after this call!
else tryReadTQueue nodesOut >>= maybe (pure mempty) ((<$> flush) . (:))
-- Wait for all nodes to exit.
waitForExits :: StateT (NetworkState v, g) m ()
waitForExits =
do
allIdle <- (_1 . activeNodes) `uses` null
received <- lift $ atomically flush
mapM_ route received
unless allIdle waitForExits

-- Receive and send messages.
loop :: MonadDelay m => MonadSay m => StateT (NetworkState v, g) m (NetworkState v)
loop =
do
-- Advance the slot counter and notify the nodes, if all nodes are idle.
allIdle <- (_1 . activeNodes) `uses` null
-- FIXME: This is unsafe because a node crashing or becoming unresponsive will block the slot advancement.
when allIdle $
do
-- FIXME: This is unsafe because a node might take more than one slot to do its computations.
_1 . lastSlot %= (+ 1)
uncurry notifySlot `mapM_` M.toList nodesIn
lift $ threadDelay 1000000
-- FIXME: Assume that pending messages are received in the next slot.
mapM_ route =<< use (_1 . pending)
_1 . pending .= mempty
-- Receive and route messages.
received <- lift $ atomically flush
mapM_ route received
stepToIdle parameters network
-- Check on whether the simulation is ending.
doExit <- (_1 . lastSlot) `uses` (>= endSlot)
if doExit
then do
uncurry notifyStop `mapM_` M.toList nodesIn
waitForExits
waitForExits parameters network
gets fst
else loop
-- Start the node processes.
mapM_ forkNode $ M.toList nodesIn
startNodes parameters protocol states network
-- Run the network.
loop

startNodes ::
(Monad m, RandomGen g, Eq v, MonadSTM m, MonadSay m, Ord v, MonadDelay m, Default v, MonadFork m, MonadTime m) =>
Parameters ->
Protocol ->
M.Map NodeId (NodeState v) ->
Network v m ->
StateT (NetworkState v, g) m ()
startNodes parameters protocol states network =
mapM_ forkNode $ M.toList nodesIn
where
Network{nodesIn, nodesOut} = network
-- Find the total stake.
total = fromMaybe (sum $ (^. stake) <$> states) $ totalStake parameters
-- Start a node process.
forkNode (nodeId, nodeIn) =
do
gen <- use _2
let (gen', gen'') = split gen
_2 .= gen'
void
. lift
. forkIO
. runNode gen'' protocol total (states M.! nodeId)
$ NodeProcess nodeIn nodesOut

-- Wait for all nodes to exit.
waitForExits ::
(Monad m, RandomGen g, Eq v, MonadSTM m, MonadSay m, Ord v, MonadDelay m) =>
Parameters ->
Network v m ->
StateT (NetworkState v, g) m ()
waitForExits parameters network =
do
allIdle <- (_1 . activeNodes) `uses` null
received <- lift $ atomically (flush nodesOut)
mapM_ route received
unless allIdle $ waitForExits parameters network
where
Network{nodesOut} = network
route = routeEnvelope parameters network

-- | Read all of the pending messages.
flush :: MonadSTM m => TQueue m a -> STM m [a]
flush q =
if False -- FIXME: Is it safe to use `flushTQueue`?
then flushTQueue q -- As of `io-classes-1.3.1.0`, the queue isn't empty after this call!
else tryReadTQueue q >>= maybe (pure mempty) ((<$> flush q) . (:))

-- | Advance the network up to one single slot.
-- This function loops until all nodes are idle
stepToIdle ::
(Monad m, RandomGen g, Eq v, MonadSTM m, MonadSay m, Ord v, MonadDelay m) =>
Parameters ->
Network v m ->
StateT (NetworkState v, g) m ()
stepToIdle parameters network = do
-- Advance the slot counter and notify the nodes, if all nodes are idle.
allIdle <- (_1 . activeNodes) `uses` null
-- FIXME: This is unsafe because a node crashing or becoming unresponsive will block the slot advancement.
if allIdle
then do
-- FIXME: This is unsafe because a node might take more than one slot to do its computations.
_1 . lastSlot %= (+ 1)
uncurry notifySlot `mapM_` M.toList nodesIn
lift $ threadDelay 1000000
-- FIXME: Assume that pending messages are received in the next slot.
mapM_ route =<< use (_1 . pending)
_1 . pending .= mempty
else do
-- Receive and route messages.
received <- lift $ atomically $ flush nodesOut
mapM_ route received
stepToIdle parameters network
where
Network{nodesIn, nodesOut} = network
route = routeEnvelope parameters network
-- Notify a node of the next slot.
notifySlot destination nodeIn =
output destination nodeIn . InEnvelope Nothing . NextSlot
=<< use (_1 . lastSlot)

-- | Dispatch a single message through the network.
routeEnvelope ::
(Monad m, RandomGen g, Eq v, MonadSTM m, MonadSay m, Ord v) =>
Parameters ->
Network v m ->
OutEnvelope v ->
StateT (NetworkState v, g) m ()
routeEnvelope parameters Network{nodesIn} = \case
out@OutEnvelope{..} ->
do
_1 . lastTime %= max timestamp
pendings <- use $ _1 . pending
(r, gen') <- uniformR (0, 1) <$> use _2
_2 .= gen'
-- Send the message if it was already pending or if it was received in the current slot.
-- FIXME: This is an approximation.
if out `elem` pendings || r > messageDelay parameters
then case outMessage of
-- FIXME: Implement this.
FetchBlock _ -> lift $ say "Fetching blocks is not yet implemented."
-- Forward the message to the appropriate node.
SendMessage message ->
do
-- FIXME: Awkwardly peek at the chain.
case message of
NewChain chain -> _1 . chainsSeen %= S.insert chain
_ -> pure ()
-- Forward the message.
output destination (nodesIn M.! destination) $ InEnvelope (pure source) message
else _1 . pending %= (out :)
Idle{..} -> do
_1 . lastTime %= max timestamp
_1 . activeNodes %= S.delete source
_1 . currentStates %= M.insert source currentState
Exit{..} -> do
_1 . lastTime %= max timestamp
_1 . activeNodes %= S.delete source
_1 . currentStates %= M.insert source nodeState

-- Send a message and mark the destination as active.
output :: MonadSTM m => NodeId -> TQueue m p -> p -> StateT (NetworkState v, g) m ()
output destination inChannel inEnvelope =
do
lift . atomically . writeTQueue inChannel $ inEnvelope
_1 . activeNodes %= S.insert destination
6 changes: 3 additions & 3 deletions peras-iosim/src/Peras/IOSim/Network/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module Peras.IOSim.Network.Types (
Topology (..),
activeNodes,
chainsSeen,
exitStates,
currentStates,
lastSlot,
lastTime,
pending,
Expand Down Expand Up @@ -52,7 +52,7 @@ data NetworkState v = NetworkState
, _lastTime :: UTCTime
, _activeNodes :: S.Set NodeId
, _chainsSeen :: S.Set (Chain v)
, _exitStates :: M.Map NodeId (NodeState v)
, _currentStates :: M.Map NodeId (NodeState v)
, _pending :: [OutEnvelope v]
}
deriving stock (Eq, Generic, Ord, Read, Show)
Expand All @@ -67,7 +67,7 @@ instance ToJSON v => ToJSON (NetworkState v) where
, "lastTime" A..= _lastTime
, "activeNodes" A..= _activeNodes
, "chainsSeen" A..= _chainsSeen
, "exitStates" A..= _exitStates
, "currentStates" A..= _currentStates
, "pending" A..= _pending
]

Expand Down
3 changes: 2 additions & 1 deletion peras-iosim/src/Peras/IOSim/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,13 @@ runNode gen0 protocol total state NodeProcess{..} =
runRand $ nextSlot protocol slot total
SomeBlock _ -> error "Block transfer not implemented."
NewChain chain -> runRand $ newChain protocol chain
currentState <- get
atomically' $
do
case message of
Nothing -> pure ()
Just message' -> mapM_ (writeTQueue outgoing . OutEnvelope now nodeId' (SendMessage message') 0) downstreams'
writeTQueue outgoing $ Idle now nodeId'
writeTQueue outgoing $ Idle now nodeId' currentState
clock .= now
go gen'
Stop -> atomically' . writeTQueue outgoing . Exit now nodeId' =<< get
Expand Down
Loading

0 comments on commit 401210c

Please sign in to comment.