diff --git a/network-mux/src/Network/Mux/Trace.hs b/network-mux/src/Network/Mux/Trace.hs index aa2c16c75b5..cf9b7976235 100644 --- a/network-mux/src/Network/Mux/Trace.hs +++ b/network-mux/src/Network/Mux/Trace.hs @@ -24,6 +24,7 @@ import Text.Printf import Control.Exception hiding (throwIO) import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime +import Data.Bifunctor (Bifunctor (..)) import Data.Word import GHC.Generics (Generic (..)) import Quiet (Quiet (..)) @@ -101,6 +102,9 @@ handleIOException errorMsg e = throwIO MuxError { data TraceLabelPeer peerid a = TraceLabelPeer peerid a deriving (Eq, Functor, Show) +instance Bifunctor TraceLabelPeer where + bimap f g (TraceLabelPeer a b) = TraceLabelPeer (f a) (g b) + -- | Type used for tracing mux events. -- data WithMuxBearer peerid a = WithMuxBearer { diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Node.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Node.hs index 97846a84acc..7484669dee7 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Node.hs @@ -67,6 +67,7 @@ import Ouroboros.Network.BlockFetch (BlockFetchConfiguration (..)) import qualified Ouroboros.Network.Diffusion as Diffusion import qualified Ouroboros.Network.Diffusion.NonP2P as NonP2P import qualified Ouroboros.Network.Diffusion.P2P as P2P +import qualified Ouroboros.Network.Diffusion.Policies as Diffusion import Ouroboros.Network.Magic import Ouroboros.Network.NodeToClient (ConnectionId, LocalAddress, LocalSocket, NodeToClientVersionData (..), combineVersions, @@ -77,7 +78,7 @@ import Ouroboros.Network.NodeToNode (DiffusionMode (..), defaultMiniProtocolParameters) import Ouroboros.Network.PeerSelection.LedgerPeers (LedgerPeersConsensusInterface (..)) -import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics (..), +import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics, newPeerMetric, reportMetric) import Ouroboros.Network.Protocol.Limits 
(shortWait) import Ouroboros.Network.RethrowPolicy @@ -347,7 +348,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} = nodeKernel <- initNodeKernel nodeKernelArgs rnNodeKernelHook registry nodeKernel - peerMetrics <- newPeerMetric + peerMetrics <- newPeerMetric Diffusion.peerMetricsConfiguration let ntnApps = mkNodeToNodeApps nodeKernelArgs nodeKernel peerMetrics ntcApps = mkNodeToClientApps nodeKernelArgs nodeKernel (apps, appsExtra) = mkDiffusionApplications @@ -389,7 +390,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} = (NTN.defaultCodecs codecConfig version) NTN.byteLimits llrnChainSyncTimeout - (reportMetric peerMetrics) + (reportMetric Diffusion.peerMetricsConfiguration peerMetrics) (NTN.mkHandlers nodeKernelArgs nodeKernel) mkNodeToClientApps diff --git a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs index aecf80c4098..f18bf5a0264 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs @@ -569,5 +569,5 @@ data InboundGovernorTrace peerAddr | TrRemoteState !(Map (ConnectionId peerAddr) RemoteSt) | TrUnexpectedlyFalseAssertion !(IGAssertionLocation peerAddr) -- ^ This case is unexpected at call site. 
- | TrInboundGovernorError !SomeException + | TrInboundGovernorError !SomeException deriving Show diff --git a/ouroboros-network-testing/src/Ouroboros/Network/Testing/Data/Script.hs b/ouroboros-network-testing/src/Ouroboros/Network/Testing/Data/Script.hs index b448e0ae450..a52f9b80cbc 100644 --- a/ouroboros-network-testing/src/Ouroboros/Network/Testing/Data/Script.hs +++ b/ouroboros-network-testing/src/Ouroboros/Network/Testing/Data/Script.hs @@ -10,6 +10,8 @@ module Ouroboros.Network.Testing.Data.Script , initScript , stepScript , stepScriptSTM + , stepScriptOrFinish + , stepScriptOrFinishSTM , initScript' , stepScript' , stepScriptSTM' @@ -26,6 +28,7 @@ module Ouroboros.Network.Testing.Data.Script , interpretPickScript ) where +import Data.Functor (($>)) import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as NonEmpty import Data.Set (Set) @@ -76,6 +79,20 @@ stepScriptSTM scriptVar = do x':xs' -> LazySTM.writeTVar scriptVar (Script (x' :| xs')) return x +-- | Return 'Left' if it was the last step, return 'Right' if the script can +-- continue. 
+-- +stepScriptOrFinish :: MonadSTM m => TVar m (Script a) -> m (Either a a) +stepScriptOrFinish scriptVar = atomically (stepScriptOrFinishSTM scriptVar) + +stepScriptOrFinishSTM :: MonadSTM m => TVar m (Script a) -> STM m (Either a a) +stepScriptOrFinishSTM scriptVar = do + Script (x :| xs) <- LazySTM.readTVar scriptVar + case xs of + [] -> return (Left x) + x':xs' -> writeTVar scriptVar (Script (x' :| xs')) + $> Right x + initScript' :: MonadSTM m => Script a -> m (TVar m (Script a)) initScript' = newTVarIO @@ -110,7 +127,7 @@ instance Arbitrary a => Arbitrary (Script a) where type TimedScript a = Script (a, ScriptDelay) -data ScriptDelay = NoDelay | ShortDelay | LongDelay +data ScriptDelay = NoDelay | ShortDelay | LongDelay | Delay DiffTime deriving (Eq, Show) instance Arbitrary ScriptDelay where @@ -121,6 +138,7 @@ instance Arbitrary ScriptDelay where shrink LongDelay = [NoDelay, ShortDelay] shrink ShortDelay = [NoDelay] shrink NoDelay = [] + shrink (Delay _) = [] playTimedScript :: (MonadAsync m, MonadTimer m) => Tracer m a -> TimedScript a -> m (TVar m a) @@ -135,9 +153,10 @@ playTimedScript tracer (Script ((x0,d0) :| script)) = do | (x,d) <- script ] return v where - interpretScriptDelay NoDelay = 0 - interpretScriptDelay ShortDelay = 1 - interpretScriptDelay LongDelay = 3600 + interpretScriptDelay NoDelay = 0 + interpretScriptDelay ShortDelay = 1 + interpretScriptDelay LongDelay = 3600 + interpretScriptDelay (Delay delay) = delay -- diff --git a/ouroboros-network/ouroboros-network.cabal b/ouroboros-network/ouroboros-network.cabal index a6c7e1c5928..eecbf4c6339 100644 --- a/ouroboros-network/ouroboros-network.cabal +++ b/ouroboros-network/ouroboros-network.cabal @@ -298,6 +298,7 @@ test-suite test Test.Ouroboros.Network.PeerSelection.Json Test.Ouroboros.Network.PeerSelection.MockEnvironment Test.Ouroboros.Network.PeerSelection.PeerGraph + Test.Ouroboros.Network.PeerSelection.PeerMetric Test.Ouroboros.Network.NodeToNode.Version 
Test.Ouroboros.Network.NodeToClient.Version Test.Ouroboros.Network.ShrinkCarefully diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs index cc87cd18219..2b231c94cf4 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs @@ -95,7 +95,7 @@ import Ouroboros.Network.PeerSelection.Governor.Types PeerSelectionCounters (..), TracePeerSelection (..)) import Ouroboros.Network.PeerSelection.LedgerPeers (UseLedgerAfter (..), withLedgerPeers) -import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics (..)) +import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics) import Ouroboros.Network.PeerSelection.PeerStateActions (PeerConnectionHandle, PeerSelectionActionsTrace (..), PeerStateActionsArguments (..), withPeerStateActions) @@ -231,6 +231,20 @@ data ArgumentsExtra m = ArgumentsExtra { -- is using @TIME_WAIT@. -- , daTimeWaitTimeout :: DiffTime + + -- | Churn interval between churn events in deadline mode. A small fuzz + -- is added (max 10 minutes) so that not all nodes churn at the same time. + -- + -- By default it is set to 3300 seconds. + -- + , daDeadlineChurnInterval :: DiffTime + + -- | Churn interval between churn events in bulk sync mode. A small fuzz + -- is added (max 1 minute) so that not all nodes churn at the same time. + -- + -- By default it is set to 300 seconds. 
+ -- + , daBulkChurnInterval :: DiffTime } -- @@ -606,6 +620,8 @@ runM Interfaces , daReadUseLedgerAfter , daProtocolIdleTimeout , daTimeWaitTimeout + , daDeadlineChurnInterval + , daBulkChurnInterval } Applications { daApplicationInitiatorMode @@ -879,6 +895,8 @@ runM Interfaces Async.withAsync (Governor.peerChurnGovernor dtTracePeerSelectionTracer + daDeadlineChurnInterval + daBulkChurnInterval daPeerMetrics churnModeVar churnRng @@ -1026,6 +1044,8 @@ runM Interfaces Async.withAsync (Governor.peerChurnGovernor dtTracePeerSelectionTracer + daDeadlineChurnInterval + daBulkChurnInterval daPeerMetrics churnModeVar churnRng diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs index e115aca7a10..9eea4127037 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs @@ -9,6 +9,8 @@ import Control.Monad.Class.MonadSTM.Strict import Control.Monad.Class.MonadTime import Data.List (sortOn, unfoldr) +import qualified Data.Map.Merge.Strict as Map +import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import qualified Data.Set as Set import Data.Word (Word32) @@ -34,7 +36,7 @@ deactivateTimeout = 300 -- | Timeout for 'spsCloseConnectionTimeout'. -- -- This timeout depends on 'KeepAlive' and 'TipSample' timeouts. 'KeepAlive' --- keeps agancy most of the time, but 'TipSample' can give away its agency for +-- keeps agency most of the time, but 'TipSample' can give away its agency for -- longer periods of time. Here we allow it to get 6 blocks (assuming a new -- block every @20s@). -- @@ -42,6 +44,30 @@ closeConnectionTimeout :: DiffTime closeConnectionTimeout = 120 +-- | Number of events tracked by 'PeerMetrics'. This corresponds to one hour of +-- blocks on mainnet. 
+-- +-- TODO: issue #3866 +-- +peerMetricsConfiguration :: PeerMetricsConfiguration +peerMetricsConfiguration = PeerMetricsConfiguration { + maxEntriesToTrack = 180 + } + +-- | Merge two dictionaries where values of the first one are obligatory, while +-- values of the second one are optional. +-- +optionalMerge + :: Ord k + => Map k a + -> Map k b + -> Map k (a, Maybe b) +optionalMerge = Map.merge (Map.mapMissing (\_ a -> (a, Nothing))) + Map.dropMissing + (Map.zipWithMatched (\_ a b -> (a, Just b))) + + + simplePeerSelectionPolicy :: forall m peerAddr. ( MonadSTM m , Ord peerAddr @@ -88,22 +114,31 @@ simplePeerSelectionPolicy rngVar getChurnMode metrics errorDelay = PeerSelection mode <- getChurnMode scores <- case mode of ChurnModeNormal -> do - hup <- upstreamyness <$> getHeaderMetrics metrics - bup <- fetchynessBlocks <$> getFetchedMetrics metrics - return $ Map.unionWith (+) hup bup + jpm <- joinedPeerMetricAt metrics + hup <- upstreamyness metrics + bup <- fetchynessBlocks metrics + return $ Map.unionWith (+) hup bup `optionalMerge` jpm + + ChurnModeBulkSync -> do + jpm <- joinedPeerMetricAt metrics + bup <- fetchynessBytes metrics + return $ bup `optionalMerge` jpm - ChurnModeBulkSync -> - fetchynessBytes <$> getFetchedMetrics metrics available' <- addRand available (,) return $ Set.fromList . map fst . take pickNum + -- order the results, resolve the ties using slot number when + -- a peer joined the leader board. + -- + -- note: this will prefer to preserve newer peers, whose results are + -- less certain than peers who entered the leader board earlier. . sortOn (\(peer, rn) -> - (Map.findWithDefault 0 peer scores, rn)) + (Map.findWithDefault (0, Nothing) peer scores, rn)) . Map.assocs $ available' - -- Randomly pick peers to demote, peeers with knownPeerTepid set are twice + -- Randomly pick peers to demote, peers with knownPeerTepid set are twice -- as likely to be demoted. 
warmDemotionPolicy :: PickPolicy peerAddr m warmDemotionPolicy _ _ isTepid available pickNum = do diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs index 48e2ae02afd..396689b6ef7 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs @@ -589,6 +589,10 @@ peerChurnGovernor :: forall m peeraddr. , MonadDelay m ) => Tracer m (TracePeerSelection peeraddr) + -> DiffTime + -- ^ the base for churn interval in the deadline mode. + -> DiffTime + -- ^ the base for churn interval in the bulk sync mode. -> PeerMetrics m peeraddr -> StrictTVar m ChurnMode -> StdGen @@ -596,7 +600,8 @@ peerChurnGovernor :: forall m peeraddr. -> PeerSelectionTargets -> StrictTVar m PeerSelectionTargets -> m Void -peerChurnGovernor tracer _metrics churnModeVar inRng getFetchMode base peerSelectionVar = do +peerChurnGovernor tracer deadlineChurnInterval bulkChurnInterval + _metrics churnModeVar inRng getFetchMode base peerSelectionVar = do -- Wait a while so that not only the closest peers have had the time -- to become warm. startTs0 <- getMonotonicTime @@ -657,8 +662,7 @@ peerChurnGovernor tracer _metrics churnModeVar inRng getFetchMode base peerSelec -- Short delay, we may have no active peers right now threadDelay 1 - -- Pick new active peer(s) based on the best performing established - -- peers. + -- Pick new active peer(s). atomically $ increaseActivePeers churnMode -- Give the promotion process time to start @@ -703,18 +707,11 @@ peerChurnGovernor tracer _metrics churnModeVar inRng getFetchMode base peerSelec longDelay :: StdGen -> DiffTime -> m StdGen - longDelay = fuzzyDelay' churnInterval 600 + longDelay = fuzzyDelay' deadlineChurnInterval 600 shortDelay :: StdGen -> DiffTime -> m StdGen - shortDelay = fuzzyDelay' churnIntervalBulk 60 - - -- The min time between running the churn governor. 
- churnInterval :: DiffTime - churnInterval = 3300 - - churnIntervalBulk :: DiffTime - churnIntervalBulk = 300 + shortDelay = fuzzyDelay' bulkChurnInterval 60 -- Replace 20% or at least on peer every churnInterval. decrease :: Int -> Int diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs index 97c3bd643f7..037ded629d3 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs @@ -593,8 +593,11 @@ data TracePeerSelection peeraddr = | TracePromoteColdDone Int Int peeraddr -- | target active, actual active, selected peers | TracePromoteWarmPeers Int Int (Set peeraddr) - -- | local per-group (target active, actual active), selected peers - | TracePromoteWarmLocalPeers [(Int, Int)] (Set peeraddr) + -- | Promote local peers to warm + | TracePromoteWarmLocalPeers + [(Int, Int)] -- ^ local per-group `(target active, actual active)`, + -- only limited to groups which are below their target. + (Set peeraddr) -- ^ selected peers -- | target active, actual active, peer, reason | TracePromoteWarmFailed Int Int peeraddr SomeException -- | target active, actual active, peer diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerMetric.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerMetric.hs index dc3aa4a4d11..f9e662ffda6 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerMetric.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerMetric.hs @@ -5,15 +5,42 @@ {-# LANGUAGE ScopedTypeVariables #-} -module Ouroboros.Network.PeerSelection.PeerMetric where +module Ouroboros.Network.PeerSelection.PeerMetric + ( -- * Peer metrics + PeerMetrics + , PeerMetricsConfiguration (..) 
+ , newPeerMetric + -- * Metric calculations + , joinedPeerMetricAt + , upstreamyness + , fetchynessBytes + , fetchynessBlocks + -- * Tracers + , headerMetricTracer + , fetchedMetricTracer + -- * Metrics reporters + , ReportPeerMetrics (..) + , nullMetric + , reportMetric + -- * Internals + -- only exported for testing purposes + , SlotMetric + , newPeerMetric' + ) where +import Control.Monad (when) import Control.Monad.Class.MonadSTM.Strict import Control.Monad.Class.MonadTime import Control.Tracer (Tracer (..), contramap, nullTracer) +import Data.Bifunctor (Bifunctor (..)) import Data.IntPSQ (IntPSQ) -import qualified Data.IntPSQ as Pq +import qualified Data.IntPSQ as IntPSQ import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map +import Data.Maybe (fromMaybe) +import Data.Monoid (Sum (..)) +import Data.OrdPSQ (OrdPSQ) +import qualified Data.OrdPSQ as OrdPSQ import Cardano.Slotting.Slot (SlotNo (..)) import Ouroboros.Network.DeltaQ (SizeInBytes) @@ -21,28 +48,131 @@ import Ouroboros.Network.NodeToNode (ConnectionId (..)) import Ouroboros.Network.PeerSelection.PeerMetric.Type --- The maximum numbers of slots we will store data for. --- On some chains sometimes this corresponds to 1h --- worth of metrics *sighs*. -maxEntriesToTrack :: Int -maxEntriesToTrack = 180 +newtype PeerMetricsConfiguration = PeerMetricsConfiguration { + -- | The maximum number of slots we will store data for. On some chains + -- sometimes this corresponds to 1h worth of metrics *sighs*. + -- + -- this number MUST correspond to the number of headers / blocks which are + -- produced in one hour. + maxEntriesToTrack :: Int + } +-- | Integer based metric ordered by 'SlotNo' which holds the peer and time. +-- +-- The `p` parameter is truly polymorphic. For `upstreamyness` we use the peer +-- address, and for `fetchyness` it is a pair of peer id and bytes downloaded. 
+-- type SlotMetric p = IntPSQ SlotNo (p, Time) -data PeerMetrics m p = PeerMetrics { - headerMetrics :: StrictTVar m (SlotMetric p) - , fetchedMetrics :: StrictTVar m (SlotMetric (p, SizeInBytes)) +-- | Peer registry ordered by slot when a peer joined the peer metric. +-- +type PeerRegistry p = OrdPSQ p SlotNo AverageMetrics + +-- | Peer registry ordered by slot when a peer was last seen. +-- +type LastSeenRegistry p = OrdPSQ p SlotNo () + +-- | Mutable peer metrics state accessible via 'STM'. +-- +newtype PeerMetrics m p = PeerMetrics { + peerMetricsVar :: StrictTVar m (PeerMetricsState p) + } + +-- | Internal state +-- +data PeerMetricsState p = PeerMetricsState { + + -- | Header metrics. + -- + headerMetrics :: SlotMetric p, + + -- | Fetch metrics. + -- + fetchedMetrics :: SlotMetric (p, SizeInBytes), + + -- | Registry recording when a peer joined the board of 'PeerMetrics'. The + -- values are average header and fetched metrics. + -- + peerRegistry :: PeerRegistry p, + + -- | A registry which records the last time a peer was seen. + -- + -- If a peer hasn't been seen since the oldest recorded slot number, it will + -- be removed. + -- + lastSeenRegistry :: LastSeenRegistry p, + + -- | Latest slot registered in the leader board. + -- + lastSlotNo :: SlotNo, + + -- | Metrics configuration. It is kept here just for convenience. + -- + metricsConfig :: PeerMetricsConfiguration } + +-- | Average results at a given slot. 
+-- +data AverageMetrics = AverageMetrics { + averageUpstreamyness :: !Int, + averageFetchynessBlocks :: !Int, + averageFetchynessBytes :: !Int + } + deriving Show + + +newPeerMetric + :: MonadSTM m + => PeerMetricsConfiguration + -> m (PeerMetrics m p) +newPeerMetric = newPeerMetric' IntPSQ.empty IntPSQ.empty + + +newPeerMetric' + :: MonadSTM m + => SlotMetric p + -> SlotMetric (p, SizeInBytes) + -> PeerMetricsConfiguration + -> m (PeerMetrics m p) +newPeerMetric' headerMetrics fetchedMetrics metricsConfig = + PeerMetrics <$> newTVarIO PeerMetricsState { + headerMetrics, + fetchedMetrics, + peerRegistry = OrdPSQ.empty, + lastSeenRegistry = OrdPSQ.empty, + lastSlotNo = SlotNo 0, + metricsConfig + } + +updateLastSlot :: SlotNo -> PeerMetricsState p -> PeerMetricsState p +updateLastSlot slotNo state@PeerMetricsState { lastSlotNo } + | slotNo > lastSlotNo = state { lastSlotNo = slotNo } + | otherwise = state + + +firstSlotNo :: PeerMetricsState p -> Maybe SlotNo +firstSlotNo PeerMetricsState {headerMetrics, fetchedMetrics} = + (\a b -> min (f a) (f b)) + <$> IntPSQ.minView headerMetrics + <*> IntPSQ.minView fetchedMetrics + where + f :: (a, SlotNo, b, c) -> SlotNo + f (_, slotNo, _, _) = slotNo + + reportMetric :: forall m p. - ( MonadSTM m ) - => PeerMetrics m p + ( MonadSTM m + , Ord p + ) + => PeerMetricsConfiguration + -> PeerMetrics m p -> ReportPeerMetrics m (ConnectionId p) -reportMetric peerMetrics = - ReportPeerMetrics (headerMetricTracer peerMetrics) - (fetchedMetricTracer peerMetrics) +reportMetric config peerMetrics = + ReportPeerMetrics (headerMetricTracer config peerMetrics) + (fetchedMetricTracer config peerMetrics) nullMetric :: MonadSTM m @@ -50,85 +180,257 @@ nullMetric nullMetric = ReportPeerMetrics nullTracer nullTracer -slotMetricKey :: SlotNo -> Int -slotMetricKey (SlotNo s) = fromIntegral s +slotToInt :: SlotNo -> Int +slotToInt = fromIntegral . 
unSlotNo + + +-- | Tracer which updates header metrics (upstreameness) and inserts new peers +-- into 'peerRegistry'. +-- headerMetricTracer :: forall m p. - ( MonadSTM m ) - => PeerMetrics m p + ( MonadSTM m + , Ord p + ) + => PeerMetricsConfiguration + -> PeerMetrics m p -> Tracer (STM m) (TraceLabelPeer (ConnectionId p) (SlotNo, Time)) -headerMetricTracer PeerMetrics{headerMetrics} = - (\(TraceLabelPeer con d) -> TraceLabelPeer (remoteAddress con) d) +headerMetricTracer config peerMetrics@PeerMetrics{peerMetricsVar} = + bimap remoteAddress fst + `contramap` + peerRegistryTracer peerMetrics + <> first remoteAddress `contramap` - metricsTracer headerMetrics + metricsTracer + (headerMetrics <$> readTVar peerMetricsVar) + (\headerMetrics -> modifyTVar peerMetricsVar + (\metrics -> metrics { headerMetrics })) + config + +-- | Tracer which updates fetched metrics (fetchyness) and inserts new peers +-- into 'peerRegistry'. +-- fetchedMetricTracer :: forall m p. - ( MonadSTM m ) - => PeerMetrics m p + ( MonadSTM m + , Ord p + ) + => PeerMetricsConfiguration + -> PeerMetrics m p -> Tracer (STM m) (TraceLabelPeer (ConnectionId p) ( SizeInBytes , SlotNo , Time )) -fetchedMetricTracer PeerMetrics{fetchedMetrics} = - (\(TraceLabelPeer con (bytes, slot, time)) -> +fetchedMetricTracer config peerMetrics@PeerMetrics{peerMetricsVar} = + bimap remoteAddress (\(_, slotNo, _) -> slotNo) + `contramap` + peerRegistryTracer peerMetrics + <> (\(TraceLabelPeer con (bytes, slot, time)) -> TraceLabelPeer (remoteAddress con, bytes) (slot, time)) `contramap` - metricsTracer fetchedMetrics + metricsTracer + (fetchedMetrics <$> readTVar peerMetricsVar) + (\fetchedMetrics -> modifyTVar peerMetricsVar + (\metrics -> metrics { fetchedMetrics })) + config -getHeaderMetrics - :: MonadSTM m - => PeerMetrics m p - -> STM m (SlotMetric p) -getHeaderMetrics PeerMetrics{headerMetrics} = readTVar headerMetrics +-- +-- peer registry tracer which maintains 'peerRegistry' and 'lastSeenRegistry' +-- + +-- | 
Insert new peer into 'PeerMetricsState'. If this peer hasn't been +-- recorded before, we compute the current average score and record it in +-- 'peerRegistry'. Entries in 'peerRegistry' are only kept if they are newer +-- than the oldest slot in the 'headerMetrics' and 'fetchedMetrics'. +-- +-- Implementation detail: +-- We first need to check 'lastSeenRegistry' which checks if a peer is part of the +-- leader board. If a peer has not contributed to 'PeerMetrics' in +-- `maxEntriesToTrack` slots, we will consider it as a new peer. Without using +-- `lastSeenRegistry` we could consider a peer new while it exists in peer +-- metrics for a very long time. Just using `peerRegistry` does not guarantee +-- that. +-- +insertPeer :: forall p. Ord p + => p + -> SlotNo -- ^ current slot + -> PeerMetricsState p + -> PeerMetricsState p +insertPeer p slotNo + peerMetricsState@PeerMetricsState { lastSeenRegistry, peerRegistry } = + if p `OrdPSQ.member` lastSeenRegistry + then peerMetricsState + else case OrdPSQ.alter f p peerRegistry of + (False, peerRegistry') -> peerMetricsState { peerRegistry = peerRegistry' } + (True, _peerRegistry') -> peerMetricsState + where + f :: Maybe (SlotNo, AverageMetrics) -> (Bool, Maybe (SlotNo, AverageMetrics)) + f a@Just {} = (True, a) + f Nothing = (False, Just ( slotNo + , AverageMetrics { + averageUpstreamyness = avg upstreamenessResults, + averageFetchynessBytes = avg fetchynessBytesResults, + averageFetchynessBlocks = avg fetchynessBlocksResults + } + )) + where + upstreamenessResults = upstreamynessImpl peerMetricsState + fetchynessBytesResults = fetchynessBytesImpl peerMetricsState + fetchynessBlocksResults = fetchynessBlocksImpl peerMetricsState + + avg :: Map p Int -> Int + avg m | Map.null m = 0 + avg m = + -- division truncated towards the plus infinity, rather than the minus + -- infinity + case getSum (foldMap Sum m) `divMod` Map.size m of + (x, 0) -> x + (x, _) -> x + 1 + + +-- | A tracer which takes care of: +-- +-- * inserting 
new peers to 'peerRegistry' +-- * removing old entries of 'peerRegistry' +-- +-- * inserting new peers to 'lastSeenRegistry' +-- * removing old entries of 'lastSeenRegistry' +-- +peerRegistryTracer :: forall p m. + ( MonadSTM m + , Ord p + ) + => PeerMetrics m p + -> Tracer (STM m) (TraceLabelPeer p SlotNo) +peerRegistryTracer PeerMetrics { peerMetricsVar } = + Tracer $ \(TraceLabelPeer peer slotNo) -> do + -- order matters: 'insertPeer' must access the previous value of + -- lastSeenRegistry + modifyTVar peerMetricsVar $ updateLastSlot slotNo + . witnessedPeer peer slotNo + . insertPeer peer slotNo + . afterSlot + where + snd_ (_, slotNo, _, _) = slotNo + + -- remove all entries which are older than the oldest slot in the + -- 'PeerMetrics' + afterSlot :: PeerMetricsState p -> PeerMetricsState p + afterSlot peerMetrics@PeerMetricsState { headerMetrics, + fetchedMetrics, + peerRegistry, + lastSeenRegistry } = + let -- the oldest slot in the metrics leader board + slotNo :: SlotNo + slotNo = fromMaybe 0 $ + (snd_ <$> IntPSQ.minView headerMetrics) + `min` + (snd_ <$> IntPSQ.minView fetchedMetrics) + + in peerMetrics + { peerRegistry = snd (OrdPSQ.atMostView slotNo peerRegistry), + lastSeenRegistry = snd (OrdPSQ.atMostView slotNo lastSeenRegistry) + } + + witnessedPeer :: p -> SlotNo + -> PeerMetricsState p -> PeerMetricsState p + witnessedPeer peer slotNo + peerMetrics@PeerMetricsState { lastSeenRegistry } = + peerMetrics { lastSeenRegistry = + OrdPSQ.insert peer slotNo () lastSeenRegistry + } -getFetchedMetrics - :: MonadSTM m - => PeerMetrics m p - -> STM m (SlotMetric (p, SizeInBytes)) -getFetchedMetrics PeerMetrics{fetchedMetrics} = readTVar fetchedMetrics +-- +-- Metrics tracer +-- + +-- | A metrics tracer which updates the metric. +-- metricsTracer - :: forall m p. ( MonadSTM m ) - => StrictTVar m (SlotMetric p) + :: forall m p. 
+ MonadSTM m + => STM m (SlotMetric p) -- ^ read metrics + -> (SlotMetric p -> STM m ()) -- ^ update metrics + -> PeerMetricsConfiguration -> Tracer (STM m) (TraceLabelPeer p (SlotNo, Time)) -metricsTracer metricsVar = Tracer $ \(TraceLabelPeer !peer (!slot, !time)) -> do - metrics <- readTVar metricsVar - case Pq.lookup (slotMetricKey slot) metrics of - Nothing -> do - let metrics' = Pq.insert (slotMetricKey slot) slot (peer, time) metrics - if Pq.size metrics' > maxEntriesToTrack - then - case Pq.minView metrics' of - Nothing -> error "impossible empty pq" -- We just inserted an element! - Just (_, minSlotNo, _, metrics'') -> - if minSlotNo == slot - then return () - else writeTVar metricsVar metrics'' - else writeTVar metricsVar metrics' - Just (_, (_, oldTime)) -> - if oldTime <= time - then return () - else writeTVar metricsVar (Pq.insert (slotMetricKey slot) slot (peer, time) metrics) +metricsTracer getMetrics writeMetrics PeerMetricsConfiguration { maxEntriesToTrack } = + Tracer $ \(TraceLabelPeer !peer (!slot, !time)) -> do + metrics <- getMetrics + case IntPSQ.lookup (slotToInt slot) metrics of + Nothing -> do + let metrics' = IntPSQ.insert (slotToInt slot) slot (peer, time) metrics + if IntPSQ.size metrics' > maxEntriesToTrack + -- drop last element if the metric board is too large + then case IntPSQ.minView metrics' of + Nothing -> error "impossible empty pq" + -- We just inserted an element! + Just (_, minSlotNo, _, metrics'') -> + when (minSlotNo /= slot) $ + writeMetrics metrics'' + else writeMetrics metrics' + Just (_, (_, oldTime)) -> + when (oldTime > time) $ + writeMetrics (IntPSQ.insert (slotToInt slot) slot + (peer, time) metrics) -newPeerMetric - :: MonadSTM m - => m (PeerMetrics m p) -newPeerMetric = do - hs <- newTVarIO Pq.empty - bs <- newTVarIO Pq.empty - return $ PeerMetrics hs bs - --- Returns a Map which counts the number of times a given peer --- was the first to present us with a block/header. + +joinedPeerMetricAt + :: forall p m. 
+ MonadSTM m + => Ord p + => PeerMetrics m p + -> STM m (Map p SlotNo) +joinedPeerMetricAt PeerMetrics {peerMetricsVar} = + joinedPeerMetricAtImpl <$> readTVar peerMetricsVar + + +joinedPeerMetricAtImpl + :: forall p. + Ord p + => PeerMetricsState p + -> Map p SlotNo +joinedPeerMetricAtImpl PeerMetricsState { peerRegistry } = + OrdPSQ.fold' (\p slotNo _ m -> Map.insert p slotNo m) Map.empty peerRegistry + +-- +-- Metrics +-- +-- * upstreameness +-- * fetchyness by blocks +-- * fetchyness by bytes +-- + +-- | Returns a Map which counts the number of times a given peer was the first +-- to present us with a block/header. +-- upstreamyness - :: forall p. ( Ord p ) - => SlotMetric p + :: forall p m. + MonadSTM m + => Ord p + => PeerMetrics m p + -> STM m (Map p Int) +upstreamyness PeerMetrics {peerMetricsVar} = + upstreamynessImpl <$> readTVar peerMetricsVar + + +upstreamynessImpl + :: forall p. + Ord p + => PeerMetricsState p -> Map p Int -upstreamyness = Pq.fold' count Map.empty +upstreamynessImpl state@PeerMetricsState { headerMetrics, + peerRegistry, + lastSlotNo, + metricsConfig + } = + Map.unionWith (+) (IntPSQ.fold' count Map.empty headerMetrics) + (OrdPSQ.fold' (countCorrection (firstSlotNo state)) + Map.empty peerRegistry) where count :: Int -> SlotNo @@ -142,14 +444,47 @@ upstreamyness = Pq.fold' count Map.empty fn Nothing = Just 1 fn (Just c) = Just $! c + 1 + countCorrection :: Maybe SlotNo + -> p + -> SlotNo + -> AverageMetrics + -> Map p Int + -> Map p Int + countCorrection minSlotNo peer joinedAt AverageMetrics { averageUpstreamyness } m = + Map.insert peer + (adjustAvg metricsConfig + minSlotNo + joinedAt + lastSlotNo + averageUpstreamyness) + m --- Returns a Map which counts the number of bytes downloaded --- for a given peer. + +-- | Returns a Map which counts the number of bytes downloaded for a given +-- peer. +-- fetchynessBytes - :: forall p. ( Ord p ) - => SlotMetric (p, SizeInBytes) + :: forall p m. 
+ MonadSTM m + => Ord p + => PeerMetrics m p + -> STM m (Map p Int) +fetchynessBytes PeerMetrics {peerMetricsVar} = + fetchynessBytesImpl <$> readTVar peerMetricsVar + +fetchynessBytesImpl + :: forall p. + Ord p + => PeerMetricsState p -> Map p Int -fetchynessBytes = Pq.fold' count Map.empty +fetchynessBytesImpl state@PeerMetricsState { fetchedMetrics, + peerRegistry, + lastSlotNo, + metricsConfig + } = + Map.unionWith (+) (IntPSQ.fold' count Map.empty fetchedMetrics) + (OrdPSQ.fold' (countCorrection (firstSlotNo state)) + Map.empty peerRegistry) where count :: Int -> SlotNo @@ -163,13 +498,47 @@ fetchynessBytes = Pq.fold' count Map.empty fn Nothing = Just $ fromIntegral bytes fn (Just oldBytes) = Just $! oldBytes + fromIntegral bytes --- Returns a Map which counts the number of times a given peer --- was the first we downloaded a block from. + countCorrection :: Maybe SlotNo + -> p + -> SlotNo + -> AverageMetrics + -> Map p Int + -> Map p Int + countCorrection minSlotNo peer joinedAt AverageMetrics { averageFetchynessBytes } m = + Map.insert peer + (adjustAvg metricsConfig + minSlotNo + joinedAt + lastSlotNo + averageFetchynessBytes) + m + + +-- | Returns a Map which counts the number of times a given peer was the first +-- we downloaded a block from. +-- fetchynessBlocks - :: forall p. ( Ord p ) - => SlotMetric (p, SizeInBytes) + :: forall p m. + MonadSTM m + => Ord p + => PeerMetrics m p + -> STM m (Map p Int) +fetchynessBlocks PeerMetrics {peerMetricsVar} = + fetchynessBlocksImpl <$> readTVar peerMetricsVar + +fetchynessBlocksImpl + :: forall p. 
+ Ord p + => PeerMetricsState p -> Map p Int -fetchynessBlocks = Pq.fold' count Map.empty +fetchynessBlocksImpl state@PeerMetricsState { fetchedMetrics, + peerRegistry, + lastSlotNo, + metricsConfig + } = + Map.unionWith (+) (IntPSQ.fold' count Map.empty fetchedMetrics) + (OrdPSQ.fold' (countCorrection (firstSlotNo state)) + Map.empty peerRegistry) where count :: Int -> SlotNo @@ -183,4 +552,49 @@ fetchynessBlocks = Pq.fold' count Map.empty fn Nothing = Just 1 fn (Just c) = Just $! c + 1 + countCorrection :: Maybe SlotNo + -> p + -> SlotNo + -> AverageMetrics + -> Map p Int + -> Map p Int + countCorrection minSlotNo peer joinedAt AverageMetrics { averageFetchynessBlocks } m = + Map.insert peer + (adjustAvg metricsConfig + minSlotNo + joinedAt + lastSlotNo + averageFetchynessBlocks) + m + + +-- +-- Utils +-- + +adjustAvg :: PeerMetricsConfiguration + -> Maybe SlotNo -- ^ smallest slot number + -> SlotNo -- ^ slot when joined the leader board + -> SlotNo -- ^ current slot + -> Int + -> Int +adjustAvg PeerMetricsConfiguration { maxEntriesToTrack } minSlotNo joinedSlotNo lastSlotNo avg + -- when there are only a few results in the 'PeerMetricsState' we don't + -- take into account the average. This allows the system to start, without + -- penalising the peers which we connected to early. + | lastSlot - minSlot + 1 < maxEntriesToTrack `div` 2 + = 0 + + -- the peer is too old to take the correction into account. 
+ | lastSlot - joinedSlot + 1 >= maxEntriesToTrack + = 0 + + | otherwise + = (maxEntriesToTrack + joinedSlot - lastSlot) * avg + `div` maxEntriesToTrack + where + minSlot, lastSlot, joinedSlot :: Int + minSlot = maybe 1 slotToInt minSlotNo + lastSlot = slotToInt lastSlotNo + joinedSlot = slotToInt joinedSlotNo diff --git a/ouroboros-network/test/Main.hs b/ouroboros-network/test/Main.hs index b2b1670327c..ff05ed9c2df 100644 --- a/ouroboros-network/test/Main.hs +++ b/ouroboros-network/test/Main.hs @@ -25,6 +25,7 @@ import qualified Test.Ouroboros.Network.PeerSelection (tests) import qualified Test.Ouroboros.Network.PeerSelection.Json (tests) import qualified Test.Ouroboros.Network.PeerSelection.LocalRootPeers import qualified Test.Ouroboros.Network.PeerSelection.MockEnvironment +import qualified Test.Ouroboros.Network.PeerSelection.PeerMetric import qualified Test.Ouroboros.Network.PeerSelection.RootPeersDNS import qualified Test.Ouroboros.Network.Testnet (tests) import qualified Test.Ouroboros.Network.TxSubmission (tests) @@ -66,6 +67,7 @@ tests = , Test.Ouroboros.Network.PeerSelection.Json.tests , Test.Ouroboros.Network.PeerSelection.LocalRootPeers.tests , Test.Ouroboros.Network.PeerSelection.MockEnvironment.tests + , Test.Ouroboros.Network.PeerSelection.PeerMetric.tests , Test.Ouroboros.Network.PeerSelection.RootPeersDNS.tests , Test.Ouroboros.Network.KeepAlive.tests , Test.Ouroboros.Network.TxSubmission.tests diff --git a/ouroboros-network/test/Test/Ouroboros/Network/Diffusion/Node.hs b/ouroboros-network/test/Test/Ouroboros/Network/Diffusion/Node.hs index 428b397fc3f..35b5691e79c 100644 --- a/ouroboros-network/test/Test/Ouroboros/Network/Diffusion/Node.hs +++ b/ouroboros-network/test/Test/Ouroboros/Network/Diffusion/Node.hs @@ -34,8 +34,7 @@ import Control.Monad.Class.MonadFork (MonadFork) import Control.Monad.Class.MonadST (MonadST) import qualified Control.Monad.Class.MonadSTM as LazySTM import Control.Monad.Class.MonadSTM.Strict (MonadLabelledSTM, - MonadSTM 
(STM, atomically), MonadTraceSTM, StrictTVar, - newTVar) + MonadSTM (STM), MonadTraceSTM, StrictTVar) import Control.Monad.Class.MonadThrow (MonadEvaluate, MonadMask, MonadThrow, SomeException) import Control.Monad.Class.MonadTime (DiffTime, MonadTime) @@ -44,7 +43,6 @@ import Control.Monad.Fix (MonadFix) import Control.Tracer (nullTracer) import Data.IP (IP (..)) -import qualified Data.IntPSQ as IntPSQ import Data.Map (Map) import Data.Set (Set) import qualified Data.Text as Text @@ -64,7 +62,8 @@ import Ouroboros.Network.PeerSelection.Governor (PeerSelectionTargets (..)) import Ouroboros.Network.PeerSelection.LedgerPeers (LedgerPeersConsensusInterface (..), UseLedgerAfter (..)) -import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics (..)) +import Ouroboros.Network.PeerSelection.PeerMetric + (PeerMetricsConfiguration (..), newPeerMetric) import Ouroboros.Network.PeerSelection.RootPeersDNS (DomainAccessPoint (..), LookupReqs (..), RelayAccessPoint (..)) @@ -165,9 +164,7 @@ run blockGeneratorArgs limits ni na tracersExtra = $ \ nodeKernel nodeKernelThread -> do dnsTimeoutScriptVar <- LazySTM.newTVarIO (aDNSTimeoutScript na) dnsLookupDelayScriptVar <- LazySTM.newTVarIO (aDNSLookupDelayScript na) - peerMetrics <- atomically $ PeerMetrics - <$> newTVar IntPSQ.empty - <*> newTVar IntPSQ.empty + peerMetrics <- newPeerMetric PeerMetricsConfiguration { maxEntriesToTrack = 180 } let -- diffusion interfaces interfaces :: Diff.P2P.Interfaces (NtNFD m) NtNAddr NtNVersion NtNVersionData (NtCFD m) NtCAddr NtCVersion NtCVersionData @@ -269,12 +266,14 @@ run blockGeneratorArgs limits ni na tracersExtra = argsExtra :: Diff.P2P.ArgumentsExtra m argsExtra = Diff.P2P.ArgumentsExtra - { Diff.P2P.daPeerSelectionTargets = aPeerSelectionTargets na - , Diff.P2P.daReadLocalRootPeers = aReadLocalRootPeers na - , Diff.P2P.daReadPublicRootPeers = aReadPublicRootPeers na - , Diff.P2P.daReadUseLedgerAfter = aReadUseLedgerAfter na - , Diff.P2P.daProtocolIdleTimeout = aProtocolIdleTimeout 
na - , Diff.P2P.daTimeWaitTimeout = aTimeWaitTimeout na + { Diff.P2P.daPeerSelectionTargets = aPeerSelectionTargets na + , Diff.P2P.daReadLocalRootPeers = aReadLocalRootPeers na + , Diff.P2P.daReadPublicRootPeers = aReadPublicRootPeers na + , Diff.P2P.daReadUseLedgerAfter = aReadUseLedgerAfter na + , Diff.P2P.daProtocolIdleTimeout = aProtocolIdleTimeout na + , Diff.P2P.daTimeWaitTimeout = aTimeWaitTimeout na + , Diff.P2P.daDeadlineChurnInterval = 3300 + , Diff.P2P.daBulkChurnInterval = 300 } appArgs :: Node.AppArgs m diff --git a/ouroboros-network/test/Test/Ouroboros/Network/Diffusion/Policies.hs b/ouroboros-network/test/Test/Ouroboros/Network/Diffusion/Policies.hs index 73bcc36ff1e..89c794fc923 100644 --- a/ouroboros-network/test/Test/Ouroboros/Network/Diffusion/Policies.hs +++ b/ouroboros-network/test/Test/Ouroboros/Network/Diffusion/Policies.hs @@ -153,16 +153,14 @@ prop_hotToWarmM ArbitraryPolicyArguments{..} seed = do let rng = mkStdGen seed rngVar <- newTVarIO rng cmVar <- newTVarIO apaChurnMode - hVar <- newTVarIO apaHeaderMetric - fVar <- newTVarIO apaFetchedMetric - + metrics <- newPeerMetric' apaHeaderMetric apaFetchedMetric + PeerMetricsConfiguration { maxEntriesToTrack = 180 } let policies = simplePeerSelectionPolicy rngVar (readTVar cmVar) metrics (ReconnectDelay 10) - metrics = PeerMetrics hVar fVar picked <- atomically $ policyPickHotPeersToDemote policies (const PeerSourceLocalRoot) peerConnectFailCount @@ -185,11 +183,11 @@ prop_hotToWarmM ArbitraryPolicyArguments{..} seed = do noneWorse metrics pickedSet = do scores <- atomically $ case apaChurnMode of ChurnModeNormal -> do - hup <- upstreamyness <$> getHeaderMetrics metrics - bup <- fetchynessBlocks <$> getFetchedMetrics metrics + hup <- upstreamyness metrics + bup <- fetchynessBlocks metrics return $ Map.unionWith (+) hup bup - ChurnModeBulkSync -> fetchynessBytes <$> - getFetchedMetrics metrics + ChurnModeBulkSync -> + fetchynessBytes metrics let (picked, notPicked) = Map.partitionWithKey fn 
scores maxPicked = maximum $ Map.elems picked minNotPicked = minimum $ Map.elems notPicked @@ -209,7 +207,7 @@ prop_randomDemotion :: ArbitraryPolicyArguments prop_randomDemotion args seed = runSimOrThrow $ prop_randomDemotionM args seed --- Verifies that Tepid (formely hot) or failing peers are more likely to get +-- Verifies that Tepid (formerly hot) or failing peers are more likely to get -- demoted/forgotten. prop_randomDemotionM :: forall m. ( MonadSTM m @@ -222,16 +220,14 @@ prop_randomDemotionM ArbitraryPolicyArguments{..} seed = do let rng = mkStdGen seed rngVar <- newTVarIO rng cmVar <- newTVarIO apaChurnMode - hVar <- newTVarIO apaHeaderMetric - fVar <- newTVarIO apaFetchedMetric - + metrics <- newPeerMetric' apaHeaderMetric apaFetchedMetric + PeerMetricsConfiguration { maxEntriesToTrack = 180 } let policies = simplePeerSelectionPolicy rngVar (readTVar cmVar) metrics (ReconnectDelay 10) - metrics = PeerMetrics hVar fVar doDemotion numberOfTries policies Map.empty diff --git a/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs b/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs index 19ef85de45a..2d225f526ff 100644 --- a/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs +++ b/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection/MockEnvironment.hs @@ -357,6 +357,7 @@ mockPeerSelectionActions' tracer let interpretScriptDelay NoDelay = 1 interpretScriptDelay ShortDelay = 60 interpretScriptDelay LongDelay = 600 + interpretScriptDelay (Delay a) = a -- not used by the generator done <- case demotion of Noop -> return True diff --git a/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection/PeerMetric.hs b/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection/PeerMetric.hs new file mode 100644 index 00000000000..23bcfbf9c1d --- /dev/null +++ b/ouroboros-network/test/Test/Ouroboros/Network/PeerSelection/PeerMetric.hs @@ -0,0 +1,422 @@ +{-# LANGUAGE LambdaCase #-} 
+{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE NumericUnderscores #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Test.Ouroboros.Network.PeerSelection.PeerMetric where + + +import Control.Monad (when) +import Control.Monad.Class.MonadAsync +import qualified Control.Monad.Class.MonadSTM as LazySTM +import Control.Monad.Class.MonadSTM.Strict +import Control.Monad.Class.MonadTime +import Control.Monad.Class.MonadTimer +import Control.Tracer (Tracer (..), traceWith) + +import Data.Foldable (Foldable (foldl'), foldr') +import Data.List (sortOn) +import qualified Data.List.NonEmpty as NonEmpty +import qualified Data.Map.Merge.Strict as Map +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map +import qualified Data.Set as Set + +import Network.Mux.Trace (TraceLabelPeer (..)) + +import Ouroboros.Network.ConnectionId +import Ouroboros.Network.DeltaQ (SizeInBytes) +import Ouroboros.Network.PeerSelection.PeerMetric + +import Cardano.Slotting.Slot (SlotNo (..)) + +import Control.Monad.IOSim + +import Ouroboros.Network.Testing.Data.Script +import TestLib.Utils (AllProperty (..)) + +import Test.QuickCheck +import Test.Tasty (TestTree, testGroup) +import Test.Tasty.QuickCheck (testProperty) + + +tests :: TestTree +tests = testGroup "Ouroboros.Network.PeerSelection.PeerMetric" + [ testProperty "insert peer invariant" prop_insert_peer + , testProperty "metrics results are bounded" prop_metrics_are_bounded + , testProperty "size property" prop_bounded_size + ] + + + +newtype TestAddress = TestAddress Int + deriving (Show, Eq, Ord) + +instance Arbitrary TestAddress where + arbitrary = do + size <- choose (0, 20) + TestAddress . getPositive <$> resize size arbitrary + shrink (TestAddress addr) = + TestAddress . 
getPositive <$> shrink (Positive addr) + +data Event = + FetchedHeader TestAddress SlotNo + | FetchedBlock TestAddress SlotNo SizeInBytes + deriving Show + +eventPeer :: Event -> TestAddress +eventPeer (FetchedHeader peer _) = peer +eventPeer (FetchedBlock peer _ _) = peer + +eventSlot :: Event -> SlotNo +eventSlot (FetchedHeader _ slotNo) = slotNo +eventSlot (FetchedBlock _ slotNo _) = slotNo + +instance Arbitrary Event where + arbitrary = oneof [ FetchedHeader <$> arbitrary + <*> (SlotNo . getSmall . getPositive <$> arbitrary) + , FetchedBlock <$> arbitrary + <*> (SlotNo . getSmall . getPositive <$> arbitrary) + <*> (arbitrary `suchThat` \sizeInBytes -> 0 < sizeInBytes && sizeInBytes <= 2_000_000) + ] + shrink FetchedHeader {} = [] + shrink (FetchedBlock peer slotNo size) = + [ FetchedBlock peer slotNo size' + | size' <- shrink size + , size' > 0 + ] + + +newtype FixedScript = FixedScript { getFixedScript :: Script Event } + deriving Show + +-- | Order events by 'SlotNo' +-- +-- TODO: 'SizeInBytes' should be a function of 'SlotNo' +-- +mkFixedScript :: Script Event -> FixedScript +mkFixedScript (Script events) = FixedScript + . Script + $ NonEmpty.sortWith + eventSlot + events + +instance Arbitrary FixedScript where + -- Generated scripts must be long enough. We ignore first 100 results, to + -- avoid effects when the peer metrics has not enough data and thus it is + -- ignoring averages: 'Ouroboros.Network.PeerSelection.PeerMetric.adjustAvg'. + arbitrary = mkFixedScript + <$> resize 360 arbitrary + `suchThat` \(Script as) -> NonEmpty.length as > 100 + shrink (FixedScript script) = mkFixedScript `map` shrink script + + +mkTimedScript :: FixedScript -> TimedScript Event +mkTimedScript = go . fmap (\a -> (a, eventSlot a)) . getFixedScript + where + go :: Script (Event, SlotNo) -> TimedScript Event + go (Script script) = Script + . NonEmpty.fromList + . foldr' f [] + $ zip events ((Just . 
snd) `map` tail events ++ [Nothing]) + where + events = NonEmpty.toList script + + f :: ((Event, SlotNo), Maybe SlotNo) + -> [(Event, ScriptDelay)] + -> [(Event, ScriptDelay)] + f ((event, slotNo), nextSlotNo) as = + (event, Delay $ slotDiffTime slotNo nextSlotNo) : as + + slotToTime :: SlotNo -> Time + slotToTime (SlotNo slotNo) = Time $ realToFrac slotNo -- each slot takes 1s + + slotDiffTime :: SlotNo -> Maybe SlotNo -> DiffTime + slotDiffTime _slotNo Nothing = 0 + slotDiffTime slotNo (Just nextSlotNo) = slotToTime nextSlotNo + `diffTime` slotToTime slotNo + + +data PeerMetricsTrace = PeerMetricsTrace { + pmtPeer :: TestAddress, + pmtSlot :: SlotNo, + pmtUpstreamyness :: Map TestAddress Int, + pmtFetchynessBytes :: Map TestAddress Int, + pmtFetchynessBlocks :: Map TestAddress Int, + pmtJoinedAt :: Map TestAddress SlotNo + } + deriving Show + + +simulatePeerMetricScript + :: forall m. + ( MonadAsync m + , MonadTimer m + , MonadMonotonicTime m + ) + => Tracer m PeerMetricsTrace + -> PeerMetricsConfiguration + -> FixedScript + -> m () +simulatePeerMetricScript tracer config script = do + peerMetrics <- newPeerMetric config + let reporter :: ReportPeerMetrics m (ConnectionId TestAddress) + reporter = reportMetric config peerMetrics + v <- atomically (initScript timedScript) + go v peerMetrics reporter + where + timedScript :: TimedScript Event + timedScript = mkTimedScript script + + go :: LazySTM.TVar m (TimedScript Event) + -> PeerMetrics m TestAddress + -> ReportPeerMetrics m (ConnectionId TestAddress) + -> m () + go v peerMetrics reporter@ReportPeerMetrics { reportHeader, reportFetch } = do + (continue, (ev, delay)) <- (\case Left a -> (False, a) + Right a -> (True, a)) + <$> stepScriptOrFinish v + time <- getMonotonicTime + peer <- case ev of + FetchedHeader peer slotNo -> do + atomically $ traceWith reportHeader + $ TraceLabelPeer ConnectionId { + localAddress = TestAddress 0, + remoteAddress = peer + } + (slotNo, time) + return peer + + FetchedBlock peer 
slotNo size -> do + atomically $ traceWith reportFetch + $ TraceLabelPeer ConnectionId { + localAddress = TestAddress 0, + remoteAddress = peer + } + (size, slotNo, time) + return peer + + trace <- atomically $ + PeerMetricsTrace + <$> pure peer + <*> pure (eventSlot ev) + <*> upstreamyness peerMetrics + <*> fetchynessBytes peerMetrics + <*> fetchynessBlocks peerMetrics + <*> joinedPeerMetricAt peerMetrics + traceWith tracer trace + + when continue $ do + threadDelay (interpretScriptDelay delay) + go v peerMetrics reporter + + interpretScriptDelay NoDelay = 0 + interpretScriptDelay ShortDelay = 1 + interpretScriptDelay LongDelay = 3600 + interpretScriptDelay (Delay delay) = delay + + +-- | Check that newly added peer is never in the 20% worst performing peers (if +-- there are at least 5 results). +-- +prop_insert_peer :: FixedScript -> Property +prop_insert_peer script = + label ("length: " + ++ show ( len `div` band * band + , (len `div` band + 1) * band - 1 + )) $ + label (case trace of + [] -> "empty" + _ -> "non-empty") $ + getAllProperty $ foldMap go + $ zip (Nothing : Just `map` trace) trace + where + band = 50 + len = case getFixedScript script of Script as -> NonEmpty.length as + + config :: PeerMetricsConfiguration + config = PeerMetricsConfiguration { maxEntriesToTrack = 180 } + + sim :: IOSim s () + sim = simulatePeerMetricScript (Tracer traceM) config script + + -- drop first 90 slots + trace :: [PeerMetricsTrace] + trace = dropWhile (\a -> pmtSlot a <= firstSlot + 90) + $ selectTraceEventsDynamic (runSimTrace sim) + where + firstSlot = case script of + FixedScript (Script (a :| _)) -> eventSlot a + + go :: (Maybe PeerMetricsTrace, PeerMetricsTrace) + -> AllProperty + go (Nothing, _) = AllProperty (property True) + go (Just prev, res@PeerMetricsTrace { pmtPeer = peer, + pmtUpstreamyness = upstreamynessResults, + pmtFetchynessBytes = fetchynessBytesResults, + pmtFetchynessBlocks = fetchynessBlocksResults, + pmtJoinedAt = joinedAtResults + }) = + if peer 
`Map.member` pmtUpstreamyness prev + || peer `Map.member` pmtFetchynessBytes prev + || peer `Map.member` pmtFetchynessBlocks prev + then AllProperty $ property True + else AllProperty ( counterexample (show (res, prev)) + $ checkResult "upstreamyness" peer joinedAtResults upstreamynessResults) + <> AllProperty ( counterexample (show (res ,prev)) + $ checkResult "fetchynessBytes" peer joinedAtResults fetchynessBytesResults) + <> AllProperty ( counterexample (show (res, prev)) + $ checkResult "fetchynessBlocks" peer joinedAtResults fetchynessBlocksResults) + + -- check that the peer is not in the 20% worst peers, but only if there are at + -- least 5 results. + checkResult :: String + -> TestAddress + -> Map TestAddress SlotNo + -> Map TestAddress Int + -> Property + checkResult name peer joinedAt m = + (\peers -> counterexample (name ++ ": peer (" ++ show peer ++ ") member of " + ++ show (peers, m')) + (Set.notMember peer peers)) + . Set.fromList + . map fst + . take (size `div` 5) + . sortOn (snd :: (a, (Int, Maybe SlotNo)) -> (Int, Maybe SlotNo)) + . Map.toList + $ m' + where + m' = Map.merge (Map.mapMissing (\_ a -> (a, Nothing))) + Map.dropMissing + (Map.zipWithMatched (\_ a b -> (a, Just b))) + m + joinedAt + size = Map.size m + + +-- | Check that the results are always non-negative. +-- +prop_metrics_are_bounded :: FixedScript -> Property +prop_metrics_are_bounded script = + getAllProperty $ foldMap go trace + where + config :: PeerMetricsConfiguration + config = PeerMetricsConfiguration { maxEntriesToTrack = 180 } + + sim :: IOSim s () + sim = simulatePeerMetricScript (Tracer traceM) config script + + trace :: [PeerMetricsTrace] + trace = selectTraceEventsDynamic (runSimTrace sim) + + safeMaximum :: Map a Int -> Int + safeMaximum m | Map.null m = 0 + safeMaximum m = maximum m + + -- We bound each result by twice the maximum value, that's very + -- conservative. Less conservative would be maximal value plus average of + -- last `maxEntriesToTrack` results or so. 
+ bound :: Int + bound = + (2 *) + . safeMaximum + . Map.fromListWith (+) + . foldr (\a as -> case a of + FetchedHeader peer _ -> (peer, 1) : as + FetchedBlock peer _ _ -> (peer, 1) : as) + [] + $ case getFixedScript script of + Script as -> as + + fetchyness_bytes_bound :: Int + fetchyness_bytes_bound = + (2 *) + . safeMaximum + . fmap fromIntegral + . Map.fromListWith (+) + . foldr (\a as -> case a of + FetchedHeader {} -> as + FetchedBlock peer _ bytes -> (peer, bytes) : as) + [] + $ case getFixedScript script of + Script as -> as + + + go :: PeerMetricsTrace + -> AllProperty + go PeerMetricsTrace { pmtUpstreamyness, + pmtFetchynessBytes, + pmtFetchynessBlocks + } = + foldMap (\a -> AllProperty + $ counterexample + (show ("upstreameness", a, bound, pmtUpstreamyness)) + (a >= 0)) + pmtUpstreamyness + <> foldMap (\a -> AllProperty + $ counterexample + (show ("fetchynessBytes", a, fetchyness_bytes_bound, pmtFetchynessBytes)) + (a >= 0 && a <= fetchyness_bytes_bound)) + pmtFetchynessBytes + <> foldMap (\a -> AllProperty + $ counterexample + (show ("fetchynessBlocks", a, bound)) + (a >= 0)) + pmtFetchynessBlocks + + +-- | Check that the sizes of the results are bounded. +-- +-- The bound is 'maxEntriesToTrack' times the number of peers in the simulation. +-- This could be lowered by computing the number of peers in each +-- 'maxEntriesToTrack' slots window. +-- +prop_bounded_size :: Positive Int -> FixedScript -> Property +prop_bounded_size (Positive maxEntriesToTrack) script = + getAllProperty $ foldMap go trace + where + config :: PeerMetricsConfiguration + config = PeerMetricsConfiguration { maxEntriesToTrack } + + sim :: IOSim s () + sim = simulatePeerMetricScript (Tracer traceM) config script + + trace :: [PeerMetricsTrace] + trace = selectTraceEventsDynamic (runSimTrace sim) + + number_of_peers :: Int + number_of_peers = Set.size + . Set.fromList + . 
foldl' (\as a -> eventPeer a : as) [] + $ case getFixedScript script of + Script as -> as + + bound :: Int + bound = maxEntriesToTrack * number_of_peers + + go :: PeerMetricsTrace -> AllProperty + go PeerMetricsTrace { + pmtUpstreamyness, + pmtFetchynessBytes, + pmtFetchynessBlocks + } = AllProperty ( counterexample + ( "upstreamyness: " + ++ show (Map.size pmtUpstreamyness) + ++ " ≰ " + ++ show maxEntriesToTrack ) + ( Map.size pmtUpstreamyness <= bound ) + ) + <> AllProperty ( counterexample + ( "fetchynessBytes: " + ++ show (Map.size pmtFetchynessBytes) + ++ " ≰ " + ++ show maxEntriesToTrack) + ( Map.size pmtFetchynessBytes <= bound ) + ) + <> AllProperty ( counterexample + ( "fetchynessBlocks: " + ++ show (Map.size pmtFetchynessBlocks) + ++ " ≰ " + ++ show maxEntriesToTrack) + ( Map.size pmtFetchynessBlocks <= bound ) + )