diff options
Diffstat (limited to 'plugin/evm/vm.go')
-rw-r--r-- | plugin/evm/vm.go | 502 |
1 files changed, 502 insertions, 0 deletions
diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go new file mode 100644 index 0000000..a9011ea --- /dev/null +++ b/plugin/evm/vm.go @@ -0,0 +1,502 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package evm + +import ( + "crypto/rand" + "encoding/json" + "errors" + "fmt" + "math/big" + "sync" + "sync/atomic" + "time" + + "github.com/ava-labs/coreth" + "github.com/ava-labs/coreth/core" + "github.com/ava-labs/coreth/eth" + "github.com/ava-labs/coreth/node" + + "github.com/ava-labs/go-ethereum/common" + "github.com/ava-labs/go-ethereum/core/types" + "github.com/ava-labs/go-ethereum/rlp" + "github.com/ava-labs/go-ethereum/rpc" + + "github.com/ava-labs/gecko/cache" + "github.com/ava-labs/gecko/database" + "github.com/ava-labs/gecko/ids" + "github.com/ava-labs/gecko/snow" + "github.com/ava-labs/gecko/snow/choices" + "github.com/ava-labs/gecko/snow/consensus/snowman" + "github.com/ava-labs/gecko/utils/timer" + + commonEng "github.com/ava-labs/gecko/snow/engine/common" +) + +const ( + lastAcceptedKey = "snowman_lastAccepted" +) + +const ( + minBlockTime = 250 * time.Millisecond + maxBlockTime = 1000 * time.Millisecond + batchSize = 250 +) + +const ( + bdTimerStateMin = iota + bdTimerStateMax + bdTimerStateLong +) + +var ( + errEmptyBlock = errors.New("empty block") + errCreateBlock = errors.New("couldn't create block") + errUnknownBlock = errors.New("unknown block") + errBlockFrequency = errors.New("too frequent block issuance") + errUnsupportedFXs = errors.New("unsupported feature extensions") +) + +func maxDuration(x, y time.Duration) time.Duration { + if x > y { + return x + } + return y +} + +// VM implements the snowman.ChainVM interface +type VM struct { + ctx *snow.Context + + chainID *big.Int + networkID uint64 + chain *coreth.ETHChain + chaindb Database + newBlockChan chan *Block + networkChan chan<- commonEng.Message + newTxPoolHeadChan chan core.NewTxPoolHeadEvent + + txPoolStabilizedHead common.Hash + txPoolStabilizedOk chan struct{} + txPoolStabilizedLock sync.Mutex + + metalock sync.Mutex + blockCache, blockStatusCache cache.LRU + lastAccepted *Block + writingMetadata uint32 + + bdlock sync.Mutex + blockDelayTimer *timer.Timer + bdTimerState int8 + bdGenWaitFlag bool + bdGenFlag bool + + genlock sync.Mutex + txSubmitChan <-chan struct{} +} + +/* + ****************************************************************************** + ********************************* Snowman API ******************************** + ****************************************************************************** + */ + +// Initialize implements the snowman.ChainVM interface +func (vm *VM) Initialize( + ctx *snow.Context, + db database.Database, + b []byte, + toEngine chan<- commonEng.Message, + fxs []*commonEng.Fx, +) error { + if len(fxs) > 0 { + return errUnsupportedFXs + } + + vm.ctx = ctx + vm.chaindb = Database{db} + g := new(core.Genesis) + err := json.Unmarshal(b, g) + if err != nil { + return err + } + + vm.chainID = g.Config.ChainID + + config := eth.DefaultConfig + config.ManualCanonical = true + config.Genesis = g + config.Miner.ManualMining = true + config.Miner.DisableUncle = true + if err := config.SetGCMode("archive"); err != nil { + panic(err) + } + nodecfg := node.Config{NoUSB: true} + chain := coreth.NewETHChain(&config, &nodecfg, nil, vm.chaindb) + vm.chain = chain + vm.networkID = config.NetworkId + chain.SetOnHeaderNew(func(header *types.Header) { + hid := make([]byte, 32) + _, err := rand.Read(hid) + if err != nil { + panic("cannot generate hid") + } + header.Extra = append(header.Extra, hid...) + }) + chain.SetOnSeal(func(block *types.Block) error { + if len(block.Transactions()) == 0 { + // this could happen due to the async logic of geth tx pool + vm.newBlockChan <- nil + return errEmptyBlock + } + return nil + }) + chain.SetOnSealFinish(func(block *types.Block) error { + vm.ctx.Log.Verbo("EVM sealed a block") + + blk := &Block{ + id: ids.NewID(block.Hash()), + ethBlock: block, + vm: vm, + } + vm.newBlockChan <- blk + vm.updateStatus(ids.NewID(block.Hash()), choices.Processing) + vm.txPoolStabilizedLock.Lock() + vm.txPoolStabilizedHead = block.Hash() + vm.txPoolStabilizedLock.Unlock() + return nil + }) + chain.SetOnQueryAcceptedBlock(func() *types.Block { + return vm.getLastAccepted().ethBlock + }) + vm.blockCache = cache.LRU{Size: 2048} + vm.blockStatusCache = cache.LRU{Size: 1024} + vm.newBlockChan = make(chan *Block) + vm.networkChan = toEngine + vm.blockDelayTimer = timer.NewTimer(func() { + vm.bdlock.Lock() + switch vm.bdTimerState { + case bdTimerStateMin: + vm.bdTimerState = bdTimerStateMax + vm.blockDelayTimer.SetTimeoutIn(maxDuration(maxBlockTime-minBlockTime, 0)) + case bdTimerStateMax: + vm.bdTimerState = bdTimerStateLong + } + tryAgain := vm.bdGenWaitFlag + vm.bdlock.Unlock() + if tryAgain { + vm.tryBlockGen() + } + }) + go ctx.Log.RecoverAndPanic(vm.blockDelayTimer.Dispatch) + + vm.bdTimerState = bdTimerStateLong + vm.bdGenWaitFlag = true + vm.newTxPoolHeadChan = make(chan core.NewTxPoolHeadEvent, 1) + vm.txPoolStabilizedOk = make(chan struct{}, 1) + chain.GetTxPool().SubscribeNewHeadEvent(vm.newTxPoolHeadChan) + // TODO: shutdown this go routine + go ctx.Log.RecoverAndPanic(func() { + for { + select { + case h := <-vm.newTxPoolHeadChan: + vm.txPoolStabilizedLock.Lock() + if vm.txPoolStabilizedHead == h.Block.Hash() { + vm.txPoolStabilizedOk <- struct{}{} + vm.txPoolStabilizedHead = common.Hash{} + } + vm.txPoolStabilizedLock.Unlock() + } + } + }) + chain.Start() + + var lastAccepted *types.Block + if b, err := vm.chaindb.Get([]byte(lastAcceptedKey)); err == nil { + var hash common.Hash + if err = rlp.DecodeBytes(b, &hash); err == nil { + if block := chain.GetBlockByHash(hash); block == nil { + vm.ctx.Log.Debug("lastAccepted block not found in chaindb") + } else { + lastAccepted = block + } + } + } + if lastAccepted == nil { + vm.ctx.Log.Debug("lastAccepted is unavailable, setting to the genesis block") + lastAccepted = chain.GetGenesisBlock() + } + vm.lastAccepted = &Block{ + id: ids.NewID(lastAccepted.Hash()), + ethBlock: lastAccepted, + vm: vm, + } + vm.ctx.Log.Info(fmt.Sprintf("lastAccepted = %s", vm.lastAccepted.ethBlock.Hash().Hex())) + + // TODO: shutdown this go routine + go vm.ctx.Log.RecoverAndPanic(func() { + vm.txSubmitChan = vm.chain.GetTxSubmitCh() + for { + select { + case <-vm.txSubmitChan: + vm.ctx.Log.Verbo("New tx detected, trying to generate a block") + vm.tryBlockGen() + case <-time.After(5 * time.Second): + vm.tryBlockGen() + } + } + }) + + return nil +} + +// Shutdown implements the snowman.ChainVM interface +func (vm *VM) Shutdown() { + if vm.ctx == nil { + return + } + + vm.writeBackMetadata() + vm.chain.Stop() +} + +// BuildBlock implements the snowman.ChainVM interface +func (vm *VM) BuildBlock() (snowman.Block, error) { + vm.chain.GenBlock() + block := <-vm.newBlockChan + if block == nil { + return nil, errCreateBlock + } + // reset the min block time timer + vm.bdlock.Lock() + vm.bdTimerState = bdTimerStateMin + vm.bdGenWaitFlag = false + vm.bdGenFlag = false + vm.blockDelayTimer.SetTimeoutIn(minBlockTime) + vm.bdlock.Unlock() + + vm.ctx.Log.Debug("built block 0x%x", block.ID().Bytes()) + // make sure Tx Pool is updated + <-vm.txPoolStabilizedOk + return block, nil +} + +// ParseBlock implements the snowman.ChainVM interface +func (vm *VM) ParseBlock(b []byte) (snowman.Block, error) { + vm.metalock.Lock() + defer vm.metalock.Unlock() + + ethBlock := new(types.Block) + if err := rlp.DecodeBytes(b, ethBlock); err != nil { + return nil, err + } + block := &Block{ + id: ids.NewID(ethBlock.Hash()), + ethBlock: ethBlock, + vm: vm, + } + vm.blockCache.Put(block.ID(), block) + return block, nil +} + +// GetBlock implements the snowman.ChainVM interface +func (vm *VM) GetBlock(id ids.ID) (snowman.Block, error) { + vm.metalock.Lock() + defer vm.metalock.Unlock() + + block := vm.getBlock(id) + if block == nil { + return nil, errUnknownBlock + } + return block, nil +} + +// SetPreference sets what the current tail of the chain is +func (vm *VM) SetPreference(blkID ids.ID) { + err := vm.chain.SetTail(blkID.Key()) + vm.ctx.Log.AssertNoError(err) +} + +// LastAccepted returns the ID of the block that was last accepted +func (vm *VM) LastAccepted() ids.ID { + vm.metalock.Lock() + defer vm.metalock.Unlock() + + return vm.lastAccepted.ID() +} + +// CreateHandlers makes new http handlers that can handle API calls +func (vm *VM) CreateHandlers() map[string]*commonEng.HTTPHandler { + handler := vm.chain.NewRPCHandler() + vm.chain.AttachEthService(handler, []string{"eth", "personal", "txpool"}) + handler.RegisterName("net", &NetAPI{vm}) + handler.RegisterName("snowman", &SnowmanAPI{vm}) + handler.RegisterName("web3", &Web3API{}) + handler.RegisterName("debug", &DebugAPI{vm}) + + return map[string]*commonEng.HTTPHandler{ + "/rpc": &commonEng.HTTPHandler{LockOptions: commonEng.NoLock, Handler: handler}, + "/ws": &commonEng.HTTPHandler{LockOptions: commonEng.NoLock, Handler: handler.WebsocketHandler([]string{"*"})}, + } +} + +// CreateStaticHandlers makes new http handlers that can handle API calls +func (vm *VM) CreateStaticHandlers() map[string]*commonEng.HTTPHandler { + handler := rpc.NewServer() + handler.RegisterName("static", &StaticService{}) + return map[string]*commonEng.HTTPHandler{ + "/rpc": &commonEng.HTTPHandler{LockOptions: commonEng.NoLock, Handler: handler}, + "/ws": &commonEng.HTTPHandler{LockOptions: commonEng.NoLock, Handler: handler.WebsocketHandler([]string{"*"})}, + } +} + +/* + ****************************************************************************** + *********************************** Helpers ********************************** + ****************************************************************************** + */ + +func (vm *VM) updateStatus(blockID ids.ID, status choices.Status) { + vm.metalock.Lock() + defer vm.metalock.Unlock() + + if status == choices.Accepted { + vm.lastAccepted = vm.getBlock(blockID) + // TODO: improve this naive implementation + if atomic.SwapUint32(&vm.writingMetadata, 1) == 0 { + go vm.ctx.Log.RecoverAndPanic(vm.writeBackMetadata) + } + } + vm.blockStatusCache.Put(blockID, status) +} + +func (vm *VM) getCachedBlock(blockID ids.ID) *types.Block { + return vm.chain.GetBlockByHash(blockID.Key()) +} + +func (vm *VM) tryBlockGen() error { + vm.bdlock.Lock() + defer vm.bdlock.Unlock() + if vm.bdGenFlag { + // skip if one call already generates a block in this round + return nil + } + vm.bdGenWaitFlag = true + + vm.genlock.Lock() + defer vm.genlock.Unlock() + // get pending size + size, err := vm.chain.PendingSize() + if err != nil { + return err + } + if size == 0 { + return nil + } + + switch vm.bdTimerState { + case bdTimerStateMin: + return nil + case bdTimerStateMax: + if size < batchSize { + return nil + } + case bdTimerStateLong: + // timeout; go ahead and generate a new block anyway + } + select { + case vm.networkChan <- commonEng.PendingTxs: + // successfully push out the notification; this round ends + vm.bdGenFlag = true + default: + return errBlockFrequency + } + return nil +} + +func (vm *VM) getCachedStatus(blockID ids.ID) choices.Status { + vm.metalock.Lock() + defer vm.metalock.Unlock() + status := choices.Processing + + if statusIntf, ok := vm.blockStatusCache.Get(blockID); ok { + status = statusIntf.(choices.Status) + } else { + blk := vm.chain.GetBlockByHash(blockID.Key()) + if blk == nil { + return choices.Unknown + } + acceptedBlk := vm.lastAccepted.ethBlock + + // TODO: There must be a better way of doing this. + // Traverse up the chain from the lower block until the indices match + highBlock := blk + lowBlock := acceptedBlk + if highBlock.Number().Cmp(lowBlock.Number()) < 0 { + highBlock, lowBlock = lowBlock, highBlock + } + for highBlock.Number().Cmp(lowBlock.Number()) > 0 { + highBlock = vm.chain.GetBlockByHash(highBlock.ParentHash()) + } + + if highBlock.Hash() == lowBlock.Hash() { // on the same branch + if blk.Number().Cmp(acceptedBlk.Number()) <= 0 { + status = choices.Accepted + } + } else { // on different branches + status = choices.Rejected + } + } + + vm.blockStatusCache.Put(blockID, status) + return status +} + +func (vm *VM) getBlock(id ids.ID) *Block { + if blockIntf, ok := vm.blockCache.Get(id); ok { + return blockIntf.(*Block) + } + ethBlock := vm.getCachedBlock(id) + if ethBlock == nil { + return nil + } + block := &Block{ + id: ids.NewID(ethBlock.Hash()), + ethBlock: ethBlock, + vm: vm, + } + vm.blockCache.Put(id, block) + return block +} + +func (vm *VM) issueRemoteTxs(txs []*types.Transaction) error { + errs := vm.chain.AddRemoteTxs(txs) + for _, err := range errs { + if err != nil { + return err + } + } + return vm.tryBlockGen() +} + +func (vm *VM) writeBackMetadata() { + vm.metalock.Lock() + defer vm.metalock.Unlock() + + b, err := rlp.EncodeToBytes(vm.lastAccepted.ethBlock.Hash()) + if err != nil { + vm.ctx.Log.Error("snowman-eth: error while writing back metadata") + return + } + vm.ctx.Log.Debug("writing back metadata") + vm.chaindb.Put([]byte(lastAcceptedKey), b) + atomic.StoreUint32(&vm.writingMetadata, 0) +} + +func (vm *VM) getLastAccepted() *Block { + vm.metalock.Lock() + defer vm.metalock.Unlock() + + return vm.lastAccepted +} |