diff options
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 844 |
1 files changed, 844 insertions, 0 deletions
diff --git a/eth/handler.go b/eth/handler.go new file mode 100644 index 0000000..4ce2d1c --- /dev/null +++ b/eth/handler.go @@ -0,0 +1,844 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "encoding/json" + "errors" + "fmt" + "math" + "math/big" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/eth/fetcher" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" +) + +const ( + softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data. + estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header + + // txChanSize is the size of channel listening to NewTxsEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + + // minimim number of peers to broadcast new blocks to + minBroadcastPeers = 4 +) + +var ( + syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge +) + +func errResp(code errCode, format string, v ...interface{}) error { + return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) +} + +type ProtocolManager struct { + networkID uint64 + + fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks) + acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) + + checkpointNumber uint64 // Block number for the sync progress validator to cross reference + checkpointHash common.Hash // Block hash for the sync progress validator to cross reference + + txpool txPool + blockchain *core.BlockChain + maxPeers int + + downloader *downloader.Downloader + fetcher *fetcher.Fetcher + peers *peerSet + + eventMux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription + minedBlockSub *event.TypeMuxSubscription + + whitelist map[uint64]common.Hash + + // channels for fetcher, syncer, txsyncLoop + newPeerCh chan *peer + txsyncCh chan *txsync + quitSync chan struct{} + noMorePeers chan struct{} + + // wait group is used for graceful shutdowns during downloading + // and processing + wg sync.WaitGroup +} + +// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable +// with the Ethereum network. +func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) { + // Create the protocol manager with the base fields + manager := &ProtocolManager{ + networkID: networkID, + eventMux: mux, + txpool: txpool, + blockchain: blockchain, + peers: newPeerSet(), + whitelist: whitelist, + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), + } + if mode == downloader.FullSync { + // The database seems empty as the current block is the genesis. Yet the fast + // block is ahead, so fast sync was enabled for this node at a certain point. + // The scenarios where this can happen is + // * if the user manually (or via a bad block) rolled back a fast sync node + // below the sync point. + // * the last fast sync is not finished while user specifies a full sync this + // time. But we don't have any recent state for full sync. + // In these cases however it's safe to reenable fast sync. + fullBlock, fastBlock := blockchain.CurrentBlock(), blockchain.CurrentFastBlock() + if fullBlock.NumberU64() == 0 && fastBlock.NumberU64() > 0 { + manager.fastSync = uint32(1) + log.Warn("Switch sync mode from full sync to fast sync") + } + } else { + if blockchain.CurrentBlock().NumberU64() > 0 { + // Print warning log if database is not empty to run fast sync. + log.Warn("Switch sync mode from fast sync to full sync") + } else { + // If fast sync was requested and our database is empty, grant it + manager.fastSync = uint32(1) + } + } + // If we have trusted checkpoints, enforce them on the chain + if checkpoint != nil { + manager.checkpointNumber = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1 + manager.checkpointHash = checkpoint.SectionHead + } + + // Construct the downloader (long sync) and its backing state bloom if fast + // sync is requested. The downloader is responsible for deallocating the state + // bloom when it's done. + var stateBloom *trie.SyncBloom + if atomic.LoadUint32(&manager.fastSync) == 1 { + stateBloom = trie.NewSyncBloom(uint64(cacheLimit), chaindb) + } + manager.downloader = downloader.New(manager.checkpointNumber, chaindb, stateBloom, manager.eventMux, blockchain, nil, manager.removePeer) + + // Construct the fetcher (short sync) + validator := func(header *types.Header) error { + return engine.VerifyHeader(blockchain, header, true) + } + heighter := func() uint64 { + return blockchain.CurrentBlock().NumberU64() + } + inserter := func(blocks types.Blocks) (int, error) { + // If sync hasn't reached the checkpoint yet, deny importing weird blocks. + // + // Ideally we would also compare the head block's timestamp and similarly reject + // the propagated block if the head is too old. Unfortunately there is a corner + // case when starting new networks, where the genesis might be ancient (0 unix) + // which would prevent full nodes from accepting it. + if manager.blockchain.CurrentBlock().NumberU64() < manager.checkpointNumber { + log.Warn("Unsynced yet, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash()) + return 0, nil + } + // If fast sync is running, deny importing weird blocks. This is a problematic + // clause when starting up a new network, because fast-syncing miners might not + // accept each others' blocks until a restart. Unfortunately we haven't figured + // out a way yet where nodes can decide unilaterally whether the network is new + // or not. This should be fixed if we figure out a solution. + if atomic.LoadUint32(&manager.fastSync) == 1 { + log.Warn("Fast syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash()) + return 0, nil + } + n, err := manager.blockchain.InsertChain(blocks) + if err == nil { + atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import + } + return n, err + } + manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) + + return manager, nil +} + +func (pm *ProtocolManager) makeProtocol(version uint) p2p.Protocol { + length, ok := protocolLengths[version] + if !ok { + panic("makeProtocol for unknown version") + } + + return p2p.Protocol{ + Name: protocolName, + Version: version, + Length: length, + Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := pm.newPeer(int(version), p, rw) + select { + case pm.newPeerCh <- peer: + pm.wg.Add(1) + defer pm.wg.Done() + return pm.handle(peer) + case <-pm.quitSync: + return p2p.DiscQuitting + } + }, + NodeInfo: func() interface{} { + return pm.NodeInfo() + }, + PeerInfo: func(id enode.ID) interface{} { + if p := pm.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil { + return p.Info() + } + return nil + }, + } +} + +func (pm *ProtocolManager) removePeer(id string) { + // Short circuit if the peer was already removed + peer := pm.peers.Peer(id) + if peer == nil { + return + } + log.Debug("Removing Ethereum peer", "peer", id) + + // Unregister the peer from the downloader and Ethereum peer set + pm.downloader.UnregisterPeer(id) + if err := pm.peers.Unregister(id); err != nil { + log.Error("Peer removal failed", "peer", id, "err", err) + } + // Hard disconnect at the networking layer + if peer != nil { + peer.Peer.Disconnect(p2p.DiscUselessPeer) + } +} + +func (pm *ProtocolManager) Start(maxPeers int) { + pm.maxPeers = maxPeers + + // broadcast transactions + pm.txsCh = make(chan core.NewTxsEvent, txChanSize) + pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) + go pm.txBroadcastLoop() + + // broadcast mined blocks + pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) + go pm.minedBroadcastLoop() + + // start sync handlers + go pm.syncer() + go pm.txsyncLoop() +} + +func (pm *ProtocolManager) Stop() { + log.Info("Stopping Ethereum protocol") + + pm.txsSub.Unsubscribe() // quits txBroadcastLoop + pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + + // Quit the sync loop. + // After this send has completed, no new peers will be accepted. + pm.noMorePeers <- struct{}{} + + // Quit fetcher, txsyncLoop. + close(pm.quitSync) + + // Disconnect existing sessions. + // This also closes the gate for any new registrations on the peer set. + // sessions which are already established but not added to pm.peers yet + // will exit when they try to register. + pm.peers.Close() + + // Wait for all peer handler goroutines and the loops to come down. + pm.wg.Wait() + + log.Info("Ethereum protocol stopped") +} + +func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { + return newPeer(pv, p, newMeteredMsgWriter(rw)) +} + +// handle is the callback invoked to manage the life cycle of an eth peer. When +// this function terminates, the peer is disconnected. +func (pm *ProtocolManager) handle(p *peer) error { + // Ignore maxPeers if this is a trusted peer + if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted { + return p2p.DiscTooManyPeers + } + p.Log().Debug("Ethereum peer connected", "name", p.Name()) + + // Execute the Ethereum handshake + var ( + genesis = pm.blockchain.Genesis() + head = pm.blockchain.CurrentHeader() + hash = head.Hash() + number = head.Number.Uint64() + td = pm.blockchain.GetTd(hash, number) + ) + if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil { + p.Log().Debug("Ethereum handshake failed", "err", err) + return err + } + if rw, ok := p.rw.(*meteredMsgReadWriter); ok { + rw.Init(p.version) + } + // Register the peer locally + if err := pm.peers.Register(p); err != nil { + p.Log().Error("Ethereum peer registration failed", "err", err) + return err + } + defer pm.removePeer(p.id) + + // Register the peer in the downloader. If the downloader considers it banned, we disconnect + if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil { + return err + } + // Propagate existing transactions. new transactions appearing + // after this will be sent via broadcasts. + pm.syncTransactions(p) + + // If we have a trusted CHT, reject all peers below that (avoid fast sync eclipse) + if pm.checkpointHash != (common.Hash{}) { + // Request the peer's checkpoint header for chain height/weight validation + if err := p.RequestHeadersByNumber(pm.checkpointNumber, 1, 0, false); err != nil { + return err + } + // Start a timer to disconnect if the peer doesn't reply in time + p.syncDrop = time.AfterFunc(syncChallengeTimeout, func() { + p.Log().Warn("Checkpoint challenge timed out, dropping", "addr", p.RemoteAddr(), "type", p.Name()) + pm.removePeer(p.id) + }) + // Make sure it's cleaned up if the peer dies off + defer func() { + if p.syncDrop != nil { + p.syncDrop.Stop() + p.syncDrop = nil + } + }() + } + // If we have any explicit whitelist block hashes, request them + for number := range pm.whitelist { + if err := p.RequestHeadersByNumber(number, 1, 0, false); err != nil { + return err + } + } + // Handle incoming messages until the connection is torn down + for { + if err := pm.handleMsg(p); err != nil { + p.Log().Debug("Ethereum message handling failed", "err", err) + return err + } + } +} + +// handleMsg is invoked whenever an inbound message is received from a remote +// peer. The remote connection is torn down upon returning any error. +func (pm *ProtocolManager) handleMsg(p *peer) error { + // Read the next message from the remote peer, and ensure it's fully consumed + msg, err := p.rw.ReadMsg() + if err != nil { + return err + } + if msg.Size > protocolMaxMsgSize { + return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, protocolMaxMsgSize) + } + defer msg.Discard() + + // Handle the message depending on its contents + switch { + case msg.Code == StatusMsg: + // Status messages should never arrive after the handshake + return errResp(ErrExtraStatusMsg, "uncontrolled status message") + + // Block header query, collect the requested headers and reply + case msg.Code == GetBlockHeadersMsg: + // Decode the complex header query + var query getBlockHeadersData + if err := msg.Decode(&query); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + hashMode := query.Origin.Hash != (common.Hash{}) + first := true + maxNonCanonical := uint64(100) + + // Gather headers until the fetch or network limits is reached + var ( + bytes common.StorageSize + headers []*types.Header + unknown bool + ) + for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch { + // Retrieve the next header satisfying the query + var origin *types.Header + if hashMode { + if first { + first = false + origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash) + if origin != nil { + query.Origin.Number = origin.Number.Uint64() + } + } else { + origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number) + } + } else { + origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number) + } + if origin == nil { + break + } + headers = append(headers, origin) + bytes += estHeaderRlpSize + + // Advance to the next header of the query + switch { + case hashMode && query.Reverse: + // Hash based traversal towards the genesis block + ancestor := query.Skip + 1 + if ancestor == 0 { + unknown = true + } else { + query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical) + unknown = (query.Origin.Hash == common.Hash{}) + } + case hashMode && !query.Reverse: + // Hash based traversal towards the leaf block + var ( + current = origin.Number.Uint64() + next = current + query.Skip + 1 + ) + if next <= current { + infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ") + p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos) + unknown = true + } else { + if header := pm.blockchain.GetHeaderByNumber(next); header != nil { + nextHash := header.Hash() + expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical) + if expOldHash == query.Origin.Hash { + query.Origin.Hash, query.Origin.Number = nextHash, next + } else { + unknown = true + } + } else { + unknown = true + } + } + case query.Reverse: + // Number based traversal towards the genesis block + if query.Origin.Number >= query.Skip+1 { + query.Origin.Number -= query.Skip + 1 + } else { + unknown = true + } + + case !query.Reverse: + // Number based traversal towards the leaf block + query.Origin.Number += query.Skip + 1 + } + } + return p.SendBlockHeaders(headers) + + case msg.Code == BlockHeadersMsg: + // A batch of headers arrived to one of our previous requests + var headers []*types.Header + if err := msg.Decode(&headers); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // If no headers were received, but we're expencting a checkpoint header, consider it that + if len(headers) == 0 && p.syncDrop != nil { + // Stop the timer either way, decide later to drop or not + p.syncDrop.Stop() + p.syncDrop = nil + + // If we're doing a fast sync, we must enforce the checkpoint block to avoid + // eclipse attacks. Unsynced nodes are welcome to connect after we're done + // joining the network + if atomic.LoadUint32(&pm.fastSync) == 1 { + p.Log().Warn("Dropping unsynced node during fast sync", "addr", p.RemoteAddr(), "type", p.Name()) + return errors.New("unsynced node cannot serve fast sync") + } + } + // Filter out any explicitly requested headers, deliver the rest to the downloader + filter := len(headers) == 1 + if filter { + // If it's a potential sync progress check, validate the content and advertised chain weight + if p.syncDrop != nil && headers[0].Number.Uint64() == pm.checkpointNumber { + // Disable the sync drop timer + p.syncDrop.Stop() + p.syncDrop = nil + + // Validate the header and either drop the peer or continue + if headers[0].Hash() != pm.checkpointHash { + return errors.New("checkpoint hash mismatch") + } + return nil + } + // Otherwise if it's a whitelisted block, validate against the set + if want, ok := pm.whitelist[headers[0].Number.Uint64()]; ok { + if hash := headers[0].Hash(); want != hash { + p.Log().Info("Whitelist mismatch, dropping peer", "number", headers[0].Number.Uint64(), "hash", hash, "want", want) + return errors.New("whitelist block mismatch") + } + p.Log().Debug("Whitelist block verified", "number", headers[0].Number.Uint64(), "hash", want) + } + // Irrelevant of the fork checks, send the header to the fetcher just in case + headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now()) + } + if len(headers) > 0 || !filter { + err := pm.downloader.DeliverHeaders(p.id, headers) + if err != nil { + log.Debug("Failed to deliver headers", "err", err) + } + } + + case msg.Code == GetBlockBodiesMsg: + // Decode the retrieval message + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err + } + // Gather blocks until the fetch or network limits is reached + var ( + hash common.Hash + bytes int + bodies []rlp.RawValue + ) + for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch { + // Retrieve the hash of the next block + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested block body, stopping if enough was found + if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 { + bodies = append(bodies, data) + bytes += len(data) + } + } + return p.SendBlockBodiesRLP(bodies) + + case msg.Code == BlockBodiesMsg: + // A batch of block bodies arrived to one of our previous requests + var request blockBodiesData + if err := msg.Decode(&request); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver them all to the downloader for queuing + transactions := make([][]*types.Transaction, len(request)) + uncles := make([][]*types.Header, len(request)) + + for i, body := range request { + transactions[i] = body.Transactions + uncles[i] = body.Uncles + } + // Filter out any explicitly requested bodies, deliver the rest to the downloader + filter := len(transactions) > 0 || len(uncles) > 0 + if filter { + transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now()) + } + if len(transactions) > 0 || len(uncles) > 0 || !filter { + err := pm.downloader.DeliverBodies(p.id, transactions, uncles) + if err != nil { + log.Debug("Failed to deliver bodies", "err", err) + } + } + + case p.version >= eth63 && msg.Code == GetNodeDataMsg: + // Decode the retrieval message + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err + } + // Gather state data until the fetch or network limits is reached + var ( + hash common.Hash + bytes int + data [][]byte + ) + for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch { + // Retrieve the hash of the next state entry + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested state entry, stopping if enough was found + if entry, err := pm.blockchain.TrieNode(hash); err == nil { + data = append(data, entry) + bytes += len(entry) + } + } + return p.SendNodeData(data) + + case p.version >= eth63 && msg.Code == NodeDataMsg: + // A batch of node state data arrived to one of our previous requests + var data [][]byte + if err := msg.Decode(&data); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver all to the downloader + if err := pm.downloader.DeliverNodeData(p.id, data); err != nil { + log.Debug("Failed to deliver node state data", "err", err) + } + + case p.version >= eth63 && msg.Code == GetReceiptsMsg: + // Decode the retrieval message + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err + } + // Gather state data until the fetch or network limits is reached + var ( + hash common.Hash + bytes int + receipts []rlp.RawValue + ) + for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch { + // Retrieve the hash of the next block + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested block's receipts, skipping if unknown to us + results := pm.blockchain.GetReceiptsByHash(hash) + if results == nil { + if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { + continue + } + } + // If known, encode and queue for response packet + if encoded, err := rlp.EncodeToBytes(results); err != nil { + log.Error("Failed to encode receipt", "err", err) + } else { + receipts = append(receipts, encoded) + bytes += len(encoded) + } + } + return p.SendReceiptsRLP(receipts) + + case p.version >= eth63 && msg.Code == ReceiptsMsg: + // A batch of receipts arrived to one of our previous requests + var receipts [][]*types.Receipt + if err := msg.Decode(&receipts); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver all to the downloader + if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil { + log.Debug("Failed to deliver receipts", "err", err) + } + + case msg.Code == NewBlockHashesMsg: + var announces newBlockHashesData + if err := msg.Decode(&announces); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + // Mark the hashes as present at the remote node + for _, block := range announces { + p.MarkBlock(block.Hash) + } + // Schedule all the unknown hashes for retrieval + unknown := make(newBlockHashesData, 0, len(announces)) + for _, block := range announces { + if !pm.blockchain.HasBlock(block.Hash, block.Number) { + unknown = append(unknown, block) + } + } + for _, block := range unknown { + pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies) + } + + case msg.Code == NewBlockMsg: + // Retrieve and decode the propagated block + var request newBlockData + if err := msg.Decode(&request); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + if err := request.sanityCheck(); err != nil { + return err + } + request.Block.ReceivedAt = msg.ReceivedAt + request.Block.ReceivedFrom = p + + // Mark the peer as owning the block and schedule it for import + p.MarkBlock(request.Block.Hash()) + pm.fetcher.Enqueue(p.id, request.Block) + + // Assuming the block is importable by the peer, but possibly not yet done so, + // calculate the head hash and TD that the peer truly must have. + var ( + trueHead = request.Block.ParentHash() + trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty()) + ) + // Update the peer's total difficulty if better than the previous + if _, td := p.Head(); trueTD.Cmp(td) > 0 { + p.SetHead(trueHead, trueTD) + + // Schedule a sync if above ours. Note, this will not fire a sync for a gap of + // a single block (as the true TD is below the propagated block), however this + // scenario should easily be covered by the fetcher. + currentBlock := pm.blockchain.CurrentBlock() + if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 { + go pm.synchronise(p) + } + } + + case msg.Code == TxMsg: + // Transactions arrived, make sure we have a valid and fresh chain to handle them + if atomic.LoadUint32(&pm.acceptTxs) == 0 { + break + } + // Transactions can be processed, parse all of them and deliver to the pool + var txs []*types.Transaction + if err := msg.Decode(&txs); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + for i, tx := range txs { + // Validate and mark the remote transaction + if tx == nil { + return errResp(ErrDecode, "transaction %d is nil", i) + } + p.MarkTransaction(tx.Hash()) + } + pm.txpool.AddRemotes(txs) + + default: + return errResp(ErrInvalidMsgCode, "%v", msg.Code) + } + return nil +} + +// BroadcastBlock will either propagate a block to a subset of it's peers, or +// will only announce it's availability (depending what's requested). +func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { + hash := block.Hash() + peers := pm.peers.PeersWithoutBlock(hash) + + // If propagation is requested, send to a subset of the peer + if propagate { + // Calculate the TD of the block (it's not imported yet, so block.Td is not valid) + var td *big.Int + if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil { + td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1)) + } else { + log.Error("Propagating dangling block", "number", block.Number(), "hash", hash) + return + } + // Send the block to a subset of our peers + transferLen := int(math.Sqrt(float64(len(peers)))) + if transferLen < minBroadcastPeers { + transferLen = minBroadcastPeers + } + if transferLen > len(peers) { + transferLen = len(peers) + } + transfer := peers[:transferLen] + for _, peer := range transfer { + peer.AsyncSendNewBlock(block, td) + } + log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) + return + } + // Otherwise if the block is indeed in out own chain, announce it + if pm.blockchain.HasBlock(hash, block.NumberU64()) { + for _, peer := range peers { + peer.AsyncSendNewBlockHash(block) + } + log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) + } +} + +// BroadcastTxs will propagate a batch of transactions to all peers which are not known to +// already have the given transaction. +func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { + var txset = make(map[*peer]types.Transactions) + + // Broadcast transactions to a batch of peers not knowing about it + for _, tx := range txs { + peers := pm.peers.PeersWithoutTx(tx.Hash()) + for _, peer := range peers { + txset[peer] = append(txset[peer], tx) + } + log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers)) + } + // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] + for peer, txs := range txset { + peer.AsyncSendTransactions(txs) + } +} + +// Mined broadcast loop +func (pm *ProtocolManager) minedBroadcastLoop() { + // automatically stops if unsubscribe + for obj := range pm.minedBlockSub.Chan() { + if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { + pm.BroadcastBlock(ev.Block, true) // First propagate block to peers + pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest + } + } +} + +func (pm *ProtocolManager) txBroadcastLoop() { + for { + select { + case event := <-pm.txsCh: + pm.BroadcastTxs(event.Txs) + + // Err() channel will be closed when unsubscribing. + case <-pm.txsSub.Err(): + return + } + } +} + +// NodeInfo represents a short summary of the Ethereum sub-protocol metadata +// known about the host peer. +type NodeInfo struct { + Network uint64 `json:"network"` // Ethereum network ID (1=Frontier, 2=Morden, Ropsten=3, Rinkeby=4) + Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain + Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block + Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules + Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block +} + +// NodeInfo retrieves some protocol metadata about the running host node. +func (pm *ProtocolManager) NodeInfo() *NodeInfo { + currentBlock := pm.blockchain.CurrentBlock() + return &NodeInfo{ + Network: pm.networkID, + Difficulty: pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()), + Genesis: pm.blockchain.Genesis().Hash(), + Config: pm.blockchain.Config(), + Head: currentBlock.Hash(), + } +} |