Skip to content

Commit

Permalink
Merge #3915
Browse files Browse the repository at this point in the history
3915: PeerMetrics: interpolate results when inserting a new peer r=coot a=coot

# Description

This PR makes sure that peers newly added to `PeerMetrics` are never among the 20% of worst-performing peers.  Currently the churn interval and the number of records kept by `PeerMetrics` are in sync, so peers added just before a churn event can be removed, as they might have too few results.  In the future the churn interval and the number of records kept by `PeerMetrics` might diverge, which makes it even more important to ensure that newly added peers are not immediately removed.

This PR solves this problem by keeping track of the average results at the time a peer joined the leader board, and by using linear regression to adjust the peer's results for the part of the leader board period during which the peer was absent from the leader board.

Fixes #3861



Co-authored-by: Marcin Szamotulski <[email protected]>
  • Loading branch information
iohk-bors[bot] and coot authored Aug 17, 2022
2 parents 15b56d1 + 977e73f commit 79998cf
Show file tree
Hide file tree
Showing 15 changed files with 1,051 additions and 137 deletions.
4 changes: 4 additions & 0 deletions network-mux/src/Network/Mux/Trace.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import Text.Printf
import Control.Exception hiding (throwIO)
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime
import Data.Bifunctor (Bifunctor (..))
import Data.Word
import GHC.Generics (Generic (..))
import Quiet (Quiet (..))
Expand Down Expand Up @@ -101,6 +102,9 @@ handleIOException errorMsg e = throwIO MuxError {
data TraceLabelPeer peerid a = TraceLabelPeer peerid a
deriving (Eq, Functor, Show)

-- | Map over both components of a 'TraceLabelPeer': the first function is
-- applied to the peer label, the second one to the labelled value.
instance Bifunctor TraceLabelPeer where
    bimap onPeer onValue (TraceLabelPeer peer value) =
      TraceLabelPeer (onPeer peer) (onValue value)

-- | Type used for tracing mux events.
--
data WithMuxBearer peerid a = WithMuxBearer {
Expand Down
7 changes: 4 additions & 3 deletions ouroboros-consensus/src/Ouroboros/Consensus/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import Ouroboros.Network.BlockFetch (BlockFetchConfiguration (..))
import qualified Ouroboros.Network.Diffusion as Diffusion
import qualified Ouroboros.Network.Diffusion.NonP2P as NonP2P
import qualified Ouroboros.Network.Diffusion.P2P as P2P
import qualified Ouroboros.Network.Diffusion.Policies as Diffusion
import Ouroboros.Network.Magic
import Ouroboros.Network.NodeToClient (ConnectionId, LocalAddress,
LocalSocket, NodeToClientVersionData (..), combineVersions,
Expand All @@ -77,7 +78,7 @@ import Ouroboros.Network.NodeToNode (DiffusionMode (..),
defaultMiniProtocolParameters)
import Ouroboros.Network.PeerSelection.LedgerPeers
(LedgerPeersConsensusInterface (..))
import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics (..),
import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics,
newPeerMetric, reportMetric)
import Ouroboros.Network.Protocol.Limits (shortWait)
import Ouroboros.Network.RethrowPolicy
Expand Down Expand Up @@ -347,7 +348,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
nodeKernel <- initNodeKernel nodeKernelArgs
rnNodeKernelHook registry nodeKernel

peerMetrics <- newPeerMetric
peerMetrics <- newPeerMetric Diffusion.peerMetricsConfiguration
let ntnApps = mkNodeToNodeApps nodeKernelArgs nodeKernel peerMetrics
ntcApps = mkNodeToClientApps nodeKernelArgs nodeKernel
(apps, appsExtra) = mkDiffusionApplications
Expand Down Expand Up @@ -389,7 +390,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
(NTN.defaultCodecs codecConfig version)
NTN.byteLimits
llrnChainSyncTimeout
(reportMetric peerMetrics)
(reportMetric Diffusion.peerMetricsConfiguration peerMetrics)
(NTN.mkHandlers nodeKernelArgs nodeKernel)

mkNodeToClientApps
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,5 +569,5 @@ data InboundGovernorTrace peerAddr
| TrRemoteState !(Map (ConnectionId peerAddr) RemoteSt)
| TrUnexpectedlyFalseAssertion !(IGAssertionLocation peerAddr)
-- ^ This case is unexpected at call site.
| TrInboundGovernorError !SomeException
| TrInboundGovernorError !SomeException
deriving Show
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ module Ouroboros.Network.Testing.Data.Script
, initScript
, stepScript
, stepScriptSTM
, stepScriptOrFinish
, stepScriptOrFinishSTM
, initScript'
, stepScript'
, stepScriptSTM'
Expand All @@ -26,6 +28,7 @@ module Ouroboros.Network.Testing.Data.Script
, interpretPickScript
) where

import Data.Functor (($>))
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NonEmpty
import Data.Set (Set)
Expand Down Expand Up @@ -76,6 +79,20 @@ stepScriptSTM scriptVar = do
x':xs' -> LazySTM.writeTVar scriptVar (Script (x' :| xs'))
return x

-- | Step the script stored in the given 'TVar' within a single atomic
-- transaction (see 'stepScriptOrFinishSTM').
--
-- The result is 'Left' when the returned element was the last step of the
-- script, and 'Right' when the script can continue.
--
stepScriptOrFinish :: MonadSTM m => TVar m (Script a) -> m (Either a a)
stepScriptOrFinish = atomically . stepScriptOrFinishSTM

-- | 'STM' version of 'stepScriptOrFinish'.
--
-- If the current element is the last one it is returned as 'Left' and the
-- stored script is left untouched, so the last element is never consumed and
-- subsequent calls keep returning the same 'Left' value.  Otherwise the
-- current element is dropped from the stored script and returned as 'Right'.
--
stepScriptOrFinishSTM :: MonadSTM m => TVar m (Script a) -> STM m (Either a a)
stepScriptOrFinishSTM scriptVar = do
    Script (x :| xs) <- LazySTM.readTVar scriptVar
    case xs of
      []     -> return (Left x)
      -- use the qualified lazy 'TVar' primitive for the write as well, in
      -- line with the read above and with 'stepScriptSTM'.
      x':xs' -> LazySTM.writeTVar scriptVar (Script (x' :| xs'))
             $> Right x

initScript' :: MonadSTM m => Script a -> m (TVar m (Script a))
initScript' = newTVarIO

Expand Down Expand Up @@ -110,7 +127,7 @@ instance Arbitrary a => Arbitrary (Script a) where

type TimedScript a = Script (a, ScriptDelay)

data ScriptDelay = NoDelay | ShortDelay | LongDelay
data ScriptDelay = NoDelay | ShortDelay | LongDelay | Delay DiffTime
deriving (Eq, Show)

instance Arbitrary ScriptDelay where
Expand All @@ -121,6 +138,7 @@ instance Arbitrary ScriptDelay where
shrink LongDelay = [NoDelay, ShortDelay]
shrink ShortDelay = [NoDelay]
shrink NoDelay = []
shrink (Delay _) = []

playTimedScript :: (MonadAsync m, MonadTimer m)
=> Tracer m a -> TimedScript a -> m (TVar m a)
Expand All @@ -135,9 +153,10 @@ playTimedScript tracer (Script ((x0,d0) :| script)) = do
| (x,d) <- script ]
return v
where
interpretScriptDelay NoDelay = 0
interpretScriptDelay ShortDelay = 1
interpretScriptDelay LongDelay = 3600
interpretScriptDelay NoDelay = 0
interpretScriptDelay ShortDelay = 1
interpretScriptDelay LongDelay = 3600
interpretScriptDelay (Delay delay) = delay


--
Expand Down
1 change: 1 addition & 0 deletions ouroboros-network/ouroboros-network.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ test-suite test
Test.Ouroboros.Network.PeerSelection.Json
Test.Ouroboros.Network.PeerSelection.MockEnvironment
Test.Ouroboros.Network.PeerSelection.PeerGraph
Test.Ouroboros.Network.PeerSelection.PeerMetric
Test.Ouroboros.Network.NodeToNode.Version
Test.Ouroboros.Network.NodeToClient.Version
Test.Ouroboros.Network.ShrinkCarefully
Expand Down
22 changes: 21 additions & 1 deletion ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ import Ouroboros.Network.PeerSelection.Governor.Types
PeerSelectionCounters (..), TracePeerSelection (..))
import Ouroboros.Network.PeerSelection.LedgerPeers
(UseLedgerAfter (..), withLedgerPeers)
import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics (..))
import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics)
import Ouroboros.Network.PeerSelection.PeerStateActions
(PeerConnectionHandle, PeerSelectionActionsTrace (..),
PeerStateActionsArguments (..), withPeerStateActions)
Expand Down Expand Up @@ -231,6 +231,20 @@ data ArgumentsExtra m = ArgumentsExtra {
-- is using @TIME_WAIT@.
--
, daTimeWaitTimeout :: DiffTime

-- | Churn interval between churn events in deadline mode. A small fuzz
-- is added (max 10 minutes) so that not all nodes churn at the same time.
--
-- By default it is set to 3300 seconds.
--
, daDeadlineChurnInterval :: DiffTime

-- | Churn interval between churn events in bulk sync mode. A small fuzz
-- is added (max 1 minute) so that not all nodes churn at the same time.
--
-- By default it is set to 300 seconds.
--
, daBulkChurnInterval :: DiffTime
}

--
Expand Down Expand Up @@ -606,6 +620,8 @@ runM Interfaces
, daReadUseLedgerAfter
, daProtocolIdleTimeout
, daTimeWaitTimeout
, daDeadlineChurnInterval
, daBulkChurnInterval
}
Applications
{ daApplicationInitiatorMode
Expand Down Expand Up @@ -879,6 +895,8 @@ runM Interfaces
Async.withAsync
(Governor.peerChurnGovernor
dtTracePeerSelectionTracer
daDeadlineChurnInterval
daBulkChurnInterval
daPeerMetrics
churnModeVar
churnRng
Expand Down Expand Up @@ -1026,6 +1044,8 @@ runM Interfaces
Async.withAsync
(Governor.peerChurnGovernor
dtTracePeerSelectionTracer
daDeadlineChurnInterval
daBulkChurnInterval
daPeerMetrics
churnModeVar
churnRng
Expand Down
51 changes: 43 additions & 8 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadTime

import Data.List (sortOn, unfoldr)
import qualified Data.Map.Merge.Strict as Map
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Word (Word32)
Expand All @@ -34,14 +36,38 @@ deactivateTimeout = 300
-- | Timeout for 'spsCloseConnectionTimeout'.
--
-- This timeout depends on 'KeepAlive' and 'TipSample' timeouts. 'KeepAlive'
-- keeps agancy most of the time, but 'TipSample' can give away its agency for
-- keeps agency most of the time, but 'TipSample' can give away its agency for
-- longer periods of time. Here we allow it to get 6 blocks (assuming a new
-- block every @20s@).
--
closeConnectionTimeout :: DiffTime
closeConnectionTimeout = 120


-- | Configuration of 'PeerMetrics': the number of events it keeps track of.
--
-- 'maxEntriesToTrack' is set to 180 entries, which corresponds to one hour of
-- blocks on mainnet.
--
-- TODO: issue #3866
--
peerMetricsConfiguration :: PeerMetricsConfiguration
peerMetricsConfiguration =
    PeerMetricsConfiguration { maxEntriesToTrack = 180 }

-- | Pair every value of the first map with the value stored under the same
-- key in the second map, if there is one.
--
-- Keys of the first map are obligatory: each of them is present in the
-- result.  Keys of the second map are optional: they only contribute a 'Just'
-- to an entry of the first map, and keys which are present only in the second
-- map are dropped.
--
optionalMerge
    :: Ord k
    => Map k a
    -> Map k b
    -> Map k (a, Maybe b)
optionalMerge obligatory optional =
    Map.merge onlyObligatory onlyOptional inBoth obligatory optional
  where
    -- key present only in the first map: keep it, there is no optional value
    onlyObligatory = Map.mapMissing (\_ a -> (a, Nothing))
    -- key present only in the second map: discard it
    onlyOptional   = Map.dropMissing
    -- key present in both maps: attach the optional value
    inBoth         = Map.zipWithMatched (\_ a b -> (a, Just b))



simplePeerSelectionPolicy :: forall m peerAddr.
( MonadSTM m
, Ord peerAddr
Expand Down Expand Up @@ -88,22 +114,31 @@ simplePeerSelectionPolicy rngVar getChurnMode metrics errorDelay = PeerSelection
mode <- getChurnMode
scores <- case mode of
ChurnModeNormal -> do
hup <- upstreamyness <$> getHeaderMetrics metrics
bup <- fetchynessBlocks <$> getFetchedMetrics metrics
return $ Map.unionWith (+) hup bup
jpm <- joinedPeerMetricAt metrics
hup <- upstreamyness metrics
bup <- fetchynessBlocks metrics
return $ Map.unionWith (+) hup bup `optionalMerge` jpm

ChurnModeBulkSync -> do
jpm <- joinedPeerMetricAt metrics
bup <- fetchynessBytes metrics
return $ bup `optionalMerge` jpm

ChurnModeBulkSync ->
fetchynessBytes <$> getFetchedMetrics metrics
available' <- addRand available (,)
return $ Set.fromList
. map fst
. take pickNum
-- order the results, resolve the ties using slot number when
-- a peer joined the leader board.
--
-- note: this will prefer to preserve newer peers, whose results are
-- less certain than those of peers who entered the leader board earlier.
. sortOn (\(peer, rn) ->
(Map.findWithDefault 0 peer scores, rn))
(Map.findWithDefault (0, Nothing) peer scores, rn))
. Map.assocs
$ available'

-- Randomly pick peers to demote, peeers with knownPeerTepid set are twice
-- Randomly pick peers to demote, peers with knownPeerTepid set are twice
-- as likely to be demoted.
warmDemotionPolicy :: PickPolicy peerAddr m
warmDemotionPolicy _ _ isTepid available pickNum = do
Expand Down
21 changes: 9 additions & 12 deletions ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -589,14 +589,19 @@ peerChurnGovernor :: forall m peeraddr.
, MonadDelay m
)
=> Tracer m (TracePeerSelection peeraddr)
-> DiffTime
-- ^ the base for churn interval in the deadline mode.
-> DiffTime
-- ^ the base for churn interval in the bulk sync mode.
-> PeerMetrics m peeraddr
-> StrictTVar m ChurnMode
-> StdGen
-> STM m FetchMode
-> PeerSelectionTargets
-> StrictTVar m PeerSelectionTargets
-> m Void
peerChurnGovernor tracer _metrics churnModeVar inRng getFetchMode base peerSelectionVar = do
peerChurnGovernor tracer deadlineChurnInterval bulkChurnInterval
_metrics churnModeVar inRng getFetchMode base peerSelectionVar = do
-- Wait a while so that not only the closest peers have had the time
-- to become warm.
startTs0 <- getMonotonicTime
Expand Down Expand Up @@ -657,8 +662,7 @@ peerChurnGovernor tracer _metrics churnModeVar inRng getFetchMode base peerSelec
-- Short delay, we may have no active peers right now
threadDelay 1

-- Pick new active peer(s) based on the best performing established
-- peers.
-- Pick new active peer(s).
atomically $ increaseActivePeers churnMode

-- Give the promotion process time to start
Expand Down Expand Up @@ -703,18 +707,11 @@ peerChurnGovernor tracer _metrics churnModeVar inRng getFetchMode base peerSelec


longDelay :: StdGen -> DiffTime -> m StdGen
longDelay = fuzzyDelay' churnInterval 600
longDelay = fuzzyDelay' deadlineChurnInterval 600


shortDelay :: StdGen -> DiffTime -> m StdGen
shortDelay = fuzzyDelay' churnIntervalBulk 60

-- The min time between running the churn governor.
churnInterval :: DiffTime
churnInterval = 3300

churnIntervalBulk :: DiffTime
churnIntervalBulk = 300
shortDelay = fuzzyDelay' bulkChurnInterval 60

-- Replace 20% or at least one peer every churn interval.
decrease :: Int -> Int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,8 +593,11 @@ data TracePeerSelection peeraddr =
| TracePromoteColdDone Int Int peeraddr
-- | target active, actual active, selected peers
| TracePromoteWarmPeers Int Int (Set peeraddr)
-- | local per-group (target active, actual active), selected peers
| TracePromoteWarmLocalPeers [(Int, Int)] (Set peeraddr)
-- | Promote local peers to warm
| TracePromoteWarmLocalPeers
[(Int, Int)] -- ^ local per-group `(target active, actual active)`,
-- only limited to groups which are below their target.
(Set peeraddr) -- ^ selected peers
-- | target active, actual active, peer, reason
| TracePromoteWarmFailed Int Int peeraddr SomeException
-- | target active, actual active, peer
Expand Down
Loading

0 comments on commit 79998cf

Please sign in to comment.