aboutsummaryrefslogtreecommitdiff
path: root/node/node.go
diff options
context:
space:
mode:
Diffstat (limited to 'node/node.go')
-rw-r--r--node/node.go225
1 files changed, 137 insertions, 88 deletions
diff --git a/node/node.go b/node/node.go
index d2a212b..c6e4181 100644
--- a/node/node.go
+++ b/node/node.go
@@ -27,37 +27,40 @@ import (
"github.com/ava-labs/coreth/core/rawdb"
"github.com/ava-labs/coreth/internal/debug"
"github.com/ava-labs/coreth/rpc"
- "github.com/ava-labs/go-ethereum/ethdb"
- "github.com/ava-labs/go-ethereum/event"
- "github.com/ava-labs/go-ethereum/log"
- "github.com/ava-labs/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p"
"github.com/prometheus/tsdb/fileutil"
)
// Node is a container on which services can be registered.
type Node struct {
- eventmux *event.TypeMux // Event multiplexer used between the services of a stack
- config *Config
- accman *accounts.Manager
-
- ephemeralKeystore string // if non-empty, the key directory that will be removed by Stop
- instanceDirLock fileutil.Releaser // prevents concurrent use of instance directory
-
- serverConfig p2p.Config
- server *p2p.Server // Currently running P2P networking layer
-
- serviceFuncs []ServiceConstructor // Service constructors (in dependency order)
- services map[reflect.Type]Service // Currently running services
+ eventmux *event.TypeMux
+ config *Config
+ accman *accounts.Manager
+ log log.Logger
+ ephemKeystore string // if non-empty, the key directory that will be removed by Stop
+ dirLock fileutil.Releaser // prevents concurrent use of instance directory
+ stop chan struct{} // Channel to wait for termination notifications
+ server *p2p.Server // Currently running P2P networking layer
+ startStopLock sync.Mutex // Start/Stop are protected by an additional lock
+ state int // Tracks state of node lifecycle
+ lock sync.Mutex
+ lifecycles []Lifecycle // All registered backends, services, and auxiliary services that have a lifecycle
rpcAPIs []rpc.API // List of APIs currently provided by the node
inprocHandler *rpc.Server // In-process RPC request handler to process the API requests
- stop chan struct{} // Channel to wait for termination notifications
- lock sync.RWMutex
-
- log log.Logger
+ databases map[*closeTrackingDB]struct{} // All open databases
}
+const (
+ initializingState = iota
+ runningState
+ closedState
+)
+
// New creates a new P2P node, ready for protocol registration.
func New(conf *Config) (*Node, error) {
// Copy config and resolve the datadir so future changes to the current
@@ -71,6 +74,10 @@ func New(conf *Config) (*Node, error) {
}
conf.DataDir = absdatadir
}
+ if conf.Logger == nil {
+ conf.Logger = log.New()
+ }
+
// Ensure that the instance name doesn't cause weird conflicts with
// other files in the data directory.
if strings.ContainsAny(conf.Name, `/\`) {
@@ -82,25 +89,50 @@ func New(conf *Config) (*Node, error) {
if strings.HasSuffix(conf.Name, ".ipc") {
return nil, errors.New(`Config.Name cannot end in ".ipc"`)
}
- // Ensure that the AccountManager method works before the node has started.
- // We rely on this in cmd/geth.
+
+ node := &Node{
+ config: conf,
+ inprocHandler: rpc.NewServer(),
+ eventmux: new(event.TypeMux),
+ log: conf.Logger,
+ stop: make(chan struct{}),
+ server: &p2p.Server{Config: conf.P2P},
+ databases: make(map[*closeTrackingDB]struct{}),
+ }
+
+ // Register built-in APIs.
+ node.rpcAPIs = append(node.rpcAPIs, node.apis()...)
+
+ // Acquire the instance directory lock.
+ if err := node.openDataDir(); err != nil {
+ return nil, err
+ }
+ // Ensure that the AccountManager method works before the node has started. We rely on
+ // this in cmd/geth.
am, ephemeralKeystore, err := makeAccountManager(conf)
if err != nil {
return nil, err
}
- if conf.Logger == nil {
- conf.Logger = log.New()
+ node.accman = am
+ node.ephemKeystore = ephemeralKeystore
+
+ // Initialize the p2p server. This creates the node key and discovery databases.
+ node.server.Config.PrivateKey = node.config.NodeKey()
+ node.server.Config.Name = node.config.NodeName()
+ node.server.Config.Logger = node.log
+ if node.server.Config.StaticNodes == nil {
+ node.server.Config.StaticNodes = node.config.StaticNodes()
+ }
+ if node.server.Config.TrustedNodes == nil {
+ node.server.Config.TrustedNodes = node.config.TrustedNodes()
+ }
+ if node.server.Config.NodeDatabase == "" {
+ node.server.Config.NodeDatabase = node.config.NodeDB()
}
- // Note: any interaction with Config that would create/touch files
- // in the data directory or instance directory is delayed until Start.
- return &Node{
- accman: am,
- ephemeralKeystore: ephemeralKeystore,
- config: conf,
- serviceFuncs: []ServiceConstructor{},
- eventmux: new(event.TypeMux),
- log: conf.Logger,
- }, nil
+
+ // Configure RPC servers.
+
+ return node, nil
}
// Config returns the configuration of node.
@@ -109,33 +141,15 @@ func (n *Node) Config() *Config {
}
// Server retrieves the currently running P2P network layer. This method is meant
-// only to inspect fields of the currently running server, life cycle management
-// should be left to this Node entity.
+// only to inspect fields of the currently running server. Callers should not
+// start or stop the returned server.
func (n *Node) Server() *p2p.Server {
- n.lock.RLock()
- defer n.lock.RUnlock()
+ n.lock.Lock()
+ defer n.lock.Unlock()
return n.server
}
-// Service retrieves a currently running service registered of a specific type.
-func (n *Node) Service(service interface{}) error {
- n.lock.RLock()
- defer n.lock.RUnlock()
-
- // Short circuit if the node's not running
- if n.server == nil {
- return ErrNodeStopped
- }
- // Otherwise try to find the service to return
- element := reflect.ValueOf(service).Elem()
- if running, ok := n.services[element.Type()]; ok {
- element.Set(reflect.ValueOf(running))
- return nil
- }
- return ErrServiceUnknown
-}
-
// DataDir retrieves the current datadir used by the protocol stack.
// Deprecated: No files should be stored in this directory, use InstanceDir instead.
func (n *Node) DataDir() string {
@@ -162,10 +176,24 @@ func (n *Node) EventMux() *event.TypeMux {
// previous can be found) from within the node's instance directory. If the node is
// ephemeral, a memory database is returned.
func (n *Node) OpenDatabase(name string, cache, handles int, namespace string) (ethdb.Database, error) {
+ n.lock.Lock()
+ defer n.lock.Unlock()
+ if n.state == closedState {
+ return nil, ErrNodeStopped
+ }
+
+ var db ethdb.Database
+ var err error
if n.config.DataDir == "" {
- return rawdb.NewMemoryDatabase(), nil
+ db = rawdb.NewMemoryDatabase()
+ } else {
+ db, err = rawdb.NewLevelDBDatabase(n.ResolvePath(name), cache, handles, namespace)
+ }
+
+ if err == nil {
+ db = n.wrapDatabase(db)
}
- return rawdb.NewLevelDBDatabase(n.config.ResolvePath(name), cache, handles, namespace)
+ return db, err
}
// OpenDatabaseWithFreezer opens an existing database with the given name (or
@@ -174,18 +202,31 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string) (
// database to immutable append-only files. If the node is an ephemeral one, a
// memory database is returned.
func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer, namespace string) (ethdb.Database, error) {
+ n.lock.Lock()
+ defer n.lock.Unlock()
+ if n.state == closedState {
+ return nil, ErrNodeStopped
+ }
+
+ var db ethdb.Database
+ var err error
if n.config.DataDir == "" {
- return rawdb.NewMemoryDatabase(), nil
+ db = rawdb.NewMemoryDatabase()
+ } else {
+ root := n.ResolvePath(name)
+ switch {
+ case freezer == "":
+ freezer = filepath.Join(root, "ancient")
+ case !filepath.IsAbs(freezer):
+ freezer = n.ResolvePath(freezer)
+ }
+ db, err = rawdb.NewLevelDBDatabaseWithFreezer(root, cache, handles, freezer, namespace)
}
- root := n.config.ResolvePath(name)
- switch {
- case freezer == "":
- freezer = filepath.Join(root, "ancient")
- case !filepath.IsAbs(freezer):
- freezer = n.config.ResolvePath(freezer)
+ if err == nil {
+ db = n.wrapDatabase(db)
}
- return rawdb.NewLevelDBDatabaseWithFreezer(root, cache, handles, freezer, namespace)
+ return db, err
}
// ResolvePath returns the absolute path of a resource in the instance directory.
@@ -193,27 +234,35 @@ func (n *Node) ResolvePath(x string) string {
return n.config.ResolvePath(x)
}
-// apis returns the collection of RPC descriptors this node offers.
-func (n *Node) apis() []rpc.API {
- return []rpc.API{
- {
- Namespace: "admin",
- Version: "1.0",
- Service: NewPrivateAdminAPI(n),
- }, {
- Namespace: "admin",
- Version: "1.0",
- Service: NewPublicAdminAPI(n),
- Public: true,
- }, {
- Namespace: "debug",
- Version: "1.0",
- Service: debug.Handler,
- }, {
- Namespace: "web3",
- Version: "1.0",
- Service: NewPublicWeb3API(n),
- Public: true,
- },
+// closeTrackingDB wraps the Close method of a database. When the database is closed by the
+// service, the wrapper removes it from the node's database map. This ensures that Node
+// won't auto-close the database if it is closed by the service that opened it.
+type closeTrackingDB struct {
+ ethdb.Database
+ n *Node
+}
+
+func (db *closeTrackingDB) Close() error {
+ db.n.lock.Lock()
+ delete(db.n.databases, db)
+ db.n.lock.Unlock()
+ return db.Database.Close()
+}
+
+// wrapDatabase ensures the database will be auto-closed when Node is closed.
+func (n *Node) wrapDatabase(db ethdb.Database) ethdb.Database {
+ wrapper := &closeTrackingDB{db, n}
+ n.databases[wrapper] = struct{}{}
+ return wrapper
+}
+
+// closeDatabases closes all open databases.
+func (n *Node) closeDatabases() (errors []error) {
+ for db := range n.databases {
+ delete(n.databases, db)
+ if err := db.Database.Close(); err != nil {
+ errors = append(errors, err)
+ }
}
+ return errors
}