aboutsummaryrefslogtreecommitdiff
path: root/ethstats/ethstats.go
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2020-09-15 23:55:34 -0400
committerDeterminant <[email protected]>2020-09-15 23:55:34 -0400
commit78745551c077bf54151202138c2629f288769561 (patch)
tree2b628e99fd110617089778fa91235ecd2888f4ef /ethstats/ethstats.go
parent7d1388c743b4ec8f4a86bea95bfada785dee83f7 (diff)
WIP: geth-tavum
Diffstat (limited to 'ethstats/ethstats.go')
-rw-r--r--ethstats/ethstats.go406
1 files changed, 237 insertions, 169 deletions
diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go
index a2a6aa4..3dd06e3 100644
--- a/ethstats/ethstats.go
+++ b/ethstats/ethstats.go
@@ -28,19 +28,23 @@ import (
"runtime"
"strconv"
"strings"
+ "sync"
"time"
"github.com/ava-labs/coreth/consensus"
"github.com/ava-labs/coreth/core"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/eth"
+ "github.com/ava-labs/coreth/miner"
+ "github.com/ava-labs/coreth/node"
"github.com/ava-labs/coreth/rpc"
- "github.com/ava-labs/go-ethereum/common"
- "github.com/ava-labs/go-ethereum/common/mclock"
- "github.com/ava-labs/go-ethereum/event"
- "github.com/ava-labs/go-ethereum/les"
- "github.com/ava-labs/go-ethereum/log"
- "github.com/ava-labs/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/les"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p"
"github.com/gorilla/websocket"
)
@@ -56,23 +60,33 @@ const (
chainHeadChanSize = 10
)
-type txPool interface {
- // SubscribeNewTxsEvent should return an event subscription of
- // NewTxsEvent and send events to the given channel.
- SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
+// backend encompasses the bare-minimum functionality needed for ethstats reporting
+type backend interface {
+ SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
+ SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription
+ CurrentHeader() *types.Header
+ HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error)
+ GetTd(ctx context.Context, hash common.Hash) *big.Int
+ Stats() (pending int, queued int)
+ Downloader() *downloader.Downloader
}
-type blockChain interface {
- SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
+// fullNodeBackend encompasses the functionality necessary for a full node
+// reporting to ethstats
+type fullNodeBackend interface {
+ backend
+ Miner() *miner.Miner
+ BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error)
+ CurrentBlock() *types.Block
+ SuggestPrice(ctx context.Context) (*big.Int, error)
}
// Service implements an Ethereum netstats reporting daemon that pushes local
// chain statistics up to a monitoring server.
type Service struct {
- server *p2p.Server // Peer-to-peer server to retrieve networking infos
- eth *eth.Ethereum // Full Ethereum service if monitoring a full node
- les *les.LightEthereum // Light Ethereum service if monitoring a light node
- engine consensus.Engine // Consensus engine to retrieve variadic block fields
+ server *p2p.Server // Peer-to-peer server to retrieve networking infos
+ backend backend
+ engine consensus.Engine // Consensus engine to retrieve variadic block fields
node string // Name of the node to display on the monitoring page
pass string // Password to authorize access to the monitoring page
@@ -80,53 +94,86 @@ type Service struct {
pongCh chan struct{} // Pong notifications are fed into this channel
histCh chan []uint64 // History request block numbers are fed into this channel
+
+}
+
+// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the
+// websocket.
+//
+// From Gorilla websocket docs:
+// Connections support one concurrent reader and one concurrent writer.
+// Applications are responsible for ensuring that no more than one goroutine calls the write methods
+// - NextWriter, SetWriteDeadline, WriteMessage, WriteJSON, EnableWriteCompression, SetCompressionLevel
+// concurrently and that no more than one goroutine calls the read methods
+// - NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler
+// concurrently.
+// The Close and WriteControl methods can be called concurrently with all other methods.
+type connWrapper struct {
+ conn *websocket.Conn
+
+ rlock sync.Mutex
+ wlock sync.Mutex
+}
+
+func newConnectionWrapper(conn *websocket.Conn) *connWrapper {
+ return &connWrapper{conn: conn}
+}
+
+// WriteJSON wraps corresponding method on the websocket but is safe for concurrent calling
+func (w *connWrapper) WriteJSON(v interface{}) error {
+ w.wlock.Lock()
+ defer w.wlock.Unlock()
+
+ return w.conn.WriteJSON(v)
+}
+
+// ReadJSON wraps corresponding method on the websocket but is safe for concurrent calling
+func (w *connWrapper) ReadJSON(v interface{}) error {
+ w.rlock.Lock()
+ defer w.rlock.Unlock()
+
+ return w.conn.ReadJSON(v)
+}
+
+// Close wraps corresponding method on the websocket but is safe for concurrent calling
+func (w *connWrapper) Close() error {
+ // The Close and WriteControl methods can be called concurrently with all other methods,
+ // so the mutex is not used here
+ return w.conn.Close()
}
// New returns a monitoring service ready for stats reporting.
-func New(url string, ethServ *eth.Ethereum, lesServ *les.LightEthereum) (*Service, error) {
+func New(node *node.Node, backend backend, engine consensus.Engine, url string) error {
// Parse the netstats connection url
re := regexp.MustCompile("([^:@]*)(:([^@]*))?@(.+)")
parts := re.FindStringSubmatch(url)
if len(parts) != 5 {
- return nil, fmt.Errorf("invalid netstats url: \"%s\", should be nodename:secret@host:port", url)
+ return fmt.Errorf("invalid netstats url: \"%s\", should be nodename:secret@host:port", url)
+ }
+ ethstats := &Service{
+ backend: backend,
+ engine: engine,
+ server: node.Server(),
+ node: parts[1],
+ pass: parts[3],
+ host: parts[4],
+ pongCh: make(chan struct{}),
+ histCh: make(chan []uint64, 1),
}
- // Assemble and return the stats service
- var engine consensus.Engine
- if ethServ != nil {
- engine = ethServ.Engine()
- } else {
- engine = lesServ.Engine()
- }
- return &Service{
- eth: ethServ,
- les: lesServ,
- engine: engine,
- node: parts[1],
- pass: parts[3],
- host: parts[4],
- pongCh: make(chan struct{}),
- histCh: make(chan []uint64, 1),
- }, nil
-}
-
-// Protocols implements node.Service, returning the P2P network protocols used
-// by the stats service (nil as it doesn't use the devp2p overlay network).
-func (s *Service) Protocols() []p2p.Protocol { return nil }
-// APIs implements node.Service, returning the RPC API endpoints provided by the
-// stats service (nil as it doesn't provide any user callable APIs).
-func (s *Service) APIs() []rpc.API { return nil }
+ node.RegisterLifecycle(ethstats)
+ return nil
+}
-// Start implements node.Service, starting up the monitoring and reporting daemon.
-func (s *Service) Start(server *p2p.Server) error {
- s.server = server
+// Start implements node.Lifecycle, starting up the monitoring and reporting daemon.
+func (s *Service) Start() error {
go s.loop()
log.Info("Stats daemon started")
return nil
}
-// Stop implements node.Service, terminating the monitoring and reporting daemon.
+// Stop implements node.Lifecycle, terminating the monitoring and reporting daemon.
func (s *Service) Stop() error {
log.Info("Stats daemon stopped")
return nil
@@ -136,25 +183,15 @@ func (s *Service) Stop() error {
// until termination.
func (s *Service) loop() {
// Subscribe to chain events to execute updates on
- var blockchain blockChain
- var txpool txPool
- if s.eth != nil {
- blockchain = s.eth.BlockChain()
- txpool = s.eth.TxPool()
- } else {
- blockchain = s.les.BlockChain()
- txpool = s.les.TxPool()
- }
-
chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize)
- headSub := blockchain.SubscribeChainHeadEvent(chainHeadCh)
+ headSub := s.backend.SubscribeChainHeadEvent(chainHeadCh)
defer headSub.Unsubscribe()
txEventCh := make(chan core.NewTxsEvent, txChanSize)
- txSub := txpool.SubscribeNewTxsEvent(txEventCh)
+ txSub := s.backend.SubscribeNewTxsEvent(txEventCh)
defer txSub.Unsubscribe()
- // Start a goroutine that exhausts the subsciptions to avoid events piling up
+ // Start a goroutine that exhausts the subscriptions to avoid events piling up
var (
quitCh = make(chan struct{})
headCh = make(chan *types.Block, 1)
@@ -194,82 +231,99 @@ func (s *Service) loop() {
}
close(quitCh)
}()
+
+ // Resolve the URL, defaulting to TLS, but falling back to none too
+ path := fmt.Sprintf("%s/api", s.host)
+ urls := []string{path}
+
+ // url.Parse and url.IsAbs is unsuitable (https://github.com/golang/go/issues/19779)
+ if !strings.Contains(path, "://") {
+ urls = []string{"wss://" + path, "ws://" + path}
+ }
+
+ errTimer := time.NewTimer(0)
+ defer errTimer.Stop()
// Loop reporting until termination
for {
- // Resolve the URL, defaulting to TLS, but falling back to none too
- path := fmt.Sprintf("%s/api", s.host)
- urls := []string{path}
-
- // url.Parse and url.IsAbs is unsuitable (https://github.com/golang/go/issues/19779)
- if !strings.Contains(path, "://") {
- urls = []string{"wss://" + path, "ws://" + path}
- }
- // Establish a websocket connection to the server on any supported URL
- var (
- conn *websocket.Conn
- err error
- )
- dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
- header := make(http.Header)
- header.Set("origin", "http://localhost")
- for _, url := range urls {
- conn, _, err = dialer.Dial(url, header)
- if err == nil {
- break
+ select {
+ case <-quitCh:
+ return
+ case <-errTimer.C:
+ // Establish a websocket connection to the server on any supported URL
+ var (
+ conn *connWrapper
+ err error
+ )
+ dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
+ header := make(http.Header)
+ header.Set("origin", "http://localhost")
+ for _, url := range urls {
+ c, _, e := dialer.Dial(url, header)
+ err = e
+ if err == nil {
+ conn = newConnectionWrapper(c)
+ break
+ }
}
- }
- if err != nil {
- log.Warn("Stats server unreachable", "err", err)
- time.Sleep(10 * time.Second)
- continue
- }
- // Authenticate the client with the server
- if err = s.login(conn); err != nil {
- log.Warn("Stats login failed", "err", err)
- conn.Close()
- time.Sleep(10 * time.Second)
- continue
- }
- go s.readLoop(conn)
-
- // Send the initial stats so our node looks decent from the get go
- if err = s.report(conn); err != nil {
- log.Warn("Initial stats report failed", "err", err)
- conn.Close()
- continue
- }
- // Keep sending status updates until the connection breaks
- fullReport := time.NewTicker(15 * time.Second)
+ if err != nil {
+ log.Warn("Stats server unreachable", "err", err)
+ errTimer.Reset(10 * time.Second)
+ continue
+ }
+ // Authenticate the client with the server
+ if err = s.login(conn); err != nil {
+ log.Warn("Stats login failed", "err", err)
+ conn.Close()
+ errTimer.Reset(10 * time.Second)
+ continue
+ }
+ go s.readLoop(conn)
- for err == nil {
- select {
- case <-quitCh:
+ // Send the initial stats so our node looks decent from the get go
+ if err = s.report(conn); err != nil {
+ log.Warn("Initial stats report failed", "err", err)
conn.Close()
- return
+ errTimer.Reset(0)
+ continue
+ }
+ // Keep sending status updates until the connection breaks
+ fullReport := time.NewTicker(15 * time.Second)
- case <-fullReport.C:
- if err = s.report(conn); err != nil {
- log.Warn("Full stats report failed", "err", err)
- }
- case list := <-s.histCh:
- if err = s.reportHistory(conn, list); err != nil {
- log.Warn("Requested history report failed", "err", err)
- }
- case head := <-headCh:
- if err = s.reportBlock(conn, head); err != nil {
- log.Warn("Block stats report failed", "err", err)
- }
- if err = s.reportPending(conn); err != nil {
- log.Warn("Post-block transaction stats report failed", "err", err)
- }
- case <-txCh:
- if err = s.reportPending(conn); err != nil {
- log.Warn("Transaction stats report failed", "err", err)
+ for err == nil {
+ select {
+ case <-quitCh:
+ fullReport.Stop()
+ // Make sure the connection is closed
+ conn.Close()
+ return
+
+ case <-fullReport.C:
+ if err = s.report(conn); err != nil {
+ log.Warn("Full stats report failed", "err", err)
+ }
+ case list := <-s.histCh:
+ if err = s.reportHistory(conn, list); err != nil {
+ log.Warn("Requested history report failed", "err", err)
+ }
+ case head := <-headCh:
+ if err = s.reportBlock(conn, head); err != nil {
+ log.Warn("Block stats report failed", "err", err)
+ }
+ if err = s.reportPending(conn); err != nil {
+ log.Warn("Post-block transaction stats report failed", "err", err)
+ }
+ case <-txCh:
+ if err = s.reportPending(conn); err != nil {
+ log.Warn("Transaction stats report failed", "err", err)
+ }
}
}
+ fullReport.Stop()
+
+ // Close the current connection and establish a new one
+ conn.Close()
+ errTimer.Reset(0)
}
- // Make sure the connection is closed
- conn.Close()
}
}
@@ -277,14 +331,29 @@ func (s *Service) loop() {
// from the network socket. If any of them match an active request, it forwards
// it, if they themselves are requests it initiates a reply, and lastly it drops
// unknown packets.
-func (s *Service) readLoop(conn *websocket.Conn) {
+func (s *Service) readLoop(conn *connWrapper) {
// If the read loop exists, close the connection
defer conn.Close()
for {
// Retrieve the next generic network packet and bail out on error
+ var blob json.RawMessage
+ if err := conn.ReadJSON(&blob); err != nil {
+ log.Warn("Failed to retrieve stats server message", "err", err)
+ return
+ }
+ // If the network packet is a system ping, respond to it directly
+ var ping string
+ if err := json.Unmarshal(blob, &ping); err == nil && strings.HasPrefix(ping, "primus::ping::") {
+ if err := conn.WriteJSON(strings.Replace(ping, "ping", "pong", -1)); err != nil {
+ log.Warn("Failed to respond to system ping message", "err", err)
+ return
+ }
+ continue
+ }
+ // Not a system ping, try to decode an actual state message
var msg map[string][]interface{}
- if err := conn.ReadJSON(&msg); err != nil {
+ if err := json.Unmarshal(blob, &msg); err != nil {
log.Warn("Failed to decode stats server message", "err", err)
return
}
@@ -316,8 +385,11 @@ func (s *Service) readLoop(conn *websocket.Conn) {
request, ok := msg["emit"][1].(map[string]interface{})
if !ok {
log.Warn("Invalid stats history request", "msg", msg["emit"][1])
- s.histCh <- nil
- continue // Ethstats sometime sends invalid history requests, ignore those
+ select {
+ case s.histCh <- nil: // Treat it as an no indexes request
+ default:
+ }
+ continue
}
list, ok := request["list"].([]interface{})
if !ok {
@@ -345,7 +417,7 @@ func (s *Service) readLoop(conn *websocket.Conn) {
}
}
-// nodeInfo is the collection of metainformation about a node that is displayed
+// nodeInfo is the collection of meta information about a node that is displayed
// on the monitoring page.
type nodeInfo struct {
Name string `json:"name"`
@@ -368,7 +440,7 @@ type authMsg struct {
}
// login tries to authorize the client at the remote server.
-func (s *Service) login(conn *websocket.Conn) error {
+func (s *Service) login(conn *connWrapper) error {
// Construct and send the login authentication
infos := s.server.NodeInfo()
@@ -413,7 +485,7 @@ func (s *Service) login(conn *websocket.Conn) error {
// report collects all possible data to report and send it to the stats server.
// This should only be used on reconnects or rarely to avoid overloading the
// server. Use the individual methods for reporting subscribed events.
-func (s *Service) report(conn *websocket.Conn) error {
+func (s *Service) report(conn *connWrapper) error {
if err := s.reportLatency(conn); err != nil {
return err
}
@@ -431,7 +503,7 @@ func (s *Service) report(conn *websocket.Conn) error {
// reportLatency sends a ping request to the server, measures the RTT time and
// finally sends a latency update.
-func (s *Service) reportLatency(conn *websocket.Conn) error {
+func (s *Service) reportLatency(conn *connWrapper) error {
// Send the current time to the ethstats server
start := time.Now()
@@ -500,7 +572,7 @@ func (s uncleStats) MarshalJSON() ([]byte, error) {
}
// reportBlock retrieves the current chain head and reports it to the stats server.
-func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error {
+func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error {
// Gather the block details from the header or block chain
details := s.assembleBlockStats(block)
@@ -527,13 +599,15 @@ func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
txs []txStats
uncles []*types.Header
)
- if s.eth != nil {
- // Full nodes have all needed information available
+
+ // check if backend is a full node
+ fullBackend, ok := s.backend.(fullNodeBackend)
+ if ok {
if block == nil {
- block = s.eth.BlockChain().CurrentBlock()
+ block = fullBackend.CurrentBlock()
}
header = block.Header()
- td = s.eth.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
+ td = fullBackend.GetTd(context.Background(), header.Hash())
txs = make([]txStats, len(block.Transactions()))
for i, tx := range block.Transactions() {
@@ -545,11 +619,12 @@ func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
if block != nil {
header = block.Header()
} else {
- header = s.les.BlockChain().CurrentHeader()
+ header = s.backend.CurrentHeader()
}
- td = s.les.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
+ td = s.backend.GetTd(context.Background(), header.Hash())
txs = []txStats{}
}
+
// Assemble and return the block stats
author, _ := s.engine.Author(header)
@@ -572,7 +647,7 @@ func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
// reportHistory retrieves the most recent batch of blocks and reports it to the
// stats server.
-func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error {
+func (s *Service) reportHistory(conn *connWrapper, list []uint64) error {
// Figure out the indexes that need reporting
indexes := make([]uint64, 0, historyUpdateRange)
if len(list) > 0 {
@@ -580,12 +655,7 @@ func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error {
indexes = append(indexes, list...)
} else {
// No indexes requested, send back the top ones
- var head int64
- if s.eth != nil {
- head = s.eth.BlockChain().CurrentHeader().Number.Int64()
- } else {
- head = s.les.BlockChain().CurrentHeader().Number.Int64()
- }
+ head := s.backend.CurrentHeader().Number.Int64()
start := head - historyUpdateRange + 1
if start < 0 {
start = 0
@@ -597,12 +667,13 @@ func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error {
// Gather the batch of blocks to report
history := make([]*blockStats, len(indexes))
for i, number := range indexes {
+ fullBackend, ok := s.backend.(fullNodeBackend)
// Retrieve the next block if it's known to us
var block *types.Block
- if s.eth != nil {
- block = s.eth.BlockChain().GetBlockByNumber(number)
+ if ok {
+ block, _ = fullBackend.BlockByNumber(context.Background(), rpc.BlockNumber(number)) // TODO ignore error here ?
} else {
- if header := s.les.BlockChain().GetHeaderByNumber(number); header != nil {
+ if header, _ := s.backend.HeaderByNumber(context.Background(), rpc.BlockNumber(number)); header != nil {
block = types.NewBlockWithHeader(header)
}
}
@@ -638,14 +709,9 @@ type pendStats struct {
// reportPending retrieves the current number of pending transactions and reports
// it to the stats server.
-func (s *Service) reportPending(conn *websocket.Conn) error {
+func (s *Service) reportPending(conn *connWrapper) error {
// Retrieve the pending count from the local blockchain
- var pending int
- if s.eth != nil {
- pending, _ = s.eth.TxPool().Stats()
- } else {
- pending = s.les.TxPool().Stats()
- }
+ pending, _ := s.backend.Stats()
// Assemble the transaction stats and send it to the server
log.Trace("Sending pending transactions to ethstats", "count", pending)
@@ -672,9 +738,9 @@ type nodeStats struct {
Uptime int `json:"uptime"`
}
-// reportPending retrieves various stats about the node at the networking and
+// reportStats retrieves various stats about the node at the networking and
// mining layer and reports it to the stats server.
-func (s *Service) reportStats(conn *websocket.Conn) error {
+func (s *Service) reportStats(conn *connWrapper) error {
// Gather the syncing and mining infos from the local miner instance
var (
mining bool
@@ -682,18 +748,20 @@ func (s *Service) reportStats(conn *websocket.Conn) error {
syncing bool
gasprice int
)
- if s.eth != nil {
- mining = s.eth.Miner().Mining()
- hashrate = int(s.eth.Miner().HashRate())
+ // check if backend is a full node
+ fullBackend, ok := s.backend.(fullNodeBackend)
+ if ok {
+ mining = fullBackend.Miner().Mining()
+ hashrate = int(fullBackend.Miner().HashRate())
- sync := s.eth.Downloader().Progress()
- syncing = s.eth.BlockChain().CurrentHeader().Number.Uint64() >= sync.HighestBlock
+ sync := fullBackend.Downloader().Progress()
+ syncing = fullBackend.CurrentHeader().Number.Uint64() >= sync.HighestBlock
- price, _ := s.eth.APIBackend.SuggestPrice(context.Background())
+ price, _ := fullBackend.SuggestPrice(context.Background())
gasprice = int(price.Uint64())
} else {
- sync := s.les.Downloader().Progress()
- syncing = s.les.BlockChain().CurrentHeader().Number.Uint64() >= sync.HighestBlock
+ sync := s.backend.Downloader().Progress()
+ syncing = s.backend.CurrentHeader().Number.Uint64() >= sync.HighestBlock
}
// Assemble the node stats and send it to the server
log.Trace("Sending node details to ethstats")