Skip to content

Commit

Permalink
addressing review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Aug 23, 2024
1 parent 300119d commit ddd9404
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 4 deletions.
2 changes: 1 addition & 1 deletion storage/badger/approvals.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app

// no approval found, index the approval

return operation.IndexResultApproval(resultID, chunkIndex, approvalID)(rw.Writer())
return operation.UnsafeIndexResultApproval(resultID, chunkIndex, approvalID)(rw.Writer())
}

// an approval is already indexed, double check if it is the same
Expand Down
6 changes: 4 additions & 2 deletions storage/badger/operation/approvals.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApp
return retrieveR(makePrefix(codeResultApproval, approvalID), approval)
}

// IndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID
// UnsafeIndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID
// and chunk index. If a value for this key exists, a storage.ErrAlreadyExists
// error is returned. This operation is only used by the ResultApprovals store,
// which is only used within a Verification node, where it is assumed that there
// is only one approval per chunk.
func IndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error {
// CAUTION: In order to prevent overwriting, use of this function must be
// synchronized with check (RetrieveResultApproval) for existance of the key.
func UnsafeIndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error {
return insertW(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
}

Expand Down
17 changes: 16 additions & 1 deletion storage/badger/operation/reader_batch_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,30 @@ type ReaderBatchWriter struct {
batch *badger.WriteBatch

addingCallback sync.Mutex // protect callbacks
callbacks []func(error)

// callbacks are executed regardless of the success of the batch commit.
// if any function that is adding writes to the batch fails, the callbacks
// are also called with the error, in this case the callbacks are executed
// before the batch is submitted. This is useful for the locks in those functions
// to be released.
// callbacks must be non-blocking
callbacks []func(error)
}

var _ storage.BadgerReaderBatchWriter = (*ReaderBatchWriter)(nil)

// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation").
// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed.
// This reader may observe different values for the same key on subsequent reads.
func (b *ReaderBatchWriter) GlobalReader() storage.Reader {
return b
}

// Writer returns a writer associated with a batch of writes. The batch is pending until it is committed.
// When we `Write` into the batch, that write operation is added to the pending batch, but not committed.
// The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are.
// Note:
// - The writer cannot be used concurrently for writing.
func (b *ReaderBatchWriter) Writer() storage.Writer {
return b
}
Expand Down

0 comments on commit ddd9404

Please sign in to comment.