From d235e2c6a5788ec4a6cff15a16f56b38a3876a0d Mon Sep 17 00:00:00 2001 From: Determinant Date: Sun, 28 Jun 2020 14:47:41 -0400 Subject: ... --- rpc/client.go | 625 ++++++++++++++++++++++++++++++++++++++++++++ rpc/client_example_test.go | 88 +++++++ rpc/client_test.go | 569 ++++++++++++++++++++++++++++++++++++++++ rpc/constants_unix_nocgo.go | 25 ++ rpc/doc.go | 118 +++++++++ rpc/errors.go | 65 +++++ rpc/gzip.go | 66 +++++ rpc/handler.go | 397 ++++++++++++++++++++++++++++ rpc/http.go | 359 +++++++++++++++++++++++++ rpc/http_test.go | 54 ++++ rpc/ipc_js.go | 37 +++ rpc/json.go | 335 ++++++++++++++++++++++++ rpc/server.go | 147 +++++++++++ rpc/server_test.go | 152 +++++++++++ rpc/service.go | 285 ++++++++++++++++++++ rpc/subscription.go | 327 +++++++++++++++++++++++ rpc/subscription_test.go | 206 +++++++++++++++ rpc/testservice_test.go | 180 +++++++++++++ rpc/types.go | 34 +++ rpc/types_test.go | 66 +++++ rpc/websocket.go | 175 +++++++++++++ rpc/websocket_test.go | 259 ++++++++++++++++++ 22 files changed, 4569 insertions(+) create mode 100644 rpc/client.go create mode 100644 rpc/client_example_test.go create mode 100644 rpc/client_test.go create mode 100644 rpc/constants_unix_nocgo.go create mode 100644 rpc/doc.go create mode 100644 rpc/errors.go create mode 100644 rpc/gzip.go create mode 100644 rpc/handler.go create mode 100644 rpc/http.go create mode 100644 rpc/http_test.go create mode 100644 rpc/ipc_js.go create mode 100644 rpc/json.go create mode 100644 rpc/server.go create mode 100644 rpc/server_test.go create mode 100644 rpc/service.go create mode 100644 rpc/subscription.go create mode 100644 rpc/subscription_test.go create mode 100644 rpc/testservice_test.go create mode 100644 rpc/types_test.go create mode 100644 rpc/websocket.go create mode 100644 rpc/websocket_test.go (limited to 'rpc') diff --git a/rpc/client.go b/rpc/client.go new file mode 100644 index 0000000..1c7058b --- /dev/null +++ b/rpc/client.go @@ -0,0 +1,625 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "net/url" + "reflect" + "strconv" + "sync/atomic" + "time" + + "github.com/ava-labs/go-ethereum/log" +) + +var ( + ErrClientQuit = errors.New("client is closed") + ErrNoResult = errors.New("no result in JSON-RPC response") + ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow") + errClientReconnected = errors.New("client reconnected") + errDead = errors.New("connection lost") +) + +const ( + // Timeouts + defaultDialTimeout = 10 * time.Second // used if context has no deadline + subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls +) + +const ( + // Subscriptions are removed when the subscriber cannot keep up. + // + // This can be worked around by supplying a channel with sufficiently sized buffer, + // but this can be inconvenient and hard to explain in the docs. Another issue with + // buffered channels is that the buffer is static even though it might not be needed + // most of the time. + // + // The approach taken here is to maintain a per-subscription linked list buffer + // shrinks on demand. If the buffer reaches the size below, the subscription is + // dropped. + maxClientSubscriptionBuffer = 20000 +) + +// BatchElem is an element in a batch request. +type BatchElem struct { + Method string + Args []interface{} + // The result is unmarshaled into this field. Result must be set to a + // non-nil pointer value of the desired type, otherwise the response will be + // discarded. + Result interface{} + // Error is set if the server returns an error for this request, or if + // unmarshaling into Result fails. It is not set for I/O errors. + Error error +} + +// Client represents a connection to an RPC server. +type Client struct { + idgen func() ID // for subscriptions + isHTTP bool + services *serviceRegistry + + idCounter uint32 + + // This function, if non-nil, is called when the connection is lost. + reconnectFunc reconnectFunc + + // writeConn is used for writing to the connection on the caller's goroutine. It should + // only be accessed outside of dispatch, with the write lock held. The write lock is + // taken by sending on requestOp and released by sending on sendDone. + writeConn jsonWriter + + // for dispatch + close chan struct{} + closing chan struct{} // closed when client is quitting + didClose chan struct{} // closed when client quits + reconnected chan ServerCodec // where write/reconnect sends the new connection + readOp chan readOp // read messages + readErr chan error // errors from read + reqInit chan *requestOp // register response IDs, takes write lock + reqSent chan error // signals write completion, releases write lock + reqTimeout chan *requestOp // removes response IDs when call timeout expires +} + +type reconnectFunc func(ctx context.Context) (ServerCodec, error) + +type clientContextKey struct{} + +type clientConn struct { + codec ServerCodec + handler *handler +} + +func (c *Client) newClientConn(conn ServerCodec) *clientConn { + ctx := context.WithValue(context.Background(), clientContextKey{}, c) + handler := newHandler(ctx, conn, c.idgen, c.services) + return &clientConn{conn, handler} +} + +func (cc *clientConn) close(err error, inflightReq *requestOp) { + cc.handler.close(err, inflightReq) + cc.codec.Close() +} + +type readOp struct { + msgs []*jsonrpcMessage + batch bool +} + +type requestOp struct { + ids []json.RawMessage + err error + resp chan *jsonrpcMessage // receives up to len(ids) responses + sub *ClientSubscription // only set for EthSubscribe requests +} + +func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) { + select { + case <-ctx.Done(): + // Send the timeout to dispatch so it can remove the request IDs. + if !c.isHTTP { + select { + case c.reqTimeout <- op: + case <-c.closing: + } + } + return nil, ctx.Err() + case resp := <-op.resp: + return resp, op.err + } +} + +// Dial creates a new client for the given URL. +// +// The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a +// file name with no URL scheme, a local socket connection is established using UNIX +// domain sockets on supported platforms and named pipes on Windows. If you want to +// configure transport options, use DialHTTP, DialWebsocket or DialIPC instead. +// +// For websocket connections, the origin is set to the local host name. +// +// The client reconnects automatically if the connection is lost. +func Dial(rawurl string) (*Client, error) { + return DialContext(context.Background(), rawurl) +} + +// DialContext creates a new RPC client, just like Dial. +// +// The context is used to cancel or time out the initial connection establishment. It does +// not affect subsequent interactions with the client. +func DialContext(ctx context.Context, rawurl string) (*Client, error) { + u, err := url.Parse(rawurl) + if err != nil { + return nil, err + } + switch u.Scheme { + case "http", "https": + return DialHTTP(rawurl) + case "ws", "wss": + return DialWebsocket(ctx, rawurl, "") + //case "stdio": + // return DialStdIO(ctx) + //case "": + // return DialIPC(ctx, rawurl) + default: + return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme) + } +} + +// Client retrieves the client from the context, if any. This can be used to perform +// 'reverse calls' in a handler method. +func ClientFromContext(ctx context.Context) (*Client, bool) { + client, ok := ctx.Value(clientContextKey{}).(*Client) + return client, ok +} + +func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) { + conn, err := connect(initctx) + if err != nil { + return nil, err + } + c := initClient(conn, randomIDGenerator(), new(serviceRegistry)) + c.reconnectFunc = connect + return c, nil +} + +func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client { + _, isHTTP := conn.(*httpConn) + c := &Client{ + idgen: idgen, + isHTTP: isHTTP, + services: services, + writeConn: conn, + close: make(chan struct{}), + closing: make(chan struct{}), + didClose: make(chan struct{}), + reconnected: make(chan ServerCodec), + readOp: make(chan readOp), + readErr: make(chan error), + reqInit: make(chan *requestOp), + reqSent: make(chan error, 1), + reqTimeout: make(chan *requestOp), + } + if !isHTTP { + go c.dispatch(conn) + } + return c +} + +// RegisterName creates a service for the given receiver type under the given name. When no +// methods on the given receiver match the criteria to be either a RPC method or a +// subscription an error is returned. Otherwise a new service is created and added to the +// service collection this client provides to the server. +func (c *Client) RegisterName(name string, receiver interface{}) error { + return c.services.registerName(name, receiver) +} + +func (c *Client) nextID() json.RawMessage { + id := atomic.AddUint32(&c.idCounter, 1) + return strconv.AppendUint(nil, uint64(id), 10) +} + +// SupportedModules calls the rpc_modules method, retrieving the list of +// APIs that are available on the server. +func (c *Client) SupportedModules() (map[string]string, error) { + var result map[string]string + ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) + defer cancel() + err := c.CallContext(ctx, &result, "rpc_modules") + return result, err +} + +// Close closes the client, aborting any in-flight requests. +func (c *Client) Close() { + if c.isHTTP { + return + } + select { + case c.close <- struct{}{}: + <-c.didClose + case <-c.didClose: + } +} + +// Call performs a JSON-RPC call with the given arguments and unmarshals into +// result if no error occurred. +// +// The result must be a pointer so that package json can unmarshal into it. You +// can also pass nil, in which case the result is ignored. +func (c *Client) Call(result interface{}, method string, args ...interface{}) error { + ctx := context.Background() + return c.CallContext(ctx, result, method, args...) +} + +// CallContext performs a JSON-RPC call with the given arguments. If the context is +// canceled before the call has successfully returned, CallContext returns immediately. +// +// The result must be a pointer so that package json can unmarshal into it. You +// can also pass nil, in which case the result is ignored. +func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { + msg, err := c.newMessage(method, args...) + if err != nil { + return err + } + op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)} + + if c.isHTTP { + err = c.sendHTTP(ctx, op, msg) + } else { + err = c.send(ctx, op, msg) + } + if err != nil { + return err + } + + // dispatch has accepted the request and will close the channel when it quits. + switch resp, err := op.wait(ctx, c); { + case err != nil: + return err + case resp.Error != nil: + return resp.Error + case len(resp.Result) == 0: + return ErrNoResult + default: + return json.Unmarshal(resp.Result, &result) + } +} + +// BatchCall sends all given requests as a single batch and waits for the server +// to return a response for all of them. +// +// In contrast to Call, BatchCall only returns I/O errors. Any error specific to +// a request is reported through the Error field of the corresponding BatchElem. +// +// Note that batch calls may not be executed atomically on the server side. +func (c *Client) BatchCall(b []BatchElem) error { + ctx := context.Background() + return c.BatchCallContext(ctx, b) +} + +// BatchCall sends all given requests as a single batch and waits for the server +// to return a response for all of them. The wait duration is bounded by the +// context's deadline. +// +// In contrast to CallContext, BatchCallContext only returns errors that have occurred +// while sending the request. Any error specific to a request is reported through the +// Error field of the corresponding BatchElem. +// +// Note that batch calls may not be executed atomically on the server side. +func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { + msgs := make([]*jsonrpcMessage, len(b)) + op := &requestOp{ + ids: make([]json.RawMessage, len(b)), + resp: make(chan *jsonrpcMessage, len(b)), + } + for i, elem := range b { + msg, err := c.newMessage(elem.Method, elem.Args...) + if err != nil { + return err + } + msgs[i] = msg + op.ids[i] = msg.ID + } + + var err error + if c.isHTTP { + err = c.sendBatchHTTP(ctx, op, msgs) + } else { + err = c.send(ctx, op, msgs) + } + + // Wait for all responses to come back. + for n := 0; n < len(b) && err == nil; n++ { + var resp *jsonrpcMessage + resp, err = op.wait(ctx, c) + if err != nil { + break + } + // Find the element corresponding to this response. + // The element is guaranteed to be present because dispatch + // only sends valid IDs to our channel. + var elem *BatchElem + for i := range msgs { + if bytes.Equal(msgs[i].ID, resp.ID) { + elem = &b[i] + break + } + } + if resp.Error != nil { + elem.Error = resp.Error + continue + } + if len(resp.Result) == 0 { + elem.Error = ErrNoResult + continue + } + elem.Error = json.Unmarshal(resp.Result, elem.Result) + } + return err +} + +// Notify sends a notification, i.e. a method call that doesn't expect a response. +func (c *Client) Notify(ctx context.Context, method string, args ...interface{}) error { + op := new(requestOp) + msg, err := c.newMessage(method, args...) + if err != nil { + return err + } + msg.ID = nil + + if c.isHTTP { + return c.sendHTTP(ctx, op, msg) + } else { + return c.send(ctx, op, msg) + } +} + +// EthSubscribe registers a subscripion under the "eth" namespace. +func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { + return c.Subscribe(ctx, "eth", channel, args...) +} + +// ShhSubscribe registers a subscripion under the "shh" namespace. +func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { + return c.Subscribe(ctx, "shh", channel, args...) +} + +// Subscribe calls the "_subscribe" method with the given arguments, +// registering a subscription. Server notifications for the subscription are +// sent to the given channel. The element type of the channel must match the +// expected type of content returned by the subscription. +// +// The context argument cancels the RPC request that sets up the subscription but has no +// effect on the subscription after Subscribe has returned. +// +// Slow subscribers will be dropped eventually. Client buffers up to 20000 notifications +// before considering the subscriber dead. The subscription Err channel will receive +// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure +// that the channel usually has at least one reader to prevent this issue. +func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) { + // Check type of channel first. + chanVal := reflect.ValueOf(channel) + if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { + panic("first argument to Subscribe must be a writable channel") + } + if chanVal.IsNil() { + panic("channel given to Subscribe must not be nil") + } + if c.isHTTP { + return nil, ErrNotificationsUnsupported + } + + msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...) + if err != nil { + return nil, err + } + op := &requestOp{ + ids: []json.RawMessage{msg.ID}, + resp: make(chan *jsonrpcMessage), + sub: newClientSubscription(c, namespace, chanVal), + } + + // Send the subscription request. + // The arrival and validity of the response is signaled on sub.quit. + if err := c.send(ctx, op, msg); err != nil { + return nil, err + } + if _, err := op.wait(ctx, c); err != nil { + return nil, err + } + return op.sub, nil +} + +func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) { + msg := &jsonrpcMessage{Version: vsn, ID: c.nextID(), Method: method} + if paramsIn != nil { // prevent sending "params":null + var err error + if msg.Params, err = json.Marshal(paramsIn); err != nil { + return nil, err + } + } + return msg, nil +} + +// send registers op with the dispatch loop, then sends msg on the connection. +// if sending fails, op is deregistered. +func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error { + select { + case c.reqInit <- op: + err := c.write(ctx, msg) + c.reqSent <- err + return err + case <-ctx.Done(): + // This can happen if the client is overloaded or unable to keep up with + // subscription notifications. + return ctx.Err() + case <-c.closing: + return ErrClientQuit + } +} + +func (c *Client) write(ctx context.Context, msg interface{}) error { + // The previous write failed. Try to establish a new connection. + if c.writeConn == nil { + if err := c.reconnect(ctx); err != nil { + return err + } + } + err := c.writeConn.Write(ctx, msg) + if err != nil { + c.writeConn = nil + } + return err +} + +func (c *Client) reconnect(ctx context.Context) error { + if c.reconnectFunc == nil { + return errDead + } + + if _, ok := ctx.Deadline(); !ok { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, defaultDialTimeout) + defer cancel() + } + newconn, err := c.reconnectFunc(ctx) + if err != nil { + log.Trace("RPC client reconnect failed", "err", err) + return err + } + select { + case c.reconnected <- newconn: + c.writeConn = newconn + return nil + case <-c.didClose: + newconn.Close() + return ErrClientQuit + } +} + +// dispatch is the main loop of the client. +// It sends read messages to waiting calls to Call and BatchCall +// and subscription notifications to registered subscriptions. +func (c *Client) dispatch(codec ServerCodec) { + var ( + lastOp *requestOp // tracks last send operation + reqInitLock = c.reqInit // nil while the send lock is held + conn = c.newClientConn(codec) + reading = true + ) + defer func() { + close(c.closing) + if reading { + conn.close(ErrClientQuit, nil) + c.drainRead() + } + close(c.didClose) + }() + + // Spawn the initial read loop. + go c.read(codec) + + for { + select { + case <-c.close: + return + + // Read path: + case op := <-c.readOp: + if op.batch { + conn.handler.handleBatch(op.msgs) + } else { + conn.handler.handleMsg(op.msgs[0]) + } + + case err := <-c.readErr: + conn.handler.log.Debug("RPC connection read error", "err", err) + conn.close(err, lastOp) + reading = false + + // Reconnect: + case newcodec := <-c.reconnected: + log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.RemoteAddr()) + if reading { + // Wait for the previous read loop to exit. This is a rare case which + // happens if this loop isn't notified in time after the connection breaks. + // In those cases the caller will notice first and reconnect. Closing the + // handler terminates all waiting requests (closing op.resp) except for + // lastOp, which will be transferred to the new handler. + conn.close(errClientReconnected, lastOp) + c.drainRead() + } + go c.read(newcodec) + reading = true + conn = c.newClientConn(newcodec) + // Re-register the in-flight request on the new handler + // because that's where it will be sent. + conn.handler.addRequestOp(lastOp) + + // Send path: + case op := <-reqInitLock: + // Stop listening for further requests until the current one has been sent. + reqInitLock = nil + lastOp = op + conn.handler.addRequestOp(op) + + case err := <-c.reqSent: + if err != nil { + // Remove response handlers for the last send. When the read loop + // goes down, it will signal all other current operations. + conn.handler.removeRequestOp(lastOp) + } + // Let the next request in. + reqInitLock = c.reqInit + lastOp = nil + + case op := <-c.reqTimeout: + conn.handler.removeRequestOp(op) + } + } +} + +// drainRead drops read messages until an error occurs. +func (c *Client) drainRead() { + for { + select { + case <-c.readOp: + case <-c.readErr: + return + } + } +} + +// read decodes RPC messages from a codec, feeding them into dispatch. +func (c *Client) read(codec ServerCodec) { + for { + msgs, batch, err := codec.Read() + if _, ok := err.(*json.SyntaxError); ok { + codec.Write(context.Background(), errorMessage(&parseError{err.Error()})) + } + if err != nil { + c.readErr <- err + return + } + c.readOp <- readOp{msgs, batch} + } +} diff --git a/rpc/client_example_test.go b/rpc/client_example_test.go new file mode 100644 index 0000000..149de2c --- /dev/null +++ b/rpc/client_example_test.go @@ -0,0 +1,88 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc_test + +import ( + "context" + "fmt" + "math/big" + "time" + + "github.com/ava-labs/go-ethereum/rpc" +) + +// In this example, our client wishes to track the latest 'block number' +// known to the server. The server supports two methods: +// +// eth_getBlockByNumber("latest", {}) +// returns the latest block object. +// +// eth_subscribe("newBlocks") +// creates a subscription which fires block objects when new blocks arrive. + +type Block struct { + Number *big.Int +} + +func ExampleClientSubscription() { + // Connect the client. + client, _ := rpc.Dial("ws://127.0.0.1:8485") + subch := make(chan Block) + + // Ensure that subch receives the latest block. + go func() { + for i := 0; ; i++ { + if i > 0 { + time.Sleep(2 * time.Second) + } + subscribeBlocks(client, subch) + } + }() + + // Print events from the subscription as they arrive. + for block := range subch { + fmt.Println("latest block:", block.Number) + } +} + +// subscribeBlocks runs in its own goroutine and maintains +// a subscription for new blocks. +func subscribeBlocks(client *rpc.Client, subch chan Block) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Subscribe to new blocks. + sub, err := client.EthSubscribe(ctx, subch, "newHeads") + if err != nil { + fmt.Println("subscribe error:", err) + return + } + + // The connection is established now. + // Update the channel with the current block. + var lastBlock Block + if err := client.CallContext(ctx, &lastBlock, "eth_getBlockByNumber", "latest"); err != nil { + fmt.Println("can't get latest block:", err) + return + } + subch <- lastBlock + + // The subscription will deliver events to the channel. Wait for the + // subscription to end for any reason, then loop around to re-establish + // the connection. + fmt.Println("connection lost: ", <-sub.Err()) +} diff --git a/rpc/client_test.go b/rpc/client_test.go new file mode 100644 index 0000000..79ea32e --- /dev/null +++ b/rpc/client_test.go @@ -0,0 +1,569 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "context" + "fmt" + "math/rand" + "net" + "net/http" + "net/http/httptest" + "os" + "reflect" + "runtime" + "sync" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/ava-labs/go-ethereum/log" +) + +func TestClientRequest(t *testing.T) { + server := newTestServer() + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + var resp Result + if err := client.Call(&resp, "test_echo", "hello", 10, &Args{"world"}); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(resp, Result{"hello", 10, &Args{"world"}}) { + t.Errorf("incorrect result %#v", resp) + } +} + +func TestClientBatchRequest(t *testing.T) { + server := newTestServer() + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + batch := []BatchElem{ + { + Method: "test_echo", + Args: []interface{}{"hello", 10, &Args{"world"}}, + Result: new(Result), + }, + { + Method: "test_echo", + Args: []interface{}{"hello2", 11, &Args{"world"}}, + Result: new(Result), + }, + { + Method: "no_such_method", + Args: []interface{}{1, 2, 3}, + Result: new(int), + }, + } + if err := client.BatchCall(batch); err != nil { + t.Fatal(err) + } + wantResult := []BatchElem{ + { + Method: "test_echo", + Args: []interface{}{"hello", 10, &Args{"world"}}, + Result: &Result{"hello", 10, &Args{"world"}}, + }, + { + Method: "test_echo", + Args: []interface{}{"hello2", 11, &Args{"world"}}, + Result: &Result{"hello2", 11, &Args{"world"}}, + }, + { + Method: "no_such_method", + Args: []interface{}{1, 2, 3}, + Result: new(int), + Error: &jsonError{Code: -32601, Message: "the method no_such_method does not exist/is not available"}, + }, + } + if !reflect.DeepEqual(batch, wantResult) { + t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult)) + } +} + +func TestClientNotify(t *testing.T) { + server := newTestServer() + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + if err := client.Notify(context.Background(), "test_echo", "hello", 10, &Args{"world"}); err != nil { + t.Fatal(err) + } +} + +// func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) } +func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) } +func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) } +func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) } + +// This test checks that requests made through CallContext can be canceled by canceling +// the context. +func testClientCancel(transport string, t *testing.T) { + // These tests take a lot of time, run them all at once. + // You probably want to run with -parallel 1 or comment out + // the call to t.Parallel if you enable the logging. + t.Parallel() + + server := newTestServer() + defer server.Stop() + + // What we want to achieve is that the context gets canceled + // at various stages of request processing. The interesting cases + // are: + // - cancel during dial + // - cancel while performing a HTTP request + // - cancel while waiting for a response + // + // To trigger those, the times are chosen such that connections + // are killed within the deadline for every other call (maxKillTimeout + // is 2x maxCancelTimeout). + // + // Once a connection is dead, there is a fair chance it won't connect + // successfully because the accept is delayed by 1s. + maxContextCancelTimeout := 300 * time.Millisecond + fl := &flakeyListener{ + maxAcceptDelay: 1 * time.Second, + maxKillTimeout: 600 * time.Millisecond, + } + + var client *Client + switch transport { + case "ws", "http": + c, hs := httpTestClient(server, transport, fl) + defer hs.Close() + client = c + case "ipc": + c, l := ipcTestClient(server, fl) + defer l.Close() + client = c + default: + panic("unknown transport: " + transport) + } + + // The actual test starts here. + var ( + wg sync.WaitGroup + nreqs = 10 + ncallers = 6 + ) + caller := func(index int) { + defer wg.Done() + for i := 0; i < nreqs; i++ { + var ( + ctx context.Context + cancel func() + timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout))) + ) + if index < ncallers/2 { + // For half of the callers, create a context without deadline + // and cancel it later. + ctx, cancel = context.WithCancel(context.Background()) + time.AfterFunc(timeout, cancel) + } else { + // For the other half, create a context with a deadline instead. This is + // different because the context deadline is used to set the socket write + // deadline. + ctx, cancel = context.WithTimeout(context.Background(), timeout) + } + // Now perform a call with the context. + // The key thing here is that no call will ever complete successfully. + sleepTime := maxContextCancelTimeout + 20*time.Millisecond + err := client.CallContext(ctx, nil, "test_sleep", sleepTime) + if err != nil { + log.Debug(fmt.Sprint("got expected error:", err)) + } else { + t.Errorf("no error for call with %v wait time", timeout) + } + cancel() + } + } + wg.Add(ncallers) + for i := 0; i < ncallers; i++ { + go caller(i) + } + wg.Wait() +} + +func TestClientSubscribeInvalidArg(t *testing.T) { + server := newTestServer() + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + check := func(shouldPanic bool, arg interface{}) { + defer func() { + err := recover() + if shouldPanic && err == nil { + t.Errorf("EthSubscribe should've panicked for %#v", arg) + } + if !shouldPanic && err != nil { + t.Errorf("EthSubscribe shouldn't have panicked for %#v", arg) + buf := make([]byte, 1024*1024) + buf = buf[:runtime.Stack(buf, false)] + t.Error(err) + t.Error(string(buf)) + } + }() + client.EthSubscribe(context.Background(), arg, "foo_bar") + } + check(true, nil) + check(true, 1) + check(true, (chan int)(nil)) + check(true, make(<-chan int)) + check(false, make(chan int)) + check(false, make(chan<- int)) +} + +func TestClientSubscribe(t *testing.T) { + server := newTestServer() + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + nc := make(chan int) + count := 10 + sub, err := client.Subscribe(context.Background(), "nftest", nc, "someSubscription", count, 0) + if err != nil { + t.Fatal("can't subscribe:", err) + } + for i := 0; i < count; i++ { + if val := <-nc; val != i { + t.Fatalf("value mismatch: got %d, want %d", val, i) + } + } + + sub.Unsubscribe() + select { + case v := <-nc: + t.Fatal("received value after unsubscribe:", v) + case err := <-sub.Err(): + if err != nil { + t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err) + } + case <-time.After(1 * time.Second): + t.Fatalf("subscription not closed within 1s after unsubscribe") + } +} + +// In this test, the connection drops while Subscribe is waiting for a response. +func TestClientSubscribeClose(t *testing.T) { + server := newTestServer() + service := ¬ificationTestService{ + gotHangSubscriptionReq: make(chan struct{}), + unblockHangSubscription: make(chan struct{}), + } + if err := server.RegisterName("nftest2", service); err != nil { + t.Fatal(err) + } + + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + var ( + nc = make(chan int) + errc = make(chan error) + sub *ClientSubscription + err error + ) + go func() { + sub, err = client.Subscribe(context.Background(), "nftest2", nc, "hangSubscription", 999) + errc <- err + }() + + <-service.gotHangSubscriptionReq + client.Close() + service.unblockHangSubscription <- struct{}{} + + select { + case err := <-errc: + if err == nil { + t.Errorf("Subscribe returned nil error after Close") + } + if sub != nil { + t.Error("Subscribe returned non-nil subscription after Close") + } + case <-time.After(1 * time.Second): + t.Fatalf("Subscribe did not return within 1s after Close") + } +} + +// This test reproduces https://github.com/ethereum/go-ethereum/issues/17837 where the +// client hangs during shutdown when Unsubscribe races with Client.Close. +func TestClientCloseUnsubscribeRace(t *testing.T) { + server := newTestServer() + defer server.Stop() + + for i := 0; i < 20; i++ { + client := DialInProc(server) + nc := make(chan int) + sub, err := client.Subscribe(context.Background(), "nftest", nc, "someSubscription", 3, 1) + if err != nil { + t.Fatal(err) + } + go client.Close() + go sub.Unsubscribe() + select { + case <-sub.Err(): + case <-time.After(5 * time.Second): + t.Fatal("subscription not closed within timeout") + } + } +} + +// This test checks that Client doesn't lock up when a single subscriber +// doesn't read subscription events. +func TestClientNotificationStorm(t *testing.T) { + server := newTestServer() + defer server.Stop() + + doTest := func(count int, wantError bool) { + client := DialInProc(server) + defer client.Close() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Subscribe on the server. It will start sending many notifications + // very quickly. + nc := make(chan int) + sub, err := client.Subscribe(ctx, "nftest", nc, "someSubscription", count, 0) + if err != nil { + t.Fatal("can't subscribe:", err) + } + defer sub.Unsubscribe() + + // Process each notification, try to run a call in between each of them. + for i := 0; i < count; i++ { + select { + case val := <-nc: + if val != i { + t.Fatalf("(%d/%d) unexpected value %d", i, count, val) + } + case err := <-sub.Err(): + if wantError && err != ErrSubscriptionQueueOverflow { + t.Fatalf("(%d/%d) got error %q, want %q", i, count, err, ErrSubscriptionQueueOverflow) + } else if !wantError { + t.Fatalf("(%d/%d) got unexpected error %q", i, count, err) + } + return + } + var r int + err := client.CallContext(ctx, &r, "nftest_echo", i) + if err != nil { + if !wantError { + t.Fatalf("(%d/%d) call error: %v", i, count, err) + } + return + } + } + if wantError { + t.Fatalf("didn't get expected error") + } + } + + doTest(8000, false) + doTest(21000, true) +} + +func TestClientHTTP(t *testing.T) { + server := newTestServer() + defer server.Stop() + + client, hs := httpTestClient(server, "http", nil) + defer hs.Close() + defer client.Close() + + // Launch concurrent requests. + var ( + results = make([]Result, 100) + errc = make(chan error) + wantResult = Result{"a", 1, new(Args)} + ) + defer client.Close() + for i := range results { + i := i + go func() { + errc <- client.Call(&results[i], "test_echo", + wantResult.String, wantResult.Int, wantResult.Args) + }() + } + + // Wait for all of them to complete. + timeout := time.NewTimer(5 * time.Second) + defer timeout.Stop() + for i := range results { + select { + case err := <-errc: + if err != nil { + t.Fatal(err) + } + case <-timeout.C: + t.Fatalf("timeout (got %d/%d) results)", i+1, len(results)) + } + } + + // Check results. + for i := range results { + if !reflect.DeepEqual(results[i], wantResult) { + t.Errorf("result %d mismatch: got %#v, want %#v", i, results[i], wantResult) + } + } +} + +func TestClientReconnect(t *testing.T) { + startServer := func(addr string) (*Server, net.Listener) { + srv := newTestServer() + l, err := net.Listen("tcp", addr) + if err != nil { + t.Fatal("can't listen:", err) + } + go http.Serve(l, srv.WebsocketHandler([]string{"*"})) + return srv, l + } + + ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second) + defer cancel() + + // Start a server and corresponding client. + s1, l1 := startServer("127.0.0.1:0") + client, err := DialContext(ctx, "ws://"+l1.Addr().String()) + if err != nil { + t.Fatal("can't dial", err) + } + + // Perform a call. This should work because the server is up. + var resp Result + if err := client.CallContext(ctx, &resp, "test_echo", "", 1, nil); err != nil { + t.Fatal(err) + } + + // Shut down the server and allow for some cool down time so we can listen on the same + // address again. + l1.Close() + s1.Stop() + time.Sleep(2 * time.Second) + + // Try calling again. It shouldn't work. + if err := client.CallContext(ctx, &resp, "test_echo", "", 2, nil); err == nil { + t.Error("successful call while the server is down") + t.Logf("resp: %#v", resp) + } + + // Start it up again and call again. The connection should be reestablished. + // We spawn multiple calls here to check whether this hangs somehow. + s2, l2 := startServer(l1.Addr().String()) + defer l2.Close() + defer s2.Stop() + + start := make(chan struct{}) + errors := make(chan error, 20) + for i := 0; i < cap(errors); i++ { + go func() { + <-start + var resp Result + errors <- client.CallContext(ctx, &resp, "test_echo", "", 3, nil) + }() + } + close(start) + errcount := 0 + for i := 0; i < cap(errors); i++ { + if err = <-errors; err != nil { + errcount++ + } + } + t.Logf("%d errors, last error: %v", errcount, err) + if errcount > 1 { + t.Errorf("expected one error after disconnect, got %d", errcount) + } +} + +func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) { + // Create the HTTP server. + var hs *httptest.Server + switch transport { + case "ws": + hs = httptest.NewUnstartedServer(srv.WebsocketHandler([]string{"*"})) + case "http": + hs = httptest.NewUnstartedServer(srv) + default: + panic("unknown HTTP transport: " + transport) + } + // Wrap the listener if required. + if fl != nil { + fl.Listener = hs.Listener + hs.Listener = fl + } + // Connect the client. + hs.Start() + client, err := Dial(transport + "://" + hs.Listener.Addr().String()) + if err != nil { + panic(err) + } + return client, hs +} + +func ipcTestClient(srv *Server, fl *flakeyListener) (*Client, net.Listener) { + // Listen on a random endpoint. + endpoint := fmt.Sprintf("go-ethereum-test-ipc-%d-%d", os.Getpid(), rand.Int63()) + if runtime.GOOS == "windows" { + endpoint = `\\.\pipe\` + endpoint + } else { + endpoint = os.TempDir() + "/" + endpoint + } + l, err := ipcListen(endpoint) + if err != nil { + panic(err) + } + // Connect the listener to the server. + if fl != nil { + fl.Listener = l + l = fl + } + go srv.ServeListener(l) + // Connect the client. + client, err := Dial(endpoint) + if err != nil { + panic(err) + } + return client, l +} + +// flakeyListener kills accepted connections after a random timeout. +type flakeyListener struct { + net.Listener + maxKillTimeout time.Duration + maxAcceptDelay time.Duration +} + +func (l *flakeyListener) Accept() (net.Conn, error) { + delay := time.Duration(rand.Int63n(int64(l.maxAcceptDelay))) + time.Sleep(delay) + + c, err := l.Listener.Accept() + if err == nil { + timeout := time.Duration(rand.Int63n(int64(l.maxKillTimeout))) + time.AfterFunc(timeout, func() { + log.Debug(fmt.Sprintf("killing conn %v after %v", c.LocalAddr(), timeout)) + c.Close() + }) + } + return c, err +} diff --git a/rpc/constants_unix_nocgo.go b/rpc/constants_unix_nocgo.go new file mode 100644 index 0000000..ecb231f --- /dev/null +++ b/rpc/constants_unix_nocgo.go @@ -0,0 +1,25 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// +build !cgo,!windows + +package rpc + +var ( + // On Linux, sun_path is 108 bytes in size + // see http://man7.org/linux/man-pages/man7/unix.7.html + max_path_size = 108 +) diff --git a/rpc/doc.go b/rpc/doc.go new file mode 100644 index 0000000..e5840c3 --- /dev/null +++ b/rpc/doc.go @@ -0,0 +1,118 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +/* + +Package rpc implements bi-directional JSON-RPC 2.0 on multiple transports. + +It provides access to the exported methods of an object across a network or other I/O +connection. After creating a server or client instance, objects can be registered to make +them visible as 'services'. Exported methods that follow specific conventions can be +called remotely. It also has support for the publish/subscribe pattern. + +RPC Methods + +Methods that satisfy the following criteria are made available for remote access: + + - method must be exported + - method returns 0, 1 (response or error) or 2 (response and error) values + - method argument(s) must be exported or builtin types + - method returned value(s) must be exported or builtin types + +An example method: + + func (s *CalcService) Add(a, b int) (int, error) + +When the returned error isn't nil the returned integer is ignored and the error is sent +back to the client. Otherwise the returned integer is sent back to the client. + +Optional arguments are supported by accepting pointer values as arguments. E.g. if we want +to do the addition in an optional finite field we can accept a mod argument as pointer +value. + + func (s *CalcService) Add(a, b int, mod *int) (int, error) + +This RPC method can be called with 2 integers and a null value as third argument. In that +case the mod argument will be nil. Or it can be called with 3 integers, in that case mod +will be pointing to the given third argument. Since the optional argument is the last +argument the RPC package will also accept 2 integers as arguments. It will pass the mod +argument as nil to the RPC method. + +The server offers the ServeCodec method which accepts a ServerCodec instance. It will read +requests from the codec, process the request and sends the response back to the client +using the codec. The server can execute requests concurrently. Responses can be sent back +to the client out of order. + +An example server which uses the JSON codec: + + type CalculatorService struct {} + + func (s *CalculatorService) Add(a, b int) int { + return a + b + } + + func (s *CalculatorService) Div(a, b int) (int, error) { + if b == 0 { + return 0, errors.New("divide by zero") + } + return a/b, nil + } + + calculator := new(CalculatorService) + server := NewServer() + server.RegisterName("calculator", calculator") + + l, _ := net.ListenUnix("unix", &net.UnixAddr{Net: "unix", Name: "/tmp/calculator.sock"}) + for { + c, _ := l.AcceptUnix() + codec := v2.NewJSONCodec(c) + go server.ServeCodec(codec, 0) + } + +Subscriptions + +The package also supports the publish subscribe pattern through the use of subscriptions. +A method that is considered eligible for notifications must satisfy the following +criteria: + + - method must be exported + - first method argument type must be context.Context + - method argument(s) must be exported or builtin types + - method must have return types (rpc.Subscription, error) + +An example method: + + func (s *BlockChainService) NewBlocks(ctx context.Context) (rpc.Subscription, error) { + ... + } + +When the service containing the subscription method is registered to the server, for +example under the "blockchain" namespace, a subscription is created by calling the +"blockchain_subscribe" method. + +Subscriptions are deleted when the user sends an unsubscribe request or when the +connection which was used to create the subscription is closed. This can be initiated by +the client and server. The server will close the connection for any write error. + +For more information about subscriptions, see https://github.com/ethereum/go-ethereum/wiki/RPC-PUB-SUB. + +Reverse Calls + +In any method handler, an instance of rpc.Client can be accessed through the +ClientFromContext method. Using this client instance, server-to-client method calls can be +performed on the RPC connection. +*/ +package rpc diff --git a/rpc/errors.go b/rpc/errors.go new file mode 100644 index 0000000..c3aa826 --- /dev/null +++ b/rpc/errors.go @@ -0,0 +1,65 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import "fmt" + +const defaultErrorCode = -32000 + +type methodNotFoundError struct{ method string } + +func (e *methodNotFoundError) ErrorCode() int { return -32601 } + +func (e *methodNotFoundError) Error() string { + return fmt.Sprintf("the method %s does not exist/is not available", e.method) +} + +type subscriptionNotFoundError struct{ namespace, subscription string } + +func (e *subscriptionNotFoundError) ErrorCode() int { return -32601 } + +func (e *subscriptionNotFoundError) Error() string { + return fmt.Sprintf("no %q subscription in %s namespace", e.subscription, e.namespace) +} + +// Invalid JSON was received by the server. +type parseError struct{ message string } + +func (e *parseError) ErrorCode() int { return -32700 } + +func (e *parseError) Error() string { return e.message } + +// received message isn't a valid request +type invalidRequestError struct{ message string } + +func (e *invalidRequestError) ErrorCode() int { return -32600 } + +func (e *invalidRequestError) Error() string { return e.message } + +// received message is invalid +type invalidMessageError struct{ message string } + +func (e *invalidMessageError) ErrorCode() int { return -32700 } + +func (e *invalidMessageError) Error() string { return e.message } + +// unable to decode supplied params, or an invalid number of parameters +type invalidParamsError struct{ message string } + +func (e *invalidParamsError) ErrorCode() int { return -32602 } + +func (e *invalidParamsError) Error() string { return e.message } diff --git a/rpc/gzip.go b/rpc/gzip.go new file mode 100644 index 0000000..a14fd09 --- /dev/null +++ b/rpc/gzip.go @@ -0,0 +1,66 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "compress/gzip" + "io" + "io/ioutil" + "net/http" + "strings" + "sync" +) + +var gzPool = sync.Pool{ + New: func() interface{} { + w := gzip.NewWriter(ioutil.Discard) + return w + }, +} + +type gzipResponseWriter struct { + io.Writer + http.ResponseWriter +} + +func (w *gzipResponseWriter) WriteHeader(status int) { + w.Header().Del("Content-Length") + w.ResponseWriter.WriteHeader(status) +} + +func (w *gzipResponseWriter) Write(b []byte) (int, error) { + return w.Writer.Write(b) +} + +func newGzipHandler(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { + next.ServeHTTP(w, r) + return + } + + w.Header().Set("Content-Encoding", "gzip") + + gz := gzPool.Get().(*gzip.Writer) + defer gzPool.Put(gz) + + gz.Reset(w) + defer gz.Close() + + next.ServeHTTP(&gzipResponseWriter{ResponseWriter: w, Writer: gz}, r) + }) +} diff --git a/rpc/handler.go b/rpc/handler.go new file mode 100644 index 0000000..187d0f8 --- /dev/null +++ b/rpc/handler.go @@ -0,0 +1,397 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "context" + "encoding/json" + "reflect" + "strconv" + "strings" + "sync" + "time" + + "github.com/ava-labs/go-ethereum/log" +) + +// handler handles JSON-RPC messages. There is one handler per connection. Note that +// handler is not safe for concurrent use. Message handling never blocks indefinitely +// because RPCs are processed on background goroutines launched by handler. +// +// The entry points for incoming messages are: +// +// h.handleMsg(message) +// h.handleBatch(message) +// +// Outgoing calls use the requestOp struct. Register the request before sending it +// on the connection: +// +// op := &requestOp{ids: ...} +// h.addRequestOp(op) +// +// Now send the request, then wait for the reply to be delivered through handleMsg: +// +// if err := op.wait(...); err != nil { +// h.removeRequestOp(op) // timeout, etc. +// } +// +type handler struct { + reg *serviceRegistry + unsubscribeCb *callback + idgen func() ID // subscription ID generator + respWait map[string]*requestOp // active client requests + clientSubs map[string]*ClientSubscription // active client subscriptions + callWG sync.WaitGroup // pending call goroutines + rootCtx context.Context // canceled by close() + cancelRoot func() // cancel function for rootCtx + conn jsonWriter // where responses will be sent + log log.Logger + allowSubscribe bool + + subLock sync.Mutex + serverSubs map[ID]*Subscription +} + +type callProc struct { + ctx context.Context + notifiers []*Notifier +} + +func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler { + rootCtx, cancelRoot := context.WithCancel(connCtx) + h := &handler{ + reg: reg, + idgen: idgen, + conn: conn, + respWait: make(map[string]*requestOp), + clientSubs: make(map[string]*ClientSubscription), + rootCtx: rootCtx, + cancelRoot: cancelRoot, + allowSubscribe: true, + serverSubs: make(map[ID]*Subscription), + log: log.Root(), + } + if conn.RemoteAddr() != "" { + h.log = h.log.New("conn", conn.RemoteAddr()) + } + h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe)) + return h +} + +// handleBatch executes all messages in a batch and returns the responses. +func (h *handler) handleBatch(msgs []*jsonrpcMessage) { + // Emit error response for empty batches: + if len(msgs) == 0 { + h.startCallProc(func(cp *callProc) { + h.conn.Write(cp.ctx, errorMessage(&invalidRequestError{"empty batch"})) + }) + return + } + + // Handle non-call messages first: + calls := make([]*jsonrpcMessage, 0, len(msgs)) + for _, msg := range msgs { + if handled := h.handleImmediate(msg); !handled { + calls = append(calls, msg) + } + } + if len(calls) == 0 { + return + } + // Process calls on a goroutine because they may block indefinitely: + h.startCallProc(func(cp *callProc) { + answers := make([]*jsonrpcMessage, 0, len(msgs)) + for _, msg := range calls { + if answer := h.handleCallMsg(cp, msg); answer != nil { + answers = append(answers, answer) + } + } + h.addSubscriptions(cp.notifiers) + if len(answers) > 0 { + h.conn.Write(cp.ctx, answers) + } + for _, n := range cp.notifiers { + n.activate() + } + }) +} + +// handleMsg handles a single message. +func (h *handler) handleMsg(msg *jsonrpcMessage) { + if ok := h.handleImmediate(msg); ok { + return + } + h.startCallProc(func(cp *callProc) { + answer := h.handleCallMsg(cp, msg) + h.addSubscriptions(cp.notifiers) + if answer != nil { + h.conn.Write(cp.ctx, answer) + } + for _, n := range cp.notifiers { + n.activate() + } + }) +} + +// close cancels all requests except for inflightReq and waits for +// call goroutines to shut down. +func (h *handler) close(err error, inflightReq *requestOp) { + h.cancelAllRequests(err, inflightReq) + h.callWG.Wait() + h.cancelRoot() + h.cancelServerSubscriptions(err) +} + +// addRequestOp registers a request operation. +func (h *handler) addRequestOp(op *requestOp) { + for _, id := range op.ids { + h.respWait[string(id)] = op + } +} + +// removeRequestOps stops waiting for the given request IDs. +func (h *handler) removeRequestOp(op *requestOp) { + for _, id := range op.ids { + delete(h.respWait, string(id)) + } +} + +// cancelAllRequests unblocks and removes pending requests and active subscriptions. +func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) { + didClose := make(map[*requestOp]bool) + if inflightReq != nil { + didClose[inflightReq] = true + } + + for id, op := range h.respWait { + // Remove the op so that later calls will not close op.resp again. + delete(h.respWait, id) + + if !didClose[op] { + op.err = err + close(op.resp) + didClose[op] = true + } + } + for id, sub := range h.clientSubs { + delete(h.clientSubs, id) + sub.quitWithError(err, false) + } +} + +func (h *handler) addSubscriptions(nn []*Notifier) { + h.subLock.Lock() + defer h.subLock.Unlock() + + for _, n := range nn { + if sub := n.takeSubscription(); sub != nil { + h.serverSubs[sub.ID] = sub + } + } +} + +// cancelServerSubscriptions removes all subscriptions and closes their error channels. +func (h *handler) cancelServerSubscriptions(err error) { + h.subLock.Lock() + defer h.subLock.Unlock() + + for id, s := range h.serverSubs { + s.err <- err + close(s.err) + delete(h.serverSubs, id) + } +} + +// startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group. +func (h *handler) startCallProc(fn func(*callProc)) { + h.callWG.Add(1) + go func() { + ctx, cancel := context.WithCancel(h.rootCtx) + defer h.callWG.Done() + defer cancel() + fn(&callProc{ctx: ctx}) + }() +} + +// handleImmediate executes non-call messages. It returns false if the message is a +// call or requires a reply. +func (h *handler) handleImmediate(msg *jsonrpcMessage) bool { + start := time.Now() + switch { + case msg.isNotification(): + if strings.HasSuffix(msg.Method, notificationMethodSuffix) { + h.handleSubscriptionResult(msg) + return true + } + return false + case msg.isResponse(): + h.handleResponse(msg) + h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "t", time.Since(start)) + return true + default: + return false + } +} + +// handleSubscriptionResult processes subscription notifications. +func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) { + var result subscriptionResult + if err := json.Unmarshal(msg.Params, &result); err != nil { + h.log.Debug("Dropping invalid subscription message") + return + } + if h.clientSubs[result.ID] != nil { + h.clientSubs[result.ID].deliver(result.Result) + } +} + +// handleResponse processes method call responses. +func (h *handler) handleResponse(msg *jsonrpcMessage) { + op := h.respWait[string(msg.ID)] + if op == nil { + h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID}) + return + } + delete(h.respWait, string(msg.ID)) + // For normal responses, just forward the reply to Call/BatchCall. + if op.sub == nil { + op.resp <- msg + return + } + // For subscription responses, start the subscription if the server + // indicates success. EthSubscribe gets unblocked in either case through + // the op.resp channel. + defer close(op.resp) + if msg.Error != nil { + op.err = msg.Error + return + } + if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { + go op.sub.start() + h.clientSubs[op.sub.subid] = op.sub + } +} + +// handleCallMsg executes a call message and returns the answer. +func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage { + start := time.Now() + switch { + case msg.isNotification(): + h.handleCall(ctx, msg) + h.log.Debug("Served "+msg.Method, "t", time.Since(start)) + return nil + case msg.isCall(): + resp := h.handleCall(ctx, msg) + if resp.Error != nil { + h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message) + } else { + h.log.Debug("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start)) + } + return resp + case msg.hasValidID(): + return msg.errorResponse(&invalidRequestError{"invalid request"}) + default: + return errorMessage(&invalidRequestError{"invalid request"}) + } +} + +// handleCall processes method calls. +func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage { + if msg.isSubscribe() { + return h.handleSubscribe(cp, msg) + } + var callb *callback + if msg.isUnsubscribe() { + callb = h.unsubscribeCb + } else { + callb = h.reg.callback(msg.Method) + } + if callb == nil { + return msg.errorResponse(&methodNotFoundError{method: msg.Method}) + } + args, err := parsePositionalArguments(msg.Params, callb.argTypes) + if err != nil { + return msg.errorResponse(&invalidParamsError{err.Error()}) + } + + return h.runMethod(cp.ctx, msg, callb, args) +} + +// handleSubscribe processes *_subscribe method calls. +func (h *handler) handleSubscribe(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage { + if !h.allowSubscribe { + return msg.errorResponse(ErrNotificationsUnsupported) + } + + // Subscription method name is first argument. + name, err := parseSubscriptionName(msg.Params) + if err != nil { + return msg.errorResponse(&invalidParamsError{err.Error()}) + } + namespace := msg.namespace() + callb := h.reg.subscription(namespace, name) + if callb == nil { + return msg.errorResponse(&subscriptionNotFoundError{namespace, name}) + } + + // Parse subscription name arg too, but remove it before calling the callback. + argTypes := append([]reflect.Type{stringType}, callb.argTypes...) + args, err := parsePositionalArguments(msg.Params, argTypes) + if err != nil { + return msg.errorResponse(&invalidParamsError{err.Error()}) + } + args = args[1:] + + // Install notifier in context so the subscription handler can find it. + n := &Notifier{h: h, namespace: namespace} + cp.notifiers = append(cp.notifiers, n) + ctx := context.WithValue(cp.ctx, notifierKey{}, n) + + return h.runMethod(ctx, msg, callb, args) +} + +// runMethod runs the Go callback for an RPC method. +func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *callback, args []reflect.Value) *jsonrpcMessage { + result, err := callb.call(ctx, msg.Method, args) + if err != nil { + return msg.errorResponse(err) + } + return msg.response(result) +} + +// unsubscribe is the callback function for all *_unsubscribe calls. +func (h *handler) unsubscribe(ctx context.Context, id ID) (bool, error) { + h.subLock.Lock() + defer h.subLock.Unlock() + + s := h.serverSubs[id] + if s == nil { + return false, ErrSubscriptionNotFound + } + close(s.err) + delete(h.serverSubs, id) + return true, nil +} + +type idForLog struct{ json.RawMessage } + +func (id idForLog) String() string { + if s, err := strconv.Unquote(string(id.RawMessage)); err == nil { + return s + } + return string(id.RawMessage) +} diff --git a/rpc/http.go b/rpc/http.go new file mode 100644 index 0000000..2dffc5d --- /dev/null +++ b/rpc/http.go @@ -0,0 +1,359 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "mime" + "net" + "net/http" + "strings" + "sync" + "time" + + "github.com/ava-labs/go-ethereum/log" + "github.com/rs/cors" +) + +const ( + maxRequestContentLength = 1024 * 1024 * 5 + contentType = "application/json" +) + +// https://www.jsonrpc.org/historical/json-rpc-over-http.html#id13 +var acceptedContentTypes = []string{contentType, "application/json-rpc", "application/jsonrequest"} + +type httpConn struct { + client *http.Client + req *http.Request + closeOnce sync.Once + closed chan interface{} +} + +// httpConn is treated specially by Client. +func (hc *httpConn) Write(context.Context, interface{}) error { + panic("Write called on httpConn") +} + +func (hc *httpConn) RemoteAddr() string { + return hc.req.URL.String() +} + +func (hc *httpConn) Read() ([]*jsonrpcMessage, bool, error) { + <-hc.closed + return nil, false, io.EOF +} + +func (hc *httpConn) Close() { + hc.closeOnce.Do(func() { close(hc.closed) }) +} + +func (hc *httpConn) Closed() <-chan interface{} { + return hc.closed +} + +// HTTPTimeouts represents the configuration params for the HTTP RPC server. +type HTTPTimeouts struct { + // ReadTimeout is the maximum duration for reading the entire + // request, including the body. + // + // Because ReadTimeout does not let Handlers make per-request + // decisions on each request body's acceptable deadline or + // upload rate, most users will prefer to use + // ReadHeaderTimeout. It is valid to use them both. + ReadTimeout time.Duration + + // WriteTimeout is the maximum duration before timing out + // writes of the response. It is reset whenever a new + // request's header is read. Like ReadTimeout, it does not + // let Handlers make decisions on a per-request basis. + WriteTimeout time.Duration + + // IdleTimeout is the maximum amount of time to wait for the + // next request when keep-alives are enabled. If IdleTimeout + // is zero, the value of ReadTimeout is used. If both are + // zero, ReadHeaderTimeout is used. + IdleTimeout time.Duration +} + +// DefaultHTTPTimeouts represents the default timeout values used if further +// configuration is not provided. +var DefaultHTTPTimeouts = HTTPTimeouts{ + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 120 * time.Second, +} + +// DialHTTPWithClient creates a new RPC client that connects to an RPC server over HTTP +// using the provided HTTP Client. +func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) { + req, err := http.NewRequest(http.MethodPost, endpoint, nil) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", contentType) + req.Header.Set("Accept", contentType) + + initctx := context.Background() + return newClient(initctx, func(context.Context) (ServerCodec, error) { + return &httpConn{client: client, req: req, closed: make(chan interface{})}, nil + }) +} + +// DialHTTP creates a new RPC client that connects to an RPC server over HTTP. +func DialHTTP(endpoint string) (*Client, error) { + return DialHTTPWithClient(endpoint, new(http.Client)) +} + +func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error { + hc := c.writeConn.(*httpConn) + respBody, err := hc.doRequest(ctx, msg) + if respBody != nil { + defer respBody.Close() + } + + if err != nil { + if respBody != nil { + buf := new(bytes.Buffer) + if _, err2 := buf.ReadFrom(respBody); err2 == nil { + return fmt.Errorf("%v %v", err, buf.String()) + } + } + return err + } + var respmsg jsonrpcMessage + if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil { + return err + } + op.resp <- &respmsg + return nil +} + +func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error { + hc := c.writeConn.(*httpConn) + respBody, err := hc.doRequest(ctx, msgs) + if err != nil { + return err + } + defer respBody.Close() + var respmsgs []jsonrpcMessage + if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { + return err + } + for i := 0; i < len(respmsgs); i++ { + op.resp <- &respmsgs[i] + } + return nil +} + +func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) { + body, err := json.Marshal(msg) + if err != nil { + return nil, err + } + req := hc.req.WithContext(ctx) + req.Body = ioutil.NopCloser(bytes.NewReader(body)) + req.ContentLength = int64(len(body)) + + resp, err := hc.client.Do(req) + if err != nil { + return nil, err + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return resp.Body, errors.New(resp.Status) + } + return resp.Body, nil +} + +// httpServerConn turns a HTTP connection into a Conn. +type httpServerConn struct { + io.Reader + io.Writer + r *http.Request +} + +func newHTTPServerConn(r *http.Request, w http.ResponseWriter) ServerCodec { + body := io.LimitReader(r.Body, maxRequestContentLength) + conn := &httpServerConn{Reader: body, Writer: w, r: r} + return NewJSONCodec(conn) +} + +// Close does nothing and always returns nil. +func (t *httpServerConn) Close() error { return nil } + +// RemoteAddr returns the peer address of the underlying connection. +func (t *httpServerConn) RemoteAddr() string { + return t.r.RemoteAddr +} + +// SetWriteDeadline does nothing and always returns nil. +func (t *httpServerConn) SetWriteDeadline(time.Time) error { return nil } + +// NewHTTPServer creates a new HTTP RPC server around an API provider. +// +// Deprecated: Server implements http.Handler +func NewHTTPServer(cors []string, vhosts []string, timeouts HTTPTimeouts, srv http.Handler) *http.Server { + // Wrap the CORS-handler within a host-handler + handler := newCorsHandler(srv, cors) + handler = newVHostHandler(vhosts, handler) + handler = newGzipHandler(handler) + + // Make sure timeout values are meaningful + if timeouts.ReadTimeout < time.Second { + log.Warn("Sanitizing invalid HTTP read timeout", "provided", timeouts.ReadTimeout, "updated", DefaultHTTPTimeouts.ReadTimeout) + timeouts.ReadTimeout = DefaultHTTPTimeouts.ReadTimeout + } + if timeouts.WriteTimeout < time.Second { + log.Warn("Sanitizing invalid HTTP write timeout", "provided", timeouts.WriteTimeout, "updated", DefaultHTTPTimeouts.WriteTimeout) + timeouts.WriteTimeout = DefaultHTTPTimeouts.WriteTimeout + } + if timeouts.IdleTimeout < time.Second { + log.Warn("Sanitizing invalid HTTP idle timeout", "provided", timeouts.IdleTimeout, "updated", DefaultHTTPTimeouts.IdleTimeout) + timeouts.IdleTimeout = DefaultHTTPTimeouts.IdleTimeout + } + // Bundle and start the HTTP server + return &http.Server{ + Handler: handler, + ReadTimeout: timeouts.ReadTimeout, + WriteTimeout: timeouts.WriteTimeout, + IdleTimeout: timeouts.IdleTimeout, + } +} + +// ServeHTTP serves JSON-RPC requests over HTTP. +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Permit dumb empty requests for remote health-checks (AWS) + if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" { + return + } + if code, err := validateRequest(r); err != nil { + http.Error(w, err.Error(), code) + return + } + // All checks passed, create a codec that reads direct from the request body + // untilEOF and writes the response to w and order the server to process a + // single request. + ctx := r.Context() + ctx = context.WithValue(ctx, "remote", r.RemoteAddr) + ctx = context.WithValue(ctx, "scheme", r.Proto) + ctx = context.WithValue(ctx, "local", r.Host) + if ua := r.Header.Get("User-Agent"); ua != "" { + ctx = context.WithValue(ctx, "User-Agent", ua) + } + if origin := r.Header.Get("Origin"); origin != "" { + ctx = context.WithValue(ctx, "Origin", origin) + } + + w.Header().Set("content-type", contentType) + codec := newHTTPServerConn(r, w) + defer codec.Close() + s.serveSingleRequest(ctx, codec) +} + +// validateRequest returns a non-zero response code and error message if the +// request is invalid. +func validateRequest(r *http.Request) (int, error) { + if r.Method == http.MethodPut || r.Method == http.MethodDelete { + return http.StatusMethodNotAllowed, errors.New("method not allowed") + } + if r.ContentLength > maxRequestContentLength { + err := fmt.Errorf("content length too large (%d>%d)", r.ContentLength, maxRequestContentLength) + return http.StatusRequestEntityTooLarge, err + } + // Allow OPTIONS (regardless of content-type) + if r.Method == http.MethodOptions { + return 0, nil + } + // Check content-type + if mt, _, err := mime.ParseMediaType(r.Header.Get("content-type")); err == nil { + for _, accepted := range acceptedContentTypes { + if accepted == mt { + return 0, nil + } + } + } + // Invalid content-type + err := fmt.Errorf("invalid content type, only %s is supported", contentType) + return http.StatusUnsupportedMediaType, err +} + +func newCorsHandler(srv http.Handler, allowedOrigins []string) http.Handler { + // disable CORS support if user has not specified a custom CORS configuration + if len(allowedOrigins) == 0 { + return srv + } + c := cors.New(cors.Options{ + AllowedOrigins: allowedOrigins, + AllowedMethods: []string{http.MethodPost, http.MethodGet}, + MaxAge: 600, + AllowedHeaders: []string{"*"}, + }) + return c.Handler(srv) +} + +// virtualHostHandler is a handler which validates the Host-header of incoming requests. +// The virtualHostHandler can prevent DNS rebinding attacks, which do not utilize CORS-headers, +// since they do in-domain requests against the RPC api. Instead, we can see on the Host-header +// which domain was used, and validate that against a whitelist. +type virtualHostHandler struct { + vhosts map[string]struct{} + next http.Handler +} + +// ServeHTTP serves JSON-RPC requests over HTTP, implements http.Handler +func (h *virtualHostHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // if r.Host is not set, we can continue serving since a browser would set the Host header + if r.Host == "" { + h.next.ServeHTTP(w, r) + return + } + host, _, err := net.SplitHostPort(r.Host) + if err != nil { + // Either invalid (too many colons) or no port specified + host = r.Host + } + if ipAddr := net.ParseIP(host); ipAddr != nil { + // It's an IP address, we can serve that + h.next.ServeHTTP(w, r) + return + + } + // Not an ip address, but a hostname. Need to validate + if _, exist := h.vhosts["*"]; exist { + h.next.ServeHTTP(w, r) + return + } + if _, exist := h.vhosts[host]; exist { + h.next.ServeHTTP(w, r) + return + } + http.Error(w, "invalid host specified", http.StatusForbidden) +} + +func newVHostHandler(vhosts []string, next http.Handler) http.Handler { + vhostMap := make(map[string]struct{}) + for _, allowedHost := range vhosts { + vhostMap[strings.ToLower(allowedHost)] = struct{}{} + } + return &virtualHostHandler{vhostMap, next} +} diff --git a/rpc/http_test.go b/rpc/http_test.go new file mode 100644 index 0000000..b3f694d --- /dev/null +++ b/rpc/http_test.go @@ -0,0 +1,54 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func TestHTTPErrorResponseWithDelete(t *testing.T) { + testHTTPErrorResponse(t, http.MethodDelete, contentType, "", http.StatusMethodNotAllowed) +} + +func TestHTTPErrorResponseWithPut(t *testing.T) { + testHTTPErrorResponse(t, http.MethodPut, contentType, "", http.StatusMethodNotAllowed) +} + +func TestHTTPErrorResponseWithMaxContentLength(t *testing.T) { + body := make([]rune, maxRequestContentLength+1) + testHTTPErrorResponse(t, + http.MethodPost, contentType, string(body), http.StatusRequestEntityTooLarge) +} + +func TestHTTPErrorResponseWithEmptyContentType(t *testing.T) { + testHTTPErrorResponse(t, http.MethodPost, "", "", http.StatusUnsupportedMediaType) +} + +func TestHTTPErrorResponseWithValidRequest(t *testing.T) { + testHTTPErrorResponse(t, http.MethodPost, contentType, "", 0) +} + +func testHTTPErrorResponse(t *testing.T, method, contentType, body string, expected int) { + request := httptest.NewRequest(method, "http://url.com", strings.NewReader(body)) + request.Header.Set("content-type", contentType) + if code, _ := validateRequest(request); code != expected { + t.Fatalf("response code should be %d not %d", expected, code) + } +} diff --git a/rpc/ipc_js.go b/rpc/ipc_js.go new file mode 100644 index 0000000..7e7554a --- /dev/null +++ b/rpc/ipc_js.go @@ -0,0 +1,37 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// +build js + +package rpc + +import ( + "context" + "errors" + "net" +) + +var errNotSupported = errors.New("rpc: not supported") + +// ipcListen will create a named pipe on the given endpoint. +func ipcListen(endpoint string) (net.Listener, error) { + return nil, errNotSupported +} + +// newIPCConnection will connect to a named pipe with the given endpoint as name. +func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) { + return nil, errNotSupported +} diff --git a/rpc/json.go b/rpc/json.go new file mode 100644 index 0000000..75c2210 --- /dev/null +++ b/rpc/json.go @@ -0,0 +1,335 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "reflect" + "strings" + "sync" + "time" +) + +const ( + vsn = "2.0" + serviceMethodSeparator = "_" + subscribeMethodSuffix = "_subscribe" + unsubscribeMethodSuffix = "_unsubscribe" + notificationMethodSuffix = "_subscription" + + defaultWriteTimeout = 10 * time.Second // used if context has no deadline +) + +var null = json.RawMessage("null") + +type subscriptionResult struct { + ID string `json:"subscription"` + Result json.RawMessage `json:"result,omitempty"` +} + +// A value of this type can a JSON-RPC request, notification, successful response or +// error response. Which one it is depends on the fields. +type jsonrpcMessage struct { + Version string `json:"jsonrpc,omitempty"` + ID json.RawMessage `json:"id,omitempty"` + Method string `json:"method,omitempty"` + Params json.RawMessage `json:"params,omitempty"` + Error *jsonError `json:"error,omitempty"` + Result json.RawMessage `json:"result,omitempty"` +} + +func (msg *jsonrpcMessage) isNotification() bool { + return msg.ID == nil && msg.Method != "" +} + +func (msg *jsonrpcMessage) isCall() bool { + return msg.hasValidID() && msg.Method != "" +} + +func (msg *jsonrpcMessage) isResponse() bool { + return msg.hasValidID() && msg.Method == "" && msg.Params == nil && (msg.Result != nil || msg.Error != nil) +} + +func (msg *jsonrpcMessage) hasValidID() bool { + return len(msg.ID) > 0 && msg.ID[0] != '{' && msg.ID[0] != '[' +} + +func (msg *jsonrpcMessage) isSubscribe() bool { + return strings.HasSuffix(msg.Method, subscribeMethodSuffix) +} + +func (msg *jsonrpcMessage) isUnsubscribe() bool { + return strings.HasSuffix(msg.Method, unsubscribeMethodSuffix) +} + +func (msg *jsonrpcMessage) namespace() string { + elem := strings.SplitN(msg.Method, serviceMethodSeparator, 2) + return elem[0] +} + +func (msg *jsonrpcMessage) String() string { + b, _ := json.Marshal(msg) + return string(b) +} + +func (msg *jsonrpcMessage) errorResponse(err error) *jsonrpcMessage { + resp := errorMessage(err) + resp.ID = msg.ID + return resp +} + +func (msg *jsonrpcMessage) response(result interface{}) *jsonrpcMessage { + enc, err := json.Marshal(result) + if err != nil { + // TODO: wrap with 'internal server error' + return msg.errorResponse(err) + } + return &jsonrpcMessage{Version: vsn, ID: msg.ID, Result: enc} +} + +func errorMessage(err error) *jsonrpcMessage { + msg := &jsonrpcMessage{Version: vsn, ID: null, Error: &jsonError{ + Code: defaultErrorCode, + Message: err.Error(), + }} + ec, ok := err.(Error) + if ok { + msg.Error.Code = ec.ErrorCode() + } + return msg +} + +type jsonError struct { + Code int `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` +} + +func (err *jsonError) Error() string { + if err.Message == "" { + return fmt.Sprintf("json-rpc error %d", err.Code) + } + return err.Message +} + +func (err *jsonError) ErrorCode() int { + return err.Code +} + +// Conn is a subset of the methods of net.Conn which are sufficient for ServerCodec. +type Conn interface { + io.ReadWriteCloser + SetWriteDeadline(time.Time) error +} + +type deadlineCloser interface { + io.Closer + SetWriteDeadline(time.Time) error +} + +// ConnRemoteAddr wraps the RemoteAddr operation, which returns a description +// of the peer address of a connection. If a Conn also implements ConnRemoteAddr, this +// description is used in log messages. +type ConnRemoteAddr interface { + RemoteAddr() string +} + +// connWithRemoteAddr overrides the remote address of a connection. +type connWithRemoteAddr struct { + Conn + addr string +} + +func (c connWithRemoteAddr) RemoteAddr() string { return c.addr } + +// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has +// support for parsing arguments and serializing (result) objects. +type jsonCodec struct { + remoteAddr string + closer sync.Once // close closed channel once + closed chan interface{} // closed on Close + decode func(v interface{}) error // decoder to allow multiple transports + encMu sync.Mutex // guards the encoder + encode func(v interface{}) error // encoder to allow multiple transports + conn deadlineCloser +} + +func newCodec(conn deadlineCloser, encode, decode func(v interface{}) error) ServerCodec { + codec := &jsonCodec{ + closed: make(chan interface{}), + encode: encode, + decode: decode, + conn: conn, + } + if ra, ok := conn.(ConnRemoteAddr); ok { + codec.remoteAddr = ra.RemoteAddr() + } + return codec +} + +// NewJSONCodec creates a codec that reads from the given connection. If conn implements +// ConnRemoteAddr, log messages will use it to include the remote address of the +// connection. +func NewJSONCodec(conn Conn) ServerCodec { + enc := json.NewEncoder(conn) + dec := json.NewDecoder(conn) + dec.UseNumber() + return newCodec(conn, enc.Encode, dec.Decode) +} + +func (c *jsonCodec) RemoteAddr() string { + return c.remoteAddr +} + +func (c *jsonCodec) Read() (msg []*jsonrpcMessage, batch bool, err error) { + // Decode the next JSON object in the input stream. + // This verifies basic syntax, etc. + var rawmsg json.RawMessage + if err := c.decode(&rawmsg); err != nil { + return nil, false, err + } + msg, batch = parseMessage(rawmsg) + return msg, batch, nil +} + +// Write sends a message to client. +func (c *jsonCodec) Write(ctx context.Context, v interface{}) error { + c.encMu.Lock() + defer c.encMu.Unlock() + + deadline, ok := ctx.Deadline() + if !ok { + deadline = time.Now().Add(defaultWriteTimeout) + } + c.conn.SetWriteDeadline(deadline) + return c.encode(v) +} + +// Close the underlying connection +func (c *jsonCodec) Close() { + c.closer.Do(func() { + close(c.closed) + c.conn.Close() + }) +} + +// Closed returns a channel which will be closed when Close is called +func (c *jsonCodec) Closed() <-chan interface{} { + return c.closed +} + +// parseMessage parses raw bytes as a (batch of) JSON-RPC message(s). There are no error +// checks in this function because the raw message has already been syntax-checked when it +// is called. Any non-JSON-RPC messages in the input return the zero value of +// jsonrpcMessage. +func parseMessage(raw json.RawMessage) ([]*jsonrpcMessage, bool) { + if !isBatch(raw) { + msgs := []*jsonrpcMessage{{}} + json.Unmarshal(raw, &msgs[0]) + return msgs, false + } + dec := json.NewDecoder(bytes.NewReader(raw)) + dec.Token() // skip '[' + var msgs []*jsonrpcMessage + for dec.More() { + msgs = append(msgs, new(jsonrpcMessage)) + dec.Decode(&msgs[len(msgs)-1]) + } + return msgs, true +} + +// isBatch returns true when the first non-whitespace characters is '[' +func isBatch(raw json.RawMessage) bool { + for _, c := range raw { + // skip insignificant whitespace (http://www.ietf.org/rfc/rfc4627.txt) + if c == 0x20 || c == 0x09 || c == 0x0a || c == 0x0d { + continue + } + return c == '[' + } + return false +} + +// parsePositionalArguments tries to parse the given args to an array of values with the +// given types. It returns the parsed values or an error when the args could not be +// parsed. Missing optional arguments are returned as reflect.Zero values. +func parsePositionalArguments(rawArgs json.RawMessage, types []reflect.Type) ([]reflect.Value, error) { + dec := json.NewDecoder(bytes.NewReader(rawArgs)) + var args []reflect.Value + tok, err := dec.Token() + switch { + case err == io.EOF || tok == nil && err == nil: + // "params" is optional and may be empty. Also allow "params":null even though it's + // not in the spec because our own client used to send it. + case err != nil: + return nil, err + case tok == json.Delim('['): + // Read argument array. + if args, err = parseArgumentArray(dec, types); err != nil { + return nil, err + } + default: + return nil, errors.New("non-array args") + } + // Set any missing args to nil. + for i := len(args); i < len(types); i++ { + if types[i].Kind() != reflect.Ptr { + return nil, fmt.Errorf("missing value for required argument %d", i) + } + args = append(args, reflect.Zero(types[i])) + } + return args, nil +} + +func parseArgumentArray(dec *json.Decoder, types []reflect.Type) ([]reflect.Value, error) { + args := make([]reflect.Value, 0, len(types)) + for i := 0; dec.More(); i++ { + if i >= len(types) { + return args, fmt.Errorf("too many arguments, want at most %d", len(types)) + } + argval := reflect.New(types[i]) + if err := dec.Decode(argval.Interface()); err != nil { + return args, fmt.Errorf("invalid argument %d: %v", i, err) + } + if argval.IsNil() && types[i].Kind() != reflect.Ptr { + return args, fmt.Errorf("missing value for required argument %d", i) + } + args = append(args, argval.Elem()) + } + // Read end of args array. + _, err := dec.Token() + return args, err +} + +// parseSubscriptionName extracts the subscription name from an encoded argument array. +func parseSubscriptionName(rawArgs json.RawMessage) (string, error) { + dec := json.NewDecoder(bytes.NewReader(rawArgs)) + if tok, _ := dec.Token(); tok != json.Delim('[') { + return "", errors.New("non-array args") + } + v, _ := dec.Token() + method, ok := v.(string) + if !ok { + return "", errors.New("expected subscription name as first argument") + } + return method, nil +} diff --git a/rpc/server.go b/rpc/server.go new file mode 100644 index 0000000..bf5d93e --- /dev/null +++ b/rpc/server.go @@ -0,0 +1,147 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "context" + "io" + "sync/atomic" + + mapset "github.com/deckarep/golang-set" + "github.com/ava-labs/go-ethereum/log" +) + +const MetadataApi = "rpc" + +// CodecOption specifies which type of messages a codec supports. +// +// Deprecated: this option is no longer honored by Server. +type CodecOption int + +const ( + // OptionMethodInvocation is an indication that the codec supports RPC method calls + OptionMethodInvocation CodecOption = 1 << iota + + // OptionSubscriptions is an indication that the codec suports RPC notifications + OptionSubscriptions = 1 << iota // support pub sub +) + +// Server is an RPC server. +type Server struct { + services serviceRegistry + idgen func() ID + run int32 + codecs mapset.Set +} + +// NewServer creates a new server instance with no registered handlers. +func NewServer() *Server { + server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1} + // Register the default service providing meta information about the RPC service such + // as the services and methods it offers. + rpcService := &RPCService{server} + server.RegisterName(MetadataApi, rpcService) + return server +} + +// RegisterName creates a service for the given receiver type under the given name. When no +// methods on the given receiver match the criteria to be either a RPC method or a +// subscription an error is returned. Otherwise a new service is created and added to the +// service collection this server provides to clients. +func (s *Server) RegisterName(name string, receiver interface{}) error { + return s.services.registerName(name, receiver) +} + +// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes +// the response back using the given codec. It will block until the codec is closed or the +// server is stopped. In either case the codec is closed. +// +// Note that codec options are no longer supported. +func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) { + defer codec.Close() + + // Don't serve if server is stopped. + if atomic.LoadInt32(&s.run) == 0 { + return + } + + // Add the codec to the set so it can be closed by Stop. + s.codecs.Add(codec) + defer s.codecs.Remove(codec) + + c := initClient(codec, s.idgen, &s.services) + <-codec.Closed() + c.Close() +} + +// serveSingleRequest reads and processes a single RPC request from the given codec. This +// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in +// this mode. +func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { + // Don't serve if server is stopped. + if atomic.LoadInt32(&s.run) == 0 { + return + } + + h := newHandler(ctx, codec, s.idgen, &s.services) + h.allowSubscribe = false + defer h.close(io.EOF, nil) + + reqs, batch, err := codec.Read() + if err != nil { + if err != io.EOF { + codec.Write(ctx, errorMessage(&invalidMessageError{"parse error"})) + } + return + } + if batch { + h.handleBatch(reqs) + } else { + h.handleMsg(reqs[0]) + } +} + +// Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending +// requests to finish, then closes all codecs which will cancel pending requests and +// subscriptions. +func (s *Server) Stop() { + if atomic.CompareAndSwapInt32(&s.run, 1, 0) { + log.Debug("RPC server shutting down") + s.codecs.Each(func(c interface{}) bool { + c.(ServerCodec).Close() + return true + }) + } +} + +// RPCService gives meta information about the server. +// e.g. gives information about the loaded modules. +type RPCService struct { + server *Server +} + +// Modules returns the list of RPC services with their version number +func (s *RPCService) Modules() map[string]string { + s.server.services.mu.Lock() + defer s.server.services.mu.Unlock() + + modules := make(map[string]string) + for name := range s.server.services.services { + modules[name] = "1.0" + } + return modules +} diff --git a/rpc/server_test.go b/rpc/server_test.go new file mode 100644 index 0000000..3909954 --- /dev/null +++ b/rpc/server_test.go @@ -0,0 +1,152 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "bufio" + "bytes" + "io" + "io/ioutil" + "net" + "path/filepath" + "strings" + "testing" + "time" +) + +func TestServerRegisterName(t *testing.T) { + server := NewServer() + service := new(testService) + + if err := server.RegisterName("test", service); err != nil { + t.Fatalf("%v", err) + } + + if len(server.services.services) != 2 { + t.Fatalf("Expected 2 service entries, got %d", len(server.services.services)) + } + + svc, ok := server.services.services["test"] + if !ok { + t.Fatalf("Expected service calc to be registered") + } + + wantCallbacks := 7 + if len(svc.callbacks) != wantCallbacks { + t.Errorf("Expected %d callbacks for service 'service', got %d", wantCallbacks, len(svc.callbacks)) + } +} + +func TestServer(t *testing.T) { + files, err := ioutil.ReadDir("testdata") + if err != nil { + t.Fatal("where'd my testdata go?") + } + for _, f := range files { + if f.IsDir() || strings.HasPrefix(f.Name(), ".") { + continue + } + path := filepath.Join("testdata", f.Name()) + name := strings.TrimSuffix(f.Name(), filepath.Ext(f.Name())) + t.Run(name, func(t *testing.T) { + runTestScript(t, path) + }) + } +} + +func runTestScript(t *testing.T, file string) { + server := newTestServer() + content, err := ioutil.ReadFile(file) + if err != nil { + t.Fatal(err) + } + + clientConn, serverConn := net.Pipe() + defer clientConn.Close() + go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions) + readbuf := bufio.NewReader(clientConn) + for _, line := range strings.Split(string(content), "\n") { + line = strings.TrimSpace(line) + switch { + case len(line) == 0 || strings.HasPrefix(line, "//"): + // skip comments, blank lines + continue + case strings.HasPrefix(line, "--> "): + t.Log(line) + // write to connection + clientConn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + if _, err := io.WriteString(clientConn, line[4:]+"\n"); err != nil { + t.Fatalf("write error: %v", err) + } + case strings.HasPrefix(line, "<-- "): + t.Log(line) + want := line[4:] + // read line from connection and compare text + clientConn.SetReadDeadline(time.Now().Add(5 * time.Second)) + sent, err := readbuf.ReadString('\n') + if err != nil { + t.Fatalf("read error: %v", err) + } + sent = strings.TrimRight(sent, "\r\n") + if sent != want { + t.Errorf("wrong line from server\ngot: %s\nwant: %s", sent, want) + } + default: + panic("invalid line in test script: " + line) + } + } +} + +// This test checks that responses are delivered for very short-lived connections that +// only carry a single request. +func TestServerShortLivedConn(t *testing.T) { + server := newTestServer() + defer server.Stop() + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("can't listen:", err) + } + defer listener.Close() + go server.ServeListener(listener) + + var ( + request = `{"jsonrpc":"2.0","id":1,"method":"rpc_modules"}` + "\n" + wantResp = `{"jsonrpc":"2.0","id":1,"result":{"nftest":"1.0","rpc":"1.0","test":"1.0"}}` + "\n" + deadline = time.Now().Add(10 * time.Second) + ) + for i := 0; i < 20; i++ { + conn, err := net.Dial("tcp", listener.Addr().String()) + if err != nil { + t.Fatal("can't dial:", err) + } + defer conn.Close() + conn.SetDeadline(deadline) + // Write the request, then half-close the connection so the server stops reading. + conn.Write([]byte(request)) + conn.(*net.TCPConn).CloseWrite() + // Now try to get the response. + buf := make([]byte, 2000) + n, err := conn.Read(buf) + if err != nil { + t.Fatal("read error:", err) + } + if !bytes.Equal(buf[:n], []byte(wantResp)) { + t.Fatalf("wrong response: %s", buf[:n]) + } + } +} diff --git a/rpc/service.go b/rpc/service.go new file mode 100644 index 0000000..ead6fb6 --- /dev/null +++ b/rpc/service.go @@ -0,0 +1,285 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "context" + "errors" + "fmt" + "reflect" + "runtime" + "strings" + "sync" + "unicode" + "unicode/utf8" + + "github.com/ava-labs/go-ethereum/log" +) + +var ( + contextType = reflect.TypeOf((*context.Context)(nil)).Elem() + errorType = reflect.TypeOf((*error)(nil)).Elem() + subscriptionType = reflect.TypeOf(Subscription{}) + stringType = reflect.TypeOf("") +) + +type serviceRegistry struct { + mu sync.Mutex + services map[string]service +} + +// service represents a registered object. +type service struct { + name string // name for service + callbacks map[string]*callback // registered handlers + subscriptions map[string]*callback // available subscriptions/notifications +} + +// callback is a method callback which was registered in the server +type callback struct { + fn reflect.Value // the function + rcvr reflect.Value // receiver object of method, set if fn is method + argTypes []reflect.Type // input argument types + hasCtx bool // method's first argument is a context (not included in argTypes) + errPos int // err return idx, of -1 when method cannot return error + isSubscribe bool // true if this is a subscription callback +} + +func (r *serviceRegistry) registerName(name string, rcvr interface{}) error { + rcvrVal := reflect.ValueOf(rcvr) + if name == "" { + return fmt.Errorf("no service name for type %s", rcvrVal.Type().String()) + } + callbacks := suitableCallbacks(rcvrVal) + if len(callbacks) == 0 { + return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr) + } + + r.mu.Lock() + defer r.mu.Unlock() + if r.services == nil { + r.services = make(map[string]service) + } + svc, ok := r.services[name] + if !ok { + svc = service{ + name: name, + callbacks: make(map[string]*callback), + subscriptions: make(map[string]*callback), + } + r.services[name] = svc + } + for name, cb := range callbacks { + if cb.isSubscribe { + svc.subscriptions[name] = cb + } else { + svc.callbacks[name] = cb + } + } + return nil +} + +// callback returns the callback corresponding to the given RPC method name. +func (r *serviceRegistry) callback(method string) *callback { + elem := strings.SplitN(method, serviceMethodSeparator, 2) + if len(elem) != 2 { + return nil + } + r.mu.Lock() + defer r.mu.Unlock() + return r.services[elem[0]].callbacks[elem[1]] +} + +// subscription returns a subscription callback in the given service. +func (r *serviceRegistry) subscription(service, name string) *callback { + r.mu.Lock() + defer r.mu.Unlock() + return r.services[service].subscriptions[name] +} + +// suitableCallbacks iterates over the methods of the given type. It determines if a method +// satisfies the criteria for a RPC callback or a subscription callback and adds it to the +// collection of callbacks. See server documentation for a summary of these criteria. +func suitableCallbacks(receiver reflect.Value) map[string]*callback { + typ := receiver.Type() + callbacks := make(map[string]*callback) + for m := 0; m < typ.NumMethod(); m++ { + method := typ.Method(m) + if method.PkgPath != "" { + continue // method not exported + } + cb := newCallback(receiver, method.Func) + if cb == nil { + continue // function invalid + } + name := formatName(method.Name) + callbacks[name] = cb + } + return callbacks +} + +// newCallback turns fn (a function) into a callback object. It returns nil if the function +// is unsuitable as an RPC callback. +func newCallback(receiver, fn reflect.Value) *callback { + fntype := fn.Type() + c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype)} + // Determine parameter types. They must all be exported or builtin types. + c.makeArgTypes() + if !allExportedOrBuiltin(c.argTypes) { + return nil + } + // Verify return types. The function must return at most one error + // and/or one other non-error value. + outs := make([]reflect.Type, fntype.NumOut()) + for i := 0; i < fntype.NumOut(); i++ { + outs[i] = fntype.Out(i) + } + if len(outs) > 2 || !allExportedOrBuiltin(outs) { + return nil + } + // If an error is returned, it must be the last returned value. + switch { + case len(outs) == 1 && isErrorType(outs[0]): + c.errPos = 0 + case len(outs) == 2: + if isErrorType(outs[0]) || !isErrorType(outs[1]) { + return nil + } + c.errPos = 1 + } + return c +} + +// makeArgTypes composes the argTypes list. +func (c *callback) makeArgTypes() { + fntype := c.fn.Type() + // Skip receiver and context.Context parameter (if present). + firstArg := 0 + if c.rcvr.IsValid() { + firstArg++ + } + if fntype.NumIn() > firstArg && fntype.In(firstArg) == contextType { + c.hasCtx = true + firstArg++ + } + // Add all remaining parameters. + c.argTypes = make([]reflect.Type, fntype.NumIn()-firstArg) + for i := firstArg; i < fntype.NumIn(); i++ { + c.argTypes[i-firstArg] = fntype.In(i) + } +} + +// call invokes the callback. +func (c *callback) call(ctx context.Context, method string, args []reflect.Value) (res interface{}, errRes error) { + // Create the argument slice. + fullargs := make([]reflect.Value, 0, 2+len(args)) + if c.rcvr.IsValid() { + fullargs = append(fullargs, c.rcvr) + } + if c.hasCtx { + fullargs = append(fullargs, reflect.ValueOf(ctx)) + } + fullargs = append(fullargs, args...) + + // Catch panic while running the callback. + defer func() { + if err := recover(); err != nil { + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:runtime.Stack(buf, false)] + log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, buf)) + errRes = errors.New("method handler crashed") + } + }() + // Run the callback. + results := c.fn.Call(fullargs) + if len(results) == 0 { + return nil, nil + } + if c.errPos >= 0 && !results[c.errPos].IsNil() { + // Method has returned non-nil error value. + err := results[c.errPos].Interface().(error) + return reflect.Value{}, err + } + return results[0].Interface(), nil +} + +// Is this an exported - upper case - name? +func isExported(name string) bool { + rune, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(rune) +} + +// Are all those types exported or built-in? +func allExportedOrBuiltin(types []reflect.Type) bool { + for _, typ := range types { + for typ.Kind() == reflect.Ptr { + typ = typ.Elem() + } + // PkgPath will be non-empty even for an exported type, + // so we need to check the type name as well. + if !isExported(typ.Name()) && typ.PkgPath() != "" { + return false + } + } + return true +} + +// Is t context.Context or *context.Context? +func isContextType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + return t == contextType +} + +// Does t satisfy the error interface? +func isErrorType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + return t.Implements(errorType) +} + +// Is t Subscription or *Subscription? +func isSubscriptionType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + return t == subscriptionType +} + +// isPubSub tests whether the given method has as as first argument a context.Context and +// returns the pair (Subscription, error). +func isPubSub(methodType reflect.Type) bool { + // numIn(0) is the receiver type + if methodType.NumIn() < 2 || methodType.NumOut() != 2 { + return false + } + return isContextType(methodType.In(1)) && + isSubscriptionType(methodType.Out(0)) && + isErrorType(methodType.Out(1)) +} + +// formatName converts to first character of name to lowercase. +func formatName(name string) string { + ret := []rune(name) + if len(ret) > 0 { + ret[0] = unicode.ToLower(ret[0]) + } + return string(ret) +} diff --git a/rpc/subscription.go b/rpc/subscription.go new file mode 100644 index 0000000..c1e869b --- /dev/null +++ b/rpc/subscription.go @@ -0,0 +1,327 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "bufio" + "container/list" + "context" + crand "crypto/rand" + "encoding/binary" + "encoding/hex" + "encoding/json" + "errors" + "math/rand" + "reflect" + "strings" + "sync" + "time" +) + +var ( + // ErrNotificationsUnsupported is returned when the connection doesn't support notifications + ErrNotificationsUnsupported = errors.New("notifications not supported") + // ErrNotificationNotFound is returned when the notification for the given id is not found + ErrSubscriptionNotFound = errors.New("subscription not found") +) + +var globalGen = randomIDGenerator() + +// ID defines a pseudo random number that is used to identify RPC subscriptions. +type ID string + +// NewID returns a new, random ID. +func NewID() ID { + return globalGen() +} + +// randomIDGenerator returns a function generates a random IDs. +func randomIDGenerator() func() ID { + seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader)) + if err != nil { + seed = int64(time.Now().Nanosecond()) + } + var ( + mu sync.Mutex + rng = rand.New(rand.NewSource(seed)) + ) + return func() ID { + mu.Lock() + defer mu.Unlock() + id := make([]byte, 16) + rng.Read(id) + return encodeID(id) + } +} + +func encodeID(b []byte) ID { + id := hex.EncodeToString(b) + id = strings.TrimLeft(id, "0") + if id == "" { + id = "0" // ID's are RPC quantities, no leading zero's and 0 is 0x0. + } + return ID("0x" + id) +} + +type notifierKey struct{} + +// NotifierFromContext returns the Notifier value stored in ctx, if any. +func NotifierFromContext(ctx context.Context) (*Notifier, bool) { + n, ok := ctx.Value(notifierKey{}).(*Notifier) + return n, ok +} + +// Notifier is tied to a RPC connection that supports subscriptions. +// Server callbacks use the notifier to send notifications. +type Notifier struct { + h *handler + namespace string + + mu sync.Mutex + sub *Subscription + buffer []json.RawMessage + callReturned bool + activated bool +} + +// CreateSubscription returns a new subscription that is coupled to the +// RPC connection. By default subscriptions are inactive and notifications +// are dropped until the subscription is marked as active. This is done +// by the RPC server after the subscription ID is send to the client. +func (n *Notifier) CreateSubscription() *Subscription { + n.mu.Lock() + defer n.mu.Unlock() + + if n.sub != nil { + panic("can't create multiple subscriptions with Notifier") + } else if n.callReturned { + panic("can't create subscription after subscribe call has returned") + } + n.sub = &Subscription{ID: n.h.idgen(), namespace: n.namespace, err: make(chan error, 1)} + return n.sub +} + +// Notify sends a notification to the client with the given data as payload. +// If an error occurs the RPC connection is closed and the error is returned. +func (n *Notifier) Notify(id ID, data interface{}) error { + enc, err := json.Marshal(data) + if err != nil { + return err + } + + n.mu.Lock() + defer n.mu.Unlock() + + if n.sub == nil { + panic("can't Notify before subscription is created") + } else if n.sub.ID != id { + panic("Notify with wrong ID") + } + if n.activated { + return n.send(n.sub, enc) + } + n.buffer = append(n.buffer, enc) + return nil +} + +// Closed returns a channel that is closed when the RPC connection is closed. +// Deprecated: use subscription error channel +func (n *Notifier) Closed() <-chan interface{} { + return n.h.conn.Closed() +} + +// takeSubscription returns the subscription (if one has been created). No subscription can +// be created after this call. +func (n *Notifier) takeSubscription() *Subscription { + n.mu.Lock() + defer n.mu.Unlock() + n.callReturned = true + return n.sub +} + +// acticate is called after the subscription ID was sent to client. Notifications are +// buffered before activation. This prevents notifications being sent to the client before +// the subscription ID is sent to the client. +func (n *Notifier) activate() error { + n.mu.Lock() + defer n.mu.Unlock() + + for _, data := range n.buffer { + if err := n.send(n.sub, data); err != nil { + return err + } + } + n.activated = true + return nil +} + +func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { + params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data}) + ctx := context.Background() + return n.h.conn.Write(ctx, &jsonrpcMessage{ + Version: vsn, + Method: n.namespace + notificationMethodSuffix, + Params: params, + }) +} + +// A Subscription is created by a notifier and tight to that notifier. The client can use +// this subscription to wait for an unsubscribe request for the client, see Err(). +type Subscription struct { + ID ID + namespace string + err chan error // closed on unsubscribe +} + +// Err returns a channel that is closed when the client send an unsubscribe request. +func (s *Subscription) Err() <-chan error { + return s.err +} + +// MarshalJSON marshals a subscription as its ID. +func (s *Subscription) MarshalJSON() ([]byte, error) { + return json.Marshal(s.ID) +} + +// ClientSubscription is a subscription established through the Client's Subscribe or +// EthSubscribe methods. +type ClientSubscription struct { + client *Client + etype reflect.Type + channel reflect.Value + namespace string + subid string + in chan json.RawMessage + + quitOnce sync.Once // ensures quit is closed once + quit chan struct{} // quit is closed when the subscription exits + errOnce sync.Once // ensures err is closed once + err chan error +} + +func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription { + sub := &ClientSubscription{ + client: c, + namespace: namespace, + etype: channel.Type().Elem(), + channel: channel, + quit: make(chan struct{}), + err: make(chan error, 1), + in: make(chan json.RawMessage), + } + return sub +} + +// Err returns the subscription error channel. The intended use of Err is to schedule +// resubscription when the client connection is closed unexpectedly. +// +// The error channel receives a value when the subscription has ended due +// to an error. The received error is nil if Close has been called +// on the underlying client and no other error has occurred. +// +// The error channel is closed when Unsubscribe is called on the subscription. +func (sub *ClientSubscription) Err() <-chan error { + return sub.err +} + +// Unsubscribe unsubscribes the notification and closes the error channel. +// It can safely be called more than once. +func (sub *ClientSubscription) Unsubscribe() { + sub.quitWithError(nil, true) + sub.errOnce.Do(func() { close(sub.err) }) +} + +func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) { + sub.quitOnce.Do(func() { + // The dispatch loop won't be able to execute the unsubscribe call + // if it is blocked on deliver. Close sub.quit first because it + // unblocks deliver. + close(sub.quit) + if unsubscribeServer { + sub.requestUnsubscribe() + } + if err != nil { + if err == ErrClientQuit { + err = nil // Adhere to subscription semantics. + } + sub.err <- err + } + }) +} + +func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) { + select { + case sub.in <- result: + return true + case <-sub.quit: + return false + } +} + +func (sub *ClientSubscription) start() { + sub.quitWithError(sub.forward()) +} + +func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) { + cases := []reflect.SelectCase{ + {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)}, + {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)}, + {Dir: reflect.SelectSend, Chan: sub.channel}, + } + buffer := list.New() + defer buffer.Init() + for { + var chosen int + var recv reflect.Value + if buffer.Len() == 0 { + // Idle, omit send case. + chosen, recv, _ = reflect.Select(cases[:2]) + } else { + // Non-empty buffer, send the first queued item. + cases[2].Send = reflect.ValueOf(buffer.Front().Value) + chosen, recv, _ = reflect.Select(cases) + } + + switch chosen { + case 0: // <-sub.quit + return nil, false + case 1: // <-sub.in + val, err := sub.unmarshal(recv.Interface().(json.RawMessage)) + if err != nil { + return err, true + } + if buffer.Len() == maxClientSubscriptionBuffer { + return ErrSubscriptionQueueOverflow, true + } + buffer.PushBack(val) + case 2: // sub.channel<- + cases[2].Send = reflect.Value{} // Don't hold onto the value. + buffer.Remove(buffer.Front()) + } + } +} + +func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) { + val := reflect.New(sub.etype) + err := json.Unmarshal(result, val.Interface()) + return val.Elem().Interface(), err +} + +func (sub *ClientSubscription) requestUnsubscribe() error { + var result interface{} + return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid) +} diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go new file mode 100644 index 0000000..eba1924 --- /dev/null +++ b/rpc/subscription_test.go @@ -0,0 +1,206 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "encoding/json" + "fmt" + "net" + "strings" + "testing" + "time" +) + +func TestNewID(t *testing.T) { + hexchars := "0123456789ABCDEFabcdef" + for i := 0; i < 100; i++ { + id := string(NewID()) + if !strings.HasPrefix(id, "0x") { + t.Fatalf("invalid ID prefix, want '0x...', got %s", id) + } + + id = id[2:] + if len(id) == 0 || len(id) > 32 { + t.Fatalf("invalid ID length, want len(id) > 0 && len(id) <= 32), got %d", len(id)) + } + + for i := 0; i < len(id); i++ { + if strings.IndexByte(hexchars, id[i]) == -1 { + t.Fatalf("unexpected byte, want any valid hex char, got %c", id[i]) + } + } + } +} + +func TestSubscriptions(t *testing.T) { + var ( + namespaces = []string{"eth", "shh", "bzz"} + service = ¬ificationTestService{} + subCount = len(namespaces) + notificationCount = 3 + + server = NewServer() + clientConn, serverConn = net.Pipe() + out = json.NewEncoder(clientConn) + in = json.NewDecoder(clientConn) + successes = make(chan subConfirmation) + notifications = make(chan subscriptionResult) + errors = make(chan error, subCount*notificationCount+1) + ) + + // setup and start server + for _, namespace := range namespaces { + if err := server.RegisterName(namespace, service); err != nil { + t.Fatalf("unable to register test service %v", err) + } + } + go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions) + defer server.Stop() + + // wait for message and write them to the given channels + go waitForMessages(in, successes, notifications, errors) + + // create subscriptions one by one + for i, namespace := range namespaces { + request := map[string]interface{}{ + "id": i, + "method": fmt.Sprintf("%s_subscribe", namespace), + "version": "2.0", + "params": []interface{}{"someSubscription", notificationCount, i}, + } + if err := out.Encode(&request); err != nil { + t.Fatalf("Could not create subscription: %v", err) + } + } + + timeout := time.After(30 * time.Second) + subids := make(map[string]string, subCount) + count := make(map[string]int, subCount) + allReceived := func() bool { + done := len(count) == subCount + for _, c := range count { + if c < notificationCount { + done = false + } + } + return done + } + for !allReceived() { + select { + case confirmation := <-successes: // subscription created + subids[namespaces[confirmation.reqid]] = string(confirmation.subid) + case notification := <-notifications: + count[notification.ID]++ + case err := <-errors: + t.Fatal(err) + case <-timeout: + for _, namespace := range namespaces { + subid, found := subids[namespace] + if !found { + t.Errorf("subscription for %q not created", namespace) + continue + } + if count, found := count[subid]; !found || count < notificationCount { + t.Errorf("didn't receive all notifications (%d<%d) in time for namespace %q", count, notificationCount, namespace) + } + } + t.Fatal("timed out") + } + } +} + +// This test checks that unsubscribing works. +func TestServerUnsubscribe(t *testing.T) { + // Start the server. + server := newTestServer() + service := ¬ificationTestService{unsubscribed: make(chan string)} + server.RegisterName("nftest2", service) + p1, p2 := net.Pipe() + go server.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions) + + p2.SetDeadline(time.Now().Add(10 * time.Second)) + + // Subscribe. + p2.Write([]byte(`{"jsonrpc":"2.0","id":1,"method":"nftest2_subscribe","params":["someSubscription",0,10]}`)) + + // Handle received messages. + resps := make(chan subConfirmation) + notifications := make(chan subscriptionResult) + errors := make(chan error) + go waitForMessages(json.NewDecoder(p2), resps, notifications, errors) + + // Receive the subscription ID. + var sub subConfirmation + select { + case sub = <-resps: + case err := <-errors: + t.Fatal(err) + } + + // Unsubscribe and check that it is handled on the server side. + p2.Write([]byte(`{"jsonrpc":"2.0","method":"nftest2_unsubscribe","params":["` + sub.subid + `"]}`)) + for { + select { + case id := <-service.unsubscribed: + if id != string(sub.subid) { + t.Errorf("wrong subscription ID unsubscribed") + } + return + case err := <-errors: + t.Fatal(err) + case <-notifications: + // drop notifications + } + } +} + +type subConfirmation struct { + reqid int + subid ID +} + +func waitForMessages(in *json.Decoder, successes chan subConfirmation, notifications chan subscriptionResult, errors chan error) { + for { + var msg jsonrpcMessage + if err := in.Decode(&msg); err != nil { + errors <- fmt.Errorf("decode error: %v", err) + return + } + switch { + case msg.isNotification(): + var res subscriptionResult + if err := json.Unmarshal(msg.Params, &res); err != nil { + errors <- fmt.Errorf("invalid subscription result: %v", err) + } else { + notifications <- res + } + case msg.isResponse(): + var c subConfirmation + if msg.Error != nil { + errors <- msg.Error + } else if err := json.Unmarshal(msg.Result, &c.subid); err != nil { + errors <- fmt.Errorf("invalid response: %v", err) + } else { + json.Unmarshal(msg.ID, &c.reqid) + successes <- c + } + default: + errors <- fmt.Errorf("unrecognized message: %v", msg) + return + } + } +} diff --git a/rpc/testservice_test.go b/rpc/testservice_test.go new file mode 100644 index 0000000..98871b5 --- /dev/null +++ b/rpc/testservice_test.go @@ -0,0 +1,180 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "context" + "encoding/binary" + "errors" + "sync" + "time" +) + +func newTestServer() *Server { + server := NewServer() + server.idgen = sequentialIDGenerator() + if err := server.RegisterName("test", new(testService)); err != nil { + panic(err) + } + if err := server.RegisterName("nftest", new(notificationTestService)); err != nil { + panic(err) + } + return server +} + +func sequentialIDGenerator() func() ID { + var ( + mu sync.Mutex + counter uint64 + ) + return func() ID { + mu.Lock() + defer mu.Unlock() + counter++ + id := make([]byte, 8) + binary.BigEndian.PutUint64(id, counter) + return encodeID(id) + } +} + +type testService struct{} + +type Args struct { + S string +} + +type Result struct { + String string + Int int + Args *Args +} + +func (s *testService) NoArgsRets() {} + +func (s *testService) Echo(str string, i int, args *Args) Result { + return Result{str, i, args} +} + +func (s *testService) EchoWithCtx(ctx context.Context, str string, i int, args *Args) Result { + return Result{str, i, args} +} + +func (s *testService) Sleep(ctx context.Context, duration time.Duration) { + time.Sleep(duration) +} + +func (s *testService) Rets() (string, error) { + return "", nil +} + +func (s *testService) InvalidRets1() (error, string) { + return nil, "" +} + +func (s *testService) InvalidRets2() (string, string) { + return "", "" +} + +func (s *testService) InvalidRets3() (string, string, error) { + return "", "", nil +} + +func (s *testService) CallMeBack(ctx context.Context, method string, args []interface{}) (interface{}, error) { + c, ok := ClientFromContext(ctx) + if !ok { + return nil, errors.New("no client") + } + var result interface{} + err := c.Call(&result, method, args...) + return result, err +} + +func (s *testService) CallMeBackLater(ctx context.Context, method string, args []interface{}) error { + c, ok := ClientFromContext(ctx) + if !ok { + return errors.New("no client") + } + go func() { + <-ctx.Done() + var result interface{} + c.Call(&result, method, args...) + }() + return nil +} + +func (s *testService) Subscription(ctx context.Context) (*Subscription, error) { + return nil, nil +} + +type notificationTestService struct { + unsubscribed chan string + gotHangSubscriptionReq chan struct{} + unblockHangSubscription chan struct{} +} + +func (s *notificationTestService) Echo(i int) int { + return i +} + +func (s *notificationTestService) Unsubscribe(subid string) { + if s.unsubscribed != nil { + s.unsubscribed <- subid + } +} + +func (s *notificationTestService) SomeSubscription(ctx context.Context, n, val int) (*Subscription, error) { + notifier, supported := NotifierFromContext(ctx) + if !supported { + return nil, ErrNotificationsUnsupported + } + + // By explicitly creating an subscription we make sure that the subscription id is send + // back to the client before the first subscription.Notify is called. Otherwise the + // events might be send before the response for the *_subscribe method. + subscription := notifier.CreateSubscription() + go func() { + for i := 0; i < n; i++ { + if err := notifier.Notify(subscription.ID, val+i); err != nil { + return + } + } + select { + case <-notifier.Closed(): + case <-subscription.Err(): + } + if s.unsubscribed != nil { + s.unsubscribed <- string(subscription.ID) + } + }() + return subscription, nil +} + +// HangSubscription blocks on s.unblockHangSubscription before sending anything. +func (s *notificationTestService) HangSubscription(ctx context.Context, val int) (*Subscription, error) { + notifier, supported := NotifierFromContext(ctx) + if !supported { + return nil, ErrNotificationsUnsupported + } + s.gotHangSubscriptionReq <- struct{}{} + <-s.unblockHangSubscription + subscription := notifier.CreateSubscription() + + go func() { + notifier.Notify(subscription.ID, val) + }() + return subscription, nil +} diff --git a/rpc/types.go b/rpc/types.go index 7312397..202dd79 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -17,6 +17,7 @@ package rpc import ( + "context" "fmt" "math" "strings" @@ -24,6 +25,39 @@ import ( "github.com/ava-labs/go-ethereum/common/hexutil" ) +// API describes the set of methods offered over the RPC interface +type API struct { + Namespace string // namespace under which the rpc methods of Service are exposed + Version string // api version for DApp's + Service interface{} // receiver instance which holds the methods + Public bool // indication if the methods must be considered safe for public use +} + +// Error wraps RPC errors, which contain an error code in addition to the message. +type Error interface { + Error() string // returns the message + ErrorCode() int // returns the code +} + +// ServerCodec implements reading, parsing and writing RPC messages for the server side of +// a RPC session. Implementations must be go-routine safe since the codec can be called in +// multiple go-routines concurrently. +type ServerCodec interface { + Read() (msgs []*jsonrpcMessage, isBatch bool, err error) + Close() + jsonWriter +} + +// jsonWriter can write JSON messages to its underlying connection. +// Implementations must be safe for concurrent use. +type jsonWriter interface { + Write(context.Context, interface{}) error + // Closed returns a channel which is closed when the connection is closed. + Closed() <-chan interface{} + // RemoteAddr returns the peer address of the connection. + RemoteAddr() string +} + type BlockNumber int64 const ( diff --git a/rpc/types_test.go b/rpc/types_test.go new file mode 100644 index 0000000..0465849 --- /dev/null +++ b/rpc/types_test.go @@ -0,0 +1,66 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "encoding/json" + "testing" + + "github.com/ava-labs/go-ethereum/common/math" +) + +func TestBlockNumberJSONUnmarshal(t *testing.T) { + tests := []struct { + input string + mustFail bool + expected BlockNumber + }{ + 0: {`"0x"`, true, BlockNumber(0)}, + 1: {`"0x0"`, false, BlockNumber(0)}, + 2: {`"0X1"`, false, BlockNumber(1)}, + 3: {`"0x00"`, true, BlockNumber(0)}, + 4: {`"0x01"`, true, BlockNumber(0)}, + 5: {`"0x1"`, false, BlockNumber(1)}, + 6: {`"0x12"`, false, BlockNumber(18)}, + 7: {`"0x7fffffffffffffff"`, false, BlockNumber(math.MaxInt64)}, + 8: {`"0x8000000000000000"`, true, BlockNumber(0)}, + 9: {"0", true, BlockNumber(0)}, + 10: {`"ff"`, true, BlockNumber(0)}, + 11: {`"pending"`, false, PendingBlockNumber}, + 12: {`"latest"`, false, LatestBlockNumber}, + 13: {`"earliest"`, false, EarliestBlockNumber}, + 14: {`someString`, true, BlockNumber(0)}, + 15: {`""`, true, BlockNumber(0)}, + 16: {``, true, BlockNumber(0)}, + } + + for i, test := range tests { + var num BlockNumber + err := json.Unmarshal([]byte(test.input), &num) + if test.mustFail && err == nil { + t.Errorf("Test %d should fail", i) + continue + } + if !test.mustFail && err != nil { + t.Errorf("Test %d should pass but got err: %v", i, err) + continue + } + if num != test.expected { + t.Errorf("Test %d got unexpected value, want %d, got %d", i, test.expected, num) + } + } +} diff --git a/rpc/websocket.go b/rpc/websocket.go new file mode 100644 index 0000000..d87e8a5 --- /dev/null +++ b/rpc/websocket.go @@ -0,0 +1,175 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "context" + "encoding/base64" + "fmt" + "net/http" + "net/url" + "os" + "strings" + "sync" + + mapset "github.com/deckarep/golang-set" + "github.com/ava-labs/go-ethereum/log" + "github.com/gorilla/websocket" +) + +const ( + wsReadBuffer = 1024 + wsWriteBuffer = 1024 +) + +var wsBufferPool = new(sync.Pool) + +// NewWSServer creates a new websocket RPC server around an API provider. +// +// Deprecated: use Server.WebsocketHandler +func NewWSServer(allowedOrigins []string, srv *Server) *http.Server { + return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)} +} + +// WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections. +// +// allowedOrigins should be a comma-separated list of allowed origin URLs. +// To allow connections with any origin, pass "*". +func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler { + var upgrader = websocket.Upgrader{ + ReadBufferSize: wsReadBuffer, + WriteBufferSize: wsWriteBuffer, + WriteBufferPool: wsBufferPool, + CheckOrigin: wsHandshakeValidator(allowedOrigins), + } + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Debug("WebSocket upgrade failed", "err", err) + return + } + codec := newWebsocketCodec(conn) + s.ServeCodec(codec, OptionMethodInvocation|OptionSubscriptions) + }) +} + +// wsHandshakeValidator returns a handler that verifies the origin during the +// websocket upgrade process. When a '*' is specified as an allowed origins all +// connections are accepted. +func wsHandshakeValidator(allowedOrigins []string) func(*http.Request) bool { + origins := mapset.NewSet() + allowAllOrigins := false + + for _, origin := range allowedOrigins { + if origin == "*" { + allowAllOrigins = true + } + if origin != "" { + origins.Add(strings.ToLower(origin)) + } + } + // allow localhost if no allowedOrigins are specified. + if len(origins.ToSlice()) == 0 { + origins.Add("http://localhost") + if hostname, err := os.Hostname(); err == nil { + origins.Add("http://" + strings.ToLower(hostname)) + } + } + log.Debug(fmt.Sprintf("Allowed origin(s) for WS RPC interface %v", origins.ToSlice())) + + f := func(req *http.Request) bool { + // Skip origin verification if no Origin header is present. The origin check + // is supposed to protect against browser based attacks. Browsers always set + // Origin. Non-browser software can put anything in origin and checking it doesn't + // provide additional security. + if _, ok := req.Header["Origin"]; !ok { + return true + } + // Verify origin against whitelist. + origin := strings.ToLower(req.Header.Get("Origin")) + if allowAllOrigins || origins.Contains(origin) { + return true + } + log.Warn("Rejected WebSocket connection", "origin", origin) + return false + } + + return f +} + +type wsHandshakeError struct { + err error + status string +} + +func (e wsHandshakeError) Error() string { + s := e.err.Error() + if e.status != "" { + s += " (HTTP status " + e.status + ")" + } + return s +} + +// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server +// that is listening on the given endpoint. +// +// The context is used for the initial connection establishment. It does not +// affect subsequent interactions with the client. +func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) { + endpoint, header, err := wsClientHeaders(endpoint, origin) + if err != nil { + return nil, err + } + dialer := websocket.Dialer{ + ReadBufferSize: wsReadBuffer, + WriteBufferSize: wsWriteBuffer, + WriteBufferPool: wsBufferPool, + } + return newClient(ctx, func(ctx context.Context) (ServerCodec, error) { + conn, resp, err := dialer.DialContext(ctx, endpoint, header) + if err != nil { + hErr := wsHandshakeError{err: err} + if resp != nil { + hErr.status = resp.Status + } + return nil, hErr + } + return newWebsocketCodec(conn), nil + }) +} + +func wsClientHeaders(endpoint, origin string) (string, http.Header, error) { + endpointURL, err := url.Parse(endpoint) + if err != nil { + return endpoint, nil, err + } + header := make(http.Header) + if origin != "" { + header.Add("origin", origin) + } + if endpointURL.User != nil { + b64auth := base64.StdEncoding.EncodeToString([]byte(endpointURL.User.String())) + header.Add("authorization", "Basic "+b64auth) + endpointURL.User = nil + } + return endpointURL.String(), header, nil +} + +func newWebsocketCodec(conn *websocket.Conn) ServerCodec { + conn.SetReadLimit(maxRequestContentLength) + return newCodec(conn, conn.WriteJSON, conn.ReadJSON) +} diff --git a/rpc/websocket_test.go b/rpc/websocket_test.go new file mode 100644 index 0000000..9dc1084 --- /dev/null +++ b/rpc/websocket_test.go @@ -0,0 +1,259 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "context" + "net" + "net/http" + "net/http/httptest" + "reflect" + "strings" + "testing" + "time" + + "github.com/gorilla/websocket" +) + +func TestWebsocketClientHeaders(t *testing.T) { + t.Parallel() + + endpoint, header, err := wsClientHeaders("wss://testuser:test-PASS_01@example.com:1234", "https://example.com") + if err != nil { + t.Fatalf("wsGetConfig failed: %s", err) + } + if endpoint != "wss://example.com:1234" { + t.Fatal("User should have been stripped from the URL") + } + if header.Get("authorization") != "Basic dGVzdHVzZXI6dGVzdC1QQVNTXzAx" { + t.Fatal("Basic auth header is incorrect") + } + if header.Get("origin") != "https://example.com" { + t.Fatal("Origin not set") + } +} + +// This test checks that the server rejects connections from disallowed origins. +func TestWebsocketOriginCheck(t *testing.T) { + t.Parallel() + + var ( + srv = newTestServer() + httpsrv = httptest.NewServer(srv.WebsocketHandler([]string{"http://example.com"})) + wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") + ) + defer srv.Stop() + defer httpsrv.Close() + + client, err := DialWebsocket(context.Background(), wsURL, "http://ekzample.com") + if err == nil { + client.Close() + t.Fatal("no error for wrong origin") + } + wantErr := wsHandshakeError{websocket.ErrBadHandshake, "403 Forbidden"} + if !reflect.DeepEqual(err, wantErr) { + t.Fatalf("wrong error for wrong origin: %q", err) + } + + // Connections without origin header should work. + client, err = DialWebsocket(context.Background(), wsURL, "") + if err != nil { + t.Fatal("error for empty origin") + } + client.Close() +} + +// This test checks whether calls exceeding the request size limit are rejected. +func TestWebsocketLargeCall(t *testing.T) { + t.Parallel() + + var ( + srv = newTestServer() + httpsrv = httptest.NewServer(srv.WebsocketHandler([]string{"*"})) + wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") + ) + defer srv.Stop() + defer httpsrv.Close() + + client, err := DialWebsocket(context.Background(), wsURL, "") + if err != nil { + t.Fatalf("can't dial: %v", err) + } + defer client.Close() + + // This call sends slightly less than the limit and should work. + var result Result + arg := strings.Repeat("x", maxRequestContentLength-200) + if err := client.Call(&result, "test_echo", arg, 1); err != nil { + t.Fatalf("valid call didn't work: %v", err) + } + if result.String != arg { + t.Fatal("wrong string echoed") + } + + // This call sends twice the allowed size and shouldn't work. + arg = strings.Repeat("x", maxRequestContentLength*2) + err = client.Call(&result, "test_echo", arg) + if err == nil { + t.Fatal("no error for too large call") + } +} + +// This test checks that client handles WebSocket ping frames correctly. +func TestClientWebsocketPing(t *testing.T) { + t.Parallel() + + var ( + sendPing = make(chan struct{}) + server = wsPingTestServer(t, sendPing) + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + ) + defer cancel() + defer server.Shutdown(ctx) + + client, err := DialContext(ctx, "ws://"+server.Addr) + if err != nil { + t.Fatalf("client dial error: %v", err) + } + resultChan := make(chan int) + sub, err := client.EthSubscribe(ctx, resultChan, "foo") + if err != nil { + t.Fatalf("client subscribe error: %v", err) + } + + // Wait for the context's deadline to be reached before proceeding. + // This is important for reproducing https://github.com/ethereum/go-ethereum/issues/19798 + <-ctx.Done() + close(sendPing) + + // Wait for the subscription result. + timeout := time.NewTimer(5 * time.Second) + for { + select { + case err := <-sub.Err(): + t.Error("client subscription error:", err) + case result := <-resultChan: + t.Log("client got result:", result) + return + case <-timeout.C: + t.Error("didn't get any result within the test timeout") + return + } + } +} + +// wsPingTestServer runs a WebSocket server which accepts a single subscription request. +// When a value arrives on sendPing, the server sends a ping frame, waits for a matching +// pong and finally delivers a single subscription result. +func wsPingTestServer(t *testing.T, sendPing <-chan struct{}) *http.Server { + var srv http.Server + shutdown := make(chan struct{}) + srv.RegisterOnShutdown(func() { + close(shutdown) + }) + srv.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Upgrade to WebSocket. + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Errorf("server WS upgrade error: %v", err) + return + } + defer conn.Close() + + // Handle the connection. + wsPingTestHandler(t, conn, shutdown, sendPing) + }) + + // Start the server. + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("can't listen:", err) + } + srv.Addr = listener.Addr().String() + go srv.Serve(listener) + return &srv +} + +func wsPingTestHandler(t *testing.T, conn *websocket.Conn, shutdown, sendPing <-chan struct{}) { + // Canned responses for the eth_subscribe call in TestClientWebsocketPing. + const ( + subResp = `{"jsonrpc":"2.0","id":1,"result":"0x00"}` + subNotify = `{"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0x00","result":1}}` + ) + + // Handle subscribe request. + if _, _, err := conn.ReadMessage(); err != nil { + t.Errorf("server read error: %v", err) + return + } + if err := conn.WriteMessage(websocket.TextMessage, []byte(subResp)); err != nil { + t.Errorf("server write error: %v", err) + return + } + + // Read from the connection to process control messages. + var pongCh = make(chan string) + conn.SetPongHandler(func(d string) error { + t.Logf("server got pong: %q", d) + pongCh <- d + return nil + }) + go func() { + for { + typ, msg, err := conn.ReadMessage() + if err != nil { + return + } + t.Logf("server got message (%d): %q", typ, msg) + } + }() + + // Write messages. + var ( + sendResponse <-chan time.Time + wantPong string + ) + for { + select { + case _, open := <-sendPing: + if !open { + sendPing = nil + } + t.Logf("server sending ping") + conn.WriteMessage(websocket.PingMessage, []byte("ping")) + wantPong = "ping" + case data := <-pongCh: + if wantPong == "" { + t.Errorf("unexpected pong") + } else if data != wantPong { + t.Errorf("got pong with wrong data %q", data) + } + wantPong = "" + sendResponse = time.NewTimer(200 * time.Millisecond).C + case <-sendResponse: + t.Logf("server sending response") + conn.WriteMessage(websocket.TextMessage, []byte(subNotify)) + sendResponse = nil + case <-shutdown: + conn.Close() + return + } + } +} -- cgit v1.2.3-70-g09d2 From 7feec02902d52a3abf722613eb9e218e015b723c Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 30 Jun 2020 17:05:50 -0400 Subject: make mc tx work --- core/evm.go | 7 +- core/gen_genesis.go | 2 + core/gen_genesis_account.go | 8 + core/genesis.go | 9 + core/state/statedb.go | 5 + core/tx_pool.go | 1 - core/types/gen_tx_json.go | 12 + core/types/transaction.go | 22 +- core/vm/evm.go | 2 - eth/backend.go | 2 - eth/gen_config.go | 12 + eth/tracers/internal/tracers/assets.go | 37 +-- examples/chain/main.go | 2 +- examples/counter/main.go | 2 +- examples/multicoin/main.go | 218 +++++++++++++ plugin/evm/static_service_test.go | 64 ---- plugin/evm/vm_genesis_parse_test.go | 32 -- rpc/client_example_test.go | 88 ----- rpc/client_test.go | 569 --------------------------------- rpc/http_test.go | 54 ---- rpc/server_test.go | 152 --------- rpc/subscription_test.go | 206 ------------ rpc/testservice_test.go | 180 ----------- rpc/types_test.go | 66 ---- rpc/websocket_test.go | 259 --------------- 25 files changed, 297 insertions(+), 1714 deletions(-) create mode 100644 examples/multicoin/main.go delete mode 100644 plugin/evm/static_service_test.go delete mode 100644 plugin/evm/vm_genesis_parse_test.go delete mode 100644 rpc/client_example_test.go delete mode 100644 rpc/client_test.go delete mode 100644 rpc/http_test.go delete mode 100644 rpc/server_test.go delete mode 100644 rpc/subscription_test.go delete mode 100644 rpc/testservice_test.go delete mode 100644 rpc/types_test.go delete mode 100644 rpc/websocket_test.go (limited to 'rpc') diff --git a/core/evm.go b/core/evm.go index ecab8e6..796b312 100644 --- a/core/evm.go +++ b/core/evm.go @@ -99,7 +99,7 @@ func CanTransferMC(db vm.StateDB, addr common.Address, to common.Address, coinID } if !db.IsMultiCoin(addr) { err := db.EnableMultiCoin(addr) - log.Debug("try to enable MC", "err", err) + log.Debug("try to enable MC", "addr", addr.Hex(), "err", err) } if !(db.IsMultiCoin(addr) && db.IsMultiCoin(to)) { // incompatible @@ -124,8 +124,5 @@ func TransferMultiCoin(db vm.StateDB, sender, recipient common.Address, coinID * return } db.SubBalanceMultiCoin(sender, *coinID, amount) - z := &big.Int{} - z.Add(amount, big.NewInt(1000000000000000000)) - log.Info("hi") - db.AddBalanceMultiCoin(recipient, *coinID, z) + db.AddBalanceMultiCoin(recipient, *coinID, amount) } diff --git a/core/gen_genesis.go b/core/gen_genesis.go index 05883c0..97175f7 100644 --- a/core/gen_genesis.go +++ b/core/gen_genesis.go @@ -15,6 +15,7 @@ import ( var _ = (*genesisSpecMarshaling)(nil) +// MarshalJSON marshals as JSON. func (g Genesis) MarshalJSON() ([]byte, error) { type Genesis struct { Config *params.ChainConfig `json:"config"` @@ -51,6 +52,7 @@ func (g Genesis) MarshalJSON() ([]byte, error) { return json.Marshal(&enc) } +// UnmarshalJSON unmarshals from JSON. func (g *Genesis) UnmarshalJSON(input []byte) error { type Genesis struct { Config *params.ChainConfig `json:"config"` diff --git a/core/gen_genesis_account.go b/core/gen_genesis_account.go index a7ef4a6..b90b658 100644 --- a/core/gen_genesis_account.go +++ b/core/gen_genesis_account.go @@ -14,11 +14,13 @@ import ( var _ = (*genesisAccountMarshaling)(nil) +// MarshalJSON marshals as JSON. func (g GenesisAccount) MarshalJSON() ([]byte, error) { type GenesisAccount struct { Code hexutil.Bytes `json:"code,omitempty"` Storage map[storageJSON]storageJSON `json:"storage,omitempty"` Balance *math.HexOrDecimal256 `json:"balance" gencodec:"required"` + MCBalance GenesisMultiCoinBalance `json:"mcbalance,omitempty"` Nonce math.HexOrDecimal64 `json:"nonce,omitempty"` PrivateKey hexutil.Bytes `json:"secretKey,omitempty"` } @@ -31,16 +33,19 @@ func (g GenesisAccount) MarshalJSON() ([]byte, error) { } } enc.Balance = (*math.HexOrDecimal256)(g.Balance) + enc.MCBalance = g.MCBalance enc.Nonce = math.HexOrDecimal64(g.Nonce) enc.PrivateKey = g.PrivateKey return json.Marshal(&enc) } +// UnmarshalJSON unmarshals from JSON. func (g *GenesisAccount) UnmarshalJSON(input []byte) error { type GenesisAccount struct { Code *hexutil.Bytes `json:"code,omitempty"` Storage map[storageJSON]storageJSON `json:"storage,omitempty"` Balance *math.HexOrDecimal256 `json:"balance" gencodec:"required"` + MCBalance *GenesisMultiCoinBalance `json:"mcbalance,omitempty"` Nonce *math.HexOrDecimal64 `json:"nonce,omitempty"` PrivateKey *hexutil.Bytes `json:"secretKey,omitempty"` } @@ -61,6 +66,9 @@ func (g *GenesisAccount) UnmarshalJSON(input []byte) error { return errors.New("missing required field 'balance' for GenesisAccount") } g.Balance = (*big.Int)(dec.Balance) + if dec.MCBalance != nil { + g.MCBalance = *dec.MCBalance + } if dec.Nonce != nil { g.Nonce = uint64(*dec.Nonce) } diff --git a/core/genesis.go b/core/genesis.go index 347beb3..ef490bf 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -78,11 +78,14 @@ func (ga *GenesisAlloc) UnmarshalJSON(data []byte) error { return nil } +type GenesisMultiCoinBalance map[common.Hash]*big.Int + // GenesisAccount is an account in the state of the genesis block. type GenesisAccount struct { Code []byte `json:"code,omitempty"` Storage map[common.Hash]common.Hash `json:"storage,omitempty"` Balance *big.Int `json:"balance" gencodec:"required"` + MCBalance GenesisMultiCoinBalance `json:"mcbalance,omitempty"` Nonce uint64 `json:"nonce,omitempty"` PrivateKey []byte `json:"secretKey,omitempty"` // for tests } @@ -261,6 +264,12 @@ func (g *Genesis) ToBlock(db ethdb.Database) *types.Block { for key, value := range account.Storage { statedb.SetState(addr, key, value) } + if account.MCBalance != nil { + statedb.ForceEnableMultiCoin(addr) + for coinID, value := range account.MCBalance { + statedb.AddBalanceMultiCoin(addr, coinID, value) + } + } } root := statedb.IntermediateRoot(false) head := &types.Header{ diff --git a/core/state/statedb.go b/core/state/statedb.go index 12deebe..1d3207d 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -248,6 +248,11 @@ func (self *StateDB) EnableMultiCoin(addr common.Address) error { return nil } +func (self *StateDB) ForceEnableMultiCoin(addr common.Address) { + stateObject := self.GetOrNewStateObject(addr) + stateObject.EnableMultiCoin() +} + func (self *StateDB) IsMultiCoin(addr common.Address) bool { stateObject := self.getStateObject(addr) if stateObject != nil { diff --git a/core/tx_pool.go b/core/tx_pool.go index 1acd488..5b2a3c0 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -528,7 +528,6 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { } // Ensure the transaction doesn't exceed the current block limit gas. if pool.currentMaxGas < tx.Gas() { - fmt.Println(pool.currentMaxGas, tx.Gas()) return ErrGasLimit } // Make sure the transaction is signed properly diff --git a/core/types/gen_tx_json.go b/core/types/gen_tx_json.go index 0410632..dd0d069 100644 --- a/core/types/gen_tx_json.go +++ b/core/types/gen_tx_json.go @@ -21,6 +21,8 @@ func (t txdata) MarshalJSON() ([]byte, error) { GasLimit hexutil.Uint64 `json:"gas" gencodec:"required"` Recipient *common.Address `json:"to" rlp:"nil"` Amount *hexutil.Big `json:"value" gencodec:"required"` + CoinID *common.Hash `json:"coinid" rlp:"nil"` + Amount2 *hexutil.Big `json:"value2" rlp:"nil"` Payload hexutil.Bytes `json:"input" gencodec:"required"` V *hexutil.Big `json:"v" gencodec:"required"` R *hexutil.Big `json:"r" gencodec:"required"` @@ -33,6 +35,8 @@ func (t txdata) MarshalJSON() ([]byte, error) { enc.GasLimit = hexutil.Uint64(t.GasLimit) enc.Recipient = t.Recipient enc.Amount = (*hexutil.Big)(t.Amount) + enc.CoinID = t.CoinID + enc.Amount2 = (*hexutil.Big)(t.Amount2) enc.Payload = t.Payload enc.V = (*hexutil.Big)(t.V) enc.R = (*hexutil.Big)(t.R) @@ -49,6 +53,8 @@ func (t *txdata) UnmarshalJSON(input []byte) error { GasLimit *hexutil.Uint64 `json:"gas" gencodec:"required"` Recipient *common.Address `json:"to" rlp:"nil"` Amount *hexutil.Big `json:"value" gencodec:"required"` + CoinID *common.Hash `json:"coinid" rlp:"nil"` + Amount2 *hexutil.Big `json:"value2" rlp:"nil"` Payload *hexutil.Bytes `json:"input" gencodec:"required"` V *hexutil.Big `json:"v" gencodec:"required"` R *hexutil.Big `json:"r" gencodec:"required"` @@ -78,6 +84,12 @@ func (t *txdata) UnmarshalJSON(input []byte) error { return errors.New("missing required field 'value' for txdata") } t.Amount = (*big.Int)(dec.Amount) + if dec.CoinID != nil { + t.CoinID = dec.CoinID + } + if dec.Amount2 != nil { + t.Amount2 = (*big.Int)(dec.Amount2) + } if dec.Payload == nil { return errors.New("missing required field 'input' for txdata") } diff --git a/core/types/transaction.go b/core/types/transaction.go index cf9e61a..858b443 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -49,8 +49,8 @@ type txdata struct { GasLimit uint64 `json:"gas" gencodec:"required"` Recipient *common.Address `json:"to" rlp:"nil"` // nil means contract creation Amount *big.Int `json:"value" gencodec:"required"` - CoinID *common.Hash `json:"coinid" rlp:"-"` - Amount2 *big.Int `json:"value2"` + CoinID *common.Hash `json:"coinid" rlp:"nil"` + Amount2 *big.Int `json:"value2" rlp:"nil"` Payload []byte `json:"input" gencodec:"required"` // Signature values @@ -67,7 +67,7 @@ type txdataMarshaling struct { Price *hexutil.Big GasLimit hexutil.Uint64 Amount *hexutil.Big - CoinID *hexutil.Bytes + CoinID *common.Hash Amount2 *hexutil.Big Payload hexutil.Bytes V *hexutil.Big @@ -183,14 +183,14 @@ func (tx *Transaction) UnmarshalJSON(input []byte) error { return nil } -func (tx *Transaction) Data() []byte { return common.CopyBytes(tx.data.Payload) } -func (tx *Transaction) Gas() uint64 { return tx.data.GasLimit } -func (tx *Transaction) GasPrice() *big.Int { return new(big.Int).Set(tx.data.Price) } -func (tx *Transaction) Value() *big.Int { return new(big.Int).Set(tx.data.Amount) } -func (tx *Transaction) CoinID() *big.Int { return big.NewInt(0) } -func (tx *Transaction) Value2() *big.Int { return big.NewInt(0) } -func (tx *Transaction) Nonce() uint64 { return tx.data.AccountNonce } -func (tx *Transaction) CheckNonce() bool { return true } +func (tx *Transaction) Data() []byte { return common.CopyBytes(tx.data.Payload) } +func (tx *Transaction) Gas() uint64 { return tx.data.GasLimit } +func (tx *Transaction) GasPrice() *big.Int { return new(big.Int).Set(tx.data.Price) } +func (tx *Transaction) Value() *big.Int { return new(big.Int).Set(tx.data.Amount) } +func (tx *Transaction) CoinID() *common.Hash { return tx.data.CoinID } +func (tx *Transaction) Value2() *big.Int { return new(big.Int).Set(tx.data.Amount2) } +func (tx *Transaction) Nonce() uint64 { return tx.data.AccountNonce } +func (tx *Transaction) CheckNonce() bool { return true } // To returns the recipient address of the transaction. // It returns nil if the transaction is a contract creation. diff --git a/core/vm/evm.go b/core/vm/evm.go index ff3587c..be8b240 100644 --- a/core/vm/evm.go +++ b/core/vm/evm.go @@ -17,7 +17,6 @@ package vm import ( - "fmt" "math/big" "sync/atomic" "time" @@ -260,7 +259,6 @@ func (evm *EVM) Call(caller ContractRef, addr common.Address, input []byte, gas // This allows the user transfer balance of a specified coinId in addition to a normal Call(). func (evm *EVM) CallExpert(caller ContractRef, addr common.Address, input []byte, gas uint64, value *big.Int, coinID *common.Hash, value2 *big.Int) (ret []byte, leftOverGas uint64, err error) { - fmt.Println("CallExpert") if evm.vmConfig.NoRecursion && evm.depth > 0 { return nil, gas, nil } diff --git a/eth/backend.go b/eth/backend.go index ab43558..983909c 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -173,8 +173,6 @@ func New(ctx *node.ServiceContext, config *Config, bcb: bcb, } - fmt.Println(eth.config.Genesis.GasLimit) - bcVersion := rawdb.ReadDatabaseVersion(chainDb) var dbVer = "" if bcVersion != nil { diff --git a/eth/gen_config.go b/eth/gen_config.go index 617a885..d34f0b3 100644 --- a/eth/gen_config.go +++ b/eth/gen_config.go @@ -49,6 +49,8 @@ func (c Config) MarshalTOML() (interface{}, error) { RPCGasCap *big.Int `toml:",omitempty"` Checkpoint *params.TrustedCheckpoint `toml:",omitempty"` CheckpointOracle *params.CheckpointOracleConfig `toml:",omitempty"` + OverrideIstanbul *big.Int + ManualCanonical bool } var enc Config enc.Genesis = c.Genesis @@ -82,6 +84,8 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.RPCGasCap = c.RPCGasCap enc.Checkpoint = c.Checkpoint enc.CheckpointOracle = c.CheckpointOracle + enc.OverrideIstanbul = c.OverrideIstanbul + enc.ManualCanonical = c.ManualCanonical return &enc, nil } @@ -119,6 +123,8 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { RPCGasCap *big.Int `toml:",omitempty"` Checkpoint *params.TrustedCheckpoint `toml:",omitempty"` CheckpointOracle *params.CheckpointOracleConfig `toml:",omitempty"` + OverrideIstanbul *big.Int + ManualCanonical *bool } var dec Config if err := unmarshal(&dec); err != nil { @@ -217,5 +223,11 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.CheckpointOracle != nil { c.CheckpointOracle = dec.CheckpointOracle } + if dec.OverrideIstanbul != nil { + c.OverrideIstanbul = dec.OverrideIstanbul + } + if dec.ManualCanonical != nil { + c.ManualCanonical = *dec.ManualCanonical + } return nil } diff --git a/eth/tracers/internal/tracers/assets.go b/eth/tracers/internal/tracers/assets.go index d0a0bf7..b1930dc 100644 --- a/eth/tracers/internal/tracers/assets.go +++ b/eth/tracers/internal/tracers/assets.go @@ -3,7 +3,7 @@ // 4byte_tracer.js (2.933kB) // bigram_tracer.js (1.712kB) // call_tracer.js (8.643kB) -// evmdis_tracer.js (4.194kB) +// evmdis_tracer.js (4.195kB) // noop_tracer.js (1.271kB) // opcount_tracer.js (1.372kB) // prestate_tracer.js (4.234kB) @@ -28,7 +28,7 @@ import ( func bindataRead(data []byte, name string) ([]byte, error) { gz, err := gzip.NewReader(bytes.NewBuffer(data)) if err != nil { - return nil, fmt.Errorf("read %q: %v", name, err) + return nil, fmt.Errorf("read %q: %w", name, err) } var buf bytes.Buffer @@ -36,7 +36,7 @@ func bindataRead(data []byte, name string) ([]byte, error) { clErr := gz.Close() if err != nil { - return nil, fmt.Errorf("read %q: %v", name, err) + return nil, fmt.Errorf("read %q: %w", name, err) } if clErr != nil { return nil, err @@ -137,7 +137,7 @@ func call_tracerJs() (*asset, error) { return a, nil } -var _evmdis_tracerJs = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x57\xdf\x6f\xda\xca\x12\x7e\x86\xbf\x62\x94\x27\x50\x29\x60\x63\x08\x38\x27\x47\xe2\xa6\xf4\x1c\xae\xd2\x24\x02\x72\x8f\x2a\x94\x87\x05\xc6\xb0\xaa\xf1\x5a\xbb\x6b\x72\xb8\x55\xfe\xf7\xab\xd9\x59\x03\xf9\x75\xdb\x4a\xa7\x0f\x3b\xb5\x77\xbe\x6f\xbe\x9d\x19\xcf\x92\x56\x0b\xae\x54\xbe\xd7\x72\xbd\xb1\x10\xb6\x83\x73\x98\x6d\x10\xd6\xea\x23\xda\x0d\x6a\x2c\xb6\x30\x2c\xec\x46\x69\x53\x6d\xb5\x60\xb6\x91\x06\x12\x99\x22\x48\x03\xb9\xd0\x16\x54\x02\xf6\x85\x7f\x2a\x17\x5a\xe8\x7d\xb3\xda\x6a\x31\xe6\xcd\x6d\x62\x48\x34\x22\x18\x95\xd8\x47\xa1\x31\x86\xbd\x2a\x60\x29\x32\xd0\xb8\x92\xc6\x6a\xb9\x28\x2c\x82\xb4\x20\xb2\x55\x4b\x69\xd8\xaa\x95\x4c\xf6\x44\x29\x2d\x14\xd9\x0a\xb5\x0b\x6d\x51\x6f\x4d\xa9\xe3\x8f\x9b\x7b\xb8\x46\x63\x50\xc3\x1f\x98\xa1\x16\x29\xdc\x15\x8b\x54\x2e\xe1\x5a\x2e\x31\x33\x08\xc2\x40\x4e\x6f\xcc\x06\x57\xb0\x70\x74\x04\xfc\x4c\x52\xa6\x5e\x0a\x7c\x56\x45\xb6\x12\x56\xaa\xac\x01\x28\x49\x39\xec\x50\x1b\xa9\x32\xe8\x94\xa1\x3c\x61\x03\x94\x26\x92\x9a\xb0\x74\x00\x0d\x2a\x27\x5c\x1d\x44\xb6\x87\x54\xd8\x23\xf4\x27\x12\x72\x3c\xf7\x0a\x64\xe6\xc2\x6c\x54\x8e\x60\x37\xc2\xd2\xa9\x1f\x65\x9a\xc2\x02\xa1\x30\x98\x14\x69\x83\xd8\x16\x85\x85\xbf\xc6\xb3\x3f\x6f\xef\x67\x30\xbc\xf9\x0a\x7f\x0d\x27\x93\xe1\xcd\xec\xeb\x05\x3c\x4a\xbb\x51\x85\x05\xdc\x21\x53\xc9\x6d\x9e\x4a\x5c\xc1\xa3\xd0\x5a\x64\x76\x0f\x2a\x21\x86\x2f\xa3\xc9\xd5\x9f\xc3\x9b\xd9\xf0\x5f\xe3\xeb\xf1\xec\x2b\x28\x0d\x9f\xc7\xb3\x9b\xd1\x74\x0a\x9f\x6f\x27\x30\x84\xbb\xe1\x64\x36\xbe\xba\xbf\x1e\x4e\xe0\xee\x7e\x72\x77\x3b\x1d\x35\x61\x8a\xa4\x0a\x09\xff\xe3\x9c\x27\xae\x7a\x1a\x61\x85\x56\xc8\xd4\x94\x99\xf8\xaa\x0a\x30\x1b\x55\xa4\x2b\xd8\x88\x1d\x82\xc6\x25\xca\x1d\xae\x40\xc0\x52\xe5\xfb\x9f\x2e\x2a\x71\x89\x54\x65\x6b\x77\xe6\x77\x1b\x12\xc6\x09\x64\xca\x36\xc0\x20\xc2\x6f\x1b\x6b\xf3\xb8\xd5\x7a\x7c\x7c\x6c\xae\xb3\xa2\xa9\xf4\xba\x95\x32\x9d\x69\xfd\xde\xac\x12\x27\xee\xb6\x2b\x69\x66\x5a\x2c\x51\x83\x46\x5b\xe8\xcc\x80\x29\x92\x84\xfc\x2c\xc8\x2c\x51\x7a\xeb\xda\x04\x12\xad\xb6\x20\xc0\x92\x2f\x58\x05\x39\x6a\xda\xf4\x14\x1f\x8d\xdd\xa7\x4e\xe6\x4a\x1a\x61\x0c\x6e\x17\xe9\xbe\x59\xfd\x5e\xad\x18\x2b\x96\xdf\x62\x98\x7f\x57\xb9\x89\x61\xfe\xf0\xf4\xd0\xa8\x56\x2b\x59\x5e\x98\x0d\x9a\x18\xbe\xb7\x63\x68\x37\x20\x88\x21\x68\x40\xe8\xd6\x8e\x5b\x23\xb7\x76\xdd\xda\x73\xeb\xb9\x5b\xfb\x6e\x1d\xb8\x35\x68\xb3\x61\x74\xc0\x6e\x01\xfb\x05\xec\x18\xb0\x67\xc8\x9e\xa1\x8f\xc3\x81\x42\x8e\x14\x72\xa8\x90\x63\x85\xcc\xd2\x61\x97\x88\x59\x22\x66\xe9\x32\x4b\x97\x59\xba\xec\xd2\x65\x96\xae\x17\xdc\x75\xe7\xe9\x32\x4b\xf7\x9c\x9f\x98\xa5\xcb\x2c\x3d\x3e\x72\x8f\x01\x3d\x7f\x44\x06\xf4\x58\x7c\x8f\x01\x3d\x06\xf4\x19\xd0\xe7\xb0\xfd\x90\x9f\x3a\x6c\x98\xa5\xcf\x61\xfb\x3d\x36\x1c\xb6\xcf\x2c\x7d\x66\x19\xb0\xf8\x41\xe0\xf6\x06\x1c\x6f\xc0\xf1\x06\x3e\xab\x65\x5a\x7d\x5e\xdb\x3e\xb1\xed\xd0\xdb\x8e\xb7\x91\xb7\x5d\x6f\x7d\xe6\xdb\x3e\xf5\x6d\x9f\xfb\xb6\xe7\x3b\xd4\xc9\xf3\x05\x9e\x2f\xf0\x7c\x81\xe7\x0b\x3c\x5f\x59\xc9\xb2\x94\x65\x2d\x7d\x31\x03\x5f\xcd\xc0\x97\x33\xf0\xf5\x0c\x7c\x41\x03\x5f\xd1\xc0\x97\x34\xf0\x35\x0d\x42\xcf\x17\xf6\x63\x08\xc9\x0e\x62\xe8\x34\x20\xe8\xb4\x63\x88\xc8\x06\x31\x74\xc9\x86\x31\xf4\xc8\x76\x62\x38\x27\x1b\xc5\xd0\x27\xdb\x8d\x61\x40\x96\xf8\xa8\x6b\x3b\x44\x48\x8c\x1d\x52\x48\x94\x1d\x92\x48\x9c\x11\x69\x24\xd2\x88\x44\x12\x6b\x44\x2a\x89\x36\x22\x99\xc4\x1b\x45\xac\x23\xea\xb2\x8e\xa8\xc7\x3a\xa2\x73\xd6\x41\xdd\xe7\x00\x03\xd6\x41\xfd\x47\x3a\xa8\x01\x49\x87\xeb\x40\xd2\xe1\x7a\x90\x74\xb8\x2e\x24\x4a\xea\x43\xa7\xc3\x75\x22\x91\x52\x2f\x3a\x1d\xae\x1b\x89\xd6\xf5\x23\xf1\xfa\x8e\x0c\x7a\x81\xb7\xa1\xb7\x1d\x6f\x23\x67\xc3\xc8\x7f\x45\x91\xff\x8c\x22\xff\x1d\x45\x1d\xbf\xef\xfd\xdc\x47\xf0\x44\xdf\x79\xab\x05\x1a\x4d\x91\x5a\x1a\xfe\x32\xdb\xa9\x6f\x34\x9e\x37\x98\x81\x48\x53\x37\xc7\x54\xbe\x54\x2b\x34\x3c\x1f\x17\x88\x19\x48\x8b\x5a\xd0\x05\xa1\x76\xa8\xe9\x6e\x2c\x27\x93\xa3\x23\x4c\x22\x33\x91\x96\xc4\x7e\x86\xd2\x60\x92\xd9\xba\x59\xad\xf0\xfb\x18\x92\x22\x5b\xd2\xe8\xaa\xd5\xe1\xbb\xa7\x00\xbb\x91\xa6\xe9\x46\xd2\xbc\xfd\xd0\x54\xb9\xb9\x80\x52\x67\x22\xde\x92\x49\xd4\x62\x69\x0b\x91\x02\xfe\x8d\xcb\xc2\xcd\x42\x95\x80\xc8\xbc\x72\x48\x78\xe0\x57\x1c\xfe\x24\x6a\xaa\xd6\x0d\x58\x2d\x28\x78\x19\xc2\x58\xcc\x4f\x23\xd0\xb5\x81\x3b\xd4\xfb\x92\xcb\x5d\x83\x14\xf2\x3f\x5f\x7c\x38\x24\x6a\xc2\xbd\xc9\x5c\xad\x54\x76\x42\x43\xa2\xc5\x16\xe1\xf2\xf4\x74\xc7\xff\x36\x53\xcc\xd6\x76\x03\x1f\x21\x78\xb8\xa8\x7a\x04\x6a\xad\x34\x5c\x42\xaa\xd6\xcd\x35\xda\x11\x3d\xd6\xea\x17\xd5\x4a\x45\x26\x50\x73\xbb\x4c\x5f\x71\xdc\xf3\x33\xf7\xea\xec\x01\x2e\x19\x4a\x9e\x4f\x80\xa9\x41\x20\x80\xa7\xf9\x84\xb9\xdd\xd4\xea\x70\x79\x2a\xc5\xc7\xf7\x74\x2a\xa7\x4b\x05\x2e\xf9\xa9\xa2\xf2\x18\xe8\x1f\x11\xa8\xbc\x69\xd5\x4d\xb1\x5d\xa0\xae\xd5\x1b\x6e\x7b\x45\x84\x10\xc3\x73\x7e\xde\x2b\xcb\x3c\x7f\x70\xcf\x4f\x24\xc9\xa9\x77\x8a\xa9\xb6\xe5\xc9\x7f\x87\xb6\x8f\xee\xce\x9e\x6b\xdc\xa9\x1c\x2e\xe1\xe0\x38\x7f\x05\xe1\x64\x11\x22\x51\xba\x46\x28\x09\x97\xd0\xbe\x00\x09\xbf\xf1\xd9\xfc\x0d\x36\x67\xb6\xa6\xca\x1f\x2e\x40\x7e\xf8\x50\x77\xa0\x8a\x7f\xcb\x1a\x9b\xe4\xea\x72\xc4\x09\xc9\x11\xbf\xd5\x64\xbd\x69\xd5\xd4\x6a\x99\xad\x6b\x41\xaf\xee\x72\x5f\x79\xa2\xc5\x3c\x4a\xbb\x64\x7f\x97\x12\xef\x54\xf7\x67\x58\x0a\x83\x70\x76\x35\xbc\xbe\x3e\x8b\xe1\xf8\x70\x75\xfb\x69\x74\x16\x1f\x0e\x29\x33\x63\xe9\xe7\x2b\x97\xf8\x24\x6e\xa7\xde\xdc\x89\xb4\xc0\xdb\x84\xeb\x7d\x70\x97\xff\xc5\xd7\xde\xd1\x2b\x6f\x2e\xe0\xfc\x6c\x2d\x8c\x6b\x87\x17\x80\xf6\xbb\x00\xab\xde\xf2\x0f\x9e\xa7\xe1\x39\xc4\x31\xbd\x85\x0a\x4f\x50\x2f\x30\x32\xcb\x0b\x7b\xc0\x6c\x71\xab\xf4\xbe\x69\xe8\x87\x4f\xcd\xe7\xa4\x71\x48\xce\x07\x7f\xee\x17\x14\xc7\x5e\xcf\x8a\x34\x7d\xbe\xc7\x73\xe4\x9d\x4d\x95\x73\x4e\xe6\xbe\x77\x4e\x3e\x02\xd7\x02\xec\xe7\xa3\x2d\x34\x8a\x6f\x17\xc7\x8a\x7e\x1a\x5d\x8f\xfe\x18\xce\x46\xcf\x2a\x3b\x9d\x0d\x67\xe3\x2b\x7e\xf5\xe3\xda\x86\xbf\x54\xdb\xd7\x9d\x70\x3c\x87\x3b\x06\xbc\x6a\xc1\xb7\x5b\xe0\x97\x7b\xe0\x97\x9a\xe0\x58\xd0\x7f\xa2\xa2\xff\xbf\xa4\xff\x74\x4d\x27\xa3\xd9\xfd\xe4\xe6\xa4\x74\xf4\xe7\xca\x4f\x7c\x33\xde\xf5\xed\xba\x05\xaf\xdc\x79\x7c\xf9\x2b\xee\x8d\xc6\x57\x85\x6d\xb8\xd0\x1f\x4a\xd6\x77\xf4\x4e\x67\xb7\x77\xc7\xde\xbb\x1f\x5f\x8d\x0f\x43\xe5\x47\x31\xda\x0d\x68\xbf\xc3\xfa\xef\xfb\x2f\x77\x9f\x46\xd3\x99\x67\x2a\x33\x9b\x2f\x0f\x9f\xe9\x1a\xed\xdd\x55\xed\x64\x06\xca\xa4\x9c\x7f\xd2\xdc\x51\x9a\xcb\xe9\x77\x40\xa7\x98\x1d\xe0\xcf\x6e\x0e\xf8\x08\xed\xbf\xbb\x78\xe4\x3a\x0e\xf7\x97\x05\xf3\x37\x98\x23\x3e\xd6\xf5\xd9\x45\x7a\x3c\xdd\xf3\x3b\x88\xf1\xd5\xca\x53\xf5\xa9\xfa\xbf\x00\x00\x00\xff\xff\x51\x4b\xdc\x7e\x62\x10\x00\x00") +var _evmdis_tracerJs = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x57\xdf\x6f\xda\xca\x12\x7e\x86\xbf\x62\x94\x27\x50\x29\x60\x63\x08\x38\x27\x47\xe2\xa6\xf4\x1c\xae\xd2\x24\x02\x72\x8f\x2a\x94\x87\x05\xc6\xb0\xaa\xf1\x5a\xbb\x6b\x72\xb8\x55\xfe\xf7\xab\xd9\x59\x03\xf9\x75\xdb\x4a\xa7\x0f\x3b\xb5\x77\xbe\x6f\xbe\x9d\x19\xcf\x92\x56\x0b\xae\x54\xbe\xd7\x72\xbd\xb1\x10\xb6\x83\x73\x98\x6d\x10\xd6\xea\x23\xda\x0d\x6a\x2c\xb6\x30\x2c\xec\x46\x69\x53\x6d\xb5\x60\xb6\x91\x06\x12\x99\x22\x48\x03\xb9\xd0\x16\x54\x02\xf6\x85\x7f\x2a\x17\x5a\xe8\x7d\xb3\xda\x6a\x31\xe6\xcd\x6d\x62\x48\x34\x22\x18\x95\xd8\x47\xa1\x31\x86\xbd\x2a\x60\x29\x32\xd0\xb8\x92\xc6\x6a\xb9\x28\x2c\x82\xb4\x20\xb2\x55\x4b\x69\xd8\xaa\x95\x4c\xf6\x44\x29\x2d\x14\xd9\x0a\xb5\x0b\x6d\x51\x6f\x4d\xa9\xe3\x8f\x9b\x7b\xb8\x46\x63\x50\xc3\x1f\x98\xa1\x16\x29\xdc\x15\x8b\x54\x2e\xe1\x5a\x2e\x31\x33\x08\xc2\x40\x4e\x6f\xcc\x06\x57\xb0\x70\x74\x04\xfc\x4c\x52\xa6\x5e\x0a\x7c\x56\x45\xb6\x12\x56\xaa\xac\x01\x28\x49\x39\xec\x50\x1b\xa9\x32\xe8\x94\xa1\x3c\x61\x03\x94\x26\x92\x9a\xb0\x74\x00\x0d\x2a\x27\x5c\x1d\x44\xb6\x87\x54\xd8\x23\xf4\x27\x12\x72\x3c\xf7\x0a\x64\xe6\xc2\x6c\x54\x8e\x60\x37\xc2\xd2\xa9\x1f\x65\x9a\xc2\x02\xa1\x30\x98\x14\x69\x83\xd8\x16\x85\x85\xbf\xc6\xb3\x3f\x6f\xef\x67\x30\xbc\xf9\x0a\x7f\x0d\x27\x93\xe1\xcd\xec\xeb\x05\x3c\x4a\xbb\x51\x85\x05\xdc\x21\x53\xc9\x6d\x9e\x4a\x5c\xc1\xa3\xd0\x5a\x64\x76\x0f\x2a\x21\x86\x2f\xa3\xc9\xd5\x9f\xc3\x9b\xd9\xf0\x5f\xe3\xeb\xf1\xec\x2b\x28\x0d\x9f\xc7\xb3\x9b\xd1\x74\x0a\x9f\x6f\x27\x30\x84\xbb\xe1\x64\x36\xbe\xba\xbf\x1e\x4e\xe0\xee\x7e\x72\x77\x3b\x1d\x35\x61\x8a\xa4\x0a\x09\xff\xe3\x9c\x27\xae\x7a\x1a\x61\x85\x56\xc8\xd4\x94\x99\xf8\xaa\x0a\x30\x1b\x55\xa4\x2b\xd8\x88\x1d\x82\xc6\x25\xca\x1d\xae\x40\xc0\x52\xe5\xfb\x9f\x2e\x2a\x71\x89\x54\x65\x6b\x77\xe6\x77\x1b\x12\xc6\x09\x64\xca\x36\xc0\x20\xc2\x6f\x1b\x6b\xf3\xb8\xd5\x7a\x7c\x7c\x6c\xae\xb3\xa2\xa9\xf4\xba\x95\x32\x9d\x69\xfd\xde\xac\x12\x27\xee\xb6\x2b\x69\x66\x5a\x2c\x51\x83\x46\x5b\xe8\xcc\x80\x29\x92\x44\x2e\x25\x66\x16\x64\x96\x28\xbd\x75\x7d\x02\x89\x56\x5b\x10\x60\xc9\x19\xac\x82\x1c\x35\x6d\x7a\x8e\x8f\xc6\xee\x53\xa7\x73\x25\x8d\x30\x06\xb7\x8b\x74\xdf\xac\x7e\xaf\x56\x8c\x15\xcb\x6f\x31\xcc\xbf\xab\xdc\xc4\x30\x7f\x78\x7a\x68\x54\xab\x95\x2c\x2f\xcc\x06\x4d\x0c\xdf\xdb\x31\xb4\x1b\x10\xc4\x10\x34\x20\x74\x6b\xc7\xad\x91\x5b\xbb\x6e\xed\xb9\xf5\xdc\xad\x7d\xb7\x0e\xdc\x1a\xb4\xd9\x30\x3a\x60\xb7\x80\xfd\x02\x76\x0c\xd8\x33\x64\xcf\xd0\xc7\xe1\x40\x21\x47\x0a\x39\x54\xc8\xb1\x42\x66\xe9\xb0\x4b\xc4\x2c\x11\xb3\x74\x99\xa5\xcb\x2c\x5d\x76\xe9\x32\x4b\xd7\x0b\xee\xba\xf3\x74\x99\xa5\x7b\xce\x4f\xcc\xd2\x65\x96\x1e\x1f\xb9\xc7\x80\x9e\x3f\x22\x03\x7a\x2c\xbe\xc7\x80\x1e\x03\xfa\x0c\xe8\x73\xd8\x7e\xc8\x4f\x1d\x36\xcc\xd2\xe7\xb0\xfd\x1e\x1b\x0e\xdb\x67\x96\x3e\xb3\x0c\x58\xfc\x20\x70\x7b\x03\x8e\x37\xe0\x78\x03\x9f\xd5\x32\xad\x3e\xaf\x6d\x9f\xd8\x76\xe8\x6d\xc7\xdb\xc8\xdb\xae\xb7\x3e\xf3\x6d\x9f\xfa\xb6\xcf\x7d\xdb\xf3\x1d\xea\xe4\xf9\x02\xcf\x17\x78\xbe\xc0\xf3\x05\x9e\xaf\xac\x64\x59\xca\xb2\x96\xbe\x98\x81\xaf\x66\xe0\xcb\x19\xf8\x7a\x06\xbe\xa0\x81\xaf\x68\xe0\x4b\x1a\xf8\x9a\x06\xa1\xe7\x0b\xfb\x31\x84\x64\x07\x31\x74\x1a\x10\x74\xda\x31\x44\x64\x83\x18\xba\x64\xc3\x18\x7a\x64\x3b\x31\x9c\x93\x8d\x62\xe8\x93\xed\xc6\x30\x20\x4b\x7c\xd4\xb5\x1d\x22\x24\xc6\x0e\x29\x24\xca\x0e\x49\x24\xce\x88\x34\x12\x69\x44\x22\x89\x35\x22\x95\x44\x1b\x91\x4c\xe2\x8d\x22\xd6\x11\x75\x59\x47\xd4\x63\x1d\xd1\x39\xeb\xa0\xee\x73\x80\x01\xeb\xa0\xfe\x23\x1d\xd4\x80\xa4\xc3\x75\x20\xe9\x70\x3d\x48\x3a\x5c\x17\x12\x25\xf5\xa1\xd3\xe1\x3a\x91\x48\xa9\x17\x9d\x0e\xd7\x8d\x44\xeb\xfa\x91\x78\x7d\x47\x06\xbd\xc0\xdb\xd0\xdb\x8e\xb7\x91\xb3\x61\xe4\xbf\xa2\xc8\x7f\x46\x91\xff\x8e\xa2\x8e\xdf\xf7\x7e\xee\x23\x78\xa2\xef\xbc\xd5\x02\x8d\xa6\x48\x2d\x4d\x7f\x99\xed\xd4\x37\x9a\xcf\x1b\xcc\x40\xa4\xa9\x1b\x64\x2a\x5f\xaa\x15\x1a\x1e\x90\x0b\xc4\x0c\xa4\x45\x2d\xe8\x86\x50\x3b\xd4\x74\x39\x96\xa3\xc9\xd1\x11\x26\x91\x99\x48\x4b\x62\x3f\x44\x69\x30\xc9\x6c\xdd\xac\x56\xf8\x7d\x0c\x49\x91\x2d\x69\x74\xd5\xea\xf0\xdd\x53\x80\xdd\x48\xd3\x74\x23\x69\xde\x7e\x68\xaa\xdc\x5c\x40\xa9\x33\x11\x6f\xc9\x24\x6a\xb1\xb4\x85\x48\x01\xff\xc6\x65\xe1\x66\xa1\x4a\x40\x64\x5e\x39\x24\x3c\xf1\x2b\x0e\x7f\x12\x35\x55\xeb\x06\xac\x16\x14\xbc\x0c\x61\x2c\xe6\xa7\x11\xe8\xde\xc0\x1d\xea\x7d\xc9\xe5\xee\x41\x0a\xf9\x9f\x2f\x3e\x1c\x12\x35\xe1\xde\x64\xae\x56\x2a\x3b\xa1\x21\xd1\x62\x8b\x70\x79\x7a\xba\xe3\x7f\x9b\x29\x66\x6b\xbb\x81\x8f\x10\x3c\x5c\x54\x3d\x02\xb5\x56\x1a\x2e\x21\x55\xeb\xe6\x1a\xed\x88\x1e\x6b\xf5\x8b\x6a\xa5\x22\x13\xa8\xb9\x5d\xa6\xaf\x38\xee\xf9\x99\x7b\x75\xf6\x00\x97\x0c\x25\xcf\x27\xc0\xd4\x20\x10\xc0\xd3\x7c\xc2\xdc\x6e\x6a\x75\xb8\x3c\x95\xe2\xe3\x7b\x3a\x95\xd3\xa5\x02\x97\xfc\x54\x51\x79\x0c\xf4\x8f\x08\x54\xde\xb4\xea\xa6\xd8\x2e\x50\xd7\xea\x0d\xb7\xbd\x22\x42\x88\xe1\x39\x3f\xef\x95\x65\x9e\x3f\xb8\xe7\x27\x92\xe4\xd4\x3b\xc5\x54\xdb\xf2\xe4\xbf\x43\xdb\x47\x77\x67\xcf\x35\xee\x54\x0e\x97\x70\x70\x9c\xbf\x82\x70\xb2\x08\x91\x28\x5d\x23\x94\x84\x4b\x68\x5f\x80\x84\xdf\xf8\x6c\xfe\x06\x9b\x33\x5b\x53\xe5\x0f\x17\x20\x3f\x7c\xa8\x3b\x50\xc5\xbf\x65\x8d\x4d\x72\x75\x39\xe2\x84\xe4\x88\xdf\x6a\xb2\xde\xb4\x6a\x6a\xb5\xcc\xd6\xb5\xa0\x57\x77\xb9\xaf\x3c\xd1\x62\x1e\xa5\x5d\xb2\xbf\x4b\x89\x77\xaa\xfb\x33\x2c\x85\x41\x38\xbb\x1a\x5e\x5f\x9f\xc5\x70\x7c\xb8\xba\xfd\x34\x3a\x8b\x0f\x87\x94\x99\xb1\xf4\xfb\x95\x4b\x7c\x12\xb7\x53\x6f\xee\x44\x5a\xe0\x6d\xc2\xf5\x3e\xb8\xcb\xff\xe2\x6b\xef\xe8\x95\x37\x17\x70\x7e\xb6\x16\xc6\xb5\xc3\x0b\x40\xfb\x5d\x80\x55\x6f\xf9\x07\xcf\xd3\xf0\x1c\xe2\x98\xde\x42\x85\x27\xa8\x17\x18\x99\xe5\x85\x3d\x60\xb6\xb8\x55\x7a\xdf\x34\xf4\xcb\xa7\xe6\x73\xd2\x38\x24\xe7\x83\x3f\xf7\x0b\x8a\x63\xaf\x67\x45\x9a\x3e\xdf\xe3\x39\xf2\xce\xa6\xca\x39\x27\x73\xdf\x3b\x27\x1f\x81\x6b\x01\xf6\xf3\xd1\x16\x1a\xc5\xb7\x8b\x63\x45\x3f\x8d\xae\x47\x7f\x0c\x67\xa3\x67\x95\x9d\xce\x86\xb3\xf1\x15\xbf\xfa\x71\x6d\xc3\x5f\xaa\xed\xeb\x4e\x38\x9e\xc3\x1d\x03\x5e\xb5\xe0\xdb\x2d\xf0\xcb\x3d\xf0\x4b\x4d\x70\x2c\xe8\x3f\x51\xd1\xff\x5f\xd2\x7f\xba\xa6\x93\xd1\xec\x7e\x72\x73\x52\x3a\xfa\x7b\xe5\x27\xbe\x19\xef\xfa\x76\xdd\x82\x57\xee\x3c\xbe\xfc\x15\xf7\x46\xe3\xab\xc2\x36\x5c\xe8\x0f\x25\xeb\x3b\x7a\xa7\xb3\xdb\xbb\x63\xef\xdd\x8f\xaf\xc6\x87\xa1\xf2\xa3\x18\xed\x06\xb4\xdf\x61\xfd\xf7\xfd\x97\xbb\x4f\xa3\xe9\xcc\x33\x95\x99\xcd\x97\x87\xcf\x74\x8d\xf6\xee\xaa\x76\x32\x03\x65\x52\xce\x3f\x69\xee\x28\xcd\xe5\xf4\x3b\xa0\x53\xcc\x0e\xf0\x67\x37\x07\x7c\x84\xf6\xdf\x5d\x3c\x72\x1d\x87\xfb\xcb\x82\xf9\x1b\xcc\x11\x1f\xeb\xfa\xec\x22\x3d\x9e\xee\xf9\x1d\xc4\xf8\x6a\xe5\xa9\xfa\x54\xfd\x5f\x00\x00\x00\xff\xff\xdf\x2f\xd9\xfa\x63\x10\x00\x00") func evmdis_tracerJsBytes() ([]byte, error) { return bindataRead( @@ -153,7 +153,7 @@ func evmdis_tracerJs() (*asset, error) { } info := bindataFileInfo{name: "evmdis_tracer.js", size: 0, mode: os.FileMode(0), modTime: time.Unix(0, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xd5, 0xe8, 0x96, 0xa1, 0x8b, 0xc, 0x68, 0x3c, 0xe8, 0x5d, 0x7e, 0xf0, 0xab, 0xfe, 0xec, 0xd1, 0xb, 0x3d, 0xfc, 0xc7, 0xac, 0xb5, 0xa, 0x41, 0x55, 0x0, 0x3a, 0x60, 0xa7, 0x8e, 0x46, 0x93}} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xb5, 0xc8, 0x73, 0x8e, 0xfb, 0x1f, 0x84, 0x7d, 0x37, 0xd9, 0x26, 0x24, 0x37, 0xb8, 0x65, 0xb1, 0xed, 0xa0, 0x76, 0x9a, 0xf0, 0x8e, 0x3a, 0x9b, 0x20, 0x93, 0x27, 0x26, 0x2e, 0xc9, 0x9b, 0xde}} return a, nil } @@ -348,25 +348,20 @@ func AssetNames() []string { // _bindata is a table, holding each asset generator, mapped to its name. var _bindata = map[string]func() (*asset, error){ - "4byte_tracer.js": _4byte_tracerJs, - - "bigram_tracer.js": bigram_tracerJs, - - "call_tracer.js": call_tracerJs, - - "evmdis_tracer.js": evmdis_tracerJs, - - "noop_tracer.js": noop_tracerJs, - - "opcount_tracer.js": opcount_tracerJs, - + "4byte_tracer.js": _4byte_tracerJs, + "bigram_tracer.js": bigram_tracerJs, + "call_tracer.js": call_tracerJs, + "evmdis_tracer.js": evmdis_tracerJs, + "noop_tracer.js": noop_tracerJs, + "opcount_tracer.js": opcount_tracerJs, "prestate_tracer.js": prestate_tracerJs, - - "trigram_tracer.js": trigram_tracerJs, - - "unigram_tracer.js": unigram_tracerJs, + "trigram_tracer.js": trigram_tracerJs, + "unigram_tracer.js": unigram_tracerJs, } +// AssetDebug is true if the assets were built with the debug flag enabled. +const AssetDebug = false + // AssetDir returns the file names below a certain // directory embedded in the file by go-bindata. // For example if you run go-bindata on data/... and data contains the diff --git a/examples/chain/main.go b/examples/chain/main.go index d474c2b..5edd055 100644 --- a/examples/chain/main.go +++ b/examples/chain/main.go @@ -7,10 +7,10 @@ import ( "github.com/ava-labs/coreth/core" "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/eth" + "github.com/ava-labs/coreth/params" "github.com/ava-labs/go-ethereum/common" "github.com/ava-labs/go-ethereum/common/hexutil" "github.com/ava-labs/go-ethereum/log" - "github.com/ava-labs/go-ethereum/params" "github.com/ava-labs/go-ethereum/rlp" "math/big" "sync" diff --git a/examples/counter/main.go b/examples/counter/main.go index 48a0e6f..85aa9d1 100644 --- a/examples/counter/main.go +++ b/examples/counter/main.go @@ -9,11 +9,11 @@ import ( "github.com/ava-labs/coreth/core" "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/eth" + "github.com/ava-labs/coreth/params" "github.com/ava-labs/go-ethereum/common" "github.com/ava-labs/go-ethereum/common/compiler" "github.com/ava-labs/go-ethereum/crypto" "github.com/ava-labs/go-ethereum/log" - "github.com/ava-labs/go-ethereum/params" "go/build" "math/big" "os" diff --git a/examples/multicoin/main.go b/examples/multicoin/main.go new file mode 100644 index 0000000..3e42010 --- /dev/null +++ b/examples/multicoin/main.go @@ -0,0 +1,218 @@ +package main + +import ( + "crypto/rand" + //"encoding/hex" + "encoding/json" + "fmt" + "github.com/ava-labs/coreth" + "github.com/ava-labs/coreth/core" + "github.com/ava-labs/coreth/eth" + //"github.com/ava-labs/coreth/accounts/abi" + "github.com/ava-labs/coreth/core/types" + "github.com/ava-labs/coreth/params" + "github.com/ava-labs/go-ethereum/common" + "github.com/ava-labs/go-ethereum/common/compiler" + "github.com/ava-labs/go-ethereum/crypto" + "github.com/ava-labs/go-ethereum/log" + "go/build" + "math/big" + "os" + "os/signal" + "path/filepath" + //"strings" + "syscall" + "time" +) + +var ( + codeHex = "6101c7610026600b82828239805160001a60731461001957fe5b30600052607381538281f3fe730000000000000000000000000000000000000000301460806040526004361061004b5760003560e01c80631e01043914610050578063abb24ba014610092578063b6510bb3146100a9575b600080fd5b61007c6004803603602081101561006657600080fd5b8101908080359060200190929190505050610118565b6040518082815260200191505060405180910390f35b81801561009e57600080fd5b506100a761013b565b005b8180156100b557600080fd5b50610116600480360360808110156100cc57600080fd5b81019080803573ffffffffffffffffffffffffffffffffffffffff16906020019092919080359060200190929190803590602001909291908035906020019092919050505061013e565b005b60003373ffffffffffffffffffffffffffffffffffffffff1682905d9050919050565b5c565b8373ffffffffffffffffffffffffffffffffffffffff1681836108fc8690811502906040516000604051808303818888878c8af69550505050505015801561018a573d6000803e3d6000fd5b505050505056fea26469706673582212204e4689067358d8e82903012dcaca0c004d36e39fc99360919fb824c4ce718bdd64736f6c634300060a0033" + codeABI = `[{"inputs":[{"internalType":"uint256","name":"coinid","type":"uint256"}],"name":"getBalance","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"}]` +) + +func checkError(err error) { + if err != nil { + panic(err) + } +} + +func main() { + // configure the chain + config := eth.DefaultConfig + config.ManualCanonical = true + chainConfig := ¶ms.ChainConfig{ + ChainID: big.NewInt(1), + HomesteadBlock: big.NewInt(0), + DAOForkBlock: big.NewInt(0), + DAOForkSupport: true, + EIP150Block: big.NewInt(0), + EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"), + EIP155Block: big.NewInt(0), + EIP158Block: big.NewInt(0), + ByzantiumBlock: big.NewInt(0), + ConstantinopleBlock: big.NewInt(0), + PetersburgBlock: big.NewInt(0), + IstanbulBlock: nil, + Ethash: nil, + } + + // configure the genesis block + //genBalance := big.NewInt(100000000000000000) + genKey, _ := coreth.NewKey(rand.Reader) + bob, _ := coreth.NewKey(rand.Reader) + + g := new(core.Genesis) + b := `{"config":{"chainId":1,"homesteadBlock":0,"daoForkBlock":0,"daoForkSupport":true,"eip150Block":0,"eip150Hash":"0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0","eip155Block":0,"eip158Block":0,"byzantiumBlock":0,"constantinopleBlock":0,"petersburgBlock":0},"nonce":"0x0","timestamp":"0x0","extraData":"0x00","gasLimit":"0x5f5e100","difficulty":"0x0","mixHash":"0x0000000000000000000000000000000000000000000000000000000000000000","coinbase":"0x0000000000000000000000000000000000000000","alloc":{"751a0b96e1042bee789452ecb20253fba40dbe85":{"balance":"0x1000000000000000", "mcbalance": {"0x0000000000000000000000000000000000000000000000000000000000000000": 1000000000000000000}}},"number":"0x0","gasUsed":"0x0","parentHash":"0x0000000000000000000000000000000000000000000000000000000000000000"}` + k := "0xabd71b35d559563fea757f0f5edbde286fb8c043105b15abb7cd57189306d7d1" + err := json.Unmarshal([]byte(b), g) + checkError(err) + config.Genesis = g + hk, _ := crypto.HexToECDSA(k[2:]) + genKey = coreth.NewKeyFromECDSA(hk) + //config.Genesis = &core.Genesis{ + // Config: chainConfig, + // Nonce: 0, + // Number: 0, + // ExtraData: hexutil.MustDecode("0x00"), + // GasLimit: 100000000, + // Difficulty: big.NewInt(0), + // Alloc: core.GenesisAlloc{genKey.Address: {Balance: genBalance}}, + //} + + // grab the control of block generation and disable auto uncle + config.Miner.ManualMining = true + config.Miner.ManualUncle = true + + // compile the smart contract + gopath := os.Getenv("GOPATH") + if gopath == "" { + gopath = build.Default.GOPATH + } + + // info required to generate a transaction + chainID := chainConfig.ChainID + nonce := uint64(0) + gasLimit := 10000000 + gasPrice := big.NewInt(1000000000) + + blockCount := 0 + chain := coreth.NewETHChain(&config, nil, nil, nil) + newTxPoolHeadChan := make(chan core.NewTxPoolHeadEvent, 1) + log.Info(chain.GetGenesisBlock().Hash().Hex()) + firstBlock := false + var contractAddr common.Address + coin0 := common.HexToHash("0x0") + //var calls [][]byte + postGen := func(block *types.Block) bool { + if blockCount == 15 { + state, err := chain.CurrentState() + checkError(err) + log.Info(fmt.Sprintf("genesis balance = %s", state.GetBalance(genKey.Address))) + log.Info(fmt.Sprintf("genesis balance2 = %s", state.GetBalanceMultiCoin(genKey.Address, coin0))) + log.Info(fmt.Sprintf("contract balance = %s", state.GetBalance(contractAddr))) + log.Info(fmt.Sprintf("bob's balance = %s", state.GetBalance(bob.Address))) + log.Info(fmt.Sprintf("bob's balance2 = %s", state.GetBalanceMultiCoin(bob.Address, coin0))) + log.Info(fmt.Sprintf("state = %s", state.Dump(true, false, true))) + log.Info(fmt.Sprintf("x = %s", state.GetState(contractAddr, common.BigToHash(big.NewInt(0))).String())) + return true + } + if !firstBlock { + firstBlock = true + receipts := chain.GetReceiptsByHash(block.Hash()) + if len(receipts) != 1 { + panic(fmt.Sprintf("# receipts is %d != 1", len(receipts))) + } + contractAddr = receipts[0].ContractAddress + txHash := receipts[0].TxHash + log.Info(fmt.Sprintf("deploy tx = %s", txHash.String())) + log.Info(fmt.Sprintf("contract addr = %s", contractAddr.String())) + //var call []byte + //if len(calls) > 0 { + // call = calls[0] + // calls = calls[1:] + //} else { + // call = nil + //} + //state, _ := chain.CurrentState() + //call := common.Hex2Bytes("1003e2d20000000000000000000000000000000000000000000000000000000000000001") + //log.Info(fmt.Sprintf("code = %s", hex.EncodeToString(state.GetCode(contractAddr)))) + go func() { + tx := types.NewTransaction(nonce, bob.Address, big.NewInt(300000000000000000), uint64(gasLimit), gasPrice, nil) + signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), genKey.PrivateKey) + checkError(err) + chain.AddRemoteTxs([]*types.Transaction{signedTx}) + nonce++ + time.Sleep(20 * time.Millisecond) + + // self-loop tx to enable MC + tx = types.NewTransaction(0, bob.Address, big.NewInt(1), uint64(gasLimit), gasPrice, nil) + tx.SetMultiCoinValue(&coin0, big.NewInt(0)) + signedTx, err = types.SignTx(tx, types.NewEIP155Signer(chainID), bob.PrivateKey) + checkError(err) + chain.AddRemoteTxs([]*types.Transaction{signedTx}) + time.Sleep(20 * time.Millisecond) + + for i := 0; i < 10; i++ { + tx := types.NewTransaction(nonce, bob.Address, big.NewInt(10000000000000000), uint64(gasLimit), gasPrice, nil) + tx.SetMultiCoinValue(&coin0, big.NewInt(100000000000000000)) + signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), genKey.PrivateKey) + checkError(err) + chain.AddRemoteTxs([]*types.Transaction{signedTx}) + nonce++ + } + }() + } + return false + } + chain.SetOnHeaderNew(func(header *types.Header) { + hid := make([]byte, 32) + _, err := rand.Read(hid) + if err != nil { + panic("cannot generate hid") + } + header.Extra = append(header.Extra, hid...) + }) + chain.SetOnSealFinish(func(block *types.Block) error { + blockCount++ + if postGen(block) { + return nil + } + go func() { + <-newTxPoolHeadChan + time.Sleep(10 * time.Millisecond) + chain.GenBlock() + }() + return nil + }) + + // start the chain + chain.GetTxPool().SubscribeNewHeadEvent(newTxPoolHeadChan) + chain.Start() + + //code := common.Hex2Bytes(codeHex) + counterSrc, err := filepath.Abs(gopath + "/src/github.com/ava-labs/coreth/examples/counter/counter.sol") + checkError(err) + contracts, err := compiler.CompileSolidity("", counterSrc) + checkError(err) + contract, _ := contracts[fmt.Sprintf("%s:%s", counterSrc, "Counter")] + code := common.Hex2Bytes(contract.Code[2:]) + + //abi, err := abi.JSON(strings.NewReader(codeABI)) + //cc, err := abi.Pack("getBalance", big.NewInt(0)) + //checkError(err) + //calls = append(calls, cc) + tx := types.NewContractCreation(nonce, big.NewInt(0), uint64(gasLimit), gasPrice, code) + signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), genKey.PrivateKey) + checkError(err) + chain.AddRemoteTxs([]*types.Transaction{signedTx}) + time.Sleep(1000 * time.Millisecond) + nonce++ + + chain.GenBlock() + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + signal.Notify(c, os.Interrupt, syscall.SIGINT) + <-c + chain.Stop() +} diff --git a/plugin/evm/static_service_test.go b/plugin/evm/static_service_test.go deleted file mode 100644 index c492798..0000000 --- a/plugin/evm/static_service_test.go +++ /dev/null @@ -1,64 +0,0 @@ -// (c) 2019-2020, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package evm - -import ( - "math/big" - "testing" - - "github.com/ava-labs/go-ethereum/common" - "github.com/ava-labs/go-ethereum/params" - - "github.com/ava-labs/coreth/core" -) - -func TestBuildGenesis(t *testing.T) { - expected := "3wP629bGfSGj9trh1UNBp5qGRGCcma5d8ezLeSmd9hnUJjSMUJesHHoxbZNcVUC9CjH7PEGNA96htNTd1saZCMt1Mf1dZFG7JDhcYNok6RS4TZufejXdxbVVgquohSa7nCCcrXpiVeiRFwzLJAxyQbXzYRhaCRtcDDfCcqfaVdtkFsPbNeQ49pDTbEC5hVkmfopeQ2Zz8tAG5QXKBdbYBCukR3xNHJ4xDxeixmEwPr1odb42yQRYrL7xREKNn2LFoFwAWUjBTsCkf5GPNgY2GvvN9o8wFWXTroW5fp754DhpdxHYxkMTfuE9DGyNWHTyrEbrUHutUdsfitcSHVj5ctFtkN2wGCs3cyv1eRRNvFFMggWTbarjne6AYaeCrJ631qAu3CbrUtrTH5N2E6G2yQKX4sT4Sk3qWPJdsGXuT95iKKcgNn1u5QRHHw9DXXuGPpJjkcKQRGUCuqpXy61iF5RNPEwAwKDa8f2Y25WMmNgWynUuLj8iSAyePj7USPWk54QFUr86ApVzqAdzzdD1qSVScpmudGnGbz9UNXdzHqSot6XLrNTYsgkabiu6TGntFm7qywbCRmtNdBuT9aznGQdUVimjt5QzUz68HXhUxBzTkrz7yXfVGV5JcWxVHQXYS4oc41U5yu83mH3A7WBrZLVq6UyNrvQVbim5nDxeKKbALPxwzVwywjgY5cp39AvzGnY8CX2AtuBNnKmZaAvG8JWAkx3yxjnJrwWhLgpDQYcCvRp2jg1EPBqN8FKJxSPE6eedjDHDJfB57mNzyEtmg22BPnem3eLdiovX8awkhBUHdE7uPrapNSVprnS85u1saW2Kwza3FsS2jAM3LckGW8KdtfPTpHBTRKAUo49zZLuPsyGL5WduedGyAdaM3a2KPoyXuz4UbexTVUWFNypFvvgyoDS8FMxDCNoMMaD7y4yVnoDpSpVFEVZD6EuSGHe9U8Ew57xLPbjhepDx6" - - balance, success := new(big.Int).SetString("33b2e3c9fd0804000000000", 16) - if !success { - t.Fatal("Failed to initialize balance") - } - - args := core.Genesis{ - Config: ¶ms.ChainConfig{ - ChainID: big.NewInt(43110), - HomesteadBlock: big.NewInt(0), - DAOForkBlock: big.NewInt(0), - DAOForkSupport: true, - EIP150Block: big.NewInt(0), - EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"), - EIP155Block: big.NewInt(0), - EIP158Block: big.NewInt(0), - ByzantiumBlock: big.NewInt(0), - ConstantinopleBlock: big.NewInt(0), - PetersburgBlock: big.NewInt(0), - }, - Nonce: 0, - Timestamp: 0, - ExtraData: []byte{}, - GasLimit: 100000000, - Difficulty: big.NewInt(0), - Mixhash: common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"), - Coinbase: common.HexToAddress("0x0000000000000000000000000000000000000000"), - Alloc: core.GenesisAlloc{ - common.HexToAddress("751a0b96e1042bee789452ecb20253fba40dbe85"): core.GenesisAccount{ - Balance: balance, - }, - }, - Number: 0, - GasUsed: 0, - ParentHash: common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"), - } - - ss := StaticService{} - result, err := ss.BuildGenesis(nil, &args) - if err != nil { - t.Fatal(err) - } - - if result.String() != expected { - t.Fatalf("StaticService.BuildGenesis:\nReturned: %s\nExpected: %s", result, expected) - } -} diff --git a/plugin/evm/vm_genesis_parse_test.go b/plugin/evm/vm_genesis_parse_test.go deleted file mode 100644 index 9a113fb..0000000 --- a/plugin/evm/vm_genesis_parse_test.go +++ /dev/null @@ -1,32 +0,0 @@ -// (c) 2019-2020, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package evm - -import ( - "encoding/json" - "testing" - - "github.com/ava-labs/coreth/core" -) - -func TestParseGenesis(t *testing.T) { - genesis := []byte(`{"config":{"chainId":43110,"homesteadBlock":0,"daoForkBlock":0,"daoForkSupport":true,"eip150Block":0,"eip150Hash":"0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0","eip155Block":0,"eip158Block":0,"byzantiumBlock":0,"constantinopleBlock":0,"petersburgBlock":0},"nonce":"0x0","timestamp":"0x0","extraData":"0x00","gasLimit":"0x5f5e100","difficulty":"0x0","mixHash":"0x0000000000000000000000000000000000000000000000000000000000000000","coinbase":"0x0000000000000000000000000000000000000000","alloc":{"751a0b96e1042bee789452ecb20253fba40dbe85":{"balance":"0x33b2e3c9fd0804000000000"}},"number":"0x0","gasUsed":"0x0","parentHash":"0x0000000000000000000000000000000000000000000000000000000000000000"}`) - - genesisBlock := new(core.Genesis) - err := json.Unmarshal(genesis, genesisBlock) - if err != nil { - t.Fatal(err) - } - - marshalledBytes, err := json.Marshal(genesisBlock) - if err != nil { - t.Fatal(err) - } - - secondGenesisBlock := new(core.Genesis) - err = json.Unmarshal(marshalledBytes, secondGenesisBlock) - if err != nil { - t.Fatal(err) - } -} diff --git a/rpc/client_example_test.go b/rpc/client_example_test.go deleted file mode 100644 index 149de2c..0000000 --- a/rpc/client_example_test.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package rpc_test - -import ( - "context" - "fmt" - "math/big" - "time" - - "github.com/ava-labs/go-ethereum/rpc" -) - -// In this example, our client wishes to track the latest 'block number' -// known to the server. The server supports two methods: -// -// eth_getBlockByNumber("latest", {}) -// returns the latest block object. -// -// eth_subscribe("newBlocks") -// creates a subscription which fires block objects when new blocks arrive. - -type Block struct { - Number *big.Int -} - -func ExampleClientSubscription() { - // Connect the client. - client, _ := rpc.Dial("ws://127.0.0.1:8485") - subch := make(chan Block) - - // Ensure that subch receives the latest block. - go func() { - for i := 0; ; i++ { - if i > 0 { - time.Sleep(2 * time.Second) - } - subscribeBlocks(client, subch) - } - }() - - // Print events from the subscription as they arrive. - for block := range subch { - fmt.Println("latest block:", block.Number) - } -} - -// subscribeBlocks runs in its own goroutine and maintains -// a subscription for new blocks. -func subscribeBlocks(client *rpc.Client, subch chan Block) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - // Subscribe to new blocks. - sub, err := client.EthSubscribe(ctx, subch, "newHeads") - if err != nil { - fmt.Println("subscribe error:", err) - return - } - - // The connection is established now. - // Update the channel with the current block. - var lastBlock Block - if err := client.CallContext(ctx, &lastBlock, "eth_getBlockByNumber", "latest"); err != nil { - fmt.Println("can't get latest block:", err) - return - } - subch <- lastBlock - - // The subscription will deliver events to the channel. Wait for the - // subscription to end for any reason, then loop around to re-establish - // the connection. - fmt.Println("connection lost: ", <-sub.Err()) -} diff --git a/rpc/client_test.go b/rpc/client_test.go deleted file mode 100644 index 79ea32e..0000000 --- a/rpc/client_test.go +++ /dev/null @@ -1,569 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package rpc - -import ( - "context" - "fmt" - "math/rand" - "net" - "net/http" - "net/http/httptest" - "os" - "reflect" - "runtime" - "sync" - "testing" - "time" - - "github.com/davecgh/go-spew/spew" - "github.com/ava-labs/go-ethereum/log" -) - -func TestClientRequest(t *testing.T) { - server := newTestServer() - defer server.Stop() - client := DialInProc(server) - defer client.Close() - - var resp Result - if err := client.Call(&resp, "test_echo", "hello", 10, &Args{"world"}); err != nil { - t.Fatal(err) - } - if !reflect.DeepEqual(resp, Result{"hello", 10, &Args{"world"}}) { - t.Errorf("incorrect result %#v", resp) - } -} - -func TestClientBatchRequest(t *testing.T) { - server := newTestServer() - defer server.Stop() - client := DialInProc(server) - defer client.Close() - - batch := []BatchElem{ - { - Method: "test_echo", - Args: []interface{}{"hello", 10, &Args{"world"}}, - Result: new(Result), - }, - { - Method: "test_echo", - Args: []interface{}{"hello2", 11, &Args{"world"}}, - Result: new(Result), - }, - { - Method: "no_such_method", - Args: []interface{}{1, 2, 3}, - Result: new(int), - }, - } - if err := client.BatchCall(batch); err != nil { - t.Fatal(err) - } - wantResult := []BatchElem{ - { - Method: "test_echo", - Args: []interface{}{"hello", 10, &Args{"world"}}, - Result: &Result{"hello", 10, &Args{"world"}}, - }, - { - Method: "test_echo", - Args: []interface{}{"hello2", 11, &Args{"world"}}, - Result: &Result{"hello2", 11, &Args{"world"}}, - }, - { - Method: "no_such_method", - Args: []interface{}{1, 2, 3}, - Result: new(int), - Error: &jsonError{Code: -32601, Message: "the method no_such_method does not exist/is not available"}, - }, - } - if !reflect.DeepEqual(batch, wantResult) { - t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult)) - } -} - -func TestClientNotify(t *testing.T) { - server := newTestServer() - defer server.Stop() - client := DialInProc(server) - defer client.Close() - - if err := client.Notify(context.Background(), "test_echo", "hello", 10, &Args{"world"}); err != nil { - t.Fatal(err) - } -} - -// func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) } -func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) } -func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) } -func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) } - -// This test checks that requests made through CallContext can be canceled by canceling -// the context. -func testClientCancel(transport string, t *testing.T) { - // These tests take a lot of time, run them all at once. - // You probably want to run with -parallel 1 or comment out - // the call to t.Parallel if you enable the logging. - t.Parallel() - - server := newTestServer() - defer server.Stop() - - // What we want to achieve is that the context gets canceled - // at various stages of request processing. The interesting cases - // are: - // - cancel during dial - // - cancel while performing a HTTP request - // - cancel while waiting for a response - // - // To trigger those, the times are chosen such that connections - // are killed within the deadline for every other call (maxKillTimeout - // is 2x maxCancelTimeout). - // - // Once a connection is dead, there is a fair chance it won't connect - // successfully because the accept is delayed by 1s. - maxContextCancelTimeout := 300 * time.Millisecond - fl := &flakeyListener{ - maxAcceptDelay: 1 * time.Second, - maxKillTimeout: 600 * time.Millisecond, - } - - var client *Client - switch transport { - case "ws", "http": - c, hs := httpTestClient(server, transport, fl) - defer hs.Close() - client = c - case "ipc": - c, l := ipcTestClient(server, fl) - defer l.Close() - client = c - default: - panic("unknown transport: " + transport) - } - - // The actual test starts here. - var ( - wg sync.WaitGroup - nreqs = 10 - ncallers = 6 - ) - caller := func(index int) { - defer wg.Done() - for i := 0; i < nreqs; i++ { - var ( - ctx context.Context - cancel func() - timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout))) - ) - if index < ncallers/2 { - // For half of the callers, create a context without deadline - // and cancel it later. - ctx, cancel = context.WithCancel(context.Background()) - time.AfterFunc(timeout, cancel) - } else { - // For the other half, create a context with a deadline instead. This is - // different because the context deadline is used to set the socket write - // deadline. - ctx, cancel = context.WithTimeout(context.Background(), timeout) - } - // Now perform a call with the context. - // The key thing here is that no call will ever complete successfully. - sleepTime := maxContextCancelTimeout + 20*time.Millisecond - err := client.CallContext(ctx, nil, "test_sleep", sleepTime) - if err != nil { - log.Debug(fmt.Sprint("got expected error:", err)) - } else { - t.Errorf("no error for call with %v wait time", timeout) - } - cancel() - } - } - wg.Add(ncallers) - for i := 0; i < ncallers; i++ { - go caller(i) - } - wg.Wait() -} - -func TestClientSubscribeInvalidArg(t *testing.T) { - server := newTestServer() - defer server.Stop() - client := DialInProc(server) - defer client.Close() - - check := func(shouldPanic bool, arg interface{}) { - defer func() { - err := recover() - if shouldPanic && err == nil { - t.Errorf("EthSubscribe should've panicked for %#v", arg) - } - if !shouldPanic && err != nil { - t.Errorf("EthSubscribe shouldn't have panicked for %#v", arg) - buf := make([]byte, 1024*1024) - buf = buf[:runtime.Stack(buf, false)] - t.Error(err) - t.Error(string(buf)) - } - }() - client.EthSubscribe(context.Background(), arg, "foo_bar") - } - check(true, nil) - check(true, 1) - check(true, (chan int)(nil)) - check(true, make(<-chan int)) - check(false, make(chan int)) - check(false, make(chan<- int)) -} - -func TestClientSubscribe(t *testing.T) { - server := newTestServer() - defer server.Stop() - client := DialInProc(server) - defer client.Close() - - nc := make(chan int) - count := 10 - sub, err := client.Subscribe(context.Background(), "nftest", nc, "someSubscription", count, 0) - if err != nil { - t.Fatal("can't subscribe:", err) - } - for i := 0; i < count; i++ { - if val := <-nc; val != i { - t.Fatalf("value mismatch: got %d, want %d", val, i) - } - } - - sub.Unsubscribe() - select { - case v := <-nc: - t.Fatal("received value after unsubscribe:", v) - case err := <-sub.Err(): - if err != nil { - t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err) - } - case <-time.After(1 * time.Second): - t.Fatalf("subscription not closed within 1s after unsubscribe") - } -} - -// In this test, the connection drops while Subscribe is waiting for a response. -func TestClientSubscribeClose(t *testing.T) { - server := newTestServer() - service := ¬ificationTestService{ - gotHangSubscriptionReq: make(chan struct{}), - unblockHangSubscription: make(chan struct{}), - } - if err := server.RegisterName("nftest2", service); err != nil { - t.Fatal(err) - } - - defer server.Stop() - client := DialInProc(server) - defer client.Close() - - var ( - nc = make(chan int) - errc = make(chan error) - sub *ClientSubscription - err error - ) - go func() { - sub, err = client.Subscribe(context.Background(), "nftest2", nc, "hangSubscription", 999) - errc <- err - }() - - <-service.gotHangSubscriptionReq - client.Close() - service.unblockHangSubscription <- struct{}{} - - select { - case err := <-errc: - if err == nil { - t.Errorf("Subscribe returned nil error after Close") - } - if sub != nil { - t.Error("Subscribe returned non-nil subscription after Close") - } - case <-time.After(1 * time.Second): - t.Fatalf("Subscribe did not return within 1s after Close") - } -} - -// This test reproduces https://github.com/ethereum/go-ethereum/issues/17837 where the -// client hangs during shutdown when Unsubscribe races with Client.Close. -func TestClientCloseUnsubscribeRace(t *testing.T) { - server := newTestServer() - defer server.Stop() - - for i := 0; i < 20; i++ { - client := DialInProc(server) - nc := make(chan int) - sub, err := client.Subscribe(context.Background(), "nftest", nc, "someSubscription", 3, 1) - if err != nil { - t.Fatal(err) - } - go client.Close() - go sub.Unsubscribe() - select { - case <-sub.Err(): - case <-time.After(5 * time.Second): - t.Fatal("subscription not closed within timeout") - } - } -} - -// This test checks that Client doesn't lock up when a single subscriber -// doesn't read subscription events. -func TestClientNotificationStorm(t *testing.T) { - server := newTestServer() - defer server.Stop() - - doTest := func(count int, wantError bool) { - client := DialInProc(server) - defer client.Close() - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - // Subscribe on the server. It will start sending many notifications - // very quickly. - nc := make(chan int) - sub, err := client.Subscribe(ctx, "nftest", nc, "someSubscription", count, 0) - if err != nil { - t.Fatal("can't subscribe:", err) - } - defer sub.Unsubscribe() - - // Process each notification, try to run a call in between each of them. - for i := 0; i < count; i++ { - select { - case val := <-nc: - if val != i { - t.Fatalf("(%d/%d) unexpected value %d", i, count, val) - } - case err := <-sub.Err(): - if wantError && err != ErrSubscriptionQueueOverflow { - t.Fatalf("(%d/%d) got error %q, want %q", i, count, err, ErrSubscriptionQueueOverflow) - } else if !wantError { - t.Fatalf("(%d/%d) got unexpected error %q", i, count, err) - } - return - } - var r int - err := client.CallContext(ctx, &r, "nftest_echo", i) - if err != nil { - if !wantError { - t.Fatalf("(%d/%d) call error: %v", i, count, err) - } - return - } - } - if wantError { - t.Fatalf("didn't get expected error") - } - } - - doTest(8000, false) - doTest(21000, true) -} - -func TestClientHTTP(t *testing.T) { - server := newTestServer() - defer server.Stop() - - client, hs := httpTestClient(server, "http", nil) - defer hs.Close() - defer client.Close() - - // Launch concurrent requests. - var ( - results = make([]Result, 100) - errc = make(chan error) - wantResult = Result{"a", 1, new(Args)} - ) - defer client.Close() - for i := range results { - i := i - go func() { - errc <- client.Call(&results[i], "test_echo", - wantResult.String, wantResult.Int, wantResult.Args) - }() - } - - // Wait for all of them to complete. - timeout := time.NewTimer(5 * time.Second) - defer timeout.Stop() - for i := range results { - select { - case err := <-errc: - if err != nil { - t.Fatal(err) - } - case <-timeout.C: - t.Fatalf("timeout (got %d/%d) results)", i+1, len(results)) - } - } - - // Check results. - for i := range results { - if !reflect.DeepEqual(results[i], wantResult) { - t.Errorf("result %d mismatch: got %#v, want %#v", i, results[i], wantResult) - } - } -} - -func TestClientReconnect(t *testing.T) { - startServer := func(addr string) (*Server, net.Listener) { - srv := newTestServer() - l, err := net.Listen("tcp", addr) - if err != nil { - t.Fatal("can't listen:", err) - } - go http.Serve(l, srv.WebsocketHandler([]string{"*"})) - return srv, l - } - - ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second) - defer cancel() - - // Start a server and corresponding client. - s1, l1 := startServer("127.0.0.1:0") - client, err := DialContext(ctx, "ws://"+l1.Addr().String()) - if err != nil { - t.Fatal("can't dial", err) - } - - // Perform a call. This should work because the server is up. - var resp Result - if err := client.CallContext(ctx, &resp, "test_echo", "", 1, nil); err != nil { - t.Fatal(err) - } - - // Shut down the server and allow for some cool down time so we can listen on the same - // address again. - l1.Close() - s1.Stop() - time.Sleep(2 * time.Second) - - // Try calling again. It shouldn't work. - if err := client.CallContext(ctx, &resp, "test_echo", "", 2, nil); err == nil { - t.Error("successful call while the server is down") - t.Logf("resp: %#v", resp) - } - - // Start it up again and call again. The connection should be reestablished. - // We spawn multiple calls here to check whether this hangs somehow. - s2, l2 := startServer(l1.Addr().String()) - defer l2.Close() - defer s2.Stop() - - start := make(chan struct{}) - errors := make(chan error, 20) - for i := 0; i < cap(errors); i++ { - go func() { - <-start - var resp Result - errors <- client.CallContext(ctx, &resp, "test_echo", "", 3, nil) - }() - } - close(start) - errcount := 0 - for i := 0; i < cap(errors); i++ { - if err = <-errors; err != nil { - errcount++ - } - } - t.Logf("%d errors, last error: %v", errcount, err) - if errcount > 1 { - t.Errorf("expected one error after disconnect, got %d", errcount) - } -} - -func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) { - // Create the HTTP server. - var hs *httptest.Server - switch transport { - case "ws": - hs = httptest.NewUnstartedServer(srv.WebsocketHandler([]string{"*"})) - case "http": - hs = httptest.NewUnstartedServer(srv) - default: - panic("unknown HTTP transport: " + transport) - } - // Wrap the listener if required. - if fl != nil { - fl.Listener = hs.Listener - hs.Listener = fl - } - // Connect the client. - hs.Start() - client, err := Dial(transport + "://" + hs.Listener.Addr().String()) - if err != nil { - panic(err) - } - return client, hs -} - -func ipcTestClient(srv *Server, fl *flakeyListener) (*Client, net.Listener) { - // Listen on a random endpoint. - endpoint := fmt.Sprintf("go-ethereum-test-ipc-%d-%d", os.Getpid(), rand.Int63()) - if runtime.GOOS == "windows" { - endpoint = `\\.\pipe\` + endpoint - } else { - endpoint = os.TempDir() + "/" + endpoint - } - l, err := ipcListen(endpoint) - if err != nil { - panic(err) - } - // Connect the listener to the server. - if fl != nil { - fl.Listener = l - l = fl - } - go srv.ServeListener(l) - // Connect the client. - client, err := Dial(endpoint) - if err != nil { - panic(err) - } - return client, l -} - -// flakeyListener kills accepted connections after a random timeout. -type flakeyListener struct { - net.Listener - maxKillTimeout time.Duration - maxAcceptDelay time.Duration -} - -func (l *flakeyListener) Accept() (net.Conn, error) { - delay := time.Duration(rand.Int63n(int64(l.maxAcceptDelay))) - time.Sleep(delay) - - c, err := l.Listener.Accept() - if err == nil { - timeout := time.Duration(rand.Int63n(int64(l.maxKillTimeout))) - time.AfterFunc(timeout, func() { - log.Debug(fmt.Sprintf("killing conn %v after %v", c.LocalAddr(), timeout)) - c.Close() - }) - } - return c, err -} diff --git a/rpc/http_test.go b/rpc/http_test.go deleted file mode 100644 index b3f694d..0000000 --- a/rpc/http_test.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package rpc - -import ( - "net/http" - "net/http/httptest" - "strings" - "testing" -) - -func TestHTTPErrorResponseWithDelete(t *testing.T) { - testHTTPErrorResponse(t, http.MethodDelete, contentType, "", http.StatusMethodNotAllowed) -} - -func TestHTTPErrorResponseWithPut(t *testing.T) { - testHTTPErrorResponse(t, http.MethodPut, contentType, "", http.StatusMethodNotAllowed) -} - -func TestHTTPErrorResponseWithMaxContentLength(t *testing.T) { - body := make([]rune, maxRequestContentLength+1) - testHTTPErrorResponse(t, - http.MethodPost, contentType, string(body), http.StatusRequestEntityTooLarge) -} - -func TestHTTPErrorResponseWithEmptyContentType(t *testing.T) { - testHTTPErrorResponse(t, http.MethodPost, "", "", http.StatusUnsupportedMediaType) -} - -func TestHTTPErrorResponseWithValidRequest(t *testing.T) { - testHTTPErrorResponse(t, http.MethodPost, contentType, "", 0) -} - -func testHTTPErrorResponse(t *testing.T, method, contentType, body string, expected int) { - request := httptest.NewRequest(method, "http://url.com", strings.NewReader(body)) - request.Header.Set("content-type", contentType) - if code, _ := validateRequest(request); code != expected { - t.Fatalf("response code should be %d not %d", expected, code) - } -} diff --git a/rpc/server_test.go b/rpc/server_test.go deleted file mode 100644 index 3909954..0000000 --- a/rpc/server_test.go +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package rpc - -import ( - "bufio" - "bytes" - "io" - "io/ioutil" - "net" - "path/filepath" - "strings" - "testing" - "time" -) - -func TestServerRegisterName(t *testing.T) { - server := NewServer() - service := new(testService) - - if err := server.RegisterName("test", service); err != nil { - t.Fatalf("%v", err) - } - - if len(server.services.services) != 2 { - t.Fatalf("Expected 2 service entries, got %d", len(server.services.services)) - } - - svc, ok := server.services.services["test"] - if !ok { - t.Fatalf("Expected service calc to be registered") - } - - wantCallbacks := 7 - if len(svc.callbacks) != wantCallbacks { - t.Errorf("Expected %d callbacks for service 'service', got %d", wantCallbacks, len(svc.callbacks)) - } -} - -func TestServer(t *testing.T) { - files, err := ioutil.ReadDir("testdata") - if err != nil { - t.Fatal("where'd my testdata go?") - } - for _, f := range files { - if f.IsDir() || strings.HasPrefix(f.Name(), ".") { - continue - } - path := filepath.Join("testdata", f.Name()) - name := strings.TrimSuffix(f.Name(), filepath.Ext(f.Name())) - t.Run(name, func(t *testing.T) { - runTestScript(t, path) - }) - } -} - -func runTestScript(t *testing.T, file string) { - server := newTestServer() - content, err := ioutil.ReadFile(file) - if err != nil { - t.Fatal(err) - } - - clientConn, serverConn := net.Pipe() - defer clientConn.Close() - go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions) - readbuf := bufio.NewReader(clientConn) - for _, line := range strings.Split(string(content), "\n") { - line = strings.TrimSpace(line) - switch { - case len(line) == 0 || strings.HasPrefix(line, "//"): - // skip comments, blank lines - continue - case strings.HasPrefix(line, "--> "): - t.Log(line) - // write to connection - clientConn.SetWriteDeadline(time.Now().Add(5 * time.Second)) - if _, err := io.WriteString(clientConn, line[4:]+"\n"); err != nil { - t.Fatalf("write error: %v", err) - } - case strings.HasPrefix(line, "<-- "): - t.Log(line) - want := line[4:] - // read line from connection and compare text - clientConn.SetReadDeadline(time.Now().Add(5 * time.Second)) - sent, err := readbuf.ReadString('\n') - if err != nil { - t.Fatalf("read error: %v", err) - } - sent = strings.TrimRight(sent, "\r\n") - if sent != want { - t.Errorf("wrong line from server\ngot: %s\nwant: %s", sent, want) - } - default: - panic("invalid line in test script: " + line) - } - } -} - -// This test checks that responses are delivered for very short-lived connections that -// only carry a single request. -func TestServerShortLivedConn(t *testing.T) { - server := newTestServer() - defer server.Stop() - - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatal("can't listen:", err) - } - defer listener.Close() - go server.ServeListener(listener) - - var ( - request = `{"jsonrpc":"2.0","id":1,"method":"rpc_modules"}` + "\n" - wantResp = `{"jsonrpc":"2.0","id":1,"result":{"nftest":"1.0","rpc":"1.0","test":"1.0"}}` + "\n" - deadline = time.Now().Add(10 * time.Second) - ) - for i := 0; i < 20; i++ { - conn, err := net.Dial("tcp", listener.Addr().String()) - if err != nil { - t.Fatal("can't dial:", err) - } - defer conn.Close() - conn.SetDeadline(deadline) - // Write the request, then half-close the connection so the server stops reading. - conn.Write([]byte(request)) - conn.(*net.TCPConn).CloseWrite() - // Now try to get the response. - buf := make([]byte, 2000) - n, err := conn.Read(buf) - if err != nil { - t.Fatal("read error:", err) - } - if !bytes.Equal(buf[:n], []byte(wantResp)) { - t.Fatalf("wrong response: %s", buf[:n]) - } - } -} diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go deleted file mode 100644 index eba1924..0000000 --- a/rpc/subscription_test.go +++ /dev/null @@ -1,206 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package rpc - -import ( - "encoding/json" - "fmt" - "net" - "strings" - "testing" - "time" -) - -func TestNewID(t *testing.T) { - hexchars := "0123456789ABCDEFabcdef" - for i := 0; i < 100; i++ { - id := string(NewID()) - if !strings.HasPrefix(id, "0x") { - t.Fatalf("invalid ID prefix, want '0x...', got %s", id) - } - - id = id[2:] - if len(id) == 0 || len(id) > 32 { - t.Fatalf("invalid ID length, want len(id) > 0 && len(id) <= 32), got %d", len(id)) - } - - for i := 0; i < len(id); i++ { - if strings.IndexByte(hexchars, id[i]) == -1 { - t.Fatalf("unexpected byte, want any valid hex char, got %c", id[i]) - } - } - } -} - -func TestSubscriptions(t *testing.T) { - var ( - namespaces = []string{"eth", "shh", "bzz"} - service = ¬ificationTestService{} - subCount = len(namespaces) - notificationCount = 3 - - server = NewServer() - clientConn, serverConn = net.Pipe() - out = json.NewEncoder(clientConn) - in = json.NewDecoder(clientConn) - successes = make(chan subConfirmation) - notifications = make(chan subscriptionResult) - errors = make(chan error, subCount*notificationCount+1) - ) - - // setup and start server - for _, namespace := range namespaces { - if err := server.RegisterName(namespace, service); err != nil { - t.Fatalf("unable to register test service %v", err) - } - } - go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions) - defer server.Stop() - - // wait for message and write them to the given channels - go waitForMessages(in, successes, notifications, errors) - - // create subscriptions one by one - for i, namespace := range namespaces { - request := map[string]interface{}{ - "id": i, - "method": fmt.Sprintf("%s_subscribe", namespace), - "version": "2.0", - "params": []interface{}{"someSubscription", notificationCount, i}, - } - if err := out.Encode(&request); err != nil { - t.Fatalf("Could not create subscription: %v", err) - } - } - - timeout := time.After(30 * time.Second) - subids := make(map[string]string, subCount) - count := make(map[string]int, subCount) - allReceived := func() bool { - done := len(count) == subCount - for _, c := range count { - if c < notificationCount { - done = false - } - } - return done - } - for !allReceived() { - select { - case confirmation := <-successes: // subscription created - subids[namespaces[confirmation.reqid]] = string(confirmation.subid) - case notification := <-notifications: - count[notification.ID]++ - case err := <-errors: - t.Fatal(err) - case <-timeout: - for _, namespace := range namespaces { - subid, found := subids[namespace] - if !found { - t.Errorf("subscription for %q not created", namespace) - continue - } - if count, found := count[subid]; !found || count < notificationCount { - t.Errorf("didn't receive all notifications (%d<%d) in time for namespace %q", count, notificationCount, namespace) - } - } - t.Fatal("timed out") - } - } -} - -// This test checks that unsubscribing works. -func TestServerUnsubscribe(t *testing.T) { - // Start the server. - server := newTestServer() - service := ¬ificationTestService{unsubscribed: make(chan string)} - server.RegisterName("nftest2", service) - p1, p2 := net.Pipe() - go server.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions) - - p2.SetDeadline(time.Now().Add(10 * time.Second)) - - // Subscribe. - p2.Write([]byte(`{"jsonrpc":"2.0","id":1,"method":"nftest2_subscribe","params":["someSubscription",0,10]}`)) - - // Handle received messages. - resps := make(chan subConfirmation) - notifications := make(chan subscriptionResult) - errors := make(chan error) - go waitForMessages(json.NewDecoder(p2), resps, notifications, errors) - - // Receive the subscription ID. - var sub subConfirmation - select { - case sub = <-resps: - case err := <-errors: - t.Fatal(err) - } - - // Unsubscribe and check that it is handled on the server side. - p2.Write([]byte(`{"jsonrpc":"2.0","method":"nftest2_unsubscribe","params":["` + sub.subid + `"]}`)) - for { - select { - case id := <-service.unsubscribed: - if id != string(sub.subid) { - t.Errorf("wrong subscription ID unsubscribed") - } - return - case err := <-errors: - t.Fatal(err) - case <-notifications: - // drop notifications - } - } -} - -type subConfirmation struct { - reqid int - subid ID -} - -func waitForMessages(in *json.Decoder, successes chan subConfirmation, notifications chan subscriptionResult, errors chan error) { - for { - var msg jsonrpcMessage - if err := in.Decode(&msg); err != nil { - errors <- fmt.Errorf("decode error: %v", err) - return - } - switch { - case msg.isNotification(): - var res subscriptionResult - if err := json.Unmarshal(msg.Params, &res); err != nil { - errors <- fmt.Errorf("invalid subscription result: %v", err) - } else { - notifications <- res - } - case msg.isResponse(): - var c subConfirmation - if msg.Error != nil { - errors <- msg.Error - } else if err := json.Unmarshal(msg.Result, &c.subid); err != nil { - errors <- fmt.Errorf("invalid response: %v", err) - } else { - json.Unmarshal(msg.ID, &c.reqid) - successes <- c - } - default: - errors <- fmt.Errorf("unrecognized message: %v", msg) - return - } - } -} diff --git a/rpc/testservice_test.go b/rpc/testservice_test.go deleted file mode 100644 index 98871b5..0000000 --- a/rpc/testservice_test.go +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package rpc - -import ( - "context" - "encoding/binary" - "errors" - "sync" - "time" -) - -func newTestServer() *Server { - server := NewServer() - server.idgen = sequentialIDGenerator() - if err := server.RegisterName("test", new(testService)); err != nil { - panic(err) - } - if err := server.RegisterName("nftest", new(notificationTestService)); err != nil { - panic(err) - } - return server -} - -func sequentialIDGenerator() func() ID { - var ( - mu sync.Mutex - counter uint64 - ) - return func() ID { - mu.Lock() - defer mu.Unlock() - counter++ - id := make([]byte, 8) - binary.BigEndian.PutUint64(id, counter) - return encodeID(id) - } -} - -type testService struct{} - -type Args struct { - S string -} - -type Result struct { - String string - Int int - Args *Args -} - -func (s *testService) NoArgsRets() {} - -func (s *testService) Echo(str string, i int, args *Args) Result { - return Result{str, i, args} -} - -func (s *testService) EchoWithCtx(ctx context.Context, str string, i int, args *Args) Result { - return Result{str, i, args} -} - -func (s *testService) Sleep(ctx context.Context, duration time.Duration) { - time.Sleep(duration) -} - -func (s *testService) Rets() (string, error) { - return "", nil -} - -func (s *testService) InvalidRets1() (error, string) { - return nil, "" -} - -func (s *testService) InvalidRets2() (string, string) { - return "", "" -} - -func (s *testService) InvalidRets3() (string, string, error) { - return "", "", nil -} - -func (s *testService) CallMeBack(ctx context.Context, method string, args []interface{}) (interface{}, error) { - c, ok := ClientFromContext(ctx) - if !ok { - return nil, errors.New("no client") - } - var result interface{} - err := c.Call(&result, method, args...) - return result, err -} - -func (s *testService) CallMeBackLater(ctx context.Context, method string, args []interface{}) error { - c, ok := ClientFromContext(ctx) - if !ok { - return errors.New("no client") - } - go func() { - <-ctx.Done() - var result interface{} - c.Call(&result, method, args...) - }() - return nil -} - -func (s *testService) Subscription(ctx context.Context) (*Subscription, error) { - return nil, nil -} - -type notificationTestService struct { - unsubscribed chan string - gotHangSubscriptionReq chan struct{} - unblockHangSubscription chan struct{} -} - -func (s *notificationTestService) Echo(i int) int { - return i -} - -func (s *notificationTestService) Unsubscribe(subid string) { - if s.unsubscribed != nil { - s.unsubscribed <- subid - } -} - -func (s *notificationTestService) SomeSubscription(ctx context.Context, n, val int) (*Subscription, error) { - notifier, supported := NotifierFromContext(ctx) - if !supported { - return nil, ErrNotificationsUnsupported - } - - // By explicitly creating an subscription we make sure that the subscription id is send - // back to the client before the first subscription.Notify is called. Otherwise the - // events might be send before the response for the *_subscribe method. - subscription := notifier.CreateSubscription() - go func() { - for i := 0; i < n; i++ { - if err := notifier.Notify(subscription.ID, val+i); err != nil { - return - } - } - select { - case <-notifier.Closed(): - case <-subscription.Err(): - } - if s.unsubscribed != nil { - s.unsubscribed <- string(subscription.ID) - } - }() - return subscription, nil -} - -// HangSubscription blocks on s.unblockHangSubscription before sending anything. -func (s *notificationTestService) HangSubscription(ctx context.Context, val int) (*Subscription, error) { - notifier, supported := NotifierFromContext(ctx) - if !supported { - return nil, ErrNotificationsUnsupported - } - s.gotHangSubscriptionReq <- struct{}{} - <-s.unblockHangSubscription - subscription := notifier.CreateSubscription() - - go func() { - notifier.Notify(subscription.ID, val) - }() - return subscription, nil -} diff --git a/rpc/types_test.go b/rpc/types_test.go deleted file mode 100644 index 0465849..0000000 --- a/rpc/types_test.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package rpc - -import ( - "encoding/json" - "testing" - - "github.com/ava-labs/go-ethereum/common/math" -) - -func TestBlockNumberJSONUnmarshal(t *testing.T) { - tests := []struct { - input string - mustFail bool - expected BlockNumber - }{ - 0: {`"0x"`, true, BlockNumber(0)}, - 1: {`"0x0"`, false, BlockNumber(0)}, - 2: {`"0X1"`, false, BlockNumber(1)}, - 3: {`"0x00"`, true, BlockNumber(0)}, - 4: {`"0x01"`, true, BlockNumber(0)}, - 5: {`"0x1"`, false, BlockNumber(1)}, - 6: {`"0x12"`, false, BlockNumber(18)}, - 7: {`"0x7fffffffffffffff"`, false, BlockNumber(math.MaxInt64)}, - 8: {`"0x8000000000000000"`, true, BlockNumber(0)}, - 9: {"0", true, BlockNumber(0)}, - 10: {`"ff"`, true, BlockNumber(0)}, - 11: {`"pending"`, false, PendingBlockNumber}, - 12: {`"latest"`, false, LatestBlockNumber}, - 13: {`"earliest"`, false, EarliestBlockNumber}, - 14: {`someString`, true, BlockNumber(0)}, - 15: {`""`, true, BlockNumber(0)}, - 16: {``, true, BlockNumber(0)}, - } - - for i, test := range tests { - var num BlockNumber - err := json.Unmarshal([]byte(test.input), &num) - if test.mustFail && err == nil { - t.Errorf("Test %d should fail", i) - continue - } - if !test.mustFail && err != nil { - t.Errorf("Test %d should pass but got err: %v", i, err) - continue - } - if num != test.expected { - t.Errorf("Test %d got unexpected value, want %d, got %d", i, test.expected, num) - } - } -} diff --git a/rpc/websocket_test.go b/rpc/websocket_test.go deleted file mode 100644 index 9dc1084..0000000 --- a/rpc/websocket_test.go +++ /dev/null @@ -1,259 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package rpc - -import ( - "context" - "net" - "net/http" - "net/http/httptest" - "reflect" - "strings" - "testing" - "time" - - "github.com/gorilla/websocket" -) - -func TestWebsocketClientHeaders(t *testing.T) { - t.Parallel() - - endpoint, header, err := wsClientHeaders("wss://testuser:test-PASS_01@example.com:1234", "https://example.com") - if err != nil { - t.Fatalf("wsGetConfig failed: %s", err) - } - if endpoint != "wss://example.com:1234" { - t.Fatal("User should have been stripped from the URL") - } - if header.Get("authorization") != "Basic dGVzdHVzZXI6dGVzdC1QQVNTXzAx" { - t.Fatal("Basic auth header is incorrect") - } - if header.Get("origin") != "https://example.com" { - t.Fatal("Origin not set") - } -} - -// This test checks that the server rejects connections from disallowed origins. -func TestWebsocketOriginCheck(t *testing.T) { - t.Parallel() - - var ( - srv = newTestServer() - httpsrv = httptest.NewServer(srv.WebsocketHandler([]string{"http://example.com"})) - wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") - ) - defer srv.Stop() - defer httpsrv.Close() - - client, err := DialWebsocket(context.Background(), wsURL, "http://ekzample.com") - if err == nil { - client.Close() - t.Fatal("no error for wrong origin") - } - wantErr := wsHandshakeError{websocket.ErrBadHandshake, "403 Forbidden"} - if !reflect.DeepEqual(err, wantErr) { - t.Fatalf("wrong error for wrong origin: %q", err) - } - - // Connections without origin header should work. - client, err = DialWebsocket(context.Background(), wsURL, "") - if err != nil { - t.Fatal("error for empty origin") - } - client.Close() -} - -// This test checks whether calls exceeding the request size limit are rejected. -func TestWebsocketLargeCall(t *testing.T) { - t.Parallel() - - var ( - srv = newTestServer() - httpsrv = httptest.NewServer(srv.WebsocketHandler([]string{"*"})) - wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") - ) - defer srv.Stop() - defer httpsrv.Close() - - client, err := DialWebsocket(context.Background(), wsURL, "") - if err != nil { - t.Fatalf("can't dial: %v", err) - } - defer client.Close() - - // This call sends slightly less than the limit and should work. - var result Result - arg := strings.Repeat("x", maxRequestContentLength-200) - if err := client.Call(&result, "test_echo", arg, 1); err != nil { - t.Fatalf("valid call didn't work: %v", err) - } - if result.String != arg { - t.Fatal("wrong string echoed") - } - - // This call sends twice the allowed size and shouldn't work. - arg = strings.Repeat("x", maxRequestContentLength*2) - err = client.Call(&result, "test_echo", arg) - if err == nil { - t.Fatal("no error for too large call") - } -} - -// This test checks that client handles WebSocket ping frames correctly. -func TestClientWebsocketPing(t *testing.T) { - t.Parallel() - - var ( - sendPing = make(chan struct{}) - server = wsPingTestServer(t, sendPing) - ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) - ) - defer cancel() - defer server.Shutdown(ctx) - - client, err := DialContext(ctx, "ws://"+server.Addr) - if err != nil { - t.Fatalf("client dial error: %v", err) - } - resultChan := make(chan int) - sub, err := client.EthSubscribe(ctx, resultChan, "foo") - if err != nil { - t.Fatalf("client subscribe error: %v", err) - } - - // Wait for the context's deadline to be reached before proceeding. - // This is important for reproducing https://github.com/ethereum/go-ethereum/issues/19798 - <-ctx.Done() - close(sendPing) - - // Wait for the subscription result. - timeout := time.NewTimer(5 * time.Second) - for { - select { - case err := <-sub.Err(): - t.Error("client subscription error:", err) - case result := <-resultChan: - t.Log("client got result:", result) - return - case <-timeout.C: - t.Error("didn't get any result within the test timeout") - return - } - } -} - -// wsPingTestServer runs a WebSocket server which accepts a single subscription request. -// When a value arrives on sendPing, the server sends a ping frame, waits for a matching -// pong and finally delivers a single subscription result. -func wsPingTestServer(t *testing.T, sendPing <-chan struct{}) *http.Server { - var srv http.Server - shutdown := make(chan struct{}) - srv.RegisterOnShutdown(func() { - close(shutdown) - }) - srv.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Upgrade to WebSocket. - upgrader := websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { return true }, - } - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - t.Errorf("server WS upgrade error: %v", err) - return - } - defer conn.Close() - - // Handle the connection. - wsPingTestHandler(t, conn, shutdown, sendPing) - }) - - // Start the server. - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatal("can't listen:", err) - } - srv.Addr = listener.Addr().String() - go srv.Serve(listener) - return &srv -} - -func wsPingTestHandler(t *testing.T, conn *websocket.Conn, shutdown, sendPing <-chan struct{}) { - // Canned responses for the eth_subscribe call in TestClientWebsocketPing. - const ( - subResp = `{"jsonrpc":"2.0","id":1,"result":"0x00"}` - subNotify = `{"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0x00","result":1}}` - ) - - // Handle subscribe request. - if _, _, err := conn.ReadMessage(); err != nil { - t.Errorf("server read error: %v", err) - return - } - if err := conn.WriteMessage(websocket.TextMessage, []byte(subResp)); err != nil { - t.Errorf("server write error: %v", err) - return - } - - // Read from the connection to process control messages. - var pongCh = make(chan string) - conn.SetPongHandler(func(d string) error { - t.Logf("server got pong: %q", d) - pongCh <- d - return nil - }) - go func() { - for { - typ, msg, err := conn.ReadMessage() - if err != nil { - return - } - t.Logf("server got message (%d): %q", typ, msg) - } - }() - - // Write messages. - var ( - sendResponse <-chan time.Time - wantPong string - ) - for { - select { - case _, open := <-sendPing: - if !open { - sendPing = nil - } - t.Logf("server sending ping") - conn.WriteMessage(websocket.PingMessage, []byte("ping")) - wantPong = "ping" - case data := <-pongCh: - if wantPong == "" { - t.Errorf("unexpected pong") - } else if data != wantPong { - t.Errorf("got pong with wrong data %q", data) - } - wantPong = "" - sendResponse = time.NewTimer(200 * time.Millisecond).C - case <-sendResponse: - t.Logf("server sending response") - conn.WriteMessage(websocket.TextMessage, []byte(subNotify)) - sendResponse = nil - case <-shutdown: - conn.Close() - return - } - } -} -- cgit v1.2.3-70-g09d2