From 78745551c077bf54151202138c2629f288769561 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 15 Sep 2020 23:55:34 -0400 Subject: WIP: geth-tavum --- rpc/subscription.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) (limited to 'rpc/subscription.go') diff --git a/rpc/subscription.go b/rpc/subscription.go index c1e869b..233215d 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -17,7 +17,6 @@ package rpc import ( - "bufio" "container/list" "context" crand "crypto/rand" @@ -51,10 +50,14 @@ func NewID() ID { // randomIDGenerator returns a function generates a random IDs. func randomIDGenerator() func() ID { - seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader)) - if err != nil { + var buf = make([]byte, 8) + var seed int64 + if _, err := crand.Read(buf); err == nil { + seed = int64(binary.BigEndian.Uint64(buf)) + } else { seed = int64(time.Now().Nanosecond()) } + var ( mu sync.Mutex rng = rand.New(rand.NewSource(seed)) @@ -141,7 +144,7 @@ func (n *Notifier) Notify(id ID, data interface{}) error { // Closed returns a channel that is closed when the RPC connection is closed. // Deprecated: use subscription error channel func (n *Notifier) Closed() <-chan interface{} { - return n.h.conn.Closed() + return n.h.conn.closed() } // takeSubscription returns the subscription (if one has been created). No subscription can @@ -153,7 +156,7 @@ func (n *Notifier) takeSubscription() *Subscription { return n.sub } -// acticate is called after the subscription ID was sent to client. Notifications are +// activate is called after the subscription ID was sent to client. Notifications are // buffered before activation. This prevents notifications being sent to the client before // the subscription ID is sent to the client. func (n *Notifier) activate() error { @@ -172,14 +175,14 @@ func (n *Notifier) activate() error { func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data}) ctx := context.Background() - return n.h.conn.Write(ctx, &jsonrpcMessage{ + return n.h.conn.writeJSON(ctx, &jsonrpcMessage{ Version: vsn, Method: n.namespace + notificationMethodSuffix, Params: params, }) } -// A Subscription is created by a notifier and tight to that notifier. The client can use +// A Subscription is created by a notifier and tied to that notifier. The client can use // this subscription to wait for an unsubscribe request for the client, see Err(). type Subscription struct { ID ID @@ -241,11 +244,11 @@ func (sub *ClientSubscription) Err() <-chan error { // Unsubscribe unsubscribes the notification and closes the error channel. // It can safely be called more than once. func (sub *ClientSubscription) Unsubscribe() { - sub.quitWithError(nil, true) + sub.quitWithError(true, nil) sub.errOnce.Do(func() { close(sub.err) }) } -func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) { +func (sub *ClientSubscription) quitWithError(unsubscribeServer bool, err error) { sub.quitOnce.Do(func() { // The dispatch loop won't be able to execute the unsubscribe call // if it is blocked on deliver. Close sub.quit first because it @@ -276,7 +279,7 @@ func (sub *ClientSubscription) start() { sub.quitWithError(sub.forward()) } -func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) { +func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) { cases := []reflect.SelectCase{ {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)}, {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)}, @@ -298,14 +301,14 @@ func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) { switch chosen { case 0: // <-sub.quit - return nil, false + return false, nil case 1: // <-sub.in val, err := sub.unmarshal(recv.Interface().(json.RawMessage)) if err != nil { - return err, true + return true, err } if buffer.Len() == maxClientSubscriptionBuffer { - return ErrSubscriptionQueueOverflow, true + return true, ErrSubscriptionQueueOverflow } buffer.PushBack(val) case 2: // sub.channel<- -- cgit v1.2.3