diff options
author | Aaron Buchwald <[email protected]> | 2020-12-14 16:22:22 -0500 |
---|---|---|
committer | Aaron Buchwald <[email protected]> | 2020-12-14 19:10:21 -0500 |
commit | de9c3bc629f7e712b3cd84b280db6a32b9bc7030 (patch) | |
tree | 4aabd136351727a9021408cd4ef458c2953f3a67 /ethstats/ethstats.go | |
parent | ddb43b6d824b5de77e4df9e9f551aa067be9b40e (diff) |
Fix linting and add to CI
Diffstat (limited to 'ethstats/ethstats.go')
-rw-r--r-- | ethstats/ethstats.go | 785 |
1 files changed, 0 insertions, 785 deletions
diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go deleted file mode 100644 index 3dd06e3..0000000 --- a/ethstats/ethstats.go +++ /dev/null @@ -1,785 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package ethstats implements the network stats reporting service. -package ethstats - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "math/big" - "net/http" - "regexp" - "runtime" - "strconv" - "strings" - "sync" - "time" - - "github.com/ava-labs/coreth/consensus" - "github.com/ava-labs/coreth/core" - "github.com/ava-labs/coreth/core/types" - "github.com/ava-labs/coreth/eth" - "github.com/ava-labs/coreth/miner" - "github.com/ava-labs/coreth/node" - "github.com/ava-labs/coreth/rpc" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/mclock" - "github.com/ethereum/go-ethereum/eth/downloader" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/les" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/p2p" - "github.com/gorilla/websocket" -) - -const ( - // historyUpdateRange is the number of blocks a node should report upon login or - // history request. - historyUpdateRange = 50 - - // txChanSize is the size of channel listening to NewTxsEvent. - // The number is referenced from the size of tx pool. - txChanSize = 4096 - // chainHeadChanSize is the size of channel listening to ChainHeadEvent. - chainHeadChanSize = 10 -) - -// backend encompasses the bare-minimum functionality needed for ethstats reporting -type backend interface { - SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription - SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription - CurrentHeader() *types.Header - HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) - GetTd(ctx context.Context, hash common.Hash) *big.Int - Stats() (pending int, queued int) - Downloader() *downloader.Downloader -} - -// fullNodeBackend encompasses the functionality necessary for a full node -// reporting to ethstats -type fullNodeBackend interface { - backend - Miner() *miner.Miner - BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error) - CurrentBlock() *types.Block - SuggestPrice(ctx context.Context) (*big.Int, error) -} - -// Service implements an Ethereum netstats reporting daemon that pushes local -// chain statistics up to a monitoring server. -type Service struct { - server *p2p.Server // Peer-to-peer server to retrieve networking infos - backend backend - engine consensus.Engine // Consensus engine to retrieve variadic block fields - - node string // Name of the node to display on the monitoring page - pass string // Password to authorize access to the monitoring page - host string // Remote address of the monitoring service - - pongCh chan struct{} // Pong notifications are fed into this channel - histCh chan []uint64 // History request block numbers are fed into this channel - -} - -// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the -// websocket. -// -// From Gorilla websocket docs: -// Connections support one concurrent reader and one concurrent writer. -// Applications are responsible for ensuring that no more than one goroutine calls the write methods -// - NextWriter, SetWriteDeadline, WriteMessage, WriteJSON, EnableWriteCompression, SetCompressionLevel -// concurrently and that no more than one goroutine calls the read methods -// - NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler -// concurrently. -// The Close and WriteControl methods can be called concurrently with all other methods. -type connWrapper struct { - conn *websocket.Conn - - rlock sync.Mutex - wlock sync.Mutex -} - -func newConnectionWrapper(conn *websocket.Conn) *connWrapper { - return &connWrapper{conn: conn} -} - -// WriteJSON wraps corresponding method on the websocket but is safe for concurrent calling -func (w *connWrapper) WriteJSON(v interface{}) error { - w.wlock.Lock() - defer w.wlock.Unlock() - - return w.conn.WriteJSON(v) -} - -// ReadJSON wraps corresponding method on the websocket but is safe for concurrent calling -func (w *connWrapper) ReadJSON(v interface{}) error { - w.rlock.Lock() - defer w.rlock.Unlock() - - return w.conn.ReadJSON(v) -} - -// Close wraps corresponding method on the websocket but is safe for concurrent calling -func (w *connWrapper) Close() error { - // The Close and WriteControl methods can be called concurrently with all other methods, - // so the mutex is not used here - return w.conn.Close() -} - -// New returns a monitoring service ready for stats reporting. -func New(node *node.Node, backend backend, engine consensus.Engine, url string) error { - // Parse the netstats connection url - re := regexp.MustCompile("([^:@]*)(:([^@]*))?@(.+)") - parts := re.FindStringSubmatch(url) - if len(parts) != 5 { - return fmt.Errorf("invalid netstats url: \"%s\", should be nodename:secret@host:port", url) - } - ethstats := &Service{ - backend: backend, - engine: engine, - server: node.Server(), - node: parts[1], - pass: parts[3], - host: parts[4], - pongCh: make(chan struct{}), - histCh: make(chan []uint64, 1), - } - - node.RegisterLifecycle(ethstats) - return nil -} - -// Start implements node.Lifecycle, starting up the monitoring and reporting daemon. -func (s *Service) Start() error { - go s.loop() - - log.Info("Stats daemon started") - return nil -} - -// Stop implements node.Lifecycle, terminating the monitoring and reporting daemon. -func (s *Service) Stop() error { - log.Info("Stats daemon stopped") - return nil -} - -// loop keeps trying to connect to the netstats server, reporting chain events -// until termination. -func (s *Service) loop() { - // Subscribe to chain events to execute updates on - chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize) - headSub := s.backend.SubscribeChainHeadEvent(chainHeadCh) - defer headSub.Unsubscribe() - - txEventCh := make(chan core.NewTxsEvent, txChanSize) - txSub := s.backend.SubscribeNewTxsEvent(txEventCh) - defer txSub.Unsubscribe() - - // Start a goroutine that exhausts the subscriptions to avoid events piling up - var ( - quitCh = make(chan struct{}) - headCh = make(chan *types.Block, 1) - txCh = make(chan struct{}, 1) - ) - go func() { - var lastTx mclock.AbsTime - - HandleLoop: - for { - select { - // Notify of chain head events, but drop if too frequent - case head := <-chainHeadCh: - select { - case headCh <- head.Block: - default: - } - - // Notify of new transaction events, but drop if too frequent - case <-txEventCh: - if time.Duration(mclock.Now()-lastTx) < time.Second { - continue - } - lastTx = mclock.Now() - - select { - case txCh <- struct{}{}: - default: - } - - // node stopped - case <-txSub.Err(): - break HandleLoop - case <-headSub.Err(): - break HandleLoop - } - } - close(quitCh) - }() - - // Resolve the URL, defaulting to TLS, but falling back to none too - path := fmt.Sprintf("%s/api", s.host) - urls := []string{path} - - // url.Parse and url.IsAbs is unsuitable (https://github.com/golang/go/issues/19779) - if !strings.Contains(path, "://") { - urls = []string{"wss://" + path, "ws://" + path} - } - - errTimer := time.NewTimer(0) - defer errTimer.Stop() - // Loop reporting until termination - for { - select { - case <-quitCh: - return - case <-errTimer.C: - // Establish a websocket connection to the server on any supported URL - var ( - conn *connWrapper - err error - ) - dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second} - header := make(http.Header) - header.Set("origin", "http://localhost") - for _, url := range urls { - c, _, e := dialer.Dial(url, header) - err = e - if err == nil { - conn = newConnectionWrapper(c) - break - } - } - if err != nil { - log.Warn("Stats server unreachable", "err", err) - errTimer.Reset(10 * time.Second) - continue - } - // Authenticate the client with the server - if err = s.login(conn); err != nil { - log.Warn("Stats login failed", "err", err) - conn.Close() - errTimer.Reset(10 * time.Second) - continue - } - go s.readLoop(conn) - - // Send the initial stats so our node looks decent from the get go - if err = s.report(conn); err != nil { - log.Warn("Initial stats report failed", "err", err) - conn.Close() - errTimer.Reset(0) - continue - } - // Keep sending status updates until the connection breaks - fullReport := time.NewTicker(15 * time.Second) - - for err == nil { - select { - case <-quitCh: - fullReport.Stop() - // Make sure the connection is closed - conn.Close() - return - - case <-fullReport.C: - if err = s.report(conn); err != nil { - log.Warn("Full stats report failed", "err", err) - } - case list := <-s.histCh: - if err = s.reportHistory(conn, list); err != nil { - log.Warn("Requested history report failed", "err", err) - } - case head := <-headCh: - if err = s.reportBlock(conn, head); err != nil { - log.Warn("Block stats report failed", "err", err) - } - if err = s.reportPending(conn); err != nil { - log.Warn("Post-block transaction stats report failed", "err", err) - } - case <-txCh: - if err = s.reportPending(conn); err != nil { - log.Warn("Transaction stats report failed", "err", err) - } - } - } - fullReport.Stop() - - // Close the current connection and establish a new one - conn.Close() - errTimer.Reset(0) - } - } -} - -// readLoop loops as long as the connection is alive and retrieves data packets -// from the network socket. If any of them match an active request, it forwards -// it, if they themselves are requests it initiates a reply, and lastly it drops -// unknown packets. -func (s *Service) readLoop(conn *connWrapper) { - // If the read loop exists, close the connection - defer conn.Close() - - for { - // Retrieve the next generic network packet and bail out on error - var blob json.RawMessage - if err := conn.ReadJSON(&blob); err != nil { - log.Warn("Failed to retrieve stats server message", "err", err) - return - } - // If the network packet is a system ping, respond to it directly - var ping string - if err := json.Unmarshal(blob, &ping); err == nil && strings.HasPrefix(ping, "primus::ping::") { - if err := conn.WriteJSON(strings.Replace(ping, "ping", "pong", -1)); err != nil { - log.Warn("Failed to respond to system ping message", "err", err) - return - } - continue - } - // Not a system ping, try to decode an actual state message - var msg map[string][]interface{} - if err := json.Unmarshal(blob, &msg); err != nil { - log.Warn("Failed to decode stats server message", "err", err) - return - } - log.Trace("Received message from stats server", "msg", msg) - if len(msg["emit"]) == 0 { - log.Warn("Stats server sent non-broadcast", "msg", msg) - return - } - command, ok := msg["emit"][0].(string) - if !ok { - log.Warn("Invalid stats server message type", "type", msg["emit"][0]) - return - } - // If the message is a ping reply, deliver (someone must be listening!) - if len(msg["emit"]) == 2 && command == "node-pong" { - select { - case s.pongCh <- struct{}{}: - // Pong delivered, continue listening - continue - default: - // Ping routine dead, abort - log.Warn("Stats server pinger seems to have died") - return - } - } - // If the message is a history request, forward to the event processor - if len(msg["emit"]) == 2 && command == "history" { - // Make sure the request is valid and doesn't crash us - request, ok := msg["emit"][1].(map[string]interface{}) - if !ok { - log.Warn("Invalid stats history request", "msg", msg["emit"][1]) - select { - case s.histCh <- nil: // Treat it as an no indexes request - default: - } - continue - } - list, ok := request["list"].([]interface{}) - if !ok { - log.Warn("Invalid stats history block list", "list", request["list"]) - return - } - // Convert the block number list to an integer list - numbers := make([]uint64, len(list)) - for i, num := range list { - n, ok := num.(float64) - if !ok { - log.Warn("Invalid stats history block number", "number", num) - return - } - numbers[i] = uint64(n) - } - select { - case s.histCh <- numbers: - continue - default: - } - } - // Report anything else and continue - log.Info("Unknown stats message", "msg", msg) - } -} - -// nodeInfo is the collection of meta information about a node that is displayed -// on the monitoring page. -type nodeInfo struct { - Name string `json:"name"` - Node string `json:"node"` - Port int `json:"port"` - Network string `json:"net"` - Protocol string `json:"protocol"` - API string `json:"api"` - Os string `json:"os"` - OsVer string `json:"os_v"` - Client string `json:"client"` - History bool `json:"canUpdateHistory"` -} - -// authMsg is the authentication infos needed to login to a monitoring server. -type authMsg struct { - ID string `json:"id"` - Info nodeInfo `json:"info"` - Secret string `json:"secret"` -} - -// login tries to authorize the client at the remote server. -func (s *Service) login(conn *connWrapper) error { - // Construct and send the login authentication - infos := s.server.NodeInfo() - - var network, protocol string - if info := infos.Protocols["eth"]; info != nil { - network = fmt.Sprintf("%d", info.(*eth.NodeInfo).Network) - protocol = fmt.Sprintf("eth/%d", eth.ProtocolVersions[0]) - } else { - network = fmt.Sprintf("%d", infos.Protocols["les"].(*les.NodeInfo).Network) - protocol = fmt.Sprintf("les/%d", les.ClientProtocolVersions[0]) - } - auth := &authMsg{ - ID: s.node, - Info: nodeInfo{ - Name: s.node, - Node: infos.Name, - Port: infos.Ports.Listener, - Network: network, - Protocol: protocol, - API: "No", - Os: runtime.GOOS, - OsVer: runtime.GOARCH, - Client: "0.1.1", - History: true, - }, - Secret: s.pass, - } - login := map[string][]interface{}{ - "emit": {"hello", auth}, - } - if err := conn.WriteJSON(login); err != nil { - return err - } - // Retrieve the remote ack or connection termination - var ack map[string][]string - if err := conn.ReadJSON(&ack); err != nil || len(ack["emit"]) != 1 || ack["emit"][0] != "ready" { - return errors.New("unauthorized") - } - return nil -} - -// report collects all possible data to report and send it to the stats server. -// This should only be used on reconnects or rarely to avoid overloading the -// server. Use the individual methods for reporting subscribed events. -func (s *Service) report(conn *connWrapper) error { - if err := s.reportLatency(conn); err != nil { - return err - } - if err := s.reportBlock(conn, nil); err != nil { - return err - } - if err := s.reportPending(conn); err != nil { - return err - } - if err := s.reportStats(conn); err != nil { - return err - } - return nil -} - -// reportLatency sends a ping request to the server, measures the RTT time and -// finally sends a latency update. -func (s *Service) reportLatency(conn *connWrapper) error { - // Send the current time to the ethstats server - start := time.Now() - - ping := map[string][]interface{}{ - "emit": {"node-ping", map[string]string{ - "id": s.node, - "clientTime": start.String(), - }}, - } - if err := conn.WriteJSON(ping); err != nil { - return err - } - // Wait for the pong request to arrive back - select { - case <-s.pongCh: - // Pong delivered, report the latency - case <-time.After(5 * time.Second): - // Ping timeout, abort - return errors.New("ping timed out") - } - latency := strconv.Itoa(int((time.Since(start) / time.Duration(2)).Nanoseconds() / 1000000)) - - // Send back the measured latency - log.Trace("Sending measured latency to ethstats", "latency", latency) - - stats := map[string][]interface{}{ - "emit": {"latency", map[string]string{ - "id": s.node, - "latency": latency, - }}, - } - return conn.WriteJSON(stats) -} - -// blockStats is the information to report about individual blocks. -type blockStats struct { - Number *big.Int `json:"number"` - Hash common.Hash `json:"hash"` - ParentHash common.Hash `json:"parentHash"` - Timestamp *big.Int `json:"timestamp"` - Miner common.Address `json:"miner"` - GasUsed uint64 `json:"gasUsed"` - GasLimit uint64 `json:"gasLimit"` - Diff string `json:"difficulty"` - TotalDiff string `json:"totalDifficulty"` - Txs []txStats `json:"transactions"` - TxHash common.Hash `json:"transactionsRoot"` - Root common.Hash `json:"stateRoot"` - Uncles uncleStats `json:"uncles"` -} - -// txStats is the information to report about individual transactions. -type txStats struct { - Hash common.Hash `json:"hash"` -} - -// uncleStats is a custom wrapper around an uncle array to force serializing -// empty arrays instead of returning null for them. -type uncleStats []*types.Header - -func (s uncleStats) MarshalJSON() ([]byte, error) { - if uncles := ([]*types.Header)(s); len(uncles) > 0 { - return json.Marshal(uncles) - } - return []byte("[]"), nil -} - -// reportBlock retrieves the current chain head and reports it to the stats server. -func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error { - // Gather the block details from the header or block chain - details := s.assembleBlockStats(block) - - // Assemble the block report and send it to the server - log.Trace("Sending new block to ethstats", "number", details.Number, "hash", details.Hash) - - stats := map[string]interface{}{ - "id": s.node, - "block": details, - } - report := map[string][]interface{}{ - "emit": {"block", stats}, - } - return conn.WriteJSON(report) -} - -// assembleBlockStats retrieves any required metadata to report a single block -// and assembles the block stats. If block is nil, the current head is processed. -func (s *Service) assembleBlockStats(block *types.Block) *blockStats { - // Gather the block infos from the local blockchain - var ( - header *types.Header - td *big.Int - txs []txStats - uncles []*types.Header - ) - - // check if backend is a full node - fullBackend, ok := s.backend.(fullNodeBackend) - if ok { - if block == nil { - block = fullBackend.CurrentBlock() - } - header = block.Header() - td = fullBackend.GetTd(context.Background(), header.Hash()) - - txs = make([]txStats, len(block.Transactions())) - for i, tx := range block.Transactions() { - txs[i].Hash = tx.Hash() - } - uncles = block.Uncles() - } else { - // Light nodes would need on-demand lookups for transactions/uncles, skip - if block != nil { - header = block.Header() - } else { - header = s.backend.CurrentHeader() - } - td = s.backend.GetTd(context.Background(), header.Hash()) - txs = []txStats{} - } - - // Assemble and return the block stats - author, _ := s.engine.Author(header) - - return &blockStats{ - Number: header.Number, - Hash: header.Hash(), - ParentHash: header.ParentHash, - Timestamp: new(big.Int).SetUint64(header.Time), - Miner: author, - GasUsed: header.GasUsed, - GasLimit: header.GasLimit, - Diff: header.Difficulty.String(), - TotalDiff: td.String(), - Txs: txs, - TxHash: header.TxHash, - Root: header.Root, - Uncles: uncles, - } -} - -// reportHistory retrieves the most recent batch of blocks and reports it to the -// stats server. -func (s *Service) reportHistory(conn *connWrapper, list []uint64) error { - // Figure out the indexes that need reporting - indexes := make([]uint64, 0, historyUpdateRange) - if len(list) > 0 { - // Specific indexes requested, send them back in particular - indexes = append(indexes, list...) - } else { - // No indexes requested, send back the top ones - head := s.backend.CurrentHeader().Number.Int64() - start := head - historyUpdateRange + 1 - if start < 0 { - start = 0 - } - for i := uint64(start); i <= uint64(head); i++ { - indexes = append(indexes, i) - } - } - // Gather the batch of blocks to report - history := make([]*blockStats, len(indexes)) - for i, number := range indexes { - fullBackend, ok := s.backend.(fullNodeBackend) - // Retrieve the next block if it's known to us - var block *types.Block - if ok { - block, _ = fullBackend.BlockByNumber(context.Background(), rpc.BlockNumber(number)) // TODO ignore error here ? - } else { - if header, _ := s.backend.HeaderByNumber(context.Background(), rpc.BlockNumber(number)); header != nil { - block = types.NewBlockWithHeader(header) - } - } - // If we do have the block, add to the history and continue - if block != nil { - history[len(history)-1-i] = s.assembleBlockStats(block) - continue - } - // Ran out of blocks, cut the report short and send - history = history[len(history)-i:] - break - } - // Assemble the history report and send it to the server - if len(history) > 0 { - log.Trace("Sending historical blocks to ethstats", "first", history[0].Number, "last", history[len(history)-1].Number) - } else { - log.Trace("No history to send to stats server") - } - stats := map[string]interface{}{ - "id": s.node, - "history": history, - } - report := map[string][]interface{}{ - "emit": {"history", stats}, - } - return conn.WriteJSON(report) -} - -// pendStats is the information to report about pending transactions. -type pendStats struct { - Pending int `json:"pending"` -} - -// reportPending retrieves the current number of pending transactions and reports -// it to the stats server. -func (s *Service) reportPending(conn *connWrapper) error { - // Retrieve the pending count from the local blockchain - pending, _ := s.backend.Stats() - // Assemble the transaction stats and send it to the server - log.Trace("Sending pending transactions to ethstats", "count", pending) - - stats := map[string]interface{}{ - "id": s.node, - "stats": &pendStats{ - Pending: pending, - }, - } - report := map[string][]interface{}{ - "emit": {"pending", stats}, - } - return conn.WriteJSON(report) -} - -// nodeStats is the information to report about the local node. -type nodeStats struct { - Active bool `json:"active"` - Syncing bool `json:"syncing"` - Mining bool `json:"mining"` - Hashrate int `json:"hashrate"` - Peers int `json:"peers"` - GasPrice int `json:"gasPrice"` - Uptime int `json:"uptime"` -} - -// reportStats retrieves various stats about the node at the networking and -// mining layer and reports it to the stats server. -func (s *Service) reportStats(conn *connWrapper) error { - // Gather the syncing and mining infos from the local miner instance - var ( - mining bool - hashrate int - syncing bool - gasprice int - ) - // check if backend is a full node - fullBackend, ok := s.backend.(fullNodeBackend) - if ok { - mining = fullBackend.Miner().Mining() - hashrate = int(fullBackend.Miner().HashRate()) - - sync := fullBackend.Downloader().Progress() - syncing = fullBackend.CurrentHeader().Number.Uint64() >= sync.HighestBlock - - price, _ := fullBackend.SuggestPrice(context.Background()) - gasprice = int(price.Uint64()) - } else { - sync := s.backend.Downloader().Progress() - syncing = s.backend.CurrentHeader().Number.Uint64() >= sync.HighestBlock - } - // Assemble the node stats and send it to the server - log.Trace("Sending node details to ethstats") - - stats := map[string]interface{}{ - "id": s.node, - "stats": &nodeStats{ - Active: true, - Mining: mining, - Hashrate: hashrate, - Peers: s.server.PeerCount(), - GasPrice: gasprice, - Syncing: syncing, - Uptime: 100, - }, - } - report := map[string][]interface{}{ - "emit": {"stats", stats}, - } - return conn.WriteJSON(report) -} |