From bc43122f54ed8de21f74ee6393549c9554d732e8 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 1 Oct 2019 13:28:55 -0400 Subject: support "accepted" as block number in JSON-RPC --- eth/filters/api.go | 576 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 576 insertions(+) create mode 100644 eth/filters/api.go (limited to 'eth/filters/api.go') diff --git a/eth/filters/api.go b/eth/filters/api.go new file mode 100644 index 0000000..9d9e757 --- /dev/null +++ b/eth/filters/api.go @@ -0,0 +1,576 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package filters + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/big" + "sync" + "time" + + ethereum "github.com/ava-labs/go-ethereum" + "github.com/ava-labs/go-ethereum/common" + "github.com/ava-labs/go-ethereum/common/hexutil" + "github.com/ava-labs/go-ethereum/core/types" + "github.com/ava-labs/go-ethereum/ethdb" + "github.com/ava-labs/go-ethereum/event" + "github.com/ava-labs/go-ethereum/rpc" +) + +var ( + deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline +) + +// filter is a helper struct that holds meta information over the filter type +// and associated subscription in the event system. +type filter struct { + typ Type + deadline *time.Timer // filter is inactiv when deadline triggers + hashes []common.Hash + crit FilterCriteria + logs []*types.Log + s *Subscription // associated subscription in event system +} + +// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various +// information related to the Ethereum protocol such als blocks, transactions and logs. +type PublicFilterAPI struct { + backend Backend + mux *event.TypeMux + quit chan struct{} + chainDb ethdb.Database + events *EventSystem + filtersMu sync.Mutex + filters map[rpc.ID]*filter +} + +// NewPublicFilterAPI returns a new PublicFilterAPI instance. +func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { + api := &PublicFilterAPI{ + backend: backend, + mux: backend.EventMux(), + chainDb: backend.ChainDb(), + events: NewEventSystem(backend.EventMux(), backend, lightMode), + filters: make(map[rpc.ID]*filter), + } + go api.timeoutLoop() + + return api +} + +// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used. +// Tt is started when the api is created. +func (api *PublicFilterAPI) timeoutLoop() { + ticker := time.NewTicker(5 * time.Minute) + for { + <-ticker.C + api.filtersMu.Lock() + for id, f := range api.filters { + select { + case <-f.deadline.C: + f.s.Unsubscribe() + delete(api.filters, id) + default: + continue + } + } + api.filtersMu.Unlock() + } +} + +// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes +// as transactions enter the pending state. +// +// It is part of the filter package because this filter can be used through the +// `eth_getFilterChanges` polling method that is also used for log filters. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter +func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { + var ( + pendingTxs = make(chan []common.Hash) + pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) + ) + + api.filtersMu.Lock() + api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case ph := <-pendingTxs: + api.filtersMu.Lock() + if f, found := api.filters[pendingTxSub.ID]; found { + f.hashes = append(f.hashes, ph...) + } + api.filtersMu.Unlock() + case <-pendingTxSub.Err(): + api.filtersMu.Lock() + delete(api.filters, pendingTxSub.ID) + api.filtersMu.Unlock() + return + } + } + }() + + return pendingTxSub.ID +} + +// NewPendingTransactions creates a subscription that is triggered each time a transaction +// enters the transaction pool and was signed from one of the transactions this nodes manages. +func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + txHashes := make(chan []common.Hash, 128) + pendingTxSub := api.events.SubscribePendingTxs(txHashes) + + for { + select { + case hashes := <-txHashes: + // To keep the original behaviour, send a single tx hash in one notification. + // TODO(rjl493456442) Send a batch of tx hashes in one notification + for _, h := range hashes { + notifier.Notify(rpcSub.ID, h) + } + case <-rpcSub.Err(): + pendingTxSub.Unsubscribe() + return + case <-notifier.Closed(): + pendingTxSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + +// NewBlockFilter creates a filter that fetches blocks that are imported into the chain. +// It is part of the filter package since polling goes with eth_getFilterChanges. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter +func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { + var ( + headers = make(chan *types.Header) + headerSub = api.events.SubscribeNewHeads(headers) + ) + + api.filtersMu.Lock() + api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case h := <-headers: + api.filtersMu.Lock() + if f, found := api.filters[headerSub.ID]; found { + f.hashes = append(f.hashes, h.Hash()) + } + api.filtersMu.Unlock() + case <-headerSub.Err(): + api.filtersMu.Lock() + delete(api.filters, headerSub.ID) + api.filtersMu.Unlock() + return + } + } + }() + + return headerSub.ID +} + +// NewHeads send a notification each time a new (header) block is appended to the chain. +func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + headers := make(chan *types.Header) + headersSub := api.events.SubscribeNewHeads(headers) + + for { + select { + case h := <-headers: + notifier.Notify(rpcSub.ID, h) + case <-rpcSub.Err(): + headersSub.Unsubscribe() + return + case <-notifier.Closed(): + headersSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + +// Logs creates a subscription that fires for all new log that match the given filter criteria. +func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + var ( + rpcSub = notifier.CreateSubscription() + matchedLogs = make(chan []*types.Log) + ) + + logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs) + if err != nil { + return nil, err + } + + go func() { + + for { + select { + case logs := <-matchedLogs: + for _, log := range logs { + notifier.Notify(rpcSub.ID, &log) + } + case <-rpcSub.Err(): // client send an unsubscribe request + logsSub.Unsubscribe() + return + case <-notifier.Closed(): // connection dropped + logsSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + +// FilterCriteria represents a request to create a new filter. +// Same as ethereum.FilterQuery but with UnmarshalJSON() method. +type FilterCriteria ethereum.FilterQuery + +// NewFilter creates a new filter and returns the filter id. It can be +// used to retrieve logs when the state changes. This method cannot be +// used to fetch logs that are already stored in the state. +// +// Default criteria for the from and to block are "latest". +// Using "latest" as block number will return logs for mined blocks. +// Using "pending" as block number returns logs for not yet mined (pending) blocks. +// In case logs are removed (chain reorg) previously returned logs are returned +// again but with the removed property set to true. +// +// In case "fromBlock" > "toBlock" an error is returned. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter +func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { + logs := make(chan []*types.Log) + logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs) + if err != nil { + return rpc.ID(""), err + } + + api.filtersMu.Lock() + api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*types.Log, 0), s: logsSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case l := <-logs: + api.filtersMu.Lock() + if f, found := api.filters[logsSub.ID]; found { + f.logs = append(f.logs, l...) + } + api.filtersMu.Unlock() + case <-logsSub.Err(): + api.filtersMu.Lock() + delete(api.filters, logsSub.ID) + api.filtersMu.Unlock() + return + } + } + }() + + return logsSub.ID, nil +} + +// GetLogs returns logs matching the given argument that are stored within the state. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs +func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) { + var filter *Filter + if crit.BlockHash != nil { + // Block filter requested, construct a single-shot filter + filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics) + } else { + // Convert the RPC block numbers into internal representations + begin := rpc.LatestBlockNumber.Int64() + if crit.FromBlock != nil { + begin = crit.FromBlock.Int64() + } + end := rpc.LatestBlockNumber.Int64() + if crit.ToBlock != nil { + end = crit.ToBlock.Int64() + } + // Construct the range filter + filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics) + } + // Run the filter and return all the logs + logs, err := filter.Logs(ctx) + if err != nil { + return nil, err + } + return returnLogs(logs), err +} + +// UninstallFilter removes the filter with the given filter id. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter +func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool { + api.filtersMu.Lock() + f, found := api.filters[id] + if found { + delete(api.filters, id) + } + api.filtersMu.Unlock() + if found { + f.s.Unsubscribe() + } + + return found +} + +// GetFilterLogs returns the logs for the filter with the given id. +// If the filter could not be found an empty array of logs is returned. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs +func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Log, error) { + api.filtersMu.Lock() + f, found := api.filters[id] + api.filtersMu.Unlock() + + if !found || f.typ != LogsSubscription { + return nil, fmt.Errorf("filter not found") + } + + var filter *Filter + if f.crit.BlockHash != nil { + // Block filter requested, construct a single-shot filter + filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics) + } else { + // Convert the RPC block numbers into internal representations + begin := rpc.LatestBlockNumber.Int64() + if f.crit.FromBlock != nil { + begin = f.crit.FromBlock.Int64() + } + end := rpc.LatestBlockNumber.Int64() + if f.crit.ToBlock != nil { + end = f.crit.ToBlock.Int64() + } + // Construct the range filter + filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics) + } + // Run the filter and return all the logs + logs, err := filter.Logs(ctx) + if err != nil { + return nil, err + } + return returnLogs(logs), nil +} + +// GetFilterChanges returns the logs for the filter with the given id since +// last time it was called. This can be used for polling. +// +// For pending transaction and block filters the result is []common.Hash. +// (pending)Log filters return []Log. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges +func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { + api.filtersMu.Lock() + defer api.filtersMu.Unlock() + + if f, found := api.filters[id]; found { + if !f.deadline.Stop() { + // timer expired but filter is not yet removed in timeout loop + // receive timer value and reset timer + <-f.deadline.C + } + f.deadline.Reset(deadline) + + switch f.typ { + case PendingTransactionsSubscription, BlocksSubscription: + hashes := f.hashes + f.hashes = nil + return returnHashes(hashes), nil + case LogsSubscription: + logs := f.logs + f.logs = nil + return returnLogs(logs), nil + } + } + + return []interface{}{}, fmt.Errorf("filter not found") +} + +// returnHashes is a helper that will return an empty hash array case the given hash array is nil, +// otherwise the given hashes array is returned. +func returnHashes(hashes []common.Hash) []common.Hash { + if hashes == nil { + return []common.Hash{} + } + return hashes +} + +// returnLogs is a helper that will return an empty log array in case the given logs array is nil, +// otherwise the given logs array is returned. +func returnLogs(logs []*types.Log) []*types.Log { + if logs == nil { + return []*types.Log{} + } + return logs +} + +// UnmarshalJSON sets *args fields with given data. +func (args *FilterCriteria) UnmarshalJSON(data []byte) error { + type input struct { + BlockHash *common.Hash `json:"blockHash"` + FromBlock *rpc.BlockNumber `json:"fromBlock"` + ToBlock *rpc.BlockNumber `json:"toBlock"` + Addresses interface{} `json:"address"` + Topics []interface{} `json:"topics"` + } + + var raw input + if err := json.Unmarshal(data, &raw); err != nil { + return err + } + + if raw.BlockHash != nil { + if raw.FromBlock != nil || raw.ToBlock != nil { + // BlockHash is mutually exclusive with FromBlock/ToBlock criteria + return fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock, choose one or the other") + } + args.BlockHash = raw.BlockHash + } else { + if raw.FromBlock != nil { + args.FromBlock = big.NewInt(raw.FromBlock.Int64()) + } + + if raw.ToBlock != nil { + args.ToBlock = big.NewInt(raw.ToBlock.Int64()) + } + } + + args.Addresses = []common.Address{} + + if raw.Addresses != nil { + // raw.Address can contain a single address or an array of addresses + switch rawAddr := raw.Addresses.(type) { + case []interface{}: + for i, addr := range rawAddr { + if strAddr, ok := addr.(string); ok { + addr, err := decodeAddress(strAddr) + if err != nil { + return fmt.Errorf("invalid address at index %d: %v", i, err) + } + args.Addresses = append(args.Addresses, addr) + } else { + return fmt.Errorf("non-string address at index %d", i) + } + } + case string: + addr, err := decodeAddress(rawAddr) + if err != nil { + return fmt.Errorf("invalid address: %v", err) + } + args.Addresses = []common.Address{addr} + default: + return errors.New("invalid addresses in query") + } + } + + // topics is an array consisting of strings and/or arrays of strings. + // JSON null values are converted to common.Hash{} and ignored by the filter manager. + if len(raw.Topics) > 0 { + args.Topics = make([][]common.Hash, len(raw.Topics)) + for i, t := range raw.Topics { + switch topic := t.(type) { + case nil: + // ignore topic when matching logs + + case string: + // match specific topic + top, err := decodeTopic(topic) + if err != nil { + return err + } + args.Topics[i] = []common.Hash{top} + + case []interface{}: + // or case e.g. [null, "topic0", "topic1"] + for _, rawTopic := range topic { + if rawTopic == nil { + // null component, match all + args.Topics[i] = nil + break + } + if topic, ok := rawTopic.(string); ok { + parsed, err := decodeTopic(topic) + if err != nil { + return err + } + args.Topics[i] = append(args.Topics[i], parsed) + } else { + return fmt.Errorf("invalid topic(s)") + } + } + default: + return fmt.Errorf("invalid topic(s)") + } + } + } + + return nil +} + +func decodeAddress(s string) (common.Address, error) { + b, err := hexutil.Decode(s) + if err == nil && len(b) != common.AddressLength { + err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for address", len(b), common.AddressLength) + } + return common.BytesToAddress(b), err +} + +func decodeTopic(s string) (common.Hash, error) { + b, err := hexutil.Decode(s) + if err == nil && len(b) != common.HashLength { + err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for topic", len(b), common.HashLength) + } + return common.BytesToHash(b), err +} -- cgit v1.2.3