diff options
Diffstat (limited to 'plugin/evm/vm.go')
-rw-r--r-- | plugin/evm/vm.go | 146 |
1 files changed, 90 insertions, 56 deletions
diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 9ab4781..200a08d 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -22,11 +22,13 @@ import ( "github.com/ava-labs/coreth/node" "github.com/ava-labs/coreth/params" - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/rlp" - "github.com/ava-labs/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" - ethcrypto "github.com/ava-labs/go-ethereum/crypto" + ethcrypto "github.com/ethereum/go-ethereum/crypto" avalancheRPC "github.com/gorilla/rpc/v2" @@ -152,20 +154,21 @@ func init() { type VM struct { ctx *snow.Context - chainID *big.Int - networkID uint64 - genesisHash common.Hash - chain *coreth.ETHChain - chaindb Database - newBlockChan chan *Block - networkChan chan<- commonEng.Message - newTxPoolHeadChan chan core.NewTxPoolHeadEvent + chainID *big.Int + networkID uint64 + genesisHash common.Hash + chain *coreth.ETHChain + chaindb Database + newBlockChan chan *Block + networkChan chan<- commonEng.Message + newMinedBlockSub *event.TypeMuxSubscription acceptedDB database.Database - txPoolStabilizedHead common.Hash - txPoolStabilizedOk chan struct{} - txPoolStabilizedLock sync.Mutex + txPoolStabilizedHead common.Hash + txPoolStabilizedOk chan struct{} + txPoolStabilizedLock sync.Mutex + txPoolStabilizedShutdownChan chan struct{} metalock sync.Mutex blockCache, blockStatusCache cache.LRU @@ -181,12 +184,15 @@ type VM struct { genlock sync.Mutex txSubmitChan <-chan struct{} atomicTxSubmitChan chan struct{} + shutdownSubmitChan chan struct{} codec codec.Codec clock timer.Clock txFee uint64 pendingAtomicTxs chan *Tx blockAtomicInputCache cache.LRU + shutdownWg sync.WaitGroup + fx secp256k1fx.Fx } @@ -243,6 +249,10 @@ func (vm *VM) Initialize( config := eth.DefaultConfig config.ManualCanonical = true config.Genesis = g + // disable the experimental snapshot feature from geth + config.TrieCleanCache += config.SnapshotCache + config.SnapshotCache = 0 + config.Miner.ManualMining = true config.Miner.DisableUncle = true @@ -287,7 +297,7 @@ func (vm *VM) Initialize( return nil, nil }) chain.SetOnSealFinish(func(block *types.Block) error { - vm.ctx.Log.Verbo("EVM sealed a block") + log.Trace("EVM sealed a block") blk := &Block{ id: ids.NewID(block.Hash()), @@ -295,6 +305,7 @@ func (vm *VM) Initialize( vm: vm, } if blk.Verify() != nil { + vm.newBlockChan <- nil return errInvalidBlock } vm.newBlockChan <- blk @@ -338,26 +349,15 @@ func (vm *VM) Initialize( vm.bdTimerState = bdTimerStateLong vm.bdGenWaitFlag = true - vm.newTxPoolHeadChan = make(chan core.NewTxPoolHeadEvent, 1) vm.txPoolStabilizedOk = make(chan struct{}, 1) + vm.txPoolStabilizedShutdownChan = make(chan struct{}, 1) // Signal goroutine to shutdown // TODO: read size from options vm.pendingAtomicTxs = make(chan *Tx, 1024) vm.atomicTxSubmitChan = make(chan struct{}, 1) - chain.GetTxPool().SubscribeNewHeadEvent(vm.newTxPoolHeadChan) - // TODO: shutdown this go routine - go ctx.Log.RecoverAndPanic(func() { - for { - select { - case h := <-vm.newTxPoolHeadChan: - vm.txPoolStabilizedLock.Lock() - if vm.txPoolStabilizedHead == h.Block.Hash() { - vm.txPoolStabilizedOk <- struct{}{} - vm.txPoolStabilizedHead = common.Hash{} - } - vm.txPoolStabilizedLock.Unlock() - } - } - }) + vm.shutdownSubmitChan = make(chan struct{}, 1) + vm.newMinedBlockSub = vm.chain.SubscribeNewMinedBlockEvent() + vm.shutdownWg.Add(1) + go ctx.Log.RecoverAndPanic(vm.awaitTxPoolStabilized) chain.Start() var lastAccepted *types.Block @@ -365,14 +365,14 @@ func (vm *VM) Initialize( var hash common.Hash if err = rlp.DecodeBytes(b, &hash); err == nil { if block := chain.GetBlockByHash(hash); block == nil { - vm.ctx.Log.Debug("lastAccepted block not found in chaindb") + log.Debug("lastAccepted block not found in chaindb") } else { lastAccepted = block } } } if lastAccepted == nil { - vm.ctx.Log.Debug("lastAccepted is unavailable, setting to the genesis block") + log.Debug("lastAccepted is unavailable, setting to the genesis block") lastAccepted = chain.GetGenesisBlock() } vm.lastAccepted = &Block{ @@ -381,24 +381,11 @@ func (vm *VM) Initialize( vm: vm, } vm.genesisHash = chain.GetGenesisBlock().Hash() - vm.ctx.Log.Info(fmt.Sprintf("lastAccepted = %s", vm.lastAccepted.ethBlock.Hash().Hex())) + log.Info(fmt.Sprintf("lastAccepted = %s", vm.lastAccepted.ethBlock.Hash().Hex())) // TODO: shutdown this go routine - go vm.ctx.Log.RecoverAndPanic(func() { - vm.txSubmitChan = vm.chain.GetTxSubmitCh() - for { - select { - case <-vm.txSubmitChan: - vm.ctx.Log.Verbo("New tx detected, trying to generate a block") - vm.tryBlockGen() - case <-vm.atomicTxSubmitChan: - vm.ctx.Log.Verbo("New atomic Tx detected, trying to generate a block") - vm.tryBlockGen() - case <-time.After(5 * time.Second): - vm.tryBlockGen() - } - } - }) + vm.shutdownWg.Add(1) + go vm.ctx.Log.RecoverAndPanic(vm.awaitSubmittedTxs) vm.codec = Codec return vm.fx.Initialize(vm) @@ -419,7 +406,10 @@ func (vm *VM) Shutdown() error { } vm.writeBackMetadata() + close(vm.txPoolStabilizedShutdownChan) + close(vm.shutdownSubmitChan) vm.chain.Stop() + vm.shutdownWg.Wait() return nil } @@ -438,7 +428,7 @@ func (vm *VM) BuildBlock() (snowman.Block, error) { vm.blockDelayTimer.SetTimeoutIn(minBlockTime) vm.bdlock.Unlock() - vm.ctx.Log.Debug("built block 0x%x", block.ID().Bytes()) + log.Debug(fmt.Sprintf("built block 0x%x", block.ID().Bytes())) // make sure Tx Pool is updated <-vm.txPoolStabilizedOk return block, nil @@ -622,7 +612,7 @@ func (vm *VM) getCachedStatus(blockID ids.ID) choices.Status { acceptedIDBytes, err := vm.acceptedDB.Get(heightKey) if err == nil { if acceptedID, err := ids.ToID(acceptedIDBytes); err != nil { - vm.ctx.Log.Error("snowman-eth: acceptedID bytes didn't match expected value: %s", err) + log.Error(fmt.Sprintf("snowman-eth: acceptedID bytes didn't match expected value: %s", err)) } else { if acceptedID.Equals(blockID) { vm.blockStatusCache.Put(blockID, choices.Accepted) @@ -637,7 +627,7 @@ func (vm *VM) getCachedStatus(blockID ids.ID) choices.Status { if status == choices.Accepted { err := vm.acceptedDB.Put(heightKey, blockID.Bytes()) if err != nil { - vm.ctx.Log.Error("snowman-eth: failed to write back acceptedID bytes: %s", err) + log.Error(fmt.Sprintf("snowman-eth: failed to write back acceptedID bytes: %s", err)) } tempBlock := wrappedBlk @@ -655,7 +645,7 @@ func (vm *VM) getCachedStatus(blockID ids.ID) choices.Status { } if err := vm.acceptedDB.Put(heightKey, parentID.Bytes()); err != nil { - vm.ctx.Log.Error("snowman-eth: failed to write back acceptedID bytes: %s", err) + log.Error(fmt.Sprintf("snowman-eth: failed to write back acceptedID bytes: %s", err)) } } } @@ -725,14 +715,58 @@ func (vm *VM) writeBackMetadata() { b, err := rlp.EncodeToBytes(vm.lastAccepted.ethBlock.Hash()) if err != nil { - vm.ctx.Log.Error("snowman-eth: error while writing back metadata") + log.Error("snowman-eth: error while writing back metadata") return } - vm.ctx.Log.Debug("writing back metadata") + log.Debug("writing back metadata") vm.chaindb.Put([]byte(lastAcceptedKey), b) atomic.StoreUint32(&vm.writingMetadata, 0) } +// awaitTxPoolStabilized waits for a txPoolHead channel event +// and notifies the VM when the tx pool has stabilized to the +// expected block hash +// Waits for signal to shutdown from txPoolStabilizedShutdownChan chan +func (vm *VM) awaitTxPoolStabilized() { + defer vm.shutdownWg.Done() + for { + select { + case e := <-vm.newMinedBlockSub.Chan(): + switch h := e.Data.(type) { + case core.NewMinedBlockEvent: + vm.txPoolStabilizedLock.Lock() + if vm.txPoolStabilizedHead == h.Block.Hash() { + vm.txPoolStabilizedOk <- struct{}{} + vm.txPoolStabilizedHead = common.Hash{} + } + vm.txPoolStabilizedLock.Unlock() + default: + } + case <-vm.txPoolStabilizedShutdownChan: + return + } + } +} + +func (vm *VM) awaitSubmittedTxs() { + defer vm.shutdownWg.Done() + vm.txSubmitChan = vm.chain.GetTxSubmitCh() + for { + select { + case <-vm.txSubmitChan: + log.Trace("New tx detected, trying to generate a block") + vm.tryBlockGen() + case <-vm.atomicTxSubmitChan: + log.Trace("New atomic Tx detected, trying to generate a block") + vm.tryBlockGen() + case <-time.After(5 * time.Second): + vm.tryBlockGen() + case <-vm.shutdownSubmitChan: + return + } + } +} + func (vm *VM) getLastAccepted() *Block { vm.metalock.Lock() defer vm.metalock.Unlock() |