Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a potential problem of committing non-uniform consensus versions #3453

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ledger/accountdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,7 @@ func updateAccountsHashRound(tx *sql.Tx, hashRound basics.Round) (err error) {
}

if aff != 1 {
err = fmt.Errorf("updateAccountsRound(hashbase,%d): expected to update 1 row but got %d", hashRound, aff)
err = fmt.Errorf("updateAccountsHashRound(hashbase,%d): expected to update 1 row but got %d", hashRound, aff)
return
}
return
Expand Down
18 changes: 12 additions & 6 deletions ledger/acctupdates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,10 @@ func accumulateTotals(t testing.TB, consensusVersion protocol.ConsensusVersion,
return
}

func makeMockLedgerForTracker(t testing.TB, inMemory bool, initialBlocksCount int, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData) *mockLedgerForTracker {
func makeMockLedgerForTrackerWithLogger(t testing.TB, inMemory bool, initialBlocksCount int, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData, l logging.Logger) *mockLedgerForTracker {
dbs, fileName := dbOpenTest(t, inMemory)
dblogger := logging.TestingLog(t)
dblogger.SetLevel(logging.Info)
dbs.Rdb.SetLogger(dblogger)
dbs.Wdb.SetLogger(dblogger)
dbs.Rdb.SetLogger(l)
dbs.Wdb.SetLogger(l)

blocks := randomInitChain(consensusVersion, initialBlocksCount)
deltas := make([]ledgercore.StateDelta, initialBlocksCount)
Expand All @@ -92,7 +90,15 @@ func makeMockLedgerForTracker(t testing.TB, inMemory bool, initialBlocksCount in
}
}
consensusParams := config.Consensus[consensusVersion]
return &mockLedgerForTracker{dbs: dbs, log: dblogger, filename: fileName, inMemory: inMemory, blocks: blocks, deltas: deltas, consensusParams: consensusParams, accts: accts[0]}
return &mockLedgerForTracker{dbs: dbs, log: l, filename: fileName, inMemory: inMemory, blocks: blocks, deltas: deltas, consensusParams: consensusParams, accts: accts[0]}

}

func makeMockLedgerForTracker(t testing.TB, inMemory bool, initialBlocksCount int, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData) *mockLedgerForTracker {
dblogger := logging.TestingLog(t)
dblogger.SetLevel(logging.Info)

return makeMockLedgerForTrackerWithLogger(t, inMemory, initialBlocksCount, consensusVersion, accts, dblogger)
}

// fork creates another database which has the same content as the current one. Works only for non-memory databases.
Expand Down
6 changes: 5 additions & 1 deletion ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,11 @@ func (ct *catchpointTracker) produceCommittingTask(committedRound basics.Round,
return nil
}

dcr.offset = uint64(newBase - dcr.oldBase)
newOffset := uint64(newBase - dcr.oldBase)
// trackers are not allowed to increase offsets, only descease
if newOffset < dcr.offset {
dcr.offset = newOffset
}

// check to see if this is a catchpoint round
dcr.isCatchpointRound = ct.isCatchpointRound(dcr.offset, dcr.oldBase, dcr.lookback)
Expand Down
4 changes: 4 additions & 0 deletions ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func TestGetCatchpointStream(t *testing.T) {

// File on disk, and database has the record
reader, err := ct.GetCatchpointStream(basics.Round(1))
require.NoError(t, err)
n, err = reader.Read(dataRead)
require.NoError(t, err)
require.Equal(t, 3, n)
Expand All @@ -125,13 +126,16 @@ func TestGetCatchpointStream(t *testing.T) {

// File deleted, but record in the database
err = os.Remove(filepath.Join(temporaryDirectroy, "catchpoints", "2.catchpoint"))
require.NoError(t, err)
reader, err = ct.GetCatchpointStream(basics.Round(2))
require.Equal(t, ledgercore.ErrNoEntry{}, err)
require.Nil(t, reader)

// File on disk, but database lost the record
err = ct.accountsq.storeCatchpoint(context.Background(), basics.Round(3), "", "", 0)
require.NoError(t, err)
reader, err = ct.GetCatchpointStream(basics.Round(3))
require.NoError(t, err)
n, err = reader.Read(dataRead)
require.NoError(t, err)
require.Equal(t, 3, n)
Expand Down
11 changes: 11 additions & 0 deletions ledger/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type ledgerTracker interface {
// effort, and all the trackers contribute to that effort. All the trackers are being handed a
// pointer to the deferredCommitRange, and have the ability to either modify it, or return a
// nil. If nil is returned, the commit would be skipped.
// The contract:
// offset must not be greater than the received dcr.offset value of non zero
// oldBase must not be modifed if non zero
produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange

// prepareCommit, commitRound and postCommit are called when it is time to commit tracker's data.
Expand Down Expand Up @@ -321,10 +324,18 @@ func (tr *trackerRegistry) scheduleCommit(blockqRound, maxLookback basics.Round)
}
cdr := &dcc.deferredCommitRange
for _, lt := range tr.trackers {
base := cdr.oldBase
offset := cdr.offset
cdr = lt.produceCommittingTask(blockqRound, dbRound, cdr)
if cdr == nil {
break
}
if offset > 0 && cdr.offset > offset {
tr.log.Warnf("tracker %T produced offset %d but expected not greater than %d, dbRound %d, latestRound %d", lt, cdr.offset, offset, dbRound, blockqRound)
}
if base > 0 && base != cdr.oldBase {
tr.log.Warnf("tracker %T modified oldBase %d that expected to be %d, dbRound %d, latestRound %d", lt, cdr.oldBase, base, dbRound, blockqRound)
}
}
if cdr != nil {
dcc.deferredCommitRange = *cdr
Expand Down
125 changes: 125 additions & 0 deletions ledger/tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright (C) 2019-2022 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package ledger

import (
"bytes"
"testing"

"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/ledger/ledgercore"
ledgertesting "github.com/algorand/go-algorand/ledger/testing"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
)

// TestTrackerScheduleCommit checks catchpointTracker.produceCommittingTask does not increase commit offset relative
// to the value set by accountUpdates
func TestTrackerScheduleCommit(t *testing.T) {
partitiontest.PartitionTest(t)

a := require.New(t)

var bufNewLogger bytes.Buffer
log := logging.NewLogger()
log.SetOutput(&bufNewLogger)

accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(1, true)}
ml := makeMockLedgerForTrackerWithLogger(t, true, 10, protocol.ConsensusCurrentVersion, accts, log)
defer ml.Close()

conf := config.GetDefaultLocal()
conf.CatchpointTracking = 1
conf.CatchpointInterval = 10

au := &accountUpdates{}
ct := &catchpointTracker{}
au.initialize(conf)
ct.initialize(conf, ".")

_, err := trackerDBInitialize(ml, false, ".")
a.NoError(err)

ml.trackers.initialize(ml, []ledgerTracker{au, ct}, conf)
defer ml.trackers.close()
err = ml.trackers.loadFromDisk(ml)
a.NoError(err)
// close commitSyncer goroutine
ml.trackers.ctxCancel()
ml.trackers.ctxCancel = nil
<-ml.trackers.commitSyncerClosed
ml.trackers.commitSyncerClosed = nil

// simulate situation when au returns smaller offset b/c of consecutive versions
// and ct increses it
// base = 1, offset = 100, lookback = 16
// lastest = 1000
// would give a large mostRecentCatchpointRound value => large newBase => larger offset

expectedOffset := uint64(100)
blockqRound := basics.Round(1000)
lookback := basics.Round(16)
dbRound := basics.Round(1)

// prepare deltas and versions
au.accountsMu.Lock()
au.deltas = make([]ledgercore.AccountDeltas, int(blockqRound))
au.deltasAccum = make([]int, int(blockqRound))
au.versions = make([]protocol.ConsensusVersion, int(blockqRound))
for i := 0; i <= int(expectedOffset); i++ {
au.versions[i] = protocol.ConsensusCurrentVersion
}
for i := int(expectedOffset) + 1; i < len(au.versions); i++ {
au.versions[i] = protocol.ConsensusFuture
}
au.accountsMu.Unlock()

// ensure au and ct produce data we expect
dcc := &deferredCommitContext{
deferredCommitRange: deferredCommitRange{
lookback: lookback,
},
}
cdr := &dcc.deferredCommitRange

cdr = au.produceCommittingTask(blockqRound, dbRound, cdr)
a.NotNil(cdr)
a.Equal(expectedOffset, cdr.offset)

cdr = ct.produceCommittingTask(blockqRound, dbRound, cdr)
a.NotNil(cdr)
// before the fix
// expectedOffset = uint64(blockqRound - lookback - dbRound) // 983
a.Equal(expectedOffset, cdr.offset)

// schedule the commit. au is expected to return offset 100 and
ml.trackers.mu.Lock()
ml.trackers.dbRound = dbRound
ml.trackers.mu.Unlock()
ml.trackers.scheduleCommit(blockqRound, lookback)

a.Equal(1, len(ml.trackers.deferredCommits))
// before the fix
// a.Contains(bufNewLogger.String(), "tracker *ledger.catchpointTracker produced offset 983")
a.NotContains(bufNewLogger.String(), "tracker *ledger.catchpointTracker produced offset")
dc := <-ml.trackers.deferredCommits
a.Equal(expectedOffset, dc.offset)
}