Skip to content

Commit

Permalink
making it concurrent-safe for indexing approval
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Aug 19, 2024
1 parent 51969ec commit 9737a92
Showing 1 changed file with 23 additions and 18 deletions.
41 changes: 23 additions & 18 deletions storage/pebble/approvals.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pebble
import (
"errors"
"fmt"
"sync"

"github.com/cockroachdb/pebble"

Expand All @@ -15,8 +16,9 @@ import (

// ResultApprovals implements persistent storage for result approvals.
type ResultApprovals struct {
db *pebble.DB
cache *Cache[flow.Identifier, *flow.ResultApproval]
indexing *sync.Mutex // preventing concurrent indexing of approvals
db *pebble.DB
cache *Cache[flow.Identifier, *flow.ResultApproval]
}

func NewResultApprovals(collector module.CacheMetrics, db *pebble.DB) *ResultApprovals {
Expand All @@ -34,7 +36,8 @@ func NewResultApprovals(collector module.CacheMetrics, db *pebble.DB) *ResultApp
}

res := &ResultApprovals{
db: db,
indexing: new(sync.Mutex),
db: db,
cache: newCache[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals,
withLimit[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100),
withStore[flow.Identifier, *flow.ResultApproval](store),
Expand Down Expand Up @@ -71,29 +74,29 @@ func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) f

func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.PebbleReaderBatchWriter) error {
return func(tx storage.PebbleReaderBatchWriter) error {
// acquring the lock to prevent dirty reads of check conflicted approvals
r.indexing.Lock()
defer r.indexing.Unlock()

r, w := tx.ReaderWriter()

err := operation.IndexResultApproval(resultID, chunkIndex, approvalID)(w)
if err == nil {
return nil
}
var storedApprovalID flow.Identifier
err := operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(r)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return fmt.Errorf("could not lookup result approval ID: %w", err)
}

if !errors.Is(err, storage.ErrAlreadyExists) {
return err
// no approval found, index the approval

return operation.IndexResultApproval(resultID, chunkIndex, approvalID)(w)
}

// When trying to index an approval for a result, and there is already
// an approval for the result, double check if the indexed approval is
// the same.
// an approval is already indexed, double check if it is the same
// We don't allow indexing multiple approvals per chunk because the
// store is only used within Verification nodes, and it is impossible
// for a Verification node to compute different approvals for the same
// chunk.
var storedApprovalID flow.Identifier
err = operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(r)
if err != nil {
return fmt.Errorf("there is an approval stored already, but cannot retrieve it: %w", err)
}

if storedApprovalID != approvalID {
return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w",
Expand All @@ -104,14 +107,16 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app
}
}

// Store stores a ResultApproval
// Store stores a ResultApproval and indexes a ResultApproval by chunk (ResultID + chunk index).
// this method is concurrent-safe
func (r *ResultApprovals) Store(approval *flow.ResultApproval) error {
return operation.WithReaderBatchWriter(r.db, r.store(approval))
}

// Index indexes a ResultApproval by chunk (ResultID + chunk index).
// operation is idempotent (repeated calls with the same value are equivalent to
// just calling the method once; still the method succeeds on each call).
// this method is concurrent-safe
func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error {
err := operation.WithReaderBatchWriter(r.db, r.index(resultID, chunkIndex, approvalID))
if err != nil {
Expand Down

0 comments on commit 9737a92

Please sign in to comment.