Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: async commit #16175

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func SetIAVLLazyLoading(lazyLoading bool) func(*BaseApp) {
return func(bapp *BaseApp) { bapp.cms.SetLazyLoading(lazyLoading) }
}

// SetIAVLAsyncCommitBuffer sets the size of the async commit channel, -1 means synchronous commitment.
func SetIAVLAsyncCommitBuffer(size int) func(*BaseApp) {
return func(bapp *BaseApp) { bapp.cms.SetIAVLAsyncCommitBuffer(size) }
}

// SetInterBlockCache provides a BaseApp option function that sets the
// inter-block cache.
func SetInterBlockCache(cache storetypes.MultiStorePersistentCache) func(*BaseApp) {
Expand Down
27 changes: 16 additions & 11 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ type BaseConfig struct {
// IAVLLazyLoading enable/disable the lazy loading of iavl store.
IAVLLazyLoading bool `mapstructure:"iavl-lazy-loading"`

// IAVLAsyncCommitBuffer defines the size of the async commit channel (in number of blocks),
// -1 means synchronous commitment.
IAVLAsyncCommitBuffer int `mapstructure:"iavl-async-commit-buffer"`

// AppDBBackend defines the type of Database to use for the application and snapshots databases.
// An empty string indicates that the CometBFT config's DBBackend value should be used.
AppDBBackend string `mapstructure:"app-db-backend"`
Expand Down Expand Up @@ -227,17 +231,18 @@ func (c *Config) GetMinGasPrices() sdk.DecCoins {
func DefaultConfig() *Config {
return &Config{
BaseConfig: BaseConfig{
MinGasPrices: defaultMinGasPrices,
InterBlockCache: true,
Pruning: pruningtypes.PruningOptionDefault,
PruningKeepRecent: "0",
PruningInterval: "0",
MinRetainBlocks: 0,
IndexEvents: make([]string, 0),
IAVLCacheSize: 781250,
IAVLDisableFastNode: false,
IAVLLazyLoading: false,
AppDBBackend: "",
MinGasPrices: defaultMinGasPrices,
InterBlockCache: true,
Pruning: pruningtypes.PruningOptionDefault,
PruningKeepRecent: "0",
PruningInterval: "0",
MinRetainBlocks: 0,
IndexEvents: make([]string, 0),
IAVLCacheSize: 781250,
IAVLDisableFastNode: false,
IAVLLazyLoading: false,
IAVLAsyncCommitBuffer: -1,
AppDBBackend: "",
},
Telemetry: telemetry.Config{
Enabled: false,
Expand Down
4 changes: 4 additions & 0 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ iavl-disable-fastnode = {{ .BaseConfig.IAVLDisableFastNode }}
# Default is false.
iavl-lazy-loading = {{ .BaseConfig.IAVLLazyLoading }}

# IAVLAsyncCommitBuffer defines the size of the async commit channel (in number of blocks),
# -1 means synchronous commitment.
iavl-async-commit-buffer = {{ .BaseConfig.IAVLAsyncCommitBuffer }}

# AppDBBackend defines the database backend type to use for the application and snapshots DBs.
# An empty string indicates that a fallback will be used.
# First fallback is the deprecated compile-time types.DBBackend value.
Expand Down
8 changes: 8 additions & 0 deletions server/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (ms multiStore) Commit() storetypes.CommitID {
panic("not implemented")
}

func (ms multiStore) WaitAsyncCommit() error {
return nil
}

func (ms multiStore) LastCommitID() storetypes.CommitID {
panic("not implemented")
}
Expand Down Expand Up @@ -146,6 +150,10 @@ func (ms multiStore) SetLazyLoading(bool) {
panic("not implemented")
}

func (ms multiStore) SetIAVLAsyncCommitBuffer(size int) {
panic("not implemented")
}

func (ms multiStore) SetInitialVersion(version int64) error {
panic("not implemented")
}
Expand Down
1 change: 1 addition & 0 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const (
FlagIAVLCacheSize = "iavl-cache-size"
FlagDisableIAVLFastNode = "iavl-disable-fastnode"
FlagIAVLLazyLoading = "iavl-lazy-loading"
FlagIAVLAsyncCommitBuffer = "iavl-async-commit-buffer"

// state sync-related flags
FlagStateSyncSnapshotInterval = "state-sync.snapshot-interval"
Expand Down
1 change: 1 addition & 0 deletions server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ func DefaultBaseappOptions(appOpts types.AppOptions) []func(*baseapp.BaseApp) {
defaultMempool,
baseapp.SetIAVLLazyLoading(cast.ToBool(appOpts.Get(FlagIAVLLazyLoading))),
baseapp.SetChainID(chainID),
baseapp.SetIAVLAsyncCommitBuffer(cast.ToInt(appOpts.Get(FlagCommitBufferSize))),
}
}

Expand Down
109 changes: 98 additions & 11 deletions store/iavl/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"sync"

abci "github.com/cometbft/cometbft/abci/types"
cmtprotocrypto "github.com/cometbft/cometbft/proto/tendermint/crypto"
Expand Down Expand Up @@ -35,23 +36,29 @@ var (

// Store Implements types.KVStore and CommitKVStore.
type Store struct {
tree Tree
logger log.Logger
metrics metrics.StoreMetrics
tree Tree
logger log.Logger
metrics metrics.StoreMetrics
initialVersion int64

mtx sync.Mutex
iavlAsyncCommitBuffer int // -1 means synchronized commit
commitQueue chan<- int64
commitQuit <-chan error
}

// LoadStore returns an IAVL Store as a CommitKVStore. Internally, it will load the
// store's version (id) from the provided DB. An error is returned if the version
// fails to load, or if called with a positive version on an empty tree.
func LoadStore(db dbm.DB, logger log.Logger, key types.StoreKey, id types.CommitID, lazyLoading bool, cacheSize int, disableFastNode bool, metrics metrics.StoreMetrics) (types.CommitKVStore, error) {
return LoadStoreWithInitialVersion(db, logger, key, id, lazyLoading, 0, cacheSize, disableFastNode, metrics)
func LoadStore(db dbm.DB, logger log.Logger, key types.StoreKey, id types.CommitID, lazyLoading bool, cacheSize int, disableFastNode bool, metrics metrics.StoreMetrics, iavlAsyncCommitBuffer int) (types.CommitKVStore, error) {
return LoadStoreWithInitialVersion(db, logger, key, id, lazyLoading, 0, cacheSize, disableFastNode, metrics, iavlAsyncCommitBuffer)
}

// LoadStoreWithInitialVersion returns an IAVL Store as a CommitKVStore setting its initialVersion
// to the one given. Internally, it will load the store's version (id) from the
// provided DB. An error is returned if the version fails to load, or if called with a positive
// version on an empty tree.
func LoadStoreWithInitialVersion(db dbm.DB, logger log.Logger, key types.StoreKey, id types.CommitID, lazyLoading bool, initialVersion uint64, cacheSize int, disableFastNode bool, metrics metrics.StoreMetrics) (types.CommitKVStore, error) {
func LoadStoreWithInitialVersion(db dbm.DB, logger log.Logger, key types.StoreKey, id types.CommitID, lazyLoading bool, initialVersion uint64, cacheSize int, disableFastNode bool, metrics metrics.StoreMetrics, iavlAsyncCommitBuffer int) (types.CommitKVStore, error) {
tree, err := iavl.NewMutableTreeWithOpts(db, cacheSize, &iavl.Options{InitialVersion: initialVersion}, disableFastNode)
if err != nil {
return nil, err
Expand Down Expand Up @@ -87,9 +94,11 @@ func LoadStoreWithInitialVersion(db dbm.DB, logger log.Logger, key types.StoreKe
}

return &Store{
tree: tree,
logger: logger,
metrics: metrics,
tree: tree,
logger: logger,
metrics: metrics,
initialVersion: int64(initialVersion),
iavlAsyncCommitBuffer: iavlAsyncCommitBuffer,
}, nil
}

Expand Down Expand Up @@ -132,17 +141,79 @@ func (st *Store) GetImmutable(version int64) (*Store, error) {
func (st *Store) Commit() types.CommitID {
defer st.metrics.MeasureSince("store", "iavl", "commit")

if st.iavlAsyncCommitBuffer >= 0 {
commitID := st.workingCommitID()

st.mtx.Lock()
defer st.mtx.Unlock()

if st.commitQueue == nil {
st.initAsyncCommit()
} else {
// check if the async commit task has failed
select {
case err := <-st.commitQuit:
panic(fmt.Errorf("async commit failed: %v", err))
default:
}
}
st.commitQueue <- commitID.Version
return commitID
}

hash, version, err := st.tree.SaveVersion()
if err != nil {
panic(err)
}

return types.CommitID{
Version: version,
Hash: hash,
}
}

func (st *Store) initAsyncCommit() {
commitQueue := make(chan int64, st.iavlAsyncCommitBuffer)
quitChan := make(chan error)

go func() {
defer close(quitChan)

for expVersion := range commitQueue {
_, version, err := st.tree.SaveVersion()
if err != nil {
quitChan <- err
break
}

if version != expVersion {
quitChan <- fmt.Errorf("version sanity check failed: %d != %d", expVersion, version)
break
}
}
}()
Comment on lines +178 to +193

Check notice

Code scanning / CodeQL

Spawning a Go routine

Spawning a Go routine may be a possible source of non-determinism

st.commitQueue = commitQueue
st.commitQuit = quitChan
}

// WaitAsyncCommit waits for the async commits to finish
func (st *Store) WaitAsyncCommit() error {
st.mtx.Lock()
defer st.mtx.Unlock()

if st.commitQueue == nil {
return nil
}

close(st.commitQueue)
err := <-st.commitQuit

st.commitQueue = nil
st.commitQuit = nil

return err
}

// WorkingHash returns the hash of the current working tree.
func (st *Store) WorkingHash() []byte {
hash, err := st.tree.WorkingHash()
Expand Down Expand Up @@ -276,11 +347,27 @@ func (st *Store) ReverseIterator(start, end []byte) types.Iterator {
}

// SetInitialVersion sets the initial version of the IAVL tree. It is used when
// starting a new chain at an arbitrary height.
// starting a new chain at an arbitrary height, or adding a new store in an upgrade.
func (st *Store) SetInitialVersion(version int64) {
st.initialVersion = version
st.tree.SetInitialVersion(uint64(version))
}

// workingCommitID returns the commit id without actual commit.
//
// FIXME should be done in iavl library.
func (st *Store) workingCommitID() types.CommitID {
version := st.tree.Version() + 1
if version == 1 && st.initialVersion > 0 {
version = st.initialVersion
}
return types.CommitID{
Hash: st.WorkingHash(),
Version: version,
}

}

// Exports the IAVL store at the given version, returning an iavl.Exporter for the tree.
func (st *Store) Export(version int64) (*iavl.Exporter, error) {
istore, err := st.GetImmutable(version)
Expand Down
6 changes: 3 additions & 3 deletions store/iavl/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,17 @@ func TestLoadStore(t *testing.T) {
require.Equal(t, string(hcStore.Get([]byte("hello"))), "ciao")

// Querying a new store at some previous non-pruned height H
newHStore, err := LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), cIDH, false, DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics())
newHStore, err := LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), cIDH, false, DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics(), 0)
require.NoError(t, err)
require.Equal(t, string(newHStore.Get([]byte("hello"))), "hallo")

// Querying a new store at some previous pruned height Hp
newHpStore, err := LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), cIDHp, false, DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics())
newHpStore, err := LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), cIDHp, false, DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics(), 0)
require.NoError(t, err)
require.Equal(t, string(newHpStore.Get([]byte("hello"))), "hola")

// Querying a new store at current height H
newHcStore, err := LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), cIDHc, false, DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics())
newHcStore, err := LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), cIDHc, false, DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics(), 0)
require.NoError(t, err)
require.Equal(t, string(newHcStore.Get([]byte("hello"))), "ciao")
}
Expand Down
4 changes: 4 additions & 0 deletions store/mem/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (s Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Cach
// Commit performs a no-op as entries are persistent between commitments.
func (s *Store) Commit() (id types.CommitID) { return }

func (s *Store) WaitAsyncCommit() error {
return nil
}

func (s *Store) SetPruning(pruning pruningtypes.PruningOptions) {}

// GetPruning is a no-op as pruning options cannot be directly set on this store.
Expand Down
4 changes: 4 additions & 0 deletions store/rootmulti/dbadapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func (cdsa commitDBStoreAdapter) Commit() types.CommitID {
}
}

func (cdsa commitDBStoreAdapter) WaitAsyncCommit() error {
return nil
}

func (cdsa commitDBStoreAdapter) LastCommitID() types.CommitID {
return types.CommitID{
Version: -1,
Expand Down
2 changes: 1 addition & 1 deletion store/rootmulti/proof_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func TestVerifyIAVLStoreQueryProof(t *testing.T) {
// Create main tree for testing.
db := dbm.NewMemDB()
iStore, err := iavl.LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), types.CommitID{}, false, iavl.DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics())
iStore, err := iavl.LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), types.CommitID{}, false, iavl.DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics(), 0)
store := iStore.(*iavl.Store)
require.Nil(t, err)
store.Set([]byte("MYKEY"), []byte("MYVALUE"))
Expand Down
Loading