Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
YoGhurt111 committed Sep 30, 2024
1 parent 56db4ce commit 04250f9
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 53 deletions.
2 changes: 1 addition & 1 deletion packages/taiko-client/cmd/flags/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ var (
}
ProveInterval = &cli.DurationFlag{
Name: "prover.minProvingInterval",
Usage: "Time interval to prove blocks when the number of pending proof do not exceed prover.batchSize, " +
Usage: "Time interval to prove blocks even the number of pending proof do not exceed prover.batchSize, " +
"this flag only works post Ontake fork",
Category: proverCategory,
Value: 30 * time.Minute,
Expand Down
13 changes: 5 additions & 8 deletions packages/taiko-client/prover/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,10 @@ func (p *Prover) initProofSubmitters(
) error {
for _, tier := range p.sharedState.GetTiers() {
var (
producer proofProducer.ProofProducer
submitter proofSubmitter.Submitter
err error
bufferSize = p.cfg.ProofBufferSize
proveInterval = p.cfg.ProveInterval
producer proofProducer.ProofProducer
submitter proofSubmitter.Submitter
err error
bufferSize = p.cfg.ProofBufferSize
)
switch tier.ID {
case encoding.TierOptimisticID:
Expand Down Expand Up @@ -136,11 +135,9 @@ func (p *Prover) initProofSubmitters(
case encoding.TierGuardianMinorityID:
producer = proofProducer.NewGuardianProofProducer(encoding.TierGuardianMinorityID, p.cfg.EnableLivenessBondProof)
bufferSize = 0
proveInterval = 0
case encoding.TierGuardianMajorityID:
producer = proofProducer.NewGuardianProofProducer(encoding.TierGuardianMajorityID, p.cfg.EnableLivenessBondProof)
bufferSize = 0
proveInterval = 0
default:
return fmt.Errorf("unsupported tier: %d", tier.ID)
}
Expand All @@ -150,6 +147,7 @@ func (p *Prover) initProofSubmitters(
producer,
p.proofGenerationCh,
p.batchProofGenerationCh,
p.aggregationNotify,
p.cfg.ProverSetAddress,
p.cfg.TaikoL2Address,
p.cfg.Graffiti,
Expand All @@ -161,7 +159,6 @@ func (p *Prover) initProofSubmitters(
p.IsGuardianProver(),
p.cfg.GuardianProofSubmissionDelay,
bufferSize,
proveInterval,
); err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (s *SGXProofProducer) requestBatchProof(
defer cancel()

blocks := make([][2]*big.Int, len(blockIDs))
for i, _ := range blockIDs {
for i := range blockIDs {
blocks[i][0] = blockIDs[i]
}
reqBody := RaikoRequestProofBodyV3{
Expand Down
1 change: 1 addition & 0 deletions packages/taiko-client/prover/proof_submitter/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Submitter interface {
RequestProof(ctx context.Context, meta metadata.TaikoBlockMetaData) error
SubmitProof(ctx context.Context, proofWithHeader *proofProducer.ProofWithHeader) error
BatchSubmitProofs(ctx context.Context, proofsWithHeaders *proofProducer.BatchProofs) error
AggregateProofs(ctx context.Context) error
Producer() proofProducer.ProofProducer
Tier() uint16
}
Expand Down
77 changes: 37 additions & 40 deletions packages/taiko-client/prover/proof_submitter/proof_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,24 @@ var (
// ProofSubmitter is responsible requesting proofs for the given L2
// blocks, and submitting the generated proofs to the TaikoL1 smart contract.
type ProofSubmitter struct {
rpc *rpc.Client
proofProducer proofProducer.ProofProducer
resultCh chan *proofProducer.ProofWithHeader
batchResultCh chan *proofProducer.BatchProofs
anchorValidator *validator.AnchorTxValidator
txBuilder *transaction.ProveBlockTxBuilder
sender *transaction.Sender
proverAddress common.Address
proverSetAddress common.Address
taikoL2Address common.Address
graffiti [32]byte
tiers []*rpc.TierProviderTierWithID
rpc *rpc.Client
proofProducer proofProducer.ProofProducer
resultCh chan *proofProducer.ProofWithHeader
batchResultCh chan *proofProducer.BatchProofs
aggregationNotify chan struct{}
anchorValidator *validator.AnchorTxValidator
txBuilder *transaction.ProveBlockTxBuilder
sender *transaction.Sender
proverAddress common.Address
proverSetAddress common.Address
taikoL2Address common.Address
graffiti [32]byte
tiers []*rpc.TierProviderTierWithID
// Guardian prover related.
isGuardian bool
submissionDelay time.Duration
// Batch proof related
proofBuffer *ProofBuffer
proveInterval time.Duration
proofBuffer *ProofBuffer
}

// NewProofSubmitter creates a new ProofSubmitter instance.
Expand All @@ -63,6 +63,7 @@ func NewProofSubmitter(
proofProducer proofProducer.ProofProducer,
resultCh chan *proofProducer.ProofWithHeader,
batchResultCh chan *proofProducer.BatchProofs,
aggregationNotify chan struct{},
proverSetAddress common.Address,
taikoL2Address common.Address,
graffiti string,
Expand All @@ -74,30 +75,29 @@ func NewProofSubmitter(
isGuardian bool,
submissionDelay time.Duration,
proofBufferSize uint64,
proveInterval time.Duration,
) (*ProofSubmitter, error) {
anchorValidator, err := validator.New(taikoL2Address, rpcClient.L2.ChainID, rpcClient)
if err != nil {
return nil, err
}

return &ProofSubmitter{
rpc: rpcClient,
proofProducer: proofProducer,
resultCh: resultCh,
batchResultCh: batchResultCh,
anchorValidator: anchorValidator,
txBuilder: builder,
sender: transaction.NewSender(rpcClient, txmgr, privateTxmgr, proverSetAddress, gasLimit),
proverAddress: txmgr.From(),
proverSetAddress: proverSetAddress,
taikoL2Address: taikoL2Address,
graffiti: rpc.StringToBytes32(graffiti),
tiers: tiers,
isGuardian: isGuardian,
submissionDelay: submissionDelay,
proofBuffer: NewProofBuffer(proofBufferSize),
proveInterval: proveInterval,
rpc: rpcClient,
proofProducer: proofProducer,
resultCh: resultCh,
batchResultCh: batchResultCh,
aggregationNotify: aggregationNotify,
anchorValidator: anchorValidator,
txBuilder: builder,
sender: transaction.NewSender(rpcClient, txmgr, privateTxmgr, proverSetAddress, gasLimit),
proverAddress: txmgr.From(),
proverSetAddress: proverSetAddress,
taikoL2Address: taikoL2Address,
graffiti: rpc.StringToBytes32(graffiti),
tiers: tiers,
isGuardian: isGuardian,
submissionDelay: submissionDelay,
proofBuffer: NewProofBuffer(proofBufferSize),
}, nil
}

Expand Down Expand Up @@ -159,11 +159,6 @@ func (s *ProofSubmitter) RequestProof(ctx context.Context, meta metadata.TaikoBl
log.Error("Failed to request proof, context is canceled", "blockID", opts.BlockID, "error", ctx.Err())
return nil
}
if int(s.proofBuffer.MaxLength) == s.proofBuffer.Len() {
if err = s.AggregateProofs(ctx); err != nil {
return fmt.Errorf("failed to aggregate proof : %w", err)
}
}
// Check if there is a need to generate proof
proofStatus, err := rpc.GetBlockProofStatus(
ctx,
Expand Down Expand Up @@ -198,7 +193,6 @@ func (s *ProofSubmitter) RequestProof(ctx context.Context, meta metadata.TaikoBl
}
return fmt.Errorf("failed to request proof (id: %d): %w", meta.GetBlockID(), err)
}
metrics.ProverQueuedProofCounter.Add(1)
if meta.IsOntakeBlock() && s.proofBuffer.MaxLength > 1 {
bufferSize, err := s.proofBuffer.Write(result)
if err != nil {
Expand All @@ -213,13 +207,12 @@ func (s *ProofSubmitter) RequestProof(ctx context.Context, meta metadata.TaikoBl
"bufferSize", bufferSize,
)
if s.proofBuffer.MaxLength == uint64(bufferSize) {
if err = s.AggregateProofs(ctx); err != nil {
log.Error("failed to aggregate proof", "error", err)
}
s.aggregationNotify <- struct{}{}
}
} else {
s.resultCh <- result
}
metrics.ProverQueuedProofCounter.Add(1)
return nil
},
backoff.WithContext(backoff.NewConstantBackOff(proofPollingInterval), ctx),
Expand Down Expand Up @@ -449,6 +442,10 @@ func (s *ProofSubmitter) AggregateProofs(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to read proof from buffer: %w", err)
}
if len(buffer) == 0 {
log.Debug("Buffer is empty now, skip aggregating")
return nil
}

result, err := s.proofProducer.Aggregate(
ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ type ProofSubmitterTestSuite struct {
proposer *proposer.Proposer
proofCh chan *producer.ProofWithHeader
batchProofGenerationCh chan *producer.BatchProofs
aggregationNotify chan struct{}
}

func (s *ProofSubmitterTestSuite) SetupTest() {
s.ClientTestSuite.SetupTest()

s.proofCh = make(chan *producer.ProofWithHeader, 1024)
s.batchProofGenerationCh = make(chan *producer.BatchProofs, 1024)
s.aggregationNotify = make(chan struct{}, 1)

builder := transaction.NewProveBlockTxBuilder(
s.RPCClient,
Expand Down Expand Up @@ -86,6 +88,7 @@ func (s *ProofSubmitterTestSuite) SetupTest() {
&producer.OptimisticProofProducer{},
s.proofCh,
s.batchProofGenerationCh,
s.aggregationNotify,
rpc.ZeroAddress,
common.HexToAddress(os.Getenv("TAIKO_L2")),
"test",
Expand All @@ -97,7 +100,6 @@ func (s *ProofSubmitterTestSuite) SetupTest() {
false,
0*time.Second,
0,
0*time.Second,
)
s.Nil(err)
s.contester = NewProofContester(
Expand Down Expand Up @@ -185,6 +187,7 @@ func (s *ProofSubmitterTestSuite) TestGetRandomBumpedSubmissionDelay() {
&producer.OptimisticProofProducer{},
s.proofCh,
s.batchProofGenerationCh,
s.aggregationNotify,
common.Address{},
common.HexToAddress(os.Getenv("TAIKO_L2")),
"test",
Expand All @@ -196,7 +199,6 @@ func (s *ProofSubmitterTestSuite) TestGetRandomBumpedSubmissionDelay() {
false,
time.Duration(0),
0,
0*time.Second,
)
s.Nil(err)

Expand All @@ -209,6 +211,7 @@ func (s *ProofSubmitterTestSuite) TestGetRandomBumpedSubmissionDelay() {
&producer.OptimisticProofProducer{},
s.proofCh,
s.batchProofGenerationCh,
s.aggregationNotify,
common.Address{},
common.HexToAddress(os.Getenv("TAIKO_L2")),
"test",
Expand All @@ -220,7 +223,6 @@ func (s *ProofSubmitterTestSuite) TestGetRandomBumpedSubmissionDelay() {
false,
1*time.Hour,
0,
0*time.Second,
)
s.Nil(err)
delay, err = submitter2.getRandomBumpedSubmissionDelay(time.Now())
Expand Down
36 changes: 36 additions & 0 deletions packages/taiko-client/prover/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Prover struct {

assignmentExpiredCh chan metadata.TaikoBlockMetaData
proveNotify chan struct{}
aggregationNotify chan struct{}

// Proof related channels
proofSubmissionCh chan *proofProducer.ProofRequestBody
Expand Down Expand Up @@ -132,6 +133,7 @@ func InitFromConfig(
p.proofSubmissionCh = make(chan *proofProducer.ProofRequestBody, p.cfg.Capacity)
p.proofContestCh = make(chan *proofProducer.ContestRequestBody, p.cfg.Capacity)
p.proveNotify = make(chan struct{}, 1)
p.aggregationNotify = make(chan struct{}, 1)

if err := p.initL1Current(cfg.StartingBlockID); err != nil {
return fmt.Errorf("initialize L1 current cursor error: %w", err)
Expand Down Expand Up @@ -272,6 +274,14 @@ func (p *Prover) eventLoop() {
default:
}
}
// reqAggregation requests performing a aggregate operation, won't block
// if we are already aggregating.
reqAggregation := func() {
select {
case p.aggregationNotify <- struct{}{}:
default:
}
}
// Call reqProving() right away to catch up with the latest state.
reqProving()

Expand All @@ -281,6 +291,9 @@ func (p *Prover) eventLoop() {
forceProvingTicker := time.NewTicker(15 * time.Second)
defer forceProvingTicker.Stop()

forceAggregatingTicker := time.NewTicker(p.cfg.ProveInterval)
defer forceAggregatingTicker.Stop()

// Channels
chBufferSize := p.protocolConfig.BlockMaxProposals
blockProposedCh := make(chan *bindings.TaikoL1ClientBlockProposed, chBufferSize)
Expand Down Expand Up @@ -327,6 +340,10 @@ func (p *Prover) eventLoop() {
if err := p.proveOp(); err != nil {
log.Error("Prove new blocks error", "error", err)
}
case <-p.aggregationNotify:
if err := p.aggregateOp(); err != nil {
log.Error("Aggregate proofs error", "error", err)
}
case e := <-blockVerifiedCh:
p.blockVerifiedHandler.Handle(encoding.BlockVerifiedEventToV2(e))
case e := <-transitionProvedCh:
Expand Down Expand Up @@ -366,6 +383,8 @@ func (p *Prover) eventLoop() {
reqProving()
case <-forceProvingTicker.C:
reqProving()
case <-forceProvingTicker.C:
reqAggregation()
}
}
}
Expand All @@ -392,6 +411,23 @@ func (p *Prover) proveOp() error {
return iter.Iter()
}

// aggregateOp aggregates all proofs in buffer.
func (p *Prover) aggregateOp() error {
var wg sync.WaitGroup
for _, submitter := range p.proofSubmitters {
wg.Add(1)
go func(s proofSubmitter.Submitter) {
defer wg.Done()
err := s.AggregateProofs(p.ctx)
log.Error("Failed to aggregate proofs",
"error", err,
"tier", s.Tier(),
)
}(submitter)
}
return nil
}

// contestProofOp performs a proof contest operation.
func (p *Prover) contestProofOp(req *proofProducer.ContestRequestBody) error {
if err := p.proofContester.SubmitContest(
Expand Down

0 comments on commit 04250f9

Please sign in to comment.