aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAaron Buchwald <aaron.buchwald56@gmail.com>2020-09-17 14:11:53 -0400
committerAaron Buchwald <aaron.buchwald56@gmail.com>2020-09-17 14:11:53 -0400
commitb9cb07277a70f126e5ce098faff6fb88cab6c29a (patch)
tree9cc4cfb2d3441d2f94422468f59430efd985eaca
parent95ae4c9b65db88021c40f1aa90541001cfad5cda (diff)
Fix panic on shutdown form orphaned goroutine
-rw-r--r--plugin/evm/vm.go98
1 files changed, 61 insertions, 37 deletions
diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go
index 73a97e8..583a0c5 100644
--- a/plugin/evm/vm.go
+++ b/plugin/evm/vm.go
@@ -23,8 +23,8 @@ import (
"github.com/ava-labs/coreth/params"
"github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
@@ -165,9 +165,10 @@ type VM struct {
acceptedDB database.Database
- txPoolStabilizedHead common.Hash
- txPoolStabilizedOk chan struct{}
- txPoolStabilizedLock sync.Mutex
+ txPoolStabilizedHead common.Hash
+ txPoolStabilizedOk chan struct{}
+ txPoolStabilizedLock sync.Mutex
+ txPoolStabilizedShutdownChan chan struct{}
metalock sync.Mutex
blockCache, blockStatusCache cache.LRU
@@ -183,12 +184,15 @@ type VM struct {
genlock sync.Mutex
txSubmitChan <-chan struct{}
atomicTxSubmitChan chan struct{}
+ shutdownSubmitChan chan struct{}
codec codec.Codec
clock timer.Clock
txFee uint64
pendingAtomicTxs chan *Tx
blockAtomicInputCache cache.LRU
+ shutdownWg sync.WaitGroup
+
fx secp256k1fx.Fx
}
@@ -347,29 +351,14 @@ func (vm *VM) Initialize(
vm.bdGenWaitFlag = true
//vm.newTxPoolHeadChan = make(chan core.NewTxPoolHeadEvent, 1)
vm.txPoolStabilizedOk = make(chan struct{}, 1)
+ vm.txPoolStabilizedShutdownChan = make(chan struct{}, 1) // Signal goroutine to shutdown
// TODO: read size from options
vm.pendingAtomicTxs = make(chan *Tx, 1024)
vm.atomicTxSubmitChan = make(chan struct{}, 1)
+ vm.shutdownSubmitChan = make(chan struct{}, 1)
//chain.GetTxPool().SubscribeNewHeadEvent(vm.newTxPoolHeadChan)
vm.newTxPoolHeadChan = vm.chain.SubscribeNewMinedBlockEvent()
- // TODO: shutdown this go routine
- go ctx.Log.RecoverAndPanic(func() {
- for {
- select {
- case e := <-vm.newTxPoolHeadChan.Chan():
- switch h := e.Data.(type) {
- case core.NewMinedBlockEvent:
- vm.txPoolStabilizedLock.Lock()
- if vm.txPoolStabilizedHead == h.Block.Hash() {
- vm.txPoolStabilizedOk <- struct{}{}
- vm.txPoolStabilizedHead = common.Hash{}
- }
- vm.txPoolStabilizedLock.Unlock()
- default:
- }
- }
- }
- })
+ go ctx.Log.RecoverAndPanic(vm.awaitTxPoolStabilized)
chain.Start()
var lastAccepted *types.Block
@@ -396,21 +385,7 @@ func (vm *VM) Initialize(
log.Info(fmt.Sprintf("lastAccepted = %s", vm.lastAccepted.ethBlock.Hash().Hex()))
// TODO: shutdown this go routine
- go vm.ctx.Log.RecoverAndPanic(func() {
- vm.txSubmitChan = vm.chain.GetTxSubmitCh()
- for {
- select {
- case <-vm.txSubmitChan:
- log.Trace("New tx detected, trying to generate a block")
- vm.tryBlockGen()
- case <-vm.atomicTxSubmitChan:
- log.Trace("New atomic Tx detected, trying to generate a block")
- vm.tryBlockGen()
- case <-time.After(5 * time.Second):
- vm.tryBlockGen()
- }
- }
- })
+ go vm.ctx.Log.RecoverAndPanic(vm.awaitSubmittedTxs)
vm.codec = Codec
return vm.fx.Initialize(vm)
@@ -431,7 +406,10 @@ func (vm *VM) Shutdown() error {
}
vm.writeBackMetadata()
+ close(vm.txPoolStabilizedShutdownChan)
+ close(vm.shutdownSubmitChan)
vm.chain.Stop()
+ vm.shutdownWg.Wait()
return nil
}
@@ -745,6 +723,52 @@ func (vm *VM) writeBackMetadata() {
atomic.StoreUint32(&vm.writingMetadata, 0)
}
+// awaitTxPoolStabilized waits for a txPoolHead channel event
+// and notifies the VM when the tx pool has stabilized to the
+// expected block hash
+// Waits for signal to shutdown from txPoolStabilizedShutdownChan chan
+func (vm *VM) awaitTxPoolStabilized() {
+ vm.shutdownWg.Add(1)
+ for {
+ select {
+ case e := <-vm.newTxPoolHeadChan.Chan():
+ switch h := e.Data.(type) {
+ case core.NewMinedBlockEvent:
+ vm.txPoolStabilizedLock.Lock()
+ if vm.txPoolStabilizedHead == h.Block.Hash() {
+ vm.txPoolStabilizedOk <- struct{}{}
+ vm.txPoolStabilizedHead = common.Hash{}
+ }
+ vm.txPoolStabilizedLock.Unlock()
+ default:
+ }
+ case <-vm.txPoolStabilizedShutdownChan:
+ vm.shutdownWg.Done()
+ return
+ }
+ }
+}
+
+func (vm *VM) awaitSubmittedTxs() {
+ vm.shutdownWg.Add(1)
+ vm.txSubmitChan = vm.chain.GetTxSubmitCh()
+ for {
+ select {
+ case <-vm.txSubmitChan:
+ log.Trace("New tx detected, trying to generate a block")
+ vm.tryBlockGen()
+ case <-vm.atomicTxSubmitChan:
+ log.Trace("New atomic Tx detected, trying to generate a block")
+ vm.tryBlockGen()
+ case <-time.After(5 * time.Second):
+ vm.tryBlockGen()
+ case <-vm.shutdownSubmitChan:
+ vm.shutdownWg.Done()
+ return
+ }
+ }
+}
+
func (vm *VM) getLastAccepted() *Block {
vm.metalock.Lock()
defer vm.metalock.Unlock()