Skip to content

Commit

Permalink
Added subscription hook and injection
Browse files Browse the repository at this point in the history
  • Loading branch information
philip-morlier committed Oct 17, 2024
1 parent f784482 commit d8fc46b
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 0 deletions.
4 changes: 4 additions & 0 deletions rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func (r *serviceRegistry) registerName(name string, rcvr interface{}) error {
return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}

// begin PluGeth code injection
pluginExtendedCallbacks(callbacks, rcvrVal)
// end PluGeth code injection

r.mu.Lock()
defer r.mu.Unlock()

Expand Down
147 changes: 147 additions & 0 deletions rpc/xplugeth_subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package rpc

import (
"context"
"reflect"
"github.com/ethereum/go-ethereum/log"
)

// Is t context.Context or *context.Context?
func isContextType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t == contextType
}

func isChanType(t reflect.Type) bool {
// Pointers to channels are weird, but whatever
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// Make sure we have a channel
if t.Kind() != reflect.Chan {
return false
}
// Make sure it is a receivable channel
return (t.ChanDir() & reflect.RecvDir) == reflect.RecvDir
}

func isChanPubsub(methodType reflect.Type) bool {
if methodType.NumIn() < 2 || methodType.NumOut() != 2 {
return false
}
return isContextType(methodType.In(1)) &&
isChanType(methodType.Out(0)) &&
isErrorType(methodType.Out(1))
}

func callbackifyChanPubSub(receiver, fn reflect.Value) *callback {
c := &callback{rcvr: receiver, errPos: 1, isSubscribe: true}
fntype := fn.Type()
// Skip receiver and context.Context parameter (if present).
firstArg := 0
if c.rcvr.IsValid() {
firstArg++
}
if fntype.NumIn() > firstArg && fntype.In(firstArg) == contextType {
c.hasCtx = true
firstArg++
}
// Add all remaining parameters.
c.argTypes = make([]reflect.Type, fntype.NumIn()-firstArg)
for i := firstArg; i < fntype.NumIn(); i++ {
c.argTypes[i-firstArg] = fntype.In(i)
}

retFnType := reflect.FuncOf(append([]reflect.Type{receiver.Type(), contextType}, c.argTypes...), []reflect.Type{reflect.PointerTo(subscriptionType), errorType}, false)

// // What follows uses reflection to construct a dynamically typed function equivalent to:
// func(receiver <T>, cctx context.Context, args ...<T>) (*rpc.Subscription, error) {
// notifier, supported := NotifierFromContext(cctx)
// if !supported { return Subscription{}, ErrNotificationsUnsupported}
// ctx, cancel := context.WithCancel(context.Background())
// ch, err := fn()
// if err != nil { return Subscription{}, err }
// rpcSub := notifier.CreateSubscription()
// go func() {
// select {
// case v, ok := <- ch:
// if !ok { return }
// notifier.Notify(rpcSub.ID, v)
// case <-rpcSub.Err():
// cancel()
// return
// case <-notifier.Closed():
// cancel()
// return
// }
// }()
// return rpcSub, nil
// }
//

c.fn = reflect.MakeFunc(retFnType, func(args []reflect.Value) ([]reflect.Value) {
notifier, supported := NotifierFromContext(args[1].Interface().(context.Context))
if !supported {
return []reflect.Value{reflect.Zero(subscriptionType), reflect.ValueOf(ErrNotificationsUnsupported)}
}
ctx, cancel := context.WithCancel(context.Background())

args[1] = reflect.ValueOf(ctx)
out := fn.Call(args)
if !out[1].IsNil() {
// This amounts to: if err != nil { return nil, err }
cancel()
return []reflect.Value{reflect.Zero(subscriptionType), out[1]}
}
// Geth's provided context is done once we've returned the subscription id.
// This new context will cancel when the notifier closes.

rpcSub := notifier.CreateSubscription()
go func() {
defer cancel()
defer log.Info("Plugin subscription goroutine closed")
selectCases := []reflect.SelectCase{
{Dir: reflect.SelectRecv, Chan: out[0]},
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(rpcSub.Err())},
// {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(notifier.Closed())},
}
for {
chosen, val, recvOK := reflect.Select(selectCases)
switch chosen {
case 0: // val, ok := <-ch
if !recvOK {
return
}
if err := notifier.Notify(rpcSub.ID, val.Interface()); err != nil {
log.Warn("Subscription notification failed", "id", rpcSub.ID, "err", err)
}
case 1:
return
// case 2:
// cancel()
// return
}
}
}()
return []reflect.Value{reflect.ValueOf(rpcSub), reflect.Zero(errorType)}
})
return c
}


func pluginExtendedCallbacks(callbacks map[string]*callback, receiver reflect.Value) {
typ := receiver.Type()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
if method.PkgPath != "" {
continue // method not exported
}
if isChanPubsub(method.Type) {
cb := callbackifyChanPubSub(receiver, method.Func)
name := formatName(method.Name)
callbacks[name] = cb
}
}
}

0 comments on commit d8fc46b

Please sign in to comment.