diff --git a/src/dev_copycat_arweave.erl b/src/dev_copycat_arweave.erl index c48e51af1..6299f8c8f 100644 --- a/src/dev_copycat_arweave.erl +++ b/src/dev_copycat_arweave.erl @@ -9,6 +9,38 @@ -include_lib("eunit/include/eunit.hrl"). -define(ARWEAVE_DEVICE, <<"~arweave@2.9">>). +-define(TX_CODEC, <<"tx@1.0">>). +-define(ANS104_CODEC, <<"ans104@1.0">>). +%% Depth semantics: +%% 1 => index L1 TX IDs -> offsets only +%% 2 => index immediate bundle children +%% 3..N => recurse nested bundle children +-define(DEPTH_L1_OFFSETS, 1). +-define(DEPTH_IMMEDIATE_CHILDREN, 2). +-define(DEPTH_RECURSION_CAP, 4). +%% Policy filters +-define(AO_LEGACY_AUTHORITY, <<"fcoN_xJeisVsPXA-trzVAuIiqO3ydLQxM-L4XbrQKzY">>). +%% policy 1: AO messages are L3 dataitems: +%% 1- AO_BUNDLER_ADDR (Ardrive Turbo) sends bundles on Arweave (L1 txs) +%% 2- those bundles (1) are direct parents to nested bundles (nested with AO_LEGACY_AUTHORITY as owner) +%% 3- the (2) nested bundles are parents of L3 dataitems (ao messages) +%% 4- full path: AO_BUNDLER_ADDR L1 TXs (bundles) -> nested bundles owned by AO_LEGACY_AUTHORITY -> ao messages +%% example: hXztSyj_V6PXttCfzkeCWrgul7owCGcmYnz58ydgMCU +-define(AO_BUNDLER_ADDR, <<"JNC6vBhjHY1EPwV3pEeNmrsgFMxH5d38_LHsZ7jful8">>). 
+%% policy 2.1: AO messages are L2 dataitems: +%% 1- AO_LEGACY_BUNDLER sends bundles on Arweave (L1 txs) +%% 2- those bundles are direct parents to ao messages (L2 messages, have AO_LEGACY_AUTHORITY as owner) +%% 3- full path: L1 TXs (bundles) -> L2 dataitems (ao messages) +%% example: 8DcCpFij5Dpfd2P7EjeGKZWSpOmpyT1COAM9MNc5VII + +%% policy 2.2: AO messages are L3 dataitems +%% 1- AO_LEGACY_BUNDLER sends bundles on Arweave (L1 txs) +%% 2- those bundles (1) are direct parents to nested bundles (nested with AO_LEGACY_AUTHORITY as owner) +%% 3- the (2) nested bundles are parents of L3 dataitems (ao messages) +%% 4- full path: AO_LEGACY_BUNDLER L1 TXs (bundles) -> nested bundles owned by AO_LEGACY_AUTHORITY -> ao messages +%% example: -MpPRIUBCBsWaGebFj-BtD42GKmuw8Wmkw37t_f63-I +-define(AO_LEGACY_BUNDLER, <<"FPjbN_btYKzcf8QASjs30v5C0FPv7XpwKXENBW8dqVw">>). 
Supported modes are: write, list">>} + FilteredOpts = with_filter_protocol(Request, Opts), + case hb_maps:find(<<"id">>, Request, FilteredOpts) of + {ok, ID} -> + Depth = request_depth(Request, <<"1">>, FilteredOpts), + index_id(ID, Depth, FilteredOpts); + error -> + {From, To} = parse_range(Request, FilteredOpts), + Depth = + request_depth( + Request, + hb_util:bin(?DEPTH_IMMEDIATE_CHILDREN), + FilteredOpts + ), + WriteOpts = FilteredOpts#{ arweave_index_depth => Depth }, + case hb_maps:get(<<"mode">>, Request, <<"write">>, FilteredOpts) of + <<"write">> -> fetch_blocks(Request, From, To, WriteOpts); + <<"list">> -> list_index(From, To, FilteredOpts); + Mode -> + {error, <<"Unsupported mode `", (hb_util:bin(Mode))/binary, "`. Supported modes are: write, list">>} + end + end. + +with_filter_protocol(Request, Opts) -> + FilterProtocolBin = + hb_util:to_lower( + hb_util:bin( + hb_maps:get(<<"filter-protocol">>, Request, <<"all">>, Opts) + ) + ), + FilterProtocol = + case FilterProtocolBin of + <<"ao">> -> ao; + _ -> all + end, + case FilterProtocol of + ao -> + Opts#{ + arweave_filter_protocol => FilterProtocol, + ao_bundler_turbo_addr => hb_util:native_id(?AO_BUNDLER_ADDR), + ao_bundler_legacy_addr => hb_util:native_id(?AO_LEGACY_BUNDLER), + ao_legacy_authority_addr => hb_util:native_id(?AO_LEGACY_AUTHORITY) + }; + _ -> + Opts#{ arweave_filter_protocol => FilterProtocol } + end. + +request_depth(Request, Default, Opts) -> + erlang:min( + ?DEPTH_RECURSION_CAP, + erlang:max( + ?DEPTH_L1_OFFSETS, + hb_util:int(hb_maps:get(<<"depth">>, Request, Default, Opts)) + ) + ). 
+ +index_id(_ID, Depth, _Opts) when Depth =< 0 -> + {ok, 0}; +index_id(ID, Depth, Opts) -> + Store = hb_store_arweave:store_from_opts(Opts), + case hb_store_arweave:read_offset(Store, ID) of + not_found -> + {error, not_found}; + {ok, ItemOffset} -> + case payload_bounds(ID, ItemOffset, Opts) of + {error, _} = Error -> + Error; + {ok, PayloadStart, PayloadLength} -> + case download_bundle_header( + PayloadStart + PayloadLength, + PayloadLength, + Opts + ) of + {ok, {BundleIndex, HeaderSize}} -> + index_bundle_children( + BundleIndex, + PayloadStart + HeaderSize, + Depth, + Store, + Opts + ); + {error, _} -> + {ok, 0} + end + end + end. + +payload_bounds(_ID, #{ + <<"codec-device">> := ?TX_CODEC, + <<"start-offset">> := StartOffset, + <<"length">> := Length +}, _Opts) -> + {ok, StartOffset, Length}; +payload_bounds(ID, #{ + <<"codec-device">> := ?ANS104_CODEC +}, Opts) -> + Base = #{ <<"device">> => <<"arweave@2.9">>, <<"raw">> => ID }, + case hb_ao:resolve( + Base, + #{ <<"path">> => <<"raw">>, <<"method">> => <<"HEAD">> }, + Opts + ) of + {ok, #{ + <<"arweave-data-offset">> := DataOffset, + <<"content-length">> := ContentLength + }} -> + {ok, DataOffset, ContentLength}; + Error -> + Error + end; +payload_bounds(_ID, _Offset, _Opts) -> + {error, unsupported_codec_device}. 
+ +index_bundle_children(BundleIndex, StartOffset, Depth, Store, Opts) -> + {_FinalOffset, Count} = + lists:foldl( + fun({ItemID, Size}, {ItemStartOffset, CountAcc}) -> + EncodedID = hb_util:encode(ItemID), + ShouldIndexItem = should_index_item(Depth, ItemStartOffset, Size, Opts), + case ShouldIndexItem of + true -> + hb_store_arweave:write_offset( + Store, + EncodedID, + ?ANS104_CODEC, + ItemStartOffset, + Size + ); + false -> + ok + end, + Descendants = + case Depth > 1 of + true -> + case index_id(EncodedID, Depth - 1, Opts) of + {ok, DescendantCount} -> DescendantCount; + _ -> 0 + end; + false -> 0 + end, + IndexedItemCount = case ShouldIndexItem of true -> 1; false -> 0 end, + {ItemStartOffset + Size, CountAcc + IndexedItemCount + Descendants} + end, + {StartOffset, 0}, + BundleIndex + ), + {ok, Count}. + +should_index_item(Depth, ItemStartOffset, ItemSize, Opts) -> + case hb_opts:get(ao_assume_authority_children, false, Opts) of + true -> + true; + false -> + should_index_item_by_depth(Depth, ItemStartOffset, ItemSize, Opts) end. +should_index_item_by_depth(1, ItemStartOffset, ItemSize, Opts) -> + case protocol_filter(Opts) of + ao -> is_ao_message_at_offset(ItemStartOffset, ItemSize, Opts); + _ -> true + end; +should_index_item_by_depth(_Depth, _ItemStartOffset, _ItemSize, _Opts) -> + true. + %% @doc Parse the range from the request. parse_range(Request, Opts) -> From = @@ -64,6 +251,95 @@ is_tx_indexed(TXID, Opts) -> end end. +%% @doc Check if a dataitem is an AO protocol message +is_ao_message(#tx{tags = Tags}) -> + Value = dev_arweave_common:tagfind(<<"Data-Protocol">>, Tags, <<>>), + hb_util:to_lower(Value) =:= <<"ao">>; +is_ao_message(_) -> + false. +protocol_filter(Opts) -> + hb_opts:get(arweave_filter_protocol, all, Opts). 
+ +ao_policy_from_l1_tx(TX, Opts) -> + Owner = ar_tx:get_owner_address(TX), + Turbo = hb_opts:get( + ao_bundler_turbo_addr, + hb_util:native_id(?AO_BUNDLER_ADDR), + Opts + ), + Legacy = hb_opts:get( + ao_bundler_legacy_addr, + hb_util:native_id(?AO_LEGACY_BUNDLER), + Opts + ), + case Owner of + Turbo -> turbo; + Legacy -> legacy; + _ -> none + end. + +is_ao_legacy_authority_bundle(HeaderTX, Opts) -> + Authority = hb_opts:get( + ao_legacy_authority_addr, + hb_util:native_id(?AO_LEGACY_AUTHORITY), + Opts + ), + ar_tx:get_owner_address(HeaderTX) =:= Authority. + +should_index_l1_tx(TX, Opts) -> + case protocol_filter(Opts) of + ao -> ao_policy_from_l1_tx(TX, Opts) =/= none; + _ -> true + end. + +is_ao_message_at_offset(ItemStartOffset, ItemSize, Opts) -> + case dataitem_header_at_offset(ItemStartOffset, ItemSize, Opts) of + {ok, ParsedDataitem} -> is_ao_message(ParsedDataitem); + _ -> false + end. + +dataitem_header_at_offset(ItemStartOffset, ItemSize, Opts) -> + ReadSize = min(ItemSize, 262144), + case hb_store_arweave:read_chunks(ItemStartOffset, ReadSize, Opts) of + {ok, ChunkData} -> + try ar_bundles:deserialize_header(ChunkData) of + {ok, _HeaderSize, ParsedDataitem} -> + {ok, ParsedDataitem}; + _ -> + error + catch _:_ -> + error + end; + _ -> + error + end. + +should_index_ao_immediate_child({ok, HeaderTX}, Opts) -> + is_ao_legacy_authority_bundle(HeaderTX, Opts) orelse is_ao_message(HeaderTX); +should_index_ao_immediate_child(_HeaderRes, _Opts) -> + false. + +is_ao_authority_bundle({ok, HeaderTX}, Opts) -> + is_ao_legacy_authority_bundle(HeaderTX, Opts) andalso is_bundle_tx(HeaderTX, Opts); +is_ao_authority_bundle(_HeaderRes, _Opts) -> + false. + +is_bundle_header({ok, HeaderTX}, Opts) -> + is_bundle_tx(HeaderTX, Opts); +is_bundle_header(_HeaderRes, _Opts) -> + false. + +%% @doc Check if the TX owner is the AO +%% bundler address. Useful to optimistically +%% identify ao parent bundles. 
+is_ao_bundler_owner(TX, Bundler) -> + Owner = ar_tx:get_owner_address(TX), + case Bundler of + ao_bundler_turbo_addr -> Owner =:= hb_util:native_id(?AO_BUNDLER_ADDR); + _ -> Owner =:= hb_util:native_id(?AO_LEGACY_BUNDLER) + end. + + %% @doc List indexed blocks and transactions in the given range. %% Returns JSON with block heights as keys, each containing indexed and not-indexed lists. list_index(From, undefined, Opts) -> @@ -280,66 +556,171 @@ parallel_map(Items, Fun, Opts) -> process_tx({{padding, _PaddingRoot}, _EndOffset}, _BlockStartOffset, _Opts) -> #{items_count => 0, bundle_count => 0, skipped_count => 0}; process_tx({{TX, _TXDataRoot}, EndOffset}, BlockStartOffset, Opts) -> - IndexStore = hb_store_arweave:store_from_opts(Opts), - TXID = hb_util:encode(TX#tx.id), - TXEndOffset = BlockStartOffset + EndOffset, - TXStartOffset = TXEndOffset - TX#tx.data_size, - ?event(copycat_debug, {writing_index, - {id, {explicit, TXID}}, - {offset, TXStartOffset}, - {size, TX#tx.data_size} - }), - observe_event(<<"item_indexed">>, fun() -> - hb_store_arweave:write_offset( - IndexStore, - TXID, - <<"tx@1.0">>, - TXStartOffset, - TX#tx.data_size - ) - end), - case is_bundle_tx(TX, Opts) of - false -> #{items_count => 0, bundle_count => 0, skipped_count => 0}; + case should_index_l1_tx(TX, Opts) of + false -> + #{items_count => 0, bundle_count => 0, skipped_count => 0}; true -> - ?event(copycat_debug, {fetching_bundle_header, - {tx_id, {explicit, TXID}}, - {tx_end_offset, TXEndOffset}, - {tx_data_size, TX#tx.data_size} + IndexDepth = hb_opts:get(arweave_index_depth, ?DEPTH_IMMEDIATE_CHILDREN, Opts), + AOPolicy = ao_policy_from_l1_tx(TX, Opts), + IndexStore = hb_store_arweave:store_from_opts(Opts), + TXID = hb_util:encode(TX#tx.id), + TXEndOffset = BlockStartOffset + EndOffset, + TXStartOffset = TXEndOffset - TX#tx.data_size, + ?event(copycat_debug, {writing_index, + {id, {explicit, TXID}}, + {offset, TXStartOffset}, + {size, TX#tx.data_size} }), - BundleRes = 
download_bundle_header( - TXEndOffset, TX#tx.data_size, Opts - ), - case BundleRes of - {ok, {BundleIndex, HeaderSize}} -> - % Batch event tracking: measure total time and count for all write_offset calls - {TotalTime, {_, ItemsCount}} = timer:tc(fun() -> - lists:foldl( - fun({ItemID, Size}, {ItemStartOffset, ItemsCountAcc}) -> - hb_store_arweave:write_offset( - IndexStore, - hb_util:encode(ItemID), - <<"ans104@1.0">>, - ItemStartOffset, - Size - ), - {ItemStartOffset + Size, ItemsCountAcc + 1} - end, - {TXStartOffset + HeaderSize, 0}, - BundleIndex - ) - end), - % Single event increment for the batch - record_event_metrics(<<"item_indexed">>, ItemsCount, TotalTime), - #{items_count => ItemsCount, bundle_count => 1, skipped_count => 0}; - {error, Reason} -> - ?event( - copycat_short, - {arweave_bundle_skipped, - {tx_id, {explicit, TXID}}, - {reason, Reason} - } + observe_event(<<"item_indexed">>, fun() -> + hb_store_arweave:write_offset( + IndexStore, + TXID, + <<"tx@1.0">>, + TXStartOffset, + TX#tx.data_size + ) + end), + case is_bundle_tx(TX, Opts) of + false -> #{items_count => 0, bundle_count => 0, skipped_count => 0}; + true -> + ?event(copycat_debug, {fetching_bundle_header, + {tx_id, {explicit, TXID}}, + {tx_end_offset, TXEndOffset}, + {tx_data_size, TX#tx.data_size} + }), + BundleRes = download_bundle_header( + TXEndOffset, TX#tx.data_size, Opts ), - #{items_count => 0, bundle_count => 1, skipped_count => 1} + case BundleRes of + {ok, {_BundleIndex, _HeaderSize}} when IndexDepth =< ?DEPTH_L1_OFFSETS -> + #{items_count => 0, bundle_count => 1, skipped_count => 0}; + {ok, {BundleIndex, HeaderSize}} -> + % Batch event tracking: measure total time and count for all write_offset calls + {TotalTime, {_, ItemsCount}} = timer:tc(fun() -> + lists:foldl( + fun({ItemID, Size}, {ItemStartOffset, ItemsCountAcc}) -> + EncodedID = hb_util:encode(ItemID), + HeaderRes = dataitem_header_at_offset( + ItemStartOffset, + Size, + Opts + ), + ShouldIndexItem = + case 
protocol_filter(Opts) of + ao -> + case AOPolicy of + legacy -> true; + turbo -> should_index_ao_immediate_child(HeaderRes, Opts); + _ -> false + end; + _ -> + true + end, + case ShouldIndexItem of + true -> + hb_store_arweave:write_offset( + IndexStore, + EncodedID, + <<"ans104@1.0">>, + ItemStartOffset, + Size + ); + false -> + ok + end, + ChildOpts = + case protocol_filter(Opts) of + ao -> + case AOPolicy of + legacy -> + case is_bundle_header(HeaderRes, Opts) of + true -> + Opts#{ ao_assume_authority_children => true }; + false -> + Opts + end; + turbo -> + case is_ao_authority_bundle(HeaderRes, Opts) of + true -> + Opts#{ ao_assume_authority_children => true }; + false -> + Opts + end; + _ -> + Opts + end; + _ -> + Opts + end, + Descendants = + case IndexDepth > ?DEPTH_IMMEDIATE_CHILDREN of + true -> + case protocol_filter(Opts) of + ao -> + case AOPolicy of + legacy -> + case is_bundle_header(HeaderRes, Opts) of + true -> + case index_id( + EncodedID, + IndexDepth - ?DEPTH_IMMEDIATE_CHILDREN, + ChildOpts + ) of + {ok, DescendantCount} -> DescendantCount; + _ -> 0 + end; + false -> + 0 + end; + turbo -> + case is_ao_authority_bundle(HeaderRes, Opts) of + true -> + case index_id( + EncodedID, + IndexDepth - ?DEPTH_IMMEDIATE_CHILDREN, + ChildOpts + ) of + {ok, DescendantCount} -> DescendantCount; + _ -> 0 + end; + false -> + 0 + end; + _ -> + 0 + end; + _ -> + case index_id( + EncodedID, + IndexDepth - ?DEPTH_IMMEDIATE_CHILDREN, + Opts + ) of + {ok, DescendantCount} -> DescendantCount; + _ -> 0 + end + end; + false -> 0 + end, + IndexedItemCount = case ShouldIndexItem of true -> 1; false -> 0 end, + {ItemStartOffset + Size, ItemsCountAcc + IndexedItemCount + Descendants} + end, + {TXStartOffset + HeaderSize, 0}, + BundleIndex + ) + end), + % Single event increment for the batch + record_event_metrics(<<"item_indexed">>, ItemsCount, TotalTime), + #{items_count => ItemsCount, bundle_count => 1, skipped_count => 0}; + {error, Reason} -> + ?event( + copycat_short, + 
{arweave_bundle_skipped, + {tx_id, {explicit, TXID}}, + {reason, Reason} + } + ), + #{items_count => 0, bundle_count => 1, skipped_count => 1} + end end end. @@ -385,13 +766,18 @@ download_bundle_header(EndOffset, Size, Opts) -> % thousands of items might require multiple chunks to fully % represent the item index. HeaderSize = ar_bundles:bundle_header_size(FirstChunk), - case header_chunk(HeaderSize, FirstChunk, StartOffset, Opts) of - {ok, BundleHeader} -> - {_ItemsBin, BundleIndex} = - ar_bundles:decode_bundle_header(BundleHeader), - {ok, {BundleIndex, HeaderSize}}; - Error -> - Error + case HeaderSize =:= invalid_bundle_header orelse HeaderSize > Size of + true -> + {error, invalid_bundle_header}; + false -> + case header_chunk(HeaderSize, FirstChunk, StartOffset, Opts) of + {ok, BundleHeader} -> + {_ItemsBin, BundleIndex} = + ar_bundles:decode_bundle_header(BundleHeader), + {ok, {BundleIndex, HeaderSize}}; + Error -> + Error + end end; Error -> Error