From b9cb07277a70f126e5ce098faff6fb88cab6c29a Mon Sep 17 00:00:00 2001 From: Aaron Buchwald Date: Thu, 17 Sep 2020 14:11:53 -0400 Subject: Fix panic on shutdown form orphaned goroutine --- plugin/evm/vm.go | 98 +++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 61 insertions(+), 37 deletions(-) (limited to 'plugin') diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 73a97e8..583a0c5 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -23,8 +23,8 @@ import ( "github.com/ava-labs/coreth/params" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" @@ -165,9 +165,10 @@ type VM struct { acceptedDB database.Database - txPoolStabilizedHead common.Hash - txPoolStabilizedOk chan struct{} - txPoolStabilizedLock sync.Mutex + txPoolStabilizedHead common.Hash + txPoolStabilizedOk chan struct{} + txPoolStabilizedLock sync.Mutex + txPoolStabilizedShutdownChan chan struct{} metalock sync.Mutex blockCache, blockStatusCache cache.LRU @@ -183,12 +184,15 @@ type VM struct { genlock sync.Mutex txSubmitChan <-chan struct{} atomicTxSubmitChan chan struct{} + shutdownSubmitChan chan struct{} codec codec.Codec clock timer.Clock txFee uint64 pendingAtomicTxs chan *Tx blockAtomicInputCache cache.LRU + shutdownWg sync.WaitGroup + fx secp256k1fx.Fx } @@ -347,29 +351,14 @@ func (vm *VM) Initialize( vm.bdGenWaitFlag = true //vm.newTxPoolHeadChan = make(chan core.NewTxPoolHeadEvent, 1) vm.txPoolStabilizedOk = make(chan struct{}, 1) + vm.txPoolStabilizedShutdownChan = make(chan struct{}, 1) // Signal goroutine to shutdown // TODO: read size from options vm.pendingAtomicTxs = make(chan *Tx, 1024) vm.atomicTxSubmitChan = make(chan struct{}, 1) + vm.shutdownSubmitChan = make(chan struct{}, 1) //chain.GetTxPool().SubscribeNewHeadEvent(vm.newTxPoolHeadChan) vm.newTxPoolHeadChan = vm.chain.SubscribeNewMinedBlockEvent() - // TODO: shutdown this go routine - go ctx.Log.RecoverAndPanic(func() { - for { - select { - case e := <-vm.newTxPoolHeadChan.Chan(): - switch h := e.Data.(type) { - case core.NewMinedBlockEvent: - vm.txPoolStabilizedLock.Lock() - if vm.txPoolStabilizedHead == h.Block.Hash() { - vm.txPoolStabilizedOk <- struct{}{} - vm.txPoolStabilizedHead = common.Hash{} - } - vm.txPoolStabilizedLock.Unlock() - default: - } - } - } - }) + go ctx.Log.RecoverAndPanic(vm.awaitTxPoolStabilized) chain.Start() var lastAccepted *types.Block @@ -396,21 +385,7 @@ func (vm *VM) Initialize( log.Info(fmt.Sprintf("lastAccepted = %s", vm.lastAccepted.ethBlock.Hash().Hex())) // TODO: shutdown this go routine - go vm.ctx.Log.RecoverAndPanic(func() { - vm.txSubmitChan = vm.chain.GetTxSubmitCh() - for { - select { - case <-vm.txSubmitChan: - log.Trace("New tx detected, trying to generate a block") - vm.tryBlockGen() - case <-vm.atomicTxSubmitChan: - log.Trace("New atomic Tx detected, trying to generate a block") - vm.tryBlockGen() - case <-time.After(5 * time.Second): - vm.tryBlockGen() - } - } - }) + go vm.ctx.Log.RecoverAndPanic(vm.awaitSubmittedTxs) vm.codec = Codec return vm.fx.Initialize(vm) @@ -431,7 +406,10 @@ func (vm *VM) Shutdown() error { } vm.writeBackMetadata() + close(vm.txPoolStabilizedShutdownChan) + close(vm.shutdownSubmitChan) vm.chain.Stop() + vm.shutdownWg.Wait() return nil } @@ -745,6 +723,52 @@ func (vm *VM) writeBackMetadata() { atomic.StoreUint32(&vm.writingMetadata, 0) } +// awaitTxPoolStabilized waits for a txPoolHead channel event +// and notifies the VM when the tx pool has stabilized to the +// expected block hash +// Waits for signal to shutdown from txPoolStabilizedShutdownChan chan +func (vm *VM) awaitTxPoolStabilized() { + vm.shutdownWg.Add(1) + for { + select { + case e := <-vm.newTxPoolHeadChan.Chan(): + switch h := e.Data.(type) { + case core.NewMinedBlockEvent: + vm.txPoolStabilizedLock.Lock() + if vm.txPoolStabilizedHead == h.Block.Hash() { + vm.txPoolStabilizedOk <- struct{}{} + vm.txPoolStabilizedHead = common.Hash{} + } + vm.txPoolStabilizedLock.Unlock() + default: + } + case <-vm.txPoolStabilizedShutdownChan: + vm.shutdownWg.Done() + return + } + } +} + +func (vm *VM) awaitSubmittedTxs() { + vm.shutdownWg.Add(1) + vm.txSubmitChan = vm.chain.GetTxSubmitCh() + for { + select { + case <-vm.txSubmitChan: + log.Trace("New tx detected, trying to generate a block") + vm.tryBlockGen() + case <-vm.atomicTxSubmitChan: + log.Trace("New atomic Tx detected, trying to generate a block") + vm.tryBlockGen() + case <-time.After(5 * time.Second): + vm.tryBlockGen() + case <-vm.shutdownSubmitChan: + vm.shutdownWg.Done() + return + } + } +} + func (vm *VM) getLastAccepted() *Block { vm.metalock.Lock() defer vm.metalock.Unlock() -- cgit v1.2.3-70-g09d2 From 1a17fcec0e23aba017e3641c708e40e79be8e153 Mon Sep 17 00:00:00 2001 From: Aaron Buchwald Date: Thu, 17 Sep 2020 14:32:42 -0400 Subject: move wg add outside of goroutines to prevent race condition --- plugin/evm/vm.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'plugin') diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 583a0c5..92e51e6 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -358,6 +358,7 @@ func (vm *VM) Initialize( vm.shutdownSubmitChan = make(chan struct{}, 1) //chain.GetTxPool().SubscribeNewHeadEvent(vm.newTxPoolHeadChan) vm.newTxPoolHeadChan = vm.chain.SubscribeNewMinedBlockEvent() + vm.shutdownWg.Add(1) go ctx.Log.RecoverAndPanic(vm.awaitTxPoolStabilized) chain.Start() @@ -385,6 +386,7 @@ func (vm *VM) Initialize( log.Info(fmt.Sprintf("lastAccepted = %s", vm.lastAccepted.ethBlock.Hash().Hex())) // TODO: shutdown this go routine + vm.shutdownWg.Add(1) go vm.ctx.Log.RecoverAndPanic(vm.awaitSubmittedTxs) vm.codec = Codec @@ -728,7 +730,6 @@ func (vm *VM) writeBackMetadata() { // expected block hash // Waits for signal to shutdown from txPoolStabilizedShutdownChan chan func (vm *VM) awaitTxPoolStabilized() { - vm.shutdownWg.Add(1) for { select { case e := <-vm.newTxPoolHeadChan.Chan(): @@ -750,7 +751,6 @@ func (vm *VM) awaitTxPoolStabilized() { } func (vm *VM) awaitSubmittedTxs() { - vm.shutdownWg.Add(1) vm.txSubmitChan = vm.chain.GetTxSubmitCh() for { select { -- cgit v1.2.3-70-g09d2 From b0a75c03303fe22f31fe1e1f0fa29a1e1cd78253 Mon Sep 17 00:00:00 2001 From: Aaron Buchwald Date: Thu, 17 Sep 2020 14:37:10 -0400 Subject: Defer wg Done call instead of calling before return --- plugin/evm/vm.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'plugin') diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 92e51e6..c02b835 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -730,6 +730,7 @@ func (vm *VM) writeBackMetadata() { // expected block hash // Waits for signal to shutdown from txPoolStabilizedShutdownChan chan func (vm *VM) awaitTxPoolStabilized() { + defer vm.shutdownWg.Done() for { select { case e := <-vm.newTxPoolHeadChan.Chan(): @@ -744,13 +745,13 @@ func (vm *VM) awaitTxPoolStabilized() { default: } case <-vm.txPoolStabilizedShutdownChan: - vm.shutdownWg.Done() return } } } func (vm *VM) awaitSubmittedTxs() { + defer vm.shutdownWg.Done() vm.txSubmitChan = vm.chain.GetTxSubmitCh() for { select { @@ -763,7 +764,6 @@ func (vm *VM) awaitSubmittedTxs() { case <-time.After(5 * time.Second): vm.tryBlockGen() case <-vm.shutdownSubmitChan: - vm.shutdownWg.Done() return } } -- cgit v1.2.3-70-g09d2