diff options
Diffstat (limited to 'ethstats/ethstats.go')
-rw-r--r-- | ethstats/ethstats.go | 406 |
1 files changed, 237 insertions, 169 deletions
diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index a2a6aa4..3dd06e3 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -28,19 +28,23 @@ import ( "runtime" "strconv" "strings" + "sync" "time" "github.com/ava-labs/coreth/consensus" "github.com/ava-labs/coreth/core" "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/eth" + "github.com/ava-labs/coreth/miner" + "github.com/ava-labs/coreth/node" "github.com/ava-labs/coreth/rpc" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/common/mclock" - "github.com/ava-labs/go-ethereum/event" - "github.com/ava-labs/go-ethereum/les" - "github.com/ava-labs/go-ethereum/log" - "github.com/ava-labs/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/les" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" "github.com/gorilla/websocket" ) @@ -56,23 +60,33 @@ const ( chainHeadChanSize = 10 ) -type txPool interface { - // SubscribeNewTxsEvent should return an event subscription of - // NewTxsEvent and send events to the given channel. - SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription +// backend encompasses the bare-minimum functionality needed for ethstats reporting +type backend interface { + SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription + SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription + CurrentHeader() *types.Header + HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) + GetTd(ctx context.Context, hash common.Hash) *big.Int + Stats() (pending int, queued int) + Downloader() *downloader.Downloader } -type blockChain interface { - SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription +// fullNodeBackend encompasses the functionality necessary for a full node +// reporting to ethstats +type fullNodeBackend interface { + backend + Miner() *miner.Miner + BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error) + CurrentBlock() *types.Block + SuggestPrice(ctx context.Context) (*big.Int, error) } // Service implements an Ethereum netstats reporting daemon that pushes local // chain statistics up to a monitoring server. type Service struct { - server *p2p.Server // Peer-to-peer server to retrieve networking infos - eth *eth.Ethereum // Full Ethereum service if monitoring a full node - les *les.LightEthereum // Light Ethereum service if monitoring a light node - engine consensus.Engine // Consensus engine to retrieve variadic block fields + server *p2p.Server // Peer-to-peer server to retrieve networking infos + backend backend + engine consensus.Engine // Consensus engine to retrieve variadic block fields node string // Name of the node to display on the monitoring page pass string // Password to authorize access to the monitoring page @@ -80,53 +94,86 @@ type Service struct { pongCh chan struct{} // Pong notifications are fed into this channel histCh chan []uint64 // History request block numbers are fed into this channel + +} + +// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the +// websocket. +// +// From Gorilla websocket docs: +// Connections support one concurrent reader and one concurrent writer. +// Applications are responsible for ensuring that no more than one goroutine calls the write methods +// - NextWriter, SetWriteDeadline, WriteMessage, WriteJSON, EnableWriteCompression, SetCompressionLevel +// concurrently and that no more than one goroutine calls the read methods +// - NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler +// concurrently. +// The Close and WriteControl methods can be called concurrently with all other methods. +type connWrapper struct { + conn *websocket.Conn + + rlock sync.Mutex + wlock sync.Mutex +} + +func newConnectionWrapper(conn *websocket.Conn) *connWrapper { + return &connWrapper{conn: conn} +} + +// WriteJSON wraps corresponding method on the websocket but is safe for concurrent calling +func (w *connWrapper) WriteJSON(v interface{}) error { + w.wlock.Lock() + defer w.wlock.Unlock() + + return w.conn.WriteJSON(v) +} + +// ReadJSON wraps corresponding method on the websocket but is safe for concurrent calling +func (w *connWrapper) ReadJSON(v interface{}) error { + w.rlock.Lock() + defer w.rlock.Unlock() + + return w.conn.ReadJSON(v) +} + +// Close wraps corresponding method on the websocket but is safe for concurrent calling +func (w *connWrapper) Close() error { + // The Close and WriteControl methods can be called concurrently with all other methods, + // so the mutex is not used here + return w.conn.Close() } // New returns a monitoring service ready for stats reporting. -func New(url string, ethServ *eth.Ethereum, lesServ *les.LightEthereum) (*Service, error) { +func New(node *node.Node, backend backend, engine consensus.Engine, url string) error { // Parse the netstats connection url re := regexp.MustCompile("([^:@]*)(:([^@]*))?@(.+)") parts := re.FindStringSubmatch(url) if len(parts) != 5 { - return nil, fmt.Errorf("invalid netstats url: \"%s\", should be nodename:secret@host:port", url) + return fmt.Errorf("invalid netstats url: \"%s\", should be nodename:secret@host:port", url) + } + ethstats := &Service{ + backend: backend, + engine: engine, + server: node.Server(), + node: parts[1], + pass: parts[3], + host: parts[4], + pongCh: make(chan struct{}), + histCh: make(chan []uint64, 1), } - // Assemble and return the stats service - var engine consensus.Engine - if ethServ != nil { - engine = ethServ.Engine() - } else { - engine = lesServ.Engine() - } - return &Service{ - eth: ethServ, - les: lesServ, - engine: engine, - node: parts[1], - pass: parts[3], - host: parts[4], - pongCh: make(chan struct{}), - histCh: make(chan []uint64, 1), - }, nil -} - -// Protocols implements node.Service, returning the P2P network protocols used -// by the stats service (nil as it doesn't use the devp2p overlay network). -func (s *Service) Protocols() []p2p.Protocol { return nil } -// APIs implements node.Service, returning the RPC API endpoints provided by the -// stats service (nil as it doesn't provide any user callable APIs). -func (s *Service) APIs() []rpc.API { return nil } + node.RegisterLifecycle(ethstats) + return nil +} -// Start implements node.Service, starting up the monitoring and reporting daemon. -func (s *Service) Start(server *p2p.Server) error { - s.server = server +// Start implements node.Lifecycle, starting up the monitoring and reporting daemon. +func (s *Service) Start() error { go s.loop() log.Info("Stats daemon started") return nil } -// Stop implements node.Service, terminating the monitoring and reporting daemon. +// Stop implements node.Lifecycle, terminating the monitoring and reporting daemon. func (s *Service) Stop() error { log.Info("Stats daemon stopped") return nil @@ -136,25 +183,15 @@ func (s *Service) Stop() error { // until termination. func (s *Service) loop() { // Subscribe to chain events to execute updates on - var blockchain blockChain - var txpool txPool - if s.eth != nil { - blockchain = s.eth.BlockChain() - txpool = s.eth.TxPool() - } else { - blockchain = s.les.BlockChain() - txpool = s.les.TxPool() - } - chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize) - headSub := blockchain.SubscribeChainHeadEvent(chainHeadCh) + headSub := s.backend.SubscribeChainHeadEvent(chainHeadCh) defer headSub.Unsubscribe() txEventCh := make(chan core.NewTxsEvent, txChanSize) - txSub := txpool.SubscribeNewTxsEvent(txEventCh) + txSub := s.backend.SubscribeNewTxsEvent(txEventCh) defer txSub.Unsubscribe() - // Start a goroutine that exhausts the subsciptions to avoid events piling up + // Start a goroutine that exhausts the subscriptions to avoid events piling up var ( quitCh = make(chan struct{}) headCh = make(chan *types.Block, 1) @@ -194,82 +231,99 @@ func (s *Service) loop() { } close(quitCh) }() + + // Resolve the URL, defaulting to TLS, but falling back to none too + path := fmt.Sprintf("%s/api", s.host) + urls := []string{path} + + // url.Parse and url.IsAbs is unsuitable (https://github.com/golang/go/issues/19779) + if !strings.Contains(path, "://") { + urls = []string{"wss://" + path, "ws://" + path} + } + + errTimer := time.NewTimer(0) + defer errTimer.Stop() // Loop reporting until termination for { - // Resolve the URL, defaulting to TLS, but falling back to none too - path := fmt.Sprintf("%s/api", s.host) - urls := []string{path} - - // url.Parse and url.IsAbs is unsuitable (https://github.com/golang/go/issues/19779) - if !strings.Contains(path, "://") { - urls = []string{"wss://" + path, "ws://" + path} - } - // Establish a websocket connection to the server on any supported URL - var ( - conn *websocket.Conn - err error - ) - dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second} - header := make(http.Header) - header.Set("origin", "http://localhost") - for _, url := range urls { - conn, _, err = dialer.Dial(url, header) - if err == nil { - break + select { + case <-quitCh: + return + case <-errTimer.C: + // Establish a websocket connection to the server on any supported URL + var ( + conn *connWrapper + err error + ) + dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second} + header := make(http.Header) + header.Set("origin", "http://localhost") + for _, url := range urls { + c, _, e := dialer.Dial(url, header) + err = e + if err == nil { + conn = newConnectionWrapper(c) + break + } } - } - if err != nil { - log.Warn("Stats server unreachable", "err", err) - time.Sleep(10 * time.Second) - continue - } - // Authenticate the client with the server - if err = s.login(conn); err != nil { - log.Warn("Stats login failed", "err", err) - conn.Close() - time.Sleep(10 * time.Second) - continue - } - go s.readLoop(conn) - - // Send the initial stats so our node looks decent from the get go - if err = s.report(conn); err != nil { - log.Warn("Initial stats report failed", "err", err) - conn.Close() - continue - } - // Keep sending status updates until the connection breaks - fullReport := time.NewTicker(15 * time.Second) + if err != nil { + log.Warn("Stats server unreachable", "err", err) + errTimer.Reset(10 * time.Second) + continue + } + // Authenticate the client with the server + if err = s.login(conn); err != nil { + log.Warn("Stats login failed", "err", err) + conn.Close() + errTimer.Reset(10 * time.Second) + continue + } + go s.readLoop(conn) - for err == nil { - select { - case <-quitCh: + // Send the initial stats so our node looks decent from the get go + if err = s.report(conn); err != nil { + log.Warn("Initial stats report failed", "err", err) conn.Close() - return + errTimer.Reset(0) + continue + } + // Keep sending status updates until the connection breaks + fullReport := time.NewTicker(15 * time.Second) - case <-fullReport.C: - if err = s.report(conn); err != nil { - log.Warn("Full stats report failed", "err", err) - } - case list := <-s.histCh: - if err = s.reportHistory(conn, list); err != nil { - log.Warn("Requested history report failed", "err", err) - } - case head := <-headCh: - if err = s.reportBlock(conn, head); err != nil { - log.Warn("Block stats report failed", "err", err) - } - if err = s.reportPending(conn); err != nil { - log.Warn("Post-block transaction stats report failed", "err", err) - } - case <-txCh: - if err = s.reportPending(conn); err != nil { - log.Warn("Transaction stats report failed", "err", err) + for err == nil { + select { + case <-quitCh: + fullReport.Stop() + // Make sure the connection is closed + conn.Close() + return + + case <-fullReport.C: + if err = s.report(conn); err != nil { + log.Warn("Full stats report failed", "err", err) + } + case list := <-s.histCh: + if err = s.reportHistory(conn, list); err != nil { + log.Warn("Requested history report failed", "err", err) + } + case head := <-headCh: + if err = s.reportBlock(conn, head); err != nil { + log.Warn("Block stats report failed", "err", err) + } + if err = s.reportPending(conn); err != nil { + log.Warn("Post-block transaction stats report failed", "err", err) + } + case <-txCh: + if err = s.reportPending(conn); err != nil { + log.Warn("Transaction stats report failed", "err", err) + } } } + fullReport.Stop() + + // Close the current connection and establish a new one + conn.Close() + errTimer.Reset(0) } - // Make sure the connection is closed - conn.Close() } } @@ -277,14 +331,29 @@ func (s *Service) loop() { // from the network socket. If any of them match an active request, it forwards // it, if they themselves are requests it initiates a reply, and lastly it drops // unknown packets. -func (s *Service) readLoop(conn *websocket.Conn) { +func (s *Service) readLoop(conn *connWrapper) { // If the read loop exists, close the connection defer conn.Close() for { // Retrieve the next generic network packet and bail out on error + var blob json.RawMessage + if err := conn.ReadJSON(&blob); err != nil { + log.Warn("Failed to retrieve stats server message", "err", err) + return + } + // If the network packet is a system ping, respond to it directly + var ping string + if err := json.Unmarshal(blob, &ping); err == nil && strings.HasPrefix(ping, "primus::ping::") { + if err := conn.WriteJSON(strings.Replace(ping, "ping", "pong", -1)); err != nil { + log.Warn("Failed to respond to system ping message", "err", err) + return + } + continue + } + // Not a system ping, try to decode an actual state message var msg map[string][]interface{} - if err := conn.ReadJSON(&msg); err != nil { + if err := json.Unmarshal(blob, &msg); err != nil { log.Warn("Failed to decode stats server message", "err", err) return } @@ -316,8 +385,11 @@ func (s *Service) readLoop(conn *websocket.Conn) { request, ok := msg["emit"][1].(map[string]interface{}) if !ok { log.Warn("Invalid stats history request", "msg", msg["emit"][1]) - s.histCh <- nil - continue // Ethstats sometime sends invalid history requests, ignore those + select { + case s.histCh <- nil: // Treat it as an no indexes request + default: + } + continue } list, ok := request["list"].([]interface{}) if !ok { @@ -345,7 +417,7 @@ func (s *Service) readLoop(conn *websocket.Conn) { } } -// nodeInfo is the collection of metainformation about a node that is displayed +// nodeInfo is the collection of meta information about a node that is displayed // on the monitoring page. type nodeInfo struct { Name string `json:"name"` @@ -368,7 +440,7 @@ type authMsg struct { } // login tries to authorize the client at the remote server. -func (s *Service) login(conn *websocket.Conn) error { +func (s *Service) login(conn *connWrapper) error { // Construct and send the login authentication infos := s.server.NodeInfo() @@ -413,7 +485,7 @@ func (s *Service) login(conn *websocket.Conn) error { // report collects all possible data to report and send it to the stats server. // This should only be used on reconnects or rarely to avoid overloading the // server. Use the individual methods for reporting subscribed events. -func (s *Service) report(conn *websocket.Conn) error { +func (s *Service) report(conn *connWrapper) error { if err := s.reportLatency(conn); err != nil { return err } @@ -431,7 +503,7 @@ func (s *Service) report(conn *websocket.Conn) error { // reportLatency sends a ping request to the server, measures the RTT time and // finally sends a latency update. -func (s *Service) reportLatency(conn *websocket.Conn) error { +func (s *Service) reportLatency(conn *connWrapper) error { // Send the current time to the ethstats server start := time.Now() @@ -500,7 +572,7 @@ func (s uncleStats) MarshalJSON() ([]byte, error) { } // reportBlock retrieves the current chain head and reports it to the stats server. -func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error { +func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error { // Gather the block details from the header or block chain details := s.assembleBlockStats(block) @@ -527,13 +599,15 @@ func (s *Service) assembleBlockStats(block *types.Block) *blockStats { txs []txStats uncles []*types.Header ) - if s.eth != nil { - // Full nodes have all needed information available + + // check if backend is a full node + fullBackend, ok := s.backend.(fullNodeBackend) + if ok { if block == nil { - block = s.eth.BlockChain().CurrentBlock() + block = fullBackend.CurrentBlock() } header = block.Header() - td = s.eth.BlockChain().GetTd(header.Hash(), header.Number.Uint64()) + td = fullBackend.GetTd(context.Background(), header.Hash()) txs = make([]txStats, len(block.Transactions())) for i, tx := range block.Transactions() { @@ -545,11 +619,12 @@ func (s *Service) assembleBlockStats(block *types.Block) *blockStats { if block != nil { header = block.Header() } else { - header = s.les.BlockChain().CurrentHeader() + header = s.backend.CurrentHeader() } - td = s.les.BlockChain().GetTd(header.Hash(), header.Number.Uint64()) + td = s.backend.GetTd(context.Background(), header.Hash()) txs = []txStats{} } + // Assemble and return the block stats author, _ := s.engine.Author(header) @@ -572,7 +647,7 @@ func (s *Service) assembleBlockStats(block *types.Block) *blockStats { // reportHistory retrieves the most recent batch of blocks and reports it to the // stats server. -func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error { +func (s *Service) reportHistory(conn *connWrapper, list []uint64) error { // Figure out the indexes that need reporting indexes := make([]uint64, 0, historyUpdateRange) if len(list) > 0 { @@ -580,12 +655,7 @@ func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error { indexes = append(indexes, list...) } else { // No indexes requested, send back the top ones - var head int64 - if s.eth != nil { - head = s.eth.BlockChain().CurrentHeader().Number.Int64() - } else { - head = s.les.BlockChain().CurrentHeader().Number.Int64() - } + head := s.backend.CurrentHeader().Number.Int64() start := head - historyUpdateRange + 1 if start < 0 { start = 0 @@ -597,12 +667,13 @@ func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error { // Gather the batch of blocks to report history := make([]*blockStats, len(indexes)) for i, number := range indexes { + fullBackend, ok := s.backend.(fullNodeBackend) // Retrieve the next block if it's known to us var block *types.Block - if s.eth != nil { - block = s.eth.BlockChain().GetBlockByNumber(number) + if ok { + block, _ = fullBackend.BlockByNumber(context.Background(), rpc.BlockNumber(number)) // TODO ignore error here ? } else { - if header := s.les.BlockChain().GetHeaderByNumber(number); header != nil { + if header, _ := s.backend.HeaderByNumber(context.Background(), rpc.BlockNumber(number)); header != nil { block = types.NewBlockWithHeader(header) } } @@ -638,14 +709,9 @@ type pendStats struct { // reportPending retrieves the current number of pending transactions and reports // it to the stats server. -func (s *Service) reportPending(conn *websocket.Conn) error { +func (s *Service) reportPending(conn *connWrapper) error { // Retrieve the pending count from the local blockchain - var pending int - if s.eth != nil { - pending, _ = s.eth.TxPool().Stats() - } else { - pending = s.les.TxPool().Stats() - } + pending, _ := s.backend.Stats() // Assemble the transaction stats and send it to the server log.Trace("Sending pending transactions to ethstats", "count", pending) @@ -672,9 +738,9 @@ type nodeStats struct { Uptime int `json:"uptime"` } -// reportPending retrieves various stats about the node at the networking and +// reportStats retrieves various stats about the node at the networking and // mining layer and reports it to the stats server. -func (s *Service) reportStats(conn *websocket.Conn) error { +func (s *Service) reportStats(conn *connWrapper) error { // Gather the syncing and mining infos from the local miner instance var ( mining bool @@ -682,18 +748,20 @@ func (s *Service) reportStats(conn *websocket.Conn) error { syncing bool gasprice int ) - if s.eth != nil { - mining = s.eth.Miner().Mining() - hashrate = int(s.eth.Miner().HashRate()) + // check if backend is a full node + fullBackend, ok := s.backend.(fullNodeBackend) + if ok { + mining = fullBackend.Miner().Mining() + hashrate = int(fullBackend.Miner().HashRate()) - sync := s.eth.Downloader().Progress() - syncing = s.eth.BlockChain().CurrentHeader().Number.Uint64() >= sync.HighestBlock + sync := fullBackend.Downloader().Progress() + syncing = fullBackend.CurrentHeader().Number.Uint64() >= sync.HighestBlock - price, _ := s.eth.APIBackend.SuggestPrice(context.Background()) + price, _ := fullBackend.SuggestPrice(context.Background()) gasprice = int(price.Uint64()) } else { - sync := s.les.Downloader().Progress() - syncing = s.les.BlockChain().CurrentHeader().Number.Uint64() >= sync.HighestBlock + sync := s.backend.Downloader().Progress() + syncing = s.backend.CurrentHeader().Number.Uint64() >= sync.HighestBlock } // Assemble the node stats and send it to the server log.Trace("Sending node details to ethstats") |