Skip to content

Commit 4b38293

Browse files
committed
Send events from pending block instead of new Head
1 parent 9c0f8bb commit 4b38293

File tree

2 files changed

+65
-15
lines changed

2 files changed

+65
-15
lines changed

rpc/subscriptions.go

+44-11
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/NethermindEth/juno/core/felt"
1111
"github.com/NethermindEth/juno/feed"
1212
"github.com/NethermindEth/juno/jsonrpc"
13-
"github.com/NethermindEth/juno/sync"
13+
junoSync "github.com/NethermindEth/juno/sync"
1414
"github.com/sourcegraph/conc"
1515
)
1616

@@ -67,24 +67,46 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
6767
h.mu.Unlock()
6868

6969
headerSub := h.newHeads.Subscribe()
70-
reorgSub := h.reorgs.Subscribe() // as per the spec, reorgs are also sent in the events subscription
70+
reorgSub := h.reorgs.Subscribe() // as per the spec, reorgs are also sent in the events subscription#
71+
pendingSub := h.pendingBlock.Subscribe()
7172
sub.wg.Go(func() {
7273
defer func() {
7374
h.unsubscribe(sub, id)
7475
headerSub.Unsubscribe()
7576
reorgSub.Unsubscribe()
77+
pendingSub.Unsubscribe()
7678
}()
7779

78-
// The specification doesn't enforce ordering of events therefore events from new blocks can be sent before
80+
// The specification doesn't enforce ordering of events, therefore, events from new blocks can be sent before
7981
// old blocks.
8082
var wg conc.WaitGroup
8183
wg.Go(func() {
84+
// Stores the transaction hash of the event
85+
eventsPreviouslySent := make(map[felt.Felt]struct{})
86+
8287
for {
8388
select {
8489
case <-subscriptionCtx.Done():
8590
return
8691
case header := <-headerSub.Recv():
87-
h.processEvents(subscriptionCtx, w, id, header.Number, header.Number, fromAddr, keys)
92+
// During syncing the events from the new head still need to be sent as there is no pending block.
93+
// However, it is not easy to tell when the node is syncing.
94+
// To solve this issue, we can send the events regardless, and if the node is done syncing, then the
95+
// latest header events would have been sent when the pending block was updated. Hence,
96+
// trying to resend the event should be of no consequences and the map can be safely emptied.
97+
h.processEvents(subscriptionCtx, w, id, header.Number, header.Number, fromAddr, keys, eventsPreviouslySent)
98+
99+
b, err := h.bcReader.BlockByNumber(header.Number)
100+
if err != nil {
101+
h.log.Warnw("Error retrieving block", "block number", header.Number, "err", err)
102+
return
103+
}
104+
105+
for _, t := range b.Transactions {
106+
delete(eventsPreviouslySent, *t.Hash())
107+
}
108+
case pending := <-pendingSub.Recv():
109+
h.processEvents(subscriptionCtx, w, id, pending.Number, pending.Number, fromAddr, keys, eventsPreviouslySent)
88110
}
89111
}
90112
})
@@ -94,7 +116,7 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
94116
})
95117

96118
wg.Go(func() {
97-
h.processEvents(subscriptionCtx, w, id, requestedHeader.Number, headHeader.Number, fromAddr, keys)
119+
h.processEvents(subscriptionCtx, w, id, requestedHeader.Number, headHeader.Number, fromAddr, keys, nil)
98120
})
99121

100122
wg.Wait()
@@ -252,7 +274,9 @@ func (h *Handler) SubscribeTransactionStatus(ctx context.Context, txHash felt.Fe
252274
return &SubscriptionID{ID: id}, nil
253275
}
254276

255-
func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, to uint64, fromAddr *felt.Felt, keys [][]felt.Felt) {
277+
func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, to uint64, fromAddr *felt.Felt,
278+
keys [][]felt.Felt, eventsPreviouslySent map[felt.Felt]struct{},
279+
) {
256280
filter, err := h.bcReader.EventFilter(fromAddr, keys)
257281
if err != nil {
258282
h.log.Warnw("Error creating event filter", "err", err)
@@ -272,7 +296,7 @@ func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, t
272296
return
273297
}
274298

275-
err = sendEvents(ctx, w, filteredEvents, id)
299+
err = sendEvents(ctx, w, filteredEvents, eventsPreviouslySent, id)
276300
if err != nil {
277301
h.log.Warnw("Error sending events", "err", err)
278302
return
@@ -285,20 +309,29 @@ func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, t
285309
return
286310
}
287311

288-
err = sendEvents(ctx, w, filteredEvents, id)
312+
err = sendEvents(ctx, w, filteredEvents, eventsPreviouslySent, id)
289313
if err != nil {
290314
h.log.Warnw("Error sending events", "err", err)
291315
return
292316
}
293317
}
294318
}
295319

296-
func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.FilteredEvent, id uint64) error {
320+
func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.FilteredEvent,
321+
eventsPreviouslySent map[felt.Felt]struct{}, id uint64,
322+
) error {
297323
for _, event := range events {
298324
select {
299325
case <-ctx.Done():
300326
return ctx.Err()
301327
default:
328+
if eventsPreviouslySent != nil {
329+
if _, exists := eventsPreviouslySent[*event.TransactionHash]; exists {
330+
continue
331+
}
332+
eventsPreviouslySent[*event.TransactionHash] = struct{}{}
333+
}
334+
302335
emittedEvent := &EmittedEvent{
303336
BlockNumber: &event.BlockNumber, // This always be filled as subscribeEvents cannot be called on pending block
304337
BlockHash: event.BlockHash,
@@ -604,7 +637,7 @@ func (h *Handler) sendHeader(w jsonrpc.Conn, header *core.Header, id uint64) err
604637
return err
605638
}
606639

607-
func (h *Handler) processReorgs(ctx context.Context, reorgSub *feed.Subscription[*sync.ReorgBlockRange], w jsonrpc.Conn, id uint64) {
640+
func (h *Handler) processReorgs(ctx context.Context, reorgSub *feed.Subscription[*junoSync.ReorgBlockRange], w jsonrpc.Conn, id uint64) {
608641
for {
609642
select {
610643
case <-ctx.Done():
@@ -625,7 +658,7 @@ type ReorgEvent struct {
625658
EndBlockNum uint64 `json:"ending_block_number"`
626659
}
627660

628-
func (h *Handler) sendReorg(w jsonrpc.Conn, reorg *sync.ReorgBlockRange, id uint64) error {
661+
func (h *Handler) sendReorg(w jsonrpc.Conn, reorg *junoSync.ReorgBlockRange, id uint64) error {
629662
resp, err := json.Marshal(jsonrpc.Request{
630663
Version: "2.0",
631664
Method: "starknet_subscriptionReorg",

rpc/subscriptions_test.go

+21-4
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ func TestSubscribeEvents(t *testing.T) {
268268
cancel()
269269
})
270270

271-
t.Run("Events from new blocks", func(t *testing.T) {
271+
t.Run("Events from pending block without duplicates", func(t *testing.T) {
272272
mockCtrl := gomock.NewController(t)
273273
t.Cleanup(mockCtrl.Finish)
274274

@@ -277,8 +277,8 @@ func TestSubscribeEvents(t *testing.T) {
277277
mockEventFilterer := mocks.NewMockEventFilterer(mockCtrl)
278278

279279
handler := New(mockChain, mockSyncer, nil, "", log)
280-
headerFeed := feed.New[*core.Header]()
281-
handler.newHeads = headerFeed
280+
pendingFeed := feed.New[*core.Block]()
281+
handler.pendingBlock = pendingFeed
282282

283283
mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: b1.Number}, nil)
284284
mockChain.EXPECT().EventFilter(fromAddr, keys).Return(mockEventFilterer, nil)
@@ -306,12 +306,13 @@ func TestSubscribeEvents(t *testing.T) {
306306
require.NoError(t, err)
307307
assert.Equal(t, string(resp), string(got))
308308

309+
// Pending block events, due to the use of mocks events which were sent before are resent.
309310
mockChain.EXPECT().EventFilter(fromAddr, keys).Return(mockEventFilterer, nil)
310311

311312
mockEventFilterer.EXPECT().SetRangeEndBlockByNumber(gomock.Any(), gomock.Any()).Return(nil).MaxTimes(2)
312313
mockEventFilterer.EXPECT().Events(gomock.Any(), gomock.Any()).Return([]*blockchain.FilteredEvent{filteredEvents[1]}, nil, nil)
313314

314-
headerFeed.Send(&core.Header{Number: b1.Number + 1})
315+
pendingFeed.Send(&core.Block{Header: &core.Header{Number: b1.Number + 1}})
315316

316317
resp, err = marshalSubEventsResp(emittedEvents[1], id.ID)
317318
require.NoError(t, err)
@@ -321,6 +322,22 @@ func TestSubscribeEvents(t *testing.T) {
321322
require.NoError(t, err)
322323
assert.Equal(t, string(resp), string(got))
323324

325+
mockChain.EXPECT().EventFilter(fromAddr, keys).Return(mockEventFilterer, nil)
326+
327+
mockEventFilterer.EXPECT().SetRangeEndBlockByNumber(gomock.Any(), gomock.Any()).Return(nil).MaxTimes(2)
328+
mockEventFilterer.EXPECT().Events(gomock.Any(), gomock.Any()).Return([]*blockchain.
329+
FilteredEvent{filteredEvents[1], filteredEvents[0]}, nil, nil)
330+
331+
pendingFeed.Send(&core.Block{Header: &core.Header{Number: b1.Number + 1}})
332+
333+
resp, err = marshalSubEventsResp(emittedEvents[0], id.ID)
334+
require.NoError(t, err)
335+
336+
got = make([]byte, len(resp))
337+
_, err = clientConn.Read(got)
338+
require.NoError(t, err)
339+
assert.Equal(t, string(resp), string(got))
340+
324341
cancel()
325342
time.Sleep(100 * time.Millisecond)
326343
})

0 commit comments

Comments
 (0)