aboutsummaryrefslogtreecommitdiff
path: root/rpc/handler.go
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2020-09-15 23:55:34 -0400
committerDeterminant <[email protected]>2020-09-15 23:55:34 -0400
commit78745551c077bf54151202138c2629f288769561 (patch)
tree2b628e99fd110617089778fa91235ecd2888f4ef /rpc/handler.go
parent7d1388c743b4ec8f4a86bea95bfada785dee83f7 (diff)
WIP: geth-tavum
Diffstat (limited to 'rpc/handler.go')
-rw-r--r--rpc/handler.go42
1 files changed, 31 insertions, 11 deletions
diff --git a/rpc/handler.go b/rpc/handler.go
index 187d0f8..23023ea 100644
--- a/rpc/handler.go
+++ b/rpc/handler.go
@@ -25,7 +25,7 @@ import (
"sync"
"time"
- "github.com/ava-labs/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/log"
)
// handler handles JSON-RPC messages. There is one handler per connection. Note that
@@ -85,8 +85,8 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *
serverSubs: make(map[ID]*Subscription),
log: log.Root(),
}
- if conn.RemoteAddr() != "" {
- h.log = h.log.New("conn", conn.RemoteAddr())
+ if conn.remoteAddr() != "" {
+ h.log = h.log.New("conn", conn.remoteAddr())
}
h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe))
return h
@@ -97,7 +97,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
// Emit error response for empty batches:
if len(msgs) == 0 {
h.startCallProc(func(cp *callProc) {
- h.conn.Write(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
+ h.conn.writeJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
})
return
}
@@ -122,7 +122,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
}
h.addSubscriptions(cp.notifiers)
if len(answers) > 0 {
- h.conn.Write(cp.ctx, answers)
+ h.conn.writeJSON(cp.ctx, answers)
}
for _, n := range cp.notifiers {
n.activate()
@@ -139,7 +139,7 @@ func (h *handler) handleMsg(msg *jsonrpcMessage) {
answer := h.handleCallMsg(cp, msg)
h.addSubscriptions(cp.notifiers)
if answer != nil {
- h.conn.Write(cp.ctx, answer)
+ h.conn.writeJSON(cp.ctx, answer)
}
for _, n := range cp.notifiers {
n.activate()
@@ -189,7 +189,7 @@ func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) {
}
for id, sub := range h.clientSubs {
delete(h.clientSubs, id)
- sub.quitWithError(err, false)
+ sub.quitWithError(false, err)
}
}
@@ -296,10 +296,16 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess
return nil
case msg.isCall():
resp := h.handleCall(ctx, msg)
+ var ctx []interface{}
+ ctx = append(ctx, "reqid", idForLog{msg.ID}, "t", time.Since(start))
if resp.Error != nil {
- h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message)
+ ctx = append(ctx, "err", resp.Error.Message)
+ if resp.Error.Data != nil {
+ ctx = append(ctx, "errdata", resp.Error.Data)
+ }
+ h.log.Warn("Served "+msg.Method, ctx...)
} else {
- h.log.Debug("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start))
+ h.log.Debug("Served "+msg.Method, ctx...)
}
return resp
case msg.hasValidID():
@@ -327,8 +333,22 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage
if err != nil {
return msg.errorResponse(&invalidParamsError{err.Error()})
}
-
- return h.runMethod(cp.ctx, msg, callb, args)
+ start := time.Now()
+ answer := h.runMethod(cp.ctx, msg, callb, args)
+
+ // Collect the statistics for RPC calls if metrics is enabled.
+ // We only care about pure rpc call. Filter out subscription.
+ if callb != h.unsubscribeCb {
+ rpcRequestGauge.Inc(1)
+ if answer.Error != nil {
+ failedReqeustGauge.Inc(1)
+ } else {
+ successfulRequestGauge.Inc(1)
+ }
+ rpcServingTimer.UpdateSince(start)
+ newRPCServingTimer(msg.Method, answer.Error == nil).UpdateSince(start)
+ }
+ return answer
}
// handleSubscribe processes *_subscribe method calls.