From d235e2c6a5788ec4a6cff15a16f56b38a3876a0d Mon Sep 17 00:00:00 2001 From: Determinant Date: Sun, 28 Jun 2020 14:47:41 -0400 Subject: ... --- rpc/client.go | 625 ++++++++++++++++++++++++++++++++++++++++++++ rpc/client_example_test.go | 88 +++++++ rpc/client_test.go | 569 ++++++++++++++++++++++++++++++++++++++++ rpc/constants_unix_nocgo.go | 25 ++ rpc/doc.go | 118 +++++++++ rpc/errors.go | 65 +++++ rpc/gzip.go | 66 +++++ rpc/handler.go | 397 ++++++++++++++++++++++++++++ rpc/http.go | 359 +++++++++++++++++++++++++ rpc/http_test.go | 54 ++++ rpc/ipc_js.go | 37 +++ rpc/json.go | 335 ++++++++++++++++++++++++ rpc/server.go | 147 +++++++++++ rpc/server_test.go | 152 +++++++++++ rpc/service.go | 285 ++++++++++++++++++++ rpc/subscription.go | 327 +++++++++++++++++++++++ rpc/subscription_test.go | 206 +++++++++++++++ rpc/testservice_test.go | 180 +++++++++++++ rpc/types.go | 34 +++ rpc/types_test.go | 66 +++++ rpc/websocket.go | 175 +++++++++++++ rpc/websocket_test.go | 259 ++++++++++++++++++ 22 files changed, 4569 insertions(+) create mode 100644 rpc/client.go create mode 100644 rpc/client_example_test.go create mode 100644 rpc/client_test.go create mode 100644 rpc/constants_unix_nocgo.go create mode 100644 rpc/doc.go create mode 100644 rpc/errors.go create mode 100644 rpc/gzip.go create mode 100644 rpc/handler.go create mode 100644 rpc/http.go create mode 100644 rpc/http_test.go create mode 100644 rpc/ipc_js.go create mode 100644 rpc/json.go create mode 100644 rpc/server.go create mode 100644 rpc/server_test.go create mode 100644 rpc/service.go create mode 100644 rpc/subscription.go create mode 100644 rpc/subscription_test.go create mode 100644 rpc/testservice_test.go create mode 100644 rpc/types_test.go create mode 100644 rpc/websocket.go create mode 100644 rpc/websocket_test.go (limited to 'rpc') diff --git a/rpc/client.go b/rpc/client.go new file mode 100644 index 0000000..1c7058b --- /dev/null +++ b/rpc/client.go @@ -0,0 +1,625 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "net/url" + "reflect" + "strconv" + "sync/atomic" + "time" + + "github.com/ava-labs/go-ethereum/log" +) + +var ( + ErrClientQuit = errors.New("client is closed") + ErrNoResult = errors.New("no result in JSON-RPC response") + ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow") + errClientReconnected = errors.New("client reconnected") + errDead = errors.New("connection lost") +) + +const ( + // Timeouts + defaultDialTimeout = 10 * time.Second // used if context has no deadline + subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls +) + +const ( + // Subscriptions are removed when the subscriber cannot keep up. + // + // This can be worked around by supplying a channel with sufficiently sized buffer, + // but this can be inconvenient and hard to explain in the docs. Another issue with + // buffered channels is that the buffer is static even though it might not be needed + // most of the time. + // + // The approach taken here is to maintain a per-subscription linked list buffer + // shrinks on demand. If the buffer reaches the size below, the subscription is + // dropped. + maxClientSubscriptionBuffer = 20000 +) + +// BatchElem is an element in a batch request. +type BatchElem struct { + Method string + Args []interface{} + // The result is unmarshaled into this field. Result must be set to a + // non-nil pointer value of the desired type, otherwise the response will be + // discarded. + Result interface{} + // Error is set if the server returns an error for this request, or if + // unmarshaling into Result fails. It is not set for I/O errors. + Error error +} + +// Client represents a connection to an RPC server. +type Client struct { + idgen func() ID // for subscriptions + isHTTP bool + services *serviceRegistry + + idCounter uint32 + + // This function, if non-nil, is called when the connection is lost. + reconnectFunc reconnectFunc + + // writeConn is used for writing to the connection on the caller's goroutine. It should + // only be accessed outside of dispatch, with the write lock held. The write lock is + // taken by sending on requestOp and released by sending on sendDone. + writeConn jsonWriter + + // for dispatch + close chan struct{} + closing chan struct{} // closed when client is quitting + didClose chan struct{} // closed when client quits + reconnected chan ServerCodec // where write/reconnect sends the new connection + readOp chan readOp // read messages + readErr chan error // errors from read + reqInit chan *requestOp // register response IDs, takes write lock + reqSent chan error // signals write completion, releases write lock + reqTimeout chan *requestOp // removes response IDs when call timeout expires +} + +type reconnectFunc func(ctx context.Context) (ServerCodec, error) + +type clientContextKey struct{} + +type clientConn struct { + codec ServerCodec + handler *handler +} + +func (c *Client) newClientConn(conn ServerCodec) *clientConn { + ctx := context.WithValue(context.Background(), clientContextKey{}, c) + handler := newHandler(ctx, conn, c.idgen, c.services) + return &clientConn{conn, handler} +} + +func (cc *clientConn) close(err error, inflightReq *requestOp) { + cc.handler.close(err, inflightReq) + cc.codec.Close() +} + +type readOp struct { + msgs []*jsonrpcMessage + batch bool +} + +type requestOp struct { + ids []json.RawMessage + err error + resp chan *jsonrpcMessage // receives up to len(ids) responses + sub *ClientSubscription // only set for EthSubscribe requests +} + +func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) { + select { + case <-ctx.Done(): + // Send the timeout to dispatch so it can remove the request IDs. + if !c.isHTTP { + select { + case c.reqTimeout <- op: + case <-c.closing: + } + } + return nil, ctx.Err() + case resp := <-op.resp: + return resp, op.err + } +} + +// Dial creates a new client for the given URL. +// +// The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a +// file name with no URL scheme, a local socket connection is established using UNIX +// domain sockets on supported platforms and named pipes on Windows. If you want to +// configure transport options, use DialHTTP, DialWebsocket or DialIPC instead. +// +// For websocket connections, the origin is set to the local host name. +// +// The client reconnects automatically if the connection is lost. +func Dial(rawurl string) (*Client, error) { + return DialContext(context.Background(), rawurl) +} + +// DialContext creates a new RPC client, just like Dial. +// +// The context is used to cancel or time out the initial connection establishment. It does +// not affect subsequent interactions with the client. +func DialContext(ctx context.Context, rawurl string) (*Client, error) { + u, err := url.Parse(rawurl) + if err != nil { + return nil, err + } + switch u.Scheme { + case "http", "https": + return DialHTTP(rawurl) + case "ws", "wss": + return DialWebsocket(ctx, rawurl, "") + //case "stdio": + // return DialStdIO(ctx) + //case "": + // return DialIPC(ctx, rawurl) + default: + return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme) + } +} + +// Client retrieves the client from the context, if any. This can be used to perform +// 'reverse calls' in a handler method. +func ClientFromContext(ctx context.Context) (*Client, bool) { + client, ok := ctx.Value(clientContextKey{}).(*Client) + return client, ok +} + +func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) { + conn, err := connect(initctx) + if err != nil { + return nil, err + } + c := initClient(conn, randomIDGenerator(), new(serviceRegistry)) + c.reconnectFunc = connect + return c, nil +} + +func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client { + _, isHTTP := conn.(*httpConn) + c := &Client{ + idgen: idgen, + isHTTP: isHTTP, + services: services, + writeConn: conn, + close: make(chan struct{}), + closing: make(chan struct{}), + didClose: make(chan struct{}), + reconnected: make(chan ServerCodec), + readOp: make(chan readOp), + readErr: make(chan error), + reqInit: make(chan *requestOp), + reqSent: make(chan error, 1), + reqTimeout: make(chan *requestOp), + } + if !isHTTP { + go c.dispatch(conn) + } + return c +} + +// RegisterName creates a service for the given receiver type under the given name. When no +// methods on the given receiver match the criteria to be either a RPC method or a +// subscription an error is returned. Otherwise a new service is created and added to the +// service collection this client provides to the server. +func (c *Client) RegisterName(name string, receiver interface{}) error { + return c.services.registerName(name, receiver) +} + +func (c *Client) nextID() json.RawMessage { + id := atomic.AddUint32(&c.idCounter, 1) + return strconv.AppendUint(nil, uint64(id), 10) +} + +// SupportedModules calls the rpc_modules method, retrieving the list of +// APIs that are available on the server. +func (c *Client) SupportedModules() (map[string]string, error) { + var result map[string]string + ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) + defer cancel() + err := c.CallContext(ctx, &result, "rpc_modules") + return result, err +} + +// Close closes the client, aborting any in-flight requests. +func (c *Client) Close() { + if c.isHTTP { + return + } + select { + case c.close <- struct{}{}: + <-c.didClose + case <-c.didClose: + } +} + +// Call performs a JSON-RPC call with the given arguments and unmarshals into +// result if no error occurred. +// +// The result must be a pointer so that package json can unmarshal into it. You +// can also pass nil, in which case the result is ignored. +func (c *Client) Call(result interface{}, method string, args ...interface{}) error { + ctx := context.Background() + return c.CallContext(ctx, result, method, args...) +} + +// CallContext performs a JSON-RPC call with the given arguments. If the context is +// canceled before the call has successfully returned, CallContext returns immediately. +// +// The result must be a pointer so that package json can unmarshal into it. You +// can also pass nil, in which case the result is ignored. +func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { + msg, err := c.newMessage(method, args...) + if err != nil { + return err + } + op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)} + + if c.isHTTP { + err = c.sendHTTP(ctx, op, msg) + } else { + err = c.send(ctx, op, msg) + } + if err != nil { + return err + } + + // dispatch has accepted the request and will close the channel when it quits. + switch resp, err := op.wait(ctx, c); { + case err != nil: + return err + case resp.Error != nil: + return resp.Error + case len(resp.Result) == 0: + return ErrNoResult + default: + return json.Unmarshal(resp.Result, &result) + } +} + +// BatchCall sends all given requests as a single batch and waits for the server +// to return a response for all of them. +// +// In contrast to Call, BatchCall only returns I/O errors. Any error specific to +// a request is reported through the Error field of the corresponding BatchElem. +// +// Note that batch calls may not be executed atomically on the server side. +func (c *Client) BatchCall(b []BatchElem) error { + ctx := context.Background() + return c.BatchCallContext(ctx, b) +} + +// BatchCall sends all given requests as a single batch and waits for the server +// to return a response for all of them. The wait duration is bounded by the +// context's deadline. +// +// In contrast to CallContext, BatchCallContext only returns errors that have occurred +// while sending the request. Any error specific to a request is reported through the +// Error field of the corresponding BatchElem. +// +// Note that batch calls may not be executed atomically on the server side. +func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { + msgs := make([]*jsonrpcMessage, len(b)) + op := &requestOp{ + ids: make([]json.RawMessage, len(b)), + resp: make(chan *jsonrpcMessage, len(b)), + } + for i, elem := range b { + msg, err := c.newMessage(elem.Method, elem.Args...) + if err != nil { + return err + } + msgs[i] = msg + op.ids[i] = msg.ID + } + + var err error + if c.isHTTP { + err = c.sendBatchHTTP(ctx, op, msgs) + } else { + err = c.send(ctx, op, msgs) + } + + // Wait for all responses to come back. + for n := 0; n < len(b) && err == nil; n++ { + var resp *jsonrpcMessage + resp, err = op.wait(ctx, c) + if err != nil { + break + } + // Find the element corresponding to this response. + // The element is guaranteed to be present because dispatch + // only sends valid IDs to our channel. + var elem *BatchElem + for i := range msgs { + if bytes.Equal(msgs[i].ID, resp.ID) { + elem = &b[i] + break + } + } + if resp.Error != nil { + elem.Error = resp.Error + continue + } + if len(resp.Result) == 0 { + elem.Error = ErrNoResult + continue + } + elem.Error = json.Unmarshal(resp.Result, elem.Result) + } + return err +} + +// Notify sends a notification, i.e. a method call that doesn't expect a response. +func (c *Client) Notify(ctx context.Context, method string, args ...interface{}) error { + op := new(requestOp) + msg, err := c.newMessage(method, args...) + if err != nil { + return err + } + msg.ID = nil + + if c.isHTTP { + return c.sendHTTP(ctx, op, msg) + } else { + return c.send(ctx, op, msg) + } +} + +// EthSubscribe registers a subscripion under the "eth" namespace. +func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { + return c.Subscribe(ctx, "eth", channel, args...) +} + +// ShhSubscribe registers a subscripion under the "shh" namespace. +func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { + return c.Subscribe(ctx, "shh", channel, args...) +} + +// Subscribe calls the "_subscribe" method with the given arguments, +// registering a subscription. Server notifications for the subscription are +// sent to the given channel. The element type of the channel must match the +// expected type of content returned by the subscription. +// +// The context argument cancels the RPC request that sets up the subscription but has no +// effect on the subscription after Subscribe has returned. +// +// Slow subscribers will be dropped eventually. Client buffers up to 20000 notifications +// before considering the subscriber dead. The subscription Err channel will receive +// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure +// that the channel usually has at least one reader to prevent this issue. +func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) { + // Check type of channel first. + chanVal := reflect.ValueOf(channel) + if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { + panic("first argument to Subscribe must be a writable channel") + } + if chanVal.IsNil() { + panic("channel given to Subscribe must not be nil") + } + if c.isHTTP { + return nil, ErrNotificationsUnsupported + } + + msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...) + if err != nil { + return nil, err + } + op := &requestOp{ + ids: []json.RawMessage{msg.ID}, + resp: make(chan *jsonrpcMessage), + sub: newClientSubscription(c, namespace, chanVal), + } + + // Send the subscription request. + // The arrival and validity of the response is signaled on sub.quit. + if err := c.send(ctx, op, msg); err != nil { + return nil, err + } + if _, err := op.wait(ctx, c); err != nil { + return nil, err + } + return op.sub, nil +} + +func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) { + msg := &jsonrpcMessage{Version: vsn, ID: c.nextID(), Method: method} + if paramsIn != nil { // prevent sending "params":null + var err error + if msg.Params, err = json.Marshal(paramsIn); err != nil { + return nil, err + } + } + return msg, nil +} + +// send registers op with the dispatch loop, then sends msg on the connection. +// if sending fails, op is deregistered. +func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error { + select { + case c.reqInit <- op: + err := c.write(ctx, msg) + c.reqSent <- err + return err + case <-ctx.Done(): + // This can happen if the client is overloaded or unable to keep up with + // subscription notifications. + return ctx.Err() + case <-c.closing: + return ErrClientQuit + } +} + +func (c *Client) write(ctx context.Context, msg interface{}) error { + // The previous write failed. Try to establish a new connection. + if c.writeConn == nil { + if err := c.reconnect(ctx); err != nil { + return err + } + } + err := c.writeConn.Write(ctx, msg) + if err != nil { + c.writeConn = nil + } + return err +} + +func (c *Client) reconnect(ctx context.Context) error { + if c.reconnectFunc == nil { + return errDead + } + + if _, ok := ctx.Deadline(); !ok { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, defaultDialTimeout) + defer cancel() + } + newconn, err := c.reconnectFunc(ctx) + if err != nil { + log.Trace("RPC client reconnect failed", "err", err) + return err + } + select { + case c.reconnected <- newconn: + c.writeConn = newconn + return nil + case <-c.didClose: + newconn.Close() + return ErrClientQuit + } +} + +// dispatch is the main loop of the client. +// It sends read messages to waiting calls to Call and BatchCall +// and subscription notifications to registered subscriptions. +func (c *Client) dispatch(codec ServerCodec) { + var ( + lastOp *requestOp // tracks last send operation + reqInitLock = c.reqInit // nil while the send lock is held + conn = c.newClientConn(codec) + reading = true + ) + defer func() { + close(c.closing) + if reading { + conn.close(ErrClientQuit, nil) + c.drainRead() + } + close(c.didClose) + }() + + // Spawn the initial read loop. + go c.read(codec) + + for { + select { + case <-c.close: + return + + // Read path: + case op := <-c.readOp: + if op.batch { + conn.handler.handleBatch(op.msgs) + } else { + conn.handler.handleMsg(op.msgs[0]) + } + + case err := <-c.readErr: + conn.handler.log.Debug("RPC connection read error", "err", err) + conn.close(err, lastOp) + reading = false + + // Reconnect: + case newcodec := <-c.reconnected: + log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.RemoteAddr()) + if reading { + // Wait for the previous read loop to exit. This is a rare case which + // happens if this loop isn't notified in time after the connection breaks. + // In those cases the caller will notice first and reconnect. Closing the + // handler terminates all waiting requests (closing op.resp) except for + // lastOp, which will be transferred to the new handler. + conn.close(errClientReconnected, lastOp) + c.drainRead() + } + go c.read(newcodec) + reading = true + conn = c.newClientConn(newcodec) + // Re-register the in-flight request on the new handler + // because that's where it will be sent. + conn.handler.addRequestOp(lastOp) + + // Send path: + case op := <-reqInitLock: + // Stop listening for further requests until the current one has been sent. + reqInitLock = nil + lastOp = op + conn.handler.addRequestOp(op) + + case err := <-c.reqSent: + if err != nil { + // Remove response handlers for the last send. When the read loop + // goes down, it will signal all other current operations. + conn.handler.removeRequestOp(lastOp) + } + // Let the next request in. + reqInitLock = c.reqInit + lastOp = nil + + case op := <-c.reqTimeout: + conn.handler.removeRequestOp(op) + } + } +} + +// drainRead drops read messages until an error occurs. +func (c *Client) drainRead() { + for { + select { + case <-c.readOp: + case <-c.readErr: + return + } + } +} + +// read decodes RPC messages from a codec, feeding them into dispatch. +func (c *Client) read(codec ServerCodec) { + for { + msgs, batch, err := codec.Read() + if _, ok := err.(*json.SyntaxError); ok { + codec.Write(context.Background(), errorMessage(&parseError{err.Error()})) + } + if err != nil { + c.readErr <- err + return + } + c.readOp <- readOp{msgs, batch} + } +} diff --git a/rpc/client_example_test.go b/rpc/client_example_test.go new file mode 100644 index 0000000..149de2c --- /dev/null +++ b/rpc/client_example_test.go @@ -0,0 +1,88 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc_test + +import ( + "context" + "fmt" + "math/big" + "time" + + "github.com/ava-labs/go-ethereum/rpc" +) + +// In this example, our client wishes to track the latest 'block number' +// known to the server. The server supports two methods: +// +// eth_getBlockByNumber("latest", {}) +// returns the latest block object. +// +// eth_subscribe("newBlocks") +// creates a subscription which fires block objects when new blocks arrive. + +type Block struct { + Number *big.Int +} + +func ExampleClientSubscription() { + // Connect the client. + client, _ := rpc.Dial("ws://127.0.0.1:8485") + subch := make(chan Block) + + // Ensure that subch receives the latest block. + go func() { + for i := 0; ; i++ { + if i > 0 { + time.Sleep(2 * time.Second) + } + subscribeBlocks(client, subch) + } + }() + + // Print events from the subscription as they arrive. + for block := range subch { + fmt.Println("latest block:", block.Number) + } +} + +// subscribeBlocks runs in its own goroutine and maintains +// a subscription for new blocks. +func subscribeBlocks(client *rpc.Client, subch chan Block) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Subscribe to new blocks. + sub, err := client.EthSubscribe(ctx, subch, "newHeads") + if err != nil { + fmt.Println("subscribe error:", err) + return + } + + // The connection is established now. + // Update the channel with the current block. + var lastBlock Block + if err := client.CallContext(ctx, &lastBlock, "eth_getBlockByNumber", "latest"); err != nil { + fmt.Println("can't get latest block:", err) + return + } + subch <- lastBlock + + // The subscription will deliver events to the channel. Wait for the + // subscription to end for any reason, then loop around to re-establish + // the connection. + fmt.Println("connection lost: ", <-sub.Err()) +} diff --git a/rpc/client_test.go b/rpc/client_test.go new file mode 100644 index 0000000..79ea32e --- /dev/null +++ b/rpc/client_test.go @@ -0,0 +1,569 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "context" + "fmt" + "math/rand" + "net" + "net/http" + "net/http/httptest" + "os" + "reflect" + "runtime" + "sync" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/ava-labs/go-ethereum/log" +) + +func TestClientRequest(t *testing.T) { + server := newTestServer() + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + var resp Result + if err := client.Call(&resp, "test_echo", "hello", 10, &Args{"world"}); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(resp, Result{"hello", 10, &Args{"world"}}) { + t.Errorf("incorrect result %#v", resp) + } +} + +func TestClientBatchRequest(t *testing.T) { + server := newTestServer() + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + batch := []BatchElem{ + { + Method: "test_echo", + Args: []interface{}{"hello", 10, &Args{"world"}}, + Result: new(Result), + }, + { + Method: "test_echo", + Args: []interface{}{"hello2", 11, &Args{"world"}}, + Result: new(Result), + }, + { + Method: "no_such_method", + Args: []interface{}{1, 2, 3}, + Result: new(int), + }, + } + if err := client.BatchCall(batch); err != nil { + t.Fatal(err) + } + wantResult := []BatchElem{ + { + Method: "test_echo", + Args: []interface{}{"hello", 10, &Args{"world"}}, + Result: &Result{"hello", 10, &Args{"world"}}, + }, + { + Method: "test_echo", + Args: []interface{}{"hello2", 11, &Args{"world"}}, + Result: &Result{"hello2", 11, &Args{"world"}}, + }, + { + Method: "no_such_method", + Args: []interface{}{1, 2, 3}, + Result: new(int), + Error: &jsonError{Code: -32601, Message: "the method no_such_method does not exist/is not available"}, + }, + } + if !reflect.DeepEqual(batch, wantResult) { + t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult)) + } +} + +func TestClientNotify(t *testing.T) { + server := newTestServer() + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + if err := client.Notify(context.Background(), "test_echo", "hello", 10, &Args{"world"}); err != nil { + t.Fatal(err) + } +} + +// func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) } +func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) } +func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) } +func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) } + +// This test checks that requests made through CallContext can be canceled by canceling +// the context. +func testClientCancel(transport string, t *testing.T) { + // These tests take a lot of time, run them all at once. + // You probably want to run with -parallel 1 or comment out + // the call to t.Parallel if you enable the logging. + t.Parallel() + + server := newTestServer() + defer server.Stop() + + // What we want to achieve is that the context gets canceled + // at various stages of request processing. The interesting cases + // are: + // - cancel during dial + // - cancel while performing a HTTP request + // - cancel while waiting for a response + // + // To trigger those, the times are chosen such that connections + // are killed within the deadline for every other call (maxKillTimeout + // is 2x maxCancelTimeout). + // + // Once a connection is dead, there is a fair chance it won't connect + // successfully because the accept is delayed by 1s. + maxContextCancelTimeout := 300 * time.Millisecond + fl := &flakeyListener{ + maxAcceptDelay: 1 * time.Second, + maxKillTimeout: 600 * time.Millisecond, + } + + var client *Client + switch transport { + case "ws", "http": + c, hs := httpTestClient(server, transport, fl) + defer hs.Close() + client = c + case "ipc": + c, l := ipcTestClient(server, fl) + defer l.Close() + client = c + default: + panic("unknown transport: " + transport) + } + + // The actual test starts here. + var ( + wg sync.WaitGroup + nreqs = 10 + ncallers = 6 + ) + caller := func(index int) { + defer wg.Done() + for i := 0; i < nreqs; i++ { + var ( + ctx context.Context + cancel func() + timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout))) + ) + if index < ncallers/2 { + // For half of the callers, create a context without deadline + // and cancel it later. + ctx, cancel = context.WithCancel(context.Background()) + time.AfterFunc(timeout, cancel) + } else { + // For the other half, create a context with a deadline instead. This is + // different because the context deadline is used to set the socket write + // deadline. + ctx, cancel = context.WithTimeout(context.Background(), timeout) + } + // Now perform a call with the context. + // The key thing here is that no call will ever complete successfully. + sleepTime := maxContextCancelTimeout + 20*time.Millisecond + err := client.CallContext(ctx, nil, "test_sleep", sleepTime) + if err != nil { + log.Debug(fmt.Sprint("got expected error:", err)) + } else { + t.Errorf("no error for call with %v wait time", timeout) + } + cancel() + } + } + wg.Add(ncallers) + for i := 0; i < ncallers; i++ { + go caller(i) + } + wg.Wait() +} + +func TestClientSubscribeInvalidArg(t *testing.T) { + server := newTestServer() + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + check := func(shouldPanic bool, arg interface{}) { + defer func() { + err := recover() + if shouldPanic && err == nil { + t.Errorf("EthSubscribe should've panicked for %#v", arg) + } + if !shouldPanic && err != nil { + t.Errorf("EthSubscribe shouldn't have panicked for %#v", arg) + buf := make([]byte, 1024*1024) + buf = buf[:runtime.Stack(buf, false)] + t.Error(err) + t.Error(string(buf)) + } + }() + client.EthSubscribe(context.Background(), arg, "foo_bar") + } + check(true, nil) + check(true, 1) + check(true, (chan int)(nil)) + check(true, make(<-chan int)) + check(false, make(chan int)) + check(false, make(chan<- int)) +} + +func TestClientSubscribe(t *testing.T) { + server := newTestServer() + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + nc := make(chan int) + count := 10 + sub, err := client.Subscribe(context.Background(), "nftest", nc, "someSubscription", count, 0) + if err != nil { + t.Fatal("can't subscribe:", err) + } + for i := 0; i < count; i++ { + if val := <-nc; val != i { + t.Fatalf("value mismatch: got %d, want %d", val, i) + } + } + + sub.Unsubscribe() + select { + case v := <-nc: + t.Fatal("received value after unsubscribe:", v) + case err := <-sub.Err(): + if err != nil { + t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err) + } + case <-time.After(1 * time.Second): + t.Fatalf("subscription not closed within 1s after unsubscribe") + } +} + +// In this test, the connection drops while Subscribe is waiting for a response. +func TestClientSubscribeClose(t *testing.T) { + server := newTestServer() + service := ¬ificationTestService{ + gotHangSubscriptionReq: make(chan struct{}), + unblockHangSubscription: make(chan struct{}), + } + if err := server.RegisterName("nftest2", service); err != nil { + t.Fatal(err) + } + + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + var ( + nc = make(chan int) + errc = make(chan error) + sub *ClientSubscription + err error + ) + go func() { + sub, err = client.Subscribe(context.Background(), "nftest2", nc, "hangSubscription", 999) + errc <- err + }() + + <-service.gotHangSubscriptionReq + client.Close() + service.unblockHangSubscription <- struct{}{} + + select { + case err := <-errc: + if err == nil { + t.Errorf("Subscribe returned nil error after Close") + } + if sub != nil { + t.Error("Subscribe returned non-nil subscription after Close") + } + case <-time.After(1 * time.Second): + t.Fatalf("Subscribe did not return within 1s after Close") + } +} + +// This test reproduces https://github.com/ethereum/go-ethereum/issues/17837 where the +// client hangs during shutdown when Unsubscribe races with Client.Close. +func TestClientCloseUnsubscribeRace(t *testing.T) { + server := newTestServer() + defer server.Stop() + + for i := 0; i < 20; i++ { + client := DialInProc(server) + nc := make(chan int) + sub, err := client.Subscribe(context.Background(), "nftest", nc, "someSubscription", 3, 1) + if err != nil { + t.Fatal(err) + } + go client.Close() + go sub.Unsubscribe() + select { + case <-sub.Err(): + case <-time.After(5 * time.Second): + t.Fatal("subscription not closed within timeout") + } + } +} + +// This test checks that Client doesn't lock up when a single subscriber +// doesn't read subscription events. +func TestClientNotificationStorm(t *testing.T) { + server := newTestServer() + defer server.Stop() + + doTest := func(count int, wantError bool) { + client := DialInProc(server) + defer client.Close() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Subscribe on the server. It will start sending many notifications + // very quickly. + nc := make(chan int) + sub, err := client.Subscribe(ctx, "nftest", nc, "someSubscription", count, 0) + if err != nil { + t.Fatal("can't subscribe:", err) + } + defer sub.Unsubscribe() + + // Process each notification, try to run a call in between each of them. + for i := 0; i < count; i++ { + select { + case val := <-nc: + if val != i { + t.Fatalf("(%d/%d) unexpected value %d", i, count, val) + } + case err := <-sub.Err(): + if wantError && err != ErrSubscriptionQueueOverflow { + t.Fatalf("(%d/%d) got error %q, want %q", i, count, err, ErrSubscriptionQueueOverflow) + } else if !wantError { + t.Fatalf("(%d/%d) got unexpected error %q", i, count, err) + } + return + } + var r int + err := client.CallContext(ctx, &r, "nftest_echo", i) + if err != nil { + if !wantError { + t.Fatalf("(%d/%d) call error: %v", i, count, err) + } + return + } + } + if wantError { + t.Fatalf("didn't get expected error") + } + } + + doTest(8000, false) + doTest(21000, true) +} + +func TestClientHTTP(t *testing.T) { + server := newTestServer() + defer server.Stop() + + client, hs := httpTestClient(server, "http", nil) + defer hs.Close() + defer client.Close() + + // Launch concurrent requests. + var ( + results = make([]Result, 100) + errc = make(chan error) + wantResult = Result{"a", 1, new(Args)} + ) + defer client.Close() + for i := range results { + i := i + go func() { + errc <- client.Call(&results[i], "test_echo", + wantResult.String, wantResult.Int, wantResult.Args) + }() + } + + // Wait for all of them to complete. + timeout := time.NewTimer(5 * time.Second) + defer timeout.Stop() + for i := range results { + select { + case err := <-errc: + if err != nil { + t.Fatal(err) + } + case <-timeout.C: + t.Fatalf("timeout (got %d/%d) results)", i+1, len(results)) + } + } + + // Check results. + for i := range results { + if !reflect.DeepEqual(results[i], wantResult) { + t.Errorf("result %d mismatch: got %#v, want %#v", i, results[i], wantResult) + } + } +} + +func TestClientReconnect(t *testing.T) { + startServer := func(addr string) (*Server, net.Listener) { + srv := newTestServer() + l, err := net.Listen("tcp", addr) + if err != nil { + t.Fatal("can't listen:", err) + } + go http.Serve(l, srv.WebsocketHandler([]string{"*"})) + return srv, l + } + + ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second) + defer cancel() + + // Start a server and corresponding client. + s1, l1 := startServer("127.0.0.1:0") + client, err := DialContext(ctx, "ws://"+l1.Addr().String()) + if err != nil { + t.Fatal("can't dial", err) + } + + // Perform a call. This should work because the server is up. + var resp Result + if err := client.CallContext(ctx, &resp, "test_echo", "", 1, nil); err != nil { + t.Fatal(err) + } + + // Shut down the server and allow for some cool down time so we can listen on the same + // address again. + l1.Close() + s1.Stop() + time.Sleep(2 * time.Second) + + // Try calling again. It shouldn't work. + if err := client.CallContext(ctx, &resp, "test_echo", "", 2, nil); err == nil { + t.Error("successful call while the server is down") + t.Logf("resp: %#v", resp) + } + + // Start it up again and call again. The connection should be reestablished. + // We spawn multiple calls here to check whether this hangs somehow. + s2, l2 := startServer(l1.Addr().String()) + defer l2.Close() + defer s2.Stop() + + start := make(chan struct{}) + errors := make(chan error, 20) + for i := 0; i < cap(errors); i++ { + go func() { + <-start + var resp Result + errors <- client.CallContext(ctx, &resp, "test_echo", "", 3, nil) + }() + } + close(start) + errcount := 0 + for i := 0; i < cap(errors); i++ { + if err = <-errors; err != nil { + errcount++ + } + } + t.Logf("%d errors, last error: %v", errcount, err) + if errcount > 1 { + t.Errorf("expected one error after disconnect, got %d", errcount) + } +} + +func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) { + // Create the HTTP server. + var hs *httptest.Server + switch transport { + case "ws": + hs = httptest.NewUnstartedServer(srv.WebsocketHandler([]string{"*"})) + case "http": + hs = httptest.NewUnstartedServer(srv) + default: + panic("unknown HTTP transport: " + transport) + } + // Wrap the listener if required. + if fl != nil { + fl.Listener = hs.Listener + hs.Listener = fl + } + // Connect the client. + hs.Start() + client, err := Dial(transport + "://" + hs.Listener.Addr().String()) + if err != nil { + panic(err) + } + return client, hs +} + +func ipcTestClient(srv *Server, fl *flakeyListener) (*Client, net.Listener) { + // Listen on a random endpoint. + endpoint := fmt.Sprintf("go-ethereum-test-ipc-%d-%d", os.Getpid(), rand.Int63()) + if runtime.GOOS == "windows" { + endpoint = `\\.\pipe\` + endpoint + } else { + endpoint = os.TempDir() + "/" + endpoint + } + l, err := ipcListen(endpoint) + if err != nil { + panic(err) + } + // Connect the listener to the server. + if fl != nil { + fl.Listener = l + l = fl + } + go srv.ServeListener(l) + // Connect the client. + client, err := Dial(endpoint) + if err != nil { + panic(err) + } + return client, l +} + +// flakeyListener kills accepted connections after a random timeout. +type flakeyListener struct { + net.Listener + maxKillTimeout time.Duration + maxAcceptDelay time.Duration +} + +func (l *flakeyListener) Accept() (net.Conn, error) { + delay := time.Duration(rand.Int63n(int64(l.maxAcceptDelay))) + time.Sleep(delay) + + c, err := l.Listener.Accept() + if err == nil { + timeout := time.Duration(rand.Int63n(int64(l.maxKillTimeout))) + time.AfterFunc(timeout, func() { + log.Debug(fmt.Sprintf("killing conn %v after %v", c.LocalAddr(), timeout)) + c.Close() + }) + } + return c, err +} diff --git a/rpc/constants_unix_nocgo.go b/rpc/constants_unix_nocgo.go new file mode 100644 index 0000000..ecb231f --- /dev/null +++ b/rpc/constants_unix_nocgo.go @@ -0,0 +1,25 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// +build !cgo,!windows + +package rpc + +var ( + // On Linux, sun_path is 108 bytes in size + // see http://man7.org/linux/man-pages/man7/unix.7.html + max_path_size = 108 +) diff --git a/rpc/doc.go b/rpc/doc.go new file mode 100644 index 0000000..e5840c3 --- /dev/null +++ b/rpc/doc.go @@ -0,0 +1,118 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +/* + +Package rpc implements bi-directional JSON-RPC 2.0 on multiple transports. + +It provides access to the exported methods of an object across a network or other I/O +connection. After creating a server or client instance, objects can be registered to make +them visible as 'services'. Exported methods that follow specific conventions can be +called remotely. It also has support for the publish/subscribe pattern. + +RPC Methods + +Methods that satisfy the following criteria are made available for remote access: + + - method must be exported + - method returns 0, 1 (response or error) or 2 (response and error) values + - method argument(s) must be exported or builtin types + - method returned value(s) must be exported or builtin types + +An example method: + + func (s *CalcService) Add(a, b int) (int, error) + +When the returned error isn't nil the returned integer is ignored and the error is sent +back to the client. Otherwise the returned integer is sent back to the client. + +Optional arguments are supported by accepting pointer values as arguments. E.g. if we want +to do the addition in an optional finite field we can accept a mod argument as pointer +value. + + func (s *CalcService) Add(a, b int, mod *int) (int, error) + +This RPC method can be called with 2 integers and a null value as third argument. In that +case the mod argument will be nil. Or it can be called with 3 integers, in that case mod +will be pointing to the given third argument. Since the optional argument is the last +argument the RPC package will also accept 2 integers as arguments. It will pass the mod +argument as nil to the RPC method. + +The server offers the ServeCodec method which accepts a ServerCodec instance. It will read +requests from the codec, process the request and sends the response back to the client +using the codec. The server can execute requests concurrently. Responses can be sent back +to the client out of order. + +An example server which uses the JSON codec: + + type CalculatorService struct {} + + func (s *CalculatorService) Add(a, b int) int { + return a + b + } + + func (s *CalculatorService) Div(a, b int) (int, error) { + if b == 0 { + return 0, errors.New("divide by zero") + } + return a/b, nil + } + + calculator := new(CalculatorService) + server := NewServer() + server.RegisterName("calculator", calculator") + + l, _ := net.ListenUnix("unix", &net.UnixAddr{Net: "unix", Name: "/tmp/calculator.sock"}) + for { + c, _ := l.AcceptUnix() + codec := v2.NewJSONCodec(c) + go server.ServeCodec(codec, 0) + } + +Subscriptions + +The package also supports the publish subscribe pattern through the use of subscriptions. +A method that is considered eligible for notifications must satisfy the following +criteria: + + - method must be exported + - first method argument type must be context.Context + - method argument(s) must be exported or builtin types + - method must have return types (rpc.Subscription, error) + +An example method: + + func (s *BlockChainService) NewBlocks(ctx context.Context) (rpc.Subscription, error) { + ... + } + +When the service containing the subscription method is registered to the server, for +example under the "blockchain" namespace, a subscription is created by calling the +"blockchain_subscribe" method. + +Subscriptions are deleted when the user sends an unsubscribe request or when the +connection which was used to create the subscription is closed. This can be initiated by +the client and server. The server will close the connection for any write error. + +For more information about subscriptions, see https://github.com/ethereum/go-ethereum/wiki/RPC-PUB-SUB. + +Reverse Calls + +In any method handler, an instance of rpc.Client can be accessed through the +ClientFromContext method. Using this client instance, server-to-client method calls can be +performed on the RPC connection. +*/ +package rpc diff --git a/rpc/errors.go b/rpc/errors.go new file mode 100644 index 0000000..c3aa826 --- /dev/null +++ b/rpc/errors.go @@ -0,0 +1,65 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import "fmt" + +const defaultErrorCode = -32000 + +type methodNotFoundError struct{ method string } + +func (e *methodNotFoundError) ErrorCode() int { return -32601 } + +func (e *methodNotFoundError) Error() string { + return fmt.Sprintf("the method %s does not exist/is not available", e.method) +} + +type subscriptionNotFoundError struct{ namespace, subscription string } + +func (e *subscriptionNotFoundError) ErrorCode() int { return -32601 } + +func (e *subscriptionNotFoundError) Error() string { + return fmt.Sprintf("no %q subscription in %s namespace", e.subscription, e.namespace) +} + +// Invalid JSON was received by the server. +type parseError struct{ message string } + +func (e *parseError) ErrorCode() int { return -32700 } + +func (e *parseError) Error() string { return e.message } + +// received message isn't a valid request +type invalidRequestError struct{ message string } + +func (e *invalidRequestError) ErrorCode() int { return -32600 } + +func (e *invalidRequestError) Error() string { return e.message } + +// received message is invalid +type invalidMessageError struct{ message string } + +func (e *invalidMessageError) ErrorCode() int { return -32700 } + +func (e *invalidMessageError) Error() string { return e.message } + +// unable to decode supplied params, or an invalid number of parameters +type invalidParamsError struct{ message string } + +func (e *invalidParamsError) ErrorCode() int { return -32602 } + +func (e *invalidParamsError) Error() string { return e.message } diff --git a/rpc/gzip.go b/rpc/gzip.go new file mode 100644 index 0000000..a14fd09 --- /dev/null +++ b/rpc/gzip.go @@ -0,0 +1,66 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "compress/gzip" + "io" + "io/ioutil" + "net/http" + "strings" + "sync" +) + +var gzPool = sync.Pool{ + New: func() interface{} { + w := gzip.NewWriter(ioutil.Discard) + return w + }, +} + +type gzipResponseWriter struct { + io.Writer + http.ResponseWriter +} + +func (w *gzipResponseWriter) WriteHeader(status int) { + w.Header().Del("Content-Length") + w.ResponseWriter.WriteHeader(status) +} + +func (w *gzipResponseWriter) Write(b []byte) (int, error) { + return w.Writer.Write(b) +} + +func newGzipHandler(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { + next.ServeHTTP(w, r) + return + } + + w.Header().Set("Content-Encoding", "gzip") + + gz := gzPool.Get().(*gzip.Writer) + defer gzPool.Put(gz) + + gz.Reset(w) + defer gz.Close() + + next.ServeHTTP(&gzipResponseWriter{ResponseWriter: w, Writer: gz}, r) + }) +} diff --git a/rpc/handler.go b/rpc/handler.go new file mode 100644 index 0000000..187d0f8 --- /dev/null +++ b/rpc/handler.go @@ -0,0 +1,397 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "context" + "encoding/json" + "reflect" + "strconv" + "strings" + "sync" + "time" + + "github.com/ava-labs/go-ethereum/log" +) + +// handler handles JSON-RPC messages. There is one handler per connection. Note that +// handler is not safe for concurrent use. Message handling never blocks indefinitely +// because RPCs are processed on background goroutines launched by handler. +// +// The entry points for incoming messages are: +// +// h.handleMsg(message) +// h.handleBatch(message) +// +// Outgoing calls use the requestOp struct. Register the request before sending it +// on the connection: +// +// op := &requestOp{ids: ...} +// h.addRequestOp(op) +// +// Now send the request, then wait for the reply to be delivered through handleMsg: +// +// if err := op.wait(...); err != nil { +// h.removeRequestOp(op) // timeout, etc. +// } +// +type handler struct { + reg *serviceRegistry + unsubscribeCb *callback + idgen func() ID // subscription ID generator + respWait map[string]*requestOp // active client requests + clientSubs map[string]*ClientSubscription // active client subscriptions + callWG sync.WaitGroup // pending call goroutines + rootCtx context.Context // canceled by close() + cancelRoot func() // cancel function for rootCtx + conn jsonWriter // where responses will be sent + log log.Logger + allowSubscribe bool + + subLock sync.Mutex + serverSubs map[ID]*Subscription +} + +type callProc struct { + ctx context.Context + notifiers []*Notifier +} + +func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler { + rootCtx, cancelRoot := context.WithCancel(connCtx) + h := &handler{ + reg: reg, + idgen: idgen, + conn: conn, + respWait: make(map[string]*requestOp), + clientSubs: make(map[string]*ClientSubscription), + rootCtx: rootCtx, + cancelRoot: cancelRoot, + allowSubscribe: true, + serverSubs: make(map[ID]*Subscription), + log: log.Root(), + } + if conn.RemoteAddr() != "" { + h.log = h.log.New("conn", conn.RemoteAddr()) + } + h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe)) + return h +} + +// handleBatch executes all messages in a batch and returns the responses. +func (h *handler) handleBatch(msgs []*jsonrpcMessage) { + // Emit error response for empty batches: + if len(msgs) == 0 { + h.startCallProc(func(cp *callProc) { + h.conn.Write(cp.ctx, errorMessage(&invalidRequestError{"empty batch"})) + }) + return + } + + // Handle non-call messages first: + calls := make([]*jsonrpcMessage, 0, len(msgs)) + for _, msg := range msgs { + if handled := h.handleImmediate(msg); !handled { + calls = append(calls, msg) + } + } + if len(calls) == 0 { + return + } + // Process calls on a goroutine because they may block indefinitely: + h.startCallProc(func(cp *callProc) { + answers := make([]*jsonrpcMessage, 0, len(msgs)) + for _, msg := range calls { + if answer := h.handleCallMsg(cp, msg); answer != nil { + answers = append(answers, answer) + } + } + h.addSubscriptions(cp.notifiers) + if len(answers) > 0 { + h.conn.Write(cp.ctx, answers) + } + for _, n := range cp.notifiers { + n.activate() + } + }) +} + +// handleMsg handles a single message. +func (h *handler) handleMsg(msg *jsonrpcMessage) { + if ok := h.handleImmediate(msg); ok { + return + } + h.startCallProc(func(cp *callProc) { + answer := h.handleCallMsg(cp, msg) + h.addSubscriptions(cp.notifiers) + if answer != nil { + h.conn.Write(cp.ctx, answer) + } + for _, n := range cp.notifiers { + n.activate() + } + }) +} + +// close cancels all requests except for inflightReq and waits for +// call goroutines to shut down. +func (h *handler) close(err error, inflightReq *requestOp) { + h.cancelAllRequests(err, inflightReq) + h.callWG.Wait() + h.cancelRoot() + h.cancelServerSubscriptions(err) +} + +// addRequestOp registers a request operation. +func (h *handler) addRequestOp(op *requestOp) { + for _, id := range op.ids { + h.respWait[string(id)] = op + } +} + +// removeRequestOps stops waiting for the given request IDs. +func (h *handler) removeRequestOp(op *requestOp) { + for _, id := range op.ids { + delete(h.respWait, string(id)) + } +} + +// cancelAllRequests unblocks and removes pending requests and active subscriptions. +func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) { + didClose := make(map[*requestOp]bool) + if inflightReq != nil { + didClose[inflightReq] = true + } + + for id, op := range h.respWait { + // Remove the op so that later calls will not close op.resp again. + delete(h.respWait, id) + + if !didClose[op] { + op.err = err + close(op.resp) + didClose[op] = true + } + } + for id, sub := range h.clientSubs { + delete(h.clientSubs, id) + sub.quitWithError(err, false) + } +} + +func (h *handler) addSubscriptions(nn []*Notifier) { + h.subLock.Lock() + defer h.subLock.Unlock() + + for _, n := range nn { + if sub := n.takeSubscription(); sub != nil { + h.serverSubs[sub.ID] = sub + } + } +} + +// cancelServerSubscriptions removes all subscriptions and closes their error channels. +func (h *handler) cancelServerSubscriptions(err error) { + h.subLock.Lock() + defer h.subLock.Unlock() + + for id, s := range h.serverSubs { + s.err <- err + close(s.err) + delete(h.serverSubs, id) + } +} + +// startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group. +func (h *handler) startCallProc(fn func(*callProc)) { + h.callWG.Add(1) + go func() { + ctx, cancel := context.WithCancel(h.rootCtx) + defer h.callWG.Done() + defer cancel() + fn(&callProc{ctx: ctx}) + }() +} + +// handleImmediate executes non-call messages. It returns false if the message is a +// call or requires a reply. +func (h *handler) handleImmediate(msg *jsonrpcMessage) bool { + start := time.Now() + switch { + case msg.isNotification(): + if strings.HasSuffix(msg.Method, notificationMethodSuffix) { + h.handleSubscriptionResult(msg) + return true + } + return false + case msg.isResponse(): + h.handleResponse(msg) + h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "t", time.Since(start)) + return true + default: + return false + } +} + +// handleSubscriptionResult processes subscription notifications. +func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) { + var result subscriptionResult + if err := json.Unmarshal(msg.Params, &result); err != nil { + h.log.Debug("Dropping invalid subscription message") + return + } + if h.clientSubs[result.ID] != nil { + h.clientSubs[result.ID].deliver(result.Result) + } +} + +// handleResponse processes method call responses. +func (h *handler) handleResponse(msg *jsonrpcMessage) { + op := h.respWait[string(msg.ID)] + if op == nil { + h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID}) + return + } + delete(h.respWait, string(msg.ID)) + // For normal responses, just forward the reply to Call/BatchCall. + if op.sub == nil { + op.resp <- msg + return + } + // For subscription responses, start the subscription if the server + // indicates success. EthSubscribe gets unblocked in either case through + // the op.resp channel. + defer close(op.resp) + if msg.Error != nil { + op.err = msg.Error + return + } + if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { + go op.sub.start() + h.clientSubs[op.sub.subid] = op.sub + } +} + +// handleCallMsg executes a call message and returns the answer. +func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage { + start := time.Now() + switch { + case msg.isNotification(): + h.handleCall(ctx, msg) + h.log.Debug("Served "+msg.Method, "t", time.Since(start)) + return nil + case msg.isCall(): + resp := h.handleCall(ctx, msg) + if resp.Error != nil { + h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message) + } else { + h.log.Debug("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start)) + } + return resp + case msg.hasValidID(): + return msg.errorResponse(&invalidRequestError{"invalid request"}) + default: + return errorMessage(&invalidRequestError{"invalid request"}) + } +} + +// handleCall processes method calls. +func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage { + if msg.isSubscribe() { + return h.handleSubscribe(cp, msg) + } + var callb *callback + if msg.isUnsubscribe() { + callb = h.unsubscribeCb + } else { + callb = h.reg.callback(msg.Method) + } + if callb == nil { + return msg.errorResponse(&methodNotFoundError{method: msg.Method}) + } + args, err := parsePositionalArguments(msg.Params, callb.argTypes) + if err != nil { + return msg.errorResponse(&invalidParamsError{err.Error()}) + } + + return h.runMethod(cp.ctx, msg, callb, args) +} + +// handleSubscribe processes *_subscribe method calls. +func (h *handler) handleSubscribe(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage { + if !h.allowSubscribe { + return msg.errorResponse(ErrNotificationsUnsupported) + } + + // Subscription method name is first argument. + name, err := parseSubscriptionName(msg.Params) + if err != nil { + return msg.errorResponse(&invalidParamsError{err.Error()}) + } + namespace := msg.namespace() + callb := h.reg.subscription(namespace, name) + if callb == nil { + return msg.errorResponse(&subscriptionNotFoundError{namespace, name}) + } + + // Parse subscription name arg too, but remove it before calling the callback. + argTypes := append([]reflect.Type{stringType}, callb.argTypes...) + args, err := parsePositionalArguments(msg.Params, argTypes) + if err != nil { + return msg.errorResponse(&invalidParamsError{err.Error()}) + } + args = args[1:] + + // Install notifier in context so the subscription handler can find it. + n := &Notifier{h: h, namespace: namespace} + cp.notifiers = append(cp.notifiers, n) + ctx := context.WithValue(cp.ctx, notifierKey{}, n) + + return h.runMethod(ctx, msg, callb, args) +} + +// runMethod runs the Go callback for an RPC method. +func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *callback, args []reflect.Value) *jsonrpcMessage { + result, err := callb.call(ctx, msg.Method, args) + if err != nil { + return msg.errorResponse(err) + } + return msg.response(result) +} + +// unsubscribe is the callback function for all *_unsubscribe calls. +func (h *handler) unsubscribe(ctx context.Context, id ID) (bool, error) { + h.subLock.Lock() + defer h.subLock.Unlock() + + s := h.serverSubs[id] + if s == nil { + return false, ErrSubscriptionNotFound + } + close(s.err) + delete(h.serverSubs, id) + return true, nil +} + +type idForLog struct{ json.RawMessage } + +func (id idForLog) String() string { + if s, err := strconv.Unquote(string(id.RawMessage)); err == nil { + return s + } + return string(id.RawMessage) +} diff --git a/rpc/http.go b/rpc/http.go new file mode 100644 index 0000000..2dffc5d --- /dev/null +++ b/rpc/http.go @@ -0,0 +1,359 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "mime" + "net" + "net/http" + "strings" + "sync" + "time" + + "github.com/ava-labs/go-ethereum/log" + "github.com/rs/cors" +) + +const ( + maxRequestContentLength = 1024 * 1024 * 5 + contentType = "application/json" +) + +// https://www.jsonrpc.org/historical/json-rpc-over-http.html#id13 +var acceptedContentTypes = []string{contentType, "application/json-rpc", "application/jsonrequest"} + +type httpConn struct { + client *http.Client + req *http.Request + closeOnce sync.Once + closed chan interface{} +} + +// httpConn is treated specially by Client. +func (hc *httpConn) Write(context.Context, interface{}) error { + panic("Write called on httpConn") +} + +func (hc *httpConn) RemoteAddr() string { + return hc.req.URL.String() +} + +func (hc *httpConn) Read() ([]*jsonrpcMessage, bool, error) {