Skip to content

Commit

Permalink
Merge branch 'master' into hbandura/upstream_1.10.8_2
Browse files Browse the repository at this point in the history
  • Loading branch information
hbandura authored Apr 20, 2022
2 parents a350306 + 5db2af7 commit 178faa1
Show file tree
Hide file tree
Showing 18 changed files with 1,492 additions and 251 deletions.
76 changes: 76 additions & 0 deletions cmd/uptime/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"fmt"
"os"
"path/filepath"

"github.com/celo-org/celo-blockchain/cmd/utils"
"github.com/celo-org/celo-blockchain/internal/debug"
"github.com/celo-org/celo-blockchain/internal/flags"
"github.com/celo-org/celo-blockchain/node"
"github.com/celo-org/celo-blockchain/params"
"gopkg.in/urfave/cli.v1"
)

var (
// Git information set by linker when building with ci.go.
gitCommit string
gitDate string
app = &cli.App{
Name: filepath.Base(os.Args[0]),
Usage: "uptime",
Version: params.VersionWithCommit(gitCommit, gitDate),
Writer: os.Stdout,
HideVersion: true,
EnableBashCompletion: true,
}
)

func init() {
// Set up the CLI app.
app.Flags = append(app.Flags, debug.Flags...)
app.Flags = append(app.Flags, utils.DataDirFlag)
app.Before = func(ctx *cli.Context) error {
return debug.Setup(ctx)
}
app.After = func(ctx *cli.Context) error {
debug.Exit()
return nil
}
app.CommandNotFound = func(ctx *cli.Context, cmd string) {
fmt.Fprintf(os.Stderr, "No such command: %s\n", cmd)
os.Exit(1)
}
// Add subcommands.
app.Commands = []cli.Command{
reportUptimeCommand,
}
cli.CommandHelpTemplate = flags.OriginCommandHelpTemplate
}

func exit(err interface{}) {
if err == nil {
os.Exit(0)
}
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

func main() {
exit(app.Run(os.Args))
}

const (
clientIdentifier = "celo" // Client identifier to advertise over the network
)

func defaultNodeConfig() node.Config {
cfg := node.DefaultConfig
cfg.Name = clientIdentifier
cfg.Version = params.VersionWithCommit(gitCommit, gitDate)
cfg.HTTPModules = append(cfg.HTTPModules, "eth")
cfg.WSModules = append(cfg.WSModules, "eth")
cfg.IPCPath = "geth.ipc"
return cfg
}
133 changes: 133 additions & 0 deletions cmd/uptime/report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package main

import (
"fmt"
"time"

"github.com/celo-org/celo-blockchain/cmd/utils"
"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/consensus/istanbul"
"github.com/celo-org/celo-blockchain/consensus/istanbul/uptime"
"github.com/celo-org/celo-blockchain/core/rawdb"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/celo-org/celo-blockchain/ethdb"
"github.com/celo-org/celo-blockchain/node"
"gopkg.in/urfave/cli.v1"
)

var epochFlag = cli.Int64Flag{
Name: "epoch",
Usage: "Epoch number to report on",
}

var lookbackFlag = cli.Int64Flag{
Name: "lookback",
Usage: "Lookback window to use for the uptime calculation",
}

var valSetSizeFlag = cli.Int64Flag{
Name: "valset",
Usage: "Validator set size to use in the calculation",
}

var reportUptimeCommand = cli.Command{
Name: "report",
Usage: "Reports uptime for all validators",
Action: reportUptime,
ArgsUsage: "",
Flags: []cli.Flag{epochFlag, lookbackFlag, valSetSizeFlag},
}

// getHeaderByNumber retrieves a block header from the database by number,
// caching it (associated with its hash) if found.
func getHeaderByNumber(db ethdb.Database, number uint64) *types.Header {
hash := rawdb.ReadCanonicalHash(db, number)
if hash == (common.Hash{}) {
return nil
}
return rawdb.ReadHeader(db, hash, number)
}

func reportUptime(ctx *cli.Context) error {
if !ctx.IsSet(epochFlag.Name) {
utils.Fatalf("This command requires an epoch argument")
}
epoch := ctx.Uint64(epochFlag.Name)

// lookback and valset size could actually be calculated
// but we need a whole instantiated blockchain to be able
// to execute the contract calls.
if !ctx.IsSet(lookbackFlag.Name) {
utils.Fatalf("This command requires a lookback argument")
}
lookback := ctx.Uint64(lookbackFlag.Name)

if !ctx.IsSet(valSetSizeFlag.Name) {
utils.Fatalf("This command requires a valset argument")
}
valSetSize := ctx.Uint64(valSetSizeFlag.Name)

cfg := defaultNodeConfig()
cfg.DataDir = utils.MakeDataDir(ctx)
nod, _ := node.New(&cfg)
defer nod.Close()

db := utils.MakeChainDatabase(ctx, nod, true)
defer db.Close()

genesisHash := rawdb.ReadCanonicalHash(db, 0)
genConfig := rawdb.ReadChainConfig(db, genesisHash)
epochSize := genConfig.Istanbul.Epoch
lastBlock := istanbul.GetEpochLastBlockNumber(epoch, epochSize)
headers := getHeaders(db, lastBlock, int(epochSize))
return runReport(headers, epochSize, lookback, int(valSetSize))
}

func getHeaders(db ethdb.Database, lastBlock uint64, amount int) []*types.Header {
start := time.Now()
headers := make([]*types.Header, amount)

headers[amount-1] = getHeaderByNumber(db, lastBlock)
for i := amount - 2; i >= 0; i-- {
headers[i] = rawdb.ReadHeader(db, headers[i+1].ParentHash, headers[i+1].Number.Uint64()-1)
}
fmt.Printf("Headers[%d, %d] retrieved in %v\n", headers[0].Number.Int64(), headers[len(headers)-1].Number.Int64(), time.Since(start))
return headers
}

func runReport(headers []*types.Header, epochSize uint64, lookback uint64, valSetSize int) error {
epoch := istanbul.GetEpochNumber(headers[0].Number.Uint64(), epochSize)
monitor := uptime.NewMonitor(epochSize, epoch, lookback, valSetSize)
start := time.Now()
var header *types.Header
for _, header = range headers {
monitor.ProcessHeader(header)
}
fmt.Printf("Headers added in %v\n", time.Since(start))
r, err := monitor.ComputeUptime(header)
if err != nil {
return err
}
fmt.Printf("Report done in %v\n", time.Since(start))
for i, v := range r {
fmt.Println("Validator", i, "uptime", v)
}
return nil
}

type singleEpochStore struct {
epoch uint64
uptime *uptime.Uptime
}

func (m *singleEpochStore) ReadAccumulatedEpochUptime(epoch uint64) *uptime.Uptime {
if m.epoch == epoch {
return m.uptime
}
return nil
}

func (m *singleEpochStore) WriteAccumulatedEpochUptime(epoch uint64, uptime *uptime.Uptime) {
m.epoch = epoch
m.uptime = uptime
}
3 changes: 3 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ type Istanbul interface {
// The changes are executed inline.
UpdateValSetDiff(chain ChainHeaderReader, header *types.Header, state *state.StateDB) error

// OnBlockInsertion is a hook method called when system is inserting a block to the chain
OnBlockInsertion(header *types.Header, state *state.StateDB) error

// IsLastBlockOfEpoch will check to see if the header is from the last block of an epoch
IsLastBlockOfEpoch(header *types.Header) bool

Expand Down
20 changes: 20 additions & 0 deletions consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/celo-org/celo-blockchain/consensus/istanbul/backend/internal/replica"
istanbulCore "github.com/celo-org/celo-blockchain/consensus/istanbul/core"
"github.com/celo-org/celo-blockchain/consensus/istanbul/proxy"
"github.com/celo-org/celo-blockchain/consensus/istanbul/uptime"
"github.com/celo-org/celo-blockchain/consensus/istanbul/validator"
"github.com/celo-org/celo-blockchain/contracts"
"github.com/celo-org/celo-blockchain/contracts/election"
Expand Down Expand Up @@ -341,6 +342,8 @@ type Backend struct {
randomSeed []byte
randomSeedMu sync.Mutex

uptimeMonitor uptime.Builder

// Test hooks
abortCommitHook func(result *istanbulCore.StateProcessResult) bool // Method to call upon committing a proposal
}
Expand Down Expand Up @@ -1004,6 +1007,23 @@ func (sb *Backend) RemoveProxy(node *enode.Node) error {
}
}

func (sb *Backend) OnBlockInsertion(header *types.Header, state *state.StateDB) error {
return sb.retrieveUptimeScoreBuilder(header, state).ProcessHeader(header)
}

func (sb *Backend) retrieveUptimeScoreBuilder(header *types.Header, state *state.StateDB) uptime.Builder {
epoch := istanbul.GetEpochNumber(header.Number.Uint64(), sb.EpochSize())

if sb.uptimeMonitor == nil || sb.uptimeMonitor.GetEpoch() != epoch {
valSet := sb.GetValidators(header.Number, header.Hash())
lookbackWindow := sb.LookbackWindow(header, state)
builder := uptime.NewMonitor(sb.EpochSize(), epoch, lookbackWindow, len(valSet))
headersProvider := istanbul.NewHeadersProvider(sb.chain)
sb.uptimeMonitor = uptime.NewAutoFixBuilder(builder, headersProvider)
}
return sb.uptimeMonitor
}

// VerifyPendingBlockValidatorSignature will verify that the message sender is a validator that is responsible
// for the current pending block (the next block right after the head block).
func (sb *Backend) VerifyPendingBlockValidatorSignature(data []byte, sig []byte) (common.Address, error) {
Expand Down
13 changes: 2 additions & 11 deletions consensus/istanbul/backend/pos.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (

"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/consensus/istanbul"
"github.com/celo-org/celo-blockchain/consensus/istanbul/uptime"
"github.com/celo-org/celo-blockchain/consensus/istanbul/uptime/store"
"github.com/celo-org/celo-blockchain/contracts"
"github.com/celo-org/celo-blockchain/contracts/currency"
"github.com/celo-org/celo-blockchain/contracts/election"
Expand Down Expand Up @@ -145,16 +143,9 @@ func (sb *Backend) updateValidatorScores(header *types.Header, state *state.Stat
logger := sb.logger.New("func", "Backend.updateValidatorScores", "blocknum", header.Number.Uint64(), "epoch", epoch, "epochsize", sb.EpochSize())

// header (&state) == lastBlockOfEpoch
// sb.LookbackWindow(header, state) => value at the end of epoch
// It doesn't matter which was the value at the beginning but how it ends.
// Notice that exposed metrics compute based on current block (not last of epoch) so if lookback window changed during the epoch, metric uptime score might differ
lookbackWindow := sb.LookbackWindow(header, state)

logger = logger.New("window", lookbackWindow)
logger.Trace("Updating validator scores")

monitor := uptime.NewMonitor(store.New(sb.db), sb.EpochSize(), lookbackWindow)
uptimes, err := monitor.ComputeValidatorsUptime(epoch, len(valSet))
monitor := sb.retrieveUptimeScoreBuilder(header, state)
uptimes, err := monitor.ComputeUptime(header)
if err != nil {
return nil, err
}
Expand Down
75 changes: 75 additions & 0 deletions consensus/istanbul/headers_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package istanbul

import (
"fmt"

"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/core/types"
)

type ChainHeaderReader interface {
// GetHeader retrieves a block header from the database by hash and number.
GetHeader(hash common.Hash, number uint64) *types.Header

// GetHeaderByNumber retrieves a block header from the database by number.
GetHeaderByNumber(number uint64) *types.Header
}

type EpochHeadersProvider interface {
// GetEpochHeadersUpTo returns all headers from the same epoch as the header provided (included) ordered, up to
// the given one, with a limit on the amount of headers to return. E.g limit 3 would return [prev-2, prev-1, upToHeader], assuming
// all of them are on the same epoch.
GetEpochHeadersUpToLimit(epochSize uint64, upToHeader *types.Header, limit uint64) ([]*types.Header, error)
}

func NewHeadersProvider(chr ChainHeaderReader) EpochHeadersProvider {
return &dbHeadersProvider{chr: chr}
}

type dbHeadersProvider struct {
chr ChainHeaderReader
}

func (d *dbHeadersProvider) GetEpochHeadersUpToLimit(epochSize uint64, upToHeader *types.Header, limit uint64) ([]*types.Header, error) {
if limit == 0 {
return []*types.Header{}, nil
}
number := upToHeader.Number.Uint64()
numberWithinEpoch := GetNumberWithinEpoch(number, epochSize)
var amountToLoad uint64 = numberWithinEpoch - 1
if amountToLoad > limit-1 {
amountToLoad = limit - 1
}
if amountToLoad == 0 {
// Nothing to do
return []*types.Header{upToHeader}, nil
}
headers := getHeaders(d.chr, number-1, amountToLoad)
if headers == nil {
// Error retrieving headers
return nil, fmt.Errorf("error attempting to retrieve headers: epochSize %d, headerHash %v", epochSize, upToHeader.Hash())
}
return append(headers, upToHeader), nil
}

func getHeaders(chr ChainHeaderReader, lastBlock uint64, amount uint64) []*types.Header {
headers := make([]*types.Header, amount)
if amount == 0 {
// Nothing to do
return headers
}
lastHeader := chr.GetHeaderByNumber(lastBlock)
if lastHeader == nil {
// Error retrieving header
return nil
}
headers[amount-1] = lastHeader
for i := amount - 1; i > 0; i-- {
headers[i-1] = chr.GetHeader(headers[i].ParentHash, headers[i].Number.Uint64()-1)
if headers[i-1] == nil {
// Error retrieving header
return nil
}
}
return headers
}
Loading

0 comments on commit 178faa1

Please sign in to comment.