diff options
Diffstat (limited to 'plugin/evm/vm.go')
-rw-r--r-- | plugin/evm/vm.go | 181 |
1 files changed, 111 insertions, 70 deletions
diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 58ab600..9335b51 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -69,19 +69,15 @@ const ( maxBlockTime = 1000 * time.Millisecond batchSize = 250 maxUTXOsToFetch = 1024 - blockCacheSize = 1 << 10 // 1024 + blockCacheSize = 1024 codecVersion = uint16(0) ) const ( - bdTimerStateMin = iota - bdTimerStateMax - bdTimerStateLong + txFee = units.MilliAvax ) var ( - txFee = units.MilliAvax - errEmptyBlock = errors.New("empty block") errCreateBlock = errors.New("couldn't create block") errUnknownBlock = errors.New("unknown block") @@ -103,6 +99,17 @@ var ( errInvalidNonce = errors.New("invalid nonce") ) +// mayBuildBlockStatus denotes whether the engine should be notified +// that a block should be built, or whether more time has to pass +// before doing so. See VM's [mayBuildBlock]. +type mayBuildBlockStatus uint8 + +const ( + waitToBuild mayBuildBlockStatus = iota + conditionalWaitToBuild + mayBuild +) + func maxDuration(x, y time.Duration) time.Duration { if x > y { return x @@ -145,37 +152,53 @@ type VM struct { CLIConfig CommandLineConfig - chainID *big.Int - networkID uint64 - genesisHash common.Hash - chain *coreth.ETHChain - chaindb Database - newBlockChan chan *Block - networkChan chan<- commonEng.Message - newMinedBlockSub *event.TypeMuxSubscription + chainID *big.Int + networkID uint64 + genesisHash common.Hash + chain *coreth.ETHChain + chaindb Database + newBlockChan chan *Block + // A message is sent on this channel when a new block + // is ready to be build. This notifies the consensus engine. + notifyBuildBlockChan chan<- commonEng.Message + newMinedBlockSub *event.TypeMuxSubscription acceptedDB database.Database - txPoolStabilizedHead common.Hash - txPoolStabilizedOk chan struct{} - txPoolStabilizedLock sync.Mutex - txPoolStabilizedShutdownChan chan struct{} + txPoolStabilizedLock sync.Mutex + txPoolStabilizedHead common.Hash + txPoolStabilizedOk chan struct{} metalock sync.Mutex blockCache, blockStatusCache cache.LRU lastAccepted *Block writingMetadata uint32 - bdlock sync.Mutex - blockDelayTimer *timer.Timer - bdTimerState int8 - bdGenWaitFlag bool - bdGenFlag bool + // [buildBlockLock] must be held when accessing [mayBuildBlock], + // [tryToBuildBlock] or [awaitingBuildBlock]. + buildBlockLock sync.Mutex + // [buildBlockTimer] periodically fires in order to update [mayBuildBlock] + // and to try to build a block, if applicable. + buildBlockTimer *timer.Timer + // [mayBuildBlock] == [wait] means that the next block may be built + // only after more time has elapsed. + // [mayBuildBlock] == [conditionalWait] means that the next block may be built + // only if it has more than [batchSize] txs in it. Otherwise, wait until more + // time has elapsed. + // [mayBuildBlock] == [build] means that the next block may be built + // at any time. + mayBuildBlock mayBuildBlockStatus + // If true, try to notify the engine that a block should be built. + // Engine may not be notified because [mayBuildBlock] says to wait. + tryToBuildBlock bool + // If true, the engine has been notified that it should build a block + // but has not done so yet. If this is the case, wait until it has + // built a block before notifying it again. + awaitingBuildBlock bool genlock sync.Mutex txSubmitChan <-chan struct{} atomicTxSubmitChan chan struct{} - shutdownSubmitChan chan struct{} baseCodec codec.Codec codec codec.Manager clock timer.Clock @@ -183,7 +206,8 @@ type VM struct { pendingAtomicTxs chan *Tx blockAtomicInputCache cache.LRU - shutdownWg sync.WaitGroup + shutdownChan chan struct{} + shutdownWg sync.WaitGroup fx secp256k1fx.Fx } @@ -232,6 +256,7 @@ func (vm *VM) Initialize( return errUnsupportedFXs } + vm.shutdownChan = make(chan struct{}, 1) vm.ctx = ctx vm.chaindb = Database{db} g := new(core.Genesis) @@ -329,32 +354,35 @@ func (vm *VM) Initialize( vm.blockStatusCache = cache.LRU{Size: blockCacheSize} vm.blockAtomicInputCache = cache.LRU{Size: blockCacheSize} vm.newBlockChan = make(chan *Block) - vm.networkChan = toEngine - vm.blockDelayTimer = timer.NewTimer(func() { - vm.bdlock.Lock() - switch vm.bdTimerState { - case bdTimerStateMin: - vm.bdTimerState = bdTimerStateMax - vm.blockDelayTimer.SetTimeoutIn(maxDuration(maxBlockTime-minBlockTime, 0)) - case bdTimerStateMax: - vm.bdTimerState = bdTimerStateLong + vm.notifyBuildBlockChan = toEngine + + // Periodically updates [vm.mayBuildBlock] and tries to notify the engine to build + // a new block, if applicable. + vm.buildBlockTimer = timer.NewTimer(func() { + vm.buildBlockLock.Lock() + switch vm.mayBuildBlock { + case waitToBuild: + // Some time has passed. Allow block to be built if it has enough txs in it. + vm.mayBuildBlock = conditionalWaitToBuild + vm.buildBlockTimer.SetTimeoutIn(maxDuration(maxBlockTime-minBlockTime, 0)) + case conditionalWaitToBuild: + // More time has passed. Allow block to be built regardless of tx count. + vm.mayBuildBlock = mayBuild } - tryAgain := vm.bdGenWaitFlag - vm.bdlock.Unlock() - if tryAgain { + tryBuildBlock := vm.tryToBuildBlock + vm.buildBlockLock.Unlock() + if tryBuildBlock { vm.tryBlockGen() } }) - go ctx.Log.RecoverAndPanic(vm.blockDelayTimer.Dispatch) + go ctx.Log.RecoverAndPanic(vm.buildBlockTimer.Dispatch) - vm.bdTimerState = bdTimerStateLong - vm.bdGenWaitFlag = true + vm.mayBuildBlock = mayBuild + vm.tryToBuildBlock = true vm.txPoolStabilizedOk = make(chan struct{}, 1) - vm.txPoolStabilizedShutdownChan = make(chan struct{}, 1) // Signal goroutine to shutdown // TODO: read size from options vm.pendingAtomicTxs = make(chan *Tx, 1024) vm.atomicTxSubmitChan = make(chan struct{}, 1) - vm.shutdownSubmitChan = make(chan struct{}, 1) vm.newMinedBlockSub = vm.chain.SubscribeNewMinedBlockEvent() vm.shutdownWg.Add(1) go ctx.Log.RecoverAndPanic(vm.awaitTxPoolStabilized) @@ -401,7 +429,10 @@ func (vm *VM) Bootstrapping() error { return vm.fx.Bootstrapping() } // Bootstrapped notifies this VM that the consensus engine has finished // bootstrapping -func (vm *VM) Bootstrapped() error { return vm.fx.Bootstrapped() } +func (vm *VM) Bootstrapped() error { + vm.ctx.Bootstrapped() + return vm.fx.Bootstrapped() +} // Shutdown implements the snowman.ChainVM interface func (vm *VM) Shutdown() error { @@ -410,8 +441,8 @@ func (vm *VM) Shutdown() error { } vm.writeBackMetadata() - close(vm.txPoolStabilizedShutdownChan) - close(vm.shutdownSubmitChan) + vm.buildBlockTimer.Stop() + close(vm.shutdownChan) vm.chain.Stop() vm.shutdownWg.Wait() return nil @@ -421,18 +452,20 @@ func (vm *VM) Shutdown() error { func (vm *VM) BuildBlock() (snowman.Block, error) { vm.chain.GenBlock() block := <-vm.newBlockChan + + vm.buildBlockLock.Lock() + // Specify that we should wait before trying to build another block. + vm.mayBuildBlock = waitToBuild + vm.tryToBuildBlock = false + vm.awaitingBuildBlock = false + vm.buildBlockTimer.SetTimeoutIn(minBlockTime) + vm.buildBlockLock.Unlock() + if block == nil { return nil, errCreateBlock } - // reset the min block time timer - vm.bdlock.Lock() - vm.bdTimerState = bdTimerStateMin - vm.bdGenWaitFlag = false - vm.bdGenFlag = false - vm.blockDelayTimer.SetTimeoutIn(minBlockTime) - vm.bdlock.Unlock() - log.Debug(fmt.Sprintf("built block %s", block.ID())) + log.Debug(fmt.Sprintf("Built block %s", block.ID())) // make sure Tx Pool is updated <-vm.txPoolStabilizedOk return block, nil @@ -573,13 +606,14 @@ func (vm *VM) updateStatus(blockID ids.ID, status choices.Status) { } func (vm *VM) tryBlockGen() error { - vm.bdlock.Lock() - defer vm.bdlock.Unlock() - if vm.bdGenFlag { - // skip if one call already generates a block in this round + vm.buildBlockLock.Lock() + defer vm.buildBlockLock.Unlock() + if vm.awaitingBuildBlock { + // We notified the engine that a block should be built but it hasn't + // done so yet. Wait until it has done so before notifying again. return nil } - vm.bdGenWaitFlag = true + vm.tryToBuildBlock = true vm.genlock.Lock() defer vm.genlock.Unlock() @@ -592,20 +626,21 @@ func (vm *VM) tryBlockGen() error { return nil } - switch vm.bdTimerState { - case bdTimerStateMin: + switch vm.mayBuildBlock { + case waitToBuild: // Wait more time before notifying engine to building a block return nil - case bdTimerStateMax: + case conditionalWaitToBuild: // Notify engine only if there are enough pending txs if size < batchSize { return nil } - case bdTimerStateLong: - // timeout; go ahead and generate a new block anyway + case mayBuild: // Notify engine + default: + panic(fmt.Sprintf("mayBuildBlock has unexpected value %d", vm.mayBuildBlock)) } select { - case vm.networkChan <- commonEng.PendingTxs: - // successfully push out the notification; this round ends - vm.bdGenFlag = true + case vm.notifyBuildBlockChan <- commonEng.PendingTxs: + // Notify engine to build a block + vm.awaitingBuildBlock = true default: return errBlockFrequency } @@ -734,12 +769,18 @@ func (vm *VM) writeBackMetadata() { // awaitTxPoolStabilized waits for a txPoolHead channel event // and notifies the VM when the tx pool has stabilized to the // expected block hash -// Waits for signal to shutdown from txPoolStabilizedShutdownChan chan +// Waits for signal to shutdown from [vm.shutdownChan] func (vm *VM) awaitTxPoolStabilized() { defer vm.shutdownWg.Done() for { select { - case e := <-vm.newMinedBlockSub.Chan(): + case e, ok := <-vm.newMinedBlockSub.Chan(): + if !ok { + return + } + if e == nil { + continue + } switch h := e.Data.(type) { case core.NewMinedBlockEvent: vm.txPoolStabilizedLock.Lock() @@ -750,7 +791,7 @@ func (vm *VM) awaitTxPoolStabilized() { vm.txPoolStabilizedLock.Unlock() default: } - case <-vm.txPoolStabilizedShutdownChan: + case <-vm.shutdownChan: return } } @@ -769,7 +810,7 @@ func (vm *VM) awaitSubmittedTxs() { vm.tryBlockGen() case <-time.After(5 * time.Second): vm.tryBlockGen() - case <-vm.shutdownSubmitChan: + case <-vm.shutdownChan: return } } |