aboutsummaryrefslogtreecommitdiff
path: root/core/tx_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r--core/tx_pool.go1523
1 files changed, 1523 insertions, 0 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go
new file mode 100644
index 0000000..caabd5c
--- /dev/null
+++ b/core/tx_pool.go
@@ -0,0 +1,1523 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "errors"
+ "fmt"
+ "math"
+ "math/big"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/common/prque"
+ "github.com/ava-labs/go-ethereum/core/state"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/event"
+ "github.com/ava-labs/go-ethereum/log"
+ "github.com/ava-labs/go-ethereum/metrics"
+ "github.com/ava-labs/go-ethereum/params"
+)
+
+const (
+ // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
+ chainHeadChanSize = 10
+)
+
+var (
+ // ErrInvalidSender is returned if the transaction contains an invalid signature.
+ ErrInvalidSender = errors.New("invalid sender")
+
+ // ErrNonceTooLow is returned if the nonce of a transaction is lower than the
+ // one present in the local chain.
+ ErrNonceTooLow = errors.New("nonce too low")
+
+ // ErrUnderpriced is returned if a transaction's gas price is below the minimum
+ // configured for the transaction pool.
+ ErrUnderpriced = errors.New("transaction underpriced")
+
+ // ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
+ // with a different one without the required price bump.
+ ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
+
+ // ErrInsufficientFunds is returned if the total cost of executing a transaction
+ // is higher than the balance of the user's account.
+ ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value")
+
+ // ErrIntrinsicGas is returned if the transaction is specified to use less gas
+ // than required to start the invocation.
+ ErrIntrinsicGas = errors.New("intrinsic gas too low")
+
+ // ErrGasLimit is returned if a transaction's requested gas limit exceeds the
+ // maximum allowance of the current block.
+ ErrGasLimit = errors.New("exceeds block gas limit")
+
+ // ErrNegativeValue is a sanity error to ensure noone is able to specify a
+ // transaction with a negative value.
+ ErrNegativeValue = errors.New("negative value")
+
+ // ErrOversizedData is returned if the input data of a transaction is greater
+ // than some meaningful limit a user might use. This is not a consensus error
+ // making the transaction invalid, rather a DOS protection.
+ ErrOversizedData = errors.New("oversized data")
+)
+
+var (
+ evictionInterval = time.Minute // Time interval to check for evictable transactions
+ statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
+)
+
+var (
+ // Metrics for the pending pool
+ pendingDiscardMeter = metrics.NewRegisteredMeter("txpool/pending/discard", nil)
+ pendingReplaceMeter = metrics.NewRegisteredMeter("txpool/pending/replace", nil)
+ pendingRateLimitMeter = metrics.NewRegisteredMeter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting
+ pendingNofundsMeter = metrics.NewRegisteredMeter("txpool/pending/nofunds", nil) // Dropped due to out-of-funds
+
+ // Metrics for the queued pool
+ queuedDiscardMeter = metrics.NewRegisteredMeter("txpool/queued/discard", nil)
+ queuedReplaceMeter = metrics.NewRegisteredMeter("txpool/queued/replace", nil)
+ queuedRateLimitMeter = metrics.NewRegisteredMeter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting
+ queuedNofundsMeter = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds
+
+ // General tx metrics
+ validMeter = metrics.NewRegisteredMeter("txpool/valid", nil)
+ invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil)
+ underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil)
+
+ pendingCounter = metrics.NewRegisteredCounter("txpool/pending", nil)
+ queuedCounter = metrics.NewRegisteredCounter("txpool/queued", nil)
+ localCounter = metrics.NewRegisteredCounter("txpool/local", nil)
+)
+
+// TxStatus is the current status of a transaction as seen by the pool.
+type TxStatus uint
+
+const (
+ TxStatusUnknown TxStatus = iota
+ TxStatusQueued
+ TxStatusPending
+ TxStatusIncluded
+)
+
+// blockChain provides the state of blockchain and current gas limit to do
+// some pre checks in tx pool and event subscribers.
+type blockChain interface {
+ CurrentBlock() *types.Block
+ GetBlock(hash common.Hash, number uint64) *types.Block
+ StateAt(root common.Hash) (*state.StateDB, error)
+
+ SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
+}
+
+// TxPoolConfig are the configuration parameters of the transaction pool.
+type TxPoolConfig struct {
+ Locals []common.Address // Addresses that should be treated by default as local
+ NoLocals bool // Whether local transaction handling should be disabled
+ Journal string // Journal of local transactions to survive node restarts
+ Rejournal time.Duration // Time interval to regenerate the local transaction journal
+
+ PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
+ PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)
+
+ AccountSlots uint64 // Number of executable transaction slots guaranteed per account
+ GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts
+ AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
+ GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts
+
+ Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
+}
+
+// DefaultTxPoolConfig contains the default configurations for the transaction
+// pool.
+var DefaultTxPoolConfig = TxPoolConfig{
+ Journal: "transactions.rlp",
+ Rejournal: time.Hour,
+
+ PriceLimit: 1,
+ PriceBump: 10,
+
+ AccountSlots: 16,
+ GlobalSlots: 4096,
+ AccountQueue: 64,
+ GlobalQueue: 1024,
+
+ Lifetime: 3 * time.Hour,
+}
+
+// sanitize checks the provided user configurations and changes anything that's
+// unreasonable or unworkable.
+func (config *TxPoolConfig) sanitize() TxPoolConfig {
+ conf := *config
+ if conf.Rejournal < time.Second {
+ log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second)
+ conf.Rejournal = time.Second
+ }
+ if conf.PriceLimit < 1 {
+ log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultTxPoolConfig.PriceLimit)
+ conf.PriceLimit = DefaultTxPoolConfig.PriceLimit
+ }
+ if conf.PriceBump < 1 {
+ log.Warn("Sanitizing invalid txpool price bump", "provided", conf.PriceBump, "updated", DefaultTxPoolConfig.PriceBump)
+ conf.PriceBump = DefaultTxPoolConfig.PriceBump
+ }
+ if conf.AccountSlots < 1 {
+ log.Warn("Sanitizing invalid txpool account slots", "provided", conf.AccountSlots, "updated", DefaultTxPoolConfig.AccountSlots)
+ conf.AccountSlots = DefaultTxPoolConfig.AccountSlots
+ }
+ if conf.GlobalSlots < 1 {
+ log.Warn("Sanitizing invalid txpool global slots", "provided", conf.GlobalSlots, "updated", DefaultTxPoolConfig.GlobalSlots)
+ conf.GlobalSlots = DefaultTxPoolConfig.GlobalSlots
+ }
+ if conf.AccountQueue < 1 {
+ log.Warn("Sanitizing invalid txpool account queue", "provided", conf.AccountQueue, "updated", DefaultTxPoolConfig.AccountQueue)
+ conf.AccountQueue = DefaultTxPoolConfig.AccountQueue
+ }
+ if conf.GlobalQueue < 1 {
+ log.Warn("Sanitizing invalid txpool global queue", "provided", conf.GlobalQueue, "updated", DefaultTxPoolConfig.GlobalQueue)
+ conf.GlobalQueue = DefaultTxPoolConfig.GlobalQueue
+ }
+ if conf.Lifetime < 1 {
+ log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime)
+ conf.Lifetime = DefaultTxPoolConfig.Lifetime
+ }
+ return conf
+}
+
+// TxPool contains all currently known transactions. Transactions
+// enter the pool when they are received from the network or submitted
+// locally. They exit the pool when they are included in the blockchain.
+//
+// The pool separates processable transactions (which can be applied to the
+// current state) and future transactions. Transactions move between those
+// two states over time as they are received and processed.
+type TxPool struct {
+ config TxPoolConfig
+ chainconfig *params.ChainConfig
+ chain blockChain
+ gasPrice *big.Int
+ txFeed event.Feed
+ scope event.SubscriptionScope
+ signer types.Signer
+ mu sync.RWMutex
+
+ istanbul bool // Fork indicator whether we are in the istanbul stage.
+
+ currentState *state.StateDB // Current state in the blockchain head
+ pendingNonces *txNoncer // Pending state tracking virtual nonces
+ currentMaxGas uint64 // Current gas limit for transaction caps
+
+ locals *accountSet // Set of local transaction to exempt from eviction rules
+ journal *txJournal // Journal of local transaction to back up to disk
+
+ pending map[common.Address]*txList // All currently processable transactions
+ queue map[common.Address]*txList // Queued but non-processable transactions
+ beats map[common.Address]time.Time // Last heartbeat from each known account
+ all *txLookup // All transactions to allow lookups
+ priced *txPricedList // All transactions sorted by price
+
+ chainHeadCh chan ChainHeadEvent
+ chainHeadSub event.Subscription
+ reqResetCh chan *txpoolResetRequest
+ reqPromoteCh chan *accountSet
+ queueTxEventCh chan *types.Transaction
+ reorgDoneCh chan chan struct{}
+ reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
+ wg sync.WaitGroup // tracks loop, scheduleReorgLoop
+}
+
+type txpoolResetRequest struct {
+ oldHead, newHead *types.Header
+}
+
+// NewTxPool creates a new transaction pool to gather, sort and filter inbound
+// transactions from the network.
+func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
+ // Sanitize the input to ensure no vulnerable gas prices are set
+ config = (&config).sanitize()
+
+ // Create the transaction pool with its initial settings
+ pool := &TxPool{
+ config: config,
+ chainconfig: chainconfig,
+ chain: chain,
+ signer: types.NewEIP155Signer(chainconfig.ChainID),
+ pending: make(map[common.Address]*txList),
+ queue: make(map[common.Address]*txList),
+ beats: make(map[common.Address]time.Time),
+ all: newTxLookup(),
+ chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
+ reqResetCh: make(chan *txpoolResetRequest),
+ reqPromoteCh: make(chan *accountSet),
+ queueTxEventCh: make(chan *types.Transaction),
+ reorgDoneCh: make(chan chan struct{}),
+ reorgShutdownCh: make(chan struct{}),
+ gasPrice: new(big.Int).SetUint64(config.PriceLimit),
+ }
+ pool.locals = newAccountSet(pool.signer)
+ for _, addr := range config.Locals {
+ log.Info("Setting new local account", "address", addr)
+ pool.locals.add(addr)
+ }
+ pool.priced = newTxPricedList(pool.all)
+ pool.reset(nil, chain.CurrentBlock().Header())
+
+ // Start the reorg loop early so it can handle requests generated during journal loading.
+ pool.wg.Add(1)
+ go pool.scheduleReorgLoop()
+
+ // If local transactions and journaling is enabled, load from disk
+ if !config.NoLocals && config.Journal != "" {
+ pool.journal = newTxJournal(config.Journal)
+
+ if err := pool.journal.load(pool.AddLocals); err != nil {
+ log.Warn("Failed to load transaction journal", "err", err)
+ }
+ if err := pool.journal.rotate(pool.local()); err != nil {
+ log.Warn("Failed to rotate transaction journal", "err", err)
+ }
+ }
+
+ // Subscribe events from blockchain and start the main event loop.
+ pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
+ pool.wg.Add(1)
+ go pool.loop()
+
+ return pool
+}
+
+// loop is the transaction pool's main event loop, waiting for and reacting to
+// outside blockchain events as well as for various reporting and transaction
+// eviction events.
+func (pool *TxPool) loop() {
+ defer pool.wg.Done()
+
+ var (
+ prevPending, prevQueued, prevStales int
+ // Start the stats reporting and transaction eviction tickers
+ report = time.NewTicker(statsReportInterval)
+ evict = time.NewTicker(evictionInterval)
+ journal = time.NewTicker(pool.config.Rejournal)
+ // Track the previous head headers for transaction reorgs
+ head = pool.chain.CurrentBlock()
+ )
+ defer report.Stop()
+ defer evict.Stop()
+ defer journal.Stop()
+
+ for {
+ select {
+ // Handle ChainHeadEvent
+ case ev := <-pool.chainHeadCh:
+ if ev.Block != nil {
+ pool.requestReset(head.Header(), ev.Block.Header())
+ head = ev.Block
+ }
+
+ // System shutdown.
+ case <-pool.chainHeadSub.Err():
+ close(pool.reorgShutdownCh)
+ return
+
+ // Handle stats reporting ticks
+ case <-report.C:
+ pool.mu.RLock()
+ pending, queued := pool.stats()
+ stales := pool.priced.stales
+ pool.mu.RUnlock()
+
+ if pending != prevPending || queued != prevQueued || stales != prevStales {
+ log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
+ prevPending, prevQueued, prevStales = pending, queued, stales
+ }
+
+ // Handle inactive account transaction eviction
+ case <-evict.C:
+ pool.mu.Lock()
+ for addr := range pool.queue {
+ // Skip local transactions from the eviction mechanism
+ if pool.locals.contains(addr) {
+ continue
+ }
+ // Any non-locals old enough should be removed
+ if time.Since(pool.beats[addr]) > pool.config.Lifetime {
+ for _, tx := range pool.queue[addr].Flatten() {
+ pool.removeTx(tx.Hash(), true)
+ }
+ }
+ }
+ pool.mu.Unlock()
+
+ // Handle local transaction journal rotation
+ case <-journal.C:
+ if pool.journal != nil {
+ pool.mu.Lock()
+ if err := pool.journal.rotate(pool.local()); err != nil {
+ log.Warn("Failed to rotate local tx journal", "err", err)
+ }
+ pool.mu.Unlock()
+ }
+ }
+ }
+}
+
+// Stop terminates the transaction pool.
+func (pool *TxPool) Stop() {
+ // Unsubscribe all subscriptions registered from txpool
+ pool.scope.Close()
+
+ // Unsubscribe subscriptions registered from blockchain
+ pool.chainHeadSub.Unsubscribe()
+ pool.wg.Wait()
+
+ if pool.journal != nil {
+ pool.journal.close()
+ }
+ log.Info("Transaction pool stopped")
+}
+
+// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
+// starts sending event to the given channel.
+func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
+ return pool.scope.Track(pool.txFeed.Subscribe(ch))
+}
+
+// GasPrice returns the current gas price enforced by the transaction pool.
+func (pool *TxPool) GasPrice() *big.Int {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ return new(big.Int).Set(pool.gasPrice)
+}
+
+// SetGasPrice updates the minimum price required by the transaction pool for a
+// new transaction, and drops all transactions below this threshold.
+func (pool *TxPool) SetGasPrice(price *big.Int) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ pool.gasPrice = price
+ for _, tx := range pool.priced.Cap(price, pool.locals) {
+ pool.removeTx(tx.Hash(), false)
+ }
+ log.Info("Transaction pool price threshold updated", "price", price)
+}
+
+// Nonce returns the next nonce of an account, with all transactions executable
+// by the pool already applied on top.
+func (pool *TxPool) Nonce(addr common.Address) uint64 {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ return pool.pendingNonces.get(addr)
+}
+
+// Stats retrieves the current pool stats, namely the number of pending and the
+// number of queued (non-executable) transactions.
+func (pool *TxPool) Stats() (int, int) {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ return pool.stats()
+}
+
+// stats retrieves the current pool stats, namely the number of pending and the
+// number of queued (non-executable) transactions.
+func (pool *TxPool) stats() (int, int) {
+ pending := 0
+ for _, list := range pool.pending {
+ pending += list.Len()
+ }
+ queued := 0
+ for _, list := range pool.queue {
+ queued += list.Len()
+ }
+ return pending, queued
+}
+
+// Content retrieves the data content of the transaction pool, returning all the
+// pending as well as queued transactions, grouped by account and sorted by nonce.
+func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ pending := make(map[common.Address]types.Transactions)
+ for addr, list := range pool.pending {
+ pending[addr] = list.Flatten()
+ }
+ queued := make(map[common.Address]types.Transactions)
+ for addr, list := range pool.queue {
+ queued[addr] = list.Flatten()
+ }
+ return pending, queued
+}
+
+// Pending retrieves all currently processable transactions, grouped by origin
+// account and sorted by nonce. The returned transaction set is a copy and can be
+// freely modified by calling code.
+func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ pending := make(map[common.Address]types.Transactions)
+ for addr, list := range pool.pending {
+ pending[addr] = list.Flatten()
+ }
+ return pending, nil
+}
+
+// Locals retrieves the accounts currently considered local by the pool.
+func (pool *TxPool) Locals() []common.Address {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ return pool.locals.flatten()
+}
+
+// local retrieves all currently known local transactions, grouped by origin
+// account and sorted by nonce. The returned transaction set is a copy and can be
+// freely modified by calling code.
+func (pool *TxPool) local() map[common.Address]types.Transactions {
+ txs := make(map[common.Address]types.Transactions)
+ for addr := range pool.locals.accounts {
+ if pending := pool.pending[addr]; pending != nil {
+ txs[addr] = append(txs[addr], pending.Flatten()...)
+ }
+ if queued := pool.queue[addr]; queued != nil {
+ txs[addr] = append(txs[addr], queued.Flatten()...)
+ }
+ }
+ return txs
+}
+
+// validateTx checks whether a transaction is valid according to the consensus
+// rules and adheres to some heuristic limits of the local node (price and size).
+func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
+ // Heuristic limit, reject transactions over 32KB to prevent DOS attacks
+ if tx.Size() > 32*1024 {
+ return ErrOversizedData
+ }
+ // Transactions can't be negative. This may never happen using RLP decoded
+ // transactions but may occur if you create a transaction using the RPC.
+ if tx.Value().Sign() < 0 {
+ return ErrNegativeValue
+ }
+ // Ensure the transaction doesn't exceed the current block limit gas.
+ if pool.currentMaxGas < tx.Gas() {
+ return ErrGasLimit
+ }
+ // Make sure the transaction is signed properly
+ from, err := types.Sender(pool.signer, tx)
+ if err != nil {
+ return ErrInvalidSender
+ }
+ // Drop non-local transactions under our own minimal accepted gas price
+ local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
+ if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
+ return ErrUnderpriced
+ }
+ // Ensure the transaction adheres to nonce ordering
+ if pool.currentState.GetNonce(from) > tx.Nonce() {
+ return ErrNonceTooLow
+ }
+ // Transactor should have enough funds to cover the costs
+ // cost == V + GP * GL
+ if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
+ return ErrInsufficientFunds
+ }
+ // Ensure the transaction has more gas than the basic tx fee.
+ intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, true, pool.istanbul)
+ if err != nil {
+ return err
+ }
+ if tx.Gas() < intrGas {
+ return ErrIntrinsicGas
+ }
+ return nil
+}
+
+// add validates a transaction and inserts it into the non-executable queue for later
+// pending promotion and execution. If the transaction is a replacement for an already
+// pending or queued one, it overwrites the previous transaction if its price is higher.
+//
+// If a newly added transaction is marked as local, its sending account will be
+// whitelisted, preventing any associated transaction from being dropped out of the pool
+// due to pricing constraints.
+func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
+ // If the transaction is already known, discard it
+ hash := tx.Hash()
+ if pool.all.Get(hash) != nil {
+ log.Trace("Discarding already known transaction", "hash", hash)
+ return false, fmt.Errorf("known transaction: %x", hash)
+ }
+
+ // If the transaction fails basic validation, discard it
+ if err := pool.validateTx(tx, local); err != nil {
+ log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
+ invalidTxMeter.Mark(1)
+ return false, err
+ }
+
+ // If the transaction pool is full, discard underpriced transactions
+ if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
+ // If the new transaction is underpriced, don't accept it
+ if !local && pool.priced.Underpriced(tx, pool.locals) {
+ log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
+ underpricedTxMeter.Mark(1)
+ return false, ErrUnderpriced
+ }
+ // New transaction is better than our worse ones, make room for it
+ drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
+ for _, tx := range drop {
+ log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
+ underpricedTxMeter.Mark(1)
+ pool.removeTx(tx.Hash(), false)
+ }
+ }
+
+ // Try to replace an existing transaction in the pending pool
+ from, _ := types.Sender(pool.signer, tx) // already validated
+ if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
+ // Nonce already pending, check if required price bump is met
+ inserted, old := list.Add(tx, pool.config.PriceBump)
+ if !inserted {
+ pendingDiscardMeter.Mark(1)
+ return false, ErrReplaceUnderpriced
+ }
+ // New transaction is better, replace old one
+ if old != nil {
+ pool.all.Remove(old.Hash())
+ pool.priced.Removed(1)
+ pendingReplaceMeter.Mark(1)
+ }
+ pool.all.Add(tx)
+ pool.priced.Put(tx)
+ pool.journalTx(from, tx)
+ pool.queueTxEvent(tx)
+ log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
+ return old != nil, nil
+ }
+
+ // New transaction isn't replacing a pending one, push into queue
+ replaced, err = pool.enqueueTx(hash, tx)
+ if err != nil {
+ return false, err
+ }
+
+ // Mark local addresses and journal local transactions
+ if local {
+ if !pool.locals.contains(from) {
+ log.Info("Setting new local account", "address", from)
+ pool.locals.add(from)
+ }
+ }
+ if local || pool.locals.contains(from) {
+ localCounter.Inc(1)
+ }
+ pool.journalTx(from, tx)
+
+ log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
+ return replaced, nil
+}
+
+// enqueueTx inserts a new transaction into the non-executable transaction queue.
+//
+// Note, this method assumes the pool lock is held!
+func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
+ // Try to insert the transaction into the future queue
+ from, _ := types.Sender(pool.signer, tx) // already validated
+ if pool.queue[from] == nil {
+ pool.queue[from] = newTxList(false)
+ }
+ inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
+ if !inserted {
+ // An older transaction was better, discard this
+ queuedDiscardMeter.Mark(1)
+ return false, ErrReplaceUnderpriced
+ }
+ // Discard any previous transaction and mark this
+ if old != nil {
+ pool.all.Remove(old.Hash())
+ pool.priced.Removed(1)
+ queuedReplaceMeter.Mark(1)
+ } else {
+ // Nothing was replaced, bump the queued counter
+ queuedCounter.Inc(1)
+ }
+ if pool.all.Get(hash) == nil {
+ pool.all.Add(tx)
+ pool.priced.Put(tx)
+ }
+ return old != nil, nil
+}
+
+// journalTx adds the specified transaction to the local disk journal if it is
+// deemed to have been sent from a local account.
+func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
+ // Only journal if it's enabled and the transaction is local
+ if pool.journal == nil || !pool.locals.contains(from) {
+ return
+ }
+ if err := pool.journal.insert(tx); err != nil {
+ log.Warn("Failed to journal local transaction", "err", err)
+ }
+}
+
+// promoteTx adds a transaction to the pending (processable) list of transactions
+// and returns whether it was inserted or an older was better.
+//
+// Note, this method assumes the pool lock is held!
+func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
+ // Try to insert the transaction into the pending queue
+ if pool.pending[addr] == nil {
+ pool.pending[addr] = newTxList(true)
+ }
+ list := pool.pending[addr]
+
+ inserted, old := list.Add(tx, pool.config.PriceBump)
+ if !inserted {
+ // An older transaction was better, discard this
+ pool.all.Remove(hash)
+ pool.priced.Removed(1)
+
+ pendingDiscardMeter.Mark(1)
+ return false
+ }
+ // Otherwise discard any previous transaction and mark this
+ if old != nil {
+ pool.all.Remove(old.Hash())
+ pool.priced.Removed(1)
+
+ pendingReplaceMeter.Mark(1)
+ } else {
+ // Nothing was replaced, bump the pending counter
+ pendingCounter.Inc(1)
+ }
+ // Failsafe to work around direct pending inserts (tests)
+ if pool.all.Get(hash) == nil {
+ pool.all.Add(tx)
+ pool.priced.Put(tx)
+ }
+ // Set the potentially new pending nonce and notify any subsystems of the new tx
+ pool.beats[addr] = time.Now()
+ pool.pendingNonces.set(addr, tx.Nonce()+1)
+
+ return true
+}
+
+// AddLocals enqueues a batch of transactions into the pool if they are valid, marking the
+// senders as a local ones, ensuring they go around the local pricing constraints.
+//
+// This method is used to add transactions from the RPC API and performs synchronous pool
+// reorganization and event propagation.
+func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
+ return pool.addTxs(txs, !pool.config.NoLocals, true)
+}
+
+// AddLocal enqueues a single local transaction into the pool if it is valid. This is
+// a convenience wrapper aroundd AddLocals.
+func (pool *TxPool) AddLocal(tx *types.Transaction) error {
+ errs := pool.AddLocals([]*types.Transaction{tx})
+ return errs[0]
+}
+
+// AddRemotes enqueues a batch of transactions into the pool if they are valid. If the
+// senders are not among the locally tracked ones, full pricing constraints will apply.
+//
+// This method is used to add transactions from the p2p network and does not wait for pool
+// reorganization and internal event propagation.
+func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
+ return pool.addTxs(txs, false, false)
+}
+
+// This is like AddRemotes, but waits for pool reorganization. Tests use this method.
+func (pool *TxPool) AddRemotesSync(txs []*types.Transaction) []error {
+ return pool.addTxs(txs, false, true)
+}
+
+// This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method.
+func (pool *TxPool) addRemoteSync(tx *types.Transaction) error {
+ errs := pool.AddRemotesSync([]*types.Transaction{tx})
+ return errs[0]
+}
+
+// AddRemote enqueues a single transaction into the pool if it is valid. This is a convenience
+// wrapper around AddRemotes.
+//
+// Deprecated: use AddRemotes
+func (pool *TxPool) AddRemote(tx *types.Transaction) error {
+ errs := pool.AddRemotes([]*types.Transaction{tx})
+ return errs[0]
+}
+
+// addTxs attempts to queue a batch of transactions if they are valid.
+func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
+ // Cache senders in transactions before obtaining lock (pool.signer is immutable)
+ for _, tx := range txs {
+ types.Sender(pool.signer, tx)
+ }
+
+ pool.mu.Lock()
+ errs, dirtyAddrs := pool.addTxsLocked(txs, local)
+ pool.mu.Unlock()
+
+ done := pool.requestPromoteExecutables(dirtyAddrs)
+ if sync {
+ <-done
+ }
+ return errs
+}
+
+// addTxsLocked attempts to queue a batch of transactions if they are valid.
+// The transaction pool lock must be held.
+func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
+ dirty := newAccountSet(pool.signer)
+ errs := make([]error, len(txs))
+ for i, tx := range txs {
+ replaced, err := pool.add(tx, local)
+ errs[i] = err
+ if err == nil && !replaced {
+ dirty.addTx(tx)
+ }
+ }
+ validMeter.Mark(int64(len(dirty.accounts)))
+ return errs, dirty
+}
+
+// Status returns the status (unknown/pending/queued) of a batch of transactions
+// identified by their hashes.
+func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ status := make([]TxStatus, len(hashes))
+ for i, hash := range hashes {
+ if tx := pool.all.Get(hash); tx != nil {
+ from, _ := types.Sender(pool.signer, tx) // already validated
+ if pool.pending[from] != nil && pool.pending[from].txs.items[tx.Nonce()] != nil {
+ status[i] = TxStatusPending
+ } else {
+ status[i] = TxStatusQueued
+ }
+ }
+ }
+ return status
+}
+
+// Get returns a transaction if it is contained in the pool and nil otherwise.
+func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
+ return pool.all.Get(hash)
+}
+
+// removeTx removes a single transaction from the queue, moving all subsequent
+// transactions back to the future queue.
+func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
+ // Fetch the transaction we wish to delete
+ tx := pool.all.Get(hash)
+ if tx == nil {
+ return
+ }
+ addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
+
+ // Remove it from the list of known transactions
+ pool.all.Remove(hash)
+ if outofbound {
+ pool.priced.Removed(1)
+ }
+ if pool.locals.contains(addr) {
+ localCounter.Dec(1)
+ }
+ // Remove the transaction from the pending lists and reset the account nonce
+ if pending := pool.pending[addr]; pending != nil {
+ if removed, invalids := pending.Remove(tx); removed {
+ // If no more pending transactions are left, remove the list
+ if pending.Empty() {
+ delete(pool.pending, addr)
+ delete(pool.beats, addr)
+ }
+ // Postpone any invalidated transactions
+ for _, tx := range invalids {
+ pool.enqueueTx(tx.Hash(), tx)
+ }
+ // Update the account nonce if needed
+ pool.pendingNonces.setIfLower(addr, tx.Nonce())
+ // Reduce the pending counter
+ pendingCounter.Dec(int64(1 + len(invalids)))
+ return
+ }
+ }
+ // Transaction is in the future queue
+ if future := pool.queue[addr]; future != nil {
+ if removed, _ := future.Remove(tx); removed {
+ // Reduce the queued counter
+ queuedCounter.Dec(1)
+ }
+ if future.Empty() {
+ delete(pool.queue, addr)
+ }
+ }
+}
+
+// requestPromoteExecutables requests a pool reset to the new head block.
+// The returned channel is closed when the reset has occurred.
+func (pool *TxPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} {
+ select {
+ case pool.reqResetCh <- &txpoolResetRequest{oldHead, newHead}:
+ return <-pool.reorgDoneCh
+ case <-pool.reorgShutdownCh:
+ return pool.reorgShutdownCh
+ }
+}
+
+// requestPromoteExecutables requests transaction promotion checks for the given addresses.
+// The returned channel is closed when the promotion checks have occurred.
+func (pool *TxPool) requestPromoteExecutables(set *accountSet) chan struct{} {
+ select {
+ case pool.reqPromoteCh <- set:
+ return <-pool.reorgDoneCh
+ case <-pool.reorgShutdownCh:
+ return pool.reorgShutdownCh
+ }
+}
+
+// queueTxEvent enqueues a transaction event to be sent in the next reorg run.
+func (pool *TxPool) queueTxEvent(tx *types.Transaction) {
+ select {
+ case pool.queueTxEventCh <- tx:
+ case <-pool.reorgShutdownCh:
+ }
+}
+
+// scheduleReorgLoop schedules runs of reset and promoteExecutables. Code above should not
+// call those methods directly, but request them being run using requestReset and
+// requestPromoteExecutables instead.
+func (pool *TxPool) scheduleReorgLoop() {
+ defer pool.wg.Done()
+
+ var (
+ curDone chan struct{} // non-nil while runReorg is active
+ nextDone = make(chan struct{})
+ launchNextRun bool
+ reset *txpoolResetRequest
+ dirtyAccounts *accountSet
+ queuedEvents = make(map[common.Address]*txSortedMap)
+ )
+ for {
+ // Launch next background reorg if needed
+ if curDone == nil && launchNextRun {
+ // Run the background reorg and announcements
+ go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)
+
+ // Prepare everything for the next round of reorg
+ curDone, nextDone = nextDone, make(chan struct{})
+ launchNextRun = false
+
+ reset, dirtyAccounts = nil, nil
+ queuedEvents = make(map[common.Address]*txSortedMap)
+ }
+
+ select {
+ case req := <-pool.reqResetCh:
+ // Reset request: update head if request is already pending.
+ if reset == nil {
+ reset = req
+ } else {
+ reset.newHead = req.newHead
+ }
+ launchNextRun = true
+ pool.reorgDoneCh <- nextDone
+
+ case req := <-pool.reqPromoteCh:
+ // Promote request: update address set if request is already pending.
+ if dirtyAccounts == nil {
+ dirtyAccounts = req
+ } else {
+ dirtyAccounts.merge(req)
+ }
+ launchNextRun = true
+ pool.reorgDoneCh <- nextDone
+
+ case tx := <-pool.queueTxEventCh:
+ // Queue up the event, but don't schedule a reorg. It's up to the caller to
+ // request one later if they want the events sent.
+ addr, _ := types.Sender(pool.signer, tx)
+ if _, ok := queuedEvents[addr]; !ok {
+ queuedEvents[addr] = newTxSortedMap()
+ }
+ queuedEvents[addr].Put(tx)
+
+ case <-curDone:
+ curDone = nil
+
+ case <-pool.reorgShutdownCh:
+ // Wait for current run to finish.
+ if curDone != nil {
+ <-curDone
+ }
+ close(nextDone)
+ return
+ }
+ }
+}
+
+// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
+func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*txSortedMap) {
+ defer close(done)
+
+ var promoteAddrs []common.Address
+ if dirtyAccounts != nil {
+ promoteAddrs = dirtyAccounts.flatten()
+ }
+ pool.mu.Lock()
+ if reset != nil {
+ // Reset from the old head to the new, rescheduling any reorged transactions
+ pool.reset(reset.oldHead, reset.newHead)
+
+ // Nonces were reset, discard any events that became stale
+ for addr := range events {
+ events[addr].Forward(pool.pendingNonces.get(addr))
+ if events[addr].Len() == 0 {
+ delete(events, addr)
+ }
+ }
+ // Reset needs promote for all addresses
+ promoteAddrs = promoteAddrs[:0]</