diff options
Diffstat (limited to 'rpc/handler.go')
-rw-r--r-- | rpc/handler.go | 42 |
1 files changed, 31 insertions, 11 deletions
diff --git a/rpc/handler.go b/rpc/handler.go index 187d0f8..23023ea 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -25,7 +25,7 @@ import ( "sync" "time" - "github.com/ava-labs/go-ethereum/log" + "github.com/ethereum/go-ethereum/log" ) // handler handles JSON-RPC messages. There is one handler per connection. Note that @@ -85,8 +85,8 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg * serverSubs: make(map[ID]*Subscription), log: log.Root(), } - if conn.RemoteAddr() != "" { - h.log = h.log.New("conn", conn.RemoteAddr()) + if conn.remoteAddr() != "" { + h.log = h.log.New("conn", conn.remoteAddr()) } h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe)) return h @@ -97,7 +97,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { // Emit error response for empty batches: if len(msgs) == 0 { h.startCallProc(func(cp *callProc) { - h.conn.Write(cp.ctx, errorMessage(&invalidRequestError{"empty batch"})) + h.conn.writeJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"})) }) return } @@ -122,7 +122,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { } h.addSubscriptions(cp.notifiers) if len(answers) > 0 { - h.conn.Write(cp.ctx, answers) + h.conn.writeJSON(cp.ctx, answers) } for _, n := range cp.notifiers { n.activate() @@ -139,7 +139,7 @@ func (h *handler) handleMsg(msg *jsonrpcMessage) { answer := h.handleCallMsg(cp, msg) h.addSubscriptions(cp.notifiers) if answer != nil { - h.conn.Write(cp.ctx, answer) + h.conn.writeJSON(cp.ctx, answer) } for _, n := range cp.notifiers { n.activate() @@ -189,7 +189,7 @@ func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) { } for id, sub := range h.clientSubs { delete(h.clientSubs, id) - sub.quitWithError(err, false) + sub.quitWithError(false, err) } } @@ -296,10 +296,16 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess return nil case msg.isCall(): resp := h.handleCall(ctx, msg) + var ctx []interface{} + ctx = append(ctx, "reqid", idForLog{msg.ID}, "t", time.Since(start)) if resp.Error != nil { - h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message) + ctx = append(ctx, "err", resp.Error.Message) + if resp.Error.Data != nil { + ctx = append(ctx, "errdata", resp.Error.Data) + } + h.log.Warn("Served "+msg.Method, ctx...) } else { - h.log.Debug("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start)) + h.log.Debug("Served "+msg.Method, ctx...) } return resp case msg.hasValidID(): @@ -327,8 +333,22 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage if err != nil { return msg.errorResponse(&invalidParamsError{err.Error()}) } - - return h.runMethod(cp.ctx, msg, callb, args) + start := time.Now() + answer := h.runMethod(cp.ctx, msg, callb, args) + + // Collect the statistics for RPC calls if metrics is enabled. + // We only care about pure rpc call. Filter out subscription. + if callb != h.unsubscribeCb { + rpcRequestGauge.Inc(1) + if answer.Error != nil { + failedReqeustGauge.Inc(1) + } else { + successfulRequestGauge.Inc(1) + } + rpcServingTimer.UpdateSince(start) + newRPCServingTimer(msg.Method, answer.Error == nil).UpdateSince(start) + } + return answer } // handleSubscribe processes *_subscribe method calls. |