// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"errors"
"fmt"
"math/big"
"sync"
"time"
mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
)
// Sentinel errors declared for peer bookkeeping. Each one is a distinct value
// so callers can compare against it directly.

// errClosed reports that the peer set is closed.
var errClosed = errors.New("peer set is closed")

// errAlreadyRegistered reports that the peer is already registered.
var errAlreadyRegistered = errors.New("peer is already registered")

// errNotRegistered reports that the peer is not registered.
var errNotRegistered = errors.New("peer is not registered")
// Caps on the per-peer "known hash" sets. Bounding them keeps the memory used
// per peer limited (prevent DOS).
const (
	// maxKnownTxs is the maximum number of transaction hashes to keep in the
	// known list.
	maxKnownTxs = 32768

	// maxKnownBlocks is the maximum number of block hashes to keep in the
	// known list.
	maxKnownBlocks = 1024
)

// Capacities of the per-peer broadcast queues. When a queue is full, further
// broadcasts to that peer are dropped rather than blocking the sender.
const (
	// maxQueuedTxs is the maximum number of transaction lists to queue up
	// before dropping broadcasts. This is a sensitive number as a transaction
	// list might contain a single transaction, or thousands.
	maxQueuedTxs = 128

	// maxQueuedProps is the maximum number of block propagations to queue up
	// before dropping broadcasts. There's not much point in queueing stale
	// blocks, so a few that might cover uncles should be enough.
	maxQueuedProps = 4

	// maxQueuedAnns is the maximum number of block announcements to queue up
	// before dropping broadcasts. Similarly to block propagations, there's no
	// point to queue above some healthy uncle limit, so use that.
	maxQueuedAnns = 4
)

// handshakeTimeout is a five second timeout.
// NOTE(review): presumably bounds the protocol handshake; it is not used in
// this part of the file, so confirm against the handshake code.
const handshakeTimeout = 5 * time.Second
// PeerInfo represents a short summary of the Ethereum sub-protocol metadata known
// about a connected peer. Instances are built by (*peer).Info; the struct tags
// fix the JSON key names used when the summary is serialised.
type PeerInfo struct {
	Version    int      `json:"version"`    // Ethereum protocol version negotiated
	Difficulty *big.Int `json:"difficulty"` // Total difficulty of the peer's blockchain
	Head       string   `json:"head"`       // SHA3 hash of the peer's best owned block, as a hex string
}
// propEvent is a block propagation, waiting for its turn in the broadcast queue.
// The broadcast loop hands both fields to SendNewBlock when the event is dequeued.
type propEvent struct {
	block *types.Block // Block to propagate to the peer
	td    *big.Int     // Total difficulty sent alongside the block
}
// peer bundles a p2p connection with the eth sub-protocol state tracked for the
// remote node: the negotiated version, its head hash and total difficulty, the
// hashes it is known to have, and the queues feeding the broadcast loop.
type peer struct {
	id string // Short identifier: hex of the first 8 bytes of the node ID (set in newPeer)

	*p2p.Peer                      // Underlying p2p peer; its methods (e.g. Log, ID) are promoted
	rw        p2p.MsgReadWriter // Message stream used to send protocol messages

	version  int         // Protocol version negotiated
	syncDrop *time.Timer // Timed connection dropper if sync progress isn't validated in time

	head common.Hash  // Current head hash of the peer; guarded by lock
	td   *big.Int     // Total difficulty of the peer; guarded by lock. Not initialised by newPeer — presumably set during the handshake, verify
	lock sync.RWMutex // Protects head and td (see Head and SetHead)

	knownTxs    mapset.Set                // Set of transaction hashes known to be known by this peer
	knownBlocks mapset.Set                // Set of block hashes known to be known by this peer
	queuedTxs   chan []*types.Transaction // Queue of transactions to broadcast to the peer
	queuedProps chan *propEvent           // Queue of blocks to broadcast to the peer
	queuedAnns  chan *types.Block         // Queue of blocks to announce to the peer
	term        chan struct{}             // Termination channel to stop the broadcaster
}
// newPeer builds the eth-level wrapper around an established p2p connection.
//
// version is the negotiated protocol version, p the underlying p2p peer and rw
// the message stream used to talk to it. The returned peer has empty known-hash
// sets, buffered broadcast queues sized by the maxQueued* constants and an open
// termination channel. head, td and syncDrop are left at their zero values.
func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
	// The short id is the hex encoding of the first 8 bytes of the node ID.
	shortID := fmt.Sprintf("%x", p.ID().Bytes()[:8])

	created := new(peer)
	created.Peer = p
	created.rw = rw
	created.version = version
	created.id = shortID

	// Bookkeeping of what the remote side already has.
	created.knownTxs = mapset.NewSet()
	created.knownBlocks = mapset.NewSet()

	// Outbound queues drained by the broadcast loop.
	created.queuedTxs = make(chan []*types.Transaction, maxQueuedTxs)
	created.queuedProps = make(chan *propEvent, maxQueuedProps)
	created.queuedAnns = make(chan *types.Block, maxQueuedAnns)
	created.term = make(chan struct{})

	return created
}
// broadcast is the peer's asynchronous write loop. It multiplexes the queued
// transaction broadcasts, block propagations and block announcements onto the
// remote peer, so that the rest of the node does not have to wait on network
// writes. The loop ends on the first failed send or as soon as the termination
// channel is closed.
func (p *peer) broadcast() {
	for {
		// Outcome of whichever send this iteration performs.
		var err error

		select {
		case <-p.term:
			return

		case txs := <-p.queuedTxs:
			if err = p.SendTransactions(txs); err == nil {
				p.Log().Trace("Broadcast transactions", "count", len(txs))
			}

		case prop := <-p.queuedProps:
			if err = p.SendNewBlock(prop.block, prop.td); err == nil {
				p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)
			}

		case block := <-p.queuedAnns:
			if err = p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err == nil {
				p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
			}
		}
		// Any write failure stops the broadcaster for this peer.
		if err != nil {
			return
		}
	}
}
// close signals the broadcast goroutine to terminate by closing the term
// channel, which the broadcast loop selects on. It must be called at most once
// per peer: closing an already closed channel panics.
func (p *peer) close() {
	close(p.term)
}
// Info gathers and returns a collection of metadata known about a peer: the
// negotiated protocol version plus the head hash (hex encoded) and total
// difficulty as reported by Head.
func (p *peer) Info() *PeerInfo {
	headHash, difficulty := p.Head()

	summary := new(PeerInfo)
	summary.Version = p.version
	summary.Difficulty = difficulty
	summary.Head = headHash.Hex()
	return summary
}
// Head retrieves a copy of the current head hash and total difficulty of the
// peer. Both values are read under the read lock, and the returned difficulty
// is a fresh big.Int, so callers may modify it freely.
//
// NOTE(review): assumes p.td is non-nil; nothing in this part of the file
// initialises it, so confirm it is set before Head can be called.
func (p *peer) Head() (hash common.Hash, td *big.Int) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	// Assigning the hash copies its bytes.
	hash = p.head
	td = new(big.Int).Set(p.td)
	return hash, td
}
// SetHead updates the head hash and total difficulty of the peer under the
// write lock. The difficulty is copied into the peer's existing big.Int rather
// than retaining the caller's pointer.
//
// NOTE(review): assumes p.td is non-nil; nothing in this part of the file
// initialises it, so confirm it is set before SetHead can be called.
func (p *peer) SetHead(hash common.Hash, td *big.Int) {
	p.lock.Lock()
	defer p.lock.Unlock()

	// Assigning the hash copies its bytes.
	p.head = hash
	p.td.Set(td)
}
// MarkBlock marks a block as known for the peer, ensuring that the block will
// never be propagated to this particular peer. Room is made first: while the
// known set is at or above maxKnownBlocks, an existing entry is evicted.
func (p *peer) MarkBlock(hash common.Hash) {
	known := p.knownBlocks
	for {
		if known.Cardinality() < maxKnownBlocks {
			break
		}
		// Memory allowance reached, drop a previously known block hash.
		known.Pop()
	}
	known.Add(hash)
}
// MarkTransaction marks a transaction as known for the peer, ensuring that it
// will never be propagated to this particular peer. Room is made first: while
// the known set is at or above maxKnownTxs, an existing entry is evicted.
func (p *peer) MarkTransaction(hash common.Hash) {
	// Re-read the size after every eviction until we are below the allowance.
	for size := p.knownTxs.Cardinality(); size >= maxKnownTxs; size = p.knownTxs.Cardinality() {
		p.knownTxs.Pop()
	}
	p.knownTxs.Add(hash)
}
// SendTransactions sends transactions to the peer and includes the hashes
// in its transaction hash set for future reference.
//
// Room for the batch is made in the known set before the new hashes are
// inserted. Pop removes an arbitrary element, so trimming after insertion could
// evict the very hashes being recorded and would let the set overshoot its
// allowance while the batch is being added. A batch larger than maxKnownTxs
// empties the set first and is then recorded in full.
//
// It returns the error from writing the TxMsg message to the peer, if any.
func (p *peer) SendTransactions(txs types.Transactions) error {
	// Number of old entries that may stay so that the batch still fits.
	keep := maxKnownTxs - len(txs)
	if keep < 0 {
		keep = 0
	}
	for p.knownTxs.Cardinality() > keep {
		p.knownTxs.Pop()
	}
	// Mark all the transactions as known.
	for _, tx := range txs {
		p.knownTxs.Add(tx.Hash())
	}
	return p2p.Send(p.rw, TxMsg, txs)
}
// AsyncSendTransactions queues list of transactions propagation to a remote
// peer. If the peer's broadcast queue is full, the event is dropped and only a
// debug log entry is written; nothing is marked as known in that case.
//
// When queueing succeeds, room for the batch is made in the known set before
// the new hashes are inserted. Pop removes an arbitrary element, so trimming
// after insertion could evict the very hashes being recorded. A batch larger
// than maxKnownTxs empties the set first and is then recorded in full.
func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
	select {
	case p.queuedTxs <- txs:
		// Number of old entries that may stay so that the batch still fits.
		keep := maxKnownTxs - len(txs)
		if keep < 0 {
			keep = 0
		}
		for p.knownTxs.Cardinality() > keep {
			p.knownTxs.Pop()
		}
		// Mark all the transactions as known.
		for _, tx := range txs {
			p.knownTxs.Add(tx.Hash())
		}
	default:
		p.Log().Debug("Dropping transaction propagation", "count", len(txs))
	}
}
// SendNewBlockHashes announces the availability of a number of blocks through
// a hash notification.
func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
// Mark all the block hashes as known, but ensure we don't overflow our limits
for _, hash := range hashes {
p.knownBlocks.Add(hash)
}
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
p.knownBlocks.Pop()
}
request := make(newBlockHashesData,