diff --git a/.gitignore b/.gitignore index f02fd512172..575fb9c6e4f 100644 --- a/.gitignore +++ b/.gitignore @@ -37,10 +37,12 @@ hie.yaml ### Direnv ### .envrc-local .envrc-override +.direnv/flake-profile +.direnv/flake-profile-1-link ### auto-generated faulty JSON golden tests ### *.faulty.json -*.faulty.json +*.faulty.reencoded.json lib/shelley/test/data/balanceTx/**/actual ### Release scripts output @@ -49,3 +51,8 @@ lib/shelley/test/data/balanceTx/**/actual ### Docs build /_build/ .vscode/settings.json + +## local ignored space +ignore-me + + diff --git a/lib/core/src/Cardano/Wallet/Primitive/Model.hs b/lib/core/src/Cardano/Wallet/Primitive/Model.hs index eb06e695899..d107057f3e5 100644 --- a/lib/core/src/Cardano/Wallet/Primitive/Model.hs +++ b/lib/core/src/Cardano/Wallet/Primitive/Model.hs @@ -41,6 +41,7 @@ module Cardano.Wallet.Primitive.Model , BlockData (..) , firstHeader + , lastHeader -- * Accessors , currentTip @@ -388,6 +389,8 @@ firstHeader :: BlockData m addr txs s -> BlockHeader firstHeader (List xs) = header $ NE.head xs firstHeader (Summary _ BlockSummary{from}) = from +-- | Last 'BlockHeader' of the blocks represented +-- by 'BlockData'. lastHeader :: BlockData m addr txs s -> BlockHeader lastHeader (List xs) = header $ NE.last xs lastHeader (Summary _ BlockSummary{to}) = to diff --git a/lib/core/src/Ouroboros/Network/Client/Wallet.hs b/lib/core/src/Ouroboros/Network/Client/Wallet.hs index a5d47bb4ee9..427656fe77c 100644 --- a/lib/core/src/Ouroboros/Network/Client/Wallet.hs +++ b/lib/core/src/Ouroboros/Network/Client/Wallet.hs @@ -4,10 +4,14 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE Rank2Types #-} +{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE ViewPatterns #-} -- | -- Copyright: © 2020 IOHK @@ -24,6 +28,9 @@ module Ouroboros.Network.Client.Wallet -- * ChainSyncWithBlocks , chainSyncWithBlocks + , PipeliningStrategy(..) + , thousandPipeliningStrategy + , tunedForMainnetPipeliningStrategy -- * LocalTxSubmission , LocalTxSubmissionCmd (..) @@ -74,6 +81,8 @@ import Data.List.NonEmpty ( NonEmpty (..) ) import Data.Ord ( comparing ) +import Data.Text + ( Text ) import Data.Void ( Void ) import Network.TypedProtocol.Pipelined @@ -114,6 +123,7 @@ import Ouroboros.Network.Protocol.LocalTxSubmission.Type ( SubmitResult (..) ) import qualified Data.List.NonEmpty as NE +import qualified Data.Text as T import qualified Ouroboros.Network.Protocol.ChainSync.ClientPipelined as P import qualified Ouroboros.Network.Protocol.LocalStateQuery.Client as LSQ @@ -225,6 +235,32 @@ chainSyncFollowTip toCardanoEra onTipUpdate = type RequestNextStrategy m n block = P.ClientPipelinedStIdle n block (Point block) (Tip block) m Void +-- | How to drive pipelining size from the block height +data PipeliningStrategy block = PipeliningStrategy + { getPipeliningSize :: block -> Natural + , pipeliningStrategyName :: Text + } + +instance Show (PipeliningStrategy block) where + show PipeliningStrategy{pipeliningStrategyName} + = T.unpack pipeliningStrategyName + +thousandPipeliningStrategy :: PipeliningStrategy block +thousandPipeliningStrategy = PipeliningStrategy {..} + where + getPipeliningSize _ = 1_000 + pipeliningStrategyName = "Constant pipelining of 1000 blocks" + +tunedForMainnetPipeliningStrategy :: HasHeader block => PipeliningStrategy block +tunedForMainnetPipeliningStrategy = PipeliningStrategy {..} + where + getPipeliningSize (blockNo -> n) + | n <= 5_200_000 = 1000 + | n <= 6_100_000 = 200 + | n <= 6_500_000 = 125 + | otherwise = 100 + pipeliningStrategyName = "Variable pipelining suited for mainnet blockchain" + -- | Helper type for the different ways we handle rollbacks. -- -- Helps remove some boilerplate. @@ -278,9 +314,10 @@ data LocalRollbackResult block chainSyncWithBlocks :: forall m block. (Monad m, MonadSTM m, MonadThrow m, HasHeader block) => Tracer m (ChainSyncLog block (Point block)) + -> PipeliningStrategy block -> ChainFollower m (Point block) (Tip block) (NonEmpty block) -> ChainSyncClientPipelined block (Point block) (Tip block) m Void -chainSyncWithBlocks tr chainFollower = +chainSyncWithBlocks tr pipeliningStrategy chainFollower = ChainSyncClientPipelined clientStNegotiateIntersection where -- Return the _number of slots between two tips. @@ -381,12 +418,17 @@ chainSyncWithBlocks tr chainFollower = let blocks' = NE.reverse (block :| blocks) traceWith tr $ MsgChainRollForward blocks' (getTipPoint tip) handleRollforward blocks' tip - let distance = tipDistance (blockNo block) tip traceWith tr $ MsgTipDistance distance let strategy = if distance <= 1 then oneByOne - else pipeline (fromIntegral $ min distance 1000) Zero + else pipeline + (fromIntegral + . min distance + . getPipeliningSize pipeliningStrategy + $ block + ) + Zero clientStIdle strategy , P.recvMsgRollBackward = \point tip -> do diff --git a/lib/shelley/bench/latency-bench.hs b/lib/shelley/bench/latency-bench.hs index 0edd0ab1970..00f61aa4af8 100644 --- a/lib/shelley/bench/latency-bench.hs +++ b/lib/shelley/bench/latency-bench.hs @@ -102,6 +102,8 @@ import Network.Wai.Middleware.Logging ( ApiLog (..) ) import Numeric.Natural ( Natural ) +import Ouroboros.Network.Client.Wallet + ( tunedForMainnetPipeliningStrategy ) import System.Directory ( createDirectory ) import System.FilePath @@ -474,6 +476,7 @@ withShelleyServer tracers action = do serveWallet (NodeSource conn vData) np + tunedForMainnetPipeliningStrategy (SomeNetworkDiscriminant $ Proxy @'Mainnet) tracers (SyncTolerance 10) diff --git a/lib/shelley/bench/restore-bench.hs b/lib/shelley/bench/restore-bench.hs index 9ceab412ef1..61be04040aa 100644 --- a/lib/shelley/bench/restore-bench.hs +++ b/lib/shelley/bench/restore-bench.hs @@ -9,12 +9,18 @@ {-# LANGUAGE KindSignatures #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE MultiWayIf #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE NoMonoLocalBinds #-} {-# LANGUAGE Rank2Types #-} {-# LANGUAGE ScopedTypeVariables #-} {-# OPTIONS_GHC -fno-warn-orphans #-} {-# LANGUAGE TypeApplications #-} +{-# OPTIONS_GHC -Wno-unused-local-binds #-} +{-# OPTIONS_GHC -Wno-unused-do-bind #-} + +{- HLINT ignore "Redundant pure" -} + -- | Benchmark measuring how long restoration takes for different wallets. -- @@ -82,7 +88,6 @@ import Cardano.Wallet.Network , ChainSyncLog (..) , NetworkLayer (..) ) - import Cardano.Wallet.Primitive.AddressDerivation ( Depth (..) , NetworkDiscriminant (..) @@ -194,6 +199,8 @@ import Data.Text ( Text ) import Data.Text.Class ( ToText (..) ) +import Data.Time + ( UTCTime, diffUTCTime ) import Data.Time.Clock.POSIX ( POSIXTime, getCurrentTime, utcTimeToPOSIXSeconds ) import Data.Word @@ -218,6 +225,8 @@ import GHC.TypeLits ( KnownNat, Nat, natVal ) import Numeric ( fromRat, showFFloat ) +import Ouroboros.Network.Client.Wallet + ( PipeliningStrategy, tunedForMainnetPipeliningStrategy ) import Say ( sayErr ) import System.Exit @@ -236,7 +245,7 @@ import UnliftIO.Exception ( evaluate, throwString ) import UnliftIO.Temporary ( withSystemTempFile ) - + import qualified Cardano.Wallet as W import qualified Cardano.Wallet.DB.Sqlite as Sqlite import qualified Cardano.Wallet.Primitive.AddressDerivation.Byron as Byron @@ -251,7 +260,6 @@ import qualified Data.List.NonEmpty as NE import qualified Data.Text as T import qualified Data.Text.Encoding as T - main :: IO () main = execBenchWithNode argsNetworkConfig cardanoRestoreBench >>= exitWith @@ -267,10 +275,10 @@ argsNetworkConfig args = case argNetworkName args of TestnetConfig (argsNetworkDir args "genesis-byron.json") -- | Run all available benchmarks. -cardanoRestoreBench - :: Trace IO Text - -> NetworkConfiguration - -> CardanoNodeConn +cardanoRestoreBench + :: Trace IO Text + -> NetworkConfiguration + -> CardanoNodeConn -> IO () cardanoRestoreBench tr c socketFile = do (SomeNetworkDiscriminant networkProxy, np, vData, _b) @@ -280,13 +288,13 @@ cardanoRestoreBench tr c socketFile = do let network = networkDescription networkProxy sayErr $ "Network: " <> network - prepareNode (trMessageText tr) networkProxy socketFile np vData - let benchRestoreMultipleWallets nWallets target = do - let targetStr = T.pack $ showFFloat Nothing + let benchRestoreMultipleWallets nWallets target pipelinings = do + let targetStr = T.pack $ showFFloat Nothing (fromRational @Double $ getPercentage target) "" bench_restoration @_ @ShelleyKey + pipelinings networkProxy (trMessageText tr) walletTr @@ -301,9 +309,10 @@ cardanoRestoreBench tr c socketFile = do target benchmarksSeq - let benchRestoreRndWithOwnership p = do + let benchRestoreRndWithOwnership p pipelinings = do let benchname = showPercentFromPermyriad p <> "-percent-rnd" bench_restoration + pipelinings networkProxy (trMessageText tr) walletTr @@ -317,9 +326,10 @@ cardanoRestoreBench tr c socketFile = do (unsafeMkPercentage 1) benchmarksRnd - let benchRestoreSeqWithOwnership p = do + let benchRestoreSeqWithOwnership p pipelinings = do let benchname = showPercentFromPermyriad p <> "-percent-seq" bench_restoration + pipelinings networkProxy (trMessageText tr) walletTr @@ -332,9 +342,10 @@ cardanoRestoreBench tr c socketFile = do True -- Write progress to .timelog file (unsafeMkPercentage 1) benchmarksSeq - let benchRestoreBaseline = do + let benchRestoreBaseline pipelinings = do let benchname = "baseline" bench_baseline_restoration + pipelinings networkProxy (trMessageText tr) walletTr @@ -345,20 +356,23 @@ cardanoRestoreBench tr c socketFile = do True (unsafeMkPercentage 1) - - runBenchmarks - [ - benchRestoreBaseline - -- We restore /to/ a percentage that is low enough to be fast, - -- but high enough to give an accurate enough indication of the - -- to-100% time. - , benchRestoreMultipleWallets 1 (unsafeMkPercentage 0.1) - , benchRestoreMultipleWallets 10 (unsafeMkPercentage 0.01) - , benchRestoreMultipleWallets 100 (unsafeMkPercentage 0.01) - + runBenchmarks [ + benchRestoreBaseline tunedForMainnetPipeliningStrategy + -- -- We restore /to/ a percentage that is low enough to be fast, + -- -- but high enough to give an accurate enough indication of the + -- -- to-100% time. , benchRestoreSeqWithOwnership (Proxy @0) + tunedForMainnetPipeliningStrategy , benchRestoreSeqWithOwnership (Proxy @1) + tunedForMainnetPipeliningStrategy , benchRestoreRndWithOwnership (Proxy @1) + tunedForMainnetPipeliningStrategy + , benchRestoreMultipleWallets 1 (unsafeMkPercentage 0.1) + tunedForMainnetPipeliningStrategy + , benchRestoreMultipleWallets 10 (unsafeMkPercentage 0.01) + tunedForMainnetPipeliningStrategy + , benchRestoreMultipleWallets 100 (unsafeMkPercentage 0.01) + tunedForMainnetPipeliningStrategy ] where walletRnd @@ -377,7 +391,7 @@ cardanoRestoreBench tr c socketFile = do walletSeq :: Text - -> ((ShelleyKey 'RootK XPrv, Passphrase "encryption") + -> ((ShelleyKey 'RootK XPrv, Passphrase "encryption") -> AddressPoolGap -> s ) -> (WalletId, WalletName, s) @@ -652,7 +666,8 @@ bench_baseline_restoration ( NetworkDiscriminantVal n , HasNetworkId n ) - => Proxy n + => PipeliningStrategy (CardanoBlock StandardCrypto) + -> Proxy n -> Tracer IO (BenchmarkLog n) -> Trace IO Text -- ^ For wallet tracing @@ -668,14 +683,18 @@ bench_baseline_restoration -- ^ Target sync progress -> IO SomeBenchmarkResults bench_baseline_restoration - proxy tr wlTr socket np vData benchName traceToDisk targetSync = do - putStrLn $ "*** " ++ T.unpack benchName - withRestoreEnvironment doRestore + pipeliningStrat proxy tr wlTr socket np vData benchName + traceToDisk targetSync = do + putStrLn $ "*** " ++ T.unpack benchName + withRestoreEnvironment doRestore where withRestoreEnvironment action = - withWalletLayerTracer benchName traceToDisk $ \progressTrace -> - withNetworkLayer networkTrace networkId np socket vData sTol $ \nw -> - action progressTrace nw + withWalletLayerTracer benchName pipeliningStrat traceToDisk $ + \progressTrace -> withNetworkLayer + networkTrace + pipeliningStrat + networkId np socket vData sTol $ + \nw -> action progressTrace nw where networkId = networkIdVal proxy networkTrace = trMessageText wlTr @@ -689,21 +708,25 @@ bench_baseline_restoration synchronizer <- async $ chainSync nw nullTracer $ ChainFollower - { readLocalTip = readTVarIO chainPointT + { readLocalTip = readTVarIO chainPointT , rollForward = \blocks ntip -> do - atomically $ writeTVar chainPointT + atomically $ writeTVar chainPointT [chainPointFromBlockHeader ntip] let (ntxs, hss) = NE.unzip $ numberOfTransactionsInBlock <$> blocks (heights, slots) = NE.unzip hss - - traceWith progressTrace $ Just $ NE.last heights - seq (sum ntxs) $ atomically $ writeTVar wait - $ Just $ NE.last slots + tip = NE.last heights + traceWith progressTrace $ Just tip + seq (sum ntxs) + . atomically + . writeTVar wait + . Just + $ NE.last slots , rollBackward = pure } (_, restorationTime) <- bench "restoration" $ - reportProgress nw tr targetSync (atomically $ readTVar wait >>= maybe retry pure) + reportProgress nw tr targetSync + (atomically $ readTVar wait >>= maybe retry pure) saveBenchmarkPoints benchName restorationTime cancel synchronizer pure $ SomeBenchmarkResults @@ -729,7 +752,8 @@ bench_restoration , Buildable results , ToJSON results ) - => Proxy n + => PipeliningStrategy (CardanoBlock StandardCrypto) + -> Proxy n -> Tracer IO (BenchmarkLog n) -> Trace IO Text -- ^ For wallet tracing -> CardanoNodeConn -- ^ Socket path @@ -748,49 +772,53 @@ bench_restoration -> IO results) -> IO SomeBenchmarkResults bench_restoration - proxy tr wlTr socket np vData benchname wallets traceToDisk targetSync benchmarks = do + pipeliningStrat proxy tr wlTr socket np vData benchname wallets traceToDisk + targetSync benchmarks = do putStrLn $ "*** " ++ T.unpack benchname let networkId = networkIdVal proxy let tl = newTransactionLayer @k networkId - withNetworkLayer (trMessageText wlTr) networkId np socket vData sTol $ \nw' -> do - let gp = genesisParameters np - let convert = fromCardanoBlock gp - let nw = convert <$> nw' - let ti = neverFails "bench db shouldn't forecast into future" - $ timeInterpreter nw - withBenchDBLayer @s @k ti wlTr $ \db -> do - withWalletLayerTracer benchname traceToDisk $ \progressTrace -> do - let tracer = - trMessageText wlTr <> - contramap walletWorkerLogToBlockHeight progressTrace - let w = WalletLayer - tracer (emptyGenesis gp, np, sTol) nw tl db - - forM_ wallets $ \(wid, wname, s) -> do - _ <- unsafeRunExceptT $ W.createWallet w wid wname s - void - $ forkIO - $ unsafeRunExceptT - $ W.restoreWallet @_ @s @k w wid - - -- NOTE: This is now the time to restore /all/ wallets. - (_, restorationTime) <- bench "restoration" $ do - waitForWalletsSyncTo - targetSync - tr - proxy - w - (map fst' wallets) - gp - vData - - let (wid0, wname0, _) = head wallets - results <- - benchmarks proxy w wid0 wname0 benchname restorationTime - saveBenchmarkPoints benchname results - forM_ wallets $ \(wid, _, _) -> - unsafeRunExceptT (W.deleteWallet w wid) - pure $ SomeBenchmarkResults results + let gp = genesisParameters np + withNetworkLayer (trMessageText wlTr) pipeliningStrat + networkId np socket vData sTol $ \nw' -> do + let convert = fromCardanoBlock gp + let nw = convert <$> nw' + let ti = neverFails "bench db shouldn't forecast into future" + $ timeInterpreter nw + withBenchDBLayer ti wlTr + $ \db -> withWalletLayerTracer + benchname pipeliningStrat traceToDisk + $ \progressTrace -> do + let tracer = + trMessageText wlTr <> + contramap walletWorkerLogToBlockHeight progressTrace + let w = WalletLayer + tracer (emptyGenesis gp, np, sTol) nw tl db + + forM_ wallets $ \(wid, wname, s) -> do + _ <- unsafeRunExceptT $ W.createWallet w wid wname s + void + $ forkIO + $ unsafeRunExceptT + $ W.restoreWallet @_ @s @k w wid + + -- NOTE: This is now the time to restore /all/ wallets. + (_, restorationTime) <- bench "restoration" $ do + waitForWalletsSyncTo + targetSync + tr + proxy + w + (map fst' wallets) + gp + vData + + let (wid0, wname0, _) = head wallets + results <- + benchmarks proxy w wid0 wname0 benchname restorationTime + saveBenchmarkPoints benchname results + forM_ wallets $ \(wid, _, _) -> + unsafeRunExceptT (W.deleteWallet w wid) + pure $ SomeBenchmarkResults results where fst' (x,_,_) = x @@ -807,20 +835,32 @@ saveBenchmarkPoints :: ToJSON a => Text -> a -> IO () saveBenchmarkPoints benchname = Aeson.encodeFile (T.unpack benchname <> ".json") withWalletLayerTracer - :: Text + :: Show a + => Text + -> a -> Bool -> (Tracer IO (Maybe (Quantity "block" Word32)) -> IO r) -> IO r -withWalletLayerTracer benchname traceToDisk act - | traceToDisk = - withFile (T.unpack benchname <> ".timelog") WriteMode $ \h -> do - -- Use a custom tracer to output (time, blockHeight) to a file - -- each time we apply blocks. - let fileTr = Tracer $ \msg -> do - liftIO . B8.hPut h . T.encodeUtf8 . (<> "\n") $ msg - hFlush h - act $ traceBlockHeadersProgressForPlotting fileTr - | otherwise = act nullTracer +withWalletLayerTracer benchname pipelining traceToDisk act = do + let benchmarkFilename + = T.unpack $ benchname + <> "." + <> T.replace " " "_" (T.pack $ show pipelining) + <> ".timelog" + t0 <- getCurrentTime + if + | traceToDisk -> + withFile + benchmarkFilename + WriteMode + $ \h -> do + -- Use a custom tracer to output (time, blockHeight) to a file + -- each time we apply blocks. + let fileTr = Tracer $ \msg -> do + liftIO . B8.hPut h . T.encodeUtf8 . (<> "\n") $ msg + hFlush h + act $ traceBlockHeadersProgressForPlotting t0 fileTr + | otherwise -> act nullTracer dummyAddress :: forall (n :: NetworkDiscriminant). NetworkDiscriminantVal n @@ -840,12 +880,13 @@ dummySeedFromName = SomeMnemonic @24 . T.encodeUtf8 traceBlockHeadersProgressForPlotting - :: Tracer IO Text + :: UTCTime + -> Tracer IO Text -> Tracer IO (Maybe (Quantity "block" Word32)) -traceBlockHeadersProgressForPlotting tr = Tracer $ \bs -> do +traceBlockHeadersProgressForPlotting t0 tr = Tracer $ \bs -> do let mtip = pretty . getQuantity <$> bs - time <- pretty . utcTimeToPOSIXSeconds <$> getCurrentTime - case mtip of + time <- pretty . (`diffUTCTime` t0) <$> getCurrentTime + case mtip of Just tip -> traceWith tr $ time <> " " <> tip Nothing -> pure () @@ -888,7 +929,9 @@ prepareNode prepareNode tr proxy socketPath np vData = do traceWith tr $ MsgSyncStart proxy let networkId = networkIdVal proxy - sl <- withNetworkLayer nullTracer networkId np socketPath vData sTol $ \nw' -> do + sl <- withNetworkLayer nullTracer + tunedForMainnetPipeliningStrategy + networkId np socketPath vData sTol $ \nw' -> do let gp = genesisParameters np let convert = fromCardanoBlock gp let nw = convert <$> nw' @@ -986,15 +1029,21 @@ instance HasSeverityAnnotation (BenchmarkLog n) where instance NetworkDiscriminantVal n => ToText (BenchmarkLog n) where toText = \case MsgNodeTipTick tip progress -> - "Initial node synchronization: " +| progress |+ " " +| tip ||+"" + "Initial node synchronization: " + +| progress |+ " " + +| tip ||+ "" MsgRestorationTick _posixTime progressList -> - "Restoring: "+|progressList |+"" + "Restoring: " + +| progressList |+ "" MsgSyncStart _ -> - "Syncing "+| networkDiscriminantVal @n |+" node... " + "Syncing " + +| networkDiscriminantVal @n |+ " node... " MsgSyncCompleted _ tip -> - "Completed sync of "+| networkDiscriminantVal @n |+" up to "+|| tip ||+"" + "Completed sync of " + +| networkDiscriminantVal @n |+ " up to " +|| tip ||+ "" MsgRetryShortly delay -> - "Fetching tip failed, retrying in " +|| (delay `div` 1000) ||+ "ms" + "Fetching tip failed, retrying in " + +|| (delay `div` 1000) ||+ "ms" -- | Format a type-level per-myriad number as percent diff --git a/lib/shelley/exe/cardano-wallet.hs b/lib/shelley/exe/cardano-wallet.hs index 2048d021af6..44e7ef5a3e9 100644 --- a/lib/shelley/exe/cardano-wallet.hs +++ b/lib/shelley/exe/cardano-wallet.hs @@ -138,6 +138,8 @@ import Options.Applicative , progDesc , value ) +import Ouroboros.Network.Client.Wallet + ( tunedForMainnetPipeliningStrategy ) import System.Environment ( getArgs, getExecutablePath ) import System.Exit @@ -252,6 +254,7 @@ cmdServe = command "serve" $ info (helper <*> helper' <*> cmd) $ exitWith =<< serveWallet blockchainSource netParams + tunedForMainnetPipeliningStrategy discriminant tracers sTolerance diff --git a/lib/shelley/exe/local-cluster.hs b/lib/shelley/exe/local-cluster.hs index 68041e4b0bd..be56f622017 100644 --- a/lib/shelley/exe/local-cluster.hs +++ b/lib/shelley/exe/local-cluster.hs @@ -74,6 +74,8 @@ import Data.Text ( Text ) import Data.Text.Class ( ToText (..) ) +import Ouroboros.Network.Client.Wallet + ( tunedForMainnetPipeliningStrategy ) import System.Directory ( createDirectory ) import System.FilePath @@ -225,13 +227,13 @@ main = withLocalClusterSetup $ \dir clusterLogs walletLogs -> let trCluster' = contramap MsgCluster trCluster let encodeAddresses = map (first (T.unpack . encodeAddress @'Mainnet)) let accts = KeyCredential <$> concatMap genRewardAccounts mirMnemonics - let rewards = (, Coin $ fromIntegral oneMillionAda) <$> accts + let rewards' = (, Coin $ fromIntegral oneMillionAda) <$> accts sendFaucetFundsTo trCluster' socketPath dir $ encodeAddresses shelleyIntegrationTestFunds sendFaucetAssetsTo trCluster' socketPath dir 20 $ encodeAddresses $ maryIntegrationTestAssets (Coin 1_000_000_000) - moveInstantaneousRewardsTo trCluster' socketPath dir rewards + moveInstantaneousRewardsTo trCluster' socketPath dir rewards' whenReady dir trCluster logs (RunningNode socketPath block0 (gp, vData)) = withLoggingNamed "cardano-wallet" logs $ \(sb, (cfg, tr)) -> do @@ -256,6 +258,7 @@ main = withLocalClusterSetup $ \dir clusterLogs walletLogs -> void $ serveWallet (NodeSource socketPath vData) gp + tunedForMainnetPipeliningStrategy (SomeNetworkDiscriminant $ Proxy @'Mainnet) tracers (SyncTolerance 10) diff --git a/lib/shelley/src/Cardano/Wallet/Shelley.hs b/lib/shelley/src/Cardano/Wallet/Shelley.hs index 5d1c890b789..a009d053362 100644 --- a/lib/shelley/src/Cardano/Wallet/Shelley.hs +++ b/lib/shelley/src/Cardano/Wallet/Shelley.hs @@ -163,6 +163,8 @@ import Network.URI ( URI (..), parseURI ) import Network.Wai.Handler.Warp ( setBeforeMainLoop ) +import Ouroboros.Network.Client.Wallet + ( PipeliningStrategy ) import System.Exit ( ExitCode (..) ) import System.IOManager @@ -191,6 +193,8 @@ serveWallet -> NetworkParameters -- ^ Records the complete set of parameters -- currently in use by the network that are relevant to the wallet. + -> PipeliningStrategy (CardanoBlock StandardCrypto) + -- ^ pipelining value depending on block height -> SomeNetworkDiscriminant -- ^ Proxy for the network discriminant -> Tracers IO @@ -223,6 +227,7 @@ serveWallet , genesisParameters , slottingParameters } + pipeliningStrategy network@(SomeNetworkDiscriminant proxyNetwork) Tracers{..} sTolerance @@ -241,6 +246,7 @@ serveWallet lift . trace $ MsgNetworkName $ networkName proxyNetwork netLayer <- withNetworkLayer networkTracer + pipeliningStrategy blockchainSource network netParams diff --git a/lib/shelley/src/Cardano/Wallet/Shelley/Network.hs b/lib/shelley/src/Cardano/Wallet/Shelley/Network.hs index 4e6e53b171a..58c33c20c1f 100644 --- a/lib/shelley/src/Cardano/Wallet/Shelley/Network.hs +++ b/lib/shelley/src/Cardano/Wallet/Shelley/Network.hs @@ -39,6 +39,8 @@ import Data.Text.Class ( ToText (toText) ) import GHC.Stack ( HasCallStack ) +import Ouroboros.Network.Client.Wallet + ( PipeliningStrategy ) data NetworkLayerLog = NodeNetworkLog Node.Log @@ -59,17 +61,19 @@ instance HasSeverityAnnotation NetworkLayerLog where withNetworkLayer :: HasCallStack => Tracer IO NetworkLayerLog + -> PipeliningStrategy (CardanoBlock StandardCrypto) -> BlockchainSource -> SomeNetworkDiscriminant -> NetworkParameters -> SyncTolerance -> ContT r IO (NetworkLayer IO (CardanoBlock StandardCrypto)) -withNetworkLayer tr blockchainSrc net netParams tol = +withNetworkLayer tr pipeliningStrategy blockchainSrc net netParams tol = ContT $ case blockchainSrc of NodeSource nodeConn ver -> let tr' = NodeNetworkLog >$< tr netId = networkDiscriminantToId net - in Node.withNetworkLayer tr' netId netParams nodeConn ver tol + in Node.withNetworkLayer + tr' pipeliningStrategy netId netParams nodeConn ver tol BlockfrostSource project -> let tr' = BlockfrostNetworkLog >$< tr in Blockfrost.withNetworkLayer tr' net netParams project diff --git a/lib/shelley/src/Cardano/Wallet/Shelley/Network/Node.hs b/lib/shelley/src/Cardano/Wallet/Shelley/Network/Node.hs index 48bb8a7614e..24db42dcae0 100644 --- a/lib/shelley/src/Cardano/Wallet/Shelley/Network/Node.hs +++ b/lib/shelley/src/Cardano/Wallet/Shelley/Network/Node.hs @@ -212,6 +212,7 @@ import Ouroboros.Network.Client.Wallet ( LSQ (..) , LocalStateQueryCmd (..) , LocalTxSubmissionCmd (..) + , PipeliningStrategy , chainSyncFollowTip , chainSyncWithBlocks , localStateQuery @@ -288,6 +289,8 @@ withNetworkLayer :: HasCallStack => Tracer IO Log -- ^ Logging of network layer startup + -> PipeliningStrategy (CardanoBlock StandardCrypto) + -- ^ pipelining value by the block heigh -> Cardano.NetworkId -- ^ NetworkId for local node connection -> W.NetworkParameters @@ -300,13 +303,15 @@ withNetworkLayer -> (NetworkLayer IO (CardanoBlock StandardCrypto) -> IO a) -- ^ Callback function with the network layer -> IO a -withNetworkLayer tr net np conn ver tol action = do +withNetworkLayer tr pipeliningStrategy net np conn ver tol action = do trTimings <- traceQueryTimings tr - withNodeNetworkLayerBase (tr <> trTimings) net np conn ver tol action + withNodeNetworkLayerBase + (tr <> trTimings) pipeliningStrategy net np conn ver tol action withNodeNetworkLayerBase :: HasCallStack => Tracer IO Log + -> PipeliningStrategy (CardanoBlock StandardCrypto) -> Cardano.NetworkId -> W.NetworkParameters -> CardanoNodeConn @@ -314,7 +319,8 @@ withNodeNetworkLayerBase -> SyncTolerance -> (NetworkLayer IO (CardanoBlock StandardCrypto) -> IO a) -> IO a -withNodeNetworkLayerBase tr net np conn versionData tol action = do +withNodeNetworkLayerBase + tr pipeliningStrategy net np conn versionData tol action = do -- NOTE: We keep client connections running for accessing the node tip, -- submitting transactions, querying parameters and delegations/rewards. -- @@ -347,6 +353,7 @@ withNodeNetworkLayerBase tr net np conn versionData tol action = do let blockHeader = fromTip' gp let client = mkWalletClient (mapChainSyncLog mapB mapP >$< trChainSyncLog) + pipeliningStrategy (mapChainFollower toPoint mapP blockHeader id follower) cfg traceWith trFollowLog MsgStartFollowing @@ -583,17 +590,17 @@ mkWalletClient . ( block ~ CardanoBlock (StandardCrypto) , MonadThrow m, MonadST m, MonadTimer m, MonadAsync m) => Tracer m (ChainSyncLog block (Point block)) + -> PipeliningStrategy block -> ChainFollower m (Point block) (Tip block) (NonEmpty block) -> CodecConfig block -> NetworkClient m -mkWalletClient tr follower cfg v = +mkWalletClient tr pipeliningStrategy follower cfg v = nodeToClientProtocols (const $ return $ NodeToClientProtocols { localChainSyncProtocol = InitiatorProtocolOnly $ MuxPeerRaw $ \channel -> runPipelinedPeer nullTracer (cChainSyncCodec $ codecs v cfg) channel $ chainSyncClientPeerPipelined - $ chainSyncWithBlocks tr follower - + $ chainSyncWithBlocks tr pipeliningStrategy follower , localTxSubmissionProtocol = doNothingProtocol , localStateQueryProtocol = diff --git a/lib/shelley/test/integration/shelley-integration-test.hs b/lib/shelley/test/integration/shelley-integration-test.hs index 3c14397910e..c75c2a1fa30 100644 --- a/lib/shelley/test/integration/shelley-integration-test.hs +++ b/lib/shelley/test/integration/shelley-integration-test.hs @@ -113,6 +113,8 @@ import Network.HTTP.Client ) import Network.URI ( URI ) +import Ouroboros.Network.Client.Wallet + ( tunedForMainnetPipeliningStrategy ) import System.Directory ( createDirectory ) import System.Environment @@ -341,6 +343,7 @@ specWithServer testDir (tr, tracers) = aroundAll withContext serveWallet (NodeSource conn vData) gp + tunedForMainnetPipeliningStrategy (SomeNetworkDiscriminant $ Proxy @'Mainnet) tracers (SyncTolerance 10) diff --git a/lib/shelley/test/unit/Cardano/Wallet/Shelley/NetworkSpec.hs b/lib/shelley/test/unit/Cardano/Wallet/Shelley/NetworkSpec.hs index d48d4630ee9..f9cee3e7bc8 100644 --- a/lib/shelley/test/unit/Cardano/Wallet/Shelley/NetworkSpec.hs +++ b/lib/shelley/test/unit/Cardano/Wallet/Shelley/NetworkSpec.hs @@ -36,6 +36,8 @@ import Data.Set ( Set ) import Fmt ( build, fmt, indentF ) +import Ouroboros.Network.Client.Wallet + ( tunedForMainnetPipeliningStrategy ) import Ouroboros.Network.Magic ( NetworkMagic (..) ) import Ouroboros.Network.NodeToClient @@ -73,10 +75,14 @@ concurrentConnectionSpec = describe "NetworkLayer regression test #1708" $ do withTestNode nullTracer $ \np sock vData -> do let sTol = SyncTolerance 60 tasks <- replicateM 10 $ async $ - withNetworkLayer tr testnet np sock vData sTol $ \nl -> do - -- Wait for the first tip result from the node - waiter <- newEmptyMVar - race_ (watchNodeTip nl (putMVar waiter)) (takeMVar waiter) + withNetworkLayer tr + tunedForMainnetPipeliningStrategy + testnet np sock vData sTol $ \nl -> do + -- Wait for the first tip result from the node + waiter <- newEmptyMVar + race_ + (watchNodeTip nl (putMVar waiter)) + (takeMVar waiter) void $ waitAnyCancel tasks observerSpec :: Spec