From 78745551c077bf54151202138c2629f288769561 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 15 Sep 2020 23:55:34 -0400 Subject: WIP: geth-tavum --- rpc/doc.go | 12 +--- rpc/errors.go | 9 +++ rpc/gzip.go | 66 --------------------- rpc/handler.go | 42 ++++++++++---- rpc/http.go | 163 ++++++++++++++-------------------------------------- rpc/json.go | 87 +++++++++++++++------------- rpc/server.go | 14 ++--- rpc/service.go | 30 +--------- rpc/subscription.go | 29 +++++----- rpc/websocket.go | 105 +++++++++++++++++++++++++-------- 10 files changed, 240 insertions(+), 317 deletions(-) delete mode 100644 rpc/gzip.go (limited to 'rpc') diff --git a/rpc/doc.go b/rpc/doc.go index e5840c3..e0a6324 100644 --- a/rpc/doc.go +++ b/rpc/doc.go @@ -29,8 +29,6 @@ Methods that satisfy the following criteria are made available for remote access - method must be exported - method returns 0, 1 (response or error) or 2 (response and error) values - - method argument(s) must be exported or builtin types - - method returned value(s) must be exported or builtin types An example method: @@ -73,14 +71,9 @@ An example server which uses the JSON codec: calculator := new(CalculatorService) server := NewServer() - server.RegisterName("calculator", calculator") - + server.RegisterName("calculator", calculator) l, _ := net.ListenUnix("unix", &net.UnixAddr{Net: "unix", Name: "/tmp/calculator.sock"}) - for { - c, _ := l.AcceptUnix() - codec := v2.NewJSONCodec(c) - go server.ServeCodec(codec, 0) - } + server.ServeListener(l) Subscriptions @@ -90,7 +83,6 @@ criteria: - method must be exported - first method argument type must be context.Context - - method argument(s) must be exported or builtin types - method must have return types (rpc.Subscription, error) An example method: diff --git a/rpc/errors.go b/rpc/errors.go index c3aa826..dbfde8b 100644 --- a/rpc/errors.go +++ b/rpc/errors.go @@ -18,6 +18,15 @@ package rpc import "fmt" +var ( + _ Error = new(methodNotFoundError) + _ Error = new(subscriptionNotFoundError) + _ Error = new(parseError) + _ Error = new(invalidRequestError) + _ Error = new(invalidMessageError) + _ Error = new(invalidParamsError) +) + const defaultErrorCode = -32000 type methodNotFoundError struct{ method string } diff --git a/rpc/gzip.go b/rpc/gzip.go deleted file mode 100644 index a14fd09..0000000 --- a/rpc/gzip.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package rpc - -import ( - "compress/gzip" - "io" - "io/ioutil" - "net/http" - "strings" - "sync" -) - -var gzPool = sync.Pool{ - New: func() interface{} { - w := gzip.NewWriter(ioutil.Discard) - return w - }, -} - -type gzipResponseWriter struct { - io.Writer - http.ResponseWriter -} - -func (w *gzipResponseWriter) WriteHeader(status int) { - w.Header().Del("Content-Length") - w.ResponseWriter.WriteHeader(status) -} - -func (w *gzipResponseWriter) Write(b []byte) (int, error) { - return w.Writer.Write(b) -} - -func newGzipHandler(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { - next.ServeHTTP(w, r) - return - } - - w.Header().Set("Content-Encoding", "gzip") - - gz := gzPool.Get().(*gzip.Writer) - defer gzPool.Put(gz) - - gz.Reset(w) - defer gz.Close() - - next.ServeHTTP(&gzipResponseWriter{ResponseWriter: w, Writer: gz}, r) - }) -} diff --git a/rpc/handler.go b/rpc/handler.go index 187d0f8..23023ea 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -25,7 +25,7 @@ import ( "sync" "time" - "github.com/ava-labs/go-ethereum/log" + "github.com/ethereum/go-ethereum/log" ) // handler handles JSON-RPC messages. There is one handler per connection. Note that @@ -85,8 +85,8 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg * serverSubs: make(map[ID]*Subscription), log: log.Root(), } - if conn.RemoteAddr() != "" { - h.log = h.log.New("conn", conn.RemoteAddr()) + if conn.remoteAddr() != "" { + h.log = h.log.New("conn", conn.remoteAddr()) } h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe)) return h @@ -97,7 +97,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { // Emit error response for empty batches: if len(msgs) == 0 { h.startCallProc(func(cp *callProc) { - h.conn.Write(cp.ctx, errorMessage(&invalidRequestError{"empty batch"})) + h.conn.writeJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"})) }) return } @@ -122,7 +122,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { } h.addSubscriptions(cp.notifiers) if len(answers) > 0 { - h.conn.Write(cp.ctx, answers) + h.conn.writeJSON(cp.ctx, answers) } for _, n := range cp.notifiers { n.activate() @@ -139,7 +139,7 @@ func (h *handler) handleMsg(msg *jsonrpcMessage) { answer := h.handleCallMsg(cp, msg) h.addSubscriptions(cp.notifiers) if answer != nil { - h.conn.Write(cp.ctx, answer) + h.conn.writeJSON(cp.ctx, answer) } for _, n := range cp.notifiers { n.activate() @@ -189,7 +189,7 @@ func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) { } for id, sub := range h.clientSubs { delete(h.clientSubs, id) - sub.quitWithError(err, false) + sub.quitWithError(false, err) } } @@ -296,10 +296,16 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess return nil case msg.isCall(): resp := h.handleCall(ctx, msg) + var ctx []interface{} + ctx = append(ctx, "reqid", idForLog{msg.ID}, "t", time.Since(start)) if resp.Error != nil { - h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message) + ctx = append(ctx, "err", resp.Error.Message) + if resp.Error.Data != nil { + ctx = append(ctx, "errdata", resp.Error.Data) + } + h.log.Warn("Served "+msg.Method, ctx...) } else { - h.log.Debug("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start)) + h.log.Debug("Served "+msg.Method, ctx...) } return resp case msg.hasValidID(): @@ -327,8 +333,22 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage if err != nil { return msg.errorResponse(&invalidParamsError{err.Error()}) } - - return h.runMethod(cp.ctx, msg, callb, args) + start := time.Now() + answer := h.runMethod(cp.ctx, msg, callb, args) + + // Collect the statistics for RPC calls if metrics is enabled. + // We only care about pure rpc call. Filter out subscription. + if callb != h.unsubscribeCb { + rpcRequestGauge.Inc(1) + if answer.Error != nil { + failedReqeustGauge.Inc(1) + } else { + successfulRequestGauge.Inc(1) + } + rpcServingTimer.UpdateSince(start) + newRPCServingTimer(msg.Method, answer.Error == nil).UpdateSince(start) + } + return answer } // handleSubscribe processes *_subscribe method calls. diff --git a/rpc/http.go b/rpc/http.go index 2dffc5d..87a96e4 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -25,14 +25,10 @@ import ( "io" "io/ioutil" "mime" - "net" "net/http" - "strings" + "net/url" "sync" "time" - - "github.com/ava-labs/go-ethereum/log" - "github.com/rs/cors" ) const ( @@ -45,31 +41,33 @@ var acceptedContentTypes = []string{contentType, "application/json-rpc", "applic type httpConn struct { client *http.Client - req *http.Request + url string closeOnce sync.Once - closed chan interface{} + closeCh chan interface{} + mu sync.Mutex // protects headers + headers http.Header } // httpConn is treated specially by Client. -func (hc *httpConn) Write(context.Context, interface{}) error { - panic("Write called on httpConn") +func (hc *httpConn) writeJSON(context.Context, interface{}) error { + panic("writeJSON called on httpConn") } -func (hc *httpConn) RemoteAddr() string { - return hc.req.URL.String() +func (hc *httpConn) remoteAddr() string { + return hc.url } -func (hc *httpConn) Read() ([]*jsonrpcMessage, bool, error) { - <-hc.closed +func (hc *httpConn) readBatch() ([]*jsonrpcMessage, bool, error) { + <-hc.closeCh return nil, false, io.EOF } -func (hc *httpConn) Close() { - hc.closeOnce.Do(func() { close(hc.closed) }) +func (hc *httpConn) close() { + hc.closeOnce.Do(func() { close(hc.closeCh) }) } -func (hc *httpConn) Closed() <-chan interface{} { - return hc.closed +func (hc *httpConn) closed() <-chan interface{} { + return hc.closeCh } // HTTPTimeouts represents the configuration params for the HTTP RPC server. @@ -107,16 +105,24 @@ var DefaultHTTPTimeouts = HTTPTimeouts{ // DialHTTPWithClient creates a new RPC client that connects to an RPC server over HTTP // using the provided HTTP Client. func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) { - req, err := http.NewRequest(http.MethodPost, endpoint, nil) + // Sanity check URL so we don't end up with a client that will fail every request. + _, err := url.Parse(endpoint) if err != nil { return nil, err } - req.Header.Set("Content-Type", contentType) - req.Header.Set("Accept", contentType) initctx := context.Background() + headers := make(http.Header, 2) + headers.Set("accept", contentType) + headers.Set("content-type", contentType) return newClient(initctx, func(context.Context) (ServerCodec, error) { - return &httpConn{client: client, req: req, closed: make(chan interface{})}, nil + hc := &httpConn{ + client: client, + headers: headers, + url: endpoint, + closeCh: make(chan interface{}), + } + return hc, nil }) } @@ -136,7 +142,7 @@ func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) e if respBody != nil { buf := new(bytes.Buffer) if _, err2 := buf.ReadFrom(respBody); err2 == nil { - return fmt.Errorf("%v %v", err, buf.String()) + return fmt.Errorf("%v: %v", err, buf.String()) } } return err @@ -171,10 +177,18 @@ func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadClos if err != nil { return nil, err } - req := hc.req.WithContext(ctx) - req.Body = ioutil.NopCloser(bytes.NewReader(body)) + req, err := http.NewRequestWithContext(ctx, "POST", hc.url, ioutil.NopCloser(bytes.NewReader(body))) + if err != nil { + return nil, err + } req.ContentLength = int64(len(body)) + // set headers + hc.mu.Lock() + req.Header = hc.headers.Clone() + hc.mu.Unlock() + + // do request resp, err := hc.client.Do(req) if err != nil { return nil, err @@ -195,7 +209,7 @@ type httpServerConn struct { func newHTTPServerConn(r *http.Request, w http.ResponseWriter) ServerCodec { body := io.LimitReader(r.Body, maxRequestContentLength) conn := &httpServerConn{Reader: body, Writer: w, r: r} - return NewJSONCodec(conn) + return NewCodec(conn) } // Close does nothing and always returns nil. @@ -209,49 +223,19 @@ func (t *httpServerConn) RemoteAddr() string { // SetWriteDeadline does nothing and always returns nil. func (t *httpServerConn) SetWriteDeadline(time.Time) error { return nil } -// NewHTTPServer creates a new HTTP RPC server around an API provider. -// -// Deprecated: Server implements http.Handler -func NewHTTPServer(cors []string, vhosts []string, timeouts HTTPTimeouts, srv http.Handler) *http.Server { - // Wrap the CORS-handler within a host-handler - handler := newCorsHandler(srv, cors) - handler = newVHostHandler(vhosts, handler) - handler = newGzipHandler(handler) - - // Make sure timeout values are meaningful - if timeouts.ReadTimeout < time.Second { - log.Warn("Sanitizing invalid HTTP read timeout", "provided", timeouts.ReadTimeout, "updated", DefaultHTTPTimeouts.ReadTimeout) - timeouts.ReadTimeout = DefaultHTTPTimeouts.ReadTimeout - } - if timeouts.WriteTimeout < time.Second { - log.Warn("Sanitizing invalid HTTP write timeout", "provided", timeouts.WriteTimeout, "updated", DefaultHTTPTimeouts.WriteTimeout) - timeouts.WriteTimeout = DefaultHTTPTimeouts.WriteTimeout - } - if timeouts.IdleTimeout < time.Second { - log.Warn("Sanitizing invalid HTTP idle timeout", "provided", timeouts.IdleTimeout, "updated", DefaultHTTPTimeouts.IdleTimeout) - timeouts.IdleTimeout = DefaultHTTPTimeouts.IdleTimeout - } - // Bundle and start the HTTP server - return &http.Server{ - Handler: handler, - ReadTimeout: timeouts.ReadTimeout, - WriteTimeout: timeouts.WriteTimeout, - IdleTimeout: timeouts.IdleTimeout, - } -} - // ServeHTTP serves JSON-RPC requests over HTTP. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Permit dumb empty requests for remote health-checks (AWS) if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" { + w.WriteHeader(http.StatusOK) return } if code, err := validateRequest(r); err != nil { http.Error(w, err.Error(), code) return } - // All checks passed, create a codec that reads direct from the request body - // untilEOF and writes the response to w and order the server to process a + // All checks passed, create a codec that reads directly from the request body + // until EOF, writes the response to w, and orders the server to process a // single request. ctx := r.Context() ctx = context.WithValue(ctx, "remote", r.RemoteAddr) @@ -266,7 +250,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("content-type", contentType) codec := newHTTPServerConn(r, w) - defer codec.Close() + defer codec.close() s.serveSingleRequest(ctx, codec) } @@ -296,64 +280,3 @@ func validateRequest(r *http.Request) (int, error) { err := fmt.Errorf("invalid content type, only %s is supported", contentType) return http.StatusUnsupportedMediaType, err } - -func newCorsHandler(srv http.Handler, allowedOrigins []string) http.Handler { - // disable CORS support if user has not specified a custom CORS configuration - if len(allowedOrigins) == 0 { - return srv - } - c := cors.New(cors.Options{ - AllowedOrigins: allowedOrigins, - AllowedMethods: []string{http.MethodPost, http.MethodGet}, - MaxAge: 600, - AllowedHeaders: []string{"*"}, - }) - return c.Handler(srv) -} - -// virtualHostHandler is a handler which validates the Host-header of incoming requests. -// The virtualHostHandler can prevent DNS rebinding attacks, which do not utilize CORS-headers, -// since they do in-domain requests against the RPC api. Instead, we can see on the Host-header -// which domain was used, and validate that against a whitelist. -type virtualHostHandler struct { - vhosts map[string]struct{} - next http.Handler -} - -// ServeHTTP serves JSON-RPC requests over HTTP, implements http.Handler -func (h *virtualHostHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // if r.Host is not set, we can continue serving since a browser would set the Host header - if r.Host == "" { - h.next.ServeHTTP(w, r) - return - } - host, _, err := net.SplitHostPort(r.Host) - if err != nil { - // Either invalid (too many colons) or no port specified - host = r.Host - } - if ipAddr := net.ParseIP(host); ipAddr != nil { - // It's an IP address, we can serve that - h.next.ServeHTTP(w, r) - return - - } - // Not an ip address, but a hostname. Need to validate - if _, exist := h.vhosts["*"]; exist { - h.next.ServeHTTP(w, r) - return - } - if _, exist := h.vhosts[host]; exist { - h.next.ServeHTTP(w, r) - return - } - http.Error(w, "invalid host specified", http.StatusForbidden) -} - -func newVHostHandler(vhosts []string, next http.Handler) http.Handler { - vhostMap := make(map[string]struct{}) - for _, allowedHost := range vhosts { - vhostMap[strings.ToLower(allowedHost)] = struct{}{} - } - return &virtualHostHandler{vhostMap, next} -} diff --git a/rpc/json.go b/rpc/json.go index 75c2210..1daee3d 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -115,6 +115,10 @@ func errorMessage(err error) *jsonrpcMessage { if ok { msg.Error.Code = ec.ErrorCode() } + de, ok := err.(DataError) + if ok { + msg.Error.Data = de.ErrorData() + } return msg } @@ -135,6 +139,10 @@ func (err *jsonError) ErrorCode() int { return err.Code } +func (err *jsonError) ErrorData() interface{} { + return err.Data +} + // Conn is a subset of the methods of net.Conn which are sufficient for ServerCodec. type Conn interface { io.ReadWriteCloser @@ -153,66 +161,66 @@ type ConnRemoteAddr interface { RemoteAddr() string } -// connWithRemoteAddr overrides the remote address of a connection. -type connWithRemoteAddr struct { - Conn - addr string -} - -func (c connWithRemoteAddr) RemoteAddr() string { return c.addr } - // jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has // support for parsing arguments and serializing (result) objects. type jsonCodec struct { - remoteAddr string - closer sync.Once // close closed channel once - closed chan interface{} // closed on Close - decode func(v interface{}) error // decoder to allow multiple transports - encMu sync.Mutex // guards the encoder - encode func(v interface{}) error // encoder to allow multiple transports - conn deadlineCloser -} - -func newCodec(conn deadlineCloser, encode, decode func(v interface{}) error) ServerCodec { + remote string + closer sync.Once // close closed channel once + closeCh chan interface{} // closed on Close + decode func(v interface{}) error // decoder to allow multiple transports + encMu sync.Mutex // guards the encoder + encode func(v interface{}) error // encoder to allow multiple transports + conn deadlineCloser +} + +// NewFuncCodec creates a codec which uses the given functions to read and write. If conn +// implements ConnRemoteAddr, log messages will use it to include the remote address of +// the connection. +func NewFuncCodec(conn deadlineCloser, encode, decode func(v interface{}) error) ServerCodec { codec := &jsonCodec{ - closed: make(chan interface{}), - encode: encode, - decode: decode, - conn: conn, + closeCh: make(chan interface{}), + encode: encode, + decode: decode, + conn: conn, } if ra, ok := conn.(ConnRemoteAddr); ok { - codec.remoteAddr = ra.RemoteAddr() + codec.remote = ra.RemoteAddr() } return codec } -// NewJSONCodec creates a codec that reads from the given connection. If conn implements -// ConnRemoteAddr, log messages will use it to include the remote address of the -// connection. -func NewJSONCodec(conn Conn) ServerCodec { +// NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log +// messages will use it to include the remote address of the connection. +func NewCodec(conn Conn) ServerCodec { enc := json.NewEncoder(conn) dec := json.NewDecoder(conn) dec.UseNumber() - return newCodec(conn, enc.Encode, dec.Decode) + return NewFuncCodec(conn, enc.Encode, dec.Decode) } -func (c *jsonCodec) RemoteAddr() string { - return c.remoteAddr +func (c *jsonCodec) remoteAddr() string { + return c.remote } -func (c *jsonCodec) Read() (msg []*jsonrpcMessage, batch bool, err error) { +func (c *jsonCodec) readBatch() (messages []*jsonrpcMessage, batch bool, err error) { // Decode the next JSON object in the input stream. // This verifies basic syntax, etc. var rawmsg json.RawMessage if err := c.decode(&rawmsg); err != nil { return nil, false, err } - msg, batch = parseMessage(rawmsg) - return msg, batch, nil + messages, batch = parseMessage(rawmsg) + for i, msg := range messages { + if msg == nil { + // Message is JSON 'null'. Replace with zero value so it + // will be treated like any other invalid message. + messages[i] = new(jsonrpcMessage) + } + } + return messages, batch, nil } -// Write sends a message to client. -func (c *jsonCodec) Write(ctx context.Context, v interface{}) error { +func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}) error { c.encMu.Lock() defer c.encMu.Unlock() @@ -224,17 +232,16 @@ func (c *jsonCodec) Write(ctx context.Context, v interface{}) error { return c.encode(v) } -// Close the underlying connection -func (c *jsonCodec) Close() { +func (c *jsonCodec) close() { c.closer.Do(func() { - close(c.closed) + close(c.closeCh) c.conn.Close() }) } // Closed returns a channel which will be closed when Close is called -func (c *jsonCodec) Closed() <-chan interface{} { - return c.closed +func (c *jsonCodec) closed() <-chan interface{} { + return c.closeCh } // parseMessage parses raw bytes as a (batch of) JSON-RPC message(s). There are no error diff --git a/rpc/server.go b/rpc/server.go index bf5d93e..64e078a 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -22,7 +22,7 @@ import ( "sync/atomic" mapset "github.com/deckarep/golang-set" - "github.com/ava-labs/go-ethereum/log" + "github.com/ethereum/go-ethereum/log" ) const MetadataApi = "rpc" @@ -36,7 +36,7 @@ const ( // OptionMethodInvocation is an indication that the codec supports RPC method calls OptionMethodInvocation CodecOption = 1 << iota - // OptionSubscriptions is an indication that the codec suports RPC notifications + // OptionSubscriptions is an indication that the codec supports RPC notifications OptionSubscriptions = 1 << iota // support pub sub ) @@ -72,7 +72,7 @@ func (s *Server) RegisterName(name string, receiver interface{}) error { // // Note that codec options are no longer supported. func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) { - defer codec.Close() + defer codec.close() // Don't serve if server is stopped. if atomic.LoadInt32(&s.run) == 0 { @@ -84,7 +84,7 @@ func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) { defer s.codecs.Remove(codec) c := initClient(codec, s.idgen, &s.services) - <-codec.Closed() + <-codec.closed() c.Close() } @@ -101,10 +101,10 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { h.allowSubscribe = false defer h.close(io.EOF, nil) - reqs, batch, err := codec.Read() + reqs, batch, err := codec.readBatch() if err != nil { if err != io.EOF { - codec.Write(ctx, errorMessage(&invalidMessageError{"parse error"})) + codec.writeJSON(ctx, errorMessage(&invalidMessageError{"parse error"})) } return } @@ -122,7 +122,7 @@ func (s *Server) Stop() { if atomic.CompareAndSwapInt32(&s.run, 1, 0) { log.Debug("RPC server shutting down") s.codecs.Each(func(c interface{}) bool { - c.(ServerCodec).Close() + c.(ServerCodec).close() return true }) } diff --git a/rpc/service.go b/rpc/service.go index ead6fb6..bef891e 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -25,9 +25,8 @@ import ( "strings" "sync" "unicode" - "unicode/utf8" - "github.com/ava-labs/go-ethereum/log" + "github.com/ethereum/go-ethereum/log" ) var ( @@ -139,16 +138,14 @@ func newCallback(receiver, fn reflect.Value) *callback { c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype)} // Determine parameter types. They must all be exported or builtin types. c.makeArgTypes() - if !allExportedOrBuiltin(c.argTypes) { - return nil - } + // Verify return types. The function must return at most one error // and/or one other non-error value. outs := make([]reflect.Type, fntype.NumOut()) for i := 0; i < fntype.NumOut(); i++ { outs[i] = fntype.Out(i) } - if len(outs) > 2 || !allExportedOrBuiltin(outs) { + if len(outs) > 2 { return nil } // If an error is returned, it must be the last returned value. @@ -218,27 +215,6 @@ func (c *callback) call(ctx context.Context, method string, args []reflect.Value return results[0].Interface(), nil } -// Is this an exported - upper case - name? -func isExported(name string) bool { - rune, _ := utf8.DecodeRuneInString(name) - return unicode.IsUpper(rune) -} - -// Are all those types exported or built-in? -func allExportedOrBuiltin(types []reflect.Type) bool { - for _, typ := range types { - for typ.Kind() == reflect.Ptr { - typ = typ.Elem() - } - // PkgPath will be non-empty even for an exported type, - // so we need to check the type name as well. - if !isExported(typ.Name()) && typ.PkgPath() != "" { - return false - } - } - return true -} - // Is t context.Context or *context.Context? func isContextType(t reflect.Type) bool { for t.Kind() == reflect.Ptr { diff --git a/rpc/subscription.go b/rpc/subscription.go index c1e869b..233215d 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -17,7 +17,6 @@ package rpc import ( - "bufio" "container/list" "context" crand "crypto/rand" @@ -51,10 +50,14 @@ func NewID() ID { // randomIDGenerator returns a function generates a random IDs. func randomIDGenerator() func() ID { - seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader)) - if err != nil { + var buf = make([]byte, 8) + var seed int64 + if _, err := crand.Read(buf); err == nil { + seed = int64(binary.BigEndian.Uint64(buf)) + } else { seed = int64(time.Now().Nanosecond()) } + var ( mu sync.Mutex rng = rand.New(rand.NewSource(seed)) @@ -141,7 +144,7 @@ func (n *Notifier) Notify(id ID, data interface{}) error { // Closed returns a channel that is closed when the RPC connection is closed. // Deprecated: use subscription error channel func (n *Notifier) Closed() <-chan interface{} { - return n.h.conn.Closed() + return n.h.conn.closed() } // takeSubscription returns the subscription (if one has been created). No subscription can @@ -153,7 +156,7 @@ func (n *Notifier) takeSubscription() *Subscription { return n.sub } -// acticate is called after the subscription ID was sent to client. Notifications are +// activate is called after the subscription ID was sent to client. Notifications are // buffered before activation. This prevents notifications being sent to the client before // the subscription ID is sent to the client. func (n *Notifier) activate() error { @@ -172,14 +175,14 @@ func (n *Notifier) activate() error { func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data}) ctx := context.Background() - return n.h.conn.Write(ctx, &jsonrpcMessage{ + return n.h.conn.writeJSON(ctx, &jsonrpcMessage{ Version: vsn, Method: n.namespace + notificationMethodSuffix, Params: params, }) } -// A Subscription is created by a notifier and tight to that notifier. The client can use +// A Subscription is created by a notifier and tied to that notifier. The client can use // this subscription to wait for an unsubscribe request for the client, see Err(). type Subscription struct { ID ID @@ -241,11 +244,11 @@ func (sub *ClientSubscription) Err() <-chan error { // Unsubscribe unsubscribes the notification and closes the error channel. // It can safely be called more than once. func (sub *ClientSubscription) Unsubscribe() { - sub.quitWithError(nil, true) + sub.quitWithError(true, nil) sub.errOnce.Do(func() { close(sub.err) }) } -func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) { +func (sub *ClientSubscription) quitWithError(unsubscribeServer bool, err error) { sub.quitOnce.Do(func() { // The dispatch loop won't be able to execute the unsubscribe call // if it is blocked on deliver. Close sub.quit first because it @@ -276,7 +279,7 @@ func (sub *ClientSubscription) start() { sub.quitWithError(sub.forward()) } -func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) { +func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) { cases := []reflect.SelectCase{ {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)}, {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)}, @@ -298,14 +301,14 @@ func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) { switch chosen { case 0: // <-sub.quit - return nil, false + return false, nil case 1: // <-sub.in val, err := sub.unmarshal(recv.Interface().(json.RawMessage)) if err != nil { - return err, true + return true, err } if buffer.Len() == maxClientSubscriptionBuffer { - return ErrSubscriptionQueueOverflow, true + return true, ErrSubscriptionQueueOverflow } buffer.PushBack(val) case 2: // sub.channel<- diff --git a/rpc/websocket.go b/rpc/websocket.go index d87e8a5..a716383 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -25,26 +25,22 @@ import ( "os" "strings" "sync" + "time" mapset "github.com/deckarep/golang-set" - "github.com/ava-labs/go-ethereum/log" + "github.com/ethereum/go-ethereum/log" "github.com/gorilla/websocket" ) const ( - wsReadBuffer = 1024 - wsWriteBuffer = 1024 + wsReadBuffer = 1024 + wsWriteBuffer = 1024 + wsPingInterval = 60 * time.Second + wsPingWriteTimeout = 5 * time.Second ) var wsBufferPool = new(sync.Pool) -// NewWSServer creates a new websocket RPC server around an API provider. -// -// Deprecated: use Server.WebsocketHandler -func NewWSServer(allowedOrigins []string, srv *Server) *http.Server { - return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)} -} - // WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections. // // allowedOrigins should be a comma-separated list of allowed origin URLs. @@ -63,7 +59,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler { return } codec := newWebsocketCodec(conn) - s.ServeCodec(codec, OptionMethodInvocation|OptionSubscriptions) + s.ServeCodec(codec, 0) }) } @@ -124,21 +120,13 @@ func (e wsHandshakeError) Error() string { return s } -// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server -// that is listening on the given endpoint. -// -// The context is used for the initial connection establishment. It does not -// affect subsequent interactions with the client. -func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) { +// DialWebsocketWithDialer creates a new RPC client that communicates with a JSON-RPC server +// that is listening on the given endpoint using the provided dialer. +func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, dialer websocket.Dialer) (*Client, error) { endpoint, header, err := wsClientHeaders(endpoint, origin) if err != nil { return nil, err } - dialer := websocket.Dialer{ - ReadBufferSize: wsReadBuffer, - WriteBufferSize: wsWriteBuffer, - WriteBufferPool: wsBufferPool, - } return newClient(ctx, func(ctx context.Context) (ServerCodec, error) { conn, resp, err := dialer.DialContext(ctx, endpoint, header) if err != nil { @@ -152,6 +140,20 @@ func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error }) } +// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server +// that is listening on the given endpoint. +// +// The context is used for the initial connection establishment. It does not +// affect subsequent interactions with the client. +func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) { + dialer := websocket.Dialer{ + ReadBufferSize: wsReadBuffer, + WriteBufferSize: wsWriteBuffer, + WriteBufferPool: wsBufferPool, + } + return DialWebsocketWithDialer(ctx, endpoint, origin, dialer) +} + func wsClientHeaders(endpoint, origin string) (string, http.Header, error) { endpointURL, err := url.Parse(endpoint) if err != nil { @@ -169,7 +171,64 @@ func wsClientHeaders(endpoint, origin string) (string, http.Header, error) { return endpointURL.String(), header, nil } +type websocketCodec struct { + *jsonCodec + conn *websocket.Conn + + wg sync.WaitGroup + pingReset chan struct{} +} + func newWebsocketCodec(conn *websocket.Conn) ServerCodec { conn.SetReadLimit(maxRequestContentLength) - return newCodec(conn, conn.WriteJSON, conn.ReadJSON) + wc := &websocketCodec{ + jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec), + conn: conn, + pingReset: make(chan struct{}, 1), + } + wc.wg.Add(1) + go wc.pingLoop() + return wc +} + +func (wc *websocketCodec) close() { + wc.jsonCodec.close() + wc.wg.Wait() +} + +func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}) error { + err := wc.jsonCodec.writeJSON(ctx, v) + if err == nil { + // Notify pingLoop to delay the next idle ping. + select { + case wc.pingReset <- struct{}{}: + default: + } + } + return err +} + +// pingLoop sends periodic ping frames when the connection is idle. +func (wc *websocketCodec) pingLoop() { + var timer = time.NewTimer(wsPingInterval) + defer wc.wg.Done() + defer timer.Stop() + + for { + select { + case <-wc.closed(): + return + case <-wc.pingReset: + if !timer.Stop() { + <-timer.C + } + timer.Reset(wsPingInterval) + case <-timer.C: + wc.jsonCodec.encMu.Lock() + wc.conn.SetWriteDeadline(time.Now().Add(wsPingWriteTimeout)) + wc.conn.WriteMessage(websocket.PingMessage, nil) + wc.jsonCodec.encMu.Unlock() + timer.Reset(wsPingInterval) + } + } } -- cgit v1.2.3-70-g09d2