Skip to content

Commit

Permalink
chg: Cancel active sealing op when a new chain segment is received (M…
Browse files Browse the repository at this point in the history
…AT-1116) (ethereum#54)

* dev: Fix out-of-turn signing log statement

* dev: Cancel active sealing op when a new chain segment is received

* remove debug statement and return value

* minor: no need to use a channel

* Fix race condition in shouldSeal channel

* Info -> Debug statement
  • Loading branch information
atvanguard authored May 11, 2020
1 parent e03cd94 commit d1ea515
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 17 deletions.
68 changes: 58 additions & 10 deletions consensus/bor/bor.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ var (
errRecentlySigned = errors.New("recently signed")
)

// ActiveSealingOp keeps the context of the active sealing operation
type ActiveSealingOp struct {
number uint64
cancel context.CancelFunc
}

// SignerFn is a signer callback function to request a header to be signed by a
// backing account.
type SignerFn func(accounts.Account, string, []byte) ([]byte, error)
Expand Down Expand Up @@ -246,8 +252,9 @@ type Bor struct {
stateReceiverABI abi.ABI
HeimdallClient IHeimdallClient

stateDataFeed event.Feed
scope event.SubscriptionScope
stateDataFeed event.Feed
scope event.SubscriptionScope
activeSealingOp *ActiveSealingOp
// The fields below are for testing only
fakeDiff bool // Skip difficulty verifications
}
Expand Down Expand Up @@ -725,6 +732,9 @@ func (c *Bor) Authorize(signer common.Address, signFn SignerFn) {
// Seal implements consensus.Engine, attempting to create a sealed block using
// the local signing credentials.
func (c *Bor) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {
if c.activeSealingOp != nil {
return &SealingInFlightError{c.activeSealingOp.number}
}
header := block.Header()

// Sealing the genesis block is not supported
Expand Down Expand Up @@ -762,9 +772,6 @@ func (c *Bor) Seal(chain consensus.ChainReader, block *types.Block, results chan
wiggle := time.Duration(2*c.config.Period) * time.Second * time.Duration(successionNumber)
delay += wiggle

log.Info("Out-of-turn signing requested", "wiggle", common.PrettyDuration(wiggle))
log.Info("Sealing block with", "number", number, "delay", delay, "headerDifficulty", header.Difficulty, "signer", signer.Hex(), "proposer", snap.ValidatorSet.GetProposer().Address.Hex())

// Sign all the things!
sighash, err := signFn(accounts.Account{Address: signer}, accounts.MimetypeBor, BorRLP(header))
if err != nil {
Expand All @@ -774,23 +781,64 @@ func (c *Bor) Seal(chain consensus.ChainReader, block *types.Block, results chan

// Wait until sealing is terminated or delay timeout.
log.Trace("Waiting for slot to sign and propagate", "delay", common.PrettyDuration(delay))
shouldSeal := make(chan bool)
go c.WaitForSealingOp(number, shouldSeal, delay, stop)
go func() {
select {
case <-stop:
defer func() {
close(shouldSeal)
c.activeSealingOp = nil
}()
switch <-shouldSeal {
case false:
return
case <-time.After(delay):
case true:
if wiggle > 0 {
log.Info(
"Sealing out-of-turn",
"number", number,
"wiggle", common.PrettyDuration(wiggle),
"in-turn-signer", snap.ValidatorSet.GetProposer().Address.Hex(),
)
}
log.Info(
"Sealing successful",
"number", number,
"delay", delay,
"headerDifficulty", header.Difficulty,
)
}

select {
case results <- block.WithSeal(header):
default:
log.Warn("Sealing result is not read by miner", "sealhash", SealHash(header))
log.Warn("Sealing result was not read by miner", "sealhash", SealHash(header))
}
}()

return nil
}

// WaitForSealingOp blocks until delay elapses or stop signal is received
func (c *Bor) WaitForSealingOp(number uint64, shouldSeal chan bool, delay time.Duration, stop <-chan struct{}) {
ctx, cancel := context.WithCancel(context.Background())
c.activeSealingOp = &ActiveSealingOp{number, cancel}
select {
case <-stop:
shouldSeal <- false
case <-ctx.Done():
shouldSeal <- false
case <-time.After(delay):
shouldSeal <- true
}
}

// CancelActiveSealingOp cancels in-flight sealing process
func (c *Bor) CancelActiveSealingOp() {
if c.activeSealingOp != nil {
log.Debug("Discarding active sealing operation", "number", c.activeSealingOp.number)
c.activeSealingOp.cancel()
}
}

// CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty
// that a new block should have based on the previous blocks in the chain and the
// current signer.
Expand Down
11 changes: 11 additions & 0 deletions consensus/bor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,14 @@ func (e *MaxCheckpointLengthExceededError) Error() string {
MaxCheckpointLength,
)
}

type SealingInFlightError struct {
Number uint64
}

func (e *SealingInFlightError) Error() string {
return fmt.Sprintf(
"Requested concurrent block sealing. Sealing for block %d is already in progress",
e.Number,
)
}
7 changes: 7 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ type Engine interface {
Close() error
}

// Bor is a consensus engine developed by folks at Matic Network
type Bor interface {
Engine
IsValidatorAction(chain ChainReader, from common.Address, tx *types.Transaction) bool
CancelActiveSealingOp()
}

// PoW is a consensus engine based on proof-of-work.
type PoW interface {
Engine
Expand Down
1 change: 1 addition & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1718,6 +1718,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
events = append(events, ChainHeadEvent{lastCanon})
}
bc.engine.(consensus.Bor).CancelActiveSealingOp()
return it.index, events, coalescedLogs, err
}

Expand Down
8 changes: 1 addition & 7 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,6 @@ const (
TxStatusIncluded
)

// bor acts as a way to be able to type cast consensus.Engine;
// since importing "github.com/maticnetwork/bor/consensus/bor" results in a cyclic dependency
type bor interface {
IsValidatorAction(chain consensus.ChainReader, from common.Address, tx *types.Transaction) bool
}

// blockChain provides the state of blockchain and current gas limit to do
// some pre checks in tx pool and event subscribers.
type blockChain interface {
Expand Down Expand Up @@ -537,7 +531,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// Drop non-local transactions under our own minimal accepted gas price
local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
if !local &&
!pool.chain.Engine().(bor).IsValidatorAction(pool.chain.(consensus.ChainReader), from, tx) &&
!pool.chain.Engine().(consensus.Bor).IsValidatorAction(pool.chain.(consensus.ChainReader), from, tx) &&
pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
return ErrUnderpriced
}
Expand Down

0 comments on commit d1ea515

Please sign in to comment.