Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
AnkushinDaniil committed Jan 22, 2025
1 parent 1d51032 commit 8d13938
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 25 deletions.
101 changes: 101 additions & 0 deletions p2p/p2p_pkg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package p2p

import (
"context"
"testing"
"time"

"github.com/NethermindEth/juno/db/pebble"
"github.com/NethermindEth/juno/utils"
"github.com/stretchr/testify/require"
)

func TestDiscoverPeers(t *testing.T) {
bootstrapPeer, err := New(
"",
"",
"bootstrapPeer",
"",
"",
true,
nil,
&utils.Sepolia,
utils.NewNopZapLogger(),
pebble.NewMemTest(t),
)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bootstrapPeerInfo := bootstrapPeer.host.Peerstore().PeerInfo(bootstrapPeer.host.ID())
bootstrapPeerAddress := bootstrapPeerInfo.Addrs[0].String() + "/p2p/" + bootstrapPeerInfo.ID.String()

peerA, err := New(
"",
"",
"peerA",
bootstrapPeerAddress,
"",
true,
nil,
&utils.Sepolia,
utils.NewNopZapLogger(),
pebble.NewMemTest(t),
)
require.NoError(t, err)

peerB, err := New(
"",
"",
"peerB",
bootstrapPeerAddress,
"",
true,
nil,
&utils.Sepolia,
utils.NewNopZapLogger(),
pebble.NewMemTest(t),
)
require.NoError(t, err)

errorChan := make(chan error, 2)

go func() {
if runErr := peerA.Run(ctx); runErr != nil {
errorChan <- runErr
}
}()

go func() {
if runErr := peerB.Run(ctx); runErr != nil {
errorChan <- runErr
}
}()

// Allow some time for the peers to start and discover each other
time.Sleep(2 * time.Second)

bootstrapPeer.host.Close()

time.Sleep(2 * time.Second)

// Verify peer discovery
peerAPeers := peerA.host.Peerstore().Peers()
require.Contains(t, peerAPeers, peerB.host.ID())
peerBPeers := peerB.host.Peerstore().Peers()
require.Contains(t, peerBPeers, peerA.host.ID())

// Cancel the context to stop the `Run` method
cancel()

// Wait for goroutines to clean up and check for errors
time.Sleep(time.Second / 2)

select {
case err := <-errorChan:
require.NoError(t, err, "Error in peer Run method")
default:
// No errors reported
}
}
4 changes: 1 addition & 3 deletions rpc/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,7 @@ func (h *Handler) Events(args EventsArg) (*EventsChunk, *jsonrpc.Error) {
// unsubscribe assumes h.mu is unlocked. It releases all subscription resources.
func (h *Handler) unsubscribe(sub *subscription, id uint64) {
sub.cancel()
h.mu.Lock()
delete(h.subscriptions, id)
h.mu.Unlock()
h.subscriptions.remove(id)
}

func setEventFilterRange(filter blockchain.EventFilterer, fromID, toID *BlockID, latestHeight uint64) error {
Expand Down
102 changes: 95 additions & 7 deletions rpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"iter"
"log"
"maps"
"math"
Expand Down Expand Up @@ -101,7 +102,7 @@ type Handler struct {

idgen func() uint64
mu stdsync.Mutex // protects subscriptions.

Check failure on line 104 in rpc/handlers.go

View workflow job for this annotation

GitHub Actions / lint

field `mu` is unused (unused)
subscriptions map[uint64]*subscription
subscriptions subscriptions

blockTraceCache *lru.Cache[traceCacheKey, []TracedBlockTransaction]

Expand All @@ -112,6 +113,97 @@ type Handler struct {
coreContractABI abi.ABI
}

func newSubscriptions() subscriptions {
return subscriptions{
id2sub: make(map[uint64]*subscription),
event: make(chan event),
valuesChan: make(chan iter.Seq[*subscription]),
}
}

type subscriptions struct {
id2sub map[uint64]*subscription
event chan event
valueChan chan value
valuesChan chan iter.Seq[*subscription]
}

type value struct {
sub *subscription
ok bool
}

type event struct {
id uint64
sub *subscription
action action
}

type action uint8

const (
add action = iota
remove
getValue
getValues
)

func (s subscriptions) add(id uint64, sub *subscription) {
fmt.Println("add")
s.event <- event{id: id, sub: sub, action: add}
fmt.Println("added")
}

func (s subscriptions) remove(id uint64) {
fmt.Println("remove")
s.event <- event{id: id, sub: nil, action: remove}
fmt.Println("removed")
}

func (s subscriptions) getValue(id uint64) (*subscription, bool) {
fmt.Println("getValue")
s.event <- event{id: id, action: getValue}
v := <-s.valueChan
fmt.Println("gotValue")
return v.sub, v.ok
}

func (s subscriptions) getValues() iter.Seq[*subscription] {
fmt.Println("getValues")
s.event <- event{action: getValues}
fmt.Println("gotValues")
return <-s.valuesChan
}

func (s subscriptions) run(ctx context.Context) {

Check failure on line 178 in rpc/handlers.go

View workflow job for this annotation

GitHub Actions / lint

func `subscriptions.run` is unused (unused)
fmt.Println("run")
defer close(s.valuesChan)
for {
fmt.Println("select")
select {
case e := <-s.event:
switch e.action {
case add:
fmt.Println("add in run")
s.id2sub[e.id] = e.sub
case remove:
fmt.Println("remove in run")
delete(s.id2sub, e.id)
case getValue:
fmt.Println("getValue in run")
sub, ok := s.id2sub[e.id]
s.valueChan <- value{sub: sub, ok: ok}
case getValues:
fmt.Println("getValues in run")
s.valuesChan <- maps.Values(s.id2sub)
}
case <-ctx.Done():
fmt.Println("done")
return
}
}
}

type subscription struct {
cancel func()
wg conc.WaitGroup
Expand Down Expand Up @@ -141,7 +233,7 @@ func New(bcReader blockchain.Reader, syncReader sync.Reader, virtualMachine vm.V
reorgs: feed.New[*sync.ReorgBlockRange](),
pendingTxs: feed.New[[]core.Transaction](),
l1Heads: feed.New[*core.L1Head](),
subscriptions: make(map[uint64]*subscription),
subscriptions: newSubscriptions(),

blockTraceCache: lru.NewCache[traceCacheKey, []TracedBlockTransaction](traceCacheSize),
filterLimit: math.MaxUint,
Expand Down Expand Up @@ -196,11 +288,7 @@ func (h *Handler) Run(ctx context.Context) error {

<-ctx.Done()

h.mu.Lock()
subscriptions := maps.Values(h.subscriptions)
h.mu.Unlock()

for sub := range subscriptions {
for sub := range h.subscriptions.getValues() {
sub.wg.Wait()
}

Expand Down
20 changes: 5 additions & 15 deletions rpc/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
cancel: subscriptionCtxCancel,
conn: w,
}
h.mu.Lock()
h.subscriptions[id] = sub
h.mu.Unlock()
h.subscriptions.add(id, sub)

headerSub := h.newHeads.Subscribe()
reorgSub := h.reorgs.Subscribe() // as per the spec, reorgs are also sent in the events subscription
Expand Down Expand Up @@ -163,9 +161,7 @@ func (h *Handler) SubscribeTransactionStatus(ctx context.Context, txHash felt.Fe
cancel: subscriptionCtxCancel,
conn: w,
}
h.mu.Lock()
h.subscriptions[id] = sub
h.mu.Unlock()
h.subscriptions.add(id, sub)

l2HeadSub := h.newHeads.Subscribe()
l1HeadSub := h.l1Heads.Subscribe()
Expand Down Expand Up @@ -359,9 +355,7 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context, blockID *BlockID) (*Sub
cancel: subscriptionCtxCancel,
conn: w,
}
h.mu.Lock()
h.subscriptions[id] = sub
h.mu.Unlock()
h.subscriptions.add(id, sub)

headerSub := h.newHeads.Subscribe()
reorgSub := h.reorgs.Subscribe() // as per the spec, reorgs are also sent in the new heads subscription
Expand Down Expand Up @@ -414,9 +408,7 @@ func (h *Handler) SubscribePendingTxs(ctx context.Context, getDetails *bool, sen
cancel: subscriptionCtxCancel,
conn: w,
}
h.mu.Lock()
h.subscriptions[id] = sub
h.mu.Unlock()
h.subscriptions.add(id, sub)

pendingTxsSub := h.pendingTxs.Subscribe()
sub.wg.Go(func() {
Expand Down Expand Up @@ -659,9 +651,7 @@ func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Er
if !ok {
return false, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
}
h.mu.Lock()
sub, ok := h.subscriptions[id]
h.mu.Unlock() // Don't defer since h.unsubscribe acquires the lock.
sub, ok := h.subscriptions.getValue(id)
if !ok || !sub.conn.Equal(w) {
return false, ErrInvalidSubscriptionID
}
Expand Down

0 comments on commit 8d13938

Please sign in to comment.