@@ -62,33 +62,42 @@ async def dragonsmouth_like_session(fumarole_config):
62
62
session = await client.dragonsmouth_subscribe(
63
63
consumer_group_name="test",
64
64
request=SubscribeRequest(
65
- # accounts={"fumarole": SubscribeRequestFilterAccounts()},
65
+ accounts={"fumarole" : SubscribeRequestFilterAccounts()},
66
66
transactions={"fumarole" : SubscribeRequestFilterTransactions()},
67
67
blocks_meta={"fumarole" : SubscribeRequestFilterBlocksMeta()},
68
68
entry={"fumarole" : SubscribeRequestFilterEntry()},
69
69
slots={"fumarole" : SubscribeRequestFilterSlots()},
70
70
),
71
71
)
72
- dragonsmouth_source = session.source
73
- handle = session.fumarole_handle
74
- block_map = defaultdict(BlockConstruction)
75
- while True :
76
- tasks = [asyncio.create_task(dragonsmouth_source.get()), handle]
77
- done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
78
- for t in done :
79
- if tasks[0] == t :
80
- result : SubscribeUpdate = t.result()
81
- if result.HasField("block_meta") :
82
- block_meta : SubscribeUpdateBlockMeta = result.block_meta
83
- elif result.HasField("transaction") :
84
- tx : SubscribeUpdateTransaction = result.transaction
85
- elif result.HasField("account") :
86
- account : SubscribeUpdateAccount = result.account
87
- elif result.HasField("entry") :
88
- entry : SubscribeUpdateEntry = result.entry
89
- elif result.HasField("slot") :
90
- result : SubscribeUpdateSlot = result.slot
91
- else :
92
- result = t.result()
93
- raise RuntimeError("failed to get dragonsmouth source : %s" % result)
94
- ` ` `
72
+ async with session :
73
+ dragonsmouth_like_source = session.source
74
+ # result: SubscribeUpdate
75
+ async for result in dragonsmouth_like_source :
76
+ if result.HasField("block_meta") :
77
+ block_meta : SubscribeUpdateBlockMeta = result.block_meta
78
+ elif result.HasField("transaction") :
79
+ tx : SubscribeUpdateTransaction = result.transaction
80
+ elif result.HasField("account") :
81
+ account : SubscribeUpdateAccount = result.account
82
+ elif result.HasField("entry") :
83
+ entry : SubscribeUpdateEntry = result.entry
84
+ elif result.HasField("slot") :
85
+ result : SubscribeUpdateSlot = result.slot
86
+
87
+ # OUTSIDE THE SCOPE, YOU SHOULD NEVER USE `session` again.
88
+ ```
89
+
90
+
91
+ At any point you can get a rough estimate if you are progression through the slot using ` DragonsmouthAdapterSession.stats() ` call:
92
+
93
+ ``` python
94
+
95
+ async with session:
96
+ stats: FumaroleSubscribeStats = session.stats()
97
+ print (f " { stats.log_committed_offset} , { stats.log_committable_offset} , { stats.max_slot_seen} " )
98
+ ```
99
+
100
+ ` log_committed_offset ` : what have been ACK so for to fumarole remote service.
101
+ ` log_committable_offset ` : what can be ACK to next commit call.
102
+ ` max_slot_seen ` : maximum slot seen in the inner fumarole client state -- not yet processed by your code.
103
+
0 commit comments