aboutsummaryrefslogtreecommitdiff
path: root/rpc
diff options
context:
space:
mode:
authorDeterminant <tederminant@gmail.com>2020-06-28 14:47:41 -0400
committerDeterminant <tederminant@gmail.com>2020-06-28 14:47:41 -0400
commitd235e2c6a5788ec4a6cff15a16f56b38a3876a0d (patch)
tree5f2727f7a50ee5840f889c82776d3a30a88dd59b /rpc
parent13ebd8bd9468e9d769d598b0ca2afb72ba78cb97 (diff)
...
Diffstat (limited to 'rpc')
-rw-r--r--rpc/client.go625
-rw-r--r--rpc/client_example_test.go88
-rw-r--r--rpc/client_test.go569
-rw-r--r--rpc/constants_unix_nocgo.go25
-rw-r--r--rpc/doc.go118
-rw-r--r--rpc/errors.go65
-rw-r--r--rpc/gzip.go66
-rw-r--r--rpc/handler.go397
-rw-r--r--rpc/http.go359
-rw-r--r--rpc/http_test.go54
-rw-r--r--rpc/ipc_js.go37
-rw-r--r--rpc/json.go335
-rw-r--r--rpc/server.go147
-rw-r--r--rpc/server_test.go152
-rw-r--r--rpc/service.go285
-rw-r--r--rpc/subscription.go327
-rw-r--r--rpc/subscription_test.go206
-rw-r--r--rpc/testservice_test.go180
-rw-r--r--rpc/types.go34
-rw-r--r--rpc/types_test.go66
-rw-r--r--rpc/websocket.go175
-rw-r--r--rpc/websocket_test.go259
22 files changed, 4569 insertions, 0 deletions
diff --git a/rpc/client.go b/rpc/client.go
new file mode 100644
index 0000000..1c7058b
--- /dev/null
+++ b/rpc/client.go
@@ -0,0 +1,625 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net/url"
+ "reflect"
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/ava-labs/go-ethereum/log"
+)
+
+var (
+ ErrClientQuit = errors.New("client is closed")
+ ErrNoResult = errors.New("no result in JSON-RPC response")
+ ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow")
+ errClientReconnected = errors.New("client reconnected")
+ errDead = errors.New("connection lost")
+)
+
+const (
+ // Timeouts
+ defaultDialTimeout = 10 * time.Second // used if context has no deadline
+ subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
+)
+
+const (
+ // Subscriptions are removed when the subscriber cannot keep up.
+ //
+ // This can be worked around by supplying a channel with sufficiently sized buffer,
+ // but this can be inconvenient and hard to explain in the docs. Another issue with
+ // buffered channels is that the buffer is static even though it might not be needed
+ // most of the time.
+ //
+ // The approach taken here is to maintain a per-subscription linked list buffer
+ // shrinks on demand. If the buffer reaches the size below, the subscription is
+ // dropped.
+ maxClientSubscriptionBuffer = 20000
+)
+
+// BatchElem is an element in a batch request.
+type BatchElem struct {
+ Method string
+ Args []interface{}
+ // The result is unmarshaled into this field. Result must be set to a
+ // non-nil pointer value of the desired type, otherwise the response will be
+ // discarded.
+ Result interface{}
+ // Error is set if the server returns an error for this request, or if
+ // unmarshaling into Result fails. It is not set for I/O errors.
+ Error error
+}
+
+// Client represents a connection to an RPC server.
+type Client struct {
+ idgen func() ID // for subscriptions
+ isHTTP bool
+ services *serviceRegistry
+
+ idCounter uint32
+
+ // This function, if non-nil, is called when the connection is lost.
+ reconnectFunc reconnectFunc
+
+ // writeConn is used for writing to the connection on the caller's goroutine. It should
+ // only be accessed outside of dispatch, with the write lock held. The write lock is
+ // taken by sending on requestOp and released by sending on sendDone.
+ writeConn jsonWriter
+
+ // for dispatch
+ close chan struct{}
+ closing chan struct{} // closed when client is quitting
+ didClose chan struct{} // closed when client quits
+ reconnected chan ServerCodec // where write/reconnect sends the new connection
+ readOp chan readOp // read messages
+ readErr chan error // errors from read
+ reqInit chan *requestOp // register response IDs, takes write lock
+ reqSent chan error // signals write completion, releases write lock
+ reqTimeout chan *requestOp // removes response IDs when call timeout expires
+}
+
+type reconnectFunc func(ctx context.Context) (ServerCodec, error)
+
+type clientContextKey struct{}
+
+type clientConn struct {
+ codec ServerCodec
+ handler *handler
+}
+
+func (c *Client) newClientConn(conn ServerCodec) *clientConn {
+ ctx := context.WithValue(context.Background(), clientContextKey{}, c)
+ handler := newHandler(ctx, conn, c.idgen, c.services)
+ return &clientConn{conn, handler}
+}
+
+func (cc *clientConn) close(err error, inflightReq *requestOp) {
+ cc.handler.close(err, inflightReq)
+ cc.codec.Close()
+}
+
+type readOp struct {
+ msgs []*jsonrpcMessage
+ batch bool
+}
+
+type requestOp struct {
+ ids []json.RawMessage
+ err error
+ resp chan *jsonrpcMessage // receives up to len(ids) responses
+ sub *ClientSubscription // only set for EthSubscribe requests
+}
+
+func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) {
+ select {
+ case <-ctx.Done():
+ // Send the timeout to dispatch so it can remove the request IDs.
+ if !c.isHTTP {
+ select {
+ case c.reqTimeout <- op:
+ case <-c.closing:
+ }
+ }
+ return nil, ctx.Err()
+ case resp := <-op.resp:
+ return resp, op.err
+ }
+}
+
+// Dial creates a new client for the given URL.
+//
+// The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a
+// file name with no URL scheme, a local socket connection is established using UNIX
+// domain sockets on supported platforms and named pipes on Windows. If you want to
+// configure transport options, use DialHTTP, DialWebsocket or DialIPC instead.
+//
+// For websocket connections, the origin is set to the local host name.
+//
+// The client reconnects automatically if the connection is lost.
+func Dial(rawurl string) (*Client, error) {
+ return DialContext(context.Background(), rawurl)
+}
+
+// DialContext creates a new RPC client, just like Dial.
+//
+// The context is used to cancel or time out the initial connection establishment. It does
+// not affect subsequent interactions with the client.
+func DialContext(ctx context.Context, rawurl string) (*Client, error) {
+ u, err := url.Parse(rawurl)
+ if err != nil {
+ return nil, err
+ }
+ switch u.Scheme {
+ case "http", "https":
+ return DialHTTP(rawurl)
+ case "ws", "wss":
+ return DialWebsocket(ctx, rawurl, "")
+ //case "stdio":
+ // return DialStdIO(ctx)
+ //case "":
+ // return DialIPC(ctx, rawurl)
+ default:
+ return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme)
+ }
+}
+
+// Client retrieves the client from the context, if any. This can be used to perform
+// 'reverse calls' in a handler method.
+func ClientFromContext(ctx context.Context) (*Client, bool) {
+ client, ok := ctx.Value(clientContextKey{}).(*Client)
+ return client, ok
+}
+
+func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) {
+ conn, err := connect(initctx)
+ if err != nil {
+ return nil, err
+ }
+ c := initClient(conn, randomIDGenerator(), new(serviceRegistry))
+ c.reconnectFunc = connect
+ return c, nil
+}
+
+func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client {
+ _, isHTTP := conn.(*httpConn)
+ c := &Client{
+ idgen: idgen,
+ isHTTP: isHTTP,
+ services: services,
+ writeConn: conn,
+ close: make(chan struct{}),
+ closing: make(chan struct{}),
+ didClose: make(chan struct{}),
+ reconnected: make(chan ServerCodec),
+ readOp: make(chan readOp),
+ readErr: make(chan error),
+ reqInit: make(chan *requestOp),
+ reqSent: make(chan error, 1),
+ reqTimeout: make(chan *requestOp),
+ }
+ if !isHTTP {
+ go c.dispatch(conn)
+ }
+ return c
+}
+
+// RegisterName creates a service for the given receiver type under the given name. When no
+// methods on the given receiver match the criteria to be either a RPC method or a
+// subscription an error is returned. Otherwise a new service is created and added to the
+// service collection this client provides to the server.
+func (c *Client) RegisterName(name string, receiver interface{}) error {
+ return c.services.registerName(name, receiver)
+}
+
+func (c *Client) nextID() json.RawMessage {
+ id := atomic.AddUint32(&c.idCounter, 1)
+ return strconv.AppendUint(nil, uint64(id), 10)
+}
+
+// SupportedModules calls the rpc_modules method, retrieving the list of
+// APIs that are available on the server.
+func (c *Client) SupportedModules() (map[string]string, error) {
+ var result map[string]string
+ ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
+ defer cancel()
+ err := c.CallContext(ctx, &result, "rpc_modules")
+ return result, err
+}
+
+// Close closes the client, aborting any in-flight requests.
+func (c *Client) Close() {
+ if c.isHTTP {
+ return
+ }
+ select {
+ case c.close <- struct{}{}:
+ <-c.didClose
+ case <-c.didClose:
+ }
+}
+
+// Call performs a JSON-RPC call with the given arguments and unmarshals into
+// result if no error occurred.
+//
+// The result must be a pointer so that package json can unmarshal into it. You
+// can also pass nil, in which case the result is ignored.
+func (c *Client) Call(result interface{}, method string, args ...interface{}) error {
+ ctx := context.Background()
+ return c.CallContext(ctx, result, method, args...)
+}
+
+// CallContext performs a JSON-RPC call with the given arguments. If the context is
+// canceled before the call has successfully returned, CallContext returns immediately.
+//
+// The result must be a pointer so that package json can unmarshal into it. You
+// can also pass nil, in which case the result is ignored.
+func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
+ msg, err := c.newMessage(method, args...)
+ if err != nil {
+ return err
+ }
+ op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
+
+ if c.isHTTP {
+ err = c.sendHTTP(ctx, op, msg)
+ } else {
+ err = c.send(ctx, op, msg)
+ }
+ if err != nil {
+ return err
+ }
+
+ // dispatch has accepted the request and will close the channel when it quits.
+ switch resp, err := op.wait(ctx, c); {
+ case err != nil:
+ return err
+ case resp.Error != nil:
+ return resp.Error
+ case len(resp.Result) == 0:
+ return ErrNoResult
+ default:
+ return json.Unmarshal(resp.Result, &result)
+ }
+}
+
+// BatchCall sends all given requests as a single batch and waits for the server
+// to return a response for all of them.
+//
+// In contrast to Call, BatchCall only returns I/O errors. Any error specific to
+// a request is reported through the Error field of the corresponding BatchElem.
+//
+// Note that batch calls may not be executed atomically on the server side.
+func (c *Client) BatchCall(b []BatchElem) error {
+ ctx := context.Background()
+ return c.BatchCallContext(ctx, b)
+}
+
+// BatchCall sends all given requests as a single batch and waits for the server
+// to return a response for all of them. The wait duration is bounded by the
+// context's deadline.
+//
+// In contrast to CallContext, BatchCallContext only returns errors that have occurred
+// while sending the request. Any error specific to a request is reported through the
+// Error field of the corresponding BatchElem.
+//
+// Note that batch calls may not be executed atomically on the server side.
+func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
+ msgs := make([]*jsonrpcMessage, len(b))
+ op := &requestOp{
+ ids: make([]json.RawMessage, len(b)),
+ resp: make(chan *jsonrpcMessage, len(b)),
+ }
+ for i, elem := range b {
+ msg, err := c.newMessage(elem.Method, elem.Args...)
+ if err != nil {
+ return err
+ }
+ msgs[i] = msg
+ op.ids[i] = msg.ID
+ }
+
+ var err error
+ if c.isHTTP {
+ err = c.sendBatchHTTP(ctx, op, msgs)
+ } else {
+ err = c.send(ctx, op, msgs)
+ }
+
+ // Wait for all responses to come back.
+ for n := 0; n < len(b) && err == nil; n++ {
+ var resp *jsonrpcMessage
+ resp, err = op.wait(ctx, c)
+ if err != nil {
+ break
+ }
+ // Find the element corresponding to this response.
+ // The element is guaranteed to be present because dispatch
+ // only sends valid IDs to our channel.
+ var elem *BatchElem
+ for i := range msgs {
+ if bytes.Equal(msgs[i].ID, resp.ID) {
+ elem = &b[i]
+ break
+ }
+ }
+ if resp.Error != nil {
+ elem.Error = resp.Error
+ continue
+ }
+ if len(resp.Result) == 0 {
+ elem.Error = ErrNoResult
+ continue
+ }
+ elem.Error = json.Unmarshal(resp.Result, elem.Result)
+ }
+ return err
+}
+
+// Notify sends a notification, i.e. a method call that doesn't expect a response.
+func (c *Client) Notify(ctx context.Context, method string, args ...interface{}) error {
+ op := new(requestOp)
+ msg, err := c.newMessage(method, args...)
+ if err != nil {
+ return err
+ }
+ msg.ID = nil
+
+ if c.isHTTP {
+ return c.sendHTTP(ctx, op, msg)
+ } else {
+ return c.send(ctx, op, msg)
+ }
+}
+
+// EthSubscribe registers a subscripion under the "eth" namespace.
+func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
+ return c.Subscribe(ctx, "eth", channel, args...)
+}
+
+// ShhSubscribe registers a subscripion under the "shh" namespace.
+func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
+ return c.Subscribe(ctx, "shh", channel, args...)
+}
+
+// Subscribe calls the "<namespace>_subscribe" method with the given arguments,
+// registering a subscription. Server notifications for the subscription are
+// sent to the given channel. The element type of the channel must match the
+// expected type of content returned by the subscription.
+//
+// The context argument cancels the RPC request that sets up the subscription but has no
+// effect on the subscription after Subscribe has returned.
+//
+// Slow subscribers will be dropped eventually. Client buffers up to 20000 notifications
+// before considering the subscriber dead. The subscription Err channel will receive
+// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
+// that the channel usually has at least one reader to prevent this issue.
+func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
+ // Check type of channel first.
+ chanVal := reflect.ValueOf(channel)
+ if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
+ panic("first argument to Subscribe must be a writable channel")
+ }
+ if chanVal.IsNil() {
+ panic("channel given to Subscribe must not be nil")
+ }
+ if c.isHTTP {
+ return nil, ErrNotificationsUnsupported
+ }
+
+ msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...)
+ if err != nil {
+ return nil, err
+ }
+ op := &requestOp{
+ ids: []json.RawMessage{msg.ID},
+ resp: make(chan *jsonrpcMessage),
+ sub: newClientSubscription(c, namespace, chanVal),
+ }
+
+ // Send the subscription request.
+ // The arrival and validity of the response is signaled on sub.quit.
+ if err := c.send(ctx, op, msg); err != nil {
+ return nil, err
+ }
+ if _, err := op.wait(ctx, c); err != nil {
+ return nil, err
+ }
+ return op.sub, nil
+}
+
+func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) {
+ msg := &jsonrpcMessage{Version: vsn, ID: c.nextID(), Method: method}
+ if paramsIn != nil { // prevent sending "params":null
+ var err error
+ if msg.Params, err = json.Marshal(paramsIn); err != nil {
+ return nil, err
+ }
+ }
+ return msg, nil
+}
+
+// send registers op with the dispatch loop, then sends msg on the connection.
+// if sending fails, op is deregistered.
+func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
+ select {
+ case c.reqInit <- op:
+ err := c.write(ctx, msg)
+ c.reqSent <- err
+ return err
+ case <-ctx.Done():
+ // This can happen if the client is overloaded or unable to keep up with
+ // subscription notifications.
+ return ctx.Err()
+ case <-c.closing:
+ return ErrClientQuit
+ }
+}
+
+func (c *Client) write(ctx context.Context, msg interface{}) error {
+ // The previous write failed. Try to establish a new connection.
+ if c.writeConn == nil {
+ if err := c.reconnect(ctx); err != nil {
+ return err
+ }
+ }
+ err := c.writeConn.Write(ctx, msg)
+ if err != nil {
+ c.writeConn = nil
+ }
+ return err
+}
+
+func (c *Client) reconnect(ctx context.Context) error {
+ if c.reconnectFunc == nil {
+ return errDead
+ }
+
+ if _, ok := ctx.Deadline(); !ok {
+ var cancel func()
+ ctx, cancel = context.WithTimeout(ctx, defaultDialTimeout)
+ defer cancel()
+ }
+ newconn, err := c.reconnectFunc(ctx)
+ if err != nil {
+ log.Trace("RPC client reconnect failed", "err", err)
+ return err
+ }
+ select {
+ case c.reconnected <- newconn:
+ c.writeConn = newconn
+ return nil
+ case <-c.didClose:
+ newconn.Close()
+ return ErrClientQuit
+ }
+}
+
+// dispatch is the main loop of the client.
+// It sends read messages to waiting calls to Call and BatchCall
+// and subscription notifications to registered subscriptions.
+func (c *Client) dispatch(codec ServerCodec) {
+ var (
+ lastOp *requestOp // tracks last send operation
+ reqInitLock = c.reqInit // nil while the send lock is held
+ conn = c.newClientConn(codec)
+ reading = true
+ )
+ defer func() {
+ close(c.closing)
+ if reading {
+ conn.close(ErrClientQuit, nil)
+ c.drainRead()
+ }
+ close(c.didClose)
+ }()
+
+ // Spawn the initial read loop.
+ go c.read(codec)
+
+ for {
+ select {
+ case <-c.close:
+ return
+
+ // Read path:
+ case op := <-c.readOp:
+ if op.batch {
+ conn.handler.handleBatch(op.msgs)
+ } else {
+ conn.handler.handleMsg(op.msgs[0])
+ }
+
+ case err := <-c.readErr:
+ conn.handler.log.Debug("RPC connection read error", "err", err)
+ conn.close(err, lastOp)
+ reading = false
+
+ // Reconnect:
+ case newcodec := <-c.reconnected:
+ log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.RemoteAddr())
+ if reading {
+ // Wait for the previous read loop to exit. This is a rare case which
+ // happens if this loop isn't notified in time after the connection breaks.
+ // In those cases the caller will notice first and reconnect. Closing the
+ // handler terminates all waiting requests (closing op.resp) except for
+ // lastOp, which will be transferred to the new handler.
+ conn.close(errClientReconnected, lastOp)
+ c.drainRead()
+ }
+ go c.read(newcodec)
+ reading = true
+ conn = c.newClientConn(newcodec)
+ // Re-register the in-flight request on the new handler
+ // because that's where it will be sent.
+ conn.handler.addRequestOp(lastOp)
+
+ // Send path:
+ case op := <-reqInitLock:
+ // Stop listening for further requests until the current one has been sent.
+ reqInitLock = nil
+ lastOp = op
+ conn.handler.addRequestOp(op)
+
+ case err := <-c.reqSent:
+ if err != nil {
+ // Remove response handlers for the last send. When the read loop
+ // goes down, it will signal all other current operations.
+ conn.handler.removeRequestOp(lastOp)
+ }
+ // Let the next request in.
+ reqInitLock = c.reqInit
+ lastOp = nil
+
+ case op := <-c.reqTimeout:
+ conn.handler.removeRequestOp(op)
+ }
+ }
+}
+
+// drainRead drops read messages until an error occurs.
+func (c *Client) drainRead() {
+ for {
+ select {
+ case <-c.readOp:
+ case <-c.readErr:
+ return
+ }
+ }
+}
+
+// read decodes RPC messages from a codec, feeding them into dispatch.
+func (c *Client) read(codec ServerCodec) {
+ for {
+ msgs, batch, err := codec.Read()
+ if _, ok := err.(*json.SyntaxError); ok {
+ codec.Write(context.Background(), errorMessage(&parseError{err.Error()}))
+ }
+ if err != nil {
+ c.readErr <- err
+ return
+ }
+ c.readOp <- readOp{msgs, batch}
+ }
+}
diff --git a/rpc/client_example_test.go b/rpc/client_example_test.go
new file mode 100644
index 0000000..149de2c
--- /dev/null
+++ b/rpc/client_example_test.go
@@ -0,0 +1,88 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc_test
+
+import (
+ "context"
+ "fmt"
+ "math/big"
+ "time"
+
+ "github.com/ava-labs/go-ethereum/rpc"
+)
+
+// In this example, our client wishes to track the latest 'block number'
+// known to the server. The server supports two methods:
+//
+// eth_getBlockByNumber("latest", {})
+// returns the latest block object.
+//
+// eth_subscribe("newBlocks")
+// creates a subscription which fires block objects when new blocks arrive.
+
+type Block struct {
+ Number *big.Int
+}
+
+func ExampleClientSubscription() {
+ // Connect the client.
+ client, _ := rpc.Dial("ws://127.0.0.1:8485")
+ subch := make(chan Block)
+
+ // Ensure that subch receives the latest block.
+ go func() {
+ for i := 0; ; i++ {
+ if i > 0 {
+ time.Sleep(2 * time.Second)
+ }
+ subscribeBlocks(client, subch)
+ }
+ }()
+
+ // Print events from the subscription as they arrive.
+ for block := range subch {
+ fmt.Println("latest block:", block.Number)
+ }
+}
+
+// subscribeBlocks runs in its own goroutine and maintains
+// a subscription for new blocks.
+func subscribeBlocks(client *rpc.Client, subch chan Block) {
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // Subscribe to new blocks.
+ sub, err := client.EthSubscribe(ctx, subch, "newHeads")
+ if err != nil {
+ fmt.Println("subscribe error:", err)
+ return
+ }
+
+ // The connection is established now.
+ // Update the channel with the current block.
+ var lastBlock Block
+ if err := client.CallContext(ctx, &lastBlock, "eth_getBlockByNumber", "latest"); err != nil {
+ fmt.Println("can't get latest block:", err)
+ return
+ }
+ subch <- lastBlock
+
+ // The subscription will deliver events to the channel. Wait for the
+ // subscription to end for any reason, then loop around to re-establish
+ // the connection.
+ fmt.Println("connection lost: ", <-sub.Err())
+}
diff --git a/rpc/client_test.go b/rpc/client_test.go
new file mode 100644
index 0000000..79ea32e
--- /dev/null
+++ b/rpc/client_test.go
@@ -0,0 +1,569 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+ "context"
+ "fmt"
+ "math/rand"
+ "net"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "reflect"
+ "runtime"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/davecgh/go-spew/spew"
+ "github.com/ava-labs/go-ethereum/log"
+)
+
+func TestClientRequest(t *testing.T) {
+ server := newTestServer()
+ defer server.Stop()
+ client := DialInProc(server)
+ defer client.Close()
+
+ var resp Result
+ if err := client.Call(&resp, "test_echo", "hello", 10, &Args{"world"}); err != nil {
+ t.Fatal(err)
+ }
+ if !reflect.DeepEqual(resp, Result{"hello", 10, &Args{"world"}}) {
+ t.Errorf("incorrect result %#v", resp)
+ }
+}
+
+func TestClientBatchRequest(t *testing.T) {
+ server := newTestServer()
+ defer server.Stop()
+ client := DialInProc(server)
+ defer client.Close()
+
+ batch := []BatchElem{
+ {
+ Method: "test_echo",
+ Args: []interface{}{"hello", 10, &Args{"world"}},
+ Result: new(Result),
+ },
+ {
+ Method: "test_echo",
+ Args: []interface{}{"hello2", 11, &Args{"world"}},
+ Result: new(Result),
+ },
+ {
+ Method: "no_such_method",
+ Args: []interface{}{1, 2, 3},
+ Result: new(int),
+ },
+ }
+ if err := client.BatchCall(batch); err != nil {
+ t.Fatal(err)
+ }
+ wantResult := []BatchElem{
+ {
+ Method: "test_echo",
+ Args: []interface{}{"hello", 10, &Args{"world"}},
+ Result: &Result{"hello", 10, &Args{"world"}},
+ },
+ {
+ Method: "test_echo",
+ Args: []interface{}{"hello2", 11, &Args{"world"}},
+ Result: &Result{"hello2", 11, &Args{"world"}},
+ },
+ {
+ Method: "no_such_method",
+ Args: []interface{}{1, 2, 3},
+ Result: new(int),
+ Error: &jsonError{Code: -32601, Message: "the method no_such_method does not exist/is not available"},
+ },
+ }
+ if !reflect.DeepEqual(batch, wantResult) {
+ t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult))
+ }
+}
+
+func TestClientNotify(t *testing.T) {
+ server := newTestServer()
+ defer server.Stop()
+ client := DialInProc(server)
+ defer client.Close()
+
+ if err := client.Notify(context.Background(), "test_echo", "hello", 10, &Args{"world"}); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) }
+func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) }
+func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) }
+func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) }
+
+// This test checks that requests made through CallContext can be canceled by canceling
+// the context.
+func testClientCancel(transport string, t *testing.T) {
+ // These tests take a lot of time, run them all at once.
+ // You probably want to run with -parallel 1 or comment out
+ // the call to t.Parallel if you enable the logging.
+ t.Parallel()
+
+ server := newTestServer()
+ defer server.Stop()
+
+ // What we want to achieve is that the context gets canceled
+ // at various stages of request processing. The interesting cases
+ // are:
+ // - cancel during dial
+ // - cancel while performing a HTTP request
+ // - cancel while waiting for a response
+ //
+ // To trigger those, the times are chosen such that connections
+ // are killed within the deadline for every other call (maxKillTimeout
+ // is 2x maxCancelTimeout).
+ //
+ // Once a connection is dead, there is a fair chance it won't connect
+ // successfully because the accept is delayed by 1s.
+ maxContextCancelTimeout := 300 * time.Millisecond
+ fl := &flakeyListener{
+ maxAcceptDelay: 1 * time.Second,
+ maxKillTimeout: 600 * time.Millisecond,
+ }
+
+ var client *Client
+ switch transport {
+ case "ws", "http":
+ c, hs := httpTestClient(server, transport, fl)
+ defer hs.Close()
+ client = c
+ case "ipc":
+ c, l := ipcTestClient(server, fl)
+ defer l.Close()
+ client = c
+ default:
+ panic("unknown transport: " + transport)
+ }
+
+ // The actual test starts here.
+ var (
+ wg sync.WaitGroup
+ nreqs = 10
+ ncallers = 6
+ )
+ caller := func(index int) {
+ defer wg.Done()
+ for i := 0; i < nreqs; i++ {
+ var (
+ ctx context.Context
+ cancel func()
+ timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout)))
+ )
+ if index < ncallers/2 {
+ // For half of the callers, create a context without deadline
+ // and cancel it later.
+ ctx, cancel = context.WithCancel(context.Background())
+ time.AfterFunc(timeout, cancel)
+ } else {
+ // For the other half, create a context with a deadline instead. This