aboutsummaryrefslogtreecommitdiff
path: root/core/tx_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r--core/tx_pool.go1523
1 files changed, 1523 insertions, 0 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go
new file mode 100644
index 0000000..caabd5c
--- /dev/null
+++ b/core/tx_pool.go
@@ -0,0 +1,1523 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "errors"
+ "fmt"
+ "math"
+ "math/big"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/common/prque"
+ "github.com/ava-labs/go-ethereum/core/state"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/event"
+ "github.com/ava-labs/go-ethereum/log"
+ "github.com/ava-labs/go-ethereum/metrics"
+ "github.com/ava-labs/go-ethereum/params"
+)
+
+const (
+ // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
+ chainHeadChanSize = 10
+)
+
+var (
+ // ErrInvalidSender is returned if the transaction contains an invalid signature.
+ ErrInvalidSender = errors.New("invalid sender")
+
+ // ErrNonceTooLow is returned if the nonce of a transaction is lower than the
+ // one present in the local chain.
+ ErrNonceTooLow = errors.New("nonce too low")
+
+ // ErrUnderpriced is returned if a transaction's gas price is below the minimum
+ // configured for the transaction pool.
+ ErrUnderpriced = errors.New("transaction underpriced")
+
+ // ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
+ // with a different one without the required price bump.
+ ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
+
+ // ErrInsufficientFunds is returned if the total cost of executing a transaction
+ // is higher than the balance of the user's account.
+ ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value")
+
+ // ErrIntrinsicGas is returned if the transaction is specified to use less gas
+ // than required to start the invocation.
+ ErrIntrinsicGas = errors.New("intrinsic gas too low")
+
+ // ErrGasLimit is returned if a transaction's requested gas limit exceeds the
+ // maximum allowance of the current block.
+ ErrGasLimit = errors.New("exceeds block gas limit")
+
+ // ErrNegativeValue is a sanity error to ensure noone is able to specify a
+ // transaction with a negative value.
+ ErrNegativeValue = errors.New("negative value")
+
+ // ErrOversizedData is returned if the input data of a transaction is greater
+ // than some meaningful limit a user might use. This is not a consensus error
+ // making the transaction invalid, rather a DOS protection.
+ ErrOversizedData = errors.New("oversized data")
+)
+
+var (
+ evictionInterval = time.Minute // Time interval to check for evictable transactions
+ statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
+)
+
+var (
+ // Metrics for the pending pool
+ pendingDiscardMeter = metrics.NewRegisteredMeter("txpool/pending/discard", nil)
+ pendingReplaceMeter = metrics.NewRegisteredMeter("txpool/pending/replace", nil)
+ pendingRateLimitMeter = metrics.NewRegisteredMeter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting
+ pendingNofundsMeter = metrics.NewRegisteredMeter("txpool/pending/nofunds", nil) // Dropped due to out-of-funds
+
+ // Metrics for the queued pool
+ queuedDiscardMeter = metrics.NewRegisteredMeter("txpool/queued/discard", nil)
+ queuedReplaceMeter = metrics.NewRegisteredMeter("txpool/queued/replace", nil)
+ queuedRateLimitMeter = metrics.NewRegisteredMeter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting
+ queuedNofundsMeter = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds
+
+ // General tx metrics
+ validMeter = metrics.NewRegisteredMeter("txpool/valid", nil)
+ invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil)
+ underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil)
+
+ pendingCounter = metrics.NewRegisteredCounter("txpool/pending", nil)
+ queuedCounter = metrics.NewRegisteredCounter("txpool/queued", nil)
+ localCounter = metrics.NewRegisteredCounter("txpool/local", nil)
+)
+
+// TxStatus is the current status of a transaction as seen by the pool.
+type TxStatus uint
+
+const (
+ TxStatusUnknown TxStatus = iota
+ TxStatusQueued
+ TxStatusPending
+ TxStatusIncluded
+)
+
+// blockChain provides the state of blockchain and current gas limit to do
+// some pre checks in tx pool and event subscribers.
+type blockChain interface {
+ CurrentBlock() *types.Block
+ GetBlock(hash common.Hash, number uint64) *types.Block
+ StateAt(root common.Hash) (*state.StateDB, error)
+
+ SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
+}
+
+// TxPoolConfig are the configuration parameters of the transaction pool.
+type TxPoolConfig struct {
+ Locals []common.Address // Addresses that should be treated by default as local
+ NoLocals bool // Whether local transaction handling should be disabled
+ Journal string // Journal of local transactions to survive node restarts
+ Rejournal time.Duration // Time interval to regenerate the local transaction journal
+
+ PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
+ PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)
+
+ AccountSlots uint64 // Number of executable transaction slots guaranteed per account
+ GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts
+ AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
+ GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts
+
+ Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
+}
+
+// DefaultTxPoolConfig contains the default configurations for the transaction
+// pool.
+var DefaultTxPoolConfig = TxPoolConfig{
+ Journal: "transactions.rlp",
+ Rejournal: time.Hour,
+
+ PriceLimit: 1,
+ PriceBump: 10,
+
+ AccountSlots: 16,
+ GlobalSlots: 4096,
+ AccountQueue: 64,
+ GlobalQueue: 1024,
+
+ Lifetime: 3 * time.Hour,
+}
+
+// sanitize checks the provided user configurations and changes anything that's
+// unreasonable or unworkable.
+func (config *TxPoolConfig) sanitize() TxPoolConfig {
+ conf := *config
+ if conf.Rejournal < time.Second {
+ log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second)
+ conf.Rejournal = time.Second
+ }
+ if conf.PriceLimit < 1 {
+ log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultTxPoolConfig.PriceLimit)
+ conf.PriceLimit = DefaultTxPoolConfig.PriceLimit
+ }
+ if conf.PriceBump < 1 {
+ log.Warn("Sanitizing invalid txpool price bump", "provided", conf.PriceBump, "updated", DefaultTxPoolConfig.PriceBump)
+ conf.PriceBump = DefaultTxPoolConfig.PriceBump
+ }
+ if conf.AccountSlots < 1 {
+ log.Warn("Sanitizing invalid txpool account slots", "provided", conf.AccountSlots, "updated", DefaultTxPoolConfig.AccountSlots)
+ conf.AccountSlots = DefaultTxPoolConfig.AccountSlots
+ }
+ if conf.GlobalSlots < 1 {
+ log.Warn("Sanitizing invalid txpool global slots", "provided", conf.GlobalSlots, "updated", DefaultTxPoolConfig.GlobalSlots)
+ conf.GlobalSlots = DefaultTxPoolConfig.GlobalSlots
+ }
+ if conf.AccountQueue < 1 {
+ log.Warn("Sanitizing invalid txpool account queue", "provided", conf.AccountQueue, "updated", DefaultTxPoolConfig.AccountQueue)
+ conf.AccountQueue = DefaultTxPoolConfig.AccountQueue
+ }
+ if conf.GlobalQueue < 1 {
+ log.Warn("Sanitizing invalid txpool global queue", "provided", conf.GlobalQueue, "updated", DefaultTxPoolConfig.GlobalQueue)
+ conf.GlobalQueue = DefaultTxPoolConfig.GlobalQueue
+ }
+ if conf.Lifetime < 1 {
+ log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime)
+ conf.Lifetime = DefaultTxPoolConfig.Lifetime
+ }
+ return conf
+}
+
+// TxPool contains all currently known transactions. Transactions
+// enter the pool when they are received from the network or submitted
+// locally. They exit the pool when they are included in the blockchain.
+//
+// The pool separates processable transactions (which can be applied to the
+// current state) and future transactions. Transactions move between those
+// two states over time as they are received and processed.
+type TxPool struct {
+ config TxPoolConfig
+ chainconfig *params.ChainConfig
+ chain blockChain
+ gasPrice *big.Int
+ txFeed event.Feed
+ scope event.SubscriptionScope
+ signer types.Signer
+ mu sync.RWMutex
+
+ istanbul bool // Fork indicator whether we are in the istanbul stage.
+
+ currentState *state.StateDB // Current state in the blockchain head
+ pendingNonces *txNoncer // Pending state tracking virtual nonces
+ currentMaxGas uint64 // Current gas limit for transaction caps
+
+ locals *accountSet // Set of local transaction to exempt from eviction rules
+ journal *txJournal // Journal of local transaction to back up to disk
+
+ pending map[common.Address]*txList // All currently processable transactions
+ queue map[common.Address]*txList // Queued but non-processable transactions
+ beats map[common.Address]time.Time // Last heartbeat from each known account
+ all *txLookup // All transactions to allow lookups
+ priced *txPricedList // All transactions sorted by price
+
+ chainHeadCh chan ChainHeadEvent
+ chainHeadSub event.Subscription
+ reqResetCh chan *txpoolResetRequest
+ reqPromoteCh chan *accountSet
+ queueTxEventCh chan *types.Transaction
+ reorgDoneCh chan chan struct{}
+ reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
+ wg sync.WaitGroup // tracks loop, scheduleReorgLoop
+}
+
+type txpoolResetRequest struct {
+ oldHead, newHead *types.Header
+}
+
+// NewTxPool creates a new transaction pool to gather, sort and filter inbound
+// transactions from the network.
+func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
+ // Sanitize the input to ensure no vulnerable gas prices are set
+ config = (&config).sanitize()
+
+ // Create the transaction pool with its initial settings
+ pool := &TxPool{
+ config: config,
+ chainconfig: chainconfig,
+ chain: chain,
+ signer: types.NewEIP155Signer(chainconfig.ChainID),
+ pending: make(map[common.Address]*txList),
+ queue: make(map[common.Address]*txList),
+ beats: make(map[common.Address]time.Time),
+ all: newTxLookup(),
+ chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
+ reqResetCh: make(chan *txpoolResetRequest),
+ reqPromoteCh: make(chan *accountSet),
+ queueTxEventCh: make(chan *types.Transaction),
+ reorgDoneCh: make(chan chan struct{}),
+ reorgShutdownCh: make(chan struct{}),
+ gasPrice: new(big.Int).SetUint64(config.PriceLimit),
+ }
+ pool.locals = newAccountSet(pool.signer)
+ for _, addr := range config.Locals {
+ log.Info("Setting new local account", "address", addr)
+ pool.locals.add(addr)
+ }
+ pool.priced = newTxPricedList(pool.all)
+ pool.reset(nil, chain.CurrentBlock().Header())
+
+ // Start the reorg loop early so it can handle requests generated during journal loading.
+ pool.wg.Add(1)
+ go pool.scheduleReorgLoop()
+
+ // If local transactions and journaling is enabled, load from disk
+ if !config.NoLocals && config.Journal != "" {
+ pool.journal = newTxJournal(config.Journal)
+
+ if err := pool.journal.load(pool.AddLocals); err != nil {
+ log.Warn("Failed to load transaction journal", "err", err)
+ }
+ if err := pool.journal.rotate(pool.local()); err != nil {
+ log.Warn("Failed to rotate transaction journal", "err", err)
+ }
+ }
+
+ // Subscribe events from blockchain and start the main event loop.
+ pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
+ pool.wg.Add(1)
+ go pool.loop()
+
+ return pool
+}
+
+// loop is the transaction pool's main event loop, waiting for and reacting to
+// outside blockchain events as well as for various reporting and transaction
+// eviction events.
+func (pool *TxPool) loop() {
+ defer pool.wg.Done()
+
+ var (
+ prevPending, prevQueued, prevStales int
+ // Start the stats reporting and transaction eviction tickers
+ report = time.NewTicker(statsReportInterval)
+ evict = time.NewTicker(evictionInterval)
+ journal = time.NewTicker(pool.config.Rejournal)
+ // Track the previous head headers for transaction reorgs
+ head = pool.chain.CurrentBlock()
+ )
+ defer report.Stop()
+ defer evict.Stop()
+ defer journal.Stop()
+
+ for {
+ select {
+ // Handle ChainHeadEvent
+ case ev := <-pool.chainHeadCh:
+ if ev.Block != nil {
+ pool.requestReset(head.Header(), ev.Block.Header())
+ head = ev.Block
+ }
+
+ // System shutdown.
+ case <-pool.chainHeadSub.Err():
+ close(pool.reorgShutdownCh)
+ return
+
+ // Handle stats reporting ticks
+ case <-report.C:
+ pool.mu.RLock()
+ pending, queued := pool.stats()
+ stales := pool.priced.stales
+ pool.mu.RUnlock()
+
+ if pending != prevPending || queued != prevQueued || stales != prevStales {
+ log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
+ prevPending, prevQueued, prevStales = pending, queued, stales
+ }
+
+ // Handle inactive account transaction eviction
+ case <-evict.C:
+ pool.mu.Lock()
+ for addr := range pool.queue {
+ // Skip local transactions from the eviction mechanism
+ if pool.locals.contains(addr) {
+ continue
+ }
+ // Any non-locals old enough should be removed
+ if time.Since(pool.beats[addr]) > pool.config.Lifetime {
+ for _, tx := range pool.queue[addr].Flatten() {
+ pool.removeTx(tx.Hash(), true)
+ }
+ }
+ }
+ pool.mu.Unlock()
+
+ // Handle local transaction journal rotation
+ case <-journal.C:
+ if pool.journal != nil {
+ pool.mu.Lock()
+ if err := pool.journal.rotate(pool.local()); err != nil {
+ log.Warn("Failed to rotate local tx journal", "err", err)
+ }
+ pool.mu.Unlock()
+ }
+ }
+ }
+}
+
+// Stop terminates the transaction pool.
+func (pool *TxPool) Stop() {
+ // Unsubscribe all subscriptions registered from txpool
+ pool.scope.Close()
+
+ // Unsubscribe subscriptions registered from blockchain
+ pool.chainHeadSub.Unsubscribe()
+ pool.wg.Wait()
+
+ if pool.journal != nil {
+ pool.journal.close()
+ }
+ log.Info("Transaction pool stopped")
+}
+
+// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
+// starts sending event to the given channel.
+func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
+ return pool.scope.Track(pool.txFeed.Subscribe(ch))
+}
+
+// GasPrice returns the current gas price enforced by the transaction pool.
+func (pool *TxPool) GasPrice() *big.Int {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ return new(big.Int).Set(pool.gasPrice)
+}
+
+// SetGasPrice updates the minimum price required by the transaction pool for a
+// new transaction, and drops all transactions below this threshold.
+func (pool *TxPool) SetGasPrice(price *big.Int) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ pool.gasPrice = price
+ for _, tx := range pool.priced.Cap(price, pool.locals) {
+ pool.removeTx(tx.Hash(), false)
+ }
+ log.Info("Transaction pool price threshold updated", "price", price)
+}
+
+// Nonce returns the next nonce of an account, with all transactions executable
+// by the pool already applied on top.
+func (pool *TxPool) Nonce(addr common.Address) uint64 {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ return pool.pendingNonces.get(addr)
+}
+
+// Stats retrieves the current pool stats, namely the number of pending and the
+// number of queued (non-executable) transactions.
+func (pool *TxPool) Stats() (int, int) {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ return pool.stats()
+}
+
+// stats retrieves the current pool stats, namely the number of pending and the
+// number of queued (non-executable) transactions.
+func (pool *TxPool) stats() (int, int) {
+ pending := 0
+ for _, list := range pool.pending {
+ pending += list.Len()
+ }
+ queued := 0
+ for _, list := range pool.queue {
+ queued += list.Len()
+ }
+ return pending, queued
+}
+
+// Content retrieves the data content of the transaction pool, returning all the
+// pending as well as queued transactions, grouped by account and sorted by nonce.
+func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ pending := make(map[common.Address]types.Transactions)
+ for addr, list := range pool.pending {
+ pending[addr] = list.Flatten()
+ }
+ queued := make(map[common.Address]types.Transactions)
+ for addr, list := range pool.queue {
+ queued[addr] = list.Flatten()
+ }
+ return pending, queued
+}
+
+// Pending retrieves all currently processable transactions, grouped by origin
+// account and sorted by nonce. The returned transaction set is a copy and can be
+// freely modified by calling code.
+func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ pending := make(map[common.Address]types.Transactions)
+ for addr, list := range pool.pending {
+ pending[addr] = list.Flatten()
+ }
+ return pending, nil
+}
+
+// Locals retrieves the accounts currently considered local by the pool.
+func (pool *TxPool) Locals() []common.Address {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ return pool.locals.flatten()
+}
+
+// local retrieves all currently known local transactions, grouped by origin
+// account and sorted by nonce. The returned transaction set is a copy and can be
+// freely modified by calling code.
+func (pool *TxPool) local() map[common.Address]types.Transactions {
+ txs := make(map[common.Address]types.Transactions)
+ for addr := range pool.locals.accounts {
+ if pending := pool.pending[addr]; pending != nil {
+ txs[addr] = append(txs[addr], pending.Flatten()...)
+ }
+ if queued := pool.queue[addr]; queued != nil {
+ txs[addr] = append(txs[addr], queued.Flatten()...)
+ }
+ }
+ return txs
+}
+
+// validateTx checks whether a transaction is valid according to the consensus
+// rules and adheres to some heuristic limits of the local node (price and size).
+func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
+ // Heuristic limit, reject transactions over 32KB to prevent DOS attacks
+ if tx.Size() > 32*1024 {
+ return ErrOversizedData
+ }
+ // Transactions can't be negative. This may never happen using RLP decoded
+ // transactions but may occur if you create a transaction using the RPC.
+ if tx.Value().Sign() < 0 {
+ return ErrNegativeValue
+ }
+ // Ensure the transaction doesn't exceed the current block limit gas.
+ if pool.currentMaxGas < tx.Gas() {
+ return ErrGasLimit
+ }
+ // Make sure the transaction is signed properly
+ from, err := types.Sender(pool.signer, tx)
+ if err != nil {
+ return ErrInvalidSender
+ }
+ // Drop non-local transactions under our own minimal accepted gas price
+ local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
+ if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
+ return ErrUnderpriced
+ }
+ // Ensure the transaction adheres to nonce ordering
+ if pool.currentState.GetNonce(from) > tx.Nonce() {
+ return ErrNonceTooLow
+ }
+ // Transactor should have enough funds to cover the costs
+ // cost == V + GP * GL
+ if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
+ return ErrInsufficientFunds
+ }
+ // Ensure the transaction has more gas than the basic tx fee.
+ intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, true, pool.istanbul)
+ if err != nil {
+ return err
+ }
+ if tx.Gas() < intrGas {
+ return ErrIntrinsicGas
+ }
+ return nil
+}
+
+// add validates a transaction and inserts it into the non-executable queue for later
+// pending promotion and execution. If the transaction is a replacement for an already
+// pending or queued one, it overwrites the previous transaction if its price is higher.
+//
+// If a newly added transaction is marked as local, its sending account will be
+// whitelisted, preventing any associated transaction from being dropped out of the pool
+// due to pricing constraints.
+func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
+ // If the transaction is already known, discard it
+ hash := tx.Hash()
+ if pool.all.Get(hash) != nil {
+ log.Trace("Discarding already known transaction", "hash", hash)
+ return false, fmt.Errorf("known transaction: %x", hash)
+ }
+
+ // If the transaction fails basic validation, discard it
+ if err := pool.validateTx(tx, local); err != nil {
+ log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
+ invalidTxMeter.Mark(1)
+ return false, err
+ }
+
+ // If the transaction pool is full, discard underpriced transactions
+ if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
+ // If the new transaction is underpriced, don't accept it
+ if !local && pool.priced.Underpriced(tx, pool.locals) {
+ log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
+ underpricedTxMeter.Mark(1)
+ return false, ErrUnderpriced
+ }
+ // New transaction is better than our worse ones, make room for it
+ drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
+ for _, tx := range drop {
+ log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
+ underpricedTxMeter.Mark(1)
+ pool.removeTx(tx.Hash(), false)
+ }
+ }
+
+ // Try to replace an existing transaction in the pending pool
+ from, _ := types.Sender(pool.signer, tx) // already validated
+ if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
+ // Nonce already pending, check if required price bump is met
+ inserted, old := list.Add(tx, pool.config.PriceBump)
+ if !inserted {
+ pendingDiscardMeter.Mark(1)
+ return false, ErrReplaceUnderpriced
+ }
+ // New transaction is better, replace old one
+ if old != nil {
+ pool.all.Remove(old.Hash())
+ pool.priced.Removed(1)
+ pendingReplaceMeter.Mark(1)
+ }
+ pool.all.Add(tx)
+ pool.priced.Put(tx)
+ pool.journalTx(from, tx)
+ pool.queueTxEvent(tx)
+ log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
+ return old != nil, nil
+ }
+
+ // New transaction isn't replacing a pending one, push into queue
+ replaced, err = pool.enqueueTx(hash, tx)
+ if err != nil {
+ return false, err
+ }
+
+ // Mark local addresses and journal local transactions
+ if local {
+ if !pool.locals.contains(from) {
+ log.Info("Setting new local account", "address", from)
+ pool.locals.add(from)
+ }
+ }
+ if local || pool.locals.contains(from) {
+ localCounter.Inc(1)
+ }
+ pool.journalTx(from, tx)
+
+ log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
+ return replaced, nil
+}
+
+// enqueueTx inserts a new transaction into the non-executable transaction queue.
+//
+// Note, this method assumes the pool lock is held!
+func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
+ // Try to insert the transaction into the future queue
+ from, _ := types.Sender(pool.signer, tx) // already validated
+ if pool.queue[from] == nil {
+ pool.queue[from] = newTxList(false)
+ }
+ inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
+ if !inserted {
+ // An older transaction was better, discard this
+ queuedDiscardMeter.Mark(1)
+ return false, ErrReplaceUnderpriced
+ }
+ // Discard any previous transaction and mark this
+ if old != nil {
+ pool.all.Remove(old.Hash())
+ pool.priced.Removed(1)
+ queuedReplaceMeter.Mark(1)
+ } else {
+ // Nothing was replaced, bump the queued counter
+ queuedCounter.Inc(1)
+ }
+ if pool.all.Get(hash) == nil {
+ pool.all.Add(tx)
+ pool.priced.Put(tx)
+ }
+ return old != nil, nil
+}
+
+// journalTx adds the specified transaction to the local disk journal if it is
+// deemed to have been sent from a local account.
+func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
+ // Only journal if it's enabled and the transaction is local
+ if pool.journal == nil || !pool.locals.contains(from) {
+ return
+ }
+ if err := pool.journal.insert(tx); err != nil {
+ log.Warn("Failed to journal local transaction", "err", err)
+ }
+}
+
+// promoteTx adds a transaction to the pending (processable) list of transactions
+// and returns whether it was inserted or an older was better.
+//
+// Note, this method assumes the pool lock is held!
+func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
+ // Try to insert the transaction into the pending queue
+ if pool.pending[addr] == nil {
+ pool.pending[addr] = newTxList(true)
+ }
+ list := pool.pending[addr]
+
+ inserted, old := list.Add(tx, pool.config.PriceBump)
+ if !inserted {
+ // An older transaction was better, discard this
+ pool.all.Remove(hash)
+ pool.priced.Removed(1)
+
+ pendingDiscardMeter.Mark(1)
+ return false
+ }
+ // Otherwise discard any previous transaction and mark this
+ if old != nil {
+ pool.all.Remove(old.Hash())
+ pool.priced.Removed(1)
+
+ pendingReplaceMeter.Mark(1)
+ } else {
+ // Nothing was replaced, bump the pending counter
+ pendingCounter.Inc(1)
+ }
+ // Failsafe to work around direct pending inserts (tests)
+ if pool.all.Get(hash) == nil {
+ pool.all.Add(tx)
+ pool.priced.Put(tx)
+ }
+ // Set the potentially new pending nonce and notify any subsystems of the new tx
+ pool.beats[addr] = time.Now()
+ pool.pendingNonces.set(addr, tx.Nonce()+1)
+
+ return true
+}
+
+// AddLocals enqueues a batch of transactions into the pool if they are valid, marking the
+// senders as a local ones, ensuring they go around the local pricing constraints.
+//
+// This method is used to add transactions from the RPC API and performs synchronous pool
+// reorganization and event propagation.
+func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
+ return pool.addTxs(txs, !pool.config.NoLocals, true)
+}
+
+// AddLocal enqueues a single local transaction into the pool if it is valid. This is
+// a convenience wrapper aroundd AddLocals.
+func (pool *TxPool) AddLocal(tx *types.Transaction) error {
+ errs := pool.AddLocals([]*types.Transaction{tx})
+ return errs[0]
+}
+
+// AddRemotes enqueues a batch of transactions into the pool if they are valid. If the
+// senders are not among the locally tracked ones, full pricing constraints will apply.
+//
+// This method is used to add transactions from the p2p network and does not wait for pool
+// reorganization and internal event propagation.
+func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
+ return pool.addTxs(txs, false, false)
+}
+
+// This is like AddRemotes, but waits for pool reorganization. Tests use this method.
+func (pool *TxPool) AddRemotesSync(txs []*types.Transaction) []error {
+ return pool.addTxs(txs, false, true)
+}
+
+// This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method.
+func (pool *TxPool) addRemoteSync(tx *types.Transaction) error {
+ errs := pool.AddRemotesSync([]*types.Transaction{tx})
+ return errs[0]
+}
+
+// AddRemote enqueues a single transaction into the pool if it is valid. This is a convenience
+// wrapper around AddRemotes.
+//
+// Deprecated: use AddRemotes
+func (pool *TxPool) AddRemote(tx *types.Transaction) error {
+ errs := pool.AddRemotes([]*types.Transaction{tx})
+ return errs[0]
+}
+
+// addTxs attempts to queue a batch of transactions if they are valid.
+func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
+ // Cache senders in transactions before obtaining lock (pool.signer is immutable)
+ for _, tx := range txs {
+ types.Sender(pool.signer, tx)
+ }
+
+ pool.mu.Lock()
+ errs, dirtyAddrs := pool.addTxsLocked(txs, local)
+ pool.mu.Unlock()
+
+ done := pool.requestPromoteExecutables(dirtyAddrs)
+ if sync {
+ <-done
+ }
+ return errs
+}
+
+// addTxsLocked attempts to queue a batch of transactions if they are valid.
+// The transaction pool lock must be held.
+func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
+ dirty := newAccountSet(pool.signer)
+ errs := make([]error, len(txs))
+ for i, tx := range txs {
+ replaced, err := pool.add(tx, local)
+ errs[i] = err
+ if err == nil && !replaced {
+ dirty.addTx(tx)
+ }
+ }
+ validMeter.Mark(int64(len(dirty.accounts)))
+ return errs, dirty
+}
+
+// Status returns the status (unknown/pending/queued) of a batch of transactions
+// identified by their hashes.
+func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ status := make([]TxStatus, len(hashes))
+ for i, hash := range hashes {
+ if tx := pool.all.Get(hash); tx != nil {
+ from, _ := types.Sender(pool.signer, tx) // already validated
+ if pool.pending[from] != nil && pool.pending[from].txs.items[tx.Nonce()] != nil {
+ status[i] = TxStatusPending
+ } else {
+ status[i] = TxStatusQueued
+ }
+ }
+ }
+ return status
+}
+
+// Get returns a transaction if it is contained in the pool and nil otherwise.
+func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
+ return pool.all.Get(hash)
+}
+
+// removeTx removes a single transaction from the queue, moving all subsequent
+// transactions back to the future queue.
+func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
+ // Fetch the transaction we wish to delete
+ tx := pool.all.Get(hash)
+ if tx == nil {
+ return
+ }
+ addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
+
+ // Remove it from the list of known transactions
+ pool.all.Remove(hash)
+ if outofbound {
+ pool.priced.Removed(1)
+ }
+ if pool.locals.contains(addr) {
+ localCounter.Dec(1)
+ }
+ // Remove the transaction from the pending lists and reset the account nonce
+ if pending := pool.pending[addr]; pending != nil {
+ if removed, invalids := pending.Remove(tx); removed {
+ // If no more pending transactions are left, remove the list
+ if pending.Empty() {
+ delete(pool.pending, addr)
+ delete(pool.beats, addr)
+ }
+ // Postpone any invalidated transactions
+ for _, tx := range invalids {
+ pool.enqueueTx(tx.Hash(), tx)
+ }
+ // Update the account nonce if needed
+ pool.pendingNonces.setIfLower(addr, tx.Nonce())
+ // Reduce the pending counter
+ pendingCounter.Dec(int64(1 + len(invalids)))
+ return
+ }
+ }
+ // Transaction is in the future queue
+ if future := pool.queue[addr]; future != nil {
+ if removed, _ := future.Remove(tx); removed {
+ // Reduce the queued counter
+ queuedCounter.Dec(1)
+ }
+ if future.Empty() {
+ delete(pool.queue, addr)
+ }
+ }
+}
+
+// requestPromoteExecutables requests a pool reset to the new head block.
+// The returned channel is closed when the reset has occurred.
+func (pool *TxPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} {
+ select {
+ case pool.reqResetCh <- &txpoolResetRequest{oldHead, newHead}:
+ return <-pool.reorgDoneCh
+ case <-pool.reorgShutdownCh:
+ return pool.reorgShutdownCh
+ }
+}
+
+// requestPromoteExecutables requests transaction promotion checks for the given addresses.
+// The returned channel is closed when the promotion checks have occurred.
+func (pool *TxPool) requestPromoteExecutables(set *accountSet) chan struct{} {
+ select {
+ case pool.reqPromoteCh <- set:
+ return <-pool.reorgDoneCh
+ case <-pool.reorgShutdownCh:
+ return pool.reorgShutdownCh
+ }
+}
+
+// queueTxEvent enqueues a transaction event to be sent in the next reorg run.
+func (pool *TxPool) queueTxEvent(tx *types.Transaction) {
+ select {
+ case pool.queueTxEventCh <- tx:
+ case <-pool.reorgShutdownCh:
+ }
+}
+
+// scheduleReorgLoop schedules runs of reset and promoteExecutables. Code above should not
+// call those methods directly, but request them being run using requestReset and
+// requestPromoteExecutables instead.
+func (pool *TxPool) scheduleReorgLoop() {
+ defer pool.wg.Done()
+
+ var (
+ curDone chan struct{} // non-nil while runReorg is active
+ nextDone = make(chan struct{})
+ launchNextRun bool
+ reset *txpoolResetRequest
+ dirtyAccounts *accountSet
+ queuedEvents = make(map[common.Address]*txSortedMap)
+ )
+ for {
+ // Launch next background reorg if needed
+ if curDone == nil && launchNextRun {
+ // Run the background reorg and announcements
+ go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)
+
+ // Prepare everything for the next round of reorg
+ curDone, nextDone = nextDone, make(chan struct{})
+ launchNextRun = false
+
+ reset, dirtyAccounts = nil, nil
+ queuedEvents = make(map[common.Address]*txSortedMap)
+ }
+
+ select {
+ case req := <-pool.reqResetCh:
+ // Reset request: update head if request is already pending.
+ if reset == nil {
+ reset = req
+ } else {
+ reset.newHead = req.newHead
+ }
+ launchNextRun = true
+ pool.reorgDoneCh <- nextDone
+
+ case req := <-pool.reqPromoteCh:
+ // Promote request: update address set if request is already pending.
+ if dirtyAccounts == nil {
+ dirtyAccounts = req
+ } else {
+ dirtyAccounts.merge(req)
+ }
+ launchNextRun = true
+ pool.reorgDoneCh <- nextDone
+
+ case tx := <-pool.queueTxEventCh:
+ // Queue up the event, but don't schedule a reorg. It's up to the caller to
+ // request one later if they want the events sent.
+ addr, _ := types.Sender(pool.signer, tx)
+ if _, ok := queuedEvents[addr]; !ok {
+ queuedEvents[addr] = newTxSortedMap()
+ }
+ queuedEvents[addr].Put(tx)
+
+ case <-curDone:
+ curDone = nil
+
+ case <-pool.reorgShutdownCh:
+ // Wait for current run to finish.
+ if curDone != nil {
+ <-curDone
+ }
+ close(nextDone)
+ return
+ }
+ }
+}
+
+// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
+func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*txSortedMap) {
+ defer close(done)
+
+ var promoteAddrs []common.Address
+ if dirtyAccounts != nil {
+ promoteAddrs = dirtyAccounts.flatten()
+ }
+ pool.mu.Lock()
+ if reset != nil {
+ // Reset from the old head to the new, rescheduling any reorged transactions
+ pool.reset(reset.oldHead, reset.newHead)
+
+ // Nonces were reset, discard any events that became stale
+ for addr := range events {
+ events[addr].Forward(pool.pendingNonces.get(addr))
+ if events[addr].Len() == 0 {
+ delete(events, addr)
+ }
+ }
+ // Reset needs promote for all addresses
+ promoteAddrs = promoteAddrs[:0]
+ for addr := range pool.queue {
+ promoteAddrs = append(promoteAddrs, addr)
+ }
+ }
+ // Check for pending transactions for every account that sent new ones
+ promoted := pool.promoteExecutables(promoteAddrs)
+ for _, tx := range promoted {
+ addr, _ := types.Sender(pool.signer, tx)
+ if _, ok := events[addr]; !ok {
+ events[addr] = newTxSortedMap()
+ }
+ events[addr].Put(tx)
+ }
+ // If a new block appeared, validate the pool of pending transactions. This will
+ // remove any transaction that has been included in the block or was invalidated
+ // because of another transaction (e.g. higher gas price).
+ if reset != nil {
+ pool.demoteUnexecutables()
+ }
+ // Ensure pool.queue and pool.pending sizes stay within the configured limits.
+ pool.truncatePending()
+ pool.truncateQueue()
+
+ // Update all accounts to the latest known pending nonce
+ for addr, list := range pool.pending {
+ txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
+ pool.pendingNonces.set(addr, txs[len(txs)-1].Nonce()+1)
+ }
+ pool.mu.Unlock()
+
+ // Notify subsystems for newly added transactions
+ if len(events) > 0 {
+ var txs []*types.Transaction
+ for _, set := range events {
+ txs = append(txs, set.Flatten()...)
+ }
+ pool.txFeed.Send(NewTxsEvent{txs})
+ }
+}
+
+// reset retrieves the current state of the blockchain and ensures the content
+// of the transaction pool is valid with regard to the chain state.
+func (pool *TxPool) reset(oldHead, newHead *types.Header) {
+ // If we're reorging an old state, reinject all dropped transactions
+ var reinject types.Transactions
+
+ if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
+ // If the reorg is too deep, avoid doing it (will happen during fast sync)
+ oldNum := oldHead.Number.Uint64()
+ newNum := newHead.Number.Uint64()
+
+ if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
+ log.Debug("Skipping deep transaction reorg", "depth", depth)
+ } else {
+ // Reorg seems shallow enough to pull in all transactions into memory
+ var discarded, included types.Transactions
+ var (
+ rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
+ add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
+ )
+ if rem == nil {
+ // This can happen if a setHead is performed, where we simply discard the old
+ // head from the chain.
+ // If that is the case, we don't have the lost transactions any more, and
+ // there's nothing to add
+ if newNum < oldNum {
+ // If the reorg ended up on a lower number, it's indicative of setHead being the cause
+ log.Debug("Skipping transaction reset caused by setHead",
+ "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
+ } else {
+ // If we reorged to a same or higher number, then it's not a case of setHead
+ log.Warn("Transaction pool reset with missing oldhead",
+ "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
+ }
+ return
+ }
+ for rem.NumberU64() > add.NumberU64() {
+ discarded = append(discarded, rem.Transactions()...)
+ if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
+ log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
+ return
+ }
+ }
+ for add.NumberU64() > rem.NumberU64() {
+ included = append(included, add.Transactions()...)
+ if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
+ log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
+ return
+ }
+ }
+ for rem.Hash() != add.Hash() {
+ discarded = append(discarded, rem.Transactions()...)
+ if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
+ log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
+ return
+ }
+ included = append(included, add.Transactions()...)
+ if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
+ log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
+ return
+ }
+ }
+ reinject = types.TxDifference(discarded, included)
+ }
+ }
+ // Initialize the internal state to the current head
+ if newHead == nil {
+ newHead = pool.chain.CurrentBlock().Header() // Special case during testing
+ }
+ statedb, err := pool.chain.StateAt(newHead.Root)
+ if err != nil {
+ log.Error("Failed to reset txpool state", "err", err)
+ return
+ }
+ pool.currentState = statedb
+ pool.pendingNonces = newTxNoncer(statedb)
+ pool.currentMaxGas = newHead.GasLimit
+
+ // Inject any transactions discarded due to reorgs
+ log.Debug("Reinjecting stale transactions", "count", len(reinject))
+ senderCacher.recover(pool.signer, reinject)
+ pool.addTxsLocked(reinject, false)
+
+ // Update all fork indicator by next pending block number.
+ next := new(big.Int).Add(newHead.Number, big.NewInt(1))
+ pool.istanbul = pool.chainconfig.IsIstanbul(next)
+}
+
+// promoteExecutables moves transactions that have become processable from the
+// future queue to the set of pending transactions. During this process, all
+// invalidated transactions (low nonce, low balance) are deleted.
+func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
+ // Track the promoted transactions to broadcast them at once
+ var promoted []*types.Transaction
+
+ // Iterate over all accounts and promote any executable transactions
+ for _, addr := range accounts {
+ list := pool.queue[addr]
+ if list == nil {
+ continue // Just in case someone calls with a non existing account
+ }
+ // Drop all transactions that are deemed too old (low nonce)
+ forwards := list.Forward(pool.currentState.GetNonce(addr))
+ for _, tx := range forwards {
+ hash := tx.Hash()
+ pool.all.Remove(hash)
+ log.Trace("Removed old queued transaction", "hash", hash)
+ }
+ // Drop all transactions that are too costly (low balance or out of gas)
+ drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
+ for _, tx := range drops {
+ hash := tx.Hash()
+ pool.all.Remove(hash)
+ log.Trace("Removed unpayable queued transaction", "hash", hash)
+ }
+ queuedNofundsMeter.Mark(int64(len(drops)))
+
+ // Gather all executable transactions and promote them
+ readies := list.Ready(pool.pendingNonces.get(addr))
+ for _, tx := range readies {
+ hash := tx.Hash()
+ if pool.promoteTx(addr, hash, tx) {
+ log.Trace("Promoting queued transaction", "hash", hash)
+ promoted = append(promoted, tx)
+ }
+ }
+ queuedCounter.Dec(int64(len(readies)))
+
+ // Drop all transactions over the allowed limit
+ var caps types.Transactions
+ if !pool.locals.contains(addr) {
+ caps = list.Cap(int(pool.config.AccountQueue))
+ for _, tx := range caps {
+ hash := tx.Hash()
+ pool.all.Remove(hash)
+ log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
+ }
+ queuedRateLimitMeter.Mark(int64(len(caps)))
+ }
+ // Mark all the items dropped as removed
+ pool.priced.Removed(len(forwards) + len(drops) + len(caps))
+ queuedCounter.Dec(int64(len(forwards) + len(drops) + len(caps)))
+ if pool.locals.contains(addr) {
+ localCounter.Dec(int64(len(forwards) + len(drops) + len(caps)))
+ }
+ // Delete the entire queue entry if it became empty.
+ if list.Empty() {
+ delete(pool.queue, addr)
+ }
+ }
+ return promoted
+}
+
+// truncatePending removes transactions from the pending queue if the pool is above the
+// pending limit. The algorithm tries to reduce transaction counts by an approximately
+// equal number for all for accounts with many pending transactions.
+func (pool *TxPool) truncatePending() {
+ pending := uint64(0)
+ for _, list := range pool.pending {
+ pending += uint64(list.Len())
+ }
+ if pending <= pool.config.GlobalSlots {
+ return
+ }
+
+ pendingBeforeCap := pending
+ // Assemble a spam order to penalize large transactors first
+ spammers := prque.New(nil)
+ for addr, list := range pool.pending {
+ // Only evict transactions from high rollers
+ if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
+ spammers.Push(addr, int64(list.Len()))
+ }
+ }
+ // Gradually drop transactions from offenders
+ offenders := []common.Address{}
+ for pending > pool.config.GlobalSlots && !spammers.Empty() {
+ // Retrieve the next offender if not local address
+ offender, _ := spammers.Pop()
+ offenders = append(offenders, offender.(common.Address))
+
+ // Equalize balances until all the same or below threshold
+ if len(offenders) > 1 {
+ // Calculate the equalization threshold for all current offenders
+ threshold := pool.pending[offender.(common.Address)].Len()
+
+ // Iteratively reduce all offenders until below limit or threshold reached
+ for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
+ for i := 0; i < len(offenders)-1; i++ {
+ list := pool.pending[offenders[i]]
+
+ caps := list.Cap(list.Len() - 1)
+ for _, tx := range caps {
+ // Drop the transaction from the global pools too
+ hash := tx.Hash()
+ pool.all.Remove(hash)
+
+ // Update the account nonce to the dropped transaction
+ pool.pendingNonces.setIfLower(offenders[i], tx.Nonce())
+ log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
+ }
+ pool.priced.Removed(len(caps))
+ pendingCounter.Dec(int64(len(caps)))
+ if pool.locals.contains(offenders[i]) {
+ localCounter.Dec(int64(len(caps)))
+ }
+ pending--
+ }
+ }
+ }
+ }
+
+ // If still above threshold, reduce to limit or min allowance
+ if pending > pool.config.GlobalSlots && len(offenders) > 0 {
+ for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
+ for _, addr := range offenders {
+ list := pool.pending[addr]
+
+ caps := list.Cap(list.Len() - 1)
+ for _, tx := range caps {
+ // Drop the transaction from the global pools too
+ hash := tx.Hash()
+ pool.all.Remove(hash)
+
+ // Update the account nonce to the dropped transaction
+ pool.pendingNonces.setIfLower(addr, tx.Nonce())
+ log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
+ }
+ pool.priced.Removed(len(caps))
+ pendingCounter.Dec(int64(len(caps)))
+ if pool.locals.contains(addr) {
+ localCounter.Dec(int64(len(caps)))
+ }
+ pending--
+ }
+ }
+ }
+ pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending))
+}
+
+// truncateQueue drops the oldes transactions in the queue if the pool is above the global queue limit.
+func (pool *TxPool) truncateQueue() {
+ queued := uint64(0)
+ for _, list := range pool.queue {
+ queued += uint64(list.Len())
+ }
+ if queued <= pool.config.GlobalQueue {
+ return
+ }
+
+ // Sort all accounts with queued transactions by heartbeat
+ addresses := make(addressesByHeartbeat, 0, len(pool.queue))
+ for addr := range pool.queue {
+ if !pool.locals.contains(addr) { // don't drop locals
+ addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
+ }
+ }
+ sort.Sort(addresses)
+
+ // Drop transactions until the total is below the limit or only locals remain
+ for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
+ addr := addresses[len(addresses)-1]
+ list := pool.queue[addr.address]
+
+ addresses = addresses[:len(addresses)-1]
+
+ // Drop all transactions if they are less than the overflow
+ if size := uint64(list.Len()); size <= drop {
+ for _, tx := range list.Flatten() {
+ pool.removeTx(tx.Hash(), true)
+ }
+ drop -= size
+ queuedRateLimitMeter.Mark(int64(size))
+ continue
+ }
+ // Otherwise drop only last few transactions
+ txs := list.Flatten()
+ for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
+ pool.removeTx(txs[i].Hash(), true)
+ drop--
+ queuedRateLimitMeter.Mark(1)
+ }
+ }
+}
+
+// demoteUnexecutables removes invalid and processed transactions from the pools
+// executable/pending queue and any subsequent transactions that become unexecutable
+// are moved back into the future queue.
+func (pool *TxPool) demoteUnexecutables() {
+ // Iterate over all accounts and demote any non-executable transactions
+ for addr, list := range pool.pending {
+ nonce := pool.currentState.GetNonce(addr)
+
+ // Drop all transactions that are deemed too old (low nonce)
+ olds := list.Forward(nonce)
+ for _, tx := range olds {
+ hash := tx.Hash()
+ pool.all.Remove(hash)
+ log.Trace("Removed old pending transaction", "hash", hash)
+ }
+ // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
+ drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
+ for _, tx := range drops {
+ hash := tx.Hash()
+ log.Trace("Removed unpayable pending transaction", "hash", hash)
+ pool.all.Remove(hash)
+ }
+ pool.priced.Removed(len(olds) + len(drops))
+ pendingNofundsMeter.Mark(int64(len(drops)))
+
+ for _, tx := range invalids {
+ hash := tx.Hash()
+ log.Trace("Demoting pending transaction", "hash", hash)
+ pool.enqueueTx(hash, tx)
+ }
+ pendingCounter.Dec(int64(len(olds) + len(drops) + len(invalids)))
+ if pool.locals.contains(addr) {
+ localCounter.Dec(int64(len(olds) + len(drops) + len(invalids)))
+ }
+ // If there's a gap in front, alert (should never happen) and postpone all transactions
+ if list.Len() > 0 && list.txs.Get(nonce) == nil {
+ gapped := list.Cap(0)
+ for _, tx := range gapped {
+ hash := tx.Hash()
+ log.Error("Demoting invalidated transaction", "hash", hash)
+ pool.enqueueTx(hash, tx)
+ }
+ pendingCounter.Dec(int64(len(gapped)))
+ }
+ // Delete the entire queue entry if it became empty.
+ if list.Empty() {
+ delete(pool.pending, addr)
+ delete(pool.beats, addr)
+ }
+ }
+}
+
+// addressByHeartbeat is an account address tagged with its last activity timestamp.
+type addressByHeartbeat struct {
+ address common.Address
+ heartbeat time.Time
+}
+
+type addressesByHeartbeat []addressByHeartbeat
+
+func (a addressesByHeartbeat) Len() int { return len(a) }
+func (a addressesByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) }
+func (a addressesByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+
+// accountSet is simply a set of addresses to check for existence, and a signer
+// capable of deriving addresses from transactions.
+type accountSet struct {
+ accounts map[common.Address]struct{}
+ signer types.Signer
+ cache *[]common.Address
+}
+
+// newAccountSet creates a new address set with an associated signer for sender
+// derivations.
+func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet {
+ as := &accountSet{
+ accounts: make(map[common.Address]struct{}),
+ signer: signer,
+ }
+ for _, addr := range addrs {
+ as.add(addr)
+ }
+ return as
+}
+
+// contains checks if a given address is contained within the set.
+func (as *accountSet) contains(addr common.Address) bool {
+ _, exist := as.accounts[addr]
+ return exist
+}
+
+// containsTx checks if the sender of a given tx is within the set. If the sender
+// cannot be derived, this method returns false.
+func (as *accountSet) containsTx(tx *types.Transaction) bool {
+ if addr, err := types.Sender(as.signer, tx); err == nil {
+ return as.contains(addr)
+ }
+ return false
+}
+
+// add inserts a new address into the set to track.
+func (as *accountSet) add(addr common.Address) {
+ as.accounts[addr] = struct{}{}
+ as.cache = nil
+}
+
+// addTx adds the sender of tx into the set.
+func (as *accountSet) addTx(tx *types.Transaction) {
+ if addr, err := types.Sender(as.signer, tx); err == nil {
+ as.add(addr)
+ }
+}
+
+// flatten returns the list of addresses within this set, also caching it for later
+// reuse. The returned slice should not be changed!
+func (as *accountSet) flatten() []common.Address {
+ if as.cache == nil {
+ accounts := make([]common.Address, 0, len(as.accounts))
+ for account := range as.accounts {
+ accounts = append(accounts, account)
+ }
+ as.cache = &accounts
+ }
+ return *as.cache
+}
+
+// merge adds all addresses from the 'other' set into 'as'.
+func (as *accountSet) merge(other *accountSet) {
+ for addr := range other.accounts {
+ as.accounts[addr] = struct{}{}
+ }
+ as.cache = nil
+}
+
+// txLookup is used internally by TxPool to track transactions while allowing lookup without
+// mutex contention.
+//
+// Note, although this type is properly protected against concurrent access, it
+// is **not** a type that should ever be mutated or even exposed outside of the
+// transaction pool, since its internal state is tightly coupled with the pools
+// internal mechanisms. The sole purpose of the type is to permit out-of-bound
+// peeking into the pool in TxPool.Get without having to acquire the widely scoped
+// TxPool.mu mutex.
+type txLookup struct {
+ all map[common.Hash]*types.Transaction
+ lock sync.RWMutex
+}
+
+// newTxLookup returns a new txLookup structure.
+func newTxLookup() *txLookup {
+ return &txLookup{
+ all: make(map[common.Hash]*types.Transaction),
+ }
+}
+
+// Range calls f on each key and value present in the map.
+func (t *txLookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) {
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ for key, value := range t.all {
+ if !f(key, value) {
+ break
+ }
+ }
+}
+
+// Get returns a transaction if it exists in the lookup, or nil if not found.
+func (t *txLookup) Get(hash common.Hash) *types.Transaction {
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ return t.all[hash]
+}
+
+// Count returns the current number of items in the lookup.
+func (t *txLookup) Count() int {
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ return len(t.all)
+}
+
+// Add adds a transaction to the lookup.
+func (t *txLookup) Add(tx *types.Transaction) {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ t.all[tx.Hash()] = tx
+}
+
+// Remove removes a transaction from the lookup.
+func (t *txLookup) Remove(hash common.Hash) {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ delete(t.all, hash)
+}