From 78745551c077bf54151202138c2629f288769561 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 15 Sep 2020 23:55:34 -0400 Subject: WIP: geth-tavum --- eth/backend.go | 2 +- eth/bloombits.go | 13 +++- eth/filters/api.go | 16 ++-- eth/filters/filter.go | 12 +-- eth/filters/filter_system.go | 175 ++++++++++++++++++++----------------------- eth/gasprice/gasprice.go | 145 +++++++++++++++++++++-------------- eth/metrics.go | 139 ---------------------------------- eth/protocol.go | 68 +++++++++++------ eth/tracers/tracer.go | 35 +++++---- 9 files changed, 263 insertions(+), 342 deletions(-) delete mode 100644 eth/metrics.go (limited to 'eth') diff --git a/eth/backend.go b/eth/backend.go index 773f48e..056f8cb 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -26,7 +26,6 @@ import ( //"sync/atomic" "github.com/ava-labs/coreth/accounts" - "github.com/ava-labs/coreth/accounts/abi/bind" "github.com/ava-labs/coreth/consensus" "github.com/ava-labs/coreth/consensus/clique" "github.com/ava-labs/coreth/consensus/dummy" @@ -42,6 +41,7 @@ import ( "github.com/ava-labs/coreth/node" "github.com/ava-labs/coreth/params" "github.com/ava-labs/coreth/rpc" + "github.com/ava-labs/go-ethereum/accounts/abi/bind" "github.com/ava-labs/go-ethereum/common" "github.com/ava-labs/go-ethereum/common/hexutil" "github.com/ava-labs/go-ethereum/core/bloombits" diff --git a/eth/bloombits.go b/eth/bloombits.go index 7136c29..a6b51a2 100644 --- a/eth/bloombits.go +++ b/eth/bloombits.go @@ -24,9 +24,9 @@ import ( "github.com/ava-labs/coreth/core/bloombits" "github.com/ava-labs/coreth/core/rawdb" "github.com/ava-labs/coreth/core/types" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/common/bitutil" - "github.com/ava-labs/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/bitutil" + "github.com/ethereum/go-ethereum/ethdb" ) const ( @@ -54,7 +54,7 @@ func (eth *Ethereum) startBloomHandlers(sectionSize uint64) { go func() { for { select { - case <-eth.shutdownChan: + case <-eth.closeBloomHandler: return case request := <-eth.bloomRequests: @@ -136,3 +136,8 @@ func (b *BloomIndexer) Commit() error { } return batch.Write() } + +// Prune returns an empty error since we don't support pruning here. +func (b *BloomIndexer) Prune(threshold uint64) error { + return nil +} diff --git a/eth/filters/api.go b/eth/filters/api.go index a1d0b21..71e6454 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -27,11 +27,11 @@ import ( "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/rpc" - ethereum "github.com/ava-labs/go-ethereum" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/common/hexutil" - "github.com/ava-labs/go-ethereum/ethdb" - "github.com/ava-labs/go-ethereum/event" + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" ) var ( @@ -65,9 +65,8 @@ type PublicFilterAPI struct { func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { api := &PublicFilterAPI{ backend: backend, - mux: backend.EventMux(), chainDb: backend.ChainDb(), - events: NewEventSystem(backend.EventMux(), backend, lightMode), + events: NewEventSystem(backend, lightMode), filters: make(map[rpc.ID]*filter), } go api.timeoutLoop() @@ -79,6 +78,7 @@ func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { // Tt is started when the api is created. func (api *PublicFilterAPI) timeoutLoop() { ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() for { <-ticker.C api.filtersMu.Lock() @@ -428,7 +428,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { hashes := f.hashes f.hashes = nil return returnHashes(hashes), nil - case LogsSubscription: + case LogsSubscription, MinedAndPendingLogsSubscription: logs := f.logs f.logs = nil return returnLogs(logs), nil diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 6d4a74d..e189391 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -24,15 +24,14 @@ import ( "github.com/ava-labs/coreth/core" "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/rpc" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/core/bloombits" - "github.com/ava-labs/go-ethereum/ethdb" - "github.com/ava-labs/go-ethereum/event" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" ) type Backend interface { ChainDb() ethdb.Database - EventMux() *event.TypeMux HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) @@ -42,6 +41,7 @@ type Backend interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription + SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) @@ -210,7 +210,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err } } -// indexedLogs returns the logs matching the filter criteria based on raw block +// unindexedLogs returns the logs matching the filter criteria based on raw block // iteration and bloom matching. func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { var logs []*types.Log diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 41427c7..1c0f1bf 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -20,7 +20,6 @@ package filters import ( "context" - "errors" "fmt" "sync" "time" @@ -29,10 +28,10 @@ import ( "github.com/ava-labs/coreth/core/rawdb" "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/rpc" - ethereum "github.com/ava-labs/go-ethereum" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/event" - "github.com/ava-labs/go-ethereum/log" + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" ) // Type determines the kind of filter and is used to put the filter in to @@ -58,7 +57,6 @@ const ( ) const ( - // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 @@ -70,10 +68,6 @@ const ( chainEvChanSize = 10 ) -var ( - ErrInvalidSubscriptionID = errors.New("invalid id") -) - type subscription struct { id rpc.ID typ Type @@ -89,25 +83,25 @@ type subscription struct { // EventSystem creates subscriptions, processes events and broadcasts them to the // subscription which match the subscription criteria. type EventSystem struct { - mux *event.TypeMux backend Backend lightMode bool lastHead *types.Header // Subscriptions - txsSub event.Subscription // Subscription for new transaction event - logsSub event.Subscription // Subscription for new log event - rmLogsSub event.Subscription // Subscription for removed log event - chainSub event.Subscription // Subscription for new chain event - pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event + txsSub event.Subscription // Subscription for new transaction event + logsSub event.Subscription // Subscription for new log event + rmLogsSub event.Subscription // Subscription for removed log event + pendingLogsSub event.Subscription // Subscription for pending log event + chainSub event.Subscription // Subscription for new chain event // Channels - install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification - txsCh chan core.NewTxsEvent // Channel to receive new transactions event - logsCh chan []*types.Log // Channel to receive new log event - rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event - chainCh chan core.ChainEvent // Channel to receive new chain event + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txsCh chan core.NewTxsEvent // Channel to receive new transactions event + logsCh chan []*types.Log // Channel to receive new log event + pendingLogsCh chan []*types.Log // Channel to receive new log event + rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event + chainCh chan core.ChainEvent // Channel to receive new chain event } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -116,17 +110,17 @@ type EventSystem struct { // // The returned manager has a loop that needs to be stopped with the Stop function // or by stopping the given mux. -func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem { +func NewEventSystem(backend Backend, lightMode bool) *EventSystem { m := &EventSystem{ - mux: mux, - backend: backend, - lightMode: lightMode, - install: make(chan *subscription), - uninstall: make(chan *subscription), - txsCh: make(chan core.NewTxsEvent, txChanSize), - logsCh: make(chan []*types.Log, logsChanSize), - rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), - chainCh: make(chan core.ChainEvent, chainEvChanSize), + backend: backend, + lightMode: lightMode, + install: make(chan *subscription), + uninstall: make(chan *subscription), + txsCh: make(chan core.NewTxsEvent, txChanSize), + logsCh: make(chan []*types.Log, logsChanSize), + rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), + pendingLogsCh: make(chan []*types.Log, logsChanSize), + chainCh: make(chan core.ChainEvent, chainEvChanSize), } // Subscribe events @@ -134,12 +128,10 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) - // TODO(rjl493456442): use feed to subscribe pending log event - m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) + m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh) // Make sure none of the subscriptions are empty - if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || - m.pendingLogSub.Closed() { + if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil { log.Crit("Subscribe for event system failed") } @@ -316,58 +308,61 @@ func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscript type filterIndex map[Type]map[rpc.ID]*subscription -// broadcast event to filters that match criteria. -func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { - if ev == nil { +func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) { + if len(ev) == 0 { return } + for _, f := range filters[LogsSubscription] { + matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) + if len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } +} - switch e := ev.(type) { - case []*types.Log: - if len(e) > 0 { - for _, f := range filters[LogsSubscription] { - if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } - } +func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) { + if len(ev) == 0 { + return + } + for _, f := range filters[PendingLogsSubscription] { + matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) + if len(matchedLogs) > 0 { + f.logs <- matchedLogs } - case core.RemovedLogsEvent: - for _, f := range filters[LogsSubscription] { - if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + } +} + +func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLogsEvent) { + for _, f := range filters[LogsSubscription] { + matchedLogs := filterLogs(ev.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) + if len(matchedLogs) > 0 { + f.logs <- matchedLogs } - case *event.TypeMuxEvent: - if muxe, ok := e.Data.(core.PendingLogsEvent); ok { - for _, f := range filters[PendingLogsSubscription] { - if e.Time.After(f.created) { - if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + } +} + +func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) { + hashes := make([]common.Hash, 0, len(ev.Txs)) + for _, tx := range ev.Txs { + hashes = append(hashes, tx.Hash()) + } + for _, f := range filters[PendingTransactionsSubscription] { + f.hashes <- hashes + } +} + +func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) { + for _, f := range filters[BlocksSubscription] { + f.headers <- ev.Block.Header() + } + if es.lightMode && len(filters[LogsSubscription]) > 0 { + es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) { + for _, f := range filters[LogsSubscription] { + if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } - } - case core.NewTxsEvent: - hashes := make([]common.Hash, 0, len(e.Txs)) - for _, tx := range e.Txs { - hashes = append(hashes, tx.Hash()) - } - for _, f := range filters[PendingTransactionsSubscription] { - f.hashes <- hashes - } - case core.ChainEvent: - for _, f := range filters[BlocksSubscription] { - f.headers <- e.Block.Header() - } - if es.lightMode && len(filters[LogsSubscription]) > 0 { - es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) { - for _, f := range filters[LogsSubscription] { - if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } - } - }) - } + }) } } @@ -448,10 +443,10 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. func (es *EventSystem) eventLoop() { // Ensure all subscriptions get cleaned up defer func() { - es.pendingLogSub.Unsubscribe() es.txsSub.Unsubscribe() es.logsSub.Unsubscribe() es.rmLogsSub.Unsubscribe() + es.pendingLogsSub.Unsubscribe() es.chainSub.Unsubscribe() }() @@ -462,20 +457,16 @@ func (es *EventSystem) eventLoop() { for { select { - // Handle subscribed events case ev := <-es.txsCh: - es.broadcast(index, ev) + es.handleTxsEvent(index, ev) case ev := <-es.logsCh: - es.broadcast(index, ev) + es.handleLogs(index, ev) case ev := <-es.rmLogsCh: - es.broadcast(index, ev) + es.handleRemovedLogs(index, ev) + case ev := <-es.pendingLogsCh: + es.handlePendingLogs(index, ev) case ev := <-es.chainCh: - es.broadcast(index, ev) - case ev, active := <-es.pendingLogSub.Chan(): - if !active { // system stopped - return - } - es.broadcast(index, ev) + es.handleChainEvent(index, ev) case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { diff --git a/eth/gasprice/gasprice.go b/eth/gasprice/gasprice.go index 23e49a6..14f50b1 100644 --- a/eth/gasprice/gasprice.go +++ b/eth/gasprice/gasprice.go @@ -23,123 +23,144 @@ import ( "sync" "github.com/ava-labs/coreth/core/types" - "github.com/ava-labs/coreth/internal/ethapi" "github.com/ava-labs/coreth/params" "github.com/ava-labs/coreth/rpc" - "github.com/ava-labs/go-ethereum/common" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" ) -var maxPrice = big.NewInt(500 * params.GWei) +const sampleNumber = 3 // Number of transactions sampled in a block + +var DefaultMaxPrice = big.NewInt(500 * params.GWei) type Config struct { Blocks int Percentile int Default *big.Int `toml:",omitempty"` + MaxPrice *big.Int `toml:",omitempty"` +} + +// OracleBackend includes all necessary background APIs for oracle. +type OracleBackend interface { + HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) + BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error) + ChainConfig() *params.ChainConfig } // Oracle recommends gas prices based on the content of recent // blocks. Suitable for both light and full clients. type Oracle struct { - backend ethapi.Backend + backend OracleBackend lastHead common.Hash lastPrice *big.Int + maxPrice *big.Int cacheLock sync.RWMutex fetchLock sync.Mutex - checkBlocks, maxEmpty, maxBlocks int - percentile int + checkBlocks int + percentile int } -// NewOracle returns a new oracle. -func NewOracle(backend ethapi.Backend, params Config) *Oracle { +// NewOracle returns a new gasprice oracle which can recommend suitable +// gasprice for newly created transaction. +func NewOracle(backend OracleBackend, params Config) *Oracle { blocks := params.Blocks if blocks < 1 { blocks = 1 + log.Warn("Sanitizing invalid gasprice oracle sample blocks", "provided", params.Blocks, "updated", blocks) } percent := params.Percentile if percent < 0 { percent = 0 + log.Warn("Sanitizing invalid gasprice oracle sample percentile", "provided", params.Percentile, "updated", percent) } if percent > 100 { percent = 100 + log.Warn("Sanitizing invalid gasprice oracle sample percentile", "provided", params.Percentile, "updated", percent) + } + maxPrice := params.MaxPrice + if maxPrice == nil || maxPrice.Int64() <= 0 { + maxPrice = DefaultMaxPrice + log.Warn("Sanitizing invalid gasprice oracle price cap", "provided", params.MaxPrice, "updated", maxPrice) } return &Oracle{ backend: backend, lastPrice: params.Default, + maxPrice: maxPrice, checkBlocks: blocks, - maxEmpty: blocks / 2, - maxBlocks: blocks * 5, percentile: percent, } } -// SuggestPrice returns the recommended gas price. +// SuggestPrice returns a gasprice so that newly created transaction can +// have a very high chance to be included in the following blocks. func (gpo *Oracle) SuggestPrice(ctx context.Context) (*big.Int, error) { - gpo.cacheLock.RLock() - lastHead := gpo.lastHead - lastPrice := gpo.lastPrice - gpo.cacheLock.RUnlock() - head, _ := gpo.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) headHash := head.Hash() + + // If the latest gasprice is still available, return it. + gpo.cacheLock.RLock() + lastHead, lastPrice := gpo.lastHead, gpo.lastPrice + gpo.cacheLock.RUnlock() if headHash == lastHead { return lastPrice, nil } - gpo.fetchLock.Lock() defer gpo.fetchLock.Unlock() - // try checking the cache again, maybe the last fetch fetched what we need + // Try checking the cache again, maybe the last fetch fetched what we need gpo.cacheLock.RLock() - lastHead = gpo.lastHead - lastPrice = gpo.lastPrice + lastHead, lastPrice = gpo.lastHead, gpo.lastPrice gpo.cacheLock.RUnlock() if headHash == lastHead { return lastPrice, nil } - - blockNum := head.Number.Uint64() - ch := make(chan getBlockPricesResult, gpo.checkBlocks) - sent := 0 - exp := 0 - var blockPrices []*big.Int - for sent < gpo.checkBlocks && blockNum > 0 { - go gpo.getBlockPrices(ctx, types.MakeSigner(gpo.backend.ChainConfig(), big.NewInt(int64(blockNum))), blockNum, ch) + var ( + sent, exp int + number = head.Number.Uint64() + result = make(chan getBlockPricesResult, gpo.checkBlocks) + quit = make(chan struct{}) + txPrices []*big.Int + ) + for sent < gpo.checkBlocks && number > 0 { + go gpo.getBlockPrices(ctx, types.MakeSigner(gpo.backend.ChainConfig(), big.NewInt(int64(number))), number, sampleNumber, result, quit) sent++ exp++ - blockNum-- + number-- } - maxEmpty := gpo.maxEmpty for exp > 0 { - res := <-ch + res := <-result if res.err != nil { + close(quit) return lastPrice, res.err } exp-- - if res.price != nil { - blockPrices = append(blockPrices, res.price) - continue - } - if maxEmpty > 0 { - maxEmpty-- - continue + // Nothing returned. There are two special cases here: + // - The block is empty + // - All the transactions included are sent by the miner itself. + // In these cases, use the latest calculated price for samping. + if len(res.prices) == 0 { + res.prices = []*big.Int{lastPrice} } - if blockNum > 0 && sent < gpo.maxBlocks { - go gpo.getBlockPrices(ctx, types.MakeSigner(gpo.backend.ChainConfig(), big.NewInt(int64(blockNum))), blockNum, ch) + // Besides, in order to collect enough data for sampling, if nothing + // meaningful returned, try to query more blocks. But the maximum + // is 2*checkBlocks. + if len(res.prices) == 1 && len(txPrices)+1+exp < gpo.checkBlocks*2 && number > 0 { + go gpo.getBlockPrices(ctx, types.MakeSigner(gpo.backend.ChainConfig(), big.NewInt(int64(number))), number, sampleNumber, result, quit) sent++ exp++ - blockNum-- + number-- } + txPrices = append(txPrices, res.prices...) } price := lastPrice - if len(blockPrices) > 0 { - sort.Sort(bigIntArray(blockPrices)) - price = blockPrices[(len(blockPrices)-1)*gpo.percentile/100] + if len(txPrices) > 0 { + sort.Sort(bigIntArray(txPrices)) + price = txPrices[(len(txPrices)-1)*gpo.percentile/100] } - if price.Cmp(maxPrice) > 0 { - price = new(big.Int).Set(maxPrice) + if price.Cmp(gpo.maxPrice) > 0 { + price = new(big.Int).Set(gpo.maxPrice) } - gpo.cacheLock.Lock() gpo.lastHead = headHash gpo.lastPrice = price @@ -148,38 +169,48 @@ func (gpo *Oracle) SuggestPrice(ctx context.Context) (*big.Int, error) { } type getBlockPricesResult struct { - price *big.Int - err error + prices []*big.Int + err error } type transactionsByGasPrice []*types.Transaction func (t transactionsByGasPrice) Len() int { return len(t) } func (t transactionsByGasPrice) Swap(i, j int) { t[i], t[j] = t[j], t[i] } -func (t transactionsByGasPrice) Less(i, j int) bool { return t[i].GasPrice().Cmp(t[j].GasPrice()) < 0 } +func (t transactionsByGasPrice) Less(i, j int) bool { return t[i].GasPriceCmp(t[j]) < 0 } // getBlockPrices calculates the lowest transaction gas price in a given block -// and sends it to the result channel. If the block is empty, price is nil. -func (gpo *Oracle) getBlockPrices(ctx context.Context, signer types.Signer, blockNum uint64, ch chan getBlockPricesResult) { +// and sends it to the result channel. If the block is empty or all transactions +// are sent by the miner itself(it doesn't make any sense to include this kind of +// transaction prices for sampling), nil gasprice is returned. +func (gpo *Oracle) getBlockPrices(ctx context.Context, signer types.Signer, blockNum uint64, limit int, result chan getBlockPricesResult, quit chan struct{}) { block, err := gpo.backend.BlockByNumber(ctx, rpc.BlockNumber(blockNum)) if block == nil { - ch <- getBlockPricesResult{nil, err} + select { + case result <- getBlockPricesResult{nil, err}: + case <-quit: + } return } - blockTxs := block.Transactions() txs := make([]*types.Transaction, len(blockTxs)) copy(txs, blockTxs) sort.Sort(transactionsByGasPrice(txs)) + var prices []*big.Int for _, tx := range txs { sender, err := types.Sender(signer, tx) if err == nil && sender != block.Coinbase() { - ch <- getBlockPricesResult{tx.GasPrice(), nil} - return + prices = append(prices, tx.GasPrice()) + if len(prices) >= limit { + break + } } } - ch <- getBlockPricesResult{nil, nil} + select { + case result <- getBlockPricesResult{prices, nil}: + case <-quit: + } } type bigIntArray []*big.Int diff --git a/eth/metrics.go b/eth/metrics.go deleted file mode 100644 index 0729c44..0000000 --- a/eth/metrics.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package eth - -import ( - "github.com/ava-labs/go-ethereum/metrics" - "github.com/ava-labs/go-ethereum/p2p" -) - -var ( - propTxnInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/txns/in/packets", nil) - propTxnInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/txns/in/traffic", nil) - propTxnOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/txns/out/packets", nil) - propTxnOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/txns/out/traffic", nil) - propHashInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/hashes/in/packets", nil) - propHashInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/hashes/in/traffic", nil) - propHashOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/hashes/out/packets", nil) - propHashOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/hashes/out/traffic", nil) - propBlockInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/blocks/in/packets", nil) - propBlockInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/blocks/in/traffic", nil) - propBlockOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/blocks/out/packets", nil) - propBlockOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/blocks/out/traffic", nil) - reqHeaderInPacketsMeter = metrics.NewRegisteredMeter("eth/req/headers/in/packets", nil) - reqHeaderInTrafficMeter = metrics.NewRegisteredMeter("eth/req/headers/in/traffic", nil) - reqHeaderOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/headers/out/packets", nil) - reqHeaderOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/headers/out/traffic", nil) - reqBodyInPacketsMeter = metrics.NewRegisteredMeter("eth/req/bodies/in/packets", nil) - reqBodyInTrafficMeter = metrics.NewRegisteredMeter("eth/req/bodies/in/traffic", nil) - reqBodyOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/bodies/out/packets", nil) - reqBodyOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/bodies/out/traffic", nil) - reqStateInPacketsMeter = metrics.NewRegisteredMeter("eth/req/states/in/packets", nil) - reqStateInTrafficMeter = metrics.NewRegisteredMeter("eth/req/states/in/traffic", nil) - reqStateOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/states/out/packets", nil) - reqStateOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/states/out/traffic", nil) - reqReceiptInPacketsMeter = metrics.NewRegisteredMeter("eth/req/receipts/in/packets", nil) - reqReceiptInTrafficMeter = metrics.NewRegisteredMeter("eth/req/receipts/in/traffic", nil) - reqReceiptOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/receipts/out/packets", nil) - reqReceiptOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/receipts/out/traffic", nil) - miscInPacketsMeter = metrics.NewRegisteredMeter("eth/misc/in/packets", nil) - miscInTrafficMeter = metrics.NewRegisteredMeter("eth/misc/in/traffic", nil) - miscOutPacketsMeter = metrics.NewRegisteredMeter("eth/misc/out/packets", nil) - miscOutTrafficMeter = metrics.NewRegisteredMeter("eth/misc/out/traffic", nil) -) - -// meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of -// accumulating the above defined metrics based on the data stream contents. -type meteredMsgReadWriter struct { - p2p.MsgReadWriter // Wrapped message stream to meter - version int // Protocol version to select correct meters -} - -// newMeteredMsgWriter wraps a p2p MsgReadWriter with metering support. If the -// metrics system is disabled, this function returns the original object. -func newMeteredMsgWriter(rw p2p.MsgReadWriter) p2p.MsgReadWriter { - if !metrics.Enabled { - return rw - } - return &meteredMsgReadWriter{MsgReadWriter: rw} -} - -// Init sets the protocol version used by the stream to know which meters to -// increment in case of overlapping message ids between protocol versions. -func (rw *meteredMsgReadWriter) Init(version int) { - rw.version = version -} - -func (rw *meteredMsgReadWriter) ReadMsg() (p2p.Msg, error) { - // Read the message and short circuit in case of an error - msg, err := rw.MsgReadWriter.ReadMsg() - if err != nil { - return msg, err - } - // Account for the data traffic - packets, traffic := miscInPacketsMeter, miscInTrafficMeter - switch { - case msg.Code == BlockHeadersMsg: - packets, traffic = reqHeaderInPacketsMeter, reqHeaderInTrafficMeter - case msg.Code == BlockBodiesMsg: - packets, traffic = reqBodyInPacketsMeter, reqBodyInTrafficMeter - - case rw.version >= eth63 && msg.Code == NodeDataMsg: - packets, traffic = reqStateInPacketsMeter, reqStateInTrafficMeter - case rw.version >= eth63 && msg.Code == ReceiptsMsg: - packets, traffic = reqReceiptInPacketsMeter, reqReceiptInTrafficMeter - - case msg.Code == NewBlockHashesMsg: - packets, traffic = propHashInPacketsMeter, propHashInTrafficMeter - case msg.Code == NewBlockMsg: - packets, traffic = propBlockInPacketsMeter, propBlockInTrafficMeter - case msg.Code == TxMsg: - packets, traffic = propTxnInPacketsMeter, propTxnInTrafficMeter - } - packets.Mark(1) - traffic.Mark(int64(msg.Size)) - - return msg, err -} - -func (rw *meteredMsgReadWriter) WriteMsg(msg p2p.Msg) error { - // Account for the data traffic - packets, traffic := miscOutPacketsMeter, miscOutTrafficMeter - switch { - case msg.Code == BlockHeadersMsg: - packets, traffic = reqHeaderOutPacketsMeter, reqHeaderOutTrafficMeter - case msg.Code == BlockBodiesMsg: - packets, traffic = reqBodyOutPacketsMeter, reqBodyOutTrafficMeter - - case rw.version >= eth63 && msg.Code == NodeDataMsg: - packets, traffic = reqStateOutPacketsMeter, reqStateOutTrafficMeter - case rw.version >= eth63 && msg.Code == ReceiptsMsg: - packets, traffic = reqReceiptOutPacketsMeter, reqReceiptOutTrafficMeter - - case msg.Code == NewBlockHashesMsg: - packets, traffic = propHashOutPacketsMeter, propHashOutTrafficMeter - case msg.Code == NewBlockMsg: - packets, traffic = propBlockOutPacketsMeter, propBlockOutTrafficMeter - case msg.Code == TxMsg: - packets, traffic = propTxnOutPacketsMeter, propTxnOutTrafficMeter - } - packets.Mark(1) - traffic.Mark(int64(msg.Size)) - - // Send the packet to the p2p layer - return rw.MsgReadWriter.WriteMsg(msg) -} diff --git a/eth/protocol.go b/eth/protocol.go index e205aad..94260c2 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -23,45 +23,51 @@ import ( "github.com/ava-labs/coreth/core" "github.com/ava-labs/coreth/core/types" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/event" - "github.com/ava-labs/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/rlp" ) // Constants to match up protocol versions and messages const ( - eth62 = 62 eth63 = 63 + eth64 = 64 + eth65 = 65 ) // protocolName is the official short name of the protocol used during capability negotiation. const protocolName = "eth" // ProtocolVersions are the supported versions of the eth protocol (first is primary). -var ProtocolVersions = []uint{eth63} +var ProtocolVersions = []uint{eth65, eth64, eth63} // protocolLengths are the number of implemented message corresponding to different protocol versions. -var protocolLengths = map[uint]uint64{eth63: 17, eth62: 8} +var protocolLengths = map[uint]uint64{eth65: 17, eth64: 17, eth63: 17} const protocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message // eth protocol message codes const ( - // Protocol messages belonging to eth/62 StatusMsg = 0x00 NewBlockHashesMsg = 0x01 - TxMsg = 0x02 + TransactionMsg = 0x02 GetBlockHeadersMsg = 0x03 BlockHeadersMsg = 0x04 GetBlockBodiesMsg = 0x05 BlockBodiesMsg = 0x06 NewBlockMsg = 0x07 - - // Protocol messages belonging to eth/63 - GetNodeDataMsg = 0x0d - NodeDataMsg = 0x0e - GetReceiptsMsg = 0x0f - ReceiptsMsg = 0x10 + GetNodeDataMsg = 0x0d + NodeDataMsg = 0x0e + GetReceiptsMsg = 0x0f + ReceiptsMsg = 0x10 + + // New protocol message codes introduced in eth65 + // + // Previously these message ids were used by some legacy and unsupported + // eth protocols, reown them here. + NewPooledTransactionHashesMsg = 0x08 + GetPooledTransactionsMsg = 0x09 + PooledTransactionsMsg = 0x0a ) type errCode int @@ -71,11 +77,11 @@ const ( ErrDecode ErrInvalidMsgCode ErrProtocolVersionMismatch - ErrNetworkIdMismatch - ErrGenesisBlockMismatch + ErrNetworkIDMismatch + ErrGenesisMismatch + ErrForkIDRejected ErrNoStatusMsg ErrExtraStatusMsg - ErrSuspendedPeer ) func (e errCode) String() string { @@ -88,14 +94,22 @@ var errorToString = map[int]string{ ErrDecode: "Invalid message", ErrInvalidMsgCode: "Invalid message code", ErrProtocolVersionMismatch: "Protocol version mismatch", - ErrNetworkIdMismatch: "NetworkId mismatch", - ErrGenesisBlockMismatch: "Genesis block mismatch", + ErrNetworkIDMismatch: "Network ID mismatch", + ErrGenesisMismatch: "Genesis mismatch", + ErrForkIDRejected: "Fork ID rejected", ErrNoStatusMsg: "No status message", ErrExtraStatusMsg: "Extra status message", - ErrSuspendedPeer: "Suspended peer", } type txPool interface { + // Has returns an indicator whether txpool has a transaction + // cached with the given hash. + Has(hash common.Hash) bool + + // Get retrieves the transaction from local txpool with given + // tx hash. + Get(hash common.Hash) *types.Transaction + // AddRemotes should add the given transactions to the pool. AddRemotes([]*types.Transaction) []error @@ -108,8 +122,8 @@ type txPool interface { SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription } -// statusData is the network packet for the status message. -type statusData struct { +// statusData63 is the network packet for the status message for eth/63. +type statusData63 struct { ProtocolVersion uint32 NetworkId uint64 TD *big.Int @@ -117,6 +131,16 @@ type statusData struct { GenesisBlock common.Hash } +// statusData is the network packet for the status message for eth/64 and later. +type statusData struct { + ProtocolVersion uint32 + NetworkID uint64 + TD *big.Int + Head common.Hash + Genesis common.Hash + ForkID forkid.ID +} + // newBlockHashesData is the network packet for the block announcements. type newBlockHashesData []struct { Hash common.Hash // Hash of one particular block being announced diff --git a/eth/tracers/tracer.go b/eth/tracers/tracer.go index f2ef25d..524500d 100644 --- a/eth/tracers/tracer.go +++ b/eth/tracers/tracer.go @@ -26,10 +26,10 @@ import ( "unsafe" "github.com/ava-labs/coreth/core/vm" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/common/hexutil" - "github.com/ava-labs/go-ethereum/crypto" - "github.com/ava-labs/go-ethereum/log" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" duktape "gopkg.in/olebedev/go-duktape.v3" ) @@ -93,18 +93,27 @@ type memoryWrapper struct { // slice returns the requested range of memory as a byte slice. func (mw *memoryWrapper) slice(begin, end int64) []byte { + if end == begin { + return []byte{} + } + if end < begin || begin < 0 { + // TODO(karalabe): We can't js-throw from Go inside duktape inside Go. The Go + // runtime goes belly up https://github.com/golang/go/issues/15639. + log.Warn("Tracer accessed out of bound memory", "offset", begin, "end", end) + return nil + } if mw.memory.Len() < int(end) { // TODO(karalabe): We can't js-throw from Go inside duktape inside Go. The Go // runtime goes belly up https://github.com/golang/go/issues/15639. log.Warn("Tracer accessed out of bound memory", "available", mw.memory.Len(), "offset", begin, "size", end-begin) return nil } - return mw.memory.Get(begin, end-begin) + return mw.memory.GetCopy(begin, end-begin) } // getUint returns the 32 bytes at the specified address interpreted as a uint. func (mw *memoryWrapper) getUint(addr int64) *big.Int { - if mw.memory.Len() < int(addr)+32 { + if mw.memory.Len() < int(addr)+32 || addr < 0 { // TODO(karalabe): We can't js-throw from Go inside duktape inside Go. The Go // runtime goes belly up https://github.com/golang/go/issues/15639. log.Warn("Tracer accessed out of bound memory", "available", mw.memory.Len(), "offset", addr, "size", 32) @@ -147,13 +156,13 @@ type stackWrapper struct { // peek returns the nth-from-the-top element of the stack. func (sw *stackWrapper) peek(idx int) *big.Int { - if len(sw.stack.Data()) <= idx { + if len(sw.stack.Data()) <= idx || idx < 0 { // TODO(karalabe): We can't js-throw from Go inside duktape inside Go. The Go // runtime goes belly up https://github.com/golang/go/issues/15639. log.Warn("Tracer accessed out of bound stack", "size", len(sw.stack.Data()), "index", idx) return new(big.Int) } - return sw.stack.Data()[len(sw.stack.Data())-idx-1] + return sw.stack.Back(idx).ToBig() } // pushObject assembles a JSVM object wrapping a swappable stack and pushes it @@ -419,17 +428,17 @@ func New(code string) (*Tracer, error) { tracer.tracerObject = 0 // yeah, nice, eval can't return the index itself if !tracer.vm.GetPropString(tracer.tracerObject, "step") { - return nil, fmt.Errorf("Trace object must expose a function step()") + return nil, fmt.Errorf("trace object must expose a function step()") } tracer.vm.Pop() if !tracer.vm.GetPropString(tracer.tracerObject, "fault") { - return nil, fmt.Errorf("Trace object must expose a function fault()") + return nil, fmt.Errorf("trace object must expose a function fault()") } tracer.vm.Pop() if !tracer.vm.GetPropString(tracer.tracerObject, "result") { - return nil, fmt.Errorf("Trace object must expose a function result()") + return nil, fmt.Errorf("trace object must expose a function result()") } tracer.vm.Pop() @@ -532,7 +541,7 @@ func (jst *Tracer) CaptureStart(from common.Address, to common.Address, create b } // CaptureState implements the Tracer interface to trace a single step of VM execution. -func (jst *Tracer) CaptureState(env *vm.EVM, pc uint64, op vm.OpCode, gas, cost uint64, memory *vm.Memory, stack *vm.Stack, contract *vm.Contract, depth int, err error) error { +func (jst *Tracer) CaptureState(env *vm.EVM, pc uint64, op vm.OpCode, gas, cost uint64, memory *vm.Memory, stack *vm.Stack, rStack *vm.ReturnStack, rdata []byte, contract *vm.Contract, depth int, err error) error { if jst.err == nil { // Initialize the context if it wasn't done yet if !jst.inited { @@ -571,7 +580,7 @@ func (jst *Tracer) CaptureState(env *vm.EVM, pc uint64, op vm.OpCode, gas, cost // CaptureFault implements the Tracer interface to trace an execution fault // while running an opcode. -func (jst *Tracer) CaptureFault(env *vm.EVM, pc uint64, op vm.OpCode, gas, cost uint64, memory *vm.Memory, stack *vm.Stack, contract *vm.Contract, depth int, err error) error { +func (jst *Tracer) CaptureFault(env *vm.EVM, pc uint64, op vm.OpCode, gas, cost uint64, memory *vm.Memory, stack *vm.Stack, rStack *vm.ReturnStack, contract *vm.Contract, depth int, err error) error { if jst.err == nil { // Apart from the error, everything matches the previous invocation jst.errorValue = new(string) -- cgit v1.2.3