aboutsummaryrefslogtreecommitdiff
path: root/miner/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'miner/worker.go')
-rw-r--r--miner/worker.go167
1 files changed, 102 insertions, 65 deletions
diff --git a/miner/worker.go b/miner/worker.go
index 4a6303b..72597b0 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -33,10 +33,11 @@ import (
"github.com/ava-labs/coreth/core/state"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/params"
- "github.com/ava-labs/go-ethereum/common"
- "github.com/ava-labs/go-ethereum/event"
- "github.com/ava-labs/go-ethereum/log"
mapset "github.com/deckarep/golang-set"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/trie"
)
const (
@@ -137,6 +138,9 @@ type worker struct {
eth Backend
chain *core.BlockChain
+ // Feeds
+ pendingLogsFeed event.Feed
+
// Subscriptions
mux *event.TypeMux
txsCh chan core.NewTxsEvent
@@ -175,6 +179,13 @@ type worker struct {
running int32 // The indicator whether the consensus engine is running or not.
newTxs int32 // New arrival transaction count since last sealing work submitting.
+ // noempty is the flag used to control whether the feature of pre-seal empty
+ // block is enabled. The default value is false(pre-seal is enabled by default).
+ // But in some special scenario the consensus engine will seal blocks instantaneously,
+ // in this case this feature will add all empty blocks into canonical chain
+ // non-stop and no real transaction will be included.
+ noempty uint32
+
// External functions
isLocalBlock func(block *types.Block) bool // Function used to determine whether the specified block is mined by local miner.
@@ -189,7 +200,7 @@ type worker struct {
minerCallbacks *MinerCallbacks
}
-func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, mcb *MinerCallbacks) *worker {
+func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool, mcb *MinerCallbacks) *worker {
worker := &worker{
config: config,
chainConfig: chainConfig,
@@ -239,8 +250,9 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
go worker.taskLoop()
// Submit first work to initialize pending state.
- worker.startCh <- struct{}{}
-
+ if init {
+ worker.startCh <- struct{}{}
+ }
return worker
}
@@ -263,6 +275,16 @@ func (w *worker) setRecommitInterval(interval time.Duration) {
w.resubmitIntervalCh <- interval
}
+// disablePreseal disables pre-sealing mining feature
+func (w *worker) disablePreseal() {
+ atomic.StoreUint32(&w.noempty, 1)
+}
+
+// enablePreseal enables pre-sealing mining feature
+func (w *worker) enablePreseal() {
+ atomic.StoreUint32(&w.noempty, 0)
+}
+
// pending returns the pending state and corresponding block.
func (w *worker) pending() (*types.Block, *state.StateDB) {
// return a snapshot to avoid contention on currentMu mutex
@@ -301,9 +323,32 @@ func (w *worker) isRunning() bool {
// close terminates all background threads maintained by the worker.
// Note the worker does not support being closed multiple times.
func (w *worker) close() {
+ atomic.StoreInt32(&w.running, 0)
close(w.exitCh)
}
+// recalcRecommit recalculates the resubmitting interval upon feedback.
+func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) time.Duration {
+ var (
+ prevF = float64(prev.Nanoseconds())
+ next float64
+ )
+ if inc {
+ next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias)
+ max := float64(maxRecommitInterval.Nanoseconds())
+ if next > max {
+ next = max
+ }
+ } else {
+ next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias)
+ min := float64(minRecommit.Nanoseconds())
+ if next < min {
+ next = min
+ }
+ }
+ return time.Duration(int64(next))
+}
+
func (w *worker) genBlock() {
interrupt := new(int32)
*interrupt = commitInterruptNone
@@ -323,6 +368,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
)
timer := time.NewTimer(0)
+ defer timer.Stop()
<-timer.C // discard the initial tick
// commit aborts in-flight transaction execution with given signal and resubmits a new one.
@@ -335,27 +381,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0)
}
- // recalcRecommit recalculates the resubmitting interval upon feedback.
- recalcRecommit := func(target float64, inc bool) {
- var (
- prev = float64(recommit.Nanoseconds())
- next float64
- )
- if inc {
- next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias)
- // Recap if interval is larger than the maximum time interval
- if next > float64(maxRecommitInterval.Nanoseconds()) {
- next = float64(maxRecommitInterval.Nanoseconds())
- }
- } else {
- next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias)
- // Recap if interval is less than the user specified minimum
- if next < float64(minRecommit.Nanoseconds()) {
- next = float64(minRecommit.Nanoseconds())
- }
- }
- recommit = time.Duration(int64(next))
- }
// clearPending cleans the stale pending tasks.
clearPending := func(number uint64) {
w.pendingMu.Lock()
@@ -415,11 +440,12 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
// Adjust resubmit interval by feedback.
if adjust.inc {
before := recommit
- recalcRecommit(float64(recommit.Nanoseconds())/adjust.ratio, true)
+ target := float64(recommit.Nanoseconds()) / adjust.ratio
+ recommit = recalcRecommit(minRecommit, recommit, target, true)
log.Trace("Increase miner recommit interval", "from", before, "to", recommit)
} else {
before := recommit
- recalcRecommit(float64(minRecommit.Nanoseconds()), false)
+ recommit = recalcRecommit(minRecommit, recommit, float64(minRecommit.Nanoseconds()), false)
log.Trace("Decrease miner recommit interval", "from", before, "to", recommit)
}
@@ -514,8 +540,9 @@ func (w *worker) mainLoop() {
w.updateSnapshot()
}
} else {
- // If clique is running in dev mode(period is 0), disable
- // advance sealing here.
+ // Special case, if the consensus engine is 0 period clique(dev mode),
+ // submit mining work here since all empty submission will be rejected
+ // by clique. Of course the advance sealing(empty submission) is disabled.
if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 {
w.commitNewWork(nil, true, time.Now().Unix())
}
@@ -576,7 +603,7 @@ func (w *worker) taskLoop() {
continue
}
w.pendingMu.Lock()
- w.pendingTasks[w.engine.SealHash(task.block.Header())] = task
+ w.pendingTasks[sealHash] = task
w.pendingMu.Unlock()
if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
@@ -636,8 +663,7 @@ func (w *worker) resultLoop() {
}
// Commit block and state to database.
//fmt.Printf("parent1: %s\n", w.chain.CurrentBlock().Hash().String())
- stat, err := w.chain.WriteBlockWithState(block, receipts, task.state)
- stat = core.CanonStatTy
+ _, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true)
//fmt.Printf("parent2: %s\n", w.chain.CurrentBlock().Hash().String())
if err != nil {
log.Error("Failed writing block to chain", "err", err)
@@ -652,16 +678,6 @@ func (w *worker) resultLoop() {
// Broadcast the block and announce chain insertion event
w.mux.Post(core.NewMinedBlockEvent{Block: block})
- var events []interface{}
- switch stat {
- case core.CanonStatTy:
- events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
- events = append(events, core.ChainHeadEvent{Block: block})
- case core.SideStatTy:
- events = append(events, core.ChainSideEvent{Block: block})
- }
- w.chain.PostChainEvents(events, logs)
-
// Insert the block into the set of pending ones to resultLoop for confirmations
w.unconfirmed.Insert(block.NumberU64(), block.Hash())
@@ -751,6 +767,7 @@ func (w *worker) updateSnapshot() {
w.current.txs,
uncles,
w.current.receipts,
+ new(trie.Trie),
nil,
)
@@ -760,7 +777,7 @@ func (w *worker) updateSnapshot() {
func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
snap := w.current.state.Snapshot()
- receipt, _, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig())
+ receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig())
if err != nil {
w.current.state.RevertToSnapshot(snap)
return nil, err
@@ -874,7 +891,7 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
cpy[i] = new(types.Log)
*cpy[i] = *l
}
- go w.mux.Post(core.PendingLogsEvent{Logs: cpy})
+ w.pendingLogsFeed.Send(cpy)
}
// Notify resubmit loop to decrease resubmitting interval if current interval is larger
// than the user-specified one.
@@ -978,9 +995,9 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
commitUncles(w.localUncles)
commitUncles(w.remoteUncles)
- if !noempty && !w.manualMining {
- // Create an empty block based on temporary copied state for sealing in advance without waiting block
- // execution finished.
+ // Create an empty block based on temporary copied state for
+ // sealing in advance without waiting block execution finished.
+ if !noempty && atomic.LoadUint32(&w.noempty) == 0 && !w.manualMining {
w.commit(uncles, nil, false, tstart)
}
@@ -990,8 +1007,10 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
log.Error("Failed to fetch pending transactions", "err", err)
return
}
- // Short circuit if there is no available pending transactions
- if len(pending) == 0 && !w.manualMining {
+ // Short circuit if there is no available pending transactions.
+ // But if we disable empty precommit already, ignore it. Since
+ // empty block is necessary to keep the liveness of the network.
+ if len(pending) == 0 && atomic.LoadUint32(&w.noempty) == 0 && !w.manualMining {
w.updateSnapshot()
return
}
@@ -1022,13 +1041,9 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
// and commits new work if consensus engine is running.
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
// Deep copy receipts here to avoid interaction between different tasks.
- receipts := make([]*types.Receipt, len(w.current.receipts))
- for i, l := range w.current.receipts {
- receipts[i] = new(types.Receipt)
- *receipts[i] = *l
- }
+ receipts := copyReceipts(w.current.receipts)
s := w.current.state.Copy()
- block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, w.current.receipts)
+ block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, receipts)
if err != nil {
return err
}
@@ -1039,15 +1054,10 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
select {
case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
w.unconfirmed.Shift(block.NumberU64() - 1)
-
- feesWei := new(big.Int)
- for i, tx := range block.Transactions() {
- feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), tx.GasPrice()))
- }
- feesEth := new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))
-
log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
- "uncles", len(uncles), "txs", w.current.tcount, "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start)))
+ "uncles", len(uncles), "txs", w.current.tcount,
+ "gas", block.GasUsed(), "fees", totalFees(block, receipts),
+ "elapsed", common.PrettyDuration(time.Since(start)))
case <-w.exitCh:
log.Info("Worker has exited")
@@ -1058,3 +1068,30 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
}
return nil
}
+
+// copyReceipts makes a deep copy of the given receipts.
+func copyReceipts(receipts []*types.Receipt) []*types.Receipt {
+ result := make([]*types.Receipt, len(receipts))
+ for i, l := range receipts {
+ cpy := *l
+ result[i] = &cpy
+ }
+ return result
+}
+
+// postSideBlock fires a side chain event, only use it for testing.
+func (w *worker) postSideBlock(event core.ChainSideEvent) {
+ select {
+ case w.chainSideCh <- event:
+ case <-w.exitCh:
+ }
+}
+
+// totalFees computes total consumed fees in ETH. Block transactions and receipts have to have the same order.
+func totalFees(block *types.Block, receipts []*types.Receipt) *big.Float {
+ feesWei := new(big.Int)
+ for i, tx := range block.Transactions() {
+ feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), tx.GasPrice()))
+ }
+ return new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))
+}