Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
YoGhurt111 committed Oct 9, 2024
1 parent 51b885c commit a2fe08d
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 15 deletions.
2 changes: 1 addition & 1 deletion packages/taiko-client/cmd/flags/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ var (
"this flag only works post Ontake fork",
Category: proverCategory,
Value: 30 * time.Minute,
EnvVars: []string{"PROVE_MIN_PROVING_INTERVAL"},
EnvVars: []string{"PROVER_MIN_PROVING_INTERVAL"},
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type ProofSubmitter struct {
proofProducer proofProducer.ProofProducer
resultCh chan *proofProducer.ProofWithHeader
batchResultCh chan *proofProducer.BatchProofs
aggregationNotify chan struct{}
aggregationNotify chan uint16
anchorValidator *validator.AnchorTxValidator
txBuilder *transaction.ProveBlockTxBuilder
sender *transaction.Sender
Expand All @@ -63,7 +63,7 @@ func NewProofSubmitter(
proofProducer proofProducer.ProofProducer,
resultCh chan *proofProducer.ProofWithHeader,
batchResultCh chan *proofProducer.BatchProofs,
aggregationNotify chan struct{},
aggregationNotify chan uint16,
proverSetAddress common.Address,
taikoL2Address common.Address,
graffiti string,
Expand Down Expand Up @@ -207,7 +207,7 @@ func (s *ProofSubmitter) RequestProof(ctx context.Context, meta metadata.TaikoBl
"bufferSize", bufferSize,
)
if s.proofBuffer.MaxLength == uint64(bufferSize) {
s.aggregationNotify <- struct{}{}
s.aggregationNotify <- s.Tier()
}
} else {
s.resultCh <- result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ type ProofSubmitterTestSuite struct {
proposer *proposer.Proposer
proofCh chan *producer.ProofWithHeader
batchProofGenerationCh chan *producer.BatchProofs
aggregationNotify chan struct{}
aggregationNotify chan uint16
}

func (s *ProofSubmitterTestSuite) SetupTest() {
s.ClientTestSuite.SetupTest()

s.proofCh = make(chan *producer.ProofWithHeader, 1024)
s.batchProofGenerationCh = make(chan *producer.BatchProofs, 1024)
s.aggregationNotify = make(chan struct{}, 1)
s.aggregationNotify = make(chan uint16, 1)

builder := transaction.NewProveBlockTxBuilder(
s.RPCClient,
Expand Down
21 changes: 12 additions & 9 deletions packages/taiko-client/prover/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Prover struct {

assignmentExpiredCh chan metadata.TaikoBlockMetaData
proveNotify chan struct{}
aggregationNotify chan struct{}
aggregationNotify chan uint16

// Proof related channels
proofSubmissionCh chan *proofProducer.ProofRequestBody
Expand Down Expand Up @@ -133,7 +133,7 @@ func InitFromConfig(
p.proofSubmissionCh = make(chan *proofProducer.ProofRequestBody, p.cfg.Capacity)
p.proofContestCh = make(chan *proofProducer.ContestRequestBody, p.cfg.Capacity)
p.proveNotify = make(chan struct{}, 1)
p.aggregationNotify = make(chan struct{}, 1)
p.aggregationNotify = make(chan uint16, 1)

if err := p.initL1Current(cfg.StartingBlockID); err != nil {
return fmt.Errorf("initialize L1 current cursor error: %w", err)
Expand Down Expand Up @@ -278,7 +278,7 @@ func (p *Prover) eventLoop() {
// if we are already aggregating.
reqAggregation := func() {
select {
case p.aggregationNotify <- struct{}{}:
case p.aggregationNotify <- 0:
default:
}
}
Expand Down Expand Up @@ -340,8 +340,8 @@ func (p *Prover) eventLoop() {
if err := p.proveOp(); err != nil {
log.Error("Prove new blocks error", "error", err)
}
case <-p.aggregationNotify:
if err := p.aggregateOp(); err != nil {
case tier := <-p.aggregationNotify:
if err := p.aggregateOp(tier); err != nil {
log.Error("Aggregate proofs error", "error", err)
}
case e := <-blockVerifiedCh:
Expand Down Expand Up @@ -412,26 +412,29 @@ func (p *Prover) proveOp() error {
}

// aggregateOp aggregates all proofs in buffer.
func (p *Prover) aggregateOp() error {
func (p *Prover) aggregateOp(tier uint16) error {
var wg sync.WaitGroup
for _, submitter := range p.proofSubmitters {
wg.Add(1)
go func(s proofSubmitter.Submitter) {
defer wg.Done()
if s.BufferSize() > 1 {
if s.BufferSize() > 1 &&
(tier == 0 || s.Tier() == tier) {
if err := s.AggregateProofs(p.ctx); err != nil {
log.Error("Failed to aggregate proofs",
"error", err,
"tier", s.Tier(),
)
}
} else {
log.Debug("Skip this aggregateOp since low buffer size",
"tier", s.Tier(),
log.Debug("Skip this aggregateOp",
"tier", tier,
"bufferSize", s.BufferSize(),
)
}
}(submitter)
}

return nil
}

Expand Down

0 comments on commit a2fe08d

Please sign in to comment.