aboutsummaryrefslogtreecommitdiff
path: root/rpc/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/server.go')
-rw-r--r--rpc/server.go147
1 files changed, 147 insertions, 0 deletions
diff --git a/rpc/server.go b/rpc/server.go
new file mode 100644
index 0000000..bf5d93e
--- /dev/null
+++ b/rpc/server.go
@@ -0,0 +1,147 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+ "context"
+ "io"
+ "sync/atomic"
+
+ mapset "github.com/deckarep/golang-set"
+ "github.com/ava-labs/go-ethereum/log"
+)
+
+const MetadataApi = "rpc"
+
+// CodecOption specifies which type of messages a codec supports.
+//
+// Deprecated: this option is no longer honored by Server.
+type CodecOption int
+
+const (
+ // OptionMethodInvocation is an indication that the codec supports RPC method calls
+ OptionMethodInvocation CodecOption = 1 << iota
+
+ // OptionSubscriptions is an indication that the codec suports RPC notifications
+ OptionSubscriptions = 1 << iota // support pub sub
+)
+
+// Server is an RPC server.
+type Server struct {
+ services serviceRegistry
+ idgen func() ID
+ run int32
+ codecs mapset.Set
+}
+
+// NewServer creates a new server instance with no registered handlers.
+func NewServer() *Server {
+ server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1}
+ // Register the default service providing meta information about the RPC service such
+ // as the services and methods it offers.
+ rpcService := &RPCService{server}
+ server.RegisterName(MetadataApi, rpcService)
+ return server
+}
+
+// RegisterName creates a service for the given receiver type under the given name. When no
+// methods on the given receiver match the criteria to be either a RPC method or a
+// subscription an error is returned. Otherwise a new service is created and added to the
+// service collection this server provides to clients.
+func (s *Server) RegisterName(name string, receiver interface{}) error {
+ return s.services.registerName(name, receiver)
+}
+
+// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
+// the response back using the given codec. It will block until the codec is closed or the
+// server is stopped. In either case the codec is closed.
+//
+// Note that codec options are no longer supported.
+func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
+ defer codec.Close()
+
+ // Don't serve if server is stopped.
+ if atomic.LoadInt32(&s.run) == 0 {
+ return
+ }
+
+ // Add the codec to the set so it can be closed by Stop.
+ s.codecs.Add(codec)
+ defer s.codecs.Remove(codec)
+
+ c := initClient(codec, s.idgen, &s.services)
+ <-codec.Closed()
+ c.Close()
+}
+
+// serveSingleRequest reads and processes a single RPC request from the given codec. This
+// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in
+// this mode.
+func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
+ // Don't serve if server is stopped.
+ if atomic.LoadInt32(&s.run) == 0 {
+ return
+ }
+
+ h := newHandler(ctx, codec, s.idgen, &s.services)
+ h.allowSubscribe = false
+ defer h.close(io.EOF, nil)
+
+ reqs, batch, err := codec.Read()
+ if err != nil {
+ if err != io.EOF {
+ codec.Write(ctx, errorMessage(&invalidMessageError{"parse error"}))
+ }
+ return
+ }
+ if batch {
+ h.handleBatch(reqs)
+ } else {
+ h.handleMsg(reqs[0])
+ }
+}
+
+// Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending
+// requests to finish, then closes all codecs which will cancel pending requests and
+// subscriptions.
+func (s *Server) Stop() {
+ if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
+ log.Debug("RPC server shutting down")
+ s.codecs.Each(func(c interface{}) bool {
+ c.(ServerCodec).Close()
+ return true
+ })
+ }
+}
+
+// RPCService gives meta information about the server.
+// e.g. gives information about the loaded modules.
+type RPCService struct {
+ server *Server
+}
+
+// Modules returns the list of RPC services with their version number
+func (s *RPCService) Modules() map[string]string {
+ s.server.services.mu.Lock()
+ defer s.server.services.mu.Unlock()
+
+ modules := make(map[string]string)
+ for name := range s.server.services.services {
+ modules[name] = "1.0"
+ }
+ return modules
+}