Skip to content

Commit

Permalink
Add support for Reorg notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagodeev committed Jan 23, 2025
1 parent 8f99018 commit 087ed79
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 14 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ func (c *Client) Subscribe(ctx context.Context, namespace string, methodSuffix s
op := &requestOp{
ids: []json.RawMessage{msg.ID},
resp: make(chan []*jsonrpcMessage, 1),
sub: newClientSubscription(c, namespace, chanVal),
sub: newClientSubscription(c, namespace, chanVal, methodSuffix),
}

// Send the subscription request.
Expand Down
11 changes: 11 additions & 0 deletions client/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"strings"
"sync"
"time"

"github.com/NethermindEth/juno/core/felt"
)

const (
Expand Down Expand Up @@ -56,6 +58,15 @@ type subscriptionResultEnc struct {
Result any `json:"result"`
}

// Struct representing a reorganization of the chain
// Can be received from subscribing to newHeads, Events, TransactionStatus
type ReorgEvent struct {
StartBlockHash *felt.Felt `json:"starting_block_hash"`
StartBlockNum uint64 `json:"starting_block_number"`
EndBlockHash *felt.Felt `json:"ending_block_hash"`
EndBlockNum uint64 `json:"ending_block_number"`
}

type jsonrpcSubscriptionNotification struct {
Version string `json:"jsonrpc"`
Method string `json:"method"`
Expand Down
77 changes: 64 additions & 13 deletions client/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package client

import (
"bytes"
"container/list"
"context"
crand "crypto/rand"
Expand Down Expand Up @@ -203,11 +204,13 @@ func (s *Subscription) MarshalJSON() ([]byte, error) {

// ClientSubscription is a subscription established through the Client's Subscribe
type ClientSubscription struct {
client *Client
etype reflect.Type
channel reflect.Value
namespace string
subid string
client *Client
etype reflect.Type
channel reflect.Value
reorgEtype reflect.Type
reorgChannel chan *ReorgEvent
namespace string
subid string

// The in channel receives notification values from client dispatcher.
in chan json.RawMessage
Expand All @@ -228,7 +231,7 @@ type ClientSubscription struct {
// This is the sentinel value sent on sub.quit when Unsubscribe is called.
var errUnsubscribed = errors.New("unsubscribed")

func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
func newClientSubscription(c *Client, namespace string, channel reflect.Value, method string) *ClientSubscription {
sub := &ClientSubscription{
client: c,
namespace: namespace,
Expand All @@ -240,6 +243,13 @@ func newClientSubscription(c *Client, namespace string, channel reflect.Value) *
unsubDone: make(chan struct{}),
err: make(chan error, 1),
}

// A reorg event can be received from subscribing to newHeads, Events, TransactionStatus
if strings.HasSuffix(method, "NewHeads") || strings.HasSuffix(method, "Events") || strings.HasSuffix(method, "TransactionStatus") {
sub.reorgChannel = make(chan *ReorgEvent)
sub.reorgEtype = reflect.TypeOf(&ReorgEvent{})
}

return sub
}

Expand All @@ -255,6 +265,12 @@ func (sub *ClientSubscription) Err() <-chan error {
return sub.err
}

// Reorg returns a channel that notifies the subscriber of a reorganization of the chain.
// A reorg event could be received only from subscribing to NewHeads, Events, and TransactionStatus
func (sub *ClientSubscription) Reorg() <-chan *ReorgEvent {
return sub.reorgChannel
}

// Unsubscribe unsubscribes the notification and closes the error channel.
// It can safely be called more than once.
func (sub *ClientSubscription) Unsubscribe() {
Expand Down Expand Up @@ -295,6 +311,9 @@ func (sub *ClientSubscription) close(err error) {
// is launched by the client's handler after the subscription has been created.
func (sub *ClientSubscription) run() {
defer close(sub.unsubDone)
if sub.reorgChannel != nil {
defer close(sub.reorgChannel)
}

unsubscribe, err := sub.forward()

Expand Down Expand Up @@ -326,7 +345,15 @@ func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
{Dir: reflect.SelectSend, Chan: sub.channel},
}

// a separate case for reorg events as it'll come in the same subscription
var reorgCases []reflect.SelectCase
if sub.reorgChannel != nil {
reorgCases = append(cases[:2:2], reflect.SelectCase{Dir: reflect.SelectSend, Chan: reflect.ValueOf(sub.reorgChannel)})
}

buffer := list.New()
isReorg := false

for {
var chosen int
Expand All @@ -336,8 +363,13 @@ func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
chosen, recv, _ = reflect.Select(cases[:2])
} else {
// Non-empty buffer, send the first queued item.
cases[2].Send = reflect.ValueOf(buffer.Front().Value)
chosen, recv, _ = reflect.Select(cases)
if isReorg {
reorgCases[2].Send = reflect.ValueOf(buffer.Front().Value)
chosen, recv, _ = reflect.Select(reorgCases)
} else {
cases[2].Send = reflect.ValueOf(buffer.Front().Value)
chosen, recv, _ = reflect.Select(cases)
}
}

switch chosen {
Expand All @@ -352,7 +384,7 @@ func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
return false, err

case 1: // <-sub.in
val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
val, err := sub.unmarshal(recv.Interface().(json.RawMessage), &isReorg)
if err != nil {
return true, err
}
Expand All @@ -361,16 +393,35 @@ func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
}
buffer.PushBack(val)

case 2: // sub.channel<-
cases[2].Send = reflect.Value{} // Don't hold onto the value.
case 2: // sub.channel<- OR sub.reorgChannel<-
if isReorg {
reorgCases[2].Send = reflect.Value{} // Don't hold onto the value.
} else {
cases[2].Send = reflect.Value{} // Don't hold onto the value.
}
buffer.Remove(buffer.Front())
}
}
}

func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) {
func (sub *ClientSubscription) unmarshal(result json.RawMessage, isReorg *bool) (interface{}, error) {
val := reflect.New(sub.etype)
err := json.Unmarshal(result, val.Interface())
dec := json.NewDecoder(bytes.NewReader(result))
dec.DisallowUnknownFields()
err := dec.Decode(val.Interface())

// If there's an error when unmarshalling to the main channel type, maybe it's a reorg event
if err != nil && sub.reorgEtype != nil {
val = reflect.New(sub.reorgEtype)
err2 := json.Unmarshal(result, val.Interface())
if err2 != nil {
err = errors.Join(err, err2)
} else {
*isReorg = true
return val.Elem().Interface(), nil
}
}
*isReorg = false
return val.Elem().Interface(), err
}

Expand Down
9 changes: 9 additions & 0 deletions rpc/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
)

func TestSubscribeNewHeads(t *testing.T) {
t.Parallel()

if testEnv != "testnet" {
t.Skip("Skipping test as it requires a testnet environment")
}
Expand Down Expand Up @@ -115,6 +117,8 @@ func TestSubscribeNewHeads(t *testing.T) {
}

func TestSubscribeEvents(t *testing.T) {
t.Parallel()

if testEnv != "testnet" {
t.Skip("Skipping test as it requires a testnet environment")
}
Expand Down Expand Up @@ -359,6 +363,7 @@ func TestSubscribeEvents(t *testing.T) {
}

func TestSubscribeTransactionStatus(t *testing.T) {
t.Parallel()
if testEnv != "testnet" {
t.Skip("Skipping test as it requires a testnet environment")
}
Expand Down Expand Up @@ -409,6 +414,7 @@ func TestSubscribeTransactionStatus(t *testing.T) {
}

func TestSubscribePendingTransactions(t *testing.T) {
t.Parallel()
if testEnv != "testnet" {
t.Skip("Skipping test as it requires a testnet environment")
}
Expand Down Expand Up @@ -489,3 +495,6 @@ func TestSubscribePendingTransactions(t *testing.T) {
})
}
}

// TODO: Add mock for testing reorg events.
// A simple test was made to make sure the reorg events are received; it'll be added in the PR 651 comments

0 comments on commit 087ed79

Please sign in to comment.