From 9737a92dfb5fe4fd187634a02abef3bad0e414b5 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 19 Aug 2024 16:23:24 -0700 Subject: [PATCH] making it concurrent-safe for indexing approval --- storage/pebble/approvals.go | 41 +++++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/storage/pebble/approvals.go b/storage/pebble/approvals.go index 425e6de0c2a..a521e6757af 100644 --- a/storage/pebble/approvals.go +++ b/storage/pebble/approvals.go @@ -3,6 +3,7 @@ package pebble import ( "errors" "fmt" + "sync" "github.com/cockroachdb/pebble" @@ -15,8 +16,9 @@ import ( // ResultApprovals implements persistent storage for result approvals. type ResultApprovals struct { - db *pebble.DB - cache *Cache[flow.Identifier, *flow.ResultApproval] + indexing *sync.Mutex // preventing concurrent indexing of approvals + db *pebble.DB + cache *Cache[flow.Identifier, *flow.ResultApproval] } func NewResultApprovals(collector module.CacheMetrics, db *pebble.DB) *ResultApprovals { @@ -34,7 +36,8 @@ func NewResultApprovals(collector module.CacheMetrics, db *pebble.DB) *ResultApp } res := &ResultApprovals{ - db: db, + indexing: new(sync.Mutex), + db: db, cache: newCache[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals, withLimit[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100), withStore[flow.Identifier, *flow.ResultApproval](store), @@ -71,29 +74,29 @@ func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) f func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.PebbleReaderBatchWriter) error { return func(tx storage.PebbleReaderBatchWriter) error { + // acquring the lock to prevent dirty reads of check conflicted approvals + r.indexing.Lock() + defer r.indexing.Unlock() + r, w := tx.ReaderWriter() - err := operation.IndexResultApproval(resultID, chunkIndex, approvalID)(w) - if err == nil { - return nil - } + var storedApprovalID flow.Identifier + err := operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(r) + if err != nil { + if !errors.Is(err, storage.ErrNotFound) { + return fmt.Errorf("could not lookup result approval ID: %w", err) + } - if !errors.Is(err, storage.ErrAlreadyExists) { - return err + // no approval found, index the approval + + return operation.IndexResultApproval(resultID, chunkIndex, approvalID)(w) } - // When trying to index an approval for a result, and there is already - // an approval for the result, double check if the indexed approval is - // the same. + // an approval is already indexed, double check if it is the same // We don't allow indexing multiple approvals per chunk because the // store is only used within Verification nodes, and it is impossible // for a Verification node to compute different approvals for the same // chunk. - var storedApprovalID flow.Identifier - err = operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(r) - if err != nil { - return fmt.Errorf("there is an approval stored already, but cannot retrieve it: %w", err) - } if storedApprovalID != approvalID { return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w", @@ -104,7 +107,8 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app } } -// Store stores a ResultApproval +// Store stores a ResultApproval and indexes a ResultApproval by chunk (ResultID + chunk index). +// this method is concurrent-safe func (r *ResultApprovals) Store(approval *flow.ResultApproval) error { return operation.WithReaderBatchWriter(r.db, r.store(approval)) } @@ -112,6 +116,7 @@ func (r *ResultApprovals) Store(approval *flow.ResultApproval) error { // Index indexes a ResultApproval by chunk (ResultID + chunk index). // operation is idempotent (repeated calls with the same value are equivalent to // just calling the method once; still the method succeeds on each call). +// this method is concurrent-safe func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error { err := operation.WithReaderBatchWriter(r.db, r.index(resultID, chunkIndex, approvalID)) if err != nil {