Skip to content

Commit

Permalink
[Aggregator] Do not return in case of final proof error (#1691)
Browse files Browse the repository at this point in the history
* fix: do not return in case of final proof error

This commit fixes the control flow after a failure to generate a final
proof. The code was returning the error without "unlocking" the
underlying proof, which then appeared to be stuck until it was eventually
cleared by the scheduled cleanup job. Instead, we now just log the error
and carry on handling the underlying proof.

* test: add test cases for tryBuildFinalProof error
  • Loading branch information
kind84 authored Feb 27, 2023
1 parent 83b4998 commit 30c78ec
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 58 deletions.
121 changes: 85 additions & 36 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface
"prover", prover.Name(),
"proverId", prover.ID(),
"proverAddr", prover.Addr(),
"recursiveProofId", *proof.ProofID,
"batches", fmt.Sprintf("%d-%d", proof.BatchNumber, proof.BatchNumberFinal),
)
log.Info("Generating final proof")
Expand All @@ -320,7 +321,7 @@ func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface
proof.ProofID = finalProofID

log.Infof("Final proof ID for batches [%d-%d]: %s", proof.BatchNumber, proof.BatchNumberFinal, *proof.ProofID)
log = log.WithFields("proofId", finalProofID)
log = log.WithFields("finalProofId", finalProofID)

finalProof, err := prover.WaitFinalProof(ctx, *proof.ProofID)
if err != nil {
Expand Down Expand Up @@ -353,7 +354,12 @@ func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface
func (a *Aggregator) tryBuildFinalProof(ctx context.Context, prover proverInterface, proof *state.Proof) (bool, error) {
proverName := prover.Name()
proverID := prover.ID()
log := log.WithFields("prover", proverName, "proverId", proverID, "proverAddr", prover.Addr())

log := log.WithFields(
"prover", proverName,
"proverId", proverID,
"proverAddr", prover.Addr(),
)
log.Debug("tryBuildFinalProof start")

var err error
Expand Down Expand Up @@ -398,7 +404,7 @@ func (a *Aggregator) tryBuildFinalProof(ctx context.Context, prover proverInterf
proof.GeneratingSince = nil
err2 := a.State.UpdateGeneratedProof(a.ctx, proof, nil)
if err2 != nil {
log.Errorf("failed to delete proof in progress, err: %v", err2)
log.Errorf("failed to unlock proof: %v", err2)
}
}
}()
Expand All @@ -414,6 +420,11 @@ func (a *Aggregator) tryBuildFinalProof(ctx context.Context, prover proverInterf
}
}

log = log.WithFields(
"proofId", *proof.ProofID,
"batches", fmt.Sprintf("%d-%d", proof.BatchNumber, proof.BatchNumberFinal),
)

// at this point we have an eligible proof, build the final one using it
finalProof, err := a.buildFinalProof(ctx, prover, proof)
if err != nil {
Expand Down Expand Up @@ -443,13 +454,13 @@ func (a *Aggregator) validateEligibleFinalProof(ctx context.Context, proof *stat
if proof.BatchNumber != batchNumberToVerify {
if proof.BatchNumber < batchNumberToVerify && proof.BatchNumberFinal >= batchNumberToVerify {
// We have a proof that contains some batches below the last batch verified; it can still be eligible as a final proof
log.Warnf("Proof %d-%d contains some batches lower than last batch verified %d. Check anyway if it is eligible", proof.BatchNumber, lastVerifiedBatchNum, batchNumberToVerify)
log.Warnf("Proof %d-%d contains some batches lower than last batch verified %d. Check anyway if it is eligible", proof.BatchNumber, proof.BatchNumberFinal, lastVerifiedBatchNum)
} else if proof.BatchNumberFinal < batchNumberToVerify {
// We have a proof that contains batches below the last batch verified, so we need to delete this proof
log.Warnf("Proof %d-%d lower than last batch verified %d. Delete it", proof.BatchNumber, lastVerifiedBatchNum, batchNumberToVerify)
log.Warnf("Proof %d-%d lower than next batch to verify %d. Deleting it", proof.BatchNumber, proof.BatchNumberFinal, batchNumberToVerify)
err := a.State.DeleteGeneratedProofs(ctx, proof.BatchNumber, proof.BatchNumberFinal, nil)
if err != nil {
return false, fmt.Errorf("Failed to delete discarded proof, err: %v", err)
return false, fmt.Errorf("Failed to delete discarded proof, err: %w", err)
}
return false, nil
} else {
Expand Down Expand Up @@ -507,11 +518,11 @@ func (a *Aggregator) unlockProofsToAggregate(ctx context.Context, proof1 *state.

if err != nil {
if err := dbTx.Rollback(ctx); err != nil {
err := fmt.Errorf("failed to rollback proof aggregation state %w", err)
err := fmt.Errorf("failed to rollback proof aggregation state: %w", err)
log.Error(err.Error())
return err
}
return fmt.Errorf("failed to release proof aggregation state %w", err)
return fmt.Errorf("failed to release proof aggregation state: %w", err)
}

err = dbTx.Commit(ctx)
Expand All @@ -523,6 +534,12 @@ func (a *Aggregator) unlockProofsToAggregate(ctx context.Context, proof1 *state.
}

func (a *Aggregator) getAndLockProofsToAggregate(ctx context.Context, prover proverInterface) (*state.Proof, *state.Proof, error) {
log := log.WithFields(
"prover", prover.Name(),
"proverId", prover.ID(),
"proverAddr", prover.Addr(),
)

a.StateDBMutex.Lock()
defer a.StateDBMutex.Unlock()

Expand Down Expand Up @@ -566,7 +583,12 @@ func (a *Aggregator) getAndLockProofsToAggregate(ctx context.Context, prover pro
func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover proverInterface) (bool, error) {
proverName := prover.Name()
proverID := prover.ID()
log := log.WithFields("prover", proverName, "proverId", proverID, "proverAddr", prover.Addr())

log := log.WithFields(
"prover", proverName,
"proverId", proverID,
"proverAddr", prover.Addr(),
)
log.Debug("tryAggregateProofs start")

proof1, proof2, err0 := a.getAndLockProofsToAggregate(ctx, prover)
Expand Down Expand Up @@ -595,6 +617,7 @@ func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover proverInterf
}()

log.Infof("Aggregating proofs: %d-%d and %d-%d", proof1.BatchNumber, proof1.BatchNumberFinal, proof2.BatchNumber, proof2.BatchNumberFinal)
log = log.WithFields("batches", fmt.Sprintf("%d-%d", proof1.BatchNumber, proof2.BatchNumberFinal))

inputProver := map[string]interface{}{
"recursive_proof_1": proof1.Proof,
Expand All @@ -605,14 +628,12 @@ func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover proverInterf
return false, fmt.Errorf("failed to serialize input prover, %w", err)
}

now := time.Now().Round(time.Microsecond)
proof := &state.Proof{
BatchNumber: proof1.BatchNumber,
BatchNumberFinal: proof2.BatchNumberFinal,
Prover: &proverName,
ProverID: &proverID,
InputProver: string(b),
GeneratingSince: &now,
}

aggrProofID, err = prover.AggregatedProof(proof1.Proof, proof2.Proof)
Expand All @@ -622,53 +643,62 @@ func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover proverInterf

proof.ProofID = aggrProofID

log.Infof("Proof ID for aggregated proof %d-%d: %v", proof.BatchNumber, proof.BatchNumberFinal, *proof.ProofID)
log.Infof("Proof ID for aggregated proof: %v", *proof.ProofID)
log = log.WithFields("proofId", *proof.ProofID)

recursiveProof, err := prover.WaitRecursiveProof(ctx, *proof.ProofID)
if err != nil {
return false, fmt.Errorf("failed to get aggregated proof from prover, %w", err)
}

log.Infof("Aggregated proof %s generated", *proof.ProofID)
log.Info("Aggregated proof generated")

proof.Proof = recursiveProof

// update the state by removing the 2 aggregated proofs and storing the
// newly generated recursive proof
dbTx, err := a.State.BeginStateTransaction(ctx)
if err != nil {
return false, fmt.Errorf("failed to begin transaction to update proof aggregation state %w", err)
return false, fmt.Errorf("failed to begin transaction to update proof aggregation state: %w", err)
}

err = a.State.DeleteGeneratedProofs(ctx, proof1.BatchNumber, proof2.BatchNumberFinal, dbTx)
if err != nil {
if err := dbTx.Rollback(ctx); err != nil {
err := fmt.Errorf("failed to rollback proof aggregation state %w", err)
err := fmt.Errorf("failed to rollback proof aggregation state: %w", err)
log.Error(err.Error())
return false, err
}
return false, fmt.Errorf("failed to delete previously aggregated proofs %w", err)
return false, fmt.Errorf("failed to delete previously aggregated proofs: %w", err)
}

now := time.Now().Round(time.Microsecond)
proof.GeneratingSince = &now

err = a.State.AddGeneratedProof(ctx, proof, dbTx)
if err != nil {
if err := dbTx.Rollback(ctx); err != nil {
err := fmt.Errorf("failed to rollback proof aggregation state %w", err)
err := fmt.Errorf("failed to rollback proof aggregation state: %w", err)
log.Error(err.Error())
return false, err
}
return false, fmt.Errorf("failed to store the recursive proof %w", err)
return false, fmt.Errorf("failed to store the recursive proof: %w", err)
}

err = dbTx.Commit(ctx)
if err != nil {
return false, fmt.Errorf("failed to store the recursive proof %w", err)
return false, fmt.Errorf("failed to store the recursive proof: %w", err)
}

// NOTE(pg): the defer func is useless from now on, use a different variable
// name for errors (or shadow err in inner scopes) to not trigger it.

// state is up to date, check if we can send the final proof using the
// one just crafted.
finalProofBuilt, err := a.tryBuildFinalProof(ctx, prover, proof)
if err != nil {
return false, fmt.Errorf("failed trying to check if recursive proof can be verified: %w", err)
finalProofBuilt, finalProofErr := a.tryBuildFinalProof(ctx, prover, proof)
if finalProofErr != nil {
// just log the error and continue to handle the aggregated proof
log.Errorf("failed trying to check if recursive proof can be verified: %v", finalProofErr)
}

// NOTE(pg): prover is done, use a.ctx from now on
Expand All @@ -677,7 +707,7 @@ func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover proverInterf
proof.GeneratingSince = nil

// final proof has not been generated, update the recursive proof
err = a.State.UpdateGeneratedProof(a.ctx, proof, nil)
err := a.State.UpdateGeneratedProof(a.ctx, proof, nil)
if err != nil {
log.Errorf("Failed to store batch proof result, err %v", err)
return false, err
Expand All @@ -688,6 +718,15 @@ func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover proverInterf
}

func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverInterface) (*state.Batch, *state.Proof, error) {
proverID := prover.ID()
proverName := prover.Name()

log := log.WithFields(
"prover", proverName,
"proverId", proverID,
"proverAddr", prover.Addr(),
)

a.StateDBMutex.Lock()
defer a.StateDBMutex.Unlock()

Expand All @@ -703,8 +742,9 @@ func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverIn
}

log.Infof("Found virtual batch %d pending to generate proof", batchToVerify.BatchNumber)
log = log.WithFields("batch", batchToVerify.BatchNumber)

log.Infof("Checking profitability to aggregate batch, batchNumber: %d", batchToVerify.BatchNumber)
log.Info("Checking profitability to aggregate batch")

// pass matic collateral as zero here, because the fee for the aggregator is not defined in the smart contract yet
isProfitable, err := a.ProfitabilityChecker.IsProfitable(ctx, big.NewInt(0))
Expand All @@ -714,12 +754,10 @@ func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverIn
}

if !isProfitable {
log.Infof("Batch %d is not profitable, matic collateral %d", batchToVerify.BatchNumber, big.NewInt(0))
log.Infof("Batch is not profitable, matic collateral %d", big.NewInt(0))
return nil, nil, err
}

proverID := prover.ID()
proverName := prover.Name()
now := time.Now().Round(time.Microsecond)
proof := &state.Proof{
BatchNumber: batchToVerify.BatchNumber,
Expand All @@ -740,7 +778,11 @@ func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverIn
}

func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInterface) (bool, error) {
log := log.WithFields("prover", prover.Name(), "proverId", prover.ID(), "proverAddr", prover.Addr())
log := log.WithFields(
"prover", prover.Name(),
"proverId", prover.ID(),
"proverAddr", prover.Addr(),
)
log.Debug("tryGenerateBatchProof start")

batchToProve, proof, err0 := a.getAndLockBatchToProve(ctx, prover)
Expand All @@ -758,6 +800,8 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt
err error
)

log = log.WithFields("batch", batchToProve.BatchNumber)

defer func() {
if err != nil {
err2 := a.State.DeleteGeneratedProofs(a.ctx, proof.BatchNumber, proof.BatchNumberFinal, nil)
Expand All @@ -768,7 +812,7 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt
log.Debug("tryGenerateBatchProof end")
}()

log.Infof("Generating proof from batch [%d]", batchToProve.BatchNumber)
log.Info("Generating proof from batch")

log.Infof("Sending zki + batch to the prover, batchNumber [%d]", batchToProve.BatchNumber)
inputProver, err := a.buildInputProver(ctx, batchToProve)
Expand All @@ -793,29 +837,34 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt

proof.ProofID = genProofID

log.Infof("Proof ID for batch %d: %v", proof.BatchNumber, *proof.ProofID)
log.Infof("Proof ID %v", *proof.ProofID)
log = log.WithFields("proofId", *proof.ProofID)

resGetProof, err := prover.WaitRecursiveProof(ctx, *proof.ProofID)
if err != nil {
return false, fmt.Errorf("failed to get proof from prover %w", err)
}

log.Infof("Batch proof %s generated", *proof.ProofID)
log.Info("Batch proof generated")

proof.Proof = resGetProof

finalProofBuilt, err := a.tryBuildFinalProof(ctx, prover, proof)
if err != nil {
return false, fmt.Errorf("failed trying to build final proof %w", err)
// NOTE(pg): the defer func is useless from now on, use a different variable
// name for errors (or shadow err in inner scopes) to not trigger it.

finalProofBuilt, finalProofErr := a.tryBuildFinalProof(ctx, prover, proof)
if finalProofErr != nil {
// just log the error and continue to handle the generated proof
log.Errorf("error trying to build final proof %v", finalProofErr)
}

// NOTE(pg): prover is done, use a.ctx from now on

if !finalProofBuilt {
proof.GeneratingSince = nil

// final proof has not been generated, update the recursive proof
err = a.State.UpdateGeneratedProof(a.ctx, proof, nil)
// final proof has not been generated, update the batch proof
err := a.State.UpdateGeneratedProof(a.ctx, proof, nil)
if err != nil {
log.Errorf("Failed to store batch proof result, err %v", err)
return false, err
Expand Down
Loading

0 comments on commit 30c78ec

Please sign in to comment.