diff --git a/storage/badger/approvals.go b/storage/badger/approvals.go index 4733c8e3e77..3023204290c 100644 --- a/storage/badger/approvals.go +++ b/storage/badger/approvals.go @@ -84,7 +84,7 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app // no approval found, index the approval - return operation.IndexResultApproval(resultID, chunkIndex, approvalID)(rw.Writer()) + return operation.UnsafeIndexResultApproval(resultID, chunkIndex, approvalID)(rw.Writer()) } // an approval is already indexed, double check if it is the same diff --git a/storage/badger/operation/approvals.go b/storage/badger/operation/approvals.go index af64911df7e..7c78095e090 100644 --- a/storage/badger/operation/approvals.go +++ b/storage/badger/operation/approvals.go @@ -18,12 +18,14 @@ func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApp return retrieveR(makePrefix(codeResultApproval, approvalID), approval) } -// IndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID +// UnsafeIndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID // and chunk index. If a value for this key exists, a storage.ErrAlreadyExists // error is returned. This operation is only used by the ResultApprovals store, // which is only used within a Verification node, where it is assumed that there // is only one approval per chunk. -func IndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error { +// CAUTION: In order to prevent overwriting, use of this function must be +// synchronized with check (RetrieveResultApproval) for existance of the key. +func UnsafeIndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error { return insertW(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) } diff --git a/storage/badger/operation/reader_batch_writer.go b/storage/badger/operation/reader_batch_writer.go index d92df33d3b3..3c8a26530fd 100644 --- a/storage/badger/operation/reader_batch_writer.go +++ b/storage/badger/operation/reader_batch_writer.go @@ -16,15 +16,30 @@ type ReaderBatchWriter struct { batch *badger.WriteBatch addingCallback sync.Mutex // protect callbacks - callbacks []func(error) + + // callbacks are executed regardless of the success of the batch commit. + // if any function that is adding writes to the batch fails, the callbacks + // are also called with the error, in this case the callbacks are executed + // before the batch is submitted. This is useful for the locks in those functions + // to be released. + // callbacks must be non-blocking + callbacks []func(error) } var _ storage.BadgerReaderBatchWriter = (*ReaderBatchWriter)(nil) +// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation"). +// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed. +// This reader may observe different values for the same key on subsequent reads. func (b *ReaderBatchWriter) GlobalReader() storage.Reader { return b } +// Writer returns a writer associated with a batch of writes. The batch is pending until it is committed. +// When we `Write` into the batch, that write operation is added to the pending batch, but not committed. +// The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are. +// Note: +// - The writer cannot be used concurrently for writing. func (b *ReaderBatchWriter) Writer() storage.Writer { return b }