aboutsummaryrefslogtreecommitdiff
path: root/rpc
diff options
context:
space:
mode:
authorTed Yin <[email protected]>2020-09-18 13:14:29 -0400
committerGitHub <[email protected]>2020-09-18 13:14:29 -0400
commitd048937c48753d9eaef771bf71820cf95d79df26 (patch)
tree1a7f65fcd72e77092525ab01625b8b9d365e3e40 /rpc
parent7d1388c743b4ec8f4a86bea95bfada785dee83f7 (diff)
parent7d8c85cf8895b0f998d8eafb02f99d5b689fcd59 (diff)
Merge pull request #34 from ava-labs/devv0.3.0-rc.5
Dev
Diffstat (limited to 'rpc')
-rw-r--r--rpc/client.go39
-rw-r--r--rpc/doc.go12
-rw-r--r--rpc/errors.go9
-rw-r--r--rpc/gzip.go66
-rw-r--r--rpc/handler.go42
-rw-r--r--rpc/http.go163
-rw-r--r--rpc/json.go87
-rw-r--r--rpc/metrics.go39
-rw-r--r--rpc/server.go14
-rw-r--r--rpc/service.go30
-rw-r--r--rpc/subscription.go29
-rw-r--r--rpc/types.go119
-rw-r--r--rpc/websocket.go105
13 files changed, 418 insertions, 336 deletions
diff --git a/rpc/client.go b/rpc/client.go
index 1c7058b..4f36a05 100644
--- a/rpc/client.go
+++ b/rpc/client.go
@@ -28,7 +28,7 @@ import (
"sync/atomic"
"time"
- "github.com/ava-labs/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/log"
)
var (
@@ -85,7 +85,7 @@ type Client struct {
// writeConn is used for writing to the connection on the caller's goroutine. It should
// only be accessed outside of dispatch, with the write lock held. The write lock is
- // taken by sending on requestOp and released by sending on sendDone.
+ // taken by sending on reqInit and released by sending on reqSent.
writeConn jsonWriter
// for dispatch
@@ -117,7 +117,7 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn {
func (cc *clientConn) close(err error, inflightReq *requestOp) {
cc.handler.close(err, inflightReq)
- cc.codec.Close()
+ cc.codec.close()
}
type readOp struct {
@@ -260,6 +260,19 @@ func (c *Client) Close() {
}
}
+// SetHeader adds a custom HTTP header to the client's requests.
+// This method only works for clients using HTTP, it doesn't have
+// any effect for clients using another transport.
+func (c *Client) SetHeader(key, value string) {
+ if !c.isHTTP {
+ return
+ }
+ conn := c.writeConn.(*httpConn)
+ conn.mu.Lock()
+ conn.headers.Set(key, value)
+ conn.mu.Unlock()
+}
+
// Call performs a JSON-RPC call with the given arguments and unmarshals into
// result if no error occurred.
//
@@ -276,6 +289,9 @@ func (c *Client) Call(result interface{}, method string, args ...interface{}) er
// The result must be a pointer so that package json can unmarshal into it. You
// can also pass nil, in which case the result is ignored.
func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
+ if result != nil && reflect.TypeOf(result).Kind() != reflect.Ptr {
+ return fmt.Errorf("call result parameter must be pointer or nil interface: %v", result)
+ }
msg, err := c.newMessage(method, args...)
if err != nil {
return err
@@ -465,7 +481,7 @@ func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMes
func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
select {
case c.reqInit <- op:
- err := c.write(ctx, msg)
+ err := c.write(ctx, msg, false)
c.reqSent <- err
return err
case <-ctx.Done():
@@ -477,16 +493,19 @@ func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error
}
}
-func (c *Client) write(ctx context.Context, msg interface{}) error {
+func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error {
// The previous write failed. Try to establish a new connection.
if c.writeConn == nil {
if err := c.reconnect(ctx); err != nil {
return err
}
}
- err := c.writeConn.Write(ctx, msg)
+ err := c.writeConn.writeJSON(ctx, msg)
if err != nil {
c.writeConn = nil
+ if !retry {
+ return c.write(ctx, msg, true)
+ }
}
return err
}
@@ -511,7 +530,7 @@ func (c *Client) reconnect(ctx context.Context) error {
c.writeConn = newconn
return nil
case <-c.didClose:
- newconn.Close()
+ newconn.close()
return ErrClientQuit
}
}
@@ -558,7 +577,7 @@ func (c *Client) dispatch(codec ServerCodec) {
// Reconnect:
case newcodec := <-c.reconnected:
- log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.RemoteAddr())
+ log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.remoteAddr())
if reading {
// Wait for the previous read loop to exit. This is a rare case which
// happens if this loop isn't notified in time after the connection breaks.
@@ -612,9 +631,9 @@ func (c *Client) drainRead() {
// read decodes RPC messages from a codec, feeding them into dispatch.
func (c *Client) read(codec ServerCodec) {
for {
- msgs, batch, err := codec.Read()
+ msgs, batch, err := codec.readBatch()
if _, ok := err.(*json.SyntaxError); ok {
- codec.Write(context.Background(), errorMessage(&parseError{err.Error()}))
+ codec.writeJSON(context.Background(), errorMessage(&parseError{err.Error()}))
}
if err != nil {
c.readErr <- err
diff --git a/rpc/doc.go b/rpc/doc.go
index e5840c3..e0a6324 100644
--- a/rpc/doc.go
+++ b/rpc/doc.go
@@ -29,8 +29,6 @@ Methods that satisfy the following criteria are made available for remote access
- method must be exported
- method returns 0, 1 (response or error) or 2 (response and error) values
- - method argument(s) must be exported or builtin types
- - method returned value(s) must be exported or builtin types
An example method:
@@ -73,14 +71,9 @@ An example server which uses the JSON codec:
calculator := new(CalculatorService)
server := NewServer()
- server.RegisterName("calculator", calculator")
-
+ server.RegisterName("calculator", calculator)
l, _ := net.ListenUnix("unix", &net.UnixAddr{Net: "unix", Name: "/tmp/calculator.sock"})
- for {
- c, _ := l.AcceptUnix()
- codec := v2.NewJSONCodec(c)
- go server.ServeCodec(codec, 0)
- }
+ server.ServeListener(l)
Subscriptions
@@ -90,7 +83,6 @@ criteria:
- method must be exported
- first method argument type must be context.Context
- - method argument(s) must be exported or builtin types
- method must have return types (rpc.Subscription, error)
An example method:
diff --git a/rpc/errors.go b/rpc/errors.go
index c3aa826..dbfde8b 100644
--- a/rpc/errors.go
+++ b/rpc/errors.go
@@ -18,6 +18,15 @@ package rpc
import "fmt"
+var (
+ _ Error = new(methodNotFoundError)
+ _ Error = new(subscriptionNotFoundError)
+ _ Error = new(parseError)
+ _ Error = new(invalidRequestError)
+ _ Error = new(invalidMessageError)
+ _ Error = new(invalidParamsError)
+)
+
const defaultErrorCode = -32000
type methodNotFoundError struct{ method string }
diff --git a/rpc/gzip.go b/rpc/gzip.go
deleted file mode 100644
index a14fd09..0000000
--- a/rpc/gzip.go
+++ /dev/null
@@ -1,66 +0,0 @@
-// Copyright 2019 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package rpc
-
-import (
- "compress/gzip"
- "io"
- "io/ioutil"
- "net/http"
- "strings"
- "sync"
-)
-
-var gzPool = sync.Pool{
- New: func() interface{} {
- w := gzip.NewWriter(ioutil.Discard)
- return w
- },
-}
-
-type gzipResponseWriter struct {
- io.Writer
- http.ResponseWriter
-}
-
-func (w *gzipResponseWriter) WriteHeader(status int) {
- w.Header().Del("Content-Length")
- w.ResponseWriter.WriteHeader(status)
-}
-
-func (w *gzipResponseWriter) Write(b []byte) (int, error) {
- return w.Writer.Write(b)
-}
-
-func newGzipHandler(next http.Handler) http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
- next.ServeHTTP(w, r)
- return
- }
-
- w.Header().Set("Content-Encoding", "gzip")
-
- gz := gzPool.Get().(*gzip.Writer)
- defer gzPool.Put(gz)
-
- gz.Reset(w)
- defer gz.Close()
-
- next.ServeHTTP(&gzipResponseWriter{ResponseWriter: w, Writer: gz}, r)
- })
-}
diff --git a/rpc/handler.go b/rpc/handler.go
index 187d0f8..23023ea 100644
--- a/rpc/handler.go
+++ b/rpc/handler.go
@@ -25,7 +25,7 @@ import (
"sync"
"time"
- "github.com/ava-labs/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/log"
)
// handler handles JSON-RPC messages. There is one handler per connection. Note that
@@ -85,8 +85,8 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *
serverSubs: make(map[ID]*Subscription),
log: log.Root(),
}
- if conn.RemoteAddr() != "" {
- h.log = h.log.New("conn", conn.RemoteAddr())
+ if conn.remoteAddr() != "" {
+ h.log = h.log.New("conn", conn.remoteAddr())
}
h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe))
return h
@@ -97,7 +97,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
// Emit error response for empty batches:
if len(msgs) == 0 {
h.startCallProc(func(cp *callProc) {
- h.conn.Write(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
+ h.conn.writeJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
})
return
}
@@ -122,7 +122,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
}
h.addSubscriptions(cp.notifiers)
if len(answers) > 0 {
- h.conn.Write(cp.ctx, answers)
+ h.conn.writeJSON(cp.ctx, answers)
}
for _, n := range cp.notifiers {
n.activate()
@@ -139,7 +139,7 @@ func (h *handler) handleMsg(msg *jsonrpcMessage) {
answer := h.handleCallMsg(cp, msg)
h.addSubscriptions(cp.notifiers)
if answer != nil {
- h.conn.Write(cp.ctx, answer)
+ h.conn.writeJSON(cp.ctx, answer)
}
for _, n := range cp.notifiers {
n.activate()
@@ -189,7 +189,7 @@ func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) {
}
for id, sub := range h.clientSubs {
delete(h.clientSubs, id)
- sub.quitWithError(err, false)
+ sub.quitWithError(false, err)
}
}
@@ -296,10 +296,16 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess
return nil
case msg.isCall():
resp := h.handleCall(ctx, msg)
+ var ctx []interface{}
+ ctx = append(ctx, "reqid", idForLog{msg.ID}, "t", time.Since(start))
if resp.Error != nil {
- h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message)
+ ctx = append(ctx, "err", resp.Error.Message)
+ if resp.Error.Data != nil {
+ ctx = append(ctx, "errdata", resp.Error.Data)
+ }
+ h.log.Warn("Served "+msg.Method, ctx...)
} else {
- h.log.Debug("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start))
+ h.log.Debug("Served "+msg.Method, ctx...)
}
return resp
case msg.hasValidID():
@@ -327,8 +333,22 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage
if err != nil {
return msg.errorResponse(&invalidParamsError{err.Error()})
}
-
- return h.runMethod(cp.ctx, msg, callb, args)
+ start := time.Now()
+ answer := h.runMethod(cp.ctx, msg, callb, args)
+
+ // Collect the statistics for RPC calls if metrics is enabled.
+ // We only care about pure rpc call. Filter out subscription.
+ if callb != h.unsubscribeCb {
+ rpcRequestGauge.Inc(1)
+ if answer.Error != nil {
+ failedReqeustGauge.Inc(1)
+ } else {
+ successfulRequestGauge.Inc(1)
+ }
+ rpcServingTimer.UpdateSince(start)
+ newRPCServingTimer(msg.Method, answer.Error == nil).UpdateSince(start)
+ }
+ return answer
}
// handleSubscribe processes *_subscribe method calls.
diff --git a/rpc/http.go b/rpc/http.go
index 2dffc5d..87a96e4 100644
--- a/rpc/http.go
+++ b/rpc/http.go
@@ -25,14 +25,10 @@ import (
"io"
"io/ioutil"
"mime"
- "net"
"net/http"
- "strings"
+ "net/url"
"sync"
"time"
-
- "github.com/ava-labs/go-ethereum/log"
- "github.com/rs/cors"
)
const (
@@ -45,31 +41,33 @@ var acceptedContentTypes = []string{contentType, "application/json-rpc", "applic
type httpConn struct {
client *http.Client
- req *http.Request
+ url string
closeOnce sync.Once
- closed chan interface{}
+ closeCh chan interface{}
+ mu sync.Mutex // protects headers
+ headers http.Header
}
// httpConn is treated specially by Client.
-func (hc *httpConn) Write(context.Context, interface{}) error {
- panic("Write called on httpConn")
+func (hc *httpConn) writeJSON(context.Context, interface{}) error {
+ panic("writeJSON called on httpConn")
}
-func (hc *httpConn) RemoteAddr() string {
- return hc.req.URL.String()
+func (hc *httpConn) remoteAddr() string {
+ return hc.url
}
-func (hc *httpConn) Read() ([]*jsonrpcMessage, bool, error) {
- <-hc.closed
+func (hc *httpConn) readBatch() ([]*jsonrpcMessage, bool, error) {
+ <-hc.closeCh
return nil, false, io.EOF
}
-func (hc *httpConn) Close() {
- hc.closeOnce.Do(func() { close(hc.closed) })
+func (hc *httpConn) close() {
+ hc.closeOnce.Do(func() { close(hc.closeCh) })
}
-func (hc *httpConn) Closed() <-chan interface{} {
- return hc.closed
+func (hc *httpConn) closed() <-chan interface{} {
+ return hc.closeCh
}
// HTTPTimeouts represents the configuration params for the HTTP RPC server.
@@ -107,16 +105,24 @@ var DefaultHTTPTimeouts = HTTPTimeouts{
// DialHTTPWithClient creates a new RPC client that connects to an RPC server over HTTP
// using the provided HTTP Client.
func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) {
- req, err := http.NewRequest(http.MethodPost, endpoint, nil)
+ // Sanity check URL so we don't end up with a client that will fail every request.
+ _, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
- req.Header.Set("Content-Type", contentType)
- req.Header.Set("Accept", contentType)
initctx := context.Background()
+ headers := make(http.Header, 2)
+ headers.Set("accept", contentType)
+ headers.Set("content-type", contentType)
return newClient(initctx, func(context.Context) (ServerCodec, error) {
- return &httpConn{client: client, req: req, closed: make(chan interface{})}, nil
+ hc := &httpConn{
+ client: client,
+ headers: headers,
+ url: endpoint,
+ closeCh: make(chan interface{}),
+ }
+ return hc, nil
})
}
@@ -136,7 +142,7 @@ func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) e
if respBody != nil {
buf := new(bytes.Buffer)
if _, err2 := buf.ReadFrom(respBody); err2 == nil {
- return fmt.Errorf("%v %v", err, buf.String())
+ return fmt.Errorf("%v: %v", err, buf.String())
}
}
return err
@@ -171,10 +177,18 @@ func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadClos
if err != nil {
return nil, err
}
- req := hc.req.WithContext(ctx)
- req.Body = ioutil.NopCloser(bytes.NewReader(body))
+ req, err := http.NewRequestWithContext(ctx, "POST", hc.url, ioutil.NopCloser(bytes.NewReader(body)))
+ if err != nil {
+ return nil, err
+ }
req.ContentLength = int64(len(body))
+ // set headers
+ hc.mu.Lock()
+ req.Header = hc.headers.Clone()
+ hc.mu.Unlock()
+
+ // do request
resp, err := hc.client.Do(req)
if err != nil {
return nil, err
@@ -195,7 +209,7 @@ type httpServerConn struct {
func newHTTPServerConn(r *http.Request, w http.ResponseWriter) ServerCodec {
body := io.LimitReader(r.Body, maxRequestContentLength)
conn := &httpServerConn{Reader: body, Writer: w, r: r}
- return NewJSONCodec(conn)
+ return NewCodec(conn)
}
// Close does nothing and always returns nil.
@@ -209,49 +223,19 @@ func (t *httpServerConn) RemoteAddr() string {
// SetWriteDeadline does nothing and always returns nil.
func (t *httpServerConn) SetWriteDeadline(time.Time) error { return nil }
-// NewHTTPServer creates a new HTTP RPC server around an API provider.
-//
-// Deprecated: Server implements http.Handler
-func NewHTTPServer(cors []string, vhosts []string, timeouts HTTPTimeouts, srv http.Handler) *http.Server {
- // Wrap the CORS-handler within a host-handler
- handler := newCorsHandler(srv, cors)
- handler = newVHostHandler(vhosts, handler)
- handler = newGzipHandler(handler)
-
- // Make sure timeout values are meaningful
- if timeouts.ReadTimeout < time.Second {
- log.Warn("Sanitizing invalid HTTP read timeout", "provided", timeouts.ReadTimeout, "updated", DefaultHTTPTimeouts.ReadTimeout)
- timeouts.ReadTimeout = DefaultHTTPTimeouts.ReadTimeout
- }
- if timeouts.WriteTimeout < time.Second {
- log.Warn("Sanitizing invalid HTTP write timeout", "provided", timeouts.WriteTimeout, "updated", DefaultHTTPTimeouts.WriteTimeout)
- timeouts.WriteTimeout = DefaultHTTPTimeouts.WriteTimeout
- }
- if timeouts.IdleTimeout < time.Second {
- log.Warn("Sanitizing invalid HTTP idle timeout", "provided", timeouts.IdleTimeout, "updated", DefaultHTTPTimeouts.IdleTimeout)
- timeouts.IdleTimeout = DefaultHTTPTimeouts.IdleTimeout
- }
- // Bundle and start the HTTP server
- return &http.Server{
- Handler: handler,
- ReadTimeout: timeouts.ReadTimeout,
- WriteTimeout: timeouts.WriteTimeout,
- IdleTimeout: timeouts.IdleTimeout,
- }
-}
-
// ServeHTTP serves JSON-RPC requests over HTTP.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Permit dumb empty requests for remote health-checks (AWS)
if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" {
+ w.WriteHeader(http.StatusOK)
return
}
if code, err := validateRequest(r); err != nil {
http.Error(w, err.Error(), code)
return
}
- // All checks passed, create a codec that reads direct from the request body
- // untilEOF and writes the response to w and order the server to process a
+ // All checks passed, create a codec that reads directly from the request body
+ // until EOF, writes the response to w, and orders the server to process a
// single request.
ctx := r.Context()
ctx = context.WithValue(ctx, "remote", r.RemoteAddr)
@@ -266,7 +250,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("content-type", contentType)
codec := newHTTPServerConn(r, w)
- defer codec.Close()
+ defer codec.close()
s.serveSingleRequest(ctx, codec)
}
@@ -296,64 +280,3 @@ func validateRequest(r *http.Request) (int, error) {
err := fmt.Errorf("invalid content type, only %s is supported", contentType)
return http.StatusUnsupportedMediaType, err
}
-
-func newCorsHandler(srv http.Handler, allowedOrigins []string) http.Handler {
- // disable CORS support if user has not specified a custom CORS configuration
- if len(allowedOrigins) == 0 {
- return srv
- }
- c := cors.New(cors.Options{
- AllowedOrigins: allowedOrigins,
- AllowedMethods: []string{http.MethodPost, http.MethodGet},
- MaxAge: 600,
- AllowedHeaders: []string{"*"},
- })
- return c.Handler(srv)
-}
-
-// virtualHostHandler is a handler which validates the Host-header of incoming requests.
-// The virtualHostHandler can prevent DNS rebinding attacks, which do not utilize CORS-headers,
-// since they do in-domain requests against the RPC api. Instead, we can see on the Host-header
-// which domain was used, and validate that against a whitelist.
-type virtualHostHandler struct {
- vhosts map[string]struct{}
- next http.Handler
-}
-
-// ServeHTTP serves JSON-RPC requests over HTTP, implements http.Handler
-func (h *virtualHostHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- // if r.Host is not set, we can continue serving since a browser would set the Host header
- if r.Host == "" {
- h.next.ServeHTTP(w, r)
- return
- }
- host, _, err := net.SplitHostPort(r.Host)
- if err != nil {
- // Either invalid (too many colons) or no port specified
- host = r.Host
- }
- if ipAddr := net.ParseIP(host); ipAddr != nil {
- // It's an IP address, we can serve that
- h.next.ServeHTTP(w, r)
- return
-
- }
- // Not an ip address, but a hostname. Need to validate
- if _, exist := h.vhosts["*"]; exist {
- h.next.ServeHTTP(w, r)
- return
- }
- if _, exist := h.vhosts[host]; exist {
- h.next.ServeHTTP(w, r)
- return
- }
- http.Error(w, "invalid host specified", http.StatusForbidden)
-}
-
-func newVHostHandler(vhosts []string, next http.Handler) http.Handler {
- vhostMap := make(map[string]struct{})
- for _, allowedHost := range vhosts {
- vhostMap[strings.ToLower(allowedHost)] = struct{}{}
- }
- return &virtualHostHandler{vhostMap, next}
-}
diff --git a/rpc/json.go b/rpc/json.go
index 75c2210..1daee3d 100644
--- a/rpc/json.go
+++ b/rpc/json.go
@@ -115,6 +115,10 @@ func errorMessage(err error) *jsonrpcMessage {
if ok {
msg.Error.Code = ec.ErrorCode()
}
+ de, ok := err.(DataError)
+ if ok {
+ msg.Error.Data = de.ErrorData()
+ }
return msg
}
@@ -135,6 +139,10 @@ func (err *jsonError) ErrorCode() int {
return err.Code
}
+func (err *jsonError) ErrorData() interface{} {
+ return err.Data
+}
+
// Conn is a subset of the methods of net.Conn which are sufficient for ServerCodec.
type Conn interface {
io.ReadWriteCloser
@@ -153,66 +161,66 @@ type ConnRemoteAddr interface {
RemoteAddr() string
}
-// connWithRemoteAddr overrides the remote address of a connection.
-type connWithRemoteAddr struct {
- Conn
- addr string
-}
-
-func (c connWithRemoteAddr) RemoteAddr() string { return c.addr }
-
// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has
// support for parsing arguments and serializing (result) objects.
type jsonCodec struct {
- remoteAddr string
- closer sync.Once // close closed channel once
- closed chan interface{} // closed on Close
- decode func(v interface{}) error // decoder to allow multiple transports
- encMu sync.Mutex // guards the encoder
- encode func(v interface{}) error // encoder to allow multiple transports
- conn deadlineCloser
-}
-
-func newCodec(conn deadlineCloser, encode, decode func(v interface{}) error) ServerCodec {
+ remote string
+ closer sync.Once // close closed channel once
+ closeCh chan interface{} // closed on Close
+ decode func(v interface{}) error // decoder to allow multiple transports
+ encMu sync.Mutex // guards the encoder
+ encode func(v interface{}) error // encoder to allow multiple transports
+ conn deadlineCloser
+}
+
+// NewFuncCodec creates a codec which uses the given functions to read and write. If conn
+// implements ConnRemoteAddr, log messages will use it to include the remote address of
+// the connection.
+func NewFuncCodec(conn deadlineCloser, encode, decode func(v interface{}) error) ServerCodec {
codec := &jsonCodec{
- closed: make(chan interface{}),
- encode: encode,
- decode: decode,
- conn: conn,
+ closeCh: make(chan interface{}),
+ encode: encode,
+ decode: decode,
+ conn: conn,
}
if ra, ok := conn.(ConnRemoteAddr); ok {
- codec.remoteAddr = ra.RemoteAddr()
+ codec.remote = ra.RemoteAddr()
}
return codec
}
-// NewJSONCodec creates a codec that reads from the given connection. If conn implements
-// ConnRemoteAddr, log messages will use it to include the remote address of the
-// connection.
-func NewJSONCodec(conn Conn) ServerCodec {
+// NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log
+// messages will use it to include the remote address of the connection.
+func NewCodec(conn Conn) ServerCodec {
enc := json.NewEncoder(conn)
dec := json.NewDecoder(conn)
dec.UseNumber()
- return newCodec(conn, enc.Encode, dec.Decode)
+ return NewFuncCodec(conn, enc.Encode, dec.Decode)
}
-func (c *jsonCodec) RemoteAddr() string {
- return c.remoteAddr
+func (c *jsonCodec) remoteAddr() string {
+ return c.remote
}
-func (c *jsonCodec) Read() (msg []*jsonrpcMessage, batch bool, err error) {
+func (c *jsonCodec) readBatch() (messages []*jsonrpcMessage, batch bool, err error) {
// Decode the next JSON object in the input stream.
// This verifies basic syntax, etc.
var rawmsg json.RawMessage
if err := c.decode(&rawmsg); err != nil {
return nil, false, err
}
- msg, batch = parseMessage(rawmsg)
- return msg, batch, nil
+ messages, batch = parseMessage(rawmsg)
+ for i, msg := range messages {
+ if msg == nil {
+ // Message is JSON 'null'. Replace with zero value so it
+ // will be treated like any other invalid message.
+ messages[i] = new(jsonrpcMessage)
+ }
+ }
+ return messages, batch, nil
}
-// Write sends a message to client.
-func (c *jsonCodec) Write(ctx context.Context, v interface{}) error {
+func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}) error {
c.encMu.Lock()
defer c.encMu.Unlock()
@@ -224,17 +232,16 @@ func (c *jsonCodec) Write(ctx context.Context, v interface{}) error {
return c.encode(v)
}
-// Close the underlying connection
-func (c *jsonCodec) Close() {
+func (c *jsonCodec) close() {
c.closer.Do(func() {
- close(c.closed)
+ close(c.closeCh)
c.conn.Close()
})
}
// Closed returns a channel which will be closed when Close is called
-func (c *jsonCodec) Closed() <-chan interface{} {
- return c.closed
+func (c *jsonCodec) closed() <-chan interface{} {
+ return c.closeCh
}
// parseMessage parses raw bytes as a (batch of) JSON-RPC message(s). There are no error
diff --git a/rpc/metrics.go b/rpc/metrics.go
new file mode 100644
index 0000000..7fb6fc0
--- /dev/null
+++ b/rpc/metrics.go
@@ -0,0 +1,39 @@
+// Copyright 2020 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/metrics"
+)
+
+var (
+ rpcRequestGauge = metrics.NewRegisteredGauge("rpc/requests", nil)
+ successfulRequestGauge = metrics.NewRegisteredGauge("rpc/success", nil)
+ failedReqeustGauge = metrics.NewRegisteredGauge("rpc/failure", nil)
+ rpcServingTimer = metrics.NewRegisteredTimer("rpc/duration/all", nil)
+)
+
+func newRPCServingTimer(method string, valid bool) metrics.Timer {
+ flag := "success"
+ if !valid {
+ flag = "failure"
+ }
+ m := fmt.Sprintf("rpc/duration/%s/%s", method, flag)
+ return metrics.GetOrRegisterTimer(m, nil)
+}
diff --git a/rpc/server.go b/rpc/server.go
index bf5d93e..64e078a 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -22,7 +22,7 @@ import (
"sync/atomic"
mapset "github.com/deckarep/golang-set"
- "github.com/ava-labs/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/log"
)
const MetadataApi = "rpc"
@@ -36,7 +36,7 @@ const (
// OptionMethodInvocation is an indication that the codec supports RPC method calls
OptionMethodInvocation CodecOption = 1 << iota
- // OptionSubscriptions is an indication that the codec suports RPC notifications
+ // OptionSubscriptions is an indication that the codec supports RPC notifications
OptionSubscriptions = 1 << iota // support pub sub
)
@@ -72,7 +72,7 @@ func (s *Server) RegisterName(name string, receiver interface{}) error {
//
// Note that codec options are no longer supported.
func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
- defer codec.Close()
+ defer codec.close()
// Don't serve if server is stopped.
if atomic.LoadInt32(&s.run) == 0 {
@@ -84,7 +84,7 @@ func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
defer s.codecs.Remove(codec)
c := initClient(codec, s.idgen, &s.services)
- <-codec.Closed()
+ <-codec.closed()
c.Close()
}
@@ -101,10 +101,10 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
h.allowSubscribe = false
defer h.close(io.EOF, nil)
- reqs, batch, err := codec.Read()
+ reqs, batch, err := codec.readBatch()
if err != nil {
if err != io.EOF {
- codec.Write(ctx, errorMessage(&invalidMessageError{"parse error"}))
+ codec.writeJSON(ctx, errorMessage(&invalidMessageError{"parse error"}))
}
return
}
@@ -122,7 +122,7 @@ func (s *Server) Stop() {
if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
log.Debug("RPC server shutting down")
s.codecs.Each(func(c interface{}) bool {
- c.(ServerCodec).Close()
+ c.(ServerCodec).close()
return true
})
}
diff --git a/rpc/service.go b/rpc/service.go
index ead6fb6..bef891e 100644
--- a/rpc/service.go
+++ b/rpc/service.go
@@ -25,9 +25,8 @@ import (
"strings"
"sync"
"unicode"
- "unicode/utf8"
- "github.com/ava-labs/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/log"
)
var (
@@ -139,16 +138,14 @@ func newCallback(receiver, fn reflect.Value) *callback {
c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype)}
// Determine parameter types. They must all be exported or builtin types.
c.makeArgTypes()
- if !allExportedOrBuiltin(c.argTypes) {
- return nil
- }
+
// Verify return types. The function must return at most one error
// and/or one other non-error value.
outs := make([]reflect.Type, fntype.NumOut())
for i := 0; i < fntype.NumOut(); i++ {
outs[i] = fntype.Out(i)
}
- if len(outs) > 2 || !allExportedOrBuiltin(outs) {
+ if len(outs) > 2 {
return nil
}
// If an error is returned, it must be the last returned value.
@@ -218,27 +215,6 @@ func (c *callback) call(ctx context.Context, method string, args []reflect.Value
return results[0].Interface(), nil
}
-// Is this an exported - upper case - name?
-func isExported(name string) bool {
- rune, _ := utf8.DecodeRuneInString(name)
- return unicode.IsUpper(rune)
-}
-
-// Are all those types exported or built-in?
-func allExportedOrBuiltin(types []reflect.Type) bool {
- for _, typ := range types {
- for typ.Kind() == reflect.Ptr {
- typ = typ.Elem()
- }
- // PkgPath will be non-empty even for an exported type,
- // so we need to check the type name as well.
- if !isExported(typ.Name()) && typ.PkgPath() != "" {
- return false
- }
- }
- return true
-}
-
// Is t context.Context or *context.Context?
func isContextType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
diff --git a/rpc/subscription.go b/rpc/subscription.go
index c1e869b..233215d 100644
--- a/rpc/subscription.go
+++ b/rpc/subscription.go
@@ -17,7 +17,6 @@
package rpc
import (
- "bufio"
"container/list"
"context"
crand "crypto/rand"
@@ -51,10 +50,14 @@ func NewID() ID {
// randomIDGenerator returns a function generates a random IDs.
func randomIDGenerator() func() ID {
- seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader))
- if err != nil {
+ var buf = make([]byte, 8)
+ var seed int64
+ if _, err := crand.Read(buf); err == nil {
+ seed = int64(binary.BigEndian.Uint64(buf))
+ } else {
seed = int64(time.Now().Nanosecond())
}
+
var (
mu sync.Mutex
rng = rand.New(rand.NewSource(seed))
@@ -141,7 +144,7 @@ func (n *Notifier) Notify(id ID, data interface{}) error {
// Closed returns a channel that is closed when the RPC connection is closed.
// Deprecated: use subscription error channel
func (n *Notifier) Closed() <-chan interface{} {
- return n.h.conn.Closed()
+ return n.h.conn.closed()
}
// takeSubscription returns the subscription (if one has been created). No subscription can
@@ -153,7 +156,7 @@ func (n *Notifier) takeSubscription() *Subscription {
return n.sub
}
-// acticate is called after the subscription ID was sent to client. Notifications are
+// activate is called after the subscription ID was sent to client. Notifications are
// buffered before activation. This prevents notifications being sent to the client before
// the subscription ID is sent to the client.
func (n *Notifier) activate() error {
@@ -172,14 +175,14 @@ func (n *Notifier) activate() error {
func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
ctx := context.Background()
- return n.h.conn.Write(ctx, &jsonrpcMessage{
+ return n.h.conn.writeJSON(ctx, &jsonrpcMessage{
Version: vsn,
Method: n.namespace + notificationMethodSuffix,
Params: params,
})
}
-// A Subscription is created by a notifier and tight to that notifier. The client can use
+// A Subscription is created by a notifier and tied to that notifier. The client can use
// this subscription to wait for an unsubscribe request for the client, see Err().
type Subscription struct {
ID ID
@@ -241,11 +244,11 @@ func (sub *ClientSubscription) Err() <-chan error {
// Unsubscribe unsubscribes the notification and closes the error channel.
// It can safely be called more than once.
func (sub *ClientSubscription) Unsubscribe() {
- sub.quitWithError(nil, true)
+ sub.quitWithError(true, nil)
sub.errOnce.Do(func() { close(sub.err) })
}
-func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) {
+func (sub *ClientSubscription) quitWithError(unsubscribeServer bool, err error) {
sub.quitOnce.Do(func() {
// The dispatch loop won't be able to execute the unsubscribe call
// if it is blocked on deliver. Close sub.quit first because it
@@ -276,7 +279,7 @@ func (sub *ClientSubscription) start() {
sub.quitWithError(sub.forward())
}
-func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
+func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
cases := []reflect.SelectCase{
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
@@ -298,14 +301,14 @@ func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
switch chosen {
case 0: // <-sub.quit
- return nil, false
+ return false, nil
case 1: // <-sub.in
val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
if err != nil {
- return err, true
+ return true, err
}
if buffer.Len() == maxClientSubscriptionBuffer {
- return ErrSubscriptionQueueOverflow, true
+ return true, ErrSubscriptionQueueOverflow
}
buffer.PushBack(val)
case 2: // sub.channel<-
diff --git a/rpc/types.go b/rpc/types.go
index 703f4a7..99e29fc 100644
--- a/rpc/types.go
+++ b/rpc/types.go
@@ -18,11 +18,13 @@ package rpc
import (
"context"
+ "encoding/json"
"fmt"
"math"
"strings"
- "github.com/ava-labs/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/hexutil"
)
// API describes the set of methods offered over the RPC interface
@@ -39,23 +41,29 @@ type Error interface {
ErrorCode() int // returns the code
}
+// A DataError contains some data in addition to the error message.
+type DataError interface {
+ Error() string // returns the message
+ ErrorData() interface{} // returns the error data
+}
+
// ServerCodec implements reading, parsing and writing RPC messages for the server side of
// a RPC session. Implementations must be go-routine safe since the codec can be called in
// multiple go-routines concurrently.
type ServerCodec interface {
- Read() (msgs []*jsonrpcMessage, isBatch bool, err error)
- Close()
+ readBatch() (msgs []*jsonrpcMessage, isBatch bool, err error)
+ close()
jsonWriter
}
// jsonWriter can write JSON messages to its underlying connection.
// Implementations must be safe for concurrent use.
type jsonWriter interface {
- Write(context.Context, interface{}) error
+ writeJSON(context.Context, interface{}) error
// Closed returns a channel which is closed when the connection is closed.
- Closed() <-chan interface{}
+ closed() <-chan interface{}
// RemoteAddr returns the peer address of the connection.
- RemoteAddr() string
+ remoteAddr() string
}
type BlockNumber int64
@@ -84,7 +92,6 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error {
*bn = EarliestBlockNumber
return nil
case "latest":
- //*bn = LatestBlockNumber
*bn = AcceptedBlockNumber
return nil
case "pending":
@@ -100,9 +107,8 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error {
return err
}
if blckNum > math.MaxInt64 {
- return fmt.Errorf("Blocknumber too high")
+ return fmt.Errorf("block number larger than int64")
}
-
*bn = BlockNumber(blckNum)
return nil
}
@@ -110,3 +116,98 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error {
func (bn BlockNumber) Int64() int64 {
return (int64)(bn)
}
+
+type BlockNumberOrHash struct {
+ BlockNumber *BlockNumber `json:"blockNumber,omitempty"`
+ BlockHash *common.Hash `json:"blockHash,omitempty"`
+ RequireCanonical bool `json:"requireCanonical,omitempty"`
+}
+
+func (bnh *BlockNumberOrHash) UnmarshalJSON(data []byte) error {
+ type erased BlockNumberOrHash
+ e := erased{}
+ err := json.Unmarshal(data, &e)
+ if err == nil {
+ if e.BlockNumber != nil && e.BlockHash != nil {
+ return fmt.Errorf("cannot specify both BlockHash and BlockNumber, choose one or the other")
+ }
+ bnh.BlockNumber = e.BlockNumber
+ bnh.BlockHash = e.BlockHash
+ bnh.RequireCanonical = e.RequireCanonical
+ return nil
+ }
+ var input string
+ err = json.Unmarshal(data, &input)
+ if err != nil {
+ return err
+ }
+ switch input {
+ case "earliest":
+ bn := EarliestBlockNumber
+ bnh.BlockNumber = &bn
+ return nil
+ case "latest":
+ bn := AcceptedBlockNumber
+ bnh.BlockNumber = &bn
+ return nil
+ case "pending":
+ bn := PendingBlockNumber
+ bnh.BlockNumber = &bn
+ return nil
+ case "accepted":
+ bn := AcceptedBlockNumber
+ bnh.BlockNumber = &bn
+ return nil
+ default:
+ if len(input) == 66 {
+ hash := common.Hash{}
+ err := hash.UnmarshalText([]byte(input))
+ if err != nil {
+ return err
+ }
+ bnh.BlockHash = &hash
+ return nil
+ } else {
+ blckNum, err := hexutil.DecodeUint64(input)
+ if err != nil {
+ return err
+ }
+ if blckNum > math.MaxInt64 {
+ return fmt.Errorf("blocknumber too high")
+ }
+ bn := BlockNumber(blckNum)
+ bnh.BlockNumber = &bn
+ return nil
+ }
+ }
+}
+
+func (bnh *BlockNumberOrHash) Number() (BlockNumber, bool) {
+ if bnh.BlockNumber != nil {
+ return *bnh.BlockNumber, true
+ }
+ return BlockNumber(0), false
+}
+
+func (bnh *BlockNumberOrHash) Hash() (common.Hash, bool) {
+ if bnh.BlockHash != nil {
+ return *bnh.BlockHash, true
+ }
+ return common.Hash{}, false
+}
+
+func BlockNumberOrHashWithNumber(blockNr BlockNumber) BlockNumberOrHash {
+ return BlockNumberOrHash{
+ BlockNumber: &blockNr,
+ BlockHash: nil,
+ RequireCanonical: false,
+ }
+}
+
+func BlockNumberOrHashWithHash(hash common.Hash, canonical bool) BlockNumberOrHash {
+ return BlockNumberOrHash{
+ BlockNumber: nil,
+ BlockHash: &hash,
+ RequireCanonical: canonical,
+ }
+}
diff --git a/rpc/websocket.go b/rpc/websocket.go
index d87e8a5..a716383 100644
--- a/rpc/websocket.go
+++ b/rpc/websocket.go
@@ -25,26 +25,22 @@ import (
"os"
"strings"
"sync"
+ "time"
mapset "github.com/deckarep/golang-set"
- "github.com/ava-labs/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/log"
"github.com/gorilla/websocket"
)
const (
- wsReadBuffer = 1024
- wsWriteBuffer = 1024
+ wsReadBuffer = 1024
+ wsWriteBuffer = 1024
+ wsPingInterval = 60 * time.Second
+ wsPingWriteTimeout = 5 * time.Second
)
var wsBufferPool = new(sync.Pool)
-// NewWSServer creates a new websocket RPC server around an API provider.
-//
-// Deprecated: use Server.WebsocketHandler
-func NewWSServer(allowedOrigins []string, srv *Server) *http.Server {
- return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)}
-}
-
// WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections.
//
// allowedOrigins should be a comma-separated list of allowed origin URLs.
@@ -63,7 +59,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler {
return
}
codec := newWebsocketCodec(conn)
- s.ServeCodec(codec, OptionMethodInvocation|OptionSubscriptions)
+ s.ServeCodec(codec, 0)
})
}
@@ -124,21 +120,13 @@ func (e wsHandshakeError) Error() string {
return s
}
-// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server
-// that is listening on the given endpoint.
-//
-// The context is used for the initial connection establishment. It does not
-// affect subsequent interactions with the client.
-func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) {
+// DialWebsocketWithDialer creates a new RPC client that communicates with a JSON-RPC server
+// that is listening on the given endpoint using the provided dialer.
+func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, dialer websocket.Dialer) (*Client, error) {
endpoint, header, err := wsClientHeaders(endpoint, origin)
if err != nil {
return nil, err
}
- dialer := websocket.Dialer{
- ReadBufferSize: wsReadBuffer,
- WriteBufferSize: wsWriteBuffer,
- WriteBufferPool: wsBufferPool,
- }
return newClient(ctx, func(ctx context.Context) (ServerCodec, error) {
conn, resp, err := dialer.DialContext(ctx, endpoint, header)
if err != nil {
@@ -152,6 +140,20 @@ func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error
})
}
+// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server
+// that is listening on the given endpoint.
+//
+// The context is used for the initial connection establishment. It does not
+// affect subsequent interactions with the client.
+func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) {
+ dialer := websocket.Dialer{
+ ReadBufferSize: wsReadBuffer,
+ WriteBufferSize: wsWriteBuffer,
+ WriteBufferPool: wsBufferPool,
+ }
+ return DialWebsocketWithDialer(ctx, endpoint, origin, dialer)
+}
+
func wsClientHeaders(endpoint, origin string) (string, http.Header, error) {
endpointURL, err := url.Parse(endpoint)
if err != nil {
@@ -169,7 +171,64 @@ func wsClientHeaders(endpoint, origin string) (string, http.Header, error) {
return endpointURL.String(), header, nil
}
+type websocketCodec struct {
+ *jsonCodec
+ conn *websocket.Conn
+
+ wg sync.WaitGroup
+ pingReset chan struct{}
+}
+
func newWebsocketCodec(conn *websocket.Conn) ServerCodec {
conn.SetReadLimit(maxRequestContentLength)
- return newCodec(conn, conn.WriteJSON, conn.ReadJSON)
+ wc := &websocketCodec{
+ jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec),
+ conn: conn,
+ pingReset: make(chan struct{}, 1),
+ }
+ wc.wg.Add(1)
+ go wc.pingLoop()
+ return wc
+}
+
+func (wc *websocketCodec) close() {
+ wc.jsonCodec.close()
+ wc.wg.Wait()
+}
+
+func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}) error {
+ err := wc.jsonCodec.writeJSON(ctx, v)
+ if err == nil {
+ // Notify pingLoop to delay the next idle ping.
+ select {
+ case wc.pingReset <- struct{}{}:
+ default:
+ }
+ }
+ return err
+}
+
+// pingLoop sends periodic ping frames when the connection is idle.
+func (wc *websocketCodec) pingLoop() {
+ var timer = time.NewTimer(wsPingInterval)
+ defer wc.wg.Done()
+ defer timer.Stop()
+
+ for {
+ select {
+ case <-wc.closed():
+ return
+ case <-wc.pingReset:
+ if !timer.Stop() {
+ <-timer.C
+ }
+ timer.Reset(wsPingInterval)
+ case <-timer.C:
+ wc.jsonCodec.encMu.Lock()
+ wc.conn.SetWriteDeadline(time.Now().Add(wsPingWriteTimeout))
+ wc.conn.WriteMessage(websocket.PingMessage, nil)
+ wc.jsonCodec.encMu.Unlock()
+ timer.Reset(wsPingInterval)
+ }
+ }
}