aboutsummaryrefslogtreecommitdiff
path: root/plugin/evm/vm.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/evm/vm.go')
-rw-r--r--plugin/evm/vm.go502
1 files changed, 502 insertions, 0 deletions
diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go
new file mode 100644
index 0000000..a9011ea
--- /dev/null
+++ b/plugin/evm/vm.go
@@ -0,0 +1,502 @@
+// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package evm
+
+import (
+ "crypto/rand"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "math/big"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/ava-labs/coreth"
+ "github.com/ava-labs/coreth/core"
+ "github.com/ava-labs/coreth/eth"
+ "github.com/ava-labs/coreth/node"
+
+ "github.com/ava-labs/go-ethereum/common"
+ "github.com/ava-labs/go-ethereum/core/types"
+ "github.com/ava-labs/go-ethereum/rlp"
+ "github.com/ava-labs/go-ethereum/rpc"
+
+ "github.com/ava-labs/gecko/cache"
+ "github.com/ava-labs/gecko/database"
+ "github.com/ava-labs/gecko/ids"
+ "github.com/ava-labs/gecko/snow"
+ "github.com/ava-labs/gecko/snow/choices"
+ "github.com/ava-labs/gecko/snow/consensus/snowman"
+ "github.com/ava-labs/gecko/utils/timer"
+
+ commonEng "github.com/ava-labs/gecko/snow/engine/common"
+)
+
+const (
+ lastAcceptedKey = "snowman_lastAccepted"
+)
+
+const (
+ minBlockTime = 250 * time.Millisecond
+ maxBlockTime = 1000 * time.Millisecond
+ batchSize = 250
+)
+
+const (
+ bdTimerStateMin = iota
+ bdTimerStateMax
+ bdTimerStateLong
+)
+
+var (
+ errEmptyBlock = errors.New("empty block")
+ errCreateBlock = errors.New("couldn't create block")
+ errUnknownBlock = errors.New("unknown block")
+ errBlockFrequency = errors.New("too frequent block issuance")
+ errUnsupportedFXs = errors.New("unsupported feature extensions")
+)
+
+func maxDuration(x, y time.Duration) time.Duration {
+ if x > y {
+ return x
+ }
+ return y
+}
+
+// VM implements the snowman.ChainVM interface
+type VM struct {
+ ctx *snow.Context
+
+ chainID *big.Int
+ networkID uint64
+ chain *coreth.ETHChain
+ chaindb Database
+ newBlockChan chan *Block
+ networkChan chan<- commonEng.Message
+ newTxPoolHeadChan chan core.NewTxPoolHeadEvent
+
+ txPoolStabilizedHead common.Hash
+ txPoolStabilizedOk chan struct{}
+ txPoolStabilizedLock sync.Mutex
+
+ metalock sync.Mutex
+ blockCache, blockStatusCache cache.LRU
+ lastAccepted *Block
+ writingMetadata uint32
+
+ bdlock sync.Mutex
+ blockDelayTimer *timer.Timer
+ bdTimerState int8
+ bdGenWaitFlag bool
+ bdGenFlag bool
+
+ genlock sync.Mutex
+ txSubmitChan <-chan struct{}
+}
+
+/*
+ ******************************************************************************
+ ********************************* Snowman API ********************************
+ ******************************************************************************
+ */
+
+// Initialize implements the snowman.ChainVM interface
+func (vm *VM) Initialize(
+ ctx *snow.Context,
+ db database.Database,
+ b []byte,
+ toEngine chan<- commonEng.Message,
+ fxs []*commonEng.Fx,
+) error {
+ if len(fxs) > 0 {
+ return errUnsupportedFXs
+ }
+
+ vm.ctx = ctx
+ vm.chaindb = Database{db}
+ g := new(core.Genesis)
+ err := json.Unmarshal(b, g)
+ if err != nil {
+ return err
+ }
+
+ vm.chainID = g.Config.ChainID
+
+ config := eth.DefaultConfig
+ config.ManualCanonical = true
+ config.Genesis = g
+ config.Miner.ManualMining = true
+ config.Miner.DisableUncle = true
+ if err := config.SetGCMode("archive"); err != nil {
+ panic(err)
+ }
+ nodecfg := node.Config{NoUSB: true}
+ chain := coreth.NewETHChain(&config, &nodecfg, nil, vm.chaindb)
+ vm.chain = chain
+ vm.networkID = config.NetworkId
+ chain.SetOnHeaderNew(func(header *types.Header) {
+ hid := make([]byte, 32)
+ _, err := rand.Read(hid)
+ if err != nil {
+ panic("cannot generate hid")
+ }
+ header.Extra = append(header.Extra, hid...)
+ })
+ chain.SetOnSeal(func(block *types.Block) error {
+ if len(block.Transactions()) == 0 {
+ // this could happen due to the async logic of geth tx pool
+ vm.newBlockChan <- nil
+ return errEmptyBlock
+ }
+ return nil
+ })
+ chain.SetOnSealFinish(func(block *types.Block) error {
+ vm.ctx.Log.Verbo("EVM sealed a block")
+
+ blk := &Block{
+ id: ids.NewID(block.Hash()),
+ ethBlock: block,
+ vm: vm,
+ }
+ vm.newBlockChan <- blk
+ vm.updateStatus(ids.NewID(block.Hash()), choices.Processing)
+ vm.txPoolStabilizedLock.Lock()
+ vm.txPoolStabilizedHead = block.Hash()
+ vm.txPoolStabilizedLock.Unlock()
+ return nil
+ })
+ chain.SetOnQueryAcceptedBlock(func() *types.Block {
+ return vm.getLastAccepted().ethBlock
+ })
+ vm.blockCache = cache.LRU{Size: 2048}
+ vm.blockStatusCache = cache.LRU{Size: 1024}
+ vm.newBlockChan = make(chan *Block)
+ vm.networkChan = toEngine
+ vm.blockDelayTimer = timer.NewTimer(func() {
+ vm.bdlock.Lock()
+ switch vm.bdTimerState {
+ case bdTimerStateMin:
+ vm.bdTimerState = bdTimerStateMax
+ vm.blockDelayTimer.SetTimeoutIn(maxDuration(maxBlockTime-minBlockTime, 0))
+ case bdTimerStateMax:
+ vm.bdTimerState = bdTimerStateLong
+ }
+ tryAgain := vm.bdGenWaitFlag
+ vm.bdlock.Unlock()
+ if tryAgain {
+ vm.tryBlockGen()
+ }
+ })
+ go ctx.Log.RecoverAndPanic(vm.blockDelayTimer.Dispatch)
+
+ vm.bdTimerState = bdTimerStateLong
+ vm.bdGenWaitFlag = true
+ vm.newTxPoolHeadChan = make(chan core.NewTxPoolHeadEvent, 1)
+ vm.txPoolStabilizedOk = make(chan struct{}, 1)
+ chain.GetTxPool().SubscribeNewHeadEvent(vm.newTxPoolHeadChan)
+ // TODO: shutdown this go routine
+ go ctx.Log.RecoverAndPanic(func() {
+ for {
+ select {
+ case h := <-vm.newTxPoolHeadChan:
+ vm.txPoolStabilizedLock.Lock()
+ if vm.txPoolStabilizedHead == h.Block.Hash() {
+ vm.txPoolStabilizedOk <- struct{}{}
+ vm.txPoolStabilizedHead = common.Hash{}
+ }
+ vm.txPoolStabilizedLock.Unlock()
+ }
+ }
+ })
+ chain.Start()
+
+ var lastAccepted *types.Block
+ if b, err := vm.chaindb.Get([]byte(lastAcceptedKey)); err == nil {
+ var hash common.Hash
+ if err = rlp.DecodeBytes(b, &hash); err == nil {
+ if block := chain.GetBlockByHash(hash); block == nil {
+ vm.ctx.Log.Debug("lastAccepted block not found in chaindb")
+ } else {
+ lastAccepted = block
+ }
+ }
+ }
+ if lastAccepted == nil {
+ vm.ctx.Log.Debug("lastAccepted is unavailable, setting to the genesis block")
+ lastAccepted = chain.GetGenesisBlock()
+ }
+ vm.lastAccepted = &Block{
+ id: ids.NewID(lastAccepted.Hash()),
+ ethBlock: lastAccepted,
+ vm: vm,
+ }
+ vm.ctx.Log.Info(fmt.Sprintf("lastAccepted = %s", vm.lastAccepted.ethBlock.Hash().Hex()))
+
+ // TODO: shutdown this go routine
+ go vm.ctx.Log.RecoverAndPanic(func() {
+ vm.txSubmitChan = vm.chain.GetTxSubmitCh()
+ for {
+ select {
+ case <-vm.txSubmitChan:
+ vm.ctx.Log.Verbo("New tx detected, trying to generate a block")
+ vm.tryBlockGen()
+ case <-time.After(5 * time.Second):
+ vm.tryBlockGen()
+ }
+ }
+ })
+
+ return nil
+}
+
+// Shutdown implements the snowman.ChainVM interface
+func (vm *VM) Shutdown() {
+ if vm.ctx == nil {
+ return
+ }
+
+ vm.writeBackMetadata()
+ vm.chain.Stop()
+}
+
+// BuildBlock implements the snowman.ChainVM interface
+func (vm *VM) BuildBlock() (snowman.Block, error) {
+ vm.chain.GenBlock()
+ block := <-vm.newBlockChan
+ if block == nil {
+ return nil, errCreateBlock
+ }
+ // reset the min block time timer
+ vm.bdlock.Lock()
+ vm.bdTimerState = bdTimerStateMin
+ vm.bdGenWaitFlag = false
+ vm.bdGenFlag = false
+ vm.blockDelayTimer.SetTimeoutIn(minBlockTime)
+ vm.bdlock.Unlock()
+
+ vm.ctx.Log.Debug("built block 0x%x", block.ID().Bytes())
+ // make sure Tx Pool is updated
+ <-vm.txPoolStabilizedOk
+ return block, nil
+}
+
+// ParseBlock implements the snowman.ChainVM interface
+func (vm *VM) ParseBlock(b []byte) (snowman.Block, error) {
+ vm.metalock.Lock()
+ defer vm.metalock.Unlock()
+
+ ethBlock := new(types.Block)
+ if err := rlp.DecodeBytes(b, ethBlock); err != nil {
+ return nil, err
+ }
+ block := &Block{
+ id: ids.NewID(ethBlock.Hash()),
+ ethBlock: ethBlock,
+ vm: vm,
+ }
+ vm.blockCache.Put(block.ID(), block)
+ return block, nil
+}
+
+// GetBlock implements the snowman.ChainVM interface
+func (vm *VM) GetBlock(id ids.ID) (snowman.Block, error) {
+ vm.metalock.Lock()
+ defer vm.metalock.Unlock()
+
+ block := vm.getBlock(id)
+ if block == nil {
+ return nil, errUnknownBlock
+ }
+ return block, nil
+}
+
+// SetPreference sets what the current tail of the chain is
+func (vm *VM) SetPreference(blkID ids.ID) {
+ err := vm.chain.SetTail(blkID.Key())
+ vm.ctx.Log.AssertNoError(err)
+}
+
+// LastAccepted returns the ID of the block that was last accepted
+func (vm *VM) LastAccepted() ids.ID {
+ vm.metalock.Lock()
+ defer vm.metalock.Unlock()
+
+ return vm.lastAccepted.ID()
+}
+
+// CreateHandlers makes new http handlers that can handle API calls
+func (vm *VM) CreateHandlers() map[string]*commonEng.HTTPHandler {
+ handler := vm.chain.NewRPCHandler()
+ vm.chain.AttachEthService(handler, []string{"eth", "personal", "txpool"})
+ handler.RegisterName("net", &NetAPI{vm})
+ handler.RegisterName("snowman", &SnowmanAPI{vm})
+ handler.RegisterName("web3", &Web3API{})
+ handler.RegisterName("debug", &DebugAPI{vm})
+
+ return map[string]*commonEng.HTTPHandler{
+ "/rpc": &commonEng.HTTPHandler{LockOptions: commonEng.NoLock, Handler: handler},
+ "/ws": &commonEng.HTTPHandler{LockOptions: commonEng.NoLock, Handler: handler.WebsocketHandler([]string{"*"})},
+ }
+}
+
+// CreateStaticHandlers makes new http handlers that can handle API calls
+func (vm *VM) CreateStaticHandlers() map[string]*commonEng.HTTPHandler {
+ handler := rpc.NewServer()
+ handler.RegisterName("static", &StaticService{})
+ return map[string]*commonEng.HTTPHandler{
+ "/rpc": &commonEng.HTTPHandler{LockOptions: commonEng.NoLock, Handler: handler},
+ "/ws": &commonEng.HTTPHandler{LockOptions: commonEng.NoLock, Handler: handler.WebsocketHandler([]string{"*"})},
+ }
+}
+
+/*
+ ******************************************************************************
+ *********************************** Helpers **********************************
+ ******************************************************************************
+ */
+
+func (vm *VM) updateStatus(blockID ids.ID, status choices.Status) {
+ vm.metalock.Lock()
+ defer vm.metalock.Unlock()
+
+ if status == choices.Accepted {
+ vm.lastAccepted = vm.getBlock(blockID)
+ // TODO: improve this naive implementation
+ if atomic.SwapUint32(&vm.writingMetadata, 1) == 0 {
+ go vm.ctx.Log.RecoverAndPanic(vm.writeBackMetadata)
+ }
+ }
+ vm.blockStatusCache.Put(blockID, status)
+}
+
+func (vm *VM) getCachedBlock(blockID ids.ID) *types.Block {
+ return vm.chain.GetBlockByHash(blockID.Key())
+}
+
+func (vm *VM) tryBlockGen() error {
+ vm.bdlock.Lock()
+ defer vm.bdlock.Unlock()
+ if vm.bdGenFlag {
+ // skip if one call already generates a block in this round
+ return nil
+ }
+ vm.bdGenWaitFlag = true
+
+ vm.genlock.Lock()
+ defer vm.genlock.Unlock()
+ // get pending size
+ size, err := vm.chain.PendingSize()
+ if err != nil {
+ return err
+ }
+ if size == 0 {
+ return nil
+ }
+
+ switch vm.bdTimerState {
+ case bdTimerStateMin:
+ return nil
+ case bdTimerStateMax:
+ if size < batchSize {
+ return nil
+ }
+ case bdTimerStateLong:
+ // timeout; go ahead and generate a new block anyway
+ }
+ select {
+ case vm.networkChan <- commonEng.PendingTxs:
+ // successfully push out the notification; this round ends
+ vm.bdGenFlag = true
+ default:
+ return errBlockFrequency
+ }
+ return nil
+}
+
+func (vm *VM) getCachedStatus(blockID ids.ID) choices.Status {
+ vm.metalock.Lock()
+ defer vm.metalock.Unlock()
+ status := choices.Processing
+
+ if statusIntf, ok := vm.blockStatusCache.Get(blockID); ok {
+ status = statusIntf.(choices.Status)
+ } else {
+ blk := vm.chain.GetBlockByHash(blockID.Key())
+ if blk == nil {
+ return choices.Unknown
+ }
+ acceptedBlk := vm.lastAccepted.ethBlock
+
+ // TODO: There must be a better way of doing this.
+ // Traverse up the chain from the lower block until the indices match
+ highBlock := blk
+ lowBlock := acceptedBlk
+ if highBlock.Number().Cmp(lowBlock.Number()) < 0 {
+ highBlock, lowBlock = lowBlock, highBlock
+ }
+ for highBlock.Number().Cmp(lowBlock.Number()) > 0 {
+ highBlock = vm.chain.GetBlockByHash(highBlock.ParentHash())
+ }
+
+ if highBlock.Hash() == lowBlock.Hash() { // on the same branch
+ if blk.Number().Cmp(acceptedBlk.Number()) <= 0 {
+ status = choices.Accepted
+ }
+ } else { // on different branches
+ status = choices.Rejected
+ }
+ }
+
+ vm.blockStatusCache.Put(blockID, status)
+ return status
+}
+
+func (vm *VM) getBlock(id ids.ID) *Block {
+ if blockIntf, ok := vm.blockCache.Get(id); ok {
+ return blockIntf.(*Block)
+ }
+ ethBlock := vm.getCachedBlock(id)
+ if ethBlock == nil {
+ return nil
+ }
+ block := &Block{
+ id: ids.NewID(ethBlock.Hash()),
+ ethBlock: ethBlock,
+ vm: vm,
+ }
+ vm.blockCache.Put(id, block)
+ return block
+}
+
+func (vm *VM) issueRemoteTxs(txs []*types.Transaction) error {
+ errs := vm.chain.AddRemoteTxs(txs)
+ for _, err := range errs {
+ if err != nil {
+ return err
+ }
+ }
+ return vm.tryBlockGen()
+}
+
+func (vm *VM) writeBackMetadata() {
+ vm.metalock.Lock()
+ defer vm.metalock.Unlock()
+
+ b, err := rlp.EncodeToBytes(vm.lastAccepted.ethBlock.Hash())
+ if err != nil {
+ vm.ctx.Log.Error("snowman-eth: error while writing back metadata")
+ return
+ }
+ vm.ctx.Log.Debug("writing back metadata")
+ vm.chaindb.Put([]byte(lastAcceptedKey), b)
+ atomic.StoreUint32(&vm.writingMetadata, 0)
+}
+
+func (vm *VM) getLastAccepted() *Block {
+ vm.metalock.Lock()
+ defer vm.metalock.Unlock()
+
+ return vm.lastAccepted
+}