diff options
author | Ted Yin <[email protected]> | 2020-09-18 13:14:29 -0400 |
---|---|---|
committer | GitHub <[email protected]> | 2020-09-18 13:14:29 -0400 |
commit | d048937c48753d9eaef771bf71820cf95d79df26 (patch) | |
tree | 1a7f65fcd72e77092525ab01625b8b9d365e3e40 /miner/worker.go | |
parent | 7d1388c743b4ec8f4a86bea95bfada785dee83f7 (diff) | |
parent | 7d8c85cf8895b0f998d8eafb02f99d5b689fcd59 (diff) |
Merge pull request #34 from ava-labs/devv0.3.0-rc.5
Dev
Diffstat (limited to 'miner/worker.go')
-rw-r--r-- | miner/worker.go | 167 |
1 files changed, 102 insertions, 65 deletions
diff --git a/miner/worker.go b/miner/worker.go index 4a6303b..72597b0 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -33,10 +33,11 @@ import ( "github.com/ava-labs/coreth/core/state" "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/params" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/event" - "github.com/ava-labs/go-ethereum/log" mapset "github.com/deckarep/golang-set" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/trie" ) const ( @@ -137,6 +138,9 @@ type worker struct { eth Backend chain *core.BlockChain + // Feeds + pendingLogsFeed event.Feed + // Subscriptions mux *event.TypeMux txsCh chan core.NewTxsEvent @@ -175,6 +179,13 @@ type worker struct { running int32 // The indicator whether the consensus engine is running or not. newTxs int32 // New arrival transaction count since last sealing work submitting. + // noempty is the flag used to control whether the feature of pre-seal empty + // block is enabled. The default value is false(pre-seal is enabled by default). + // But in some special scenario the consensus engine will seal blocks instantaneously, + // in this case this feature will add all empty blocks into canonical chain + // non-stop and no real transaction will be included. + noempty uint32 + // External functions isLocalBlock func(block *types.Block) bool // Function used to determine whether the specified block is mined by local miner. @@ -189,7 +200,7 @@ type worker struct { minerCallbacks *MinerCallbacks } -func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, mcb *MinerCallbacks) *worker { +func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool, mcb *MinerCallbacks) *worker { worker := &worker{ config: config, chainConfig: chainConfig, @@ -239,8 +250,9 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus go worker.taskLoop() // Submit first work to initialize pending state. - worker.startCh <- struct{}{} - + if init { + worker.startCh <- struct{}{} + } return worker } @@ -263,6 +275,16 @@ func (w *worker) setRecommitInterval(interval time.Duration) { w.resubmitIntervalCh <- interval } +// disablePreseal disables pre-sealing mining feature +func (w *worker) disablePreseal() { + atomic.StoreUint32(&w.noempty, 1) +} + +// enablePreseal enables pre-sealing mining feature +func (w *worker) enablePreseal() { + atomic.StoreUint32(&w.noempty, 0) +} + // pending returns the pending state and corresponding block. func (w *worker) pending() (*types.Block, *state.StateDB) { // return a snapshot to avoid contention on currentMu mutex @@ -301,9 +323,32 @@ func (w *worker) isRunning() bool { // close terminates all background threads maintained by the worker. // Note the worker does not support being closed multiple times. func (w *worker) close() { + atomic.StoreInt32(&w.running, 0) close(w.exitCh) } +// recalcRecommit recalculates the resubmitting interval upon feedback. +func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) time.Duration { + var ( + prevF = float64(prev.Nanoseconds()) + next float64 + ) + if inc { + next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias) + max := float64(maxRecommitInterval.Nanoseconds()) + if next > max { + next = max + } + } else { + next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias) + min := float64(minRecommit.Nanoseconds()) + if next < min { + next = min + } + } + return time.Duration(int64(next)) +} + func (w *worker) genBlock() { interrupt := new(int32) *interrupt = commitInterruptNone @@ -323,6 +368,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) { ) timer := time.NewTimer(0) + defer timer.Stop() <-timer.C // discard the initial tick // commit aborts in-flight transaction execution with given signal and resubmits a new one. @@ -335,27 +381,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) { timer.Reset(recommit) atomic.StoreInt32(&w.newTxs, 0) } - // recalcRecommit recalculates the resubmitting interval upon feedback. - recalcRecommit := func(target float64, inc bool) { - var ( - prev = float64(recommit.Nanoseconds()) - next float64 - ) - if inc { - next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias) - // Recap if interval is larger than the maximum time interval - if next > float64(maxRecommitInterval.Nanoseconds()) { - next = float64(maxRecommitInterval.Nanoseconds()) - } - } else { - next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias) - // Recap if interval is less than the user specified minimum - if next < float64(minRecommit.Nanoseconds()) { - next = float64(minRecommit.Nanoseconds()) - } - } - recommit = time.Duration(int64(next)) - } // clearPending cleans the stale pending tasks. clearPending := func(number uint64) { w.pendingMu.Lock() @@ -415,11 +440,12 @@ func (w *worker) newWorkLoop(recommit time.Duration) { // Adjust resubmit interval by feedback. if adjust.inc { before := recommit - recalcRecommit(float64(recommit.Nanoseconds())/adjust.ratio, true) + target := float64(recommit.Nanoseconds()) / adjust.ratio + recommit = recalcRecommit(minRecommit, recommit, target, true) log.Trace("Increase miner recommit interval", "from", before, "to", recommit) } else { before := recommit - recalcRecommit(float64(minRecommit.Nanoseconds()), false) + recommit = recalcRecommit(minRecommit, recommit, float64(minRecommit.Nanoseconds()), false) log.Trace("Decrease miner recommit interval", "from", before, "to", recommit) } @@ -514,8 +540,9 @@ func (w *worker) mainLoop() { w.updateSnapshot() } } else { - // If clique is running in dev mode(period is 0), disable - // advance sealing here. + // Special case, if the consensus engine is 0 period clique(dev mode), + // submit mining work here since all empty submission will be rejected + // by clique. Of course the advance sealing(empty submission) is disabled. if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 { w.commitNewWork(nil, true, time.Now().Unix()) } @@ -576,7 +603,7 @@ func (w *worker) taskLoop() { continue } w.pendingMu.Lock() - w.pendingTasks[w.engine.SealHash(task.block.Header())] = task + w.pendingTasks[sealHash] = task w.pendingMu.Unlock() if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil { @@ -636,8 +663,7 @@ func (w *worker) resultLoop() { } // Commit block and state to database. //fmt.Printf("parent1: %s\n", w.chain.CurrentBlock().Hash().String()) - stat, err := w.chain.WriteBlockWithState(block, receipts, task.state) - stat = core.CanonStatTy + _, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true) //fmt.Printf("parent2: %s\n", w.chain.CurrentBlock().Hash().String()) if err != nil { log.Error("Failed writing block to chain", "err", err) @@ -652,16 +678,6 @@ func (w *worker) resultLoop() { // Broadcast the block and announce chain insertion event w.mux.Post(core.NewMinedBlockEvent{Block: block}) - var events []interface{} - switch stat { - case core.CanonStatTy: - events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) - events = append(events, core.ChainHeadEvent{Block: block}) - case core.SideStatTy: - events = append(events, core.ChainSideEvent{Block: block}) - } - w.chain.PostChainEvents(events, logs) - // Insert the block into the set of pending ones to resultLoop for confirmations w.unconfirmed.Insert(block.NumberU64(), block.Hash()) @@ -751,6 +767,7 @@ func (w *worker) updateSnapshot() { w.current.txs, uncles, w.current.receipts, + new(trie.Trie), nil, ) @@ -760,7 +777,7 @@ func (w *worker) updateSnapshot() { func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) { snap := w.current.state.Snapshot() - receipt, _, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig()) + receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig()) if err != nil { w.current.state.RevertToSnapshot(snap) return nil, err @@ -874,7 +891,7 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin cpy[i] = new(types.Log) *cpy[i] = *l } - go w.mux.Post(core.PendingLogsEvent{Logs: cpy}) + w.pendingLogsFeed.Send(cpy) } // Notify resubmit loop to decrease resubmitting interval if current interval is larger // than the user-specified one. @@ -978,9 +995,9 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) commitUncles(w.localUncles) commitUncles(w.remoteUncles) - if !noempty && !w.manualMining { - // Create an empty block based on temporary copied state for sealing in advance without waiting block - // execution finished. + // Create an empty block based on temporary copied state for + // sealing in advance without waiting block execution finished. + if !noempty && atomic.LoadUint32(&w.noempty) == 0 && !w.manualMining { w.commit(uncles, nil, false, tstart) } @@ -990,8 +1007,10 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) log.Error("Failed to fetch pending transactions", "err", err) return } - // Short circuit if there is no available pending transactions - if len(pending) == 0 && !w.manualMining { + // Short circuit if there is no available pending transactions. + // But if we disable empty precommit already, ignore it. Since + // empty block is necessary to keep the liveness of the network. + if len(pending) == 0 && atomic.LoadUint32(&w.noempty) == 0 && !w.manualMining { w.updateSnapshot() return } @@ -1022,13 +1041,9 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) // and commits new work if consensus engine is running. func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error { // Deep copy receipts here to avoid interaction between different tasks. - receipts := make([]*types.Receipt, len(w.current.receipts)) - for i, l := range w.current.receipts { - receipts[i] = new(types.Receipt) - *receipts[i] = *l - } + receipts := copyReceipts(w.current.receipts) s := w.current.state.Copy() - block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, w.current.receipts) + block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, receipts) if err != nil { return err } @@ -1039,15 +1054,10 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st select { case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}: w.unconfirmed.Shift(block.NumberU64() - 1) - - feesWei := new(big.Int) - for i, tx := range block.Transactions() { - feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), tx.GasPrice())) - } - feesEth := new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether))) - log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()), - "uncles", len(uncles), "txs", w.current.tcount, "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start))) + "uncles", len(uncles), "txs", w.current.tcount, + "gas", block.GasUsed(), "fees", totalFees(block, receipts), + "elapsed", common.PrettyDuration(time.Since(start))) case <-w.exitCh: log.Info("Worker has exited") @@ -1058,3 +1068,30 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st } return nil } + +// copyReceipts makes a deep copy of the given receipts. +func copyReceipts(receipts []*types.Receipt) []*types.Receipt { + result := make([]*types.Receipt, len(receipts)) + for i, l := range receipts { + cpy := *l + result[i] = &cpy + } + return result +} + +// postSideBlock fires a side chain event, only use it for testing. +func (w *worker) postSideBlock(event core.ChainSideEvent) { + select { + case w.chainSideCh <- event: + case <-w.exitCh: + } +} + +// totalFees computes total consumed fees in ETH. Block transactions and receipts have to have the same order. +func totalFees(block *types.Block, receipts []*types.Receipt) *big.Float { + feesWei := new(big.Int) + for i, tx := range block.Transactions() { + feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), tx.GasPrice())) + } + return new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether))) +} |