Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify checkpoint pruning #3159

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/core/cardano-wallet-core.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ test-suite unit
Cardano.Wallet.CoinSelection.Internal.BalanceSpec
Cardano.Wallet.CoinSelection.Internal.CollateralSpec
Cardano.Wallet.DB.Arbitrary
Cardano.Wallet.DB.CheckpointsSpec
Cardano.Wallet.DB.MVarSpec
Cardano.Wallet.DB.Properties
Cardano.Wallet.DB.SqliteSpec
Expand Down
91 changes: 31 additions & 60 deletions lib/core/src/Cardano/Wallet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,20 @@ import Cardano.Wallet.DB
, ErrPutLocalTxSubmission (..)
, ErrRemoveTx (..)
, ErrWalletAlreadyExists (..)
, SparseCheckpointsConfig (..)
, defaultSparseCheckpointsConfig
, sparseCheckpoints
)
import Cardano.Wallet.DB.Checkpoints
( DeltaCheckpoints (..) )
( extendAndPrune )
import Cardano.Wallet.DB.Sqlite.AddressBook
( AddressBookIso, getPrologue )
import Cardano.Wallet.DB.WalletState
( DeltaMap (..), DeltaWalletState1 (..), fromWallet, getLatest, getSlot )
( DeltaWalletState1 (..)
, WalletState (..)
, adjustNoSuchWallet
, fromWallet
, getBlockHeight
, getLatest
, getSlot
)
import Cardano.Wallet.Logging
( BracketLog
, BracketLog' (..)
Expand Down Expand Up @@ -1041,48 +1045,30 @@ restoreBlocks ctx tr wid blocks nodeTip = db & \DBLayer{..} -> mapExceptT atomic
liftIO $ logDelegation delegation
putDelegationCertificate wid cert slotNo

let unstable = Set.fromList $ sparseCheckpoints cfg (nodeTip ^. #blockHeight)
where
-- NOTE
-- The edge really is an optimization to avoid rolling back too
-- "far" in the past. Yet, we let the edge construct itself
-- organically once we reach the tip of the chain and start
-- processing blocks one by one.
--
-- This prevents the wallet from trying to create too many
-- checkpoints at once during restoration which causes massive
-- performance degradation on large wallets.
--
-- Rollback may still occur during this short period, but
-- rolling back from a few hundred blocks is relatively fast
-- anyway.
cfg = (defaultSparseCheckpointsConfig epochStability) { edgeSize = 0 }

getBlockHeight cp = fromIntegral $
cp ^. #currentTip . #blockHeight . #getQuantity
willKeep cp = getBlockHeight cp `Set.member` unstable
cpsKeep = filter willKeep (NE.init cps) <> [NE.last cps]

-- NOTE: We have to update the 'Prologue' as well,
-- as it can contain addresses for pending transactions,
-- which are removed from the 'Prologue' once the
-- transactions are accepted onto the chain and discovered.
--
-- I'm not so sure that the approach here is correct with
-- respect to rollbacks, but it is functionally the same
-- as the code that came before.
deltaPrologue =
[ ReplacePrologue $ getPrologue $ getState $ NE.last cps ]
delta = deltaPrologue ++ reverse
[ UpdateCheckpoints $ PutCheckpoint (getSlot wcp) wcp
| wcp <- map (snd . fromWallet) cpsKeep
]

liftIO $ mapM_ logCheckpoint cpsKeep
ExceptT $ modifyDBMaybe walletsDB $
adjustNoSuchWallet wid id $ \_ -> Right ( delta, () )
adjustNoSuchWallet wid id $ \wal ->
let
wcps = snd . fromWallet <$> cps
deltaCheckpoints =
[ UpdateCheckpoints
$ extendAndPrune getSlot (Quantity . getBlockHeight) epochStability
(nodeTip ^. #blockHeight) wcps (checkpoints wal)
]
-- NOTE: We have to update the 'Prologue' as well,
-- as it can contain addresses for pending transactions,
-- which are removed from the 'Prologue' once the
-- transactions are accepted onto the chain and discovered.
--
-- I'm not so sure that the approach here is correct with
-- respect to rollbacks, but it is functionally the same
-- as the code that came before.
deltaPrologue =
[ ReplacePrologue $ getPrologue $ getState $ NE.last cps ]
in Right ( deltaPrologue ++ deltaCheckpoints, () )

prune wid epochStability
-- Note: At this point, checkpoints have already been pruned
-- we only prune LocalTxSubmission and TxHistory here.
pruneTxs wid epochStability

liftIO $ do
traceWith tr $ MsgDiscoveredTxs txs
Expand All @@ -1091,9 +1077,6 @@ restoreBlocks ctx tr wid blocks nodeTip = db & \DBLayer{..} -> mapExceptT atomic
nl = ctx ^. networkLayer
db = ctx ^. dbLayer @IO @s @k

logCheckpoint :: Wallet s -> IO ()
logCheckpoint cp = traceWith tr $ MsgCheckpoint (currentTip cp)

logDelegation :: (SlotNo, DelegationCertificate) -> IO ()
logDelegation = traceWith tr . uncurry MsgDiscoveredDelegationCert

Expand Down Expand Up @@ -1402,18 +1385,6 @@ importRandomAddresses ctx wid addrs = db & \DBLayer{..} ->
s0 = getState $ getLatest wal
es1 = foldl' (\s addr -> s >>= Rnd.importAddress addr) (Right s0) addrs

-- | Adjust a specific wallet if it exists or return 'ErrNoSuchWallet'.
adjustNoSuchWallet
:: WalletId
-> (ErrNoSuchWallet -> e)
-> (w -> Either e (dw, b))
-> (Map WalletId w -> (Maybe (DeltaMap WalletId dw), Either e b))
adjustNoSuchWallet wid err update wallets = case Map.lookup wid wallets of
Nothing -> (Nothing, Left $ err $ ErrNoSuchWallet wid)
Just wal -> case update wal of
Left e -> (Nothing, Left e)
Right (dw, b) -> (Just $ Adjust wid dw, Right b)

-- NOTE
-- Addresses coming from the transaction history might be payment or
-- delegation addresses. So we normalize them all to be delegation addresses
Expand Down
138 changes: 4 additions & 134 deletions lib/core/src/Cardano/Wallet/DB.hs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}

Expand All @@ -19,12 +16,6 @@ module Cardano.Wallet.DB
, DBFactory (..)
, cleanDB

-- * Checkpoints
, sparseCheckpoints
, SparseCheckpointsConfig (..)
, defaultSparseCheckpointsConfig
, gapSize

-- * Errors
, ErrBadFormat(..)
, ErrNoSuchWallet(..)
Expand All @@ -39,7 +30,7 @@ import Prelude
import Cardano.Address.Derivation
( XPrv )
import Cardano.Wallet.DB.WalletState
( DeltaMap, DeltaWalletState )
( DeltaMap, DeltaWalletState, ErrNoSuchWallet (..) )
import Cardano.Wallet.Primitive.AddressDerivation
( Depth (..) )
import Cardano.Wallet.Primitive.Model
Expand Down Expand Up @@ -76,12 +67,10 @@ import Data.DBVar
import Data.Quantity
( Quantity (..) )
import Data.Word
( Word32, Word8 )
( Word32 )
import UnliftIO.Exception
( Exception )

import qualified Data.List as L

-- | Instantiate database layers at will
data DBFactory m s k = DBFactory
{ withDatabase :: forall a. WalletId -> (DBLayer m s k -> IO a) -> IO a
Expand Down Expand Up @@ -328,11 +317,11 @@ data DBLayer m s k = forall stm. (MonadIO stm, MonadFail stm) => DBLayer
-- point of rollback but can't be guaranteed to be exactly the same
-- because the database may only keep sparse checkpoints.

, prune
, pruneTxs
:: WalletId
-> Quantity "block" Word32
-> ExceptT ErrNoSuchWallet stm ()
-- ^ Prune database entities and remove entities that can be discarded.
-- ^ Prune and remove local tx submission and outdated transactions.
--
-- The second argument represents the stability window, or said
-- length of the deepest rollback.
Expand All @@ -351,11 +340,6 @@ data ErrBadFormat

instance Exception ErrBadFormat

-- | Can't perform given operation because there's no wallet
newtype ErrNoSuchWallet
= ErrNoSuchWallet WalletId -- Wallet is gone or doesn't exist yet
deriving (Eq, Show)

-- | Can't add a transaction to the local tx submission pool.
data ErrPutLocalTxSubmission
= ErrPutLocalTxSubmissionNoSuchWallet ErrNoSuchWallet
Expand Down Expand Up @@ -384,117 +368,3 @@ newtype ErrWalletAlreadyExists
cleanDB :: DBLayer m s k -> m ()
cleanDB DBLayer{..} = atomically $
listWallets >>= mapM_ (runExceptT . removeWallet)

-- | Storing EVERY checkpoints in the database is quite expensive and useless.
-- We make the following assumptions:
--
-- - We can't rollback for more than `k=epochStability` blocks in the past
-- - It is pretty fast to re-sync a few hundred blocks
-- - Small rollbacks may occur more often than long one
--
-- So, as we insert checkpoints, we make sure to:
--
-- - Prune any checkpoint that more than `k` blocks in the past
-- - Keep only one checkpoint every 100 blocks
-- - But still keep ~10 most recent checkpoints to cope with small rollbacks
--
-- __Example 1__: Inserting `cp153`
--
-- ℹ: `cp142` is discarded and `cp153` inserted.
--
-- @
-- Currently in DB:
-- ┌───┬───┬───┬─ ──┬───┐
-- │cp000 │cp100 │cp142 │.. ..│cp152 │
-- └───┴───┴───┴─ ──┴───┘
-- Want in DB:
-- ┌───┬───┬───┬─ ──┬───┐
-- │cp000 │cp100 │cp143 │.. ..│cp153 │
-- └───┴───┴───┴─ ──┴───┘
-- @
--
--
-- __Example 2__: Inserting `cp111`
--
-- ℹ: `cp100` is kept and `cp111` inserted.
--
-- @
-- Currently in DB:
-- ┌───┬───┬───┬─ ──┬───┐
-- │cp000 │cp100 │cp101 │.. ..│cp110 │
-- └───┴───┴───┴─ ──┴───┘
-- Want in DB:
-- ┌───┬───┬───┬─ ──┬───┐
-- │cp000 │cp100 │cp101 │.. ..│cp111 │
-- └───┴───┴───┴─ ──┴───┘
-- @
--
-- NOTE: There might be cases where the chain following "fails" (because, for
-- example, the node has switch to a different chain, different by more than k),
-- and in such cases, we have no choice but rolling back from genesis.
-- Therefore, we need to keep the very first checkpoint in the database, no
-- matter what.
sparseCheckpoints
:: SparseCheckpointsConfig
-- ^ Parameters for the function.
-> Quantity "block" Word32
-- ^ A given block height
-> [Word32]
-- ^ The list of checkpoint heights that should be kept in DB.
sparseCheckpoints cfg blkH =
let
SparseCheckpointsConfig{edgeSize,epochStability} = cfg
g = gapSize cfg
h = getQuantity blkH
e = fromIntegral edgeSize

minH =
let x = if h < epochStability + g then 0 else h - epochStability - g
in g * (x `div` g)

initial = 0
longTerm = [minH,minH+g..h]
shortTerm = if h < e
then [0..h]
else [h-e,h-e+1..h]
in
L.sort (L.nub $ initial : (longTerm ++ shortTerm))

-- | Captures the configuration for the `sparseCheckpoints` function.
--
-- NOTE: large values of 'edgeSize' aren't recommended as they would mean
-- storing many unnecessary checkpoints. In Ouroboros Praos, there's a
-- reasonable probability for small forks of a few blocks so it makes sense to
-- maintain a small part that is denser near the edge.
data SparseCheckpointsConfig = SparseCheckpointsConfig
{ edgeSize :: Word8
, epochStability :: Word32
} deriving Show

-- | A sensible default to use in production. See also 'SparseCheckpointsConfig'
defaultSparseCheckpointsConfig :: Quantity "block" Word32 -> SparseCheckpointsConfig
defaultSparseCheckpointsConfig (Quantity epochStability) =
SparseCheckpointsConfig
{ edgeSize = 5
, epochStability
}

-- | A reasonable gap size used internally in 'sparseCheckpoints'.
--
-- 'Reasonable' means that it's not _too frequent_ and it's not too large. A
-- value that is too small in front of k would require generating much more
-- checkpoints than necessary.
--
-- A value that is larger than `k` may have dramatic consequences in case of
-- deep rollbacks.
--
-- As a middle ground, we current choose `k / 3`, which is justified by:
--
-- - The current speed of the network layer (several thousands blocks per seconds)
-- - The current value of k = 2160
--
-- So, `k / 3` = 720, which should remain around a second of time needed to catch
-- up in case of large rollbacks.
gapSize :: SparseCheckpointsConfig -> Word32
gapSize SparseCheckpointsConfig{epochStability} =
epochStability `div` 3
Loading