diff options
-rw-r--r-- | plugin/evm/vm.go | 98 |
1 files changed, 61 insertions, 37 deletions
diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 73a97e8..583a0c5 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -23,8 +23,8 @@ import ( "github.com/ava-labs/coreth/params" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" @@ -165,9 +165,10 @@ type VM struct { acceptedDB database.Database - txPoolStabilizedHead common.Hash - txPoolStabilizedOk chan struct{} - txPoolStabilizedLock sync.Mutex + txPoolStabilizedHead common.Hash + txPoolStabilizedOk chan struct{} + txPoolStabilizedLock sync.Mutex + txPoolStabilizedShutdownChan chan struct{} metalock sync.Mutex blockCache, blockStatusCache cache.LRU @@ -183,12 +184,15 @@ type VM struct { genlock sync.Mutex txSubmitChan <-chan struct{} atomicTxSubmitChan chan struct{} + shutdownSubmitChan chan struct{} codec codec.Codec clock timer.Clock txFee uint64 pendingAtomicTxs chan *Tx blockAtomicInputCache cache.LRU + shutdownWg sync.WaitGroup + fx secp256k1fx.Fx } @@ -347,29 +351,14 @@ func (vm *VM) Initialize( vm.bdGenWaitFlag = true //vm.newTxPoolHeadChan = make(chan core.NewTxPoolHeadEvent, 1) vm.txPoolStabilizedOk = make(chan struct{}, 1) + vm.txPoolStabilizedShutdownChan = make(chan struct{}, 1) // Signal goroutine to shutdown // TODO: read size from options vm.pendingAtomicTxs = make(chan *Tx, 1024) vm.atomicTxSubmitChan = make(chan struct{}, 1) + vm.shutdownSubmitChan = make(chan struct{}, 1) //chain.GetTxPool().SubscribeNewHeadEvent(vm.newTxPoolHeadChan) vm.newTxPoolHeadChan = vm.chain.SubscribeNewMinedBlockEvent() - // TODO: shutdown this go routine - go ctx.Log.RecoverAndPanic(func() { - for { - select { - case e := <-vm.newTxPoolHeadChan.Chan(): - switch h := e.Data.(type) { - case core.NewMinedBlockEvent: - vm.txPoolStabilizedLock.Lock() - if vm.txPoolStabilizedHead == h.Block.Hash() { - vm.txPoolStabilizedOk <- struct{}{} - vm.txPoolStabilizedHead = common.Hash{} - } - vm.txPoolStabilizedLock.Unlock() - default: - } - } - } - }) + go ctx.Log.RecoverAndPanic(vm.awaitTxPoolStabilized) chain.Start() var lastAccepted *types.Block @@ -396,21 +385,7 @@ func (vm *VM) Initialize( log.Info(fmt.Sprintf("lastAccepted = %s", vm.lastAccepted.ethBlock.Hash().Hex())) // TODO: shutdown this go routine - go vm.ctx.Log.RecoverAndPanic(func() { - vm.txSubmitChan = vm.chain.GetTxSubmitCh() - for { - select { - case <-vm.txSubmitChan: - log.Trace("New tx detected, trying to generate a block") - vm.tryBlockGen() - case <-vm.atomicTxSubmitChan: - log.Trace("New atomic Tx detected, trying to generate a block") - vm.tryBlockGen() - case <-time.After(5 * time.Second): - vm.tryBlockGen() - } - } - }) + go vm.ctx.Log.RecoverAndPanic(vm.awaitSubmittedTxs) vm.codec = Codec return vm.fx.Initialize(vm) @@ -431,7 +406,10 @@ func (vm *VM) Shutdown() error { } vm.writeBackMetadata() + close(vm.txPoolStabilizedShutdownChan) + close(vm.shutdownSubmitChan) vm.chain.Stop() + vm.shutdownWg.Wait() return nil } @@ -745,6 +723,52 @@ func (vm *VM) writeBackMetadata() { atomic.StoreUint32(&vm.writingMetadata, 0) } +// awaitTxPoolStabilized waits for a txPoolHead channel event +// and notifies the VM when the tx pool has stabilized to the +// expected block hash +// Waits for signal to shutdown from txPoolStabilizedShutdownChan chan +func (vm *VM) awaitTxPoolStabilized() { + vm.shutdownWg.Add(1) + for { + select { + case e := <-vm.newTxPoolHeadChan.Chan(): + switch h := e.Data.(type) { + case core.NewMinedBlockEvent: + vm.txPoolStabilizedLock.Lock() + if vm.txPoolStabilizedHead == h.Block.Hash() { + vm.txPoolStabilizedOk <- struct{}{} + vm.txPoolStabilizedHead = common.Hash{} + } + vm.txPoolStabilizedLock.Unlock() + default: + } + case <-vm.txPoolStabilizedShutdownChan: + vm.shutdownWg.Done() + return + } + } +} + +func (vm *VM) awaitSubmittedTxs() { + vm.shutdownWg.Add(1) + vm.txSubmitChan = vm.chain.GetTxSubmitCh() + for { + select { + case <-vm.txSubmitChan: + log.Trace("New tx detected, trying to generate a block") + vm.tryBlockGen() + case <-vm.atomicTxSubmitChan: + log.Trace("New atomic Tx detected, trying to generate a block") + vm.tryBlockGen() + case <-time.After(5 * time.Second): + vm.tryBlockGen() + case <-vm.shutdownSubmitChan: + vm.shutdownWg.Done() + return + } + } +} + func (vm *VM) getLastAccepted() *Block { vm.metalock.Lock() defer vm.metalock.Unlock() |