aboutsummaryrefslogtreecommitdiff
path: root/plugin/evm/vm.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/evm/vm.go')
-rw-r--r--plugin/evm/vm.go146
1 files changed, 90 insertions, 56 deletions
diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go
index 9ab4781..200a08d 100644
--- a/plugin/evm/vm.go
+++ b/plugin/evm/vm.go
@@ -22,11 +22,13 @@ import (
"github.com/ava-labs/coreth/node"
"github.com/ava-labs/coreth/params"
- "github.com/ava-labs/go-ethereum/common"
- "github.com/ava-labs/go-ethereum/rlp"
- "github.com/ava-labs/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/rpc"
- ethcrypto "github.com/ava-labs/go-ethereum/crypto"
+ ethcrypto "github.com/ethereum/go-ethereum/crypto"
avalancheRPC "github.com/gorilla/rpc/v2"
@@ -152,20 +154,21 @@ func init() {
type VM struct {
ctx *snow.Context
- chainID *big.Int
- networkID uint64
- genesisHash common.Hash
- chain *coreth.ETHChain
- chaindb Database
- newBlockChan chan *Block
- networkChan chan<- commonEng.Message
- newTxPoolHeadChan chan core.NewTxPoolHeadEvent
+ chainID *big.Int
+ networkID uint64
+ genesisHash common.Hash
+ chain *coreth.ETHChain
+ chaindb Database
+ newBlockChan chan *Block
+ networkChan chan<- commonEng.Message
+ newMinedBlockSub *event.TypeMuxSubscription
acceptedDB database.Database
- txPoolStabilizedHead common.Hash
- txPoolStabilizedOk chan struct{}
- txPoolStabilizedLock sync.Mutex
+ txPoolStabilizedHead common.Hash
+ txPoolStabilizedOk chan struct{}
+ txPoolStabilizedLock sync.Mutex
+ txPoolStabilizedShutdownChan chan struct{}
metalock sync.Mutex
blockCache, blockStatusCache cache.LRU
@@ -181,12 +184,15 @@ type VM struct {
genlock sync.Mutex
txSubmitChan <-chan struct{}
atomicTxSubmitChan chan struct{}
+ shutdownSubmitChan chan struct{}
codec codec.Codec
clock timer.Clock
txFee uint64
pendingAtomicTxs chan *Tx
blockAtomicInputCache cache.LRU
+ shutdownWg sync.WaitGroup
+
fx secp256k1fx.Fx
}
@@ -243,6 +249,10 @@ func (vm *VM) Initialize(
config := eth.DefaultConfig
config.ManualCanonical = true
config.Genesis = g
+ // disable the experimental snapshot feature from geth
+ config.TrieCleanCache += config.SnapshotCache
+ config.SnapshotCache = 0
+
config.Miner.ManualMining = true
config.Miner.DisableUncle = true
@@ -287,7 +297,7 @@ func (vm *VM) Initialize(
return nil, nil
})
chain.SetOnSealFinish(func(block *types.Block) error {
- vm.ctx.Log.Verbo("EVM sealed a block")
+ log.Trace("EVM sealed a block")
blk := &Block{
id: ids.NewID(block.Hash()),
@@ -295,6 +305,7 @@ func (vm *VM) Initialize(
vm: vm,
}
if blk.Verify() != nil {
+ vm.newBlockChan <- nil
return errInvalidBlock
}
vm.newBlockChan <- blk
@@ -338,26 +349,15 @@ func (vm *VM) Initialize(
vm.bdTimerState = bdTimerStateLong
vm.bdGenWaitFlag = true
- vm.newTxPoolHeadChan = make(chan core.NewTxPoolHeadEvent, 1)
vm.txPoolStabilizedOk = make(chan struct{}, 1)
+ vm.txPoolStabilizedShutdownChan = make(chan struct{}, 1) // Signal goroutine to shutdown
// TODO: read size from options
vm.pendingAtomicTxs = make(chan *Tx, 1024)
vm.atomicTxSubmitChan = make(chan struct{}, 1)
- chain.GetTxPool().SubscribeNewHeadEvent(vm.newTxPoolHeadChan)
- // TODO: shutdown this go routine
- go ctx.Log.RecoverAndPanic(func() {
- for {
- select {
- case h := <-vm.newTxPoolHeadChan:
- vm.txPoolStabilizedLock.Lock()
- if vm.txPoolStabilizedHead == h.Block.Hash() {
- vm.txPoolStabilizedOk <- struct{}{}
- vm.txPoolStabilizedHead = common.Hash{}
- }
- vm.txPoolStabilizedLock.Unlock()
- }
- }
- })
+ vm.shutdownSubmitChan = make(chan struct{}, 1)
+ vm.newMinedBlockSub = vm.chain.SubscribeNewMinedBlockEvent()
+ vm.shutdownWg.Add(1)
+ go ctx.Log.RecoverAndPanic(vm.awaitTxPoolStabilized)
chain.Start()
var lastAccepted *types.Block
@@ -365,14 +365,14 @@ func (vm *VM) Initialize(
var hash common.Hash
if err = rlp.DecodeBytes(b, &hash); err == nil {
if block := chain.GetBlockByHash(hash); block == nil {
- vm.ctx.Log.Debug("lastAccepted block not found in chaindb")
+ log.Debug("lastAccepted block not found in chaindb")
} else {
lastAccepted = block
}
}
}
if lastAccepted == nil {
- vm.ctx.Log.Debug("lastAccepted is unavailable, setting to the genesis block")
+ log.Debug("lastAccepted is unavailable, setting to the genesis block")
lastAccepted = chain.GetGenesisBlock()
}
vm.lastAccepted = &Block{
@@ -381,24 +381,11 @@ func (vm *VM) Initialize(
vm: vm,
}
vm.genesisHash = chain.GetGenesisBlock().Hash()
- vm.ctx.Log.Info(fmt.Sprintf("lastAccepted = %s", vm.lastAccepted.ethBlock.Hash().Hex()))
+ log.Info(fmt.Sprintf("lastAccepted = %s", vm.lastAccepted.ethBlock.Hash().Hex()))
// TODO: shutdown this go routine
- go vm.ctx.Log.RecoverAndPanic(func() {
- vm.txSubmitChan = vm.chain.GetTxSubmitCh()
- for {
- select {
- case <-vm.txSubmitChan:
- vm.ctx.Log.Verbo("New tx detected, trying to generate a block")
- vm.tryBlockGen()
- case <-vm.atomicTxSubmitChan:
- vm.ctx.Log.Verbo("New atomic Tx detected, trying to generate a block")
- vm.tryBlockGen()
- case <-time.After(5 * time.Second):
- vm.tryBlockGen()
- }
- }
- })
+ vm.shutdownWg.Add(1)
+ go vm.ctx.Log.RecoverAndPanic(vm.awaitSubmittedTxs)
vm.codec = Codec
return vm.fx.Initialize(vm)
@@ -419,7 +406,10 @@ func (vm *VM) Shutdown() error {
}
vm.writeBackMetadata()
+ close(vm.txPoolStabilizedShutdownChan)
+ close(vm.shutdownSubmitChan)
vm.chain.Stop()
+ vm.shutdownWg.Wait()
return nil
}
@@ -438,7 +428,7 @@ func (vm *VM) BuildBlock() (snowman.Block, error) {
vm.blockDelayTimer.SetTimeoutIn(minBlockTime)
vm.bdlock.Unlock()
- vm.ctx.Log.Debug("built block 0x%x", block.ID().Bytes())
+ log.Debug(fmt.Sprintf("built block 0x%x", block.ID().Bytes()))
// make sure Tx Pool is updated
<-vm.txPoolStabilizedOk
return block, nil
@@ -622,7 +612,7 @@ func (vm *VM) getCachedStatus(blockID ids.ID) choices.Status {
acceptedIDBytes, err := vm.acceptedDB.Get(heightKey)
if err == nil {
if acceptedID, err := ids.ToID(acceptedIDBytes); err != nil {
- vm.ctx.Log.Error("snowman-eth: acceptedID bytes didn't match expected value: %s", err)
+ log.Error(fmt.Sprintf("snowman-eth: acceptedID bytes didn't match expected value: %s", err))
} else {
if acceptedID.Equals(blockID) {
vm.blockStatusCache.Put(blockID, choices.Accepted)
@@ -637,7 +627,7 @@ func (vm *VM) getCachedStatus(blockID ids.ID) choices.Status {
if status == choices.Accepted {
err := vm.acceptedDB.Put(heightKey, blockID.Bytes())
if err != nil {
- vm.ctx.Log.Error("snowman-eth: failed to write back acceptedID bytes: %s", err)
+ log.Error(fmt.Sprintf("snowman-eth: failed to write back acceptedID bytes: %s", err))
}
tempBlock := wrappedBlk
@@ -655,7 +645,7 @@ func (vm *VM) getCachedStatus(blockID ids.ID) choices.Status {
}
if err := vm.acceptedDB.Put(heightKey, parentID.Bytes()); err != nil {
- vm.ctx.Log.Error("snowman-eth: failed to write back acceptedID bytes: %s", err)
+ log.Error(fmt.Sprintf("snowman-eth: failed to write back acceptedID bytes: %s", err))
}
}
}
@@ -725,14 +715,58 @@ func (vm *VM) writeBackMetadata() {
b, err := rlp.EncodeToBytes(vm.lastAccepted.ethBlock.Hash())
if err != nil {
- vm.ctx.Log.Error("snowman-eth: error while writing back metadata")
+ log.Error("snowman-eth: error while writing back metadata")
return
}
- vm.ctx.Log.Debug("writing back metadata")
+ log.Debug("writing back metadata")
vm.chaindb.Put([]byte(lastAcceptedKey), b)
atomic.StoreUint32(&vm.writingMetadata, 0)
}
+// awaitTxPoolStabilized waits for a txPoolHead channel event
+// and notifies the VM when the tx pool has stabilized to the
+// expected block hash
+// Waits for signal to shutdown from txPoolStabilizedShutdownChan chan
+func (vm *VM) awaitTxPoolStabilized() {
+ defer vm.shutdownWg.Done()
+ for {
+ select {
+ case e := <-vm.newMinedBlockSub.Chan():
+ switch h := e.Data.(type) {
+ case core.NewMinedBlockEvent:
+ vm.txPoolStabilizedLock.Lock()
+ if vm.txPoolStabilizedHead == h.Block.Hash() {
+ vm.txPoolStabilizedOk <- struct{}{}
+ vm.txPoolStabilizedHead = common.Hash{}
+ }
+ vm.txPoolStabilizedLock.Unlock()
+ default:
+ }
+ case <-vm.txPoolStabilizedShutdownChan:
+ return
+ }
+ }
+}
+
+func (vm *VM) awaitSubmittedTxs() {
+ defer vm.shutdownWg.Done()
+ vm.txSubmitChan = vm.chain.GetTxSubmitCh()
+ for {
+ select {
+ case <-vm.txSubmitChan:
+ log.Trace("New tx detected, trying to generate a block")
+ vm.tryBlockGen()
+ case <-vm.atomicTxSubmitChan:
+ log.Trace("New atomic Tx detected, trying to generate a block")
+ vm.tryBlockGen()
+ case <-time.After(5 * time.Second):
+ vm.tryBlockGen()
+ case <-vm.shutdownSubmitChan:
+ return
+ }
+ }
+}
+
func (vm *VM) getLastAccepted() *Block {
vm.metalock.Lock()
defer vm.metalock.Unlock()