Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: async commit #16175

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func SetIAVLLazyLoading(lazyLoading bool) func(*BaseApp) {
return func(bapp *BaseApp) { bapp.cms.SetLazyLoading(lazyLoading) }
}

// SetCommitBufferSize sets the size of the buffer for concurrent commit calls.
yihuang marked this conversation as resolved.
Show resolved Hide resolved
func SetCommitBufferSize(size int) func(*BaseApp) {
return func(bapp *BaseApp) { bapp.cms.SetCommitBufferSize(size) }
}

// SetInterBlockCache provides a BaseApp option function that sets the
// inter-block cache.
func SetInterBlockCache(cache storetypes.MultiStorePersistentCache) func(*BaseApp) {
Expand Down
5 changes: 5 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ type BaseConfig struct {
// IAVLLazyLoading enable/disable the lazy loading of iavl store.
IAVLLazyLoading bool `mapstructure:"iavl-lazy-loading"`

// CommitBufferSize defines the size of the async commit channel (in number of blocks),
// 0 means synchronous commitment.
CommitBufferSize int `mapstructure:"commit-buffer-size"`

// AppDBBackend defines the type of Database to use for the application and snapshots databases.
// An empty string indicates that the CometBFT config's DBBackend value should be used.
AppDBBackend string `mapstructure:"app-db-backend"`
Expand Down Expand Up @@ -237,6 +241,7 @@ func DefaultConfig() *Config {
IAVLCacheSize: 781250,
IAVLDisableFastNode: false,
IAVLLazyLoading: false,
CommitBufferSize: 0,
AppDBBackend: "",
},
Telemetry: telemetry.Config{
Expand Down
4 changes: 4 additions & 0 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ iavl-disable-fastnode = {{ .BaseConfig.IAVLDisableFastNode }}
# Default is false.
iavl-lazy-loading = {{ .BaseConfig.IAVLLazyLoading }}

# CommitBufferSize defines the size of the async commit channel (in number of blocks),
# 0 means synchronous commitment.
commit-buffer-size = {{ .BaseConfig.CommitBufferSize }}

# AppDBBackend defines the database backend type to use for the application and snapshots DBs.
# An empty string indicates that a fallback will be used.
# First fallback is the deprecated compile-time types.DBBackend value.
Expand Down
1 change: 1 addition & 0 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const (
FlagIAVLCacheSize = "iavl-cache-size"
FlagDisableIAVLFastNode = "iavl-disable-fastnode"
FlagIAVLLazyLoading = "iavl-lazy-loading"
FlagCommitBufferSize = "commit-buffer-size"

// state sync-related flags
FlagStateSyncSnapshotInterval = "state-sync.snapshot-interval"
Expand Down
1 change: 1 addition & 0 deletions server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ func DefaultBaseappOptions(appOpts types.AppOptions) []func(*baseapp.BaseApp) {
defaultMempool,
baseapp.SetIAVLLazyLoading(cast.ToBool(appOpts.Get(FlagIAVLLazyLoading))),
baseapp.SetChainID(chainID),
baseapp.SetCommitBufferSize(cast.ToInt(appOpts.Get(FlagCommitBufferSize))),
}
}

Expand Down
109 changes: 98 additions & 11 deletions store/iavl/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"sync"

abci "github.com/cometbft/cometbft/abci/types"
cmtprotocrypto "github.com/cometbft/cometbft/proto/tendermint/crypto"
Expand Down Expand Up @@ -35,23 +36,29 @@ var (

// Store Implements types.KVStore and CommitKVStore.
type Store struct {
tree Tree
logger log.Logger
metrics metrics.StoreMetrics
tree Tree
logger log.Logger
metrics metrics.StoreMetrics
initialVersion int64

mtx sync.Mutex
commitBufferSize int // 0 means synchronized commit
commitQueue chan<- int64
commitQuit <-chan error
}

// LoadStore returns an IAVL Store as a CommitKVStore. Internally, it will load the
// store's version (id) from the provided DB. An error is returned if the version
// fails to load, or if called with a positive version on an empty tree.
func LoadStore(db dbm.DB, logger log.Logger, key types.StoreKey, id types.CommitID, lazyLoading bool, cacheSize int, disableFastNode bool, metrics metrics.StoreMetrics) (types.CommitKVStore, error) {
return LoadStoreWithInitialVersion(db, logger, key, id, lazyLoading, 0, cacheSize, disableFastNode, metrics)
func LoadStore(db dbm.DB, logger log.Logger, key types.StoreKey, id types.CommitID, lazyLoading bool, cacheSize int, disableFastNode bool, metrics metrics.StoreMetrics, commitBufferSize int) (types.CommitKVStore, error) {
return LoadStoreWithInitialVersion(db, logger, key, id, lazyLoading, 0, cacheSize, disableFastNode, metrics, commitBufferSize)
}

// LoadStoreWithInitialVersion returns an IAVL Store as a CommitKVStore setting its initialVersion
// to the one given. Internally, it will load the store's version (id) from the
// provided DB. An error is returned if the version fails to load, or if called with a positive
// version on an empty tree.
func LoadStoreWithInitialVersion(db dbm.DB, logger log.Logger, key types.StoreKey, id types.CommitID, lazyLoading bool, initialVersion uint64, cacheSize int, disableFastNode bool, metrics metrics.StoreMetrics) (types.CommitKVStore, error) {
func LoadStoreWithInitialVersion(db dbm.DB, logger log.Logger, key types.StoreKey, id types.CommitID, lazyLoading bool, initialVersion uint64, cacheSize int, disableFastNode bool, metrics metrics.StoreMetrics, commitBufferSize int) (types.CommitKVStore, error) {
tree, err := iavl.NewMutableTreeWithOpts(db, cacheSize, &iavl.Options{InitialVersion: initialVersion}, disableFastNode)
if err != nil {
return nil, err
Expand Down Expand Up @@ -87,9 +94,11 @@ func LoadStoreWithInitialVersion(db dbm.DB, logger log.Logger, key types.StoreKe
}

return &Store{
tree: tree,
logger: logger,
metrics: metrics,
tree: tree,
logger: logger,
metrics: metrics,
initialVersion: int64(initialVersion),
commitBufferSize: commitBufferSize,
}, nil
}

Expand Down Expand Up @@ -132,17 +141,79 @@ func (st *Store) GetImmutable(version int64) (*Store, error) {
func (st *Store) Commit() types.CommitID {
defer st.metrics.MeasureSince("store", "iavl", "commit")

if st.commitBufferSize > 0 {
commitId := st.workingCommitID()

st.mtx.Lock()
defer st.mtx.Unlock()

if st.commitQueue == nil {
st.initAsyncCommit()
} else {
// check if the async commit task has failed
select {
case err := <-st.commitQuit:
panic(fmt.Errorf("async commit failed: %v", err))
default:
}
}
st.commitQueue <- commitId.Version
return commitId
}

hash, version, err := st.tree.SaveVersion()
if err != nil {
panic(err)
}

return types.CommitID{
Version: version,
Hash: hash,
}
}

func (st *Store) initAsyncCommit() {
commitQueue := make(chan int64, st.commitBufferSize)
quitChan := make(chan error)

go func() {
defer close(quitChan)

for expVersion := range commitQueue {
_, version, err := st.tree.SaveVersion()
if err != nil {
quitChan <- err
break
}

if version != expVersion {
quitChan <- fmt.Errorf("version sanity check failed: %d != %d", expVersion, version)
break
}
}
}()
Comment on lines +178 to +193

Check notice

Code scanning / CodeQL

Spawning a Go routine

Spawning a Go routine may be a possible source of non-determinism

st.commitQueue = commitQueue
st.commitQuit = quitChan
}

// WaitAsyncCommit waits for the async commits to finish
func (st *Store) WaitAsyncCommit() error {
st.mtx.Lock()
defer st.mtx.Unlock()

if st.commitQueue == nil {
return nil
}

close(st.commitQueue)
err := <-st.commitQuit

st.commitQueue = nil
st.commitQuit = nil

return err
}

// WorkingHash returns the hash of the current working tree.
func (st *Store) WorkingHash() []byte {
hash, err := st.tree.WorkingHash()
Expand Down Expand Up @@ -276,11 +347,27 @@ func (st *Store) ReverseIterator(start, end []byte) types.Iterator {
}

// SetInitialVersion sets the initial version of the IAVL tree. It is used when
// starting a new chain at an arbitrary height.
// starting a new chain at an arbitrary height, or adding a new store in an upgrade.
func (st *Store) SetInitialVersion(version int64) {
st.initialVersion = version
st.tree.SetInitialVersion(uint64(version))
}

// workingCommitID returns the commit id without actual commit.
//
// FIXME should be done in iavl library.
func (st *Store) workingCommitID() types.CommitID {
version := st.tree.Version() + 1
if version == 1 && st.initialVersion > 0 {
version = st.initialVersion
}
return types.CommitID{
Hash: st.WorkingHash(),
Version: version,
}

}

// Exports the IAVL store at the given version, returning an iavl.Exporter for the tree.
func (st *Store) Export(version int64) (*iavl.Exporter, error) {
istore, err := st.GetImmutable(version)
Expand Down
6 changes: 3 additions & 3 deletions store/iavl/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,17 @@ func TestLoadStore(t *testing.T) {
require.Equal(t, string(hcStore.Get([]byte("hello"))), "ciao")

// Querying a new store at some previous non-pruned height H
newHStore, err := LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), cIDH, false, DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics())
newHStore, err := LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), cIDH, false, DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics(), 0)
require.NoError(t, err)
require.Equal(t, string(newHStore.Get([]byte("hello"))), "hallo")

// Querying a new store at some previous pruned height Hp
newHpStore, err := LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), cIDHp, false, DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics())
newHpStore, err := LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), cIDHp, false, DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics(), 0)
require.NoError(t, err)
require.Equal(t, string(newHpStore.Get([]byte("hello"))), "hola")

// Querying a new store at current height H
newHcStore, err := LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), cIDHc, false, DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics())
newHcStore, err := LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), cIDHc, false, DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics(), 0)
require.NoError(t, err)
require.Equal(t, string(newHcStore.Get([]byte("hello"))), "ciao")
}
Expand Down
4 changes: 4 additions & 0 deletions store/mem/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (s Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Cach
// Commit performs a no-op as entries are persistent between commitments.
func (s *Store) Commit() (id types.CommitID) { return }

func (s *Store) WaitAsyncCommit() error {
return nil
}

func (s *Store) SetPruning(pruning pruningtypes.PruningOptions) {}

// GetPruning is a no-op as pruning options cannot be directly set on this store.
Expand Down
4 changes: 4 additions & 0 deletions store/rootmulti/dbadapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func (cdsa commitDBStoreAdapter) Commit() types.CommitID {
}
}

func (cdsa commitDBStoreAdapter) WaitAsyncCommit() error {
return nil
}

func (cdsa commitDBStoreAdapter) LastCommitID() types.CommitID {
return types.CommitID{
Version: -1,
Expand Down
2 changes: 1 addition & 1 deletion store/rootmulti/proof_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func TestVerifyIAVLStoreQueryProof(t *testing.T) {
// Create main tree for testing.
db := dbm.NewMemDB()
iStore, err := iavl.LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), types.CommitID{}, false, iavl.DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics())
iStore, err := iavl.LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), types.CommitID{}, false, iavl.DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics(), 0)
store := iStore.(*iavl.Store)
require.Nil(t, err)
store.Set([]byte("MYKEY"), []byte("MYVALUE"))
Expand Down
29 changes: 27 additions & 2 deletions store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Store struct {
listeners map[types.StoreKey]*types.MemoryListener
metrics metrics.StoreMetrics
commitHeader cmtproto.Header
commitBufferSize int // 0 means synchronous commit
}

var (
Expand Down Expand Up @@ -497,6 +498,15 @@ func (rs *Store) Commit() types.CommitID {
}
}

// WaitAsyncCommit waits for the async commits to finish
func (rs *Store) WaitAsyncCommit() error {
errs := make([]error, 0, len(rs.stores))
for _, store := range rs.stores {
errs = append(errs, store.WaitAsyncCommit())
}
Comment on lines +504 to +506

Check warning

Code scanning / CodeQL

Iteration over map

Iteration over map may be a possible source of non-determinism
return errors.Join(errs...)
}

// WorkingHash returns the current hash of the store.
// it will be used to get the current app hash before commit.
func (rs *Store) WorkingHash() []byte {
Expand Down Expand Up @@ -794,6 +804,21 @@ func (rs *Store) SetInitialVersion(version int64) error {
return nil
}

// SetCommitBufferSize sets the buffer size for the commit channel,
// 0 means synchronous commit.
func (rs *Store) SetCommitBufferSize(size int) {
rs.commitBufferSize = size

for key, store := range rs.stores {
if store.GetStoreType() == types.StoreTypeIAVL {
// If the store is wrapped with an inter-block cache, we must first unwrap
// it to get the underlying IAVL store.
store = rs.GetCommitKVStore(key)
store.(types.StoreWithCommitBufferSize).SetCommitBufferSize(size)
}
}

Check warning

Code scanning / CodeQL

Iteration over map

Iteration over map may be a possible source of non-determinism
}

// parsePath expects a format like /<storeName>[/<subpath>]
// Must start with /, subpath may be empty
// Returns error if it doesn't start with /
Expand Down Expand Up @@ -1017,9 +1042,9 @@ func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID
var err error

if params.initialVersion == 0 {
store, err = iavl.LoadStore(db, rs.logger, key, id, rs.lazyLoading, rs.iavlCacheSize, rs.iavlDisableFastNode, rs.metrics)
store, err = iavl.LoadStore(db, rs.logger, key, id, rs.lazyLoading, rs.iavlCacheSize, rs.iavlDisableFastNode, rs.metrics, rs.commitBufferSize)
} else {
store, err = iavl.LoadStoreWithInitialVersion(db, rs.logger, key, id, rs.lazyLoading, params.initialVersion, rs.iavlCacheSize, rs.iavlDisableFastNode, rs.metrics)
store, err = iavl.LoadStoreWithInitialVersion(db, rs.logger, key, id, rs.lazyLoading, params.initialVersion, rs.iavlCacheSize, rs.iavlDisableFastNode, rs.metrics, rs.commitBufferSize)
}

if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions store/transient/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func (ts *Store) Commit() (id types.CommitID) {
return
}

func (ts *Store) WaitAsyncCommit() error {
return nil
}

func (ts *Store) SetPruning(_ pruningtypes.PruningOptions) {}

// GetPruning is a no-op as pruning options cannot be directly set on this store.
Expand Down
2 changes: 1 addition & 1 deletion store/types/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

func newMemTestKVStore(t *testing.T) types.KVStore {
db := dbm.NewMemDB()
store, err := iavl.LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), types.CommitID{}, false, iavl.DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics())
store, err := iavl.LoadStore(db, log.NewNopLogger(), types.NewKVStoreKey("test"), types.CommitID{}, false, iavl.DefaultIAVLCacheSize, false, metrics.NewNoOpMetrics(), 0)
require.NoError(t, err)
return store
}
Expand Down
10 changes: 10 additions & 0 deletions store/types/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Store interface {
type Committer interface {
Commit() CommitID
LastCommitID() CommitID
WaitAsyncCommit() error

// WorkingHash returns the hash of the KVStore's state before commit.
WorkingHash() []byte
Expand Down Expand Up @@ -194,6 +195,9 @@ type CommitMultiStore interface {
// SetIAVLLazyLoading enable/disable lazy loading on iavl.
SetLazyLoading(lazyLoading bool)

// SetCommitBufferSize sets the size of the buffer for the async commit, 0 means synchronous commit.
SetCommitBufferSize(size int)

// RollbackToVersion rollback the db to specific version(height).
RollbackToVersion(version int64) error

Expand Down Expand Up @@ -489,6 +493,12 @@ type StoreWithInitialVersion interface {
SetInitialVersion(version int64)
}

// StoreWithCommitBufferSize is a store that can have an arbitrary initial
// version.
type StoreWithCommitBufferSize interface {
SetCommitBufferSize(size int)
}

// NewTransientStoreKeys constructs a new map of TransientStoreKey's
// Must return pointers according to the ocap principle
// The function will panic if there is a potential conflict in names
Expand Down
4 changes: 4 additions & 0 deletions x/upgrade/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func BeginBlocker(k *keeper.Keeper, ctx sdk.Context) {
panic(fmt.Errorf("unable to write upgrade info to filesystem: %s", err.Error()))
}

if err := k.WaitAsyncCommit(); err != nil {
Copy link
Collaborator Author

@yihuang yihuang May 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if err := k.WaitAsyncCommit(); err != nil {
// avoid state rollback after restart
if err := k.WaitAsyncCommit(); err != nil {

panic(fmt.Errorf("async commit failed: %w", err))
}

upgradeMsg := BuildUpgradeNeededMsg(plan)
logger.Error(upgradeMsg)
panic(upgradeMsg)
Expand Down
Loading