Skip to content

Commit

Permalink
Merge #4442
Browse files Browse the repository at this point in the history
4442: tx-generator: Fix multi-threaded use of tx-streams. r=MarcFontaine a=MarcFontaine

* Put the tx-streams behind an `MVar`.
* Replace 'BechmarkTx' with generic 'NtoM'.
* Remove 'RunBenchmarkAux' and cleanups.
* Remove global state for 'TFee' and 'TTxAdditionalSize'

Co-authored-by: Michael Karg <[email protected]>
Co-authored-by: MarcFontaine <[email protected]>
  • Loading branch information
3 people authored Sep 20, 2022
2 parents 27c28e0 + 0818647 commit d03e0f5
Show file tree
Hide file tree
Showing 19 changed files with 224 additions and 446 deletions.
56 changes: 15 additions & 41 deletions bench/tx-generator/src/Cardano/Benchmarking/Compiler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import Data.Text (Text)
import qualified Data.Text as Text

import Cardano.Api
import Cardano.Benchmarking.Types
import Cardano.Benchmarking.NixOptions
import Cardano.Benchmarking.Script.Setters
import Cardano.Benchmarking.Script.Store (Name(..), WalletName)
Expand Down Expand Up @@ -54,12 +53,10 @@ compileToScript = do
genesisWallet <- importGenesisFunds
collateralWallet <- addCollaterals genesisWallet
splitWallet <- splittingPhase genesisWallet
benchmarkingPhaseNew splitWallet collateralWallet
benchmarkingPhase splitWallet collateralWallet

initConstants :: Compiler ()
initConstants = do
setN TTxAdditionalSize _nix_add_tx_size
setN TFee _nix_tx_fee
setN TLocalSocket _nix_localNodeSocketPath
setConst TTTL 1000000
where
Expand Down Expand Up @@ -101,8 +98,8 @@ addCollaterals src = do

splittingPhase :: SrcWallet -> Compiler DstWallet
splittingPhase srcWallet = do
(NumberOfTxs tx_count) <- askNixOption _nix_tx_count
(NumberOfInputsPerTx inputs_per_tx) <- askNixOption _nix_inputs_per_tx
tx_count <- askNixOption _nix_tx_count
inputs_per_tx <- askNixOption _nix_inputs_per_tx
tx_fee <- askNixOption _nix_tx_fee
era <- askNixOption _nix_era
minValuePerInput <- _minValuePerInput <$> evilFeeMagic
Expand Down Expand Up @@ -170,19 +167,23 @@ unfoldSplitSequence fee value outputs
(x, 0) -> x
(x, _rest) -> x+1

benchmarkingPhaseNew :: WalletName -> Maybe WalletName -> Compiler ()
benchmarkingPhaseNew wallet collateralWallet = do
benchmarkingPhase :: WalletName -> Maybe WalletName -> Compiler ()
benchmarkingPhase wallet collateralWallet = do
debugMode <- askNixOption _nix_debugMode
targetNodes <- askNixOption _nix_targetNodes
extraArgs <- evilValueMagic
tps <- askNixOption _nix_tps
era <- askNixOption _nix_era
(NumberOfTxs txCount) <- askNixOption _nix_tx_count
txCount <- askNixOption _nix_tx_count
fee <- askNixOption _nix_tx_fee
inputs <- askNixOption _nix_inputs_per_tx
outputs <- askNixOption _nix_outputs_per_tx
metadataSize <- askNixOption _nix_add_tx_size
let
payMode = PayToAddr (KeyName "pass-partout") wallet --todo: used different wallet here !
submitMode = if debugMode
then LocalSocket
else Benchmark targetNodes (ThreadName "tx-submit-benchmark") tps extraArgs
generator = Take txCount $ Cycle $ BechmarkTx wallet extraArgs collateralWallet
else Benchmark targetNodes (ThreadName "tx-submit-benchmark") tps txCount
generator = Take txCount $ Cycle $ NtoM fee wallet payMode inputs outputs (Just metadataSize) collateralWallet
emit $ Submit era submitMode generator
unless debugMode $ do
emit $ WaitBenchmark $ ThreadName "tx-submit-benchmark"
Expand All @@ -196,8 +197,8 @@ evilFeeMagic :: Compiler Fees
evilFeeMagic = do
(Quantity tx_fee) <- lovelaceToQuantity <$> askNixOption _nix_tx_fee
plutusMode <- askNixOption _nix_plutusMode
(NumberOfInputsPerTx inputs_per_tx) <- askNixOption _nix_inputs_per_tx
(NumberOfOutputsPerTx outputs_per_tx) <- askNixOption _nix_outputs_per_tx
inputs_per_tx <- askNixOption _nix_inputs_per_tx
outputs_per_tx <- askNixOption _nix_outputs_per_tx
(Quantity min_utxo_value) <- lovelaceToQuantity <$> askNixOption _nix_min_utxo_value
let
scriptFees = 5000000;
Expand Down Expand Up @@ -249,30 +250,3 @@ newWallet n = do
name <- WalletName <$> newIdentifier n
emit $ InitWallet name
return name

-- Approximate the ada values for inputs of the benchmarking Phase
evilValueMagic :: Compiler RunBenchmarkAux
evilValueMagic = do
(NumberOfInputsPerTx inputsPerTx) <- askNixOption _nix_inputs_per_tx
(NumberOfOutputsPerTx outputsPerTx) <- askNixOption _nix_outputs_per_tx
(NumberOfTxs txCount) <- askNixOption _nix_tx_count
fee <- askNixOption _nix_tx_fee
minValuePerUTxO <- askNixOption _nix_min_utxo_value
let
(Quantity minValue) = lovelaceToQuantity $ fromIntegral outputsPerTx * minValuePerUTxO + fee

-- this is not totally correct:
-- beware of rounding errors !
minValuePerInput = quantityToLovelace $ fromIntegral (if m==0 then d else d+1)
where
(d, m) = minValue `divMod` fromIntegral inputsPerTx
return $ RunBenchmarkAux {
auxTxCount = txCount
, auxFee = fee
, auxOutputsPerTx = outputsPerTx
, auxInputsPerTx = inputsPerTx
, auxInputs = inputsPerTx * txCount
, auxOutputs = inputsPerTx * txCount
, auxMinValuePerUTxO = minValuePerInput
}

12 changes: 6 additions & 6 deletions bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UndecidableInstances #-}

Expand All @@ -10,7 +10,6 @@

module Cardano.Benchmarking.GeneratorTx
( AsyncBenchmarkControl
, TxGenError
, walletBenchmark
, readSigningKey
, waitBenchmark
Expand All @@ -34,14 +33,14 @@ import Cardano.Node.Configuration.NodeAddress

import Cardano.Api hiding (txFee)

import Cardano.TxGenerator.Types (TxGenError(..))
import Cardano.Benchmarking.GeneratorTx.NodeToNode
import Cardano.Benchmarking.GeneratorTx.Submission
import Cardano.Benchmarking.GeneratorTx.SubmissionClient
import Cardano.Benchmarking.TpsThrottle
import Cardano.Benchmarking.LogTypes
import Cardano.Benchmarking.TpsThrottle
import Cardano.Benchmarking.Types
import Cardano.Benchmarking.Wallet (TxStream)
import Cardano.TxGenerator.Types (NumberOfTxs, TPSRate, TxGenError (..))

readSigningKey :: SigningKeyFile -> ExceptT TxGenError IO (SigningKey PaymentKey)
readSigningKey =
Expand Down Expand Up @@ -138,17 +137,18 @@ walletBenchmark
traceDebug $ "******* Tx generator, launching Tx peers: " ++ show (NE.length remoteAddresses) ++ " of them"

startTime <- Clock.getCurrentTime
tpsThrottle <- newTpsThrottle 32 (unNumberOfTxs count) tpsRate
tpsThrottle <- newTpsThrottle 32 count tpsRate

reportRefs <- STM.atomically $ replicateM (fromIntegral numTargets) STM.newEmptyTMVar

txStreamRef <- newMVar $ StreamActive txSource
allAsyncs <- forM (zip reportRefs $ NE.toList remoteAddresses) $
\(reportRef, remoteAddr) -> do
let errorHandler = handleTxSubmissionClientError traceSubmit remoteAddr reportRef errorPolicy
client = txSubmissionClient
traceN2N
traceSubmit
(txStreamSource txSource tpsThrottle)
(txStreamSource txStreamRef tpsThrottle)
(submitSubmissionThreadStats reportRef)
async $ handle errorHandler (connectClient remoteAddr client)

Expand Down
43 changes: 38 additions & 5 deletions bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/Genesis.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import qualified Data.ListMap as ListMap
import Prelude (error, filter)

import Cardano.Api
import Cardano.Api.Shelley (fromShelleyLovelace, fromShelleyPaymentCredential,
fromShelleyStakeReference, ReferenceScript(..))
import Cardano.Api.Shelley (ReferenceScript (..), fromShelleyLovelace,
fromShelleyPaymentCredential, fromShelleyStakeReference)
import Control.Arrow ((***))

import Cardano.TxGenerator.FundQueue
import Cardano.Benchmarking.GeneratorTx.Tx
import Cardano.TxGenerator.Fund
import Cardano.TxGenerator.Utils

import Cardano.Ledger.Shelley.API (Addr (..), ShelleyGenesis, sgInitialFunds)
import Ouroboros.Consensus.Shelley.Eras (StandardShelley)
Expand Down Expand Up @@ -58,7 +58,7 @@ genesisExpenditure ::
-> (Tx era, Fund)
genesisExpenditure networkId inputKey addr coin fee ttl outputKey = (tx, Fund $ InAnyCardanoEra cardanoEra fund)
where
tx = mkGenesisTransaction (castKey inputKey) 0 ttl fee [ pseudoTxIn ] [ txout ]
tx = mkGenesisTransaction (castKey inputKey) ttl fee [ pseudoTxIn ] [ txout ]

value = mkTxOutValueAdaOnly $ coin - fee
txout = TxOut addr value TxOutDatumNone ReferenceScriptNone
Expand All @@ -75,3 +75,36 @@ genesisExpenditure networkId inputKey addr coin fee ttl outputKey = (tx, Fund $
, _fundVal = value
, _fundSigningKey = Just outputKey
}

mkGenesisTransaction :: forall era .
IsShelleyBasedEra era
=> SigningKey GenesisUTxOKey
-> SlotNo
-> Lovelace
-> [TxIn]
-> [TxOut CtxTx era]
-> Tx era
mkGenesisTransaction key ttl fee txins txouts
= case makeTransactionBody txBodyContent of
Right b -> signShelleyTransaction b [WitnessGenesisUTxOKey key]
Left err -> error $ show err
where
txBodyContent = TxBodyContent {
txIns = zip txins $ repeat $ BuildTxWith $ KeyWitness KeyWitnessForSpending
, txInsCollateral = TxInsCollateralNone
, txInsReference = TxInsReferenceNone
, txOuts = txouts
, txFee = mkTxFee fee
, txValidityRange = (TxValidityNoLowerBound, mkTxValidityUpperBound ttl)
, txMetadata = TxMetadataNone
, txAuxScripts = TxAuxScriptsNone
, txExtraKeyWits = TxExtraKeyWitnessesNone
, txProtocolParams = BuildTxWith Nothing
, txWithdrawals = TxWithdrawalsNone
, txCertificates = TxCertificatesNone
, txUpdateProposal = TxUpdateProposalNone
, txMintValue = TxMintNone
, txScriptValidity = TxScriptValidityNone
, txReturnCollateral = TxReturnCollateralNone
, txTotalCollateral = TxTotalCollateralNone
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ where
import Prelude

import Cardano.Api
import Cardano.Benchmarking.GeneratorTx.Tx
import qualified Data.ByteString as BS
import qualified Data.Map.Strict as Map
import Data.Word (Word64)

import Cardano.TxGenerator.Utils


maxMapSize :: Int
maxMapSize = 1000
maxBSSize :: Int
Expand Down Expand Up @@ -110,8 +112,8 @@ dummyTxSizeInEra metadata = case makeTransactionBody dummyTx of
, txInsCollateral = TxInsCollateralNone
, txInsReference = TxInsReferenceNone
, txOuts = []
, txFee = mkFee 0
, txValidityRange = (TxValidityNoLowerBound, mkValidityUpperBound 0)
, txFee = mkTxFee 0
, txValidityRange = (TxValidityNoLowerBound, mkTxValidityUpperBound 0)
, txMetadata = metadata
, txAuxScripts = TxAuxScriptsNone
, txExtraKeyWits = TxExtraKeyWitnessesNone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NoMonomorphismRestriction #-}
Expand All @@ -18,7 +19,8 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}

module Cardano.Benchmarking.GeneratorTx.Submission
( SubmissionParams(..)
( StreamState (..)
, SubmissionParams(..)
, SubmissionThreadReport
, TxSource
, ReportRef
Expand Down Expand Up @@ -47,6 +49,7 @@ import Cardano.Tracing.OrphanInstances.Shelley ()
import Ouroboros.Network.Protocol.TxSubmission2.Type (TokBlockingStyle (..))

import Cardano.Api
import Cardano.TxGenerator.Types (TPSRate)

import Cardano.Benchmarking.TpsThrottle
import Cardano.Benchmarking.LogTypes
Expand Down Expand Up @@ -112,37 +115,52 @@ mkSubmissionSummary ssThreadName startTime reportsRefs
where
txDiffTimeTPS :: Int -> NominalDiffTime -> TPSRate
txDiffTimeTPS n delta =
TPSRate $ realToFrac $ fromIntegral n / delta
realToFrac $ fromIntegral n / delta

threadReportTps :: SubmissionThreadReport -> TPSRate
threadReportTps
SubmissionThreadReport
{ strStats=SubmissionThreadStats{stsAcked=Ack ack}, strEndOfProtocol } =
txDiffTimeTPS ack (Clock.diffUTCTime strEndOfProtocol startTime)

txStreamSource :: forall era. TxStream IO era -> TpsThrottle -> TxSource era
txStreamSource stream tpsThrottle = Active $ worker stream
txStreamSource :: forall era. MVar (StreamState (TxStream IO era)) -> TpsThrottle -> TxSource era
txStreamSource streamRef tpsThrottle = Active worker
where
worker :: forall m blocking . MonadIO m => TxStream IO era -> TokBlockingStyle blocking -> Req -> m (TxSource era, [Tx era])
worker s blocking req = do
worker :: forall m blocking . MonadIO m => TokBlockingStyle blocking -> Req -> m (TxSource era, [Tx era])
worker blocking req = do
(done, txCount) <- case blocking of
TokBlocking -> liftIO $ consumeTxsBlocking tpsThrottle req
TokNonBlocking -> liftIO $ consumeTxsNonBlocking tpsThrottle req
(txList, newScript) <- liftIO $ unFold s txCount
txList <- liftIO $ unFold txCount
case done of
Stop -> return (Exhausted, txList)
Next -> return (Active $ worker newScript, txList)

unFold :: TxStream IO era -> Int -> IO ([Tx era], TxStream IO era)
unFold s 0 = return ([], s)
unFold s n = do
next <- Streaming.next s
case next of
-- Node2node clients buffer a number x of TXs internally (x is determined by the node.)
-- Therefore it is possible that the submission client requests TXs from an empty TxStream.
-- In other words, it is not an error to request more TXs than there are in the TxStream.
Left _ -> return ([], s)
Right (Right tx, t) -> do
(l, out) <- unFold t $ pred n
return (tx:l, out)
Right (Left err, _) -> error err
Next -> return (Active worker, txList)

unFold :: Int -> IO [Tx era]
unFold 0 = return []
unFold n = nextOnMVar streamRef >>= \case
-- Node2node clients buffer a number x of TXs internally (x is determined by the node.)
-- Therefore it is possible that the submission client requests TXs from an empty TxStream.
-- In other words, it is not an error to request more TXs than there are in the TxStream.
StreamEmpty -> return []
StreamError err -> error err
StreamActive tx -> do
l <- unFold $ pred n
return $ tx:l

nextOnMVar :: MVar (StreamState (TxStream IO era)) -> IO (StreamState (Tx era))
nextOnMVar v = modifyMVar v $ \case
StreamEmpty -> return (StreamEmpty, StreamEmpty)
StreamError err -> return (StreamError err, StreamError err)
StreamActive s -> update <$> Streaming.next s
where
update :: Either () (Either String (Tx era), TxStream IO era) -> (StreamState (TxStream IO era), StreamState (Tx era))
update x = case x of
Left () -> (StreamEmpty, StreamEmpty)
Right (Right tx, t) -> (StreamActive t, StreamActive tx)
Right (Left err, _) -> (StreamError err, StreamError err)

data StreamState x
= StreamEmpty
| StreamError String
| StreamActive x
Loading

0 comments on commit d03e0f5

Please sign in to comment.