diff --git a/consensus/drab/drab.go b/consensus/drab/drab.go index 3fccc161f0..1eeca77f42 100644 --- a/consensus/drab/drab.go +++ b/consensus/drab/drab.go @@ -883,32 +883,7 @@ func (d *Drab) Authorize(val common.Address, signFn SignerFn, signTxFn SignerTxF // Argument leftOver is the time reserved for block finalize(calculate root, distribute income...) func (d *Drab) Delay(chain consensus.ChainReader, header *types.Header, leftOver *time.Duration) *time.Duration { - number := header.Number.Uint64() - snap, err := d.snapshot(chain, number-1, header.ParentHash, nil) - if err != nil { - return nil - } - - delay := d.delayForHawaiiFork(snap, header) - - if *leftOver >= time.Duration(d.config.BlockTime)*time.Second { - // ignore invalid leftOver - log.Error("Delay invalid argument", "leftOver", leftOver.String(), "BlockTime", d.config.BlockTime) - } else if *leftOver >= delay { - // no left time - delay = time.Duration(0) - return &delay - } else { - // delay - delay = delay - *leftOver - } - - // The blocking time should be no more than half of period - half := time.Duration(d.config.BlockTime) * time.Second / 2 - if delay > half { - delay = half - } - return &delay + return nil } // Seal implements consensus.Engine, attempting to create a sealed block using @@ -947,7 +922,7 @@ func (d *Drab) Seal(chain consensus.ChainHeaderReader, block *types.Block, resul // Signer is among recents, only wait if the current block doesn't shift it out if limit := uint64(snap.blockLimit()); number < limit || seen+limit > number { log.Info("Sealing found signed recently, must wait for others", "seen", seen, "limit", limit, "number", number) - return nil + return errRecentlySigned } } } diff --git a/consensus/drab/hawaii.go b/consensus/drab/hawaii.go index f46244b9c5..4af0428553 100644 --- a/consensus/drab/hawaii.go +++ b/consensus/drab/hawaii.go @@ -11,19 +11,52 @@ import ( ) const ( - wiggleTime = uint64(1) // second, Random delay (per signer) to allow concurrent signers - initialBackOffTime = uint64(1) // second - processBackOffTime = uint64(1) // second - wiggleTimeBeforeFork = 500 * time.Millisecond // Random delay (per signer) to allow concurrent signers - fixedBackOffTimeBeforeFork = 200 * time.Millisecond + wiggleTime = uint64(1) // second, Random delay (per signer) to allow concurrent signers + initialBackOffTime = uint64(1) // second + processBackOffTime = uint64(1) // second + + wiggleTimeBeforeForkGranularity = 3 * time.Millisecond // Time granularity of the random delay + fixedBackOffTimeBeforeFork = 200 * time.Millisecond +) + +var ( + randDelaySeed = rand.New(rand.NewSource(time.Now().UnixNano())) ) +//// Consensus time schedule design: +/** + ┌──────────────────────────────────────────────────────┐ + │ block N │ + ├─────────────────┬───────────────┬────────────────────┤ +Diff No Turn: │ fillTx time │ DelayLeftOver │ wait seal block │ + └─────────────────┴───────────────┴────────────────────┘ + ┌──────────────────────────────────────────────────────┬─────────────────────────────────────────────────────────┐ + │ block N │ block N+1 │ + ├─────────────────┬───────────────┬────────────────────┴─────────────────────┬───────────────────────────────────┴───────────┐ +Diff Turn: │ fillTx time │ DelayLeftOver │ never seal block │ preempt seal block │ + └─────────────────┴───────────────┼────────────────────┬─────────────────────┼───────────────────────────────────────────────┤ + │ wait header time │ fixed backoff delay │ random blockTime*blockLimit range (step 11us)│ + └────────────────────┴─────────────────────┴───────────────────────────────────────────────┘ +**/ + func (d *Drab) delayForHawaiiFork(snap *Snapshot, header *types.Header) time.Duration { - delay := time.Until(time.Unix(int64(header.Time), 0)) // nolint: gosimple + delay := time.Until(time.Unix(int64(header.Time), 0)) // time until the block is supposed to be mined + // if delay <= 0 we are late, so we should try to sign immediately + if delay <= 0 { + delay = 0 + } + if header.Difficulty.Cmp(diffNoTurn) == 0 { - // It's not our turn explicitly to sign, delay it a bit - wiggle := time.Duration(snap.blockLimit()) * wiggleTimeBeforeFork - delay += fixedBackOffTimeBeforeFork + time.Duration(rand.Int63n(int64(wiggle))) + // It's not our turn explicitly to sign, delay it. + // Wait other validators have signed recently, if timeout we can try sign immediately. + backOffTime := time.Duration(d.config.BlockTime) * time.Second // fixed backoff time + // wiggle time is random delay (per signer) to allow concurrent signers + wiggle := time.Duration(snap.blockLimit()) * + wiggleTimeBeforeForkGranularity * + time.Duration(1+randDelaySeed.Int63n(int64(backOffTime/wiggleTimeBeforeForkGranularity))) + + // delay = durationToBlockTimestamp + fixedBackOffTimeBeforeFork + randomRange(wiggleTimeBeforeForkGranularity, blockTime*blockLimit) + delay += backOffTime + fixedBackOffTimeBeforeFork + wiggle } return delay } diff --git a/eth/api.go b/eth/api.go index f81dfa922b..105d66ed78 100644 --- a/eth/api.go +++ b/eth/api.go @@ -280,6 +280,9 @@ func (api *PublicDebugAPI) DumpBlock(blockNr rpc.BlockNumber) (state.Dump, error // both the pending block as well as the pending state from // the miner and operate on those _, stateDb := api.eth.miner.Pending() + if stateDb == nil { + return state.Dump{}, errors.New("pending state is not available") + } return stateDb.RawDump(opts), nil } var block *types.Block @@ -369,6 +372,9 @@ func (api *PublicDebugAPI) AccountRange(blockNrOrHash rpc.BlockNumberOrHash, sta // both the pending block as well as the pending state from // the miner and operate on those _, stateDb = api.eth.miner.Pending() + if stateDb == nil { + return state.IteratorDump{}, errors.New("pending state is not available") + } } else { var block *types.Block if number == rpc.LatestBlockNumber { diff --git a/eth/api_backend.go b/eth/api_backend.go index 3184240bf9..63395bb821 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -66,6 +66,9 @@ func (b *EthAPIBackend) HeaderByNumber(ctx context.Context, number rpc.BlockNumb // Pending block is only known by the miner if number == rpc.PendingBlockNumber { block := b.eth.miner.PendingBlock() + if block == nil { + return nil, errors.New("pending block is not available") + } return block.Header(), nil } // Otherwise resolve and return the block @@ -108,6 +111,9 @@ func (b *EthAPIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumbe // Pending block is only known by the miner if number == rpc.PendingBlockNumber { block := b.eth.miner.PendingBlock() + if block == nil { + return nil, errors.New("pending block is not available") + } return block, nil } // Otherwise resolve and return the block @@ -150,6 +156,9 @@ func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.B // Pending state is only known by the miner if number == rpc.PendingBlockNumber { block, state := b.eth.miner.Pending() + if block == nil || state == nil { + return nil, nil, errors.New("pending state is not available") + } return state, block.Header(), nil } // Otherwise resolve the block number and return its state diff --git a/eth/backend.go b/eth/backend.go index 1b4bfed88f..773f937947 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -467,6 +467,9 @@ func (s *Ethereum) shouldPreserve(header *types.Header) bool { if _, ok := s.engine.(*parlia.Parlia); ok { return false } + if _, ok := s.engine.(*drab.Drab); ok { + return false + } return s.isLocalBlock(header) } diff --git a/miner/miner.go b/miner/miner.go index 7f0f1583e8..db21feac50 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -120,18 +120,24 @@ func (miner *Miner) update() { shouldStart = true log.Info("Mining aborted due to sync") } + miner.worker.syncing.Store(true) + case downloader.FailedEvent: canStart = true if shouldStart { miner.SetEtherbase(miner.coinbase) miner.worker.start() } + miner.worker.syncing.Store(false) + case downloader.DoneEvent: canStart = true if shouldStart { miner.SetEtherbase(miner.coinbase) miner.worker.start() } + miner.worker.syncing.Store(false) + // Stop reacting to downloader events events.Unsubscribe() } @@ -188,7 +194,8 @@ func (miner *Miner) SetRecommitInterval(interval time.Duration) { miner.worker.setRecommitInterval(interval) } -// Pending returns the currently pending block and associated state. +// Pending returns the currently pending block and associated state. The returned +// values can be nil in case the pending block is not initialized func (miner *Miner) Pending() (*types.Block, *state.StateDB) { if miner.worker.isRunning() { pendingBlock, pendingState := miner.worker.pending() @@ -208,11 +215,11 @@ func (miner *Miner) Pending() (*types.Block, *state.StateDB) { return block, stateDb } -// PendingBlock returns the currently pending block. +// PendingBlock returns the currently pending block. The returned block can be +// nil in case the pending block is not initialized. // // Note, to access both the pending block and the pending state // simultaneously, please use Pending(), as the pending state can -// change between multiple method calls func (miner *Miner) PendingBlock() *types.Block { if miner.worker.isRunning() { pendingBlock := miner.worker.pendingBlock() @@ -225,6 +232,7 @@ func (miner *Miner) PendingBlock() *types.Block { } // PendingBlockAndReceipts returns the currently pending block and corresponding receipts. +// The returned values can be nil in case the pending block is not initialized. func (miner *Miner) PendingBlockAndReceipts() (*types.Block, types.Receipts) { return miner.worker.pendingBlockAndReceipts() } diff --git a/miner/worker.go b/miner/worker.go index 5acfbc23c9..fad153587b 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -19,13 +19,13 @@ package miner import ( "errors" "fmt" - "math/big" "sync" "sync/atomic" "time" mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/clock" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/misc" "github.com/ethereum/go-ethereum/consensus/parlia" @@ -46,10 +46,6 @@ const ( // resultQueueSize is the size of channel listening to sealing result. resultQueueSize = 10 - // txChanSize is the size of channel listening to NewTxsEvent. - // The number is referenced from the size of tx pool. - txChanSize = 4096 - // chainHeadChanSize is the size of channel listening to ChainHeadEvent. chainHeadChanSize = 10 @@ -223,7 +219,8 @@ type worker struct { snapshotState *state.StateDB // atomic status counters - running int32 // The indicator whether the consensus engine is running or not. + running atomic.Bool // The indicator whether the consensus engine is running or not. + syncing atomic.Bool // The indicator whether the node is still syncing. // External functions isLocalBlock func(header *types.Header) bool // Function used to determine whether the specified block is mined by local miner. @@ -343,24 +340,24 @@ func (w *worker) pendingBlockAndReceipts() (*types.Block, types.Receipts) { // start sets the running status as 1 and triggers new work submitting. func (w *worker) start() { - atomic.StoreInt32(&w.running, 1) + w.running.Store(true) w.startCh <- struct{}{} } // stop sets the running status as 0. func (w *worker) stop() { - atomic.StoreInt32(&w.running, 0) + w.running.Store(false) } // isRunning returns an indicator whether worker is running or not. func (w *worker) isRunning() bool { - return atomic.LoadInt32(&w.running) == 1 + return w.running.Load() } // close terminates all background threads maintained by the worker. // Note the worker does not support being closed multiple times. func (w *worker) close() { - atomic.StoreInt32(&w.running, 0) + w.running.Store(false) close(w.exitCh) w.wg.Wait() } @@ -692,19 +689,14 @@ func (w *worker) resultLoop() { } // makeEnv creates a new environment for the sealing block. -func (w *worker) makeEnv(parent *types.Block, header *types.Header, coinbase common.Address, - prevEnv *environment) (*environment, error) { +func (w *worker) makeEnv(parent *types.Block, header *types.Header, coinbase common.Address) (*environment, error) { // Retrieve the parent state to execute on top and start a prefetcher for // the miner to speed block sealing up a bit state, err := w.chain.StateAtWithSharedPool(parent.Root()) if err != nil { return nil, err } - if prevEnv == nil { - state.StartPrefetcher("miner") - } else { - state.TransferPrefetcher(prevEnv.state) - } + state.StartPrefetcher("miner") // Note the passed coinbase may be different with header.Coinbase. env := &environment{ @@ -773,8 +765,12 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece return receipt.Logs, nil } -func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, - interruptCh chan int32, stopTimer *time.Timer) error { +func (w *worker) commitTransactions( + env *environment, + txs *types.TransactionsByPriceAndNonce, + interruptCh chan int32, + stopTimer clock.Timer, +) error { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -804,7 +800,6 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), stopPrefetchCh, txCurr) signal := commitInterruptNone -LOOP: for { // In the following three cases, we will interrupt the execution of the transaction. // (1) new head block event arrival, the reason is 1 @@ -823,22 +818,24 @@ LOOP: default: } } - // If we don't have enough gas for any further transactions then we're done - if env.gasPool.Gas() < params.TxGas { - log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) - signal = commitInterruptOutOfGas - break - } + if stopTimer != nil { select { - case <-stopTimer.C: - log.Info("Not enough time for further transactions", "txs", len(env.txs)) - stopTimer.Reset(0) // re-active the timer, in case it will be used later. + case <-stopTimer.C(): signal = commitInterruptTimeout - break LOOP + log.Info("Not enough time for further transactions", "txs", len(env.txs)) + stopTimer.Reset(0) + break default: } } + + // If we don't have enough gas for any further transactions then we're done + if env.gasPool.Gas() < params.TxGas { + log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) + signal = commitInterruptOutOfGas + break + } // Retrieve the next transaction and abort if all done tx = txs.Peek() if tx == nil { @@ -922,7 +919,6 @@ type generateParams struct { random common.Hash // The randomness generated by beacon chain, empty before the merge noUncle bool // Flag whether the uncle block inclusion is allowed noExtra bool // Flag whether the extra field assignment is allowed - prevWork *environment } // prepareWork constructs the sealing task according to the given parameters, @@ -977,7 +973,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // Could potentially happen if starting to mine in an odd state. // Note genParams.coinbase can be different with header.Coinbase // since clique algorithm can modify the coinbase field in header. - env, err := w.makeEnv(parent, header, genParams.coinbase, genParams.prevWork) + env, err := w.makeEnv(parent, header, genParams.coinbase) if err != nil { log.Error("Failed to create sealing context", "err", err) return nil, err @@ -1014,7 +1010,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. -func (w *worker) fillTransactions(interruptCh chan int32, env *environment, stopTimer *time.Timer) (err error) { +func (w *worker) fillTransactions(interruptCh chan int32, env *environment, timeout clock.Timer) (err error) { // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. pending := w.eth.TxPool().Pending(false) @@ -1029,7 +1025,7 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment, stop err = nil if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) - err = w.commitTransactions(env, txs, interruptCh, stopTimer) + err = w.commitTransactions(env, txs, interruptCh, timeout) // we will abort here when: // 1.new block was imported // 2.out of Gas, no more transaction can be added. @@ -1042,7 +1038,7 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment, stop } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) - err = w.commitTransactions(env, txs, interruptCh, stopTimer) + err = w.commitTransactions(env, txs, interruptCh, timeout) } return @@ -1064,6 +1060,10 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { // commitWork generates several new sealing tasks based on the parent block // and submit them to the sealer. func (w *worker) commitWork(interruptCh chan int32, timestamp int64) { + // Abort committing if node is still syncing + if w.syncing.Load() { + return + } start := time.Now() // Set the coinbase if the worker is running or it's required @@ -1076,159 +1076,52 @@ func (w *worker) commitWork(interruptCh chan int32, timestamp int64) { coinbase = w.coinbase // Use the preset address as the fee recipient } - stopTimer := time.NewTimer(0) - defer stopTimer.Stop() - <-stopTimer.C // discard the initial tick - - stopWaitTimer := time.NewTimer(0) - defer stopWaitTimer.Stop() - <-stopWaitTimer.C // discard the initial tick - - // validator can try several times to get the most profitable block, - // as long as the timestamp is not reached. - workList := make([]*environment, 0, 10) - var prevWork *environment - // workList clean up - defer func() { - for _, wk := range workList { - // only keep the best work, discard others. - if wk == w.current { - continue - } - wk.discard() - } - }() - -LOOP: - for { - work, err := w.prepareWork(&generateParams{ - timestamp: uint64(timestamp), - coinbase: coinbase, - prevWork: prevWork, - }) - if err != nil { - return - } - prevWork = work - workList = append(workList, work) - - delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver) - if delay == nil { - log.Warn("commitWork delay is nil, something is wrong") - stopTimer = nil - } else if *delay <= 0 { - log.Debug("Not enough time for commitWork") - break - } else { - log.Debug("commitWork stopTimer", "block", work.header.Number, - "header time", time.Until(time.Unix(int64(work.header.Time), 0)), - "commit delay", *delay, "DelayLeftOver", w.config.DelayLeftOver) - stopTimer.Reset(*delay) - } - - // subscribe before fillTransactions - txsCh := make(chan core.NewTxsEvent, txChanSize) - sub := w.eth.TxPool().SubscribeNewTxsEvent(txsCh) - // if TxPool has been stopped, `sub` would be nil, it could happen on shutdown. - if sub == nil { - log.Info("commitWork SubscribeNewTxsEvent return nil") - } else { - defer sub.Unsubscribe() - } + work, err := w.prepareWork(&generateParams{ + timestamp: uint64(timestamp), + coinbase: coinbase, + }) + if err != nil { + return + } - // Fill pending transactions from the txpool - fillStart := time.Now() - err = w.fillTransactions(interruptCh, work, stopTimer) - fillDuration := time.Since(fillStart) - switch { - case errors.Is(err, errBlockInterruptedByNewHead): - log.Debug("commitWork abort", "err", err) - return - case errors.Is(err, errBlockInterruptedByRecommit): - fallthrough - case errors.Is(err, errBlockInterruptedByTimeout): - fallthrough - case errors.Is(err, errBlockInterruptedByOutOfGas): - // break the loop to get the best work - log.Debug("commitWork finish", "reason", err) - break LOOP - } + // transaction processing timeout + fillTimeout := time.Until(time.Unix(int64(work.header.Time), 0))/2 - w.config.DelayLeftOver + if fillTimeout <= 0 { + log.Warn( + "fillTimeout is negative or zero", + "delayLeftOver", w.config.DelayLeftOver, + "timestamp", work.header.Time, + "now", time.Now().Unix(), + ) + fillTimeout = 200 * time.Millisecond // minimum transaction processing time + } - if interruptCh == nil || stopTimer == nil { - // it is single commit work, no need to try several time. - log.Info("commitWork interruptCh or stopTimer is nil") - break - } + deadlineTimer := clock.NewTimer(fillTimeout) + defer deadlineTimer.Stop() - newTxsNum := 0 - // stopTimer was the maximum delay for each fillTransactions - // but now it is used to wait until (head.Time - DelayLeftOver) is reached. - stopTimer.Reset(time.Until(time.Unix(int64(work.header.Time), 0)) - w.config.DelayLeftOver) - LOOP_WAIT: - for { - select { - case <-stopTimer.C: - log.Debug("commitWork stopTimer expired") - break LOOP - case <-interruptCh: - log.Debug("commitWork interruptCh closed, new block imported or resubmit triggered") - return - case ev := <-txsCh: - delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver) - log.Debug("commitWork txsCh arrived", "fillDuration", fillDuration.String(), - "delay", delay.String(), "work.tcount", work.tcount, - "newTxsNum", newTxsNum, "len(ev.Txs)", len(ev.Txs)) - if *delay < fillDuration { - // There may not have enough time for another fillTransactions. - break LOOP - } else if *delay < fillDuration*2 { - // We can schedule another fillTransactions, but the time is limited, - // probably it is the last chance, schedule it immediately. - break LOOP_WAIT - } else { - // There is still plenty of time left. - // We can wait a while to collect more transactions before - // schedule another fillTransaction to reduce CPU cost. - // There will be 2 cases to schedule another fillTransactions: - // 1.newTxsNum >= work.tcount - // 2.no much time left, have to schedule it immediately. - newTxsNum = newTxsNum + len(ev.Txs) - if newTxsNum >= work.tcount { - break LOOP_WAIT - } - stopWaitTimer.Reset(*delay - fillDuration*2) - } - case <-stopWaitTimer.C: - if newTxsNum > 0 { - break LOOP_WAIT - } - } - } - // if sub's channel if full, it will block other NewTxsEvent subscribers, - // so unsubscribe ASAP and Unsubscribe() is re-enterable, safe to call several time. - if sub != nil { - sub.Unsubscribe() - } - } - // get the most profitable work - bestWork := workList[0] - bestReward := new(big.Int) - for i, wk := range workList { - balance := wk.state.GetBalance(consensus.SystemAddress) - log.Debug("Get the most profitable work", "index", i, "balance", balance, "bestReward", bestReward) - if balance.Cmp(bestReward) > 0 { - bestWork = wk - bestReward = balance - } + // Fill pending transactions from the txpool + err = w.fillTransactions(interruptCh, work, deadlineTimer) + switch { + case errors.Is(err, errBlockInterruptedByNewHead): + log.Debug("commitWork abort", "err", err) + return + case errors.Is(err, errBlockInterruptedByRecommit): + fallthrough + case errors.Is(err, errBlockInterruptedByTimeout): + fallthrough + case errors.Is(err, errBlockInterruptedByOutOfGas): + // break the loop to get the best work + log.Debug("commitWork finish", "reason", err) } - w.commit(bestWork, w.fullTaskHook, true, start) + + w.commit(work, w.fullTaskHook, true, start) // Swap out the old work with the new one, terminating any leftover // prefetcher processes in the mean time and starting a new one. if w.current != nil { w.current.discard() } - w.current = bestWork + w.current = work } // commit runs any post-transaction state modifications, assembles the final block diff --git a/miner/worker_test.go b/miner/worker_test.go index e405788c61..1960833392 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -112,7 +112,6 @@ type testWorkerBackend struct { db ethdb.Database txPool *core.TxPool chain *core.BlockChain - testTxFeed event.Feed genesis *core.Genesis uncleBlock *types.Block } @@ -263,7 +262,7 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) { if _, err := chain.InsertChain([]*types.Block{block}); err != nil { t.Fatalf("failed to insert new mined block %d: %v", block.NumberU64(), err) } - case <-time.After(3 * time.Second): // Worker needs 1s to include new changes. + case <-time.After(3 * time.Second): // Worker needs 3s to include new changes. t.Fatalf("timeout") } }