diff options
author | Determinant <[email protected]> | 2020-07-30 14:18:44 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2020-07-30 14:18:44 -0400 |
commit | 0444e66f640999c15496066637841efcc0433934 (patch) | |
tree | c19aec2dced2e9129c880c19c52ca0f87b3d62f6 /eth/sync.go | |
parent | cffa0954bbdb43821d1b71d00f99fb705cecd25b (diff) | |
parent | 1f49826de2bb8bb4f5f99f69fd2beb039b1172d9 (diff) |
Merge branch 'multi-coin'
Diffstat (limited to 'eth/sync.go')
-rw-r--r-- | eth/sync.go | 216 |
1 files changed, 0 insertions, 216 deletions
diff --git a/eth/sync.go b/eth/sync.go deleted file mode 100644 index 6f067a3..0000000 --- a/eth/sync.go +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package eth - -import ( - "math/rand" - "sync/atomic" - "time" - - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/core/types" - "github.com/ava-labs/go-ethereum/eth/downloader" - "github.com/ava-labs/go-ethereum/log" - "github.com/ava-labs/go-ethereum/p2p/enode" -) - -const ( - forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available - minDesiredPeerCount = 5 // Amount of peers desired to start syncing - - // This is the target size for the packs of transactions sent by txsyncLoop. - // A pack can get larger than this if a single transactions exceeds this size. - txsyncPackSize = 100 * 1024 -) - -type txsync struct { - p *peer - txs []*types.Transaction -} - -// syncTransactions starts sending all currently pending transactions to the given peer. -func (pm *ProtocolManager) syncTransactions(p *peer) { - var txs types.Transactions - pending, _ := pm.txpool.Pending() - for _, batch := range pending { - txs = append(txs, batch...) - } - if len(txs) == 0 { - return - } - select { - case pm.txsyncCh <- &txsync{p, txs}: - case <-pm.quitSync: - } -} - -// txsyncLoop takes care of the initial transaction sync for each new -// connection. When a new peer appears, we relay all currently pending -// transactions. In order to minimise egress bandwidth usage, we send -// the transactions in small packs to one peer at a time. -func (pm *ProtocolManager) txsyncLoop() { - var ( - pending = make(map[enode.ID]*txsync) - sending = false // whether a send is active - pack = new(txsync) // the pack that is being sent - done = make(chan error, 1) // result of the send - ) - - // send starts a sending a pack of transactions from the sync. - send := func(s *txsync) { - // Fill pack with transactions up to the target size. - size := common.StorageSize(0) - pack.p = s.p - pack.txs = pack.txs[:0] - for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ { - pack.txs = append(pack.txs, s.txs[i]) - size += s.txs[i].Size() - } - // Remove the transactions that will be sent. - s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])] - if len(s.txs) == 0 { - delete(pending, s.p.ID()) - } - // Send the pack in the background. - s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size) - sending = true - go func() { done <- pack.p.SendTransactions(pack.txs) }() - } - - // pick chooses the next pending sync. - pick := func() *txsync { - if len(pending) == 0 { - return nil - } - n := rand.Intn(len(pending)) + 1 - for _, s := range pending { - if n--; n == 0 { - return s - } - } - return nil - } - - for { - select { - case s := <-pm.txsyncCh: - pending[s.p.ID()] = s - if !sending { - send(s) - } - case err := <-done: - sending = false - // Stop tracking peers that cause send failures. - if err != nil { - pack.p.Log().Debug("Transaction send failed", "err", err) - delete(pending, pack.p.ID()) - } - // Schedule the next send. - if s := pick(); s != nil { - send(s) - } - case <-pm.quitSync: - return - } - } -} - -// syncer is responsible for periodically synchronising with the network, both -// downloading hashes and blocks as well as handling the announcement handler. -func (pm *ProtocolManager) syncer() { - // Start and ensure cleanup of sync mechanisms - pm.fetcher.Start() - defer pm.fetcher.Stop() - defer pm.downloader.Terminate() - - // Wait for different events to fire synchronisation operations - forceSync := time.NewTicker(forceSyncCycle) - defer forceSync.Stop() - - for { - select { - case <-pm.newPeerCh: - // Make sure we have peers to select from, then sync - if pm.peers.Len() < minDesiredPeerCount { - break - } - go pm.synchronise(pm.peers.BestPeer()) - - case <-forceSync.C: - // Force a sync even if not enough peers are present - go pm.synchronise(pm.peers.BestPeer()) - - case <-pm.noMorePeers: - return - } - } -} - -// synchronise tries to sync up our local block chain with a remote peer. -func (pm *ProtocolManager) synchronise(peer *peer) { - // Short circuit if no peers are available - if peer == nil { - return - } - // Make sure the peer's TD is higher than our own - currentBlock := pm.blockchain.CurrentBlock() - td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) - - pHead, pTd := peer.Head() - if pTd.Cmp(td) <= 0 { - return - } - // Otherwise try to sync with the downloader - mode := downloader.FullSync - if atomic.LoadUint32(&pm.fastSync) == 1 { - // Fast sync was explicitly requested, and explicitly granted - mode = downloader.FastSync - } - if mode == downloader.FastSync { - // Make sure the peer's total difficulty we are synchronizing is higher. - if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 { - return - } - } - // Run the sync cycle, and disable fast sync if we've went past the pivot block - if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { - return - } - if atomic.LoadUint32(&pm.fastSync) == 1 { - log.Info("Fast sync complete, auto disabling") - atomic.StoreUint32(&pm.fastSync, 0) - } - // If we've successfully finished a sync cycle and passed any required checkpoint, - // enable accepting transactions from the network. - head := pm.blockchain.CurrentBlock() - if head.NumberU64() >= pm.checkpointNumber { - // Checkpoint passed, sanity check the timestamp to have a fallback mechanism - // for non-checkpointed (number = 0) private networks. - if head.Time() >= uint64(time.Now().AddDate(0, -1, 0).Unix()) { - atomic.StoreUint32(&pm.acceptTxs, 1) - } - } - if head.NumberU64() > 0 { - // We've completed a sync cycle, notify all peers of new state. This path is - // essential in star-topology networks where a gateway node needs to notify - // all its out-of-date peers of the availability of a new block. This failure - // scenario will most often crop up in private and hackathon networks with - // degenerate connectivity, but it should be healthy for the mainnet too to - // more reliably update peers or the local TD state. - go pm.BroadcastBlock(head, false) - } -} |