aboutsummaryrefslogtreecommitdiff
path: root/rpc/websocket.go
diff options
context:
space:
mode:
authorDeterminant <tederminant@gmail.com>2020-09-15 23:55:34 -0400
committerDeterminant <tederminant@gmail.com>2020-09-15 23:55:34 -0400
commit78745551c077bf54151202138c2629f288769561 (patch)
tree2b628e99fd110617089778fa91235ecd2888f4ef /rpc/websocket.go
parent7d1388c743b4ec8f4a86bea95bfada785dee83f7 (diff)
WIP: geth-tavum
Diffstat (limited to 'rpc/websocket.go')
-rw-r--r--rpc/websocket.go105
1 files changed, 82 insertions, 23 deletions
diff --git a/rpc/websocket.go b/rpc/websocket.go
index d87e8a5..a716383 100644
--- a/rpc/websocket.go
+++ b/rpc/websocket.go
@@ -25,26 +25,22 @@ import (
"os"
"strings"
"sync"
+ "time"
mapset "github.com/deckarep/golang-set"
- "github.com/ava-labs/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/log"
"github.com/gorilla/websocket"
)
const (
- wsReadBuffer = 1024
- wsWriteBuffer = 1024
+ wsReadBuffer = 1024
+ wsWriteBuffer = 1024
+ wsPingInterval = 60 * time.Second
+ wsPingWriteTimeout = 5 * time.Second
)
var wsBufferPool = new(sync.Pool)
-// NewWSServer creates a new websocket RPC server around an API provider.
-//
-// Deprecated: use Server.WebsocketHandler
-func NewWSServer(allowedOrigins []string, srv *Server) *http.Server {
- return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)}
-}
-
// WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections.
//
// allowedOrigins should be a comma-separated list of allowed origin URLs.
@@ -63,7 +59,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler {
return
}
codec := newWebsocketCodec(conn)
- s.ServeCodec(codec, OptionMethodInvocation|OptionSubscriptions)
+ s.ServeCodec(codec, 0)
})
}
@@ -124,21 +120,13 @@ func (e wsHandshakeError) Error() string {
return s
}
-// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server
-// that is listening on the given endpoint.
-//
-// The context is used for the initial connection establishment. It does not
-// affect subsequent interactions with the client.
-func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) {
+// DialWebsocketWithDialer creates a new RPC client that communicates with a JSON-RPC server
+// that is listening on the given endpoint using the provided dialer.
+func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, dialer websocket.Dialer) (*Client, error) {
endpoint, header, err := wsClientHeaders(endpoint, origin)
if err != nil {
return nil, err
}
- dialer := websocket.Dialer{
- ReadBufferSize: wsReadBuffer,
- WriteBufferSize: wsWriteBuffer,
- WriteBufferPool: wsBufferPool,
- }
return newClient(ctx, func(ctx context.Context) (ServerCodec, error) {
conn, resp, err := dialer.DialContext(ctx, endpoint, header)
if err != nil {
@@ -152,6 +140,20 @@ func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error
})
}
+// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server
+// that is listening on the given endpoint.
+//
+// The context is used for the initial connection establishment. It does not
+// affect subsequent interactions with the client.
+func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) {
+ dialer := websocket.Dialer{
+ ReadBufferSize: wsReadBuffer,
+ WriteBufferSize: wsWriteBuffer,
+ WriteBufferPool: wsBufferPool,
+ }
+ return DialWebsocketWithDialer(ctx, endpoint, origin, dialer)
+}
+
func wsClientHeaders(endpoint, origin string) (string, http.Header, error) {
endpointURL, err := url.Parse(endpoint)
if err != nil {
@@ -169,7 +171,64 @@ func wsClientHeaders(endpoint, origin string) (string, http.Header, error) {
return endpointURL.String(), header, nil
}
+type websocketCodec struct {
+ *jsonCodec
+ conn *websocket.Conn
+
+ wg sync.WaitGroup
+ pingReset chan struct{}
+}
+
func newWebsocketCodec(conn *websocket.Conn) ServerCodec {
conn.SetReadLimit(maxRequestContentLength)
- return newCodec(conn, conn.WriteJSON, conn.ReadJSON)
+ wc := &websocketCodec{
+ jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec),
+ conn: conn,
+ pingReset: make(chan struct{}, 1),
+ }
+ wc.wg.Add(1)
+ go wc.pingLoop()
+ return wc
+}
+
+func (wc *websocketCodec) close() {
+ wc.jsonCodec.close()
+ wc.wg.Wait()
+}
+
+func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}) error {
+ err := wc.jsonCodec.writeJSON(ctx, v)
+ if err == nil {
+ // Notify pingLoop to delay the next idle ping.
+ select {
+ case wc.pingReset <- struct{}{}:
+ default:
+ }
+ }
+ return err
+}
+
+// pingLoop sends periodic ping frames when the connection is idle.
+func (wc *websocketCodec) pingLoop() {
+ var timer = time.NewTimer(wsPingInterval)
+ defer wc.wg.Done()
+ defer timer.Stop()
+
+ for {
+ select {
+ case <-wc.closed():
+ return
+ case <-wc.pingReset:
+ if !timer.Stop() {
+ <-timer.C
+ }
+ timer.Reset(wsPingInterval)
+ case <-timer.C:
+ wc.jsonCodec.encMu.Lock()
+ wc.conn.SetWriteDeadline(time.Now().Add(wsPingWriteTimeout))
+ wc.conn.WriteMessage(websocket.PingMessage, nil)
+ wc.jsonCodec.encMu.Unlock()
+ timer.Reset(wsPingInterval)
+ }
+ }
}