aboutsummaryrefslogtreecommitdiff
path: root/rpc/subscription.go
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/subscription.go')
-rw-r--r--rpc/subscription.go29
1 files changed, 16 insertions, 13 deletions
diff --git a/rpc/subscription.go b/rpc/subscription.go
index c1e869b..233215d 100644
--- a/rpc/subscription.go
+++ b/rpc/subscription.go
@@ -17,7 +17,6 @@
package rpc
import (
- "bufio"
"container/list"
"context"
crand "crypto/rand"
@@ -51,10 +50,14 @@ func NewID() ID {
// randomIDGenerator returns a function generates a random IDs.
func randomIDGenerator() func() ID {
- seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader))
- if err != nil {
+ var buf = make([]byte, 8)
+ var seed int64
+ if _, err := crand.Read(buf); err == nil {
+ seed = int64(binary.BigEndian.Uint64(buf))
+ } else {
seed = int64(time.Now().Nanosecond())
}
+
var (
mu sync.Mutex
rng = rand.New(rand.NewSource(seed))
@@ -141,7 +144,7 @@ func (n *Notifier) Notify(id ID, data interface{}) error {
// Closed returns a channel that is closed when the RPC connection is closed.
// Deprecated: use subscription error channel
func (n *Notifier) Closed() <-chan interface{} {
- return n.h.conn.Closed()
+ return n.h.conn.closed()
}
// takeSubscription returns the subscription (if one has been created). No subscription can
@@ -153,7 +156,7 @@ func (n *Notifier) takeSubscription() *Subscription {
return n.sub
}
-// acticate is called after the subscription ID was sent to client. Notifications are
+// activate is called after the subscription ID was sent to client. Notifications are
// buffered before activation. This prevents notifications being sent to the client before
// the subscription ID is sent to the client.
func (n *Notifier) activate() error {
@@ -172,14 +175,14 @@ func (n *Notifier) activate() error {
func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
ctx := context.Background()
- return n.h.conn.Write(ctx, &jsonrpcMessage{
+ return n.h.conn.writeJSON(ctx, &jsonrpcMessage{
Version: vsn,
Method: n.namespace + notificationMethodSuffix,
Params: params,
})
}
-// A Subscription is created by a notifier and tight to that notifier. The client can use
+// A Subscription is created by a notifier and tied to that notifier. The client can use
// this subscription to wait for an unsubscribe request for the client, see Err().
type Subscription struct {
ID ID
@@ -241,11 +244,11 @@ func (sub *ClientSubscription) Err() <-chan error {
// Unsubscribe unsubscribes the notification and closes the error channel.
// It can safely be called more than once.
func (sub *ClientSubscription) Unsubscribe() {
- sub.quitWithError(nil, true)
+ sub.quitWithError(true, nil)
sub.errOnce.Do(func() { close(sub.err) })
}
-func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) {
+func (sub *ClientSubscription) quitWithError(unsubscribeServer bool, err error) {
sub.quitOnce.Do(func() {
// The dispatch loop won't be able to execute the unsubscribe call
// if it is blocked on deliver. Close sub.quit first because it
@@ -276,7 +279,7 @@ func (sub *ClientSubscription) start() {
sub.quitWithError(sub.forward())
}
-func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
+func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
cases := []reflect.SelectCase{
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
@@ -298,14 +301,14 @@ func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
switch chosen {
case 0: // <-sub.quit
- return nil, false
+ return false, nil
case 1: // <-sub.in
val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
if err != nil {
- return err, true
+ return true, err
}
if buffer.Len() == maxClientSubscriptionBuffer {
- return ErrSubscriptionQueueOverflow, true
+ return true, ErrSubscriptionQueueOverflow
}
buffer.PushBack(val)
case 2: // sub.channel<-