%% NOTE(review): the hunk header shows an additional leading clause
%% (`fetch_blocks(Req, Current, To, _Opts) when is_integer(To), Current < To')
%% above this view; only the clauses visible in the patch are reproduced.
fetch_blocks(_Req, Current, undefined, _Opts) when Current < 0 ->
    {ok, 0};
fetch_blocks(Req, Current, undefined, Opts) ->
    %% Open-ended walk: delegate to the prefetching 5-arity variant.
    fetch_blocks(Req, Current, undefined, none, Opts);
fetch_blocks(Req, Current, To, Opts) ->
    %% Bounded range: index asynchronously with a worker pool.
    fetch_blocks_parallel(Req, Current, To, Opts).

%% @doc Kick off asynchronous indexing of blocks `From' down to `To' and
%% return `{ok, To}' immediately. The work runs in a detached process
%% (bare spawn — presumably intentional so the indexer outlives the
%% request; TODO confirm no supervision/link is wanted). Completion and
%% any per-height failures are reported via `?event'.
fetch_blocks_parallel(Req, From, To, Opts) ->
    spawn(fun() ->
        Workers = max(1, hb_opts:get(arweave_block_workers, 3, Opts)),
        FailedHeights = fetch_blocks_batch(From, To, Workers, Opts),
        ?event(copycat_short,
            {arweave_block_indexing_completed,
                {range, {From, To}},
                {failed_blocks, length(FailedHeights)},
                {initial_request, Req}
            }
        ),
        case FailedHeights of
            [] ->
                ok;
            _ ->
                ?event(copycat_short,
                    {arweave_block_failures,
                        {heights, FailedHeights},
                        {range, {From, To}}
                    }
                )
        end
    end),
    {ok, To}.

%% @doc Index blocks from `Current' down to `To' in descending batches of
%% at most `MaxWorkers' heights, each batch processed in parallel.
%% A crash while indexing one height is caught, logged, and recorded;
%% the accumulated list of failed heights is returned.
fetch_blocks_batch(Current, To, _MaxWorkers, _Opts) when Current < To ->
    [];
fetch_blocks_batch(Current, To, MaxWorkers, Opts) ->
    BatchEnd = max(To, Current - MaxWorkers + 1),
    Heights = lists:seq(Current, BatchEnd, -1),
    Results =
        hb_pmap:parallel_map(
            Heights,
            fun(Height) ->
                try
                    observe_event(<<"block_indexed">>, fun() ->
                        fetch_and_process_block(Height, To, Opts)
                    end),
                    ok
                catch
                    Class:Reason:Stack ->
                        ?event(copycat_short,
                            {arweave_block_failed,
                                {height, Height},
                                {class, Class},
                                {reason, Reason},
                                {stack, Stack}
                            }
                        ),
                        {failed, Height}
                end
            end,
            MaxWorkers
        ),
    [H || {failed, H} <- Results] ++
        fetch_blocks_batch(BatchEnd - 1, To, MaxWorkers, Opts).
%% NOTE(review): the 5-arity `fetch_blocks' clauses in this hunk span a
%% hunk boundary whose interior lines are elided from this view, so they
%% are not reproduced here.

%% @doc Start a monitored worker that fetches the header for `Height' in
%% the background. The worker always terminates with exit reason
%% `{block_prefetch, Ref, Result}', where `Result' is the value of
%% `fetch_block_header/2', or `{error, prefetch_failed}' if that call
%% crashed. Returns `{Ref, Pid, MonRef}' for `collect_prefetch/1'.
spawn_prefetch(Height, Opts) ->
    Ref = make_ref(),
    {Pid, MonRef} =
        spawn_monitor(
            fun() ->
                Result =
                    try
                        fetch_block_header(Height, Opts)
                    catch
                        _:_ -> {error, prefetch_failed}
                    end,
                exit({block_prefetch, Ref, Result})
            end
        ),
    {Ref, Pid, MonRef}.

%% @doc Wait for a worker started by `spawn_prefetch/2'. Returns
%% `{prefetched, Result}' on the expected exit reason, or `none' when
%% there was no worker or it died abnormally. The worker always exits,
%% so the monitor guarantees a 'DOWN' message and this cannot hang.
collect_prefetch(none) ->
    none;
collect_prefetch({Ref, _Pid, MonRef}) ->
    receive
        {'DOWN', MonRef, process, _, {block_prefetch, Ref, Res}} ->
            {prefetched, Res};
        {'DOWN', MonRef, process, _, _} ->
            none
    end.
%% @doc Determine whether a fetched block is considered indexed.
%% Checks for a block completion marker first. For blocks at or above the
%% cutover height, the marker is authoritative — no fallback. For blocks
%% below the cutover, falls back to the legacy per-TX check for
%% compatibility with indexes built before markers were introduced.
is_already_indexed({ok, Block}, Opts) ->
    Height = hb_maps:get(<<"height">>, Block, undefined, Opts),
    case is_block_indexed(Height, Opts) of
        true ->
            ?event(copycat_debug,
                {auto_stop_check, {height, Height}, {stop_reason, marker}}),
            true;
        false ->
            Cutover = read_cutover_height(Opts),
            case Height =/= undefined andalso Cutover =/= undefined
                    andalso Height >= Cutover of
                true ->
                    %% At or above the cutover the missing marker is
                    %% authoritative: the block is not indexed.
                    false;
                false ->
                    Legacy = legacy_is_indexed(Block, Opts),
                    case Legacy of
                        true ->
                            ?event(copycat_debug,
                                {auto_stop_check,
                                    {height, Height},
                                    {stop_reason, legacy_fallback}
                                }
                            );
                        false -> ok
                    end,
                    Legacy
            end
    end;
is_already_indexed({error, _}, _Opts) ->
    false.

%% @doc Legacy check: a block counts as indexed when any TX from its
%% `txs' list is present in the index.
legacy_is_indexed(Block, Opts) ->
    TXIDs = hb_maps:get(<<"txs">>, Block, [], Opts),
    lists:any(fun(TXID) -> is_tx_indexed(TXID, Opts) end, TXIDs).

-define(BLOCK_MARKER_PREFIX, <<"block/">>).
-define(CUTOVER_KEY, <<"block/_marker_cutover_height">>).

%% @doc Store key for the completion marker of the block at `Height'.
%% NOTE(review): the binary construction here was garbled in the source
%% (`<>'); reconstructed as the marker prefix followed by the decimal
%% height, matching the `hb_store:read/write' call sites below — confirm
%% against the original patch.
block_indexed_path(Height) ->
    <<?BLOCK_MARKER_PREFIX/binary, (integer_to_binary(Height))/binary>>.

%% @doc True when a completion marker exists for `Height' and was written
%% at a depth at least the currently configured `arweave_index_depth'
%% (a shallower historical pass does not count as indexed).
is_block_indexed(undefined, _Opts) ->
    false;
is_block_indexed(Height, Opts) ->
    case hb_store_arweave:store_from_opts(Opts) of
        no_store -> false;
        #{ <<"index-store">> := Store } ->
            case hb_store:read(Store, block_indexed_path(Height)) of
                {ok, StoredBin} ->
                    try binary_to_integer(StoredBin) of
                        StoredDepth ->
                            ConfiguredDepth =
                                hb_opts:get(arweave_index_depth, 1, Opts),
                            StoredDepth >= ConfiguredDepth
                    catch
                        %% A corrupt marker value is treated as absent.
                        _:_ -> false
                    end;
                not_found -> false
            end
    end.

%% @doc Persist the completion marker for `Height', recording the depth
%% actually achieved while indexing it. No-op when no store is set.
mark_block_indexed(Height, Depth, Opts) ->
    case hb_store_arweave:store_from_opts(Opts) of
        no_store -> ok;
        #{ <<"index-store">> := Store } ->
            hb_store:write(
                Store,
                block_indexed_path(Height),
                integer_to_binary(Depth)
            )
    end.
%% @doc Resolve the marker cutover height. An explicit
%% `arweave_marker_cutover_height' option takes precedence; otherwise the
%% persisted value is read from the index store. Returns `undefined'
%% when neither source has a value.
read_cutover_height(Opts) ->
    case hb_opts:get(arweave_marker_cutover_height, undefined, Opts) of
        undefined ->
            case hb_store_arweave:store_from_opts(Opts) of
                no_store -> undefined;
                #{ <<"index-store">> := Store } ->
                    case hb_store:read(Store, ?CUTOVER_KEY) of
                        {ok, Bin} -> hb_util:int(Bin);
                        not_found -> undefined
                    end
            end;
        Override ->
            hb_util:int(Override)
    end.

%% @doc Persist `Height' as the marker cutover height, but only on first
%% use: an already-stored value is never overwritten. No-op without a
%% store.
ensure_cutover_height(Height, Opts) ->
    case hb_store_arweave:store_from_opts(Opts) of
        no_store -> ok;
        #{ <<"index-store">> := Store } ->
            case hb_store:read(Store, ?CUTOVER_KEY) of
                {ok, _} -> ok;
                not_found ->
                    hb_store:write(Store, ?CUTOVER_KEY, hb_util:bin(Height)),
                    ?event(copycat_short,
                        {marker_cutover_initialized,
                            {height, Height}
                        }
                    )
            end
    end.

%% @doc Fetch the header of block `Current' and index its contents.
fetch_and_process_block(Current, To, Opts) ->
    BlockRes = fetch_block_header(Current, Opts),
    process_block(BlockRes, Current, To, Opts).
%% Returns a map with keys: items_count, bundle_count, skipped_count -process_tx({{padding, _PaddingRoot}, _EndOffset}, _BlockStartOffset, _Opts) -> - #{items_count => 0, bundle_count => 0, skipped_count => 0}; +process_tx({{padding, _PaddingRoot}, _EndOffset}, _BlockStartOffset, Opts) -> + Depth = hb_opts:get(arweave_index_depth, 1, Opts), + #{items_count => 0, bundle_count => 0, skipped_count => 0, + achieved_depth => Depth}; process_tx({{TX, _TXDataRoot}, EndOffset}, BlockStartOffset, Opts) -> IndexStore = hb_store_arweave:store_from_opts(Opts), TXID = hb_util:encode(TX#tx.id), TXEndOffset = BlockStartOffset + EndOffset, TXStartOffset = TXEndOffset - TX#tx.data_size, + Depth = hb_opts:get(arweave_index_depth, 1, Opts), ?event(copycat_debug, {writing_index, {id, {explicit, TXID}}, {offset, TXStartOffset}, @@ -299,19 +499,39 @@ process_tx({{TX, _TXDataRoot}, EndOffset}, BlockStartOffset, Opts) -> ) end), case is_bundle_tx(TX, Opts) of - false -> #{items_count => 0, bundle_count => 0, skipped_count => 0}; + false -> + #{items_count => 0, bundle_count => 0, skipped_count => 0, + achieved_depth => Depth}; true -> - ?event(copycat_debug, {fetching_bundle_header, + IsRedstone = + dev_arweave_common:tagfind( + <<"Bundler-App-Name">>, TX#tx.tags, <<>> + ) =:= <<"Redstone">>, + EffectiveDepth = case IsRedstone of + true -> 1; + false -> Depth + end, + case IsRedstone of + true -> + ?event(copycat_short, + {arweave_redstone_skip, + {tx_id, {explicit, TXID}}, + {configured_depth, Depth} + } + ); + false -> ok + end, + ?event(copycat_debug, {fetching_bundle_header, {tx_id, {explicit, TXID}}, {tx_end_offset, TXEndOffset}, - {tx_data_size, TX#tx.data_size} + {tx_data_size, TX#tx.data_size}, + {effective_depth, EffectiveDepth} }), BundleRes = download_bundle_header( TXEndOffset, TX#tx.data_size, Opts ), case BundleRes of {ok, {BundleIndex, HeaderSize}} -> - % Batch event tracking: measure total time and count for all write_offset calls {TotalTime, {_, ItemsCount}} = 
timer:tc(fun() -> lists:foldl( fun({ItemID, Size}, {ItemStartOffset, ItemsCountAcc}) -> @@ -328,9 +548,29 @@ process_tx({{TX, _TXDataRoot}, EndOffset}, BlockStartOffset, Opts) -> BundleIndex ) end), - % Single event increment for the batch - record_event_metrics(<<"item_indexed">>, ItemsCount, TotalTime), - #{items_count => ItemsCount, bundle_count => 1, skipped_count => 0}; + record_event_metrics( + <<"item_indexed">>, ItemsCount, TotalTime), + {NestedItems, AchievedDepth} = + case EffectiveDepth > 1 of + false -> + {0, Depth}; + true -> + ItemsWithOffsets = items_with_offsets( + BundleIndex, + TXStartOffset + HeaderSize + ), + {NC, MinAchieved} = + nested_parallel_check( + ItemsWithOffsets, + EffectiveDepth - 1, + IndexStore, TXID, Opts + ), + {NC, 1 + MinAchieved} + end, + #{items_count => ItemsCount + NestedItems, + bundle_count => 1, + skipped_count => 0, + achieved_depth => AchievedDepth}; {error, Reason} -> ?event( copycat_short, @@ -339,7 +579,8 @@ process_tx({{TX, _TXDataRoot}, EndOffset}, BlockStartOffset, Opts) -> {reason, Reason} } ), - #{items_count => 0, bundle_count => 1, skipped_count => 1} + #{items_count => 0, bundle_count => 1, + skipped_count => 1, achieved_depth => 0} end end. @@ -349,6 +590,7 @@ process_tx({{TX, _TXDataRoot}, EndOffset}, BlockStartOffset, Opts) -> %% When arweave_index_workers > 1, processes in parallel with the specified concurrency limit. %% Returns a map with keys: items_count, bundle_count, skipped_count. 
%% NOTE(review): `process_txs/3' in this hunk spans a hunk boundary with
%% elided interior lines and is not reproduced here.

%% @doc A TX is a bundle unless its ANS-104 type is `binary'.
is_bundle_tx(TX, _Opts) ->
    dev_arweave_common:type(TX) =/= binary.

%% @doc Annotate each `{ItemID, Size}' entry of a bundle index with the
%% absolute offset at which the item's data begins. Items are laid out
%% back to back starting at `StartOffset', so each item begins where the
%% previous one ended. Returns `[{ItemID, Size, Offset}]' in input order.
items_with_offsets(BundleIndex, StartOffset) ->
    {_End, Annotated} =
        lists:foldl(
            fun({IID, Size}, {Offset, Acc}) ->
                {Offset + Size, [{IID, Size, Offset} | Acc]}
            end,
            {StartOffset, []},
            BundleIndex
        ),
    lists:reverse(Annotated).

%% @doc Recursively scan the items of a bundle for nested bundles using
%% a bounded pool of workers (`arweave_nested_workers', default 10).
%% Returns `{NestedItemCount, MinAchievedDepth}'.
nested_parallel_check(Items, RemainingDepth, Store, ParentID, Opts) ->
    MaxWorkers = max(1, hb_opts:get(arweave_nested_workers, 10, Opts)),
    nested_parallel_check(
        Items, RemainingDepth, Store, ParentID,
        Opts, MaxWorkers, {0, RemainingDepth}).
%% @doc Batch loop for the nested-bundle scan: peel off up to `Max'
%% items, check them in parallel, fold each `{ok, Count, Achieved, _}'
%% result into the `{Count, MinDepth}' accumulator, then continue with
%% the remainder until the item list is exhausted.
nested_parallel_check([], _Depth, _Store, _ParentID, _Opts, _Max, Acc) ->
    Acc;
nested_parallel_check(Items, Depth, Store, ParentID, Opts, Max, {CAcc, MAcc}) ->
    {Batch, Rest} = take_batch(Max, Items),
    Results =
        hb_pmap:parallel_map(
            Batch,
            fun({_IID, Size, Offset}) ->
                check_nested_bundle(
                    Offset, Size, Depth,
                    Store, ParentID, Opts, no_readahead
                )
            end,
            Max
        ),
    Folded =
        lists:foldl(
            fun({ok, Count, Achieved, _Cache}, {C, M}) ->
                {C + Count, min(M, Achieved)}
            end,
            {CAcc, MAcc},
            Results
        ),
    nested_parallel_check(Rest, Depth, Store, ParentID, Opts, Max, Folded).

%% @doc Split off up to `N' leading elements of a list; never fails on a
%% short list (unlike `lists:split/2').
take_batch(0, Rest) -> {[], Rest};
take_batch(_N, []) -> {[], []};
take_batch(N, [H | T]) ->
    {Batch, Rest} = take_batch(N - 1, T),
    {[H | Batch], Rest}.

%% @doc Probe one bundle item for a nested bundle. Reads up to 256 KiB
%% of the item (possibly satisfied by `ChunkCache'), tries to parse it as
%% an ANS-104 item and, when it is a bundle, indexes its children. Never
%% throws: every failure path collapses to `{ok, Count, AchievedDepth,
%% Cache}' so one bad item cannot abort the batch; an achieved depth of 0
%% marks an item that could not be fully verified.
check_nested_bundle(ItemAbsOffset, ItemSize, RemainingDepth,
        Store, ParentID, Opts, ChunkCache) ->
    try
        MinSize = min(ItemSize, 262144),
        case cached_read(ItemAbsOffset, MinSize, ChunkCache, Opts) of
            {ok, ChunkData, NewCache} ->
                %% Never hand the parser more bytes than the item owns.
                ClampedData =
                    case byte_size(ChunkData) > ItemSize of
                        true -> binary:part(ChunkData, 0, ItemSize);
                        false -> ChunkData
                    end,
                case parse_nested_item(
                        ClampedData, ItemAbsOffset, ItemSize,
                        RemainingDepth, Store, ParentID, Opts) of
                    {parse_failed, _} when ItemSize > byte_size(ClampedData) ->
                        %% The failure may be due to a truncated prefix;
                        %% retry with a larger read window.
                        retry_with_larger_chunk(
                            ItemAbsOffset, ItemSize,
                            RemainingDepth, Store, ParentID, Opts,
                            NewCache);
                    {parse_failed, Reason} ->
                        ?event(debug_copycat,
                            {nested_item_not_bundle,
                                {parent_id, {explicit, ParentID}},
                                {offset, ItemAbsOffset},
                                {reason, Reason}
                            }
                        ),
                        {ok, 0, RemainingDepth, NewCache};
                    {ok, Count, Achieved} ->
                        {ok, Count, Achieved, NewCache}
                end;
            {error, Reason} ->
                ?event(copycat_short,
                    {nested_bundle_chunk_failed,
                        {parent_id, {explicit, ParentID}},
                        {offset, ItemAbsOffset},
                        {reason, Reason}
                    }
                ),
                {ok, 0, 0, ChunkCache}
        end
    catch
        Class:Error:_ ->
            ?event(copycat_short,
                {nested_bundle_error,
                    {parent_id, {explicit, ParentID}},
                    {offset, ItemAbsOffset},
                    {class, Class},
                    {reason, Error}
                }
            ),
            {ok, 0, 0, ChunkCache}
    end.

%% @doc Second parse attempt with a read window of up to 1 MiB. When the
%% retry still fails while the complete item is in hand, the item simply
%% is not a bundle (achieved = RemainingDepth); when only a prefix could
%% be read, the item is unverifiable (achieved = 0).
retry_with_larger_chunk(ItemAbsOffset, ItemSize,
        RemainingDepth, Store, ParentID, Opts, PrevCache) ->
    LargerSize = min(ItemSize, 1048576),
    case hb_store_arweave:read_chunks(ItemAbsOffset, LargerSize, Opts) of
        {ok, ChunkData} ->
            NewCache = {ItemAbsOffset, ChunkData},
            ClampedData =
                case byte_size(ChunkData) > ItemSize of
                    true -> binary:part(ChunkData, 0, ItemSize);
                    false -> ChunkData
                end,
            case parse_nested_item(
                    ClampedData, ItemAbsOffset, ItemSize,
                    RemainingDepth, Store, ParentID, Opts) of
                {parse_failed, Reason} ->
                    HaveFullItem = byte_size(ClampedData) >= ItemSize,
                    Achieved =
                        case HaveFullItem of
                            true -> RemainingDepth;
                            false -> 0
                        end,
                    ?event(
                        case HaveFullItem of
                            true -> debug_copycat;
                            false -> copycat_short
                        end,
                        {nested_bundle_parse_failed_after_retry,
                            {parent_id, {explicit, ParentID}},
                            {offset, ItemAbsOffset},
                            {reason, Reason},
                            {have_full_item, HaveFullItem}
                        }
                    ),
                    {ok, 0, Achieved, NewCache};
                {ok, Count, Achieved} ->
                    {ok, Count, Achieved, NewCache}
            end;
        {error, Reason} ->
            ?event(copycat_short,
                {nested_bundle_chunk_failed,
                    {parent_id, {explicit, ParentID}},
                    {offset, ItemAbsOffset},
                    {reason, Reason}
                }
            ),
            {ok, 0, 0, PrevCache}
    end.

%% @doc Read at least `MinSize' bytes at `Offset', preferring the
%% single-entry read-ahead cache `{CacheOffset, CacheData}'.
%% `no_readahead' fetches exactly the requested size; any cache miss
%% fetches with read-ahead. A hit returns the cached bytes from `Offset'
%% to the end of the cache — callers clamp to the size they need.
cached_read(Offset, MinSize, no_readahead, Opts) ->
    do_fetch_and_cache(Offset, MinSize, false, Opts);
cached_read(Offset, MinSize, undefined, Opts) ->
    do_fetch_and_cache(Offset, MinSize, true, Opts);
cached_read(Offset, MinSize, {CacheOffset, CacheData} = Cache, Opts) ->
    CacheEnd = CacheOffset + byte_size(CacheData),
    case Offset >= CacheOffset andalso Offset + MinSize =< CacheEnd of
        true ->
            Skip = Offset - CacheOffset,
            Data = binary:part(CacheData, Skip, byte_size(CacheData) - Skip),
            {ok, Data, Cache};
        false ->
            do_fetch_and_cache(Offset, MinSize, true, Opts)
    end.
%% @doc Fetch `MinSize' bytes at `Offset' from chunk storage, returning
%% them with a fresh cache entry. With `ReadAhead' the request is padded
%% up to 256 KiB to warm the cache; if the padded fetch fails, fall back
%% to fetching exactly `MinSize' before giving up.
do_fetch_and_cache(Offset, MinSize, ReadAhead, Opts) ->
    FetchSize =
        case ReadAhead of
            true -> max(MinSize, 262144);
            false -> MinSize
        end,
    case hb_store_arweave:read_chunks(Offset, FetchSize, Opts) of
        {ok, Data} ->
            {ok, Data, {Offset, Data}};
        {error, _Reason} when FetchSize > MinSize ->
            case hb_store_arweave:read_chunks(Offset, MinSize, Opts) of
                {ok, Data} ->
                    {ok, Data, {Offset, Data}};
                {error, Reason2} ->
                    {error, Reason2}
            end;
        {error, Reason} ->
            {error, Reason}
    end.

%% @doc Try to interpret `ChunkData' as an ANS-104 item. Plain data
%% items succeed immediately with a count of 0; bundle items are handed
%% to `process_inner_bundle/9'. Redstone-wrapped bundles are flagged so
%% recursion stops below them. Returns `{ok, Count, AchievedDepth}' or
%% `{parse_failed, Reason}'; never throws.
parse_nested_item(ChunkData, ItemAbsOffset, ItemSize,
        RemainingDepth, Store, ParentID, Opts) ->
    try ar_bundles:deserialize_header(ChunkData) of
        {ok, ANS104HeaderSize, ParsedItem} ->
            case dev_arweave_common:type(ParsedItem) of
                binary ->
                    %% Plain data item: nothing nested to index.
                    {ok, 0, RemainingDepth};
                _BundleType ->
                    IsRedstone =
                        dev_arweave_common:tagfind(
                            <<"Bundler-App-Name">>,
                            ParsedItem#tx.tags, <<>>
                        ) =:= <<"Redstone">>,
                    case IsRedstone of
                        true ->
                            ?event(copycat_short,
                                {nested_redstone_skip,
                                    {parent_id, {explicit, ParentID}},
                                    {offset, ItemAbsOffset}
                                }
                            );
                        false -> ok
                    end,
                    ItemDataSize = ItemSize - ANS104HeaderSize,
                    process_inner_bundle(
                        ChunkData, ItemAbsOffset, ANS104HeaderSize,
                        ItemDataSize, RemainingDepth, IsRedstone,
                        Store, ParentID, Opts
                    )
            end;
        _ ->
            {parse_failed, deserialize_header_unexpected}
    catch
        _:_ ->
            {parse_failed, deserialize_header_crashed}
    end.
%% @doc Decode and index the contents of a nested bundle item whose
%% outer ANS-104 header has already been parsed. Validates the inner
%% bundle header size against the item's data size, fetches any missing
%% header bytes, decodes the header, and delegates indexing of the items
%% to `index_inner_items/7'. Every failure path is logged and collapses
%% to `{ok, 0, 0}' (unverified).
process_inner_bundle(ChunkData, ItemAbsOffset, ANS104HeaderSize,
        ItemDataSize, RemainingDepth, IsRedstone, Store, ParentID, Opts) ->
    InnerData =
        binary:part(
            ChunkData, ANS104HeaderSize,
            byte_size(ChunkData) - ANS104HeaderSize
        ),
    case ar_bundles:bundle_header_size(InnerData) of
        invalid_bundle_header ->
            ?event(copycat_short,
                {nested_bundle_invalid_header,
                    {parent_id, {explicit, ParentID}},
                    {offset, ItemAbsOffset}
                }
            ),
            {ok, 0, 0};
        InnerHeaderSize when InnerHeaderSize > ItemDataSize ->
            %% A header claiming to be larger than the item itself is
            %% corrupt; treat the item as unverifiable.
            ?event(copycat_short,
                {nested_bundle_header_exceeds_item,
                    {parent_id, {explicit, ParentID}},
                    {offset, ItemAbsOffset},
                    {inner_header_size, InnerHeaderSize},
                    {item_data_size, ItemDataSize}
                }
            ),
            {ok, 0, 0};
        InnerHeaderSize ->
            DataAbsOffset = ItemAbsOffset + ANS104HeaderSize,
            case fetch_inner_header(
                    InnerData, InnerHeaderSize, DataAbsOffset, Opts) of
                {ok, FullHeader} ->
                    case ar_bundles:decode_bundle_header(FullHeader) of
                        invalid_bundle_header ->
                            ?event(copycat_short,
                                {nested_bundle_decode_failed,
                                    {parent_id, {explicit, ParentID}},
                                    {offset, ItemAbsOffset}
                                }
                            ),
                            {ok, 0, 0};
                        {_ItemsBin, InnerBundleIndex} ->
                            index_inner_items(
                                InnerBundleIndex,
                                DataAbsOffset + InnerHeaderSize,
                                RemainingDepth, IsRedstone,
                                Store, ParentID, Opts
                            )
                    end;
                {error, Reason} ->
                    ?event(copycat_short,
                        {nested_bundle_header_fetch_failed,
                            {parent_id, {explicit, ParentID}},
                            {offset, ItemAbsOffset},
                            {reason, Reason}
                        }
                    ),
                    {ok, 0, 0}
            end
    end.

%% @doc Return the complete inner bundle header. When the bytes already
%% in hand cover it, use them as-is; otherwise fetch the missing tail
%% from chunk storage and append it.
%% NOTE(review): the appended-binary expression was garbled in the source
%% (`<>'); reconstructed as `<<InnerData/binary, More/binary>>'.
fetch_inner_header(InnerData, InnerHeaderSize, _DataAbsOffset, _Opts)
        when InnerHeaderSize =< byte_size(InnerData) ->
    {ok, InnerData};
fetch_inner_header(InnerData, InnerHeaderSize, DataAbsOffset, Opts) ->
    Needed = InnerHeaderSize - byte_size(InnerData),
    FetchOffset = DataAbsOffset + byte_size(InnerData),
    case hb_store_arweave:read_chunks(FetchOffset, Needed, Opts) of
        {ok, More} -> {ok, <<InnerData/binary, More/binary>>};
        {error, Reason} -> {error, Reason}
    end.
%% NOTE(review): `download_bundle_header/3' begins at the end of this
%% hunk but runs past the visible source, so it is not reproduced here.

%% @doc Write index entries for every item of a decoded inner bundle
%% and, when depth allows and the bundle is not Redstone-wrapped,
%% recurse into the items looking for further nesting. Returns
%% `{ok, TotalItemCount, AchievedDepth}'.
index_inner_items(InnerBundleIndex, DataStart, RemainingDepth,
        IsRedstone, Store, ParentID, Opts) ->
    {_FinalOffset, Indexed} =
        lists:foldl(
            fun({ItemID, Size}, {Offset, Count}) ->
                hb_store_arweave:write_offset(
                    Store,
                    hb_util:encode(ItemID),
                    <<"ans104@1.0">>,
                    Offset,
                    Size
                ),
                {Offset + Size, Count + 1}
            end,
            {DataStart, 0},
            InnerBundleIndex
        ),
    case RemainingDepth > 1 andalso not IsRedstone of
        false ->
            {ok, Indexed, RemainingDepth};
        true ->
            ItemsWithOffsets = items_with_offsets(InnerBundleIndex, DataStart),
            {Nested, MinAchieved} =
                nested_parallel_check(
                    ItemsWithOffsets, RemainingDepth - 1,
                    Store, ParentID, Opts
                ),
            {ok, Indexed + Nested, 1 + MinAchieved}
    end.