@@ -10,6 +10,7 @@ import (
10
10
"github.com/NethermindEth/juno/core/felt"
11
11
"github.com/NethermindEth/juno/db"
12
12
"github.com/NethermindEth/juno/encoder"
13
+ "github.com/NethermindEth/juno/utils"
13
14
)
14
15
15
16
var ErrTxnPoolFull = errors .New ("transaction pool is full" )
@@ -34,6 +35,7 @@ type txnList struct {
34
35
35
36
// Pool stores the transactions in a linked list for its inherent FCFS behaviour
36
37
type Pool struct {
38
+ log utils.SimpleLogger
37
39
state core.StateReader
38
40
db db.DB // persistent mempool
39
41
txPushed chan struct {}
@@ -43,20 +45,21 @@ type Pool struct {
43
45
wg sync.WaitGroup
44
46
}
45
47
46
- // New initializes the Pool and starts the database writer goroutine.
48
+ // New initialises the Pool and starts the database writer goroutine.
47
49
// It is the responsibility of the user to call the cancel function if the context is cancelled
48
- func New (db db.DB , state core.StateReader , maxNumTxns uint16 ) (* Pool , func () error , error ) {
50
+ func New (persistentPool db.DB , state core.StateReader , maxNumTxns uint16 , log utils. SimpleLogger ) (* Pool , func () error , error ) {
49
51
pool := & Pool {
52
+ log : log ,
50
53
state : state ,
51
- db : db , // todo: txns should be deleted everytime a new block is stored (builder responsibility)
54
+ db : persistentPool , // todo: txns should be deleted everytime a new block is stored (builder responsibility)
52
55
txPushed : make (chan struct {}, 1 ),
53
56
txnList : & txnList {},
54
57
maxNumTxns : maxNumTxns ,
55
58
dbWriteChan : make (chan * BroadcastedTransaction , maxNumTxns ),
56
59
}
57
60
58
61
if err := pool .loadFromDB (); err != nil {
59
- return nil , nil , fmt .Errorf ("failed to load transactions from database into the in-memory transaction list: %v\n " , err )
62
+ return nil , nil , fmt .Errorf ("failed to load transactions from database into the in-memory transaction list: %v" , err )
60
63
}
61
64
62
65
pool .wg .Add (1 )
@@ -74,25 +77,20 @@ func New(db db.DB, state core.StateReader, maxNumTxns uint16) (*Pool, func() err
74
77
75
78
func (p * Pool ) dbWriter () {
76
79
defer p .wg .Done ()
77
- for {
78
- select {
79
- case txn , ok := <- p .dbWriteChan :
80
- if ! ok {
81
- return
82
- }
83
- p .handleTransaction (txn )
84
- }
80
+ for txn := range p .dbWriteChan {
81
+ err := p .handleTransaction (txn )
82
+ p .log .Errorw ("error in handling user transaction in persistent mempool" , "err" , err )
85
83
}
86
84
}
87
85
88
86
// loadFromDB restores the in-memory transaction pool from the database
89
87
func (p * Pool ) loadFromDB () error {
90
88
return p .db .View (func (txn db.Transaction ) error {
91
- len , err := p .LenDB ()
89
+ lenDB , err := p .LenDB ()
92
90
if err != nil {
93
91
return err
94
92
}
95
- if len >= p .maxNumTxns {
93
+ if lenDB >= p .maxNumTxns {
96
94
return ErrTxnPoolFull
97
95
}
98
96
headValue := new (felt.Felt )
@@ -152,13 +150,11 @@ func (p *Pool) handleTransaction(userTxn *BroadcastedTransaction) error {
152
150
}
153
151
tailValue = nil
154
152
}
155
-
156
153
if err := p .putdbElem (dbTxn , userTxn .Transaction .Hash (), & storageElem {
157
154
Txn : * userTxn ,
158
155
}); err != nil {
159
156
return err
160
157
}
161
-
162
158
if tailValue != nil {
163
159
// Update old tail to point to the new item
164
160
var oldTailElem storageElem
@@ -176,16 +172,14 @@ func (p *Pool) handleTransaction(userTxn *BroadcastedTransaction) error {
176
172
return err
177
173
}
178
174
}
179
-
180
175
if err := p .updateTail (dbTxn , userTxn .Transaction .Hash ()); err != nil {
181
176
return err
182
177
}
183
-
184
178
pLen , err := p .lenDB (dbTxn )
185
179
if err != nil {
186
180
return err
187
181
}
188
- return p .updateLen (dbTxn , uint16 ( pLen + 1 ) )
182
+ return p .updateLen (dbTxn , pLen + 1 )
189
183
})
190
184
}
191
185
@@ -196,18 +190,17 @@ func (p *Pool) Push(userTxn *BroadcastedTransaction) error {
196
190
return err
197
191
}
198
192
199
- // todo: should db overloading block the in-memory mempool??
200
193
select {
201
194
case p .dbWriteChan <- userTxn :
202
195
default :
203
196
select {
204
197
case _ , ok := <- p .dbWriteChan :
205
198
if ! ok {
206
- return errors . New ( "transaction pool database write channel is closed" )
199
+ p . log . Errorw ( "cannot store user transasction in persistent pool, database write channel is closed" )
207
200
}
208
- return ErrTxnPoolFull
201
+ p . log . Errorw ( "cannot store user transasction in persistent pool, database is full" )
209
202
default :
210
- return ErrTxnPoolFull
203
+ p . log . Errorw ( "cannot store user transasction in persistent pool, database is full" )
211
204
}
212
205
}
213
206
@@ -232,7 +225,7 @@ func (p *Pool) Push(userTxn *BroadcastedTransaction) error {
232
225
}
233
226
234
227
func (p * Pool ) validate (userTxn * BroadcastedTransaction ) error {
235
- if p .txnList .len + 1 >= uint16 ( p .maxNumTxns ) {
228
+ if p .txnList .len + 1 >= p .maxNumTxns {
236
229
return ErrTxnPoolFull
237
230
}
238
231
@@ -246,7 +239,7 @@ func (p *Pool) validate(userTxn *BroadcastedTransaction) error {
246
239
case * core.DeclareTransaction :
247
240
nonce , err := p .state .ContractNonce (t .SenderAddress )
248
241
if err != nil {
249
- return fmt .Errorf ("validation failed, error when retrieving nonce, %v: " , err )
242
+ return fmt .Errorf ("validation failed, error when retrieving nonce, %v" , err )
250
243
}
251
244
if nonce .Cmp (t .Nonce ) > 0 {
252
245
return fmt .Errorf ("validation failed, existing nonce %s, but received nonce %s" , nonce , t .Nonce )
@@ -257,7 +250,7 @@ func (p *Pool) validate(userTxn *BroadcastedTransaction) error {
257
250
}
258
251
nonce , err := p .state .ContractNonce (t .SenderAddress )
259
252
if err != nil {
260
- return fmt .Errorf ("validation failed, error when retrieving nonce, %v: " , err )
253
+ return fmt .Errorf ("validation failed, error when retrieving nonce, %v" , err )
261
254
}
262
255
if nonce .Cmp (t .Nonce ) > 0 {
263
256
return fmt .Errorf ("validation failed, existing nonce %s, but received nonce %s" , nonce , t .Nonce )
@@ -302,12 +295,17 @@ func (p *Pool) Len() uint16 {
302
295
303
296
// Len returns the number of transactions in the persistent pool
304
297
func (p * Pool ) LenDB () (uint16 , error ) {
298
+ p .wg .Add (1 )
299
+ defer p .wg .Done ()
305
300
txn , err := p .db .NewTransaction (false )
306
301
if err != nil {
307
302
return 0 , err
308
303
}
309
- defer txn .Discard ()
310
- return p .lenDB (txn )
304
+ lenDB , err := p .lenDB (txn )
305
+ if err != nil {
306
+ return 0 , err
307
+ }
308
+ return lenDB , txn .Discard ()
311
309
}
312
310
313
311
func (p * Pool ) lenDB (txn db.Transaction ) (uint16 , error ) {
@@ -338,19 +336,6 @@ func (p *Pool) headHash(txn db.Transaction, head *felt.Felt) error {
338
336
})
339
337
}
340
338
341
- func (p * Pool ) HeadHash () (* felt.Felt , error ) {
342
- txn , err := p .db .NewTransaction (false )
343
- if err != nil {
344
- return nil , err
345
- }
346
- var head * felt.Felt
347
- err = txn .Get (Head .Key (), func (b []byte ) error {
348
- head = new (felt.Felt ).SetBytes (b )
349
- return nil
350
- })
351
- return head , err
352
- }
353
-
354
339
func (p * Pool ) updateHead (txn db.Transaction , head * felt.Felt ) error {
355
340
return txn .Set (Head .Key (), head .Marshal ())
356
341
}
@@ -366,6 +351,8 @@ func (p *Pool) updateTail(txn db.Transaction, tail *felt.Felt) error {
366
351
return txn .Set (Tail .Key (), tail .Marshal ())
367
352
}
368
353
354
+ // todo : error when unmarshalling the core.Transasction...
355
+ // but unmarshalling core.Transaction works fine in TransactionsByBlockNumber...
369
356
func (p * Pool ) dbElem (txn db.Transaction , itemKey * felt.Felt ) (storageElem , error ) {
370
357
var item storageElem
371
358
keyBytes := itemKey .Bytes ()
0 commit comments