@@ -100,6 +100,21 @@ var eventFilterConfigMap = map[string]map[string]filterConfig{
100
100
},
101
101
}
102
102
103
+ // Map of event name to offchain attribute to its subkey index for querying the LogPoller
104
+ // Corresponds to the indexed fields in the eventFilterConfigMap above
105
+ var eventFilterSubkeyIndexMap = map [string ]map [string ]uint64 {
106
+ consts .EventNameCCIPMessageSent : {
107
+ consts .EventAttributeSourceChain : 0 ,
108
+ consts .EventAttributeDestChain : 1 ,
109
+ consts .EventAttributeSequenceNumber : 2 ,
110
+ },
111
+ consts .EventNameExecutionStateChanged : {
112
+ consts .EventAttributeSourceChain : 0 ,
113
+ consts .EventAttributeSequenceNumber : 1 ,
114
+ consts .EventAttributeState : 2 ,
115
+ },
116
+ }
117
+
103
118
// bindContractEvent binds contract events to the logpoller for monitoring blockchain events.
104
119
// This operation is idempotent - if the same address exists, it performs no operation;
105
120
// if the address is changed, it updates to the new address, overwriting the existing one;
@@ -457,7 +472,7 @@ func (a *SolanaAccessor) processPriceUpdates(priceUpdates ccip_offramp.PriceUpda
457
472
return updates , nil
458
473
}
459
474
460
- func createExecutedMessagesKeyFilter (rangesPerChain map [ccipocr3.ChainSelector ][]ccipocr3.SeqNumRange ) (query.KeyFilter , uint64 ) {
475
+ func createExecutedMessagesKeyFilter (rangesPerChain map [ccipocr3.ChainSelector ][]ccipocr3.SeqNumRange ) (query.KeyFilter , uint64 , error ) {
461
476
var chainExpressions []query.Expression
462
477
var countSqNrs uint64
463
478
// final query should look like
@@ -492,20 +507,30 @@ func createExecutedMessagesKeyFilter(rangesPerChain map[ccipocr3.ChainSelector][
492
507
}
493
508
extendedQuery := query .Or (chainExpressions ... )
494
509
510
+ attributeIndexes , ok := eventFilterSubkeyIndexMap [consts .EventNameExecutionStateChanged ]
511
+ if ! ok {
512
+ return query.KeyFilter {}, 0 , fmt .Errorf ("failed to find attribute indexes for event %s" , consts .EventNameExecutionStateChanged )
513
+ }
514
+ stateAttributeIndex , ok := attributeIndexes [consts .EventAttributeState ]
515
+ if ! ok {
516
+ return query.KeyFilter {}, 0 , fmt .Errorf ("failed to find index for attribute %s for event %s" , consts .EventAttributeState , consts .EventNameExecutionStateChanged )
517
+ }
518
+ // We don't need to wait for an execute state changed event to be finalized
519
+ // before we optimistically mark a message as executed.
520
+ subKeyFilter , err := logpoller .NewEventBySubKeyFilter (stateAttributeIndex , []primitives.ValueComparator {{Value : 0 , Operator : primitives .Gt }},)
521
+ if err != nil {
522
+ return query.KeyFilter {}, 0 , fmt .Errorf ("failed to build event sub key filter for state attribute: %w" , err )
523
+ }
524
+
495
525
keyFilter := query.KeyFilter {
496
526
Key : consts .EventNameExecutionStateChanged ,
497
527
Expressions : []query.Expression {
498
528
extendedQuery ,
499
- // We don't need to wait for an execute state changed event to be finalized
500
- // before we optimistically mark a message as executed.
501
- query .Comparator (consts .EventAttributeState , primitives.ValueComparator {
502
- Value : 0 , // > 0 corresponds to: IN_PROGRESS, SUCCESS, FAILURE
503
- Operator : primitives .Gt ,
504
- }),
529
+ subKeyFilter ,
505
530
query .Confidence (primitives .Finalized ),
506
531
},
507
532
}
508
- return keyFilter , countSqNrs
533
+ return keyFilter , countSqNrs , nil
509
534
}
510
535
511
536
func (a * SolanaAccessor ) processExecutionStateChangesEvents (logs []logpollertypes.Log , nonEmptyRangesPerChain map [ccipocr3.ChainSelector ][]ccipocr3.SeqNumRange ) (map [ccipocr3.ChainSelector ][]ccipocr3.SeqNum , error ) {
0 commit comments