aboutsummaryrefslogtreecommitdiff
path: root/eth
diff options
context:
space:
mode:
authorDeterminant <tederminant@gmail.com>2020-09-15 23:55:34 -0400
committerDeterminant <tederminant@gmail.com>2020-09-15 23:55:34 -0400
commit78745551c077bf54151202138c2629f288769561 (patch)
tree2b628e99fd110617089778fa91235ecd2888f4ef /eth
parent7d1388c743b4ec8f4a86bea95bfada785dee83f7 (diff)
WIP: geth-tavum
Diffstat (limited to 'eth')
-rw-r--r--eth/backend.go2
-rw-r--r--eth/bloombits.go13
-rw-r--r--eth/filters/api.go16
-rw-r--r--eth/filters/filter.go12
-rw-r--r--eth/filters/filter_system.go175
-rw-r--r--eth/gasprice/gasprice.go145
-rw-r--r--eth/metrics.go139
-rw-r--r--eth/protocol.go68
-rw-r--r--eth/tracers/tracer.go35
9 files changed, 263 insertions, 342 deletions
diff --git a/eth/backend.go b/eth/backend.go
index 773f48e..056f8cb 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -26,7 +26,6 @@ import (
//"sync/atomic"
"github.com/ava-labs/coreth/accounts"
- "github.com/ava-labs/coreth/accounts/abi/bind"
"github.com/ava-labs/coreth/consensus"
"github.com/ava-labs/coreth/consensus/clique"
"github.com/ava-labs/coreth/consensus/dummy"
@@ -42,6 +41,7 @@ import (
"github.com/ava-labs/coreth/node"
"github.com/ava-labs/coreth/params"
"github.com/ava-labs/coreth/rpc"
+ "github.com/ava-labs/go-ethereum/accounts/abi/bind"
"github.com/ava-labs/go-ethereum/common"
"github.com/ava-labs/go-ethereum/common/hexutil"
"github.com/ava-labs/go-ethereum/core/bloombits"
diff --git a/eth/bloombits.go b/eth/bloombits.go
index 7136c29..a6b51a2 100644
--- a/eth/bloombits.go
+++ b/eth/bloombits.go
@@ -24,9 +24,9 @@ import (
"github.com/ava-labs/coreth/core/bloombits"
"github.com/ava-labs/coreth/core/rawdb"
"github.com/ava-labs/coreth/core/types"
- "github.com/ava-labs/go-ethereum/common"
- "github.com/ava-labs/go-ethereum/common/bitutil"
- "github.com/ava-labs/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/bitutil"
+ "github.com/ethereum/go-ethereum/ethdb"
)
const (
@@ -54,7 +54,7 @@ func (eth *Ethereum) startBloomHandlers(sectionSize uint64) {
go func() {
for {
select {
- case <-eth.shutdownChan:
+ case <-eth.closeBloomHandler:
return
case request := <-eth.bloomRequests:
@@ -136,3 +136,8 @@ func (b *BloomIndexer) Commit() error {
}
return batch.Write()
}
+
+// Prune returns an empty error since we don't support pruning here.
+func (b *BloomIndexer) Prune(threshold uint64) error {
+ return nil
+}
diff --git a/eth/filters/api.go b/eth/filters/api.go
index a1d0b21..71e6454 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -27,11 +27,11 @@ import (
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/rpc"
- ethereum "github.com/ava-labs/go-ethereum"
- "github.com/ava-labs/go-ethereum/common"
- "github.com/ava-labs/go-ethereum/common/hexutil"
- "github.com/ava-labs/go-ethereum/ethdb"
- "github.com/ava-labs/go-ethereum/event"
+ ethereum "github.com/ethereum/go-ethereum"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
)
var (
@@ -65,9 +65,8 @@ type PublicFilterAPI struct {
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
api := &PublicFilterAPI{
backend: backend,
- mux: backend.EventMux(),
chainDb: backend.ChainDb(),
- events: NewEventSystem(backend.EventMux(), backend, lightMode),
+ events: NewEventSystem(backend, lightMode),
filters: make(map[rpc.ID]*filter),
}
go api.timeoutLoop()
@@ -79,6 +78,7 @@ func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
// Tt is started when the api is created.
func (api *PublicFilterAPI) timeoutLoop() {
ticker := time.NewTicker(5 * time.Minute)
+ defer ticker.Stop()
for {
<-ticker.C
api.filtersMu.Lock()
@@ -428,7 +428,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes), nil
- case LogsSubscription:
+ case LogsSubscription, MinedAndPendingLogsSubscription:
logs := f.logs
f.logs = nil
return returnLogs(logs), nil
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index 6d4a74d..e189391 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -24,15 +24,14 @@ import (
"github.com/ava-labs/coreth/core"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/rpc"
- "github.com/ava-labs/go-ethereum/common"
- "github.com/ava-labs/go-ethereum/core/bloombits"
- "github.com/ava-labs/go-ethereum/ethdb"
- "github.com/ava-labs/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
)
type Backend interface {
ChainDb() ethdb.Database
- EventMux() *event.TypeMux
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
@@ -42,6 +41,7 @@ type Backend interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
+ SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
@@ -210,7 +210,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
}
}
-// indexedLogs returns the logs matching the filter criteria based on raw block
+// unindexedLogs returns the logs matching the filter criteria based on raw block
// iteration and bloom matching.
func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
var logs []*types.Log
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 41427c7..1c0f1bf 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -20,7 +20,6 @@ package filters
import (
"context"
- "errors"
"fmt"
"sync"
"time"
@@ -29,10 +28,10 @@ import (
"github.com/ava-labs/coreth/core/rawdb"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/rpc"
- ethereum "github.com/ava-labs/go-ethereum"
- "github.com/ava-labs/go-ethereum/common"
- "github.com/ava-labs/go-ethereum/event"
- "github.com/ava-labs/go-ethereum/log"
+ ethereum "github.com/ethereum/go-ethereum"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
)
// Type determines the kind of filter and is used to put the filter in to
@@ -58,7 +57,6 @@ const (
)
const (
-
// txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
@@ -70,10 +68,6 @@ const (
chainEvChanSize = 10
)
-var (
- ErrInvalidSubscriptionID = errors.New("invalid id")
-)
-
type subscription struct {
id rpc.ID
typ Type
@@ -89,25 +83,25 @@ type subscription struct {
// EventSystem creates subscriptions, processes events and broadcasts them to the
// subscription which match the subscription criteria.
type EventSystem struct {
- mux *event.TypeMux
backend Backend
lightMode bool
lastHead *types.Header
// Subscriptions
- txsSub event.Subscription // Subscription for new transaction event
- logsSub event.Subscription // Subscription for new log event
- rmLogsSub event.Subscription // Subscription for removed log event
- chainSub event.Subscription // Subscription for new chain event
- pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
+ txsSub event.Subscription // Subscription for new transaction event
+ logsSub event.Subscription // Subscription for new log event
+ rmLogsSub event.Subscription // Subscription for removed log event
+ pendingLogsSub event.Subscription // Subscription for pending log event
+ chainSub event.Subscription // Subscription for new chain event
// Channels
- install chan *subscription // install filter for event notification
- uninstall chan *subscription // remove filter for event notification
- txsCh chan core.NewTxsEvent // Channel to receive new transactions event
- logsCh chan []*types.Log // Channel to receive new log event
- rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
- chainCh chan core.ChainEvent // Channel to receive new chain event
+ install chan *subscription // install filter for event notification
+ uninstall chan *subscription // remove filter for event notification
+ txsCh chan core.NewTxsEvent // Channel to receive new transactions event
+ logsCh chan []*types.Log // Channel to receive new log event
+ pendingLogsCh chan []*types.Log // Channel to receive new log event
+ rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
+ chainCh chan core.ChainEvent // Channel to receive new chain event
}
// NewEventSystem creates a new manager that listens for event on the given mux,
@@ -116,17 +110,17 @@ type EventSystem struct {
//
// The returned manager has a loop that needs to be stopped with the Stop function
// or by stopping the given mux.
-func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
+func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
m := &EventSystem{
- mux: mux,
- backend: backend,
- lightMode: lightMode,
- install: make(chan *subscription),
- uninstall: make(chan *subscription),
- txsCh: make(chan core.NewTxsEvent, txChanSize),
- logsCh: make(chan []*types.Log, logsChanSize),
- rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
- chainCh: make(chan core.ChainEvent, chainEvChanSize),
+ backend: backend,
+ lightMode: lightMode,
+ install: make(chan *subscription),
+ uninstall: make(chan *subscription),
+ txsCh: make(chan core.NewTxsEvent, txChanSize),
+ logsCh: make(chan []*types.Log, logsChanSize),
+ rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
+ pendingLogsCh: make(chan []*types.Log, logsChanSize),
+ chainCh: make(chan core.ChainEvent, chainEvChanSize),
}
// Subscribe events
@@ -134,12 +128,10 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
- // TODO(rjl493456442): use feed to subscribe pending log event
- m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
+ m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
// Make sure none of the subscriptions are empty
- if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
- m.pendingLogSub.Closed() {
+ if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
log.Crit("Subscribe for event system failed")
}
@@ -316,58 +308,61 @@ func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscript
type filterIndex map[Type]map[rpc.ID]*subscription
-// broadcast event to filters that match criteria.
-func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
- if ev == nil {
+func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
+ if len(ev) == 0 {
return
}
+ for _, f := range filters[LogsSubscription] {
+ matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
+ if len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
+ }
+}
- switch e := ev.(type) {
- case []*types.Log:
- if len(e) > 0 {
- for _, f := range filters[LogsSubscription] {
- if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
- }
+func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) {
+ if len(ev) == 0 {
+ return
+ }
+ for _, f := range filters[PendingLogsSubscription] {
+ matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
+ if len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
- case core.RemovedLogsEvent:
- for _, f := range filters[LogsSubscription] {
- if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ }
+}
+
+func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLogsEvent) {
+ for _, f := range filters[LogsSubscription] {
+ matchedLogs := filterLogs(ev.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
+ if len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
- case *event.TypeMuxEvent:
- if muxe, ok := e.Data.(core.PendingLogsEvent); ok {
- for _, f := range filters[PendingLogsSubscription] {
- if e.Time.After(f.created) {
- if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ }
+}
+
+func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
+ hashes := make([]common.Hash, 0, len(ev.Txs))
+ for _, tx := range ev.Txs {
+ hashes = append(hashes, tx.Hash())
+ }
+ for _, f := range filters[PendingTransactionsSubscription] {
+ f.hashes <- hashes
+ }
+}
+
+func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
+ for _, f := range filters[BlocksSubscription] {
+ f.headers <- ev.Block.Header()
+ }
+ if es.lightMode && len(filters[LogsSubscription]) > 0 {
+ es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
+ for _, f := range filters[LogsSubscription] {
+ if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
- }
- case core.NewTxsEvent:
- hashes := make([]common.Hash, 0, len(e.Txs))
- for _, tx := range e.Txs {
- hashes = append(hashes, tx.Hash())
- }
- for _, f := range filters[PendingTransactionsSubscription] {
- f.hashes <- hashes
- }
- case core.ChainEvent:
- for _, f := range filters[BlocksSubscription] {
- f.headers <- e.Block.Header()
- }
- if es.lightMode && len(filters[LogsSubscription]) > 0 {
- es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
- for _, f := range filters[LogsSubscription] {
- if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
- }
- })
- }
+ })
}
}
@@ -448,10 +443,10 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
func (es *EventSystem) eventLoop() {
// Ensure all subscriptions get cleaned up
defer func() {
- es.pendingLogSub.Unsubscribe()
es.txsSub.Unsubscribe()
es.logsSub.Unsubscribe()
es.rmLogsSub.Unsubscribe()
+ es.pendingLogsSub.Unsubscribe()
es.chainSub.Unsubscribe()
}()
@@ -462,20 +457,16 @@ func (es *EventSystem) eventLoop() {
for {
select {
- // Handle subscribed events
case ev := <-es.txsCh:
- es.broadcast(index, ev)
+ es.handleTxsEvent(index, ev)
case ev := <-es.logsCh:
- es.broadcast(index, ev)
+ es.handleLogs(index, ev)
case ev := <-es.rmLogsCh:
- es.broadcast(index, ev)
+ es.handleRemovedLogs(index, ev)
+ case ev := <-es.pendingLogsCh:
+ es.handlePendingLogs(index, ev)
case ev := <-es.chainCh:
- es.broadcast(index, ev)
- case ev, active := <-es.pendingLogSub.Chan():
- if !active { // system stopped
- return
- }
- es.broadcast(index, ev)
+ es.handleChainEvent(index, ev)
case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
diff --git a/eth/gasprice/gasprice.go b/eth/gasprice/gasprice.go
index 23e49a6..14f50b1 100644
--- a/eth/gasprice/gasprice.go
+++ b/eth/gasprice/gasprice.go
@@ -23,123 +23,144 @@ import (
"sync"
"github.com/ava-labs/coreth/core/types"
- "github.com/ava-labs/coreth/internal/ethapi"
"github.com/ava-labs/coreth/params"
"github.com/ava-labs/coreth/rpc"
- "github.com/ava-labs/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/log"
)
-var maxPrice = big.NewInt(500 * params.GWei)
+const sampleNumber = 3 // Number of transactions sampled in a block
+
+var DefaultMaxPrice = big.NewInt(500 * params.GWei)
type Config struct {
Blocks int
Percentile int
Default *big.Int `toml:",omitempty"`
+ MaxPrice *big.Int `toml:",omitempty"`
+}
+
+// OracleBackend includes all necessary background APIs for oracle.
+type OracleBackend interface {
+ HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error)
+ BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error)
+ ChainConfig() *params.ChainConfig
}
// Oracle recommends gas prices based on the content of recent
// blocks. Suitable for both light and full clients.
type Oracle struct {
- backend ethapi.Backend
+ backend OracleBackend
lastHead common.Hash
lastPrice *big.Int
+ maxPrice *big.Int
cacheLock sync.RWMutex
fetchLock sync.Mutex
- checkBlocks, maxEmpty, maxBlocks int
- percentile int
+ checkBlocks int
+ percentile int
}
-// NewOracle returns a new oracle.
-func NewOracle(backend ethapi.Backend, params Config) *Oracle {
+// NewOracle returns a new gasprice oracle which can recommend suitable
+// gasprice for newly created transaction.
+func NewOracle(backend OracleBackend, params Config) *Oracle {
blocks := params.Blocks
if blocks < 1 {
blocks = 1
+ log.Warn("Sanitizing invalid gasprice oracle sample blocks", "provided", params.Blocks, "updated", blocks)
}
percent := params.Percentile
if percent < 0 {
percent = 0
+ log.Warn("Sanitizing invalid gasprice oracle sample percentile", "provided", params.Percentile, "updated", percent)
}
if percent > 100 {
percent = 100
+ log.Warn("Sanitizing invalid gasprice oracle sample percentile", "provided", params.Percentile, "updated", percent)
+ }
+ maxPrice := params.MaxPrice
+ if maxPrice == nil || maxPrice.Int64() <= 0 {
+ maxPrice = DefaultMaxPrice
+ log.Warn("Sanitizing invalid gasprice oracle price cap", "provided", params.MaxPrice, "updated", maxPrice)
}
return &Oracle{
backend: backend,
lastPrice: params.Default,
+ maxPrice: maxPrice,
checkBlocks: blocks,
- maxEmpty: blocks / 2,
- maxBlocks: blocks * 5,
percentile: percent,
}
}
-// SuggestPrice returns the recommended gas price.
+// SuggestPrice returns a gasprice so that newly created transaction can
+// have a very high chance to be included in the following blocks.
func (gpo *Oracle) SuggestPrice(ctx context.Context) (*big.Int, error) {
- gpo.cacheLock.RLock()
- lastHead := gpo.lastHead
- lastPrice := gpo.lastPrice
- gpo.cacheLock.RUnlock()
-
head, _ := gpo.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
headHash := head.Hash()
+
+ // If the latest gasprice is still available, return it.
+ gpo.cacheLock.RLock()
+ lastHead, lastPrice := gpo.lastHead, gpo.lastPrice
+ gpo.cacheLock.RUnlock()
if headHash == lastHead {
return lastPrice, nil
}
-
gpo.fetchLock.Lock()
defer gpo.fetchLock.Unlock()
- // try checking the cache again, maybe the last fetch fetched what we need
+ // Try checking the cache again, maybe the last fetch fetched what we need
gpo.cacheLock.RLock()
- lastHead = gpo.lastHead
- lastPrice = gpo.lastPrice
+ lastHead, lastPrice = gpo.lastHead, gpo.lastPrice
gpo.cacheLock.RUnlock()
if headHash == lastHead {
return lastPrice, nil
}
-
- blockNum := head.Number.Uint64()
- ch := make(chan getBlockPricesResult, gpo.checkBlocks)
- sent := 0
- exp := 0
- var blockPrices []*big.Int
- for sent < gpo.checkBlocks && blockNum > 0 {
- go gpo.getBlockPrices(ctx, types.MakeSigner(gpo.backend.ChainConfig(), big.NewInt(int64(blockNum))), blockNum, ch)
+ var (
+ sent, exp int
+ number = head.Number.Uint64()
+ result = make(chan getBlockPricesResult, gpo.checkBlocks)
+ quit = make(chan struct{})
+ txPrices []*big.Int
+ )
+ for sent < gpo.checkBlocks && number > 0 {
+ go gpo.getBlockPrices(ctx, types.MakeSigner(gpo.backend.ChainConfig(), big.NewInt(int64(number))), number, sampleNumber, result, quit)
sent++
exp++
- blockNum--
+ number--
}
- maxEmpty := gpo.maxEmpty
for exp > 0 {
- res := <-ch
+ res := <-result
if res.err != nil {
+ close(quit)
return lastPrice, res.err
}
exp--
- if res.price != nil {
- blockPrices = append(blockPrices, res.price)
- continue
- }
- if maxEmpty > 0 {
- maxEmpty--
- continue
+ // Nothing returned. There are two special cases here:
+ // - The block is empty
+ // - All the transactions included are sent by the miner itself.
+ // In these cases, use the latest calculated price for samping.
+ if len(res.prices) == 0 {
+ res.prices = []*big.Int{lastPrice}
}
- if blockNum > 0 && sent < gpo.maxBlocks {
- go gpo.getBlockPrices(ctx, types.MakeSigner(gpo.backend.ChainConfig(), big.NewInt(int64(blockNum))), blockNum, ch)
+ // Besides, in order to collect enough data for sampling, if nothing
+ // meaningful returned, try to query more blocks. But the maximum
+ // is 2*checkBlocks.
+ if len(res.prices) == 1 && len(txPrices)+1+exp < gpo.checkBlocks*2 && number > 0 {
+ go gpo.getBlockPrices(ctx, types.MakeSigner(gpo.backend.ChainConfig(), big.NewInt(int64(number))), number, sampleNumber, result, quit)
sent++
exp++
- blockNum--
+ number--
}
+ txPrices = append(txPrices, res.prices...)
}
price := lastPrice
- if len(blockPrices) > 0 {
- sort.Sort(bigIntArray(blockPrices))
- price = blockPrices[(len(blockPrices)-1)*gpo.percentile/100]
+ if len(txPrices) > 0 {
+ sort.Sort(bigIntArray(txPrices))
+ price = txPrices[(len(txPrices)-1)*gpo.percentile/100]
}
- if price.Cmp(maxPrice) > 0 {
- price = new(big.Int).Set(maxPrice)
+ if price.Cmp(gpo.maxPrice) > 0 {
+ price = new(big.Int).Set(gpo.maxPrice)
}
-
gpo.cacheLock.Lock()
gpo.lastHead = headHash
gpo.lastPrice = price
@@ -148,38 +169,48 @@ func (gpo *Oracle) SuggestPrice(ctx context.Context) (*big.Int, error) {
}
type getBlockPricesResult struct {
- price *big.Int
- err error
+ prices []*big.Int
+ err error
}
type transactionsByGasPrice []*types.Transaction
func (t transactionsByGasPrice) Len() int { return len(t) }
func (t transactionsByGasPrice) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
-func (t transactionsByGasPrice) Less(i, j int) bool { return t[i].GasPrice().Cmp(t[j].GasPrice()) < 0 }
+func (t transactionsByGasPrice) Less(i, j int) bool { return t[i].GasPriceCmp(t[j]) < 0 }
// getBlockPrices calculates the lowest transaction gas price in a given block
-// and sends it to the result channel. If the block is empty, price is nil.
-func (gpo *Oracle) getBlockPrices(ctx context.Context, signer types.Signer, blockNum uint64, ch chan getBlockPricesResult) {
+// and sends it to the result channel. If the block is empty or all transactions
+// are sent by the miner itself(it doesn't make any sense to include this kind of
+// transaction prices for sampling), nil gasprice is returned.
+func (gpo *Oracle) getBlockPrices(ctx context.Context, signer types.Signer, blockNum uint64, limit int, result chan getBlockPricesResult, quit chan struct{}) {
block, err := gpo.backend.BlockByNumber(ctx, rpc.BlockNumber(blockNum))
if block == nil {
- ch <- getBlockPricesResult{nil, err}
+ select {
+ case result <- getBlockPricesResult{nil, err}:
+ case <-quit:
+ }
return
}
-
blockTxs := block.Transactions()
txs := make([]*types.Transaction, len(blockTxs))
copy(txs, blockTxs)
sort.Sort(transactionsByGasPrice(txs))
+ var prices []*big.Int
for _, tx := range txs {
sender, err := types.Sender(signer, tx)
if err == nil && sender != block.Coinbase() {
- ch <- getBlockPricesResult{tx.GasPrice(), nil}
- return
+ prices = append(prices, tx.GasPrice())
+ if len(prices) >= limit {
+ break
+ }
}
}
- ch <- getBlockPricesResult{nil, nil}
+ select {
+ case result <- getBlockPricesResult{prices, nil}:
+ case <-quit:
+ }
}
type bigIntArray []*big.Int
diff --git a/eth/metrics.go b/eth/metrics.go
deleted file mode 100644
index 0729c44..0000000
--- a/eth/metrics.go
+++ /dev/null
@@ -1,139 +0,0 @@
-// Copyright 2015 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package eth
-
-import (
- "github.com/ava-labs/go-ethereum/metrics"
- "github.com/ava-labs/go-ethereum/p2p"
-)
-
-var (
- propTxnInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/txns/in/packets", nil)
- propTxnInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/txns/in/traffic", nil)
- propTxnOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/txns/out/packets", nil)
- propTxnOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/txns/out/traffic", nil)
- propHashInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/hashes/in/packets", nil)
- propHashInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/hashes/in/traffic", nil)
- propHashOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/hashes/out/packets", nil)
- propHashOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/hashes/out/traffic", nil)
- propBlockInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/blocks/in/packets", nil)
- propBlockInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/blocks/in/traffic", nil)
- propBlockOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/blocks/out/packets", nil)
- propBlockOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/blocks/out/traffic", nil)
- reqHeaderInPacketsMeter = metrics.NewRegisteredMeter("eth/req/headers/in/packets", nil)
- reqHeaderInTrafficMeter = metrics.NewRegisteredMeter("eth/req/headers/in/traffic", nil)
- reqHeaderOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/headers/out/packets", nil)
- reqHeaderOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/headers/out/traffic", nil)
- reqBodyInPacketsMeter = metrics.NewRegisteredMeter("eth/req/bodies/in/packets", nil)
- reqBodyInTrafficMeter = metrics.NewRegisteredMeter("eth/req/bodies/in/traffic", nil)
- reqBodyOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/bodies/out/packets", nil)
- reqBodyOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/bodies/out/traffic", nil)
- reqStateInPacketsMeter = metrics.NewRegisteredMeter("eth/req/states/in/packets", nil)
- reqStateInTrafficMeter = metrics.NewRegisteredMeter("eth/req/states/in/traffic", nil)
- reqStateOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/states/out/packets", nil)
- reqStateOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/states/out/traffic", nil)
- reqReceiptInPacketsMeter = metrics.NewRegisteredMeter("eth/req/receipts/in/packets", nil)
- reqReceiptInTrafficMeter = metrics.NewRegisteredMeter("eth/req/receipts/in/traffic", nil)
- reqReceiptOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/receipts/out/packets", nil)
- reqReceiptOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/receipts/out/traffic", nil)
- miscInPacketsMeter = metrics.NewRegisteredMeter("eth/misc/in/packets", nil)
- miscInTrafficMeter = metrics.NewRegisteredMeter("eth/misc/in/traffic", nil)
- miscOutPacketsMeter = metrics.NewRegisteredMeter("eth/misc/out/packets", nil)
- miscOutTrafficMeter = metrics.NewRegisteredMeter("eth/misc/out/traffic", nil)
-)
-
-// meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of
-// accumulating the above defined metrics based on the data stream contents.
-type meteredMsgReadWriter struct {
- p2p.MsgReadWriter // Wrapped message stream to meter
- version int // Protocol version to select correct meters
-}
-
-// newMeteredMsgWriter wraps a p2p MsgReadWriter with metering support. If the
-// metrics system is disabled, this function returns the original object.
-func newMeteredMsgWriter(rw p2p.MsgReadWriter) p2p.MsgReadWriter {
- if !metrics.Enabled {
- return rw
- }
- return &meteredMsgReadWriter{MsgReadWriter: rw}
-}
-
-// Init sets the protocol version used by the stream to know which meters to
-// increment in case of overlapping message ids between protocol versions.
-func (rw *meteredMsgReadWriter) Init(version int) {
- rw.version = version
-}
-
-func (rw *meteredMsgReadWriter) ReadMsg() (p2p.Msg, error) {
- // Read the message and short circuit in case of an error
- msg, err := rw.MsgReadWriter.ReadMsg()
- if err != nil {
- return msg, err
- }
- // Account for the data traffic
- packets, traffic := miscInPacketsMeter, miscInTrafficMeter
- switch {
- case msg.Code == BlockHeadersMsg:
- packets, traffic = reqHeaderInPacketsMeter, reqHeaderInTrafficMeter
- case msg.Code == BlockBodiesMsg:
- packets, traffic = reqBodyInPacketsMeter, reqBodyInTrafficMeter
-
- case rw.version >= eth63 && msg.Code == NodeDataMsg:
- packets, traffic = reqStateInPacketsMeter, reqStateInTrafficMeter
- case rw.version >= eth63 && msg.Code == ReceiptsMsg:
- packets, traffic = reqReceiptInPacketsMeter, reqReceiptInTrafficMeter
-
- case msg.Code == NewBlockHashesMsg:
- packets, traffic = propHashInPacketsMeter, propHashInTrafficMeter
- case msg.Code == NewBlockMsg:
- packets, traffic = propBlockInPacketsMeter, propBlockInTrafficMeter
- case msg.Code == TxMsg:
- packets, traffic = propTxnInPacketsMeter, propTxnInTrafficMeter
- }
- packets.Mark(1)
- traffic.Mark(int64(msg.Size))
-
- return msg, err
-}
-
-func (rw *meteredMsgReadWriter) WriteMsg(msg p2p.Msg) error {
- // Account for the data traffic
- packets, traffic := miscOutPacketsMeter, miscOutTrafficMeter
- switch {
- case msg.Code == BlockHeadersMsg:
- packets, traffic = reqHeaderOutPacketsMeter, reqHeaderOutTrafficMeter
- case msg.Code == BlockBodiesMsg:
- packets, traffic = reqBodyOutPacketsMeter, reqBodyOutTrafficMeter
-
- case rw.version >= eth63 && msg.Code == NodeDataMsg:
- packets, traffic = reqStateOutPacketsMeter, reqStateOutTrafficMeter
- case rw.version >= eth63 && msg.Code == ReceiptsMsg:
- packets, traffic = reqReceiptOutPacketsMeter, reqReceiptOutTrafficMeter
-
- case msg.Code == NewBlockHashesMsg:
- packets, traffic = propHashOutPacketsMeter, propHashOutTrafficMeter
- case msg.Code == NewBlockMsg:
- packets, traffic = propBlockOutPacketsMeter, propBlockOutTrafficMeter
- case msg.Code == TxMsg:
- packets, traffic = propTxnOutPacketsMeter, propTxnOutTrafficMeter
- }
- packets.Mark(1)
- traffic.Mark(int64(msg.Size))
-
- // Send the packet to the p2p layer
- return rw.MsgReadWriter.WriteMsg(msg)
-}
diff --git a/eth/protocol.go b/eth/protocol.go
index e205aad..94260c2 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -23,45 +23,51 @@ import (
"github.com/ava-labs/coreth/core"
"github.com/ava-labs/coreth/core/types"
- "github.com/ava-labs/go-ethereum/common"
- "github.com/ava-labs/go-ethereum/event"
- "github.com/ava-labs/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/rlp"
)
// Constants to match up protocol versions and messages
const (
- eth62 = 62
eth63 = 63
+ eth64 = 64
+ eth65 = 65
)
// protocolName is the official short name of the protocol used during capability negotiation.
const protocolName = "eth"
// ProtocolVersions are the supported versions of the eth protocol (first is primary).
-var ProtocolVersions = []uint{eth63}
+var ProtocolVersions = []uint{eth65, eth64, eth63}
// protocolLengths are the number of implemented message corresponding to different protocol versions.
-var protocolLengths = map[uint]uint64{eth63: 17, eth62: 8}
+var protocolLengths = map[uint]uint64{eth65: 17, eth64: 17, eth63: 17}
const protocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message
// eth protocol message codes
const (
- // Protocol messages belonging to eth/62
StatusMsg = 0x00
NewBlockHashesMsg = 0x01
- TxMsg = 0x02
+ TransactionMsg = 0x02
GetBlockHeadersMsg = 0x03
BlockHeadersMsg = 0x04
GetBlockBodiesMsg = 0x05
BlockBodiesMsg = 0x06
NewBlockMsg = 0x07
-
- // Protocol messages belonging to eth/63
- GetNodeDataMsg = 0x0d
- NodeDataMsg = 0x0e
- GetReceiptsMsg = 0x0f
- ReceiptsMsg = 0x10
+ GetNodeDataMsg = 0x0d
+