@@ -3,20 +3,25 @@ package aggregator
3
3
import (
4
4
"context"
5
5
"encoding/json"
6
+ "fmt"
6
7
"net"
7
8
"net/http"
8
9
"os"
10
+ "strings"
9
11
"sync"
10
12
"time"
11
13
12
- "github.com/Layr-Labs/eigensdk-go/chainio/clients"
13
- "github.com/Layr-Labs/eigensdk-go/services/avsregistry"
14
- blsagg "github.com/Layr-Labs/eigensdk-go/services/bls_aggregation"
15
- "github.com/Layr-Labs/eigensdk-go/types"
16
14
"github.com/automata-network/multi-prover-avs/contracts/bindings"
17
15
"github.com/automata-network/multi-prover-avs/contracts/bindings/MultiProverServiceManager"
16
+ "github.com/automata-network/multi-prover-avs/contracts/bindings/RegistryCoordinator"
18
17
"github.com/automata-network/multi-prover-avs/contracts/bindings/TEELivenessVerifier"
19
18
"github.com/automata-network/multi-prover-avs/utils"
19
+ "github.com/automata-network/multi-prover-avs/xmetric"
20
+ "github.com/automata-network/multi-prover-avs/xtask"
21
+
22
+ "github.com/Layr-Labs/eigensdk-go/chainio/clients"
23
+ "github.com/Layr-Labs/eigensdk-go/services/avsregistry"
24
+ "github.com/Layr-Labs/eigensdk-go/types"
20
25
"github.com/chzyer/logex"
21
26
"github.com/ethereum/go-ethereum/accounts/abi/bind"
22
27
"github.com/ethereum/go-ethereum/common"
@@ -29,36 +34,48 @@ type Config struct {
29
34
ListenAddr string
30
35
TimeToExpirySecs int
31
36
32
- EcdsaPrivateKey string
33
- EthHttpEndpoint string
34
- EthWsEndpoint string
35
- AttestationLayerRpcURL string
36
- MultiProverContractAddress common.Address
37
- TEELivenessVerifierContractAddress common.Address
37
+ EcdsaPrivateKey string
38
+ EthHttpEndpoint string
39
+ EthWsEndpoint string
40
+ AttestationLayerRpcURL string
41
+ MultiProverContractAddress common.Address
42
+ TEELivenessVerifierContractAddressV1 common.Address
43
+ TEELivenessVerifierContractAddress common.Address
38
44
39
45
AVSRegistryCoordinatorAddress common.Address
40
46
OperatorStateRetrieverAddress common.Address
41
47
EigenMetricsIpPortAddress string
42
48
ScanStartBlock uint64
43
49
Threshold uint64
50
+ Sampling uint64
51
+
52
+ OpenTelemetry * xmetric.OpenTelemetryConfig
53
+
54
+ TaskFetcher []* xtask.TaskManagerConfig
44
55
45
56
Simulation bool
46
57
}
47
58
48
59
type Aggregator struct {
49
60
cfg * Config
50
61
51
- blsAggregationService blsagg. BlsAggregationService
62
+ blsAggregationService BlsAggregationService
52
63
transactOpt * bind.TransactOpts
53
64
65
+ TaskManager * xtask.TaskManager
66
+
54
67
client * ethclient.Client
55
68
56
- multiProverContract * MultiProverServiceManager.MultiProverServiceManager
57
- TEELivenessVerifier * TEELivenessVerifier.TEELivenessVerifierCaller
58
- registry * avsregistry.AvsRegistryServiceChainCaller
69
+ multiProverContract * MultiProverServiceManager.MultiProverServiceManager
70
+ TEELivenessVerifierV1 * TEELivenessVerifier.TEELivenessVerifierCaller
71
+ TEELivenessVerifierV2 * TEELivenessVerifier.TEELivenessVerifierCaller
72
+ registry * avsregistry.AvsRegistryServiceChainCaller
73
+ registryCoordinator * RegistryCoordinator.RegistryCoordinator
59
74
60
75
eigenClients * clients.Clients
61
76
77
+ Collector * xmetric.AggregatorCollector
78
+
62
79
taskMutex sync.Mutex
63
80
taskIndexSeq uint32
64
81
taskIndexMap map [types.TaskResponseDigest ]* Task
@@ -70,21 +87,25 @@ type Task struct {
70
87
}
71
88
72
89
func NewAggregator (ctx context.Context , cfg * Config ) (* Aggregator , error ) {
90
+ if cfg .Sampling == 0 {
91
+ cfg .Sampling = 2000
92
+ }
93
+ logex .Info ("Multi Prover Aggregator Initializing..." )
73
94
ecdsaPrivateKey , err := crypto .HexToECDSA (cfg .EcdsaPrivateKey )
74
95
if err != nil {
75
96
return nil , logex .Trace (err )
76
97
}
77
98
client , err := ethclient .Dial (cfg .EthHttpEndpoint )
78
99
if err != nil {
79
- return nil , logex .Trace (err )
100
+ return nil , logex .Trace (err , fmt . Sprintf ( "dial:%q" , cfg . EthHttpEndpoint ) )
80
101
}
81
102
attestationClient , err := ethclient .Dial (cfg .AttestationLayerRpcURL )
82
103
if err != nil {
83
- return nil , logex .Trace (err , cfg .AttestationLayerRpcURL )
104
+ return nil , logex .Trace (err , fmt . Sprintf ( "connecting to AttestationLayerRpcURL:%q" , cfg .AttestationLayerRpcURL ) )
84
105
}
85
106
chainId , err := client .ChainID (ctx )
86
107
if err != nil {
87
- return nil , logex .Trace (err )
108
+ return nil , logex .Trace (err , "fetch chainID" )
88
109
}
89
110
transactOpt , err := bind .NewKeyedTransactorWithChainID (ecdsaPrivateKey , chainId )
90
111
if err != nil {
@@ -106,18 +127,38 @@ func NewAggregator(ctx context.Context, cfg *Config) (*Aggregator, error) {
106
127
return nil , logex .Trace (err )
107
128
}
108
129
109
- operatorPubkeysService , err := NewOperatorPubkeysService ( ctx , client , eigenClients . AvsRegistryChainSubscriber , eigenClients . AvsRegistryChainReader , logger , "" , cfg .ScanStartBlock , 5000 )
130
+ multiProverContract , err := MultiProverServiceManager . NewMultiProverServiceManager ( cfg .MultiProverContractAddress , client )
110
131
if err != nil {
111
132
return nil , logex .Trace (err )
112
133
}
113
- avsRegistryService := avsregistry .NewAvsRegistryServiceChainCaller (eigenClients .AvsRegistryChainReader , operatorPubkeysService , logger )
114
- blsAggregationService := blsagg .NewBlsAggregatorService (avsRegistryService , logger )
134
+ teeLivenessVerifier , err := TEELivenessVerifier .NewTEELivenessVerifierCaller (cfg .TEELivenessVerifierContractAddress , attestationClient )
135
+ if err != nil {
136
+ return nil , logex .Trace (err )
137
+ }
138
+ var teeLivenessVerifierV1 * TEELivenessVerifier.TEELivenessVerifierCaller
139
+ var emptyAddr common.Address
140
+ if cfg .TEELivenessVerifierContractAddressV1 != emptyAddr {
141
+ teeLivenessVerifierV1 , err = TEELivenessVerifier .NewTEELivenessVerifierCaller (cfg .TEELivenessVerifierContractAddressV1 , attestationClient )
142
+ if err != nil {
143
+ return nil , logex .Trace (err )
144
+ }
145
+ }
115
146
116
- multiProverContract , err := MultiProverServiceManager .NewMultiProverServiceManager (cfg .MultiProverContractAddress , client )
147
+ collector := xmetric .NewAggregatorCollector ("avs" )
148
+
149
+ taskManager , err := xtask .NewTaskManager (collector , int64 (cfg .Sampling ), eigenClients .EthHttpClient , cfg .TaskFetcher )
150
+ if err != nil {
151
+ return nil , logex .Trace (err )
152
+ }
153
+
154
+ operatorPubkeysService , err := NewOperatorPubkeysService (ctx , client , eigenClients .AvsRegistryChainSubscriber , eigenClients .AvsRegistryChainReader , logger , "" , cfg .ScanStartBlock , 5000 )
117
155
if err != nil {
118
156
return nil , logex .Trace (err )
119
157
}
120
- TEELivenessVerifier , err := TEELivenessVerifier .NewTEELivenessVerifierCaller (cfg .TEELivenessVerifierContractAddress , attestationClient )
158
+ avsRegistryService := avsregistry .NewAvsRegistryServiceChainCaller (eigenClients .AvsRegistryChainReader , operatorPubkeysService , logger )
159
+ blsAggregationService := NewBlsAggregatorService (avsRegistryService , logger )
160
+
161
+ registryCoordinator , err := RegistryCoordinator .NewRegistryCoordinator (cfg .AVSRegistryCoordinatorAddress , client )
121
162
if err != nil {
122
163
return nil , logex .Trace (err )
123
164
}
@@ -126,14 +167,75 @@ func NewAggregator(ctx context.Context, cfg *Config) (*Aggregator, error) {
126
167
cfg : cfg ,
127
168
transactOpt : transactOpt ,
128
169
client : client ,
170
+ eigenClients : eigenClients ,
129
171
blsAggregationService : blsAggregationService ,
130
172
multiProverContract : multiProverContract ,
131
- TEELivenessVerifier : TEELivenessVerifier ,
173
+ TEELivenessVerifierV1 : teeLivenessVerifierV1 ,
174
+ TEELivenessVerifierV2 : teeLivenessVerifier ,
175
+ registryCoordinator : registryCoordinator ,
132
176
registry : avsRegistryService ,
177
+ TaskManager : taskManager ,
133
178
taskIndexMap : make (map [types.Bytes32 ]* Task ),
179
+ Collector : collector ,
134
180
}, nil
135
181
}
136
182
183
+ func (agg * Aggregator ) startUpdateOperators (ctx context.Context ) (func () error , error ) {
184
+ quorumNums := types.QuorumNums {0 }
185
+ blockNumber , err := agg .client .BlockNumber (ctx )
186
+ if err != nil {
187
+ return nil , logex .Trace (err )
188
+ }
189
+ states , err := agg .registry .GetOperatorsAvsStateAtBlock (ctx , quorumNums , uint32 (blockNumber ))
190
+ if err != nil {
191
+ return nil , logex .Trace (err )
192
+ }
193
+ var operators []common.Address
194
+ for k := range states {
195
+ operatorAddr , err := agg .eigenClients .AvsRegistryChainReader .GetOperatorFromId (nil , k )
196
+ if err != nil {
197
+ return nil , logex .Trace (err )
198
+ }
199
+ isRegistered , err := agg .eigenClients .AvsRegistryChainReader .IsOperatorRegistered (nil , operatorAddr )
200
+ if err != nil {
201
+ return nil , logex .Trace (err )
202
+ }
203
+ if isRegistered {
204
+ operators = append (operators , operatorAddr )
205
+ }
206
+ }
207
+
208
+ newOpt := * agg .transactOpt
209
+ newOpt .NoSend = true
210
+ for i := 1 ; i < len (operators ); i ++ {
211
+ tx , err := agg .registryCoordinator .UpdateOperators (& newOpt , operators [:i ])
212
+ if err != nil {
213
+ return nil , logex .Trace (err )
214
+ }
215
+ logex .Infof ("tx hash: %v -> %v" , i , tx .Gas ())
216
+ }
217
+ // logex.Info(states)
218
+ return func () error { return nil }, nil
219
+ }
220
+
221
+ func (agg * Aggregator ) verifyKey (x [32 ]byte , y [32 ]byte ) (bool , error ) {
222
+ if agg .TEELivenessVerifierV1 != nil {
223
+ pass , err := agg .TEELivenessVerifierV1 .VerifyLivenessProof (nil , x , y )
224
+ if err != nil {
225
+ return false , logex .Trace (err , "v1" )
226
+ }
227
+ if pass {
228
+ return true , nil
229
+ }
230
+ }
231
+
232
+ pass , err := agg .TEELivenessVerifierV2 .VerifyLivenessProof (nil , x , y )
233
+ if err != nil {
234
+ return false , logex .Trace (err , "v2" )
235
+ }
236
+ return pass , nil
237
+ }
238
+
137
239
func (agg * Aggregator ) startRpcServer (ctx context.Context ) (func () error , error ) {
138
240
rpcSvr := rpc .NewServer ()
139
241
api := & AggregatorApi {
@@ -161,26 +263,42 @@ func (agg *Aggregator) startRpcServer(ctx context.Context) (func() error, error)
161
263
}
162
264
163
265
func (agg * Aggregator ) Start (ctx context.Context ) error {
164
- isSimulation , err := agg .TEELivenessVerifier .Simulation (nil )
165
- if err != nil {
166
- return logex .Trace (err )
167
- }
168
- if isSimulation != agg .cfg .Simulation {
169
- return logex .NewErrorf ("simulation mode not match with the contract: local:%v, remote:%v" , agg .cfg .Simulation , isSimulation )
170
- }
266
+ // serveUpdateTask, err := agg.startUpdateOperators(context.Background())
267
+ // if err != nil {
268
+ // return logex.Trace(err)
269
+ // }
270
+ // serveUpdateTask()
171
271
172
272
serveHttp , err := agg .startRpcServer (ctx )
173
273
if err != nil {
174
274
return logex .Trace (err )
175
275
}
176
276
177
277
errChan := make (chan error )
278
+ go func () {
279
+ if err := xmetric .ExportMetricToOpenTelemetry (agg .cfg .OpenTelemetry , agg .Collector ); err != nil {
280
+ errChan <- logex .Trace (err )
281
+ }
282
+ }()
283
+
284
+ go func () {
285
+ if err := agg .Collector .Serve (agg .cfg .EigenMetricsIpPortAddress ); err != nil {
286
+ errChan <- logex .Trace (err )
287
+ }
288
+ }()
289
+
178
290
go func () {
179
291
if err := serveHttp (); err != nil {
180
292
errChan <- logex .Trace (err )
181
293
}
182
294
}()
183
295
296
+ go func () {
297
+ if err := agg .TaskManager .Run (ctx ); err != nil {
298
+ errChan <- logex .Trace (err )
299
+ }
300
+ }()
301
+
184
302
for {
185
303
select {
186
304
case response := <- agg .blsAggregationService .GetResponseChannel ():
@@ -205,8 +323,8 @@ func (agg *Aggregator) submitStateHeader(ctx context.Context, req *TaskRequest)
205
323
return logex .Trace (err )
206
324
}
207
325
if md .BatchId > 0 {
208
- if md .BatchId % 2000 != 0 {
209
- logex .Info ("[scroll] skip task: %#v" , md )
326
+ if md .BatchId % agg . cfg . Sampling != 0 {
327
+ logex .Infof ("[scroll] skip task: %#v" , md )
210
328
return nil
211
329
}
212
330
}
@@ -245,12 +363,14 @@ func (agg *Aggregator) submitStateHeader(ctx context.Context, req *TaskRequest)
245
363
}
246
364
247
365
if err := agg .blsAggregationService .ProcessNewSignature (ctx , task .index , digest , req .Signature , req .OperatorId ); err != nil {
248
- return logex .Trace (err )
366
+ if ! strings .Contains (err .Error (), "already completed" ) {
367
+ return logex .Trace (err )
368
+ }
249
369
}
250
370
return nil
251
371
}
252
372
253
- func (agg * Aggregator ) sendAggregatedResponseToContract (ctx context.Context , task * Task , blsAggServiceResp blsagg. BlsAggregationServiceResponse ) error {
373
+ func (agg * Aggregator ) sendAggregatedResponseToContract (ctx context.Context , task * Task , blsAggServiceResp BlsAggregationServiceResponse ) error {
254
374
if blsAggServiceResp .Err != nil {
255
375
return logex .Trace (blsAggServiceResp .Err )
256
376
}
@@ -287,6 +407,7 @@ func (agg *Aggregator) sendAggregatedResponseToContract(ctx context.Context, tas
287
407
select {
288
408
case <- ctx .Done ():
289
409
logex .Error (ctx .Err ())
410
+ return
290
411
default :
291
412
receipt , _ := agg .client .TransactionReceipt (ctx , tx .Hash ())
292
413
if receipt != nil {
0 commit comments