aboutsummaryrefslogtreecommitdiff
path: root/miner
diff options
context:
space:
mode:
authorTed Yin <[email protected]>2020-09-18 13:14:29 -0400
committerGitHub <[email protected]>2020-09-18 13:14:29 -0400
commitd048937c48753d9eaef771bf71820cf95d79df26 (patch)
tree1a7f65fcd72e77092525ab01625b8b9d365e3e40 /miner
parent7d1388c743b4ec8f4a86bea95bfada785dee83f7 (diff)
parent7d8c85cf8895b0f998d8eafb02f99d5b689fcd59 (diff)
Merge pull request #34 from ava-labs/devv0.3.0-rc.5
Dev
Diffstat (limited to 'miner')
-rw-r--r--miner/miner.go81
-rw-r--r--miner/unconfirmed.go4
-rw-r--r--miner/worker.go167
3 files changed, 161 insertions, 91 deletions
diff --git a/miner/miner.go b/miner/miner.go
index 3edbd80..bbe704f 100644
--- a/miner/miner.go
+++ b/miner/miner.go
@@ -27,9 +27,9 @@ import (
"github.com/ava-labs/coreth/core/state"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/params"
- "github.com/ava-labs/go-ethereum/common"
- "github.com/ava-labs/go-ethereum/common/hexutil"
- "github.com/ava-labs/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/event"
)
// Backend wraps all methods required for mining.
@@ -54,55 +54,88 @@ type Config struct {
}
type Miner struct {
- w *worker
+ worker *worker
}
func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(block *types.Block) bool, mcb *MinerCallbacks) *Miner {
return &Miner{
- w: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, mcb),
+ worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true, mcb),
}
}
-func (self *Miner) Start(coinbase common.Address) {
- self.w.start()
+func (miner *Miner) Start(coinbase common.Address) {
+ miner.worker.start()
}
-func (self *Miner) Stop() {
- self.w.stop()
+func (miner *Miner) Stop() {
+ miner.worker.stop()
}
-func (self *Miner) Mining() bool {
+func (miner *Miner) Mining() bool {
return false
}
-func (self *Miner) HashRate() uint64 {
+func (miner *Miner) HashRate() uint64 {
return 0
}
-func (self *Miner) SetExtra(extra []byte) error {
+func (miner *Miner) SetExtra(extra []byte) error {
if uint64(len(extra)) > params.MaximumExtraDataSize {
- return fmt.Errorf("Extra exceeds max length. %d > %v", len(extra), params.MaximumExtraDataSize)
+ return fmt.Errorf("extra exceeds max length. %d > %v", len(extra), params.MaximumExtraDataSize)
}
- self.w.setExtra(extra)
+ miner.worker.setExtra(extra)
return nil
}
-func (self *Miner) SetRecommitInterval(interval time.Duration) {
- self.w.setRecommitInterval(interval)
+// SetRecommitInterval sets the interval for sealing work resubmitting.
+func (miner *Miner) SetRecommitInterval(interval time.Duration) {
+ miner.worker.setRecommitInterval(interval)
}
-func (self *Miner) Pending() (*types.Block, *state.StateDB) {
- return self.w.pending()
+// Pending returns the currently pending block and associated state.
+func (miner *Miner) Pending() (*types.Block, *state.StateDB) {
+ return miner.worker.pending()
}
-func (self *Miner) PendingBlock() *types.Block {
- return self.w.pendingBlock()
+// PendingBlock returns the currently pending block.
+//
+// Note, to access both the pending block and the pending state
+// simultaneously, please use Pending(), as the pending state can
+// change between multiple method calls
+func (miner *Miner) PendingBlock() *types.Block {
+ return miner.worker.pendingBlock()
+}
+
+func (miner *Miner) SetEtherbase(addr common.Address) {
+ miner.worker.setEtherbase(addr)
+}
+
+// EnablePreseal turns on the preseal mining feature. It's enabled by default.
+// Note this function shouldn't be exposed to API, it's unnecessary for users
+// (miners) to actually know the underlying detail. It's only for outside project
+// which uses this library.
+func (miner *Miner) EnablePreseal() {
+ miner.worker.enablePreseal()
+}
+
+// DisablePreseal turns off the preseal mining feature. It's necessary for some
+// fake consensus engine which can seal blocks instantaneously.
+// Note this function shouldn't be exposed to API, it's unnecessary for users
+// (miners) to actually know the underlying detail. It's only for outside project
+// which uses this library.
+func (miner *Miner) DisablePreseal() {
+ miner.worker.disablePreseal()
}
-func (self *Miner) SetEtherbase(addr common.Address) {
- self.w.setEtherbase(addr)
+// SubscribePendingLogs starts delivering logs from pending transactions
+// to the given channel.
+func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription {
+ return miner.worker.pendingLogsFeed.Subscribe(ch)
+}
+func (miner *Miner) GenBlock() {
+ miner.worker.genBlock()
}
-func (self *Miner) GenBlock() {
- self.w.genBlock()
+func (miner *Miner) GetWorkerMux() *event.TypeMux {
+ return miner.worker.mux
}
diff --git a/miner/unconfirmed.go b/miner/unconfirmed.go
index 1b8868b..ec2cfde 100644
--- a/miner/unconfirmed.go
+++ b/miner/unconfirmed.go
@@ -21,8 +21,8 @@ import (
"sync"
"github.com/ava-labs/coreth/core/types"
- "github.com/ava-labs/go-ethereum/common"
- "github.com/ava-labs/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/log"
)
// chainRetriever is used by the unconfirmed block set to verify whether a previously
diff --git a/miner/worker.go b/miner/worker.go
index 4a6303b..72597b0 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -33,10 +33,11 @@ import (
"github.com/ava-labs/coreth/core/state"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/params"
- "github.com/ava-labs/go-ethereum/common"
- "github.com/ava-labs/go-ethereum/event"
- "github.com/ava-labs/go-ethereum/log"
mapset "github.com/deckarep/golang-set"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/trie"
)
const (
@@ -137,6 +138,9 @@ type worker struct {
eth Backend
chain *core.BlockChain
+ // Feeds
+ pendingLogsFeed event.Feed
+
// Subscriptions
mux *event.TypeMux
txsCh chan core.NewTxsEvent
@@ -175,6 +179,13 @@ type worker struct {
running int32 // The indicator whether the consensus engine is running or not.
newTxs int32 // New arrival transaction count since last sealing work submitting.
+ // noempty is the flag used to control whether the feature of pre-seal empty
+ // block is enabled. The default value is false(pre-seal is enabled by default).
+ // But in some special scenario the consensus engine will seal blocks instantaneously,
+ // in this case this feature will add all empty blocks into canonical chain
+ // non-stop and no real transaction will be included.
+ noempty uint32
+
// External functions
isLocalBlock func(block *types.Block) bool // Function used to determine whether the specified block is mined by local miner.
@@ -189,7 +200,7 @@ type worker struct {
minerCallbacks *MinerCallbacks
}
-func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, mcb *MinerCallbacks) *worker {
+func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool, mcb *MinerCallbacks) *worker {
worker := &worker{
config: config,
chainConfig: chainConfig,
@@ -239,8 +250,9 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
go worker.taskLoop()
// Submit first work to initialize pending state.
- worker.startCh <- struct{}{}
-
+ if init {
+ worker.startCh <- struct{}{}
+ }
return worker
}
@@ -263,6 +275,16 @@ func (w *worker) setRecommitInterval(interval time.Duration) {
w.resubmitIntervalCh <- interval
}
+// disablePreseal disables pre-sealing mining feature
+func (w *worker) disablePreseal() {
+ atomic.StoreUint32(&w.noempty, 1)
+}
+
+// enablePreseal enables pre-sealing mining feature
+func (w *worker) enablePreseal() {
+ atomic.StoreUint32(&w.noempty, 0)
+}
+
// pending returns the pending state and corresponding block.
func (w *worker) pending() (*types.Block, *state.StateDB) {
// return a snapshot to avoid contention on currentMu mutex
@@ -301,9 +323,32 @@ func (w *worker) isRunning() bool {
// close terminates all background threads maintained by the worker.
// Note the worker does not support being closed multiple times.
func (w *worker) close() {
+ atomic.StoreInt32(&w.running, 0)
close(w.exitCh)
}
+// recalcRecommit recalculates the resubmitting interval upon feedback.
+func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) time.Duration {
+ var (
+ prevF = float64(prev.Nanoseconds())
+ next float64
+ )
+ if inc {
+ next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias)
+ max := float64(maxRecommitInterval.Nanoseconds())
+ if next > max {
+ next = max
+ }
+ } else {
+ next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias)
+ min := float64(minRecommit.Nanoseconds())
+ if next < min {
+ next = min
+ }
+ }
+ return time.Duration(int64(next))
+}
+
func (w *worker) genBlock() {
interrupt := new(int32)
*interrupt = commitInterruptNone
@@ -323,6 +368,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
)
timer := time.NewTimer(0)
+ defer timer.Stop()
<-timer.C // discard the initial tick
// commit aborts in-flight transaction execution with given signal and resubmits a new one.
@@ -335,27 +381,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0)
}
- // recalcRecommit recalculates the resubmitting interval upon feedback.
- recalcRecommit := func(target float64, inc bool) {
- var (
- prev = float64(recommit.Nanoseconds())
- next float64
- )
- if inc {
- next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias)
- // Recap if interval is larger than the maximum time interval
- if next > float64(maxRecommitInterval.Nanoseconds()) {
- next = float64(maxRecommitInterval.Nanoseconds())
- }
- } else {
- next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias)
- // Recap if interval is less than the user specified minimum
- if next < float64(minRecommit.Nanoseconds()) {
- next = float64(minRecommit.Nanoseconds())
- }
- }
- recommit = time.Duration(int64(next))
- }
// clearPending cleans the stale pending tasks.
clearPending := func(number uint64) {
w.pendingMu.Lock()
@@ -415,11 +440,12 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
// Adjust resubmit interval by feedback.
if adjust.inc {
before := recommit
- recalcRecommit(float64(recommit.Nanoseconds())/adjust.ratio, true)
+ target := float64(recommit.Nanoseconds()) / adjust.ratio
+ recommit = recalcRecommit(minRecommit, recommit, target, true)
log.Trace("Increase miner recommit interval", "from", before, "to", recommit)
} else {
before := recommit
- recalcRecommit(float64(minRecommit.Nanoseconds()), false)
+ recommit = recalcRecommit(minRecommit, recommit, float64(minRecommit.Nanoseconds()), false)
log.Trace("Decrease miner recommit interval", "from", before, "to", recommit)
}
@@ -514,8 +540,9 @@ func (w *worker) mainLoop() {
w.updateSnapshot()
}
} else {
- // If clique is running in dev mode(period is 0), disable
- // advance sealing here.
+ // Special case, if the consensus engine is 0 period clique(dev mode),
+ // submit mining work here since all empty submission will be rejected
+ // by clique. Of course the advance sealing(empty submission) is disabled.
if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 {
w.commitNewWork(nil, true, time.Now().Unix())
}
@@ -576,7 +603,7 @@ func (w *worker) taskLoop() {
continue
}
w.pendingMu.Lock()
- w.pendingTasks[w.engine.SealHash(task.block.Header())] = task
+ w.pendingTasks[sealHash] = task
w.pendingMu.Unlock()
if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
@@ -636,8 +663,7 @@ func (w *worker) resultLoop() {
}
// Commit block and state to database.
//fmt.Printf("parent1: %s\n", w.chain.CurrentBlock().Hash().String())
- stat, err := w.chain.WriteBlockWithState(block, receipts, task.state)
- stat = core.CanonStatTy
+ _, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true)
//fmt.Printf("parent2: %s\n", w.chain.CurrentBlock().Hash().String())
if err != nil {
log.Error("Failed writing block to chain", "err", err)
@@ -652,16 +678,6 @@ func (w *worker) resultLoop() {
// Broadcast the block and announce chain insertion event
w.mux.Post(core.NewMinedBlockEvent{Block: block})
- var events []interface{}
- switch stat {
- case core.CanonStatTy:
- events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
- events = append(events, core.ChainHeadEvent{Block: block})
- case core.SideStatTy:
- events = append(events, core.ChainSideEvent{Block: block})
- }
- w.chain.PostChainEvents(events, logs)
-
// Insert the block into the set of pending ones to resultLoop for confirmations
w.unconfirmed.Insert(block.NumberU64(), block.Hash())
@@ -751,6 +767,7 @@ func (w *worker) updateSnapshot() {
w.current.txs,
uncles,
w.current.receipts,
+ new(trie.Trie),
nil,
)
@@ -760,7 +777,7 @@ func (w *worker) updateSnapshot() {
func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
snap := w.current.state.Snapshot()
- receipt, _, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig())
+ receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig())
if err != nil {
w.current.state.RevertToSnapshot(snap)
return nil, err
@@ -874,7 +891,7 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
cpy[i] = new(types.Log)
*cpy[i] = *l
}
- go w.mux.Post(core.PendingLogsEvent{Logs: cpy})
+ w.pendingLogsFeed.Send(cpy)
}
// Notify resubmit loop to decrease resubmitting interval if current interval is larger
// than the user-specified one.
@@ -978,9 +995,9 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
commitUncles(w.localUncles)
commitUncles(w.remoteUncles)
- if !noempty && !w.manualMining {
- // Create an empty block based on temporary copied state for sealing in advance without waiting block
- // execution finished.
+ // Create an empty block based on temporary copied state for
+ // sealing in advance without waiting block execution finished.
+ if !noempty && atomic.LoadUint32(&w.noempty) == 0 && !w.manualMining {
w.commit(uncles, nil, false, tstart)
}
@@ -990,8 +1007,10 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
log.Error("Failed to fetch pending transactions", "err", err)
return
}
- // Short circuit if there is no available pending transactions
- if len(pending) == 0 && !w.manualMining {
+ // Short circuit if there is no available pending transactions.
+ // But if we disable empty precommit already, ignore it. Since
+ // empty block is necessary to keep the liveness of the network.
+ if len(pending) == 0 && atomic.LoadUint32(&w.noempty) == 0 && !w.manualMining {
w.updateSnapshot()
return
}
@@ -1022,13 +1041,9 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
// and commits new work if consensus engine is running.
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
// Deep copy receipts here to avoid interaction between different tasks.
- receipts := make([]*types.Receipt, len(w.current.receipts))
- for i, l := range w.current.receipts {
- receipts[i] = new(types.Receipt)
- *receipts[i] = *l
- }
+ receipts := copyReceipts(w.current.receipts)
s := w.current.state.Copy()
- block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, w.current.receipts)
+ block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, receipts)
if err != nil {
return err
}
@@ -1039,15 +1054,10 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
select {
case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
w.unconfirmed.Shift(block.NumberU64() - 1)
-
- feesWei := new(big.Int)
- for i, tx := range block.Transactions() {
- feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), tx.GasPrice()))
- }
- feesEth := new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))
-
log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
- "uncles", len(uncles), "txs", w.current.tcount, "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start)))
+ "uncles", len(uncles), "txs", w.current.tcount,
+ "gas", block.GasUsed(), "fees", totalFees(block, receipts),
+ "elapsed", common.PrettyDuration(time.Since(start)))
case <-w.exitCh:
log.Info("Worker has exited")
@@ -1058,3 +1068,30 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
}
return nil
}
+
+// copyReceipts makes a deep copy of the given receipts.
+func copyReceipts(receipts []*types.Receipt) []*types.Receipt {
+ result := make([]*types.Receipt, len(receipts))
+ for i, l := range receipts {
+ cpy := *l
+ result[i] = &cpy
+ }
+ return result
+}
+
+// postSideBlock fires a side chain event, only use it for testing.
+func (w *worker) postSideBlock(event core.ChainSideEvent) {
+ select {
+ case w.chainSideCh <- event:
+ case <-w.exitCh:
+ }
+}
+
+// totalFees computes total consumed fees in ETH. Block transactions and receipts have to have the same order.
+func totalFees(block *types.Block, receipts []*types.Receipt) *big.Float {
+ feesWei := new(big.Int)
+ for i, tx := range block.Transactions() {
+ feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), tx.GasPrice()))
+ }
+ return new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))
+}