diff options
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r-- | core/tx_pool.go | 255 |
1 files changed, 168 insertions, 87 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go index 5b2a3c0..848627a 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -18,7 +18,6 @@ package core import ( "errors" - "fmt" "math" "math/big" "sort" @@ -28,26 +27,38 @@ import ( "github.com/ava-labs/coreth/core/state" "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/params" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/common/prque" - "github.com/ava-labs/go-ethereum/event" - "github.com/ava-labs/go-ethereum/log" - "github.com/ava-labs/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" ) const ( // chainHeadChanSize is the size of channel listening to ChainHeadEvent. chainHeadChanSize = 10 + + // txSlotSize is used to calculate how many data slots a single transaction + // takes up based on its size. The slots are used as DoS protection, ensuring + // that validating a new transaction remains a constant operation (in reality + // O(maxslots), where max slots are 4 currently). + txSlotSize = 32 * 1024 + + // txMaxSize is the maximum size a single transaction can have. This field has + // non-trivial consequences: larger transactions are significantly harder and + // more expensive to propagate; larger transactions also take more resources + // to validate whether they fit into the pool or not. + txMaxSize = 4 * txSlotSize // 128KB ) var ( + // ErrAlreadyKnown is returned if the transactions is already contained + // within the pool. + ErrAlreadyKnown = errors.New("already known") + // ErrInvalidSender is returned if the transaction contains an invalid signature. ErrInvalidSender = errors.New("invalid sender") - // ErrNonceTooLow is returned if the nonce of a transaction is lower than the - // one present in the local chain. - ErrNonceTooLow = errors.New("nonce too low") - // ErrUnderpriced is returned if a transaction's gas price is below the minimum // configured for the transaction pool. ErrUnderpriced = errors.New("transaction underpriced") @@ -56,19 +67,11 @@ var ( // with a different one without the required price bump. ErrReplaceUnderpriced = errors.New("replacement transaction underpriced") - // ErrInsufficientFunds is returned if the total cost of executing a transaction - // is higher than the balance of the user's account. - ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value") - - // ErrIntrinsicGas is returned if the transaction is specified to use less gas - // than required to start the invocation. - ErrIntrinsicGas = errors.New("intrinsic gas too low") - // ErrGasLimit is returned if a transaction's requested gas limit exceeds the // maximum allowance of the current block. ErrGasLimit = errors.New("exceeds block gas limit") - // ErrNegativeValue is a sanity error to ensure noone is able to specify a + // ErrNegativeValue is a sanity error to ensure no one is able to specify a // transaction with a negative value. ErrNegativeValue = errors.New("negative value") @@ -95,15 +98,18 @@ var ( queuedReplaceMeter = metrics.NewRegisteredMeter("txpool/queued/replace", nil) queuedRateLimitMeter = metrics.NewRegisteredMeter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting queuedNofundsMeter = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds + queuedEvictionMeter = metrics.NewRegisteredMeter("txpool/queued/eviction", nil) // Dropped due to lifetime // General tx metrics - validMeter = metrics.NewRegisteredMeter("txpool/valid", nil) + knownTxMeter = metrics.NewRegisteredMeter("txpool/known", nil) + validTxMeter = metrics.NewRegisteredMeter("txpool/valid", nil) invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil) underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil) - pendingCounter = metrics.NewRegisteredCounter("txpool/pending", nil) - queuedCounter = metrics.NewRegisteredCounter("txpool/queued", nil) - localCounter = metrics.NewRegisteredCounter("txpool/local", nil) + pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil) + queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil) + localGauge = metrics.NewRegisteredGauge("txpool/local", nil) + slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil) ) // TxStatus is the current status of a transaction as seen by the pool. @@ -359,9 +365,11 @@ func (pool *TxPool) loop() { } // Any non-locals old enough should be removed if time.Since(pool.beats[addr]) > pool.config.Lifetime { - for _, tx := range pool.queue[addr].Flatten() { + list := pool.queue[addr].Flatten() + for _, tx := range list { pool.removeTx(tx.Hash(), true) } + queuedEvictionMeter.Mark(int64(len(list))) } } pool.mu.Unlock() @@ -517,8 +525,8 @@ func (pool *TxPool) local() map[common.Address]types.Transactions { // validateTx checks whether a transaction is valid according to the consensus // rules and adheres to some heuristic limits of the local node (price and size). func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { - // Heuristic limit, reject transactions over 32KB to prevent DOS attacks - if tx.Size() > 32*1024 { + // Reject transactions over defined size to prevent DOS attacks + if uint64(tx.Size()) > txMaxSize { return ErrOversizedData } // Transactions can't be negative. This may never happen using RLP decoded @@ -537,7 +545,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { } // Drop non-local transactions under our own minimal accepted gas price local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network - if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { + if !local && tx.GasPriceIntCmp(pool.gasPrice) < 0 { return ErrUnderpriced } // Ensure the transaction adheres to nonce ordering @@ -572,16 +580,15 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e hash := tx.Hash() if pool.all.Get(hash) != nil { log.Trace("Discarding already known transaction", "hash", hash) - return false, fmt.Errorf("known transaction: %x", hash) + knownTxMeter.Mark(1) + return false, ErrAlreadyKnown } - // If the transaction fails basic validation, discard it if err := pool.validateTx(tx, local); err != nil { log.Trace("Discarding invalid transaction", "hash", hash, "err", err) invalidTxMeter.Mark(1) return false, err } - // If the transaction pool is full, discard underpriced transactions if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue { // If the new transaction is underpriced, don't accept it @@ -591,14 +598,13 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e return false, ErrUnderpriced } // New transaction is better than our worse ones, make room for it - drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals) + drop := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), pool.locals) for _, tx := range drop { log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice()) underpricedTxMeter.Mark(1) pool.removeTx(tx.Hash(), false) } } - // Try to replace an existing transaction in the pending pool from, _ := types.Sender(pool.signer, tx) // already validated if list := pool.pending[from]; list != nil && list.Overlaps(tx) { @@ -619,15 +625,16 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e pool.journalTx(from, tx) pool.queueTxEvent(tx) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) + + // Successful promotion, bump the heartbeat + pool.beats[from] = time.Now() return old != nil, nil } - // New transaction isn't replacing a pending one, push into queue replaced, err = pool.enqueueTx(hash, tx) if err != nil { return false, err } - // Mark local addresses and journal local transactions if local { if !pool.locals.contains(from) { @@ -636,7 +643,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e } } if local || pool.locals.contains(from) { - localCounter.Inc(1) + localGauge.Inc(1) } pool.journalTx(from, tx) @@ -666,12 +673,16 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er queuedReplaceMeter.Mark(1) } else { // Nothing was replaced, bump the queued counter - queuedCounter.Inc(1) + queuedGauge.Inc(1) } if pool.all.Get(hash) == nil { pool.all.Add(tx) pool.priced.Put(tx) } + // If we never record the heartbeat, do it right now. + if _, exist := pool.beats[from]; !exist { + pool.beats[from] = time.Now() + } return old != nil, nil } @@ -703,7 +714,6 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // An older transaction was better, discard this pool.all.Remove(hash) pool.priced.Removed(1) - pendingDiscardMeter.Mark(1) return false } @@ -711,11 +721,10 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T if old != nil { pool.all.Remove(old.Hash()) pool.priced.Removed(1) - pendingReplaceMeter.Mark(1) } else { // Nothing was replaced, bump the pending counter - pendingCounter.Inc(1) + pendingGauge.Inc(1) } // Failsafe to work around direct pending inserts (tests) if pool.all.Get(hash) == nil { @@ -723,9 +732,10 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T pool.priced.Put(tx) } // Set the potentially new pending nonce and notify any subsystems of the new tx - pool.beats[addr] = time.Now() pool.pendingNonces.set(addr, tx.Nonce()+1) + // Successful promotion, bump the heartbeat + pool.beats[addr] = time.Now() return true } @@ -776,15 +786,47 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error { // addTxs attempts to queue a batch of transactions if they are valid. func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { - // Cache senders in transactions before obtaining lock (pool.signer is immutable) - for _, tx := range txs { - types.Sender(pool.signer, tx) + // Filter out known ones without obtaining the pool lock or recovering signatures + var ( + errs = make([]error, len(txs)) + news = make([]*types.Transaction, 0, len(txs)) + ) + for i, tx := range txs { + // If the transaction is known, pre-set the error slot + if pool.all.Get(tx.Hash()) != nil { + errs[i] = ErrAlreadyKnown + knownTxMeter.Mark(1) + continue + } + // Exclude transactions with invalid signatures as soon as + // possible and cache senders in transactions before + // obtaining lock + _, err := types.Sender(pool.signer, tx) + if err != nil { + errs[i] = ErrInvalidSender + invalidTxMeter.Mark(1) + continue + } + // Accumulate all unknown transactions for deeper processing + news = append(news, tx) + } + if len(news) == 0 { + return errs } + // Process all the new transaction and merge any errors into the original slice pool.mu.Lock() - errs, dirtyAddrs := pool.addTxsLocked(txs, local) + newErrs, dirtyAddrs := pool.addTxsLocked(news, local) pool.mu.Unlock() + var nilSlot = 0 + for _, err := range newErrs { + for errs[nilSlot] != nil { + nilSlot++ + } + errs[nilSlot] = err + } + // Reorg the pool internals if needed and return done := pool.requestPromoteExecutables(dirtyAddrs) if sync { <-done @@ -804,26 +846,29 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, dirty.addTx(tx) } } - validMeter.Mark(int64(len(dirty.accounts))) + validTxMeter.Mark(int64(len(dirty.accounts))) return errs, dirty } // Status returns the status (unknown/pending/queued) of a batch of transactions // identified by their hashes. func (pool *TxPool) Status(hashes []common.Hash) []TxStatus { - pool.mu.RLock() - defer pool.mu.RUnlock() - status := make([]TxStatus, len(hashes)) for i, hash := range hashes { - if tx := pool.all.Get(hash); tx != nil { - from, _ := types.Sender(pool.signer, tx) // already validated - if pool.pending[from] != nil && pool.pending[from].txs.items[tx.Nonce()] != nil { - status[i] = TxStatusPending - } else { - status[i] = TxStatusQueued - } + tx := pool.Get(hash) + if tx == nil { + continue + } + from, _ := types.Sender(pool.signer, tx) // already validated + pool.mu.RLock() + if txList := pool.pending[from]; txList != nil && txList.txs.items[tx.Nonce()] != nil { + status[i] = TxStatusPending + } else if txList := pool.queue[from]; txList != nil && txList.txs.items[tx.Nonce()] != nil { + status[i] = TxStatusQueued } + // implicit else: the tx may have been included into a block between + // checking pool.Get and obtaining the lock. In that case, TxStatusUnknown is correct + pool.mu.RUnlock() } return status } @@ -833,6 +878,12 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction { return pool.all.Get(hash) } +// Has returns an indicator whether txpool has a transaction cached with the +// given hash. +func (pool *TxPool) Has(hash common.Hash) bool { + return pool.all.Get(hash) != nil +} + // removeTx removes a single transaction from the queue, moving all subsequent // transactions back to the future queue. func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { @@ -849,7 +900,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { pool.priced.Removed(1) } if pool.locals.contains(addr) { - localCounter.Dec(1) + localGauge.Dec(1) } // Remove the transaction from the pending lists and reset the account nonce if pending := pool.pending[addr]; pending != nil { @@ -857,7 +908,6 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { // If no more pending transactions are left, remove the list if pending.Empty() { delete(pool.pending, addr) - delete(pool.beats, addr) } // Postpone any invalidated transactions for _, tx := range invalids { @@ -866,7 +916,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { // Update the account nonce if needed pool.pendingNonces.setIfLower(addr, tx.Nonce()) // Reduce the pending counter - pendingCounter.Dec(int64(1 + len(invalids))) + pendingGauge.Dec(int64(1 + len(invalids))) return } } @@ -874,10 +924,11 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { if future := pool.queue[addr]; future != nil { if removed, _ := future.Remove(tx); removed { // Reduce the queued counter - queuedCounter.Dec(1) + queuedGauge.Dec(1) } if future.Empty() { delete(pool.queue, addr) + delete(pool.beats, addr) } } } @@ -989,7 +1040,10 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt defer close(done) var promoteAddrs []common.Address - if dirtyAccounts != nil { + if dirtyAccounts != nil && reset == nil { + // Only dirty accounts need to be promoted, unless we're resetting. + // For resets, all addresses in the tx queue will be promoted and + // the flatten operation can be avoided. promoteAddrs = dirtyAccounts.flatten() } pool.mu.Lock() @@ -1005,20 +1059,14 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt } } // Reset needs promote for all addresses - promoteAddrs = promoteAddrs[:0] + promoteAddrs = make([]common.Address, 0, len(pool.queue)) for addr := range pool.queue { promoteAddrs = append(promoteAddrs, addr) } } // Check for pending transactions for every account that sent new ones promoted := pool.promoteExecutables(promoteAddrs) - for _, tx := range promoted { - addr, _ := types.Sender(pool.signer, tx) - if _, ok := events[addr]; !ok { - events[addr] = newTxSortedMap() - } - events[addr].Put(tx) - } + // If a new block appeared, validate the pool of pending transactions. This will // remove any transaction that has been included in the block or was invalidated // because of another transaction (e.g. higher gas price). @@ -1031,12 +1079,19 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt // Update all accounts to the latest known pending nonce for addr, list := range pool.pending { - txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway - pool.pendingNonces.set(addr, txs[len(txs)-1].Nonce()+1) + highestPending := list.LastElement() + pool.pendingNonces.set(addr, highestPending.Nonce()+1) } pool.mu.Unlock() // Notify subsystems for newly added transactions + for _, tx := range promoted { + addr, _ := types.Sender(pool.signer, tx) + if _, ok := events[addr]; !ok { + events[addr] = newTxSortedMap() + } + events[addr].Put(tx) + } if len(events) > 0 { var txs []*types.Transaction for _, set := range events { @@ -1152,15 +1207,15 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans for _, tx := range forwards { hash := tx.Hash() pool.all.Remove(hash) - log.Trace("Removed old queued transaction", "hash", hash) } + log.Trace("Removed old queued transactions", "count", len(forwards)) // Drop all transactions that are too costly (low balance or out of gas) drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() pool.all.Remove(hash) - log.Trace("Removed unpayable queued transaction", "hash", hash) } + log.Trace("Removed unpayable queued transactions", "count", len(drops)) queuedNofundsMeter.Mark(int64(len(drops))) // Gather all executable transactions and promote them @@ -1168,11 +1223,11 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans for _, tx := range readies { hash := tx.Hash() if pool.promoteTx(addr, hash, tx) { - log.Trace("Promoting queued transaction", "hash", hash) promoted = append(promoted, tx) } } - queuedCounter.Dec(int64(len(readies))) + log.Trace("Promoted queued transactions", "count", len(promoted)) + queuedGauge.Dec(int64(len(readies))) // Drop all transactions over the allowed limit var caps types.Transactions @@ -1187,13 +1242,14 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans } // Mark all the items dropped as removed pool.priced.Removed(len(forwards) + len(drops) + len(caps)) - queuedCounter.Dec(int64(len(forwards) + len(drops) + len(caps))) + queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) if pool.locals.contains(addr) { - localCounter.Dec(int64(len(forwards) + len(drops) + len(caps))) + localGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) } // Delete the entire queue entry if it became empty. if list.Empty() { delete(pool.queue, addr) + delete(pool.beats, addr) } } return promoted @@ -1248,9 +1304,9 @@ func (pool *TxPool) truncatePending() { log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } pool.priced.Removed(len(caps)) - pendingCounter.Dec(int64(len(caps))) + pendingGauge.Dec(int64(len(caps))) if pool.locals.contains(offenders[i]) { - localCounter.Dec(int64(len(caps))) + localGauge.Dec(int64(len(caps))) } pending-- } @@ -1275,9 +1331,9 @@ func (pool *TxPool) truncatePending() { log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } pool.priced.Removed(len(caps)) - pendingCounter.Dec(int64(len(caps))) + pendingGauge.Dec(int64(len(caps))) if pool.locals.contains(addr) { - localCounter.Dec(int64(len(caps))) + localGauge.Dec(int64(len(caps))) } pending-- } @@ -1361,9 +1417,9 @@ func (pool *TxPool) demoteUnexecutables() { log.Trace("Demoting pending transaction", "hash", hash) pool.enqueueTx(hash, tx) } - pendingCounter.Dec(int64(len(olds) + len(drops) + len(invalids))) + pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) if pool.locals.contains(addr) { - localCounter.Dec(int64(len(olds) + len(drops) + len(invalids))) + localGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) } // If there's a gap in front, alert (should never happen) and postpone all transactions if list.Len() > 0 && list.txs.Get(nonce) == nil { @@ -1373,12 +1429,13 @@ func (pool *TxPool) demoteUnexecutables() { log.Error("Demoting invalidated transaction", "hash", hash) pool.enqueueTx(hash, tx) } - pendingCounter.Dec(int64(len(gapped))) + pendingGauge.Dec(int64(len(gapped))) + // This might happen in a reorg, so log it to the metering + blockReorgInvalidatedTx.Mark(int64(len(gapped))) } - // Delete the entire queue entry if it became empty. + // Delete the entire pending entry if it became empty. if list.Empty() { delete(pool.pending, addr) - delete(pool.beats, addr) } } } @@ -1422,6 +1479,10 @@ func (as *accountSet) contains(addr common.Address) bool { return exist } +func (as *accountSet) empty() bool { + return len(as.accounts) == 0 +} + // containsTx checks if the sender of a given tx is within the set. If the sender // cannot be derived, this method returns false. func (as *accountSet) containsTx(tx *types.Transaction) bool { @@ -1475,8 +1536,9 @@ func (as *accountSet) merge(other *accountSet) { // peeking into the pool in TxPool.Get without having to acquire the widely scoped // TxPool.mu mutex. type txLookup struct { - all map[common.Hash]*types.Transaction - lock sync.RWMutex + all map[common.Hash]*types.Transaction + slots int + lock sync.RWMutex } // newTxLookup returns a new txLookup structure. @@ -1514,11 +1576,22 @@ func (t *txLookup) Count() int { return len(t.all) } +// Slots returns the current number of slots used in the lookup. +func (t *txLookup) Slots() int { + t.lock.RLock() + defer t.lock.RUnlock() + + return t.slots +} + // Add adds a transaction to the lookup. func (t *txLookup) Add(tx *types.Transaction) { t.lock.Lock() defer t.lock.Unlock() + t.slots += numSlots(tx) + slotsGauge.Update(int64(t.slots)) + t.all[tx.Hash()] = tx } @@ -1527,5 +1600,13 @@ func (t *txLookup) Remove(hash common.Hash) { t.lock.Lock() defer t.lock.Unlock() + t.slots -= numSlots(t.all[hash]) + slotsGauge.Update(int64(t.slots)) + delete(t.all, hash) } + +// numSlots calculates the number of slots needed for a single transaction. +func numSlots(tx *types.Transaction) int { + return int((tx.Size() + txSlotSize - 1) / txSlotSize) +} |