From 78745551c077bf54151202138c2629f288769561 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 15 Sep 2020 23:55:34 -0400 Subject: WIP: geth-tavum --- rpc/websocket.go | 105 +++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 82 insertions(+), 23 deletions(-) (limited to 'rpc/websocket.go') diff --git a/rpc/websocket.go b/rpc/websocket.go index d87e8a5..a716383 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -25,26 +25,22 @@ import ( "os" "strings" "sync" + "time" mapset "github.com/deckarep/golang-set" - "github.com/ava-labs/go-ethereum/log" + "github.com/ethereum/go-ethereum/log" "github.com/gorilla/websocket" ) const ( - wsReadBuffer = 1024 - wsWriteBuffer = 1024 + wsReadBuffer = 1024 + wsWriteBuffer = 1024 + wsPingInterval = 60 * time.Second + wsPingWriteTimeout = 5 * time.Second ) var wsBufferPool = new(sync.Pool) -// NewWSServer creates a new websocket RPC server around an API provider. -// -// Deprecated: use Server.WebsocketHandler -func NewWSServer(allowedOrigins []string, srv *Server) *http.Server { - return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)} -} - // WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections. // // allowedOrigins should be a comma-separated list of allowed origin URLs. @@ -63,7 +59,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler { return } codec := newWebsocketCodec(conn) - s.ServeCodec(codec, OptionMethodInvocation|OptionSubscriptions) + s.ServeCodec(codec, 0) }) } @@ -124,21 +120,13 @@ func (e wsHandshakeError) Error() string { return s } -// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server -// that is listening on the given endpoint. -// -// The context is used for the initial connection establishment. It does not -// affect subsequent interactions with the client. -func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) { +// DialWebsocketWithDialer creates a new RPC client that communicates with a JSON-RPC server +// that is listening on the given endpoint using the provided dialer. +func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, dialer websocket.Dialer) (*Client, error) { endpoint, header, err := wsClientHeaders(endpoint, origin) if err != nil { return nil, err } - dialer := websocket.Dialer{ - ReadBufferSize: wsReadBuffer, - WriteBufferSize: wsWriteBuffer, - WriteBufferPool: wsBufferPool, - } return newClient(ctx, func(ctx context.Context) (ServerCodec, error) { conn, resp, err := dialer.DialContext(ctx, endpoint, header) if err != nil { @@ -152,6 +140,20 @@ func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error }) } +// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server +// that is listening on the given endpoint. +// +// The context is used for the initial connection establishment. It does not +// affect subsequent interactions with the client. +func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) { + dialer := websocket.Dialer{ + ReadBufferSize: wsReadBuffer, + WriteBufferSize: wsWriteBuffer, + WriteBufferPool: wsBufferPool, + } + return DialWebsocketWithDialer(ctx, endpoint, origin, dialer) +} + func wsClientHeaders(endpoint, origin string) (string, http.Header, error) { endpointURL, err := url.Parse(endpoint) if err != nil { @@ -169,7 +171,64 @@ func wsClientHeaders(endpoint, origin string) (string, http.Header, error) { return endpointURL.String(), header, nil } +type websocketCodec struct { + *jsonCodec + conn *websocket.Conn + + wg sync.WaitGroup + pingReset chan struct{} +} + func newWebsocketCodec(conn *websocket.Conn) ServerCodec { conn.SetReadLimit(maxRequestContentLength) - return newCodec(conn, conn.WriteJSON, conn.ReadJSON) + wc := &websocketCodec{ + jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec), + conn: conn, + pingReset: make(chan struct{}, 1), + } + wc.wg.Add(1) + go wc.pingLoop() + return wc +} + +func (wc *websocketCodec) close() { + wc.jsonCodec.close() + wc.wg.Wait() +} + +func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}) error { + err := wc.jsonCodec.writeJSON(ctx, v) + if err == nil { + // Notify pingLoop to delay the next idle ping. + select { + case wc.pingReset <- struct{}{}: + default: + } + } + return err +} + +// pingLoop sends periodic ping frames when the connection is idle. +func (wc *websocketCodec) pingLoop() { + var timer = time.NewTimer(wsPingInterval) + defer wc.wg.Done() + defer timer.Stop() + + for { + select { + case <-wc.closed(): + return + case <-wc.pingReset: + if !timer.Stop() { + <-timer.C + } + timer.Reset(wsPingInterval) + case <-timer.C: + wc.jsonCodec.encMu.Lock() + wc.conn.SetWriteDeadline(time.Now().Add(wsPingWriteTimeout)) + wc.conn.WriteMessage(websocket.PingMessage, nil) + wc.jsonCodec.encMu.Unlock() + timer.Reset(wsPingInterval) + } + } } -- cgit v1.2.3