Skip to content

Commit

Permalink
Merge pull request #20 from automata-network/dev-v0.3
Browse files Browse the repository at this point in the history
release v0.3.0 (holesky)
  • Loading branch information
chzyer authored Jul 11, 2024
2 parents 4dda178 + 530016c commit 3a26ded
Show file tree
Hide file tree
Showing 20 changed files with 1,018 additions and 334 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ docs/
config/
.DS_Store
/bin
contracts/script/output/avs_deploy_localhost.json
contracts/script/output/tee_deploy_output_localhost.json
scripts/deploy.sh
31 changes: 25 additions & 6 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
type Config struct {
ListenAddr string
TimeToExpirySecs int
MinWaitSecs int

EcdsaPrivateKey string
EthHttpEndpoint string
Expand All @@ -49,13 +50,29 @@ type Config struct {
Threshold uint64
Sampling uint64

GenTaskSampling uint64
ExecTaskSampling uint64

OpenTelemetry *xmetric.OpenTelemetryConfig

TaskFetcher []*xtask.TaskManagerConfig

Simulation bool
}

func (cfg *Config) Init() error {
if cfg.Sampling == 0 {
cfg.Sampling = 2000
}
if cfg.ExecTaskSampling == 0 {
cfg.ExecTaskSampling = cfg.Sampling
}
if cfg.GenTaskSampling == 0 {
cfg.GenTaskSampling = cfg.Sampling
}
return nil
}

type Aggregator struct {
cfg *Config

Expand Down Expand Up @@ -89,9 +106,10 @@ type Task struct {
}

func NewAggregator(ctx context.Context, cfg *Config) (*Aggregator, error) {
if cfg.Sampling == 0 {
cfg.Sampling = 2000
if err := cfg.Init(); err != nil {
return nil, logex.Trace(err)
}

logex.Info("Multi Prover Aggregator Initializing...")
ecdsaPrivateKey, err := crypto.HexToECDSA(cfg.EcdsaPrivateKey)
if err != nil {
Expand Down Expand Up @@ -148,7 +166,7 @@ func NewAggregator(ctx context.Context, cfg *Config) (*Aggregator, error) {

collector := xmetric.NewAggregatorCollector("avs")

taskManager, err := xtask.NewTaskManager(collector, int64(cfg.Sampling), eigenClients.EthHttpClient, cfg.TaskFetcher)
taskManager, err := xtask.NewTaskManager(collector, int64(cfg.GenTaskSampling), eigenClients.EthHttpClient, cfg.TaskFetcher)
if err != nil {
return nil, logex.Trace(err)
}
Expand Down Expand Up @@ -321,7 +339,7 @@ func (agg *Aggregator) submitStateHeader(ctx context.Context, req *TaskRequest)
return logex.Trace(err)
}
if md.BatchId > 0 {
if md.BatchId%agg.cfg.Sampling != 0 {
if md.BatchId%agg.cfg.ExecTaskSampling != 0 {
logex.Infof("[scroll] skip task: %#v", md)
return nil
}
Expand All @@ -340,6 +358,7 @@ func (agg *Aggregator) submitStateHeader(ctx context.Context, req *TaskRequest)
}
req.Task.QuorumThresholdPercentages = types.QuorumThresholdPercentages(quorumThresholdPercentages).UnderlyingType()
timeToExpiry := time.Duration(agg.cfg.TimeToExpirySecs) * time.Second
minWait := time.Duration(agg.cfg.MinWaitSecs) * time.Second

agg.taskMutex.Lock()
task, ok := agg.taskIndexMap[digest]
Expand All @@ -351,7 +370,7 @@ func (agg *Aggregator) submitStateHeader(ctx context.Context, req *TaskRequest)
agg.taskIndexMap[digest] = task
agg.taskIndexSeq += 1

err = agg.blsAggregationService.InitializeNewTask(task.index, req.Task.ReferenceBlockNumber, quorumNumbers, quorumThresholdPercentages, timeToExpiry)
err = agg.blsAggregationService.InitializeNewTask(ctx, task.index, req.Task.ReferenceBlockNumber, quorumNumbers, quorumThresholdPercentages, minWait, timeToExpiry)
}
agg.taskMutex.Unlock()

Expand All @@ -367,7 +386,7 @@ func (agg *Aggregator) submitStateHeader(ctx context.Context, req *TaskRequest)
return nil
}

func (agg *Aggregator) sendAggregatedResponseToContract(ctx context.Context, task *Task, blsAggServiceResp BlsAggregationServiceResponse) error {
func (agg *Aggregator) sendAggregatedResponseToContract(ctx context.Context, task *Task, blsAggServiceResp *BlsAggregationServiceResponse) error {
if blsAggServiceResp.Err != nil {
return logex.Trace(blsAggServiceResp.Err)
}
Expand Down
2 changes: 2 additions & 0 deletions aggregator/aggregator_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (a *AggregatorApi) fetchTask(ctx context.Context, req *FetchTaskReq) (*Fetc
func (a *AggregatorApi) SubmitTask(ctx context.Context, req *TaskRequest) error {
defer func() {
if err := recover(); err != nil {
logex.Pretty(req)
logex.Error(err, string(debug.Stack()))
panic(err)
}
Expand All @@ -108,6 +109,7 @@ func (a *AggregatorApi) submitTask(ctx context.Context, req *TaskRequest) error
if err != nil {
return logex.Trace(err, taskCtx)
}
taskCtx = append(taskCtx, fmt.Sprintf("digest=%x", digest))

operatorAddr, err := bindings.GetOperatorAddrByOperatorID(a.agg.client, a.agg.registryCoordinator, req.OperatorId)
if err != nil {
Expand Down
Loading

0 comments on commit 3a26ded

Please sign in to comment.