Skip to content

Commit

Permalink
Rework AsyncIter (#811)
Browse files Browse the repository at this point in the history
* Rework AsyncIter

* Add tests for finishing iter on error

* Improved error handling for  and additional tests

* Use new style of constructors

* Handle future cancellation

* Docs for constructors
  • Loading branch information
tbekas authored Jun 10, 2024
1 parent fe9d970 commit f51ef52
Show file tree
Hide file tree
Showing 14 changed files with 694 additions and 145 deletions.
5 changes: 2 additions & 3 deletions codex/erasure/erasure.nim
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ proc getPendingBlocks(
CatchableError,
"Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & $index)

Iter.new(genNext, isFinished)
AsyncIter[(?!bt.Block, int)].new(genNext, isFinished)

proc prepareEncodingData(
self: Erasure,
Expand Down Expand Up @@ -440,8 +440,7 @@ proc decode*(
if treeCid != encoded.originalTreeCid:
return failure("Original tree root differs from the tree root computed out of recovered data")

let idxIter = Iter
.fromItems(recoveredIndices)
let idxIter = Iter[Natural].new(recoveredIndices)
.filter((i: Natural) => i < tree.leavesCount)

if err =? (await self.store.putSomeProofs(tree, idxIter)).errorOption:
Expand Down
19 changes: 3 additions & 16 deletions codex/indexingstrategy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,9 @@ func checkIteration(self: IndexingStrategy, iteration: int): void {.raises: [Ind
IndexingError,
"Indexing iteration can't be greater than or equal to iterations.")

proc getIter(first, last, step: int): Iter[int] =
var
finish = false
cur = first

func get(): int =
result = cur
cur += step

if cur > last:
finish = true

func isFinished(): bool =
finish

Iter.new(get, isFinished)
func getIter(first, last, step: int): Iter[int] =
{.cast(noSideEffect).}:
Iter[int].new(first, last, step)

func getLinearIndicies(
self: IndexingStrategy,
Expand Down
8 changes: 3 additions & 5 deletions codex/node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,8 @@ proc updateExpiry*(

try:
let
ensuringFutures = Iter
.fromSlice(0..<manifest.blocksCount)
.mapIt(
self.networkStore.localStore.ensureExpiry( manifest.treeCid, it, expiry ))
ensuringFutures = Iter[int].new(0..<manifest.blocksCount)
.mapIt(self.networkStore.localStore.ensureExpiry( manifest.treeCid, it, expiry ))
await allFuturesThrowing(ensuringFutures)
except CancelledError as exc:
raise exc
Expand Down Expand Up @@ -209,7 +207,7 @@ proc fetchBatched*(

trace "Fetching blocks in batches of", size = batchSize

let iter = Iter.fromSlice(0..<manifest.blocksCount)
let iter = Iter[int].new(0..<manifest.blocksCount)
self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch)

proc streamSingleBlock(
Expand Down
68 changes: 26 additions & 42 deletions codex/stores/cachestore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -132,51 +132,35 @@ method listBlocks*(
## Get the list of blocks in the BlockStore. This is an intensive operation
##

var
iter = AsyncIter[?Cid]()

let
cids = self.cids()

proc next(): Future[?Cid] {.async.} =
await idleAsync()

var cid: Cid
while true:
if iter.finished:
return Cid.none

cid = cids()

if finished(cids):
iter.finish
return Cid.none

without isManifest =? cid.isManifest, err:
trace "Error checking if cid is a manifest", err = err.msg
return Cid.none

case blockType:
of BlockType.Manifest:
if not isManifest:
trace "Cid is not manifest, skipping", cid
continue

break
of BlockType.Block:
if isManifest:
trace "Cid is a manifest, skipping", cid
continue

break
of BlockType.Both:
break

return cid.some

iter.next = next

return success iter
proc isFinished(): bool =
return finished(cids)

proc genNext(): Future[Cid] {.async.} =
cids()

let iter = await (AsyncIter[Cid].new(genNext, isFinished)
.filter(
proc (cid: Cid): Future[bool] {.async.} =
without isManifest =? cid.isManifest, err:
trace "Error checking if cid is a manifest", err = err.msg
return false

case blockType:
of BlockType.Both:
return true
of BlockType.Manifest:
return isManifest
of BlockType.Block:
return not isManifest
))

return success(map[Cid, ?Cid](iter,
proc (cid: Cid): Future[?Cid] {.async.} =
some(cid)
))

func putBlockSync(self: CacheStore, blk: Block): bool =

Expand Down
66 changes: 66 additions & 0 deletions codex/stores/queryiterhelper.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
import pkg/chronicles
import pkg/datastore/typedds

import ../utils/asynciter

type KeyVal[T] = tuple[key: Key, value: T]

proc toAsyncIter*[T](
queryIter: QueryIter[T],
finishOnErr: bool = true
): Future[?!AsyncIter[?!QueryResponse[T]]] {.async.} =
## Converts `QueryIter[T]` to `AsyncIter[?!QueryResponse[T]]` and automatically
## runs dispose whenever `QueryIter` finishes or whenever an error occurs (only
## if the flag finishOnErr is set to true)
##

if queryIter.finished:
trace "Disposing iterator"
if error =? (await queryIter.dispose()).errorOption:
return failure(error)
return success(AsyncIter[?!QueryResponse[T]].empty())

var errOccurred = false

proc genNext: Future[?!QueryResponse[T]] {.async.} =
let queryResOrErr = await queryIter.next()

if queryResOrErr.isErr:
errOccurred = true

if queryIter.finished or (errOccurred and finishOnErr):
trace "Disposing iterator"
if error =? (await queryIter.dispose()).errorOption:
return failure(error)

return queryResOrErr

proc isFinished(): bool =
queryIter.finished or (errOccurred and finishOnErr)

AsyncIter[?!QueryResponse[T]].new(genNext, isFinished).success

proc filterSuccess*[T](
iter: AsyncIter[?!QueryResponse[T]]
): Future[AsyncIter[tuple[key: Key, value: T]]] {.async.} =
## Filters out any items that are not success

proc mapping(resOrErr: ?!QueryResponse[T]): Future[?KeyVal[T]] {.async.} =
without res =? resOrErr, error:
error "Error occurred when getting QueryResponse", msg = error.msg
return KeyVal[T].none

without key =? res.key:
warn "No key for a QueryResponse"
return KeyVal[T].none

without value =? res.value, error:
error "Error occurred when getting a value from QueryResponse", msg = error.msg
return KeyVal[T].none

(key: key, value: value).some

await mapFilter[?!QueryResponse[T], KeyVal[T]](iter, mapping)
2 changes: 1 addition & 1 deletion codex/stores/treehelper.nim
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ proc putSomeProofs*(store: BlockStore, tree: CodexTree, iter: Iter[Natural]): Fu
store.putSomeProofs(tree, iter.map((i: Natural) => i.ord))

proc putAllProofs*(store: BlockStore, tree: CodexTree): Future[?!void] =
store.putSomeProofs(tree, Iter.fromSlice(0..<tree.leavesCount))
store.putSomeProofs(tree, Iter[int].new(0..<tree.leavesCount))
Loading

0 comments on commit f51ef52

Please sign in to comment.