diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index 629901f9a..a1cec45d8 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -5,7 +5,7 @@ %%% `/arweave` route in the node's configuration message. -module(dev_arweave). -export([tx/3, raw/3, chunk/3, block/3, current/3, status/3, price/3, tx_anchor/3]). --export([post_tx/3, post_tx/4, post_binary_ans104/2, post_json_chunk/2]). +-export([post_tx_header/2, post_tx/3, post_tx/4, post_binary_ans104/2, post_json_chunk/2]). -include("include/hb.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -65,19 +65,7 @@ extract_target(Base, Request, Opts) -> post_tx(_Base, Request, Opts, <<"tx@1.0">>) -> TX = hb_message:convert(Request, <<"tx@1.0">>, Opts), - JSON = ar_tx:tx_to_json_struct(TX#tx{ data = <<>> }), - Serialized = hb_json:encode(JSON), - LogExtra = [ - {codec, <<"tx@1.0">>}, - {id, {explicit, hb_util:human_id(TX#tx.id)}} - ], - Res = request( - <<"POST">>, - <<"/tx">>, - #{ <<"body">> => Serialized }, - LogExtra, - Opts - ), + Res = post_tx_header(TX, Opts), case Res of {ok, _} -> CacheRes = hb_cache:write(Request, Opts), @@ -101,6 +89,22 @@ post_tx(_Base, Request, Opts, <<"ans104@1.0">>) -> ], post_binary_ans104(Serialized, LogExtra, Opts). + +post_tx_header(TX, Opts) -> + JSON = ar_tx:tx_to_json_struct(TX#tx{ data = <<>> }), + Serialized = hb_json:encode(JSON), + LogExtra = [ + {codec, <<"tx@1.0">>}, + {id, {explicit, hb_util:human_id(TX#tx.id)}} + ], + request( + <<"POST">>, + <<"/tx">>, + #{ <<"body">> => Serialized }, + LogExtra, + Opts + ). + post_binary_ans104(SerializedTX, Opts) -> LogExtra = [ {codec, <<"ans104@1.0">>}, diff --git a/src/dev_bundler.erl b/src/dev_bundler.erl index 439311959..ceffa327c 100644 --- a/src/dev_bundler.erl +++ b/src/dev_bundler.erl @@ -15,8 +15,9 @@ %%% available for reading instantly (`optimistically'), even before the %%% transaction is dispatched. -module(dev_bundler). --export([tx/3, item/3]). 
+-export([tx/3, item/3, ensure_server/1, stop_server/0, get_state/0]). -include("include/hb.hrl"). +-include("include/dev_bundler.hrl"). -include_lib("eunit/include/eunit.hrl"). %%% Default options. @@ -42,7 +43,7 @@ item(_Base, Req, Opts) -> ok -> % Queue the item for bundling % (fire-and-forget, ignore errors) - ServerPID ! {item, Item}, + ServerPID ! {enqueue_item, Item}, {ok, #{ <<"id">> => ItemID, <<"timestamp">> => erlang:system_time(millisecond) @@ -114,59 +115,79 @@ stop_server() -> hb_name:unregister(?SERVER_NAME) end. +%% @doc Return the current bundler server state for tests. +get_state() -> + case hb_name:lookup(?SERVER_NAME) of + undefined -> undefined; + PID -> + PID ! {get_state, self(), Ref = make_ref()}, + receive + {state, Ref, State} -> State + after 1000 -> timeout + end + end. + %% @doc Initialize the bundler server. init(Opts) -> - % Start the dispatcher to recover any in-progress bundles - dev_bundler_dispatch:ensure_dispatcher(Opts), - % Recover any unbundled items from cache - {UnbundledItems, RecoveredBytes} = recover_unbundled_items(Opts), - InitialState = #{ - max_size => hb_opts:get( - bundler_max_size, ?DEFAULT_MAX_SIZE, Opts), - max_idle_time => hb_opts:get( - bundler_max_idle_time, ?DEFAULT_MAX_IDLE_TIME, Opts), - max_items => hb_opts:get( - bundler_max_items, ?DEFAULT_MAX_ITEMS, Opts), - queue => UnbundledItems, - bytes => RecoveredBytes - }, - % If recovered items are ready to dispatch, do so immediately - State = maybe_dispatch(InitialState, Opts), - server(State, Opts). - -%% @doc Recover unbundled items from cache and calculate their total size. -%% Returns {Items, TotalBytes}. 
-recover_unbundled_items(Opts) -> - UnbundledItems = dev_bundler_cache:load_unbundled_items(Opts), - ?event(bundler_short, {recovered_unbundled_items, length(UnbundledItems)}), - % Calculate total bytes for recovered items - RecoveredBytes = lists:foldl( - fun(Item, Acc) -> - Acc + erlang:external_size(Item) + NumWorkers = hb_opts:get(bundler_workers, ?DEFAULT_NUM_WORKERS, Opts), + Workers = lists:map( + fun(_) -> + WorkerPID = spawn_link(fun dev_bundler_task:worker_loop/0), + {WorkerPID, idle} end, - 0, - UnbundledItems + lists:seq(1, NumWorkers) ), - {UnbundledItems, RecoveredBytes}. + InitialState = #state{ + max_size = hb_opts:get(bundler_max_size, ?DEFAULT_MAX_SIZE, Opts), + max_idle_time = hb_opts:get( + bundler_max_idle_time, ?DEFAULT_MAX_IDLE_TIME, Opts), + max_items = hb_opts:get(bundler_max_items, ?DEFAULT_MAX_ITEMS, Opts), + queue = [], + bytes = 0, + workers = maps:from_list(Workers), + task_queue = queue:new(), + bundles = #{}, + opts = Opts + }, + dev_bundler_recovery:recover_unbundled_items(self(), Opts), + dev_bundler_recovery:recover_bundles(self(), Opts), + server(assign_tasks(InitialState), Opts). -%% @doc The main loop of the bundler server. Simply waits for messages to be -%% added to the queue, and then dispatches them when the queue is large enough. -server(State = #{ max_idle_time := MaxIdleTime }, Opts) -> +%% @doc The main loop of the bundler server. 
+server(State = #state{max_idle_time = MaxIdleTime}, Opts) -> receive - {item, Item} -> - server(maybe_dispatch(add_to_queue(Item, State, Opts), Opts), Opts); + {enqueue_item, Item} -> + State1 = add_to_queue(Item, State, Opts), + server(assign_tasks(maybe_dispatch(State1)), Opts); + {recover_bundle, CommittedTX, Items} -> + State1 = recover_bundle(CommittedTX, Items, State), + server(assign_tasks(State1), Opts); + {task_complete, WorkerPID, Task, Result} -> + State1 = handle_task_complete(WorkerPID, Task, Result, State), + server(assign_tasks(State1), Opts); + {task_failed, WorkerPID, Task, Reason} -> + State1 = handle_task_failed(WorkerPID, Task, Reason, State), + server(assign_tasks(State1), Opts); + {retry_task, Task} -> + State1 = enqueue_task(Task, State), + server(assign_tasks(State1), Opts); + {get_state, From, Ref} -> + From ! {state, Ref, State}, + server(State, Opts); stop -> + maps:foreach( + fun(WorkerPID, _) -> WorkerPID ! stop end, + State#state.workers + ), exit(normal) after MaxIdleTime -> - Q = maps:get(queue, State), - dev_bundler_dispatch:dispatch(Q, Opts), - server(State#{ queue => [] }, Opts) + server(assign_tasks(dispatch_queue(State)), Opts) end. -%% @doc Add an item to the queue. Update the state with the new queue +%% @doc Add an enqueue_item to the queue. Update the state with the new queue %% and approximate total byte size of the queue. %% Note: Item has already been verified and cached before reaching here. -add_to_queue(Item, State = #{ queue := Queue, bytes := Bytes }, Opts) -> +add_to_queue(Item, State = #state{queue = Queue, bytes = Bytes}, Opts) -> ItemSize = erlang:external_size(Item), NewQueue = [Item | Queue], NewBytes = Bytes + ItemSize, @@ -176,28 +197,20 @@ add_to_queue(Item, State = #{ queue := Queue, bytes := Bytes }, Opts) -> {queue_size, length(NewQueue)}, {queue_bytes, NewBytes} }), - State#{ - queue => NewQueue, - bytes => NewBytes - }. + State#state{queue = NewQueue, bytes = NewBytes}. 
%% @doc Dispatch the queue if it is ready. %% Only dispatches up to max_items at a time to respect the limit. -maybe_dispatch(State = #{queue := Q, max_items := MaxItems}, Opts) -> - case dispatchable(State, Opts) of +maybe_dispatch(State = #state{queue = Q, max_items = MaxItems}) -> + case dispatchable(State) of true -> - % Only dispatch up to max_items, keep the rest in queue {ToDispatch, Remaining} = split_queue(Q, MaxItems), - dev_bundler_dispatch:dispatch(ToDispatch, Opts), - % Recalculate bytes for remaining items - RemainingBytes = lists:foldl( - fun(Item, Acc) -> Acc + erlang:external_size(Item) end, - 0, - Remaining - ), - NewState = State#{queue => Remaining, bytes => RemainingBytes}, - % Check if we should dispatch again (in case we have more than max_items) - maybe_dispatch(NewState, Opts); + State1 = create_bundle(ToDispatch, State), + NewState = State1#state{ + queue = Remaining, + bytes = queue_bytes(Remaining) + }, + maybe_dispatch(NewState); false -> State end. @@ -209,15 +222,247 @@ split_queue(Queue, MaxItems) -> {ToDispatch, Remaining}. %% @doc Returns whether the queue is dispatchable. -dispatchable(#{ queue := Q, max_items := MaxLen }, _Opts) - when length(Q) >= MaxLen -> +dispatchable(#state{queue = Q, max_items = MaxLen}) when length(Q) >= MaxLen -> true; -dispatchable(#{ bytes := Bytes, max_size := MaxSize }, _Opts) - when Bytes >= MaxSize -> +dispatchable(#state{bytes = Bytes, max_size = MaxSize}) when Bytes >= MaxSize -> true; -dispatchable(_State, _Opts) -> +dispatchable(_State) -> false. +%% @doc Return the total size of a queue of items. +queue_bytes(Items) -> + lists:foldl( + fun(Item, Acc) -> Acc + erlang:external_size(Item) end, + 0, + Items + ). + +%% @doc Dispatch all currently queued items immediately. +dispatch_queue(State = #state{queue = []}) -> + State; +dispatch_queue(State = #state{queue = Queue}) -> + create_bundle(Queue, State#state{queue = [], bytes = 0}). + +%% @doc Create a bundle and enqueue its initial post task. 
+create_bundle([], State) -> + State; +create_bundle(Items, State = #state{bundles = Bundles, opts = Opts}) -> + BundleID = make_ref(), + Bundle = #bundle{ + id = BundleID, + items = Items, + status = initializing, + tx = undefined, + proofs = #{}, + start_time = erlang:timestamp() + }, + State1 = State#state{ + bundles = maps:put(BundleID, Bundle, Bundles) + }, + ?event( + bundler_short, + {dispatching_bundle, + {timestamp, dev_bundler_task:format_timestamp()}, + {bundle_id, BundleID}, + {num_items, length(Items)} + } + ), + Task = #task{ + bundle_id = BundleID, + type = post_tx, + data = Items, + opts = Opts + }, + enqueue_task(Task, State1). + +%% @doc Enqueue a task for worker execution. +enqueue_task(Task, State = #state{task_queue = Queue}) -> + State#state{task_queue = queue:in(Task, Queue)}. + +%% @doc Assign pending tasks to all idle workers. +assign_tasks(State = #state{workers = Workers}) -> + IdleWorkers = maps:filter( + fun(_, Status) -> Status =:= idle end, + Workers + ), + assign_tasks(maps:keys(IdleWorkers), State). + +assign_tasks([], State) -> + State; +assign_tasks([WorkerPID | Rest], State = #state{workers = Workers, task_queue = Queue}) -> + case queue:out(Queue) of + {{value, Task}, Queue1} -> + WorkerPID ! {execute_task, self(), Task}, + State1 = State#state{ + task_queue = Queue1, + workers = maps:put(WorkerPID, {busy, Task}, Workers) + }, + assign_tasks(Rest, State1); + {empty, _} -> + State + end. + +%% @doc Handle successful task completion. +handle_task_complete(WorkerPID, Task, Result, State = #state{ + workers = Workers, + bundles = Bundles + }) -> + #task{bundle_id = BundleID} = Task, + ?event(bundler_debug, dev_bundler_task:log_task(task_complete, Task, [])), + State1 = State#state{ + workers = maps:put(WorkerPID, idle, Workers) + }, + case maps:get(BundleID, Bundles, undefined) of + undefined -> + ?event(bundler_short, {bundle_not_found, BundleID}), + State1; + Bundle -> + task_completed(Task, Bundle, Result, State1) + end. 
+ +%% @doc Handle task failure and schedule a retry. +handle_task_failed(WorkerPID, Task, Reason, State = #state{ + workers = Workers, + opts = Opts + }) -> + RetryCount = Task#task.retry_count, + BaseDelay = hb_opts:get( + retry_base_delay_ms, ?DEFAULT_RETRY_BASE_DELAY_MS, Opts), + MaxDelay = hb_opts:get( + retry_max_delay_ms, ?DEFAULT_RETRY_MAX_DELAY_MS, Opts), + Jitter = hb_opts:get(retry_jitter, ?DEFAULT_RETRY_JITTER, Opts), + BaseDelayWithBackoff = min(BaseDelay * (1 bsl RetryCount), MaxDelay), + JitterFactor = (rand:uniform() * 2 - 1) * Jitter, + Delay = round(BaseDelayWithBackoff * (1 + JitterFactor)), + ?event( + bundler_short, + dev_bundler_task:log_task(task_failed_retrying, Task, [ + {reason, {explicit, Reason}}, + {retry_count, RetryCount}, + {delay_ms, Delay} + ]) + ), + Task1 = Task#task{retry_count = RetryCount + 1}, + erlang:send_after(Delay, self(), {retry_task, Task1}), + State#state{ + workers = maps:put(WorkerPID, idle, Workers) + }. + +%% @doc Apply task completion effects to server state. 
+task_completed(#task{bundle_id = BundleID, type = post_tx}, Bundle, CommittedTX, State) -> + Bundles = State#state.bundles, + Opts = State#state.opts, + Bundle1 = Bundle#bundle{status = tx_posted, tx = CommittedTX}, + State1 = State#state{ + bundles = maps:put(BundleID, Bundle1, Bundles) + }, + BuildProofsTask = #task{ + bundle_id = BundleID, + type = build_proofs, + data = CommittedTX, + opts = Opts + }, + enqueue_task(BuildProofsTask, State1); +task_completed(#task{bundle_id = BundleID, type = build_proofs}, Bundle, Proofs, State) -> + Bundles = State#state.bundles, + Opts = State#state.opts, + case Proofs of + [] -> + bundle_complete(Bundle, State); + _ -> + ProofsMap = maps:from_list([ + {maps:get(offset, Proof), #proof{proof = Proof, status = pending}} + || Proof <- Proofs + ]), + Bundle1 = Bundle#bundle{ + proofs = ProofsMap, + status = proofs_built + }, + State1 = State#state{ + bundles = maps:put(BundleID, Bundle1, Bundles) + }, + lists:foldl( + fun(ProofData, StateAcc) -> + ProofTask = #task{ + bundle_id = BundleID, + type = post_proof, + data = ProofData, + opts = Opts + }, + enqueue_task(ProofTask, StateAcc) + end, + State1, + Proofs + ) + end; +task_completed( + #task{bundle_id = BundleID, type = post_proof, data = ProofData}, + Bundle, + _Result, + State + ) -> + Bundles = State#state.bundles, + Offset = maps:get(offset, ProofData), + Proofs = Bundle#bundle.proofs, + Proofs1 = maps:update_with( + Offset, + fun(Proof) -> Proof#proof{status = seeded} end, + Proofs + ), + Bundle1 = Bundle#bundle{proofs = Proofs1}, + State1 = State#state{ + bundles = maps:put(BundleID, Bundle1, Bundles) + }, + AllSeeded = lists:all( + fun(#proof{status = Status}) -> Status =:= seeded end, + maps:values(Proofs1) + ), + case AllSeeded of + true -> + bundle_complete(Bundle1, State1); + false -> + State1 + end. + +%% @doc Mark a bundle as complete and remove it from state. 
+bundle_complete(Bundle, State = #state{opts = Opts}) -> + ok = dev_bundler_cache:complete_tx(Bundle#bundle.tx, Opts), + ElapsedTime = + timer:now_diff(erlang:timestamp(), Bundle#bundle.start_time) / 1000000, + ?event( + bundler_short, + {bundle_complete, + {bundle_id, Bundle#bundle.id}, + {timestamp, dev_bundler_task:format_timestamp()}, + {tx, {explicit, hb_message:id(Bundle#bundle.tx, signed, Opts)}}, + {elapsed_time_s, ElapsedTime} + } + ), + State#state{bundles = maps:remove(Bundle#bundle.id, State#state.bundles)}. + +%% @doc Recover a single bundle and enqueue any follow-up work. +recover_bundle(CommittedTX, Items, State = #state{opts = Opts}) -> + BundleID = make_ref(), + Bundle = #bundle{ + id = BundleID, + items = Items, + status = tx_posted, + tx = CommittedTX, + proofs = #{}, + start_time = erlang:timestamp() + }, + Bundles = State#state.bundles, + State1 = State#state{ + bundles = maps:put(BundleID, Bundle, Bundles) + }, + Task = #task{ + bundle_id = BundleID, + type = build_proofs, + data = CommittedTX, + opts = Opts + }, + enqueue_task(Task, State1). + %%%=================================================================== %%% Tests %%%=================================================================== @@ -451,29 +696,6 @@ dispatch_blocking_test() -> stop_test_servers(ServerHandle) end. 
-recover_unbundled_items_test() -> - Opts = #{store => hb_test_utils:test_store()}, - % Create and cache some items - Item1 = hb_message:convert(new_data_item(1, 10), <<"structured@1.0">>, <<"ans104@1.0">>, Opts), - Item2 = hb_message:convert(new_data_item(2, 10), <<"structured@1.0">>, <<"ans104@1.0">>, Opts), - Item3 = hb_message:convert(new_data_item(3, 10), <<"structured@1.0">>, <<"ans104@1.0">>, Opts), - ok = dev_bundler_cache:write_item(Item1, Opts), - ok = dev_bundler_cache:write_item(Item2, Opts), - ok = dev_bundler_cache:write_item(Item3, Opts), - % Bundle Item2 with a fake TX - FakeTX = ar_tx:sign(#tx{format = 2, tags = [{<<"test">>, <<"tx">>}]}, hb:wallet()), - StructuredTX = hb_message:convert(FakeTX, <<"structured@1.0">>, <<"tx@1.0">>, Opts), - ok = dev_bundler_cache:write_tx(StructuredTX, [Item2], Opts), - % Now recover unbundled items - {RecoveredItems, RecoveredBytes} = recover_unbundled_items(Opts), - ?assertEqual(3924, RecoveredBytes), - RecoveredItems2 = [ - hb_message:with_commitments( - #{ <<"commitment-device">> => <<"ans104@1.0">> }, Item, Opts) - || Item <- RecoveredItems], - ?assertEqual(lists:sort([Item1, Item3]), lists:sort(RecoveredItems2)), - ok. - %% @doc Test that items are recovered and posted while respecting the %% max_items limit. recover_respects_max_items_test() -> @@ -516,6 +738,457 @@ recover_respects_max_items_test() -> stop_test_servers(ServerHandle) end. 
+complete_task_sequence_test() -> + Anchor = rand:bytes(32), + Price = 12345, + {ServerHandle, NodeOpts} = start_mock_gateway(#{ + price => {200, integer_to_binary(Price)}, + tx_anchor => {200, hb_util:encode(Anchor)} + }), + try + Opts = NodeOpts#{ + priv_wallet => hb:wallet(), + store => hb_test_utils:test_store(), + bundler_max_items => 2, + retry_base_delay_ms => 100, + retry_jitter => 0 + }, + hb_http_server:start_node(Opts), + ensure_server(Opts), + Items = [ + new_structured_data_item(1, 10, Opts), + new_structured_data_item(2, 10, Opts) + ], + submit_test_items(Items, Opts), + TXs = hb_mock_server:get_requests(tx, 1, ServerHandle), + ?assertEqual(1, length(TXs)), + Proofs = hb_mock_server:get_requests(chunk, 1, ServerHandle), + ?assertEqual(1, length(Proofs)), + State = get_state(), + ?assertNotEqual(undefined, State), + ?assertNotEqual(timeout, State), + Workers = State#state.workers, + IdleWorkers = [ + PID + || {PID, Status} <- maps:to_list(Workers), Status =:= idle + ], + ?assertEqual(maps:size(Workers), length(IdleWorkers)), + Queue = State#state.task_queue, + ?assert(queue:is_empty(Queue)), + Bundles = State#state.bundles, + ?assertEqual(0, maps:size(Bundles)), + ok + after + stop_test_servers(ServerHandle) + end. 
+ +recover_bundles_test() -> + Anchor = rand:bytes(32), + Price = 12345, + {ServerHandle, NodeOpts} = start_mock_gateway(#{ + chunk => fun(_Req) -> + timer:sleep(250), + {200, <<"OK">>} + end, + price => {200, integer_to_binary(Price)}, + tx_anchor => {200, hb_util:encode(Anchor)} + }), + try + Opts = NodeOpts#{ + priv_wallet => hb:wallet(), + store => hb_test_utils:test_store() + }, + hb_http_server:start_node(Opts), + Item1 = new_structured_data_item(1, 10, Opts), + Item2 = new_structured_data_item(2, 10, Opts), + Item3 = new_structured_data_item(3, 10, Opts), + ok = dev_bundler_cache:write_item(Item1, Opts), + ok = dev_bundler_cache:write_item(Item2, Opts), + ok = dev_bundler_cache:write_item(Item3, Opts), + {ok, TX} = dev_codec_tx:to( + lists:reverse([Item1, Item2, Item3]), #{}, #{}), + CommittedTX = hb_message:convert( + TX, <<"structured@1.0">>, <<"tx@1.0">>, Opts), + ok = dev_bundler_cache:write_tx(CommittedTX, [Item1, Item2, Item3], Opts), + Item4 = new_structured_data_item(4, 10, Opts), + ok = dev_bundler_cache:write_item(Item4, Opts), + {ok, TX2} = dev_codec_tx:to(lists:reverse([Item4]), #{}, #{}), + CommittedTX2 = hb_message:convert( + TX2, <<"structured@1.0">>, <<"tx@1.0">>, Opts), + ok = dev_bundler_cache:write_tx(CommittedTX2, [Item4], Opts), + ok = dev_bundler_cache:complete_tx(CommittedTX2, Opts), + ensure_server(Opts), + State = get_state(), + ?assertNotEqual(undefined, State), + ?assertNotEqual(timeout, State), + TXs = hb_mock_server:get_requests(tx, 1, ServerHandle, 200), + ?assertEqual([], TXs), + ?assert( + hb_util:wait_until( + fun() -> + dev_bundler_cache:load_bundle_states(Opts) =:= [] + end, + 2000 + ) + ), + FinalState = get_state(), + ?assertEqual(0, maps:size(FinalState#state.bundles)), + ok + after + stop_test_servers(ServerHandle) + end. 
+ +post_tx_price_failure_retry_test() -> + Anchor = rand:bytes(32), + FailCount = 3, + setup_test_counter(price_attempts_counter), + {ServerHandle, NodeOpts} = start_mock_gateway(#{ + price => fun(_Req) -> + Count = increment_test_counter(price_attempts_counter) - 1, + case Count < FailCount of + true -> {500, <<"error">>}; + false -> {200, <<"12345">>} + end + end, + tx_anchor => {200, hb_util:encode(Anchor)} + }), + try + Opts = NodeOpts#{ + priv_wallet => hb:wallet(), + store => hb_test_utils:test_store(), + bundler_max_items => 1, + retry_base_delay_ms => 50, + retry_jitter => 0 + }, + hb_http_server:start_node(Opts), + ensure_server(Opts), + Items = [new_structured_data_item(1, 10, Opts)], + submit_test_items(Items, Opts), + TXs = hb_mock_server:get_requests(tx, 1, ServerHandle), + ?assertEqual(1, length(TXs)), + FinalCount = get_test_counter(price_attempts_counter), + ?assertEqual(FailCount + 1, FinalCount), + ok + after + cleanup_test_counter(price_attempts_counter), + stop_test_servers(ServerHandle) + end. 
+ +post_tx_anchor_failure_retry_test() -> + Price = 12345, + FailCount = 3, + setup_test_counter(anchor_attempts_counter), + {ServerHandle, NodeOpts} = start_mock_gateway(#{ + price => {200, integer_to_binary(Price)}, + tx_anchor => fun(_Req) -> + Count = increment_test_counter(anchor_attempts_counter) - 1, + case Count < FailCount of + true -> {500, <<"error">>}; + false -> {200, hb_util:encode(rand:bytes(32))} + end + end + }), + try + Opts = NodeOpts#{ + priv_wallet => hb:wallet(), + store => hb_test_utils:test_store(), + bundler_max_items => 1, + retry_base_delay_ms => 50, + retry_jitter => 0 + }, + hb_http_server:start_node(Opts), + ensure_server(Opts), + Items = [new_structured_data_item(1, 10, Opts)], + submit_test_items(Items, Opts), + TXs = hb_mock_server:get_requests(tx, 1, ServerHandle), + ?assertEqual(1, length(TXs)), + FinalCount = get_test_counter(anchor_attempts_counter), + ?assertEqual(FailCount + 1, FinalCount), + ok + after + cleanup_test_counter(anchor_attempts_counter), + stop_test_servers(ServerHandle) + end. 
+ +post_tx_post_failure_retry_test() -> + Anchor = rand:bytes(32), + Price = 12345, + FailCount = 4, + setup_test_counter(tx_attempts_counter), + {ServerHandle, NodeOpts} = start_mock_gateway(#{ + price => {200, integer_to_binary(Price)}, + tx_anchor => {200, hb_util:encode(Anchor)}, + tx => fun(_Req) -> + Count = increment_test_counter(tx_attempts_counter) - 1, + case Count < FailCount of + true -> {400, <<"Transaction verification failed">>}; + false -> {200, <<"OK">>} + end + end + }), + try + Opts = NodeOpts#{ + priv_wallet => hb:wallet(), + store => hb_test_utils:test_store(), + bundler_max_items => 1, + retry_base_delay_ms => 50, + retry_jitter => 0 + }, + hb_http_server:start_node(Opts), + ensure_server(Opts), + Items = [new_structured_data_item(1, 10, Opts)], + submit_test_items(Items, Opts), + TXs = hb_mock_server:get_requests(tx, FailCount + 1, ServerHandle), + ?assertEqual(FailCount + 1, length(TXs)), + FinalCount = get_test_counter(tx_attempts_counter), + ?assertEqual(FailCount + 1, FinalCount), + ok + after + cleanup_test_counter(tx_attempts_counter), + stop_test_servers(ServerHandle) + end. 
+ +post_proof_failure_retry_test() -> + Anchor = rand:bytes(32), + Price = 12345, + FailCount = 2, + setup_test_counter(chunk_attempts_counter), + {ServerHandle, NodeOpts} = start_mock_gateway(#{ + price => {200, integer_to_binary(Price)}, + tx_anchor => {200, hb_util:encode(Anchor)}, + chunk => fun(_Req) -> + Count = increment_test_counter(chunk_attempts_counter) - 1, + case Count < FailCount of + true -> {500, <<"error">>}; + false -> {200, <<"OK">>} + end + end + }), + try + Opts = NodeOpts#{ + priv_wallet => hb:wallet(), + store => hb_test_utils:test_store(), + bundler_max_items => 1, + retry_base_delay_ms => 50, + retry_jitter => 0 + }, + hb_http_server:start_node(Opts), + ensure_server(Opts), + Items = [new_structured_data_item(1, floor(4.5 * ?DATA_CHUNK_SIZE), Opts)], + submit_test_items(Items, Opts), + TXs = hb_mock_server:get_requests(tx, 1, ServerHandle), + ?assertEqual(1, length(TXs)), + Chunks = hb_mock_server:get_requests(chunk, FailCount + 5, ServerHandle), + ?assertEqual(FailCount + 5, length(Chunks)), + FinalCount = get_test_counter(chunk_attempts_counter), + ?assertEqual(FailCount + 5, FinalCount), + ok + after + cleanup_test_counter(chunk_attempts_counter), + stop_test_servers(ServerHandle) + end. 
+ +rapid_dispatch_test() -> + Anchor = rand:bytes(32), + Price = 12345, + {ServerHandle, NodeOpts} = start_mock_gateway(#{ + price => {200, integer_to_binary(Price)}, + tx_anchor => {200, hb_util:encode(Anchor)}, + tx => fun(_Req) -> + timer:sleep(100), + {200, <<"OK">>} + end + }), + try + Opts = NodeOpts#{ + priv_wallet => hb:wallet(), + store => hb_test_utils:test_store(), + bundler_max_items => 1, + bundler_workers => 3 + }, + hb_http_server:start_node(Opts), + ensure_server(Opts), + lists:foreach( + fun(I) -> + Items = [new_structured_data_item(I, 10, Opts)], + submit_test_items(Items, Opts) + end, + lists:seq(1, 10) + ), + TXs = hb_mock_server:get_requests(tx, 10, ServerHandle), + ?assertEqual(10, length(TXs)), + ok + after + stop_test_servers(ServerHandle) + end. + +one_bundle_fails_others_continue_test() -> + Anchor = rand:bytes(32), + Price = 12345, + setup_test_counter(mixed_attempts_counter), + {ServerHandle, NodeOpts} = start_mock_gateway(#{ + price => {200, integer_to_binary(Price)}, + tx_anchor => {200, hb_util:encode(Anchor)}, + tx => fun(_Req) -> + Count = increment_test_counter(mixed_attempts_counter) - 1, + case Count of + 0 -> {200, <<"OK">>}; + _ -> {400, <<"fail">>} + end + end + }), + try + Opts = NodeOpts#{ + priv_wallet => hb:wallet(), + store => hb_test_utils:test_store(), + bundler_max_items => 1, + retry_base_delay_ms => 100, + retry_jitter => 0 + }, + hb_http_server:start_node(Opts), + ensure_server(Opts), + Items1 = [new_structured_data_item(1, 10, Opts)], + submit_test_items(Items1, Opts), + Items2 = [new_structured_data_item(2, 10, Opts)], + submit_test_items(Items2, Opts), + TXs = hb_mock_server:get_requests(tx, 5, ServerHandle), + ?assert(length(TXs) >= 5, length(TXs)), + ok + after + cleanup_test_counter(mixed_attempts_counter), + stop_test_servers(ServerHandle) + end. 
+ +parallel_task_execution_test() -> + Anchor = rand:bytes(32), + Price = 12345, + SleepTime = 120, + {ServerHandle, NodeOpts} = start_mock_gateway(#{ + price => {200, integer_to_binary(Price)}, + tx_anchor => {200, hb_util:encode(Anchor)}, + chunk => fun(_Req) -> + timer:sleep(SleepTime), + {200, <<"OK">>} + end + }), + try + Opts = NodeOpts#{ + priv_wallet => hb:wallet(), + store => hb_test_utils:test_store(), + bundler_max_items => 1, + bundler_workers => 5 + }, + hb_http_server:start_node(Opts), + ensure_server(Opts), + lists:foreach( + fun(I) -> + Items = [new_structured_data_item(I, 10, Opts)], + submit_test_items(Items, Opts) + end, + lists:seq(1, 10) + ), + StartTime = erlang:system_time(millisecond), + Chunks = hb_mock_server:get_requests(chunk, 10, ServerHandle), + ElapsedTime = erlang:system_time(millisecond) - StartTime, + ?assertEqual(10, length(Chunks)), + ?assert(ElapsedTime < 2000, "ElapsedTime: " ++ integer_to_list(ElapsedTime)), + ok + after + stop_test_servers(ServerHandle) + end. 
+ +exponential_backoff_timing_test() -> + Anchor = rand:bytes(32), + Price = 12345, + FailCount = 5, + setup_test_counter(backoff_cap_counter), + {ServerHandle, NodeOpts} = start_mock_gateway(#{ + price => {200, integer_to_binary(Price)}, + tx_anchor => {200, hb_util:encode(Anchor)}, + tx => fun(_Req) -> + Timestamp = erlang:system_time(millisecond), + Attempt = increment_test_counter(backoff_cap_counter), + Count = Attempt - 1, + add_test_attempt_timestamp(backoff_cap_counter, Attempt, Timestamp), + case Count < FailCount of + true -> {400, <<"fail">>}; + false -> {200, <<"OK">>} + end + end + }), + try + Opts = NodeOpts#{ + priv_wallet => hb:wallet(), + store => hb_test_utils:test_store(), + bundler_max_items => 1, + retry_base_delay_ms => 100, + retry_max_delay_ms => 500, + retry_jitter => 0 + }, + hb_http_server:start_node(Opts), + ensure_server(Opts), + Items = [new_structured_data_item(1, 10, Opts)], + submit_test_items(Items, Opts), + TXs = hb_mock_server:get_requests(tx, FailCount + 1, ServerHandle, 5000), + ?assertEqual(FailCount + 1, length(TXs)), + Timestamps = test_attempt_timestamps(backoff_cap_counter), + ?assertEqual(6, length(Timestamps)), + [T1, T2, T3, T4, T5, T6] = Timestamps, + Delay1 = T2 - T1, + Delay2 = T3 - T2, + Delay3 = T4 - T3, + Delay4 = T5 - T4, + Delay5 = T6 - T5, + ?assert(Delay1 >= 70 andalso Delay1 =< 200, Delay1), + ?assert(Delay2 >= 150 andalso Delay2 =< 300, Delay2), + ?assert(Delay3 >= 300 andalso Delay3 =< 500, Delay3), + ?assert(Delay4 >= 400 andalso Delay4 =< 700, Delay4), + ?assert(Delay5 >= 400 andalso Delay5 =< 700, Delay5), + ok + after + cleanup_test_counter(backoff_cap_counter), + stop_test_servers(ServerHandle) + end. 
+ +independent_task_retry_counts_test() -> + Anchor = rand:bytes(32), + Price = 12345, + setup_test_counter(independent_retry_counter), + {ServerHandle, NodeOpts} = start_mock_gateway(#{ + price => {200, integer_to_binary(Price)}, + tx_anchor => {200, hb_util:encode(Anchor)}, + tx => fun(_Req) -> + Count = increment_test_counter(independent_retry_counter) - 1, + case Count < 2 of + true -> {400, <<"fail">>}; + false -> {200, <<"OK">>} + end + end + }), + try + Opts = NodeOpts#{ + priv_wallet => hb:wallet(), + store => hb_test_utils:test_store(), + bundler_max_items => 1, + retry_base_delay_ms => 100, + retry_jitter => 0 + }, + hb_http_server:start_node(Opts), + ensure_server(Opts), + Items1 = [new_structured_data_item(1, 10, Opts)], + submit_test_items(Items1, Opts), + hb_mock_server:get_requests(tx, 3, ServerHandle), + Items2 = [new_structured_data_item(2, 10, Opts)], + submit_test_items(Items2, Opts), + TotalAttempts = 4, + TXs = hb_mock_server:get_requests(tx, TotalAttempts, ServerHandle), + ?assertEqual(TotalAttempts, length(TXs)), + ok + after + cleanup_test_counter(independent_retry_counter), + stop_test_servers(ServerHandle) + end. + invalid_item_test() -> Anchor = rand:bytes(32), Price = 12345, @@ -584,14 +1257,12 @@ cache_write_failure_test() -> <<"error">> := <<"cache_write_failed">>}}, Result), ok after - stop_server(), - dev_bundler_dispatch:stop_dispatcher() + stop_server() end. stop_test_servers(ServerHandle) -> hb_mock_server:stop(ServerHandle), - stop_server(), - dev_bundler_dispatch:stop_dispatcher(). + stop_server(). test_bundle(Opts) -> Anchor = rand:bytes(32), @@ -660,6 +1331,24 @@ test_api_error(Responses) -> new_data_item(Index, Size) -> new_data_item(Index, Size, hb:wallet()). +new_structured_data_item(Index, Size, Opts) -> + hb_message:convert( + new_data_item(Index, Size), + <<"structured@1.0">>, + <<"ans104@1.0">>, + Opts + ). 
+ +submit_test_items([], _Opts) -> + ok; +submit_test_items(Items, Opts) -> + lists:foreach( + fun(Item) -> + ?assertMatch({ok, _}, item(#{}, Item, Opts)) + end, + Items + ). + new_data_item(Index, Size, Wallet) -> Data = rand:bytes(Size), Tag = <<"tag", (integer_to_binary(Index))/binary>>, @@ -780,4 +1469,35 @@ start_mock_gateway(Responses) -> } ] }, - {ServerHandle, NodeOpts}. \ No newline at end of file + {ServerHandle, NodeOpts}. + +setup_test_counter(Table) -> + cleanup_test_counter(Table), + ets:new(Table, [named_table, public, set]), + ok. + +cleanup_test_counter(Table) -> + case ets:info(Table) of + undefined -> ok; + _ -> ets:delete(Table), ok + end. + +increment_test_counter(Table) -> + ets:update_counter(Table, Table, {2, 1}, {Table, 0}). + +get_test_counter(Table) -> + case ets:lookup(Table, Table) of + [{_, Value}] -> Value; + [] -> 0 + end. + +add_test_attempt_timestamp(Table, Attempt, Timestamp) -> + ets:insert(Table, {{Table, Attempt}, Timestamp}). + +test_attempt_timestamps(Table) -> + TimestampEntries = [ + {Attempt, Timestamp} + || {{Prefix1, Attempt}, Timestamp} <- ets:tab2list(Table), + Prefix1 =:= Table + ], + [Timestamp || {_, Timestamp} <- lists:sort(TimestampEntries)]. \ No newline at end of file diff --git a/src/dev_bundler_cache.erl b/src/dev_bundler_cache.erl index e0df60d6a..ff5dac963 100644 --- a/src/dev_bundler_cache.erl +++ b/src/dev_bundler_cache.erl @@ -7,17 +7,18 @@ %%% %%% Recovery flow: %%% 1. Load unbundled items (where bundle = <<>>) back into dev_bundler queue -%%% 2. Load TX states and reconstruct dev_bundler_dispatch bundles +%%% 2. Load TX states and reconstruct in-progress bundler bundles %%% 3. Enqueue appropriate tasks based on status -module(dev_bundler_cache). -export([ write_item/2, write_tx/3, complete_tx/2, - load_unbundled_items/1, load_bundle_states/1, - load_bundled_items/2, - load_tx/2 + load_tx/2, + load_items/2, + load_items/4, + list_item_ids/1 ]). -include("include/hb.hrl"). 
-include_lib("eunit/include/eunit.hrl"). @@ -55,11 +56,13 @@ get_item_bundle(Item, Opts) when is_map(Item) -> %% @doc Construct the pseudopath for an item's bundle reference. %% Item should be a structured message. item_path(Item, Opts) when is_map(Item) -> + item_path(item_id(Item, Opts), Opts); +item_path(ItemID, Opts) when is_binary(ItemID) -> Store = hb_opts:get(store, no_viable_store, Opts), hb_store:path(Store, [ ?BUNDLER_PREFIX, <<"item">>, - item_id(Item, Opts), + ItemID, <<"bundle">> ]). @@ -78,7 +81,8 @@ write_tx(TX, Items, Opts) when is_map(TX) -> ok = link_item_to_tx(Item, TX, Opts) end, Items - ). + ), + ok. complete_tx(TX, Opts) -> set_tx_status(TX, <<"complete">>, Opts). @@ -110,57 +114,6 @@ tx_path(TX, Opts) -> %%% Recovery operations -%% @doc Load all unbundled items (where bundle = <<>>) from cache. -%% Returns list of actual Item messages for re-queuing. -load_unbundled_items(Opts) -> - Store = hb_opts:get(store, no_viable_store, Opts), - ItemsPath = hb_store:path(Store, [?BUNDLER_PREFIX, <<"item">>]), - % List all item IDs - ItemIDs = case hb_cache:list(ItemsPath, Opts) of - [] -> []; - List -> List - end, - ?event(bundler_short, - {recovering_all_unbundled_items, length(ItemIDs)} - ), - % Filter for unbundled items and load them - lists:filtermap( - fun(ItemIDStr) -> - % Read the bundle pseudopath directly - BundlePath = hb_store:path(Store, [ - ?BUNDLER_PREFIX, - <<"item">>, - ItemIDStr, - <<"bundle">> - ]), - case read_pseudopath(BundlePath, Opts) of - {ok, <<>>} -> - % Unbundled item - load it fully (resolve all links) - case hb_cache:read(ItemIDStr, Opts) of - {ok, Item} -> - FullyLoadedItem = hb_cache:ensure_all_loaded(Item, Opts), - ?event(bundler_short, - {recovered_unbundled_item, - {id, {string, ItemIDStr}} - } - ), - {true, FullyLoadedItem}; - _ -> - ?event(bundler_short, - {failed_to_recover_unbundled_item, - {id, {string, ItemIDStr}} - } - ), - false - end; - _ -> - % Already bundled or not found - false - end - end, - ItemIDs - ). 
- %% @doc Load all bundle TX states from cache. %% Returns list of {TXID, Status} tuples. load_bundle_states(Opts) -> @@ -193,60 +146,6 @@ load_bundle_states(Opts) -> TXIDs ). -%% @doc Load all data items associated with a bundle TX. -%% Uses the item pseudopaths to find items with matching tx-id. -load_bundled_items(TXID, Opts) -> - Store = hb_opts:get(store, no_viable_store, Opts), - ItemsPath = hb_store:path(Store, [?BUNDLER_PREFIX, <<"item">>]), - % List all item IDs - ItemIDs = case hb_cache:list(ItemsPath, Opts) of - [] -> []; - List -> List - end, - ?event(bundler_short, {recovering_bundled_items, - {count, length(ItemIDs)}}), - % Filter for items belonging to this TX and load them - lists:filtermap( - fun(ItemIDStr) -> - % Read the bundle pseudopath directly - BundlePath = hb_store:path(Store, [ - ?BUNDLER_PREFIX, - <<"item">>, - ItemIDStr, - <<"bundle">> - ]), - case read_pseudopath(BundlePath, Opts) of - {ok, BundleTXID} when BundleTXID =:= TXID -> - % This item belongs to our bundle - load it fully (resolve all links) - case hb_cache:read(ItemIDStr, Opts) of - {ok, Item} -> - FullyLoadedItem = hb_cache:ensure_all_loaded(Item, Opts), - ?event( - bundler_debug, - {loaded_tx_item, - {tx_id, {explicit, TXID}}, - {item_id, {explicit, ItemIDStr}} - } - ), - {true, FullyLoadedItem}; - _ -> - ?event( - error, - {failed_to_load_tx_item, - {tx_id, {explicit, TXID}}, - {item_id, {explicit, ItemIDStr}} - } - ), - false - end; - _ -> - % Doesn't belong to this bundle or not found - false - end - end, - ItemIDs - ). - %% @doc Load a TX from cache by its ID. load_tx(TXID, Opts) -> ?event(bundler_debug, {load_tx, {tx_id, {explicit, TXID}}}), @@ -265,7 +164,10 @@ load_tx(TXID, Opts) -> %% @doc Write a value to a pseudopath. write_pseudopath(Path, Value, Opts) -> Store = hb_opts:get(store, no_viable_store, Opts), - hb_store:write(Store, Path, Value). + Result = hb_store:write(Store, Path, Value), + % force a flush to disk + hb_store:read(Store, Path), + Result. 
%% @doc Read a value from a pseudopath. read_pseudopath(Path, Opts) -> @@ -275,6 +177,47 @@ read_pseudopath(Path, Opts) -> _ -> not_found end. +%% @doc List all cached bundler item IDs. +list_item_ids(Opts) -> + Store = hb_opts:get(store, no_viable_store, Opts), + ItemsPath = hb_store:path(Store, [?BUNDLER_PREFIX, <<"item">>]), + case hb_cache:list(ItemsPath, Opts) of + [] -> []; + List -> List + end. + +%% @doc Load all items whose bundle pseudopath matches BundleID. +load_items(BundleID, Opts) -> + load_items( + BundleID, + Opts, + fun(_ItemID, _Item) -> ok end, + fun(_ItemID) -> ok end + ). + +%% @doc Load all items whose bundle pseudopath matches BundleID and invoke callbacks. +load_items(BundleID, Opts, OnLoaded, OnFailed) -> + lists:filtermap( + fun(ItemID) -> + BundlePath = item_path(ItemID, Opts), + case read_pseudopath(BundlePath, Opts) of + {ok, BundleID} -> + case hb_cache:read(ItemID, Opts) of + {ok, Item} -> + FullyLoadedItem = hb_cache:ensure_all_loaded(Item, Opts), + OnLoaded(ItemID, FullyLoadedItem), + {true, FullyLoadedItem}; + _ -> + OnFailed(ItemID), + false + end; + _ -> + false + end + end, + list_item_ids(Opts) + ). + %%% Tests basic_cache_test() -> @@ -306,7 +249,7 @@ load_unbundled_items_test() -> % Link item2 to a bundle, leave others unbundled ok = write_tx(TX, [Item2], Opts), % Load unbundled items - UnbundledItems1 = load_unbundled_items(Opts), + UnbundledItems1 = load_items(<<>>, Opts), UnbundledItems2 = [ hb_message:with_commitments( #{ <<"commitment-device">> => <<"ans104@1.0">> }, @@ -317,6 +260,17 @@ load_unbundled_items_test() -> ?assertEqual(lists:sort([Item1, Item3]), UnbundledItems3), ok. 
+recovered_items_relink_to_original_bundle_path_test() -> + Opts = #{store => hb_test_utils:test_store()}, + Item = new_data_item(1, <<"data1">>, Opts), + ok = write_item(Item, Opts), + [RecoveredItem] = load_items(<<>>, Opts), + TX = new_tx(1, Opts), + ok = write_tx(TX, [RecoveredItem], Opts), + ?assertEqual(tx_id(TX, Opts), get_item_bundle(Item, Opts)), + ?assertEqual([], load_items(<<>>, Opts)), + ok. + load_bundle_states_test() -> Opts = #{store => hb_test_utils:test_store()}, TX1 = new_tx(1, Opts), @@ -348,7 +302,7 @@ load_bundled_items_test() -> ok = write_tx(TX1, [Item1, Item2], Opts), ok = write_tx(TX2, [Item3], Opts), % Load items for bundle 1 - Bundle1Items1 = load_bundled_items(tx_id(TX1, Opts), Opts), + Bundle1Items1 = load_items(tx_id(TX1, Opts), Opts), Bundle1Items2 = [ hb_message:with_commitments( #{ <<"commitment-device">> => <<"ans104@1.0">> }, @@ -357,7 +311,7 @@ load_bundled_items_test() -> Bundle1Items3 = lists:sort(Bundle1Items2), ?assertEqual(lists:sort([Item1, Item2]), Bundle1Items3), % Load items for bundle 2 - Bundle2Items1 = load_bundled_items(tx_id(TX2, Opts), Opts), + Bundle2Items1 = load_items(tx_id(TX2, Opts), Opts), Bundle2Items2 = [ hb_message:with_commitments( #{ <<"commitment-device">> => <<"ans104@1.0">> }, @@ -458,11 +412,7 @@ bundler_optimistic_cache_test() -> ), ok after - case hb_name:lookup(bundler_server) of - undefined -> ok; - PID -> PID ! stop, hb_name:unregister(bundler_server) - end, - dev_bundler_dispatch:stop_dispatcher() + dev_bundler:stop_server() end. new_data_item(Index, SizeOrData, Opts) -> diff --git a/src/dev_bundler_dispatch.erl b/src/dev_bundler_dispatch.erl deleted file mode 100644 index 2c286091c..000000000 --- a/src/dev_bundler_dispatch.erl +++ /dev/null @@ -1,1118 +0,0 @@ -%%% @doc A dispatcher for the bundler device (dev_bundler). This module -%%% manages a worker pool to handle bundle building, TX posting, proof -%%% generation, and chunk seeding. 
Failed tasks are automatically re-queued -%%% for immediate retry until successful. --module(dev_bundler_dispatch). --export([dispatch/2, ensure_dispatcher/1, stop_dispatcher/0]). --include("include/hb.hrl"). --include_lib("eunit/include/eunit.hrl"). - -%%% State record for the dispatcher process. --record(state, { - workers, % Map of WorkerPID => idle | {busy, Task} - task_queue, % Queue of pending tasks - bundles, % Map of BundleID => #bundle{} - opts % Configuration options -}). - -%%% Task record representing work to be done by a worker. --record(task, { - bundle_id, % ID of the bundle this task belongs to - type, % Task type: post_tx | build_proofs | post_proof - data, % Task-specific data (map) - opts, % Configuration options - retry_count = 0 % Number of times this task has been retried -}). - -%%% Proof record to track individual proof seeding status. --record(proof, { - proof, % The proof data (chunk, merkle path, etc) - status % pending | seeded -}). - -%%% Bundle record to track bundle progress through the dispatch pipeline. --record(bundle, { - id, % Unique bundle identifier - items, % List of dataitems to bundle - status, % Current state (initializing, tx_built, tx_posted, proofs_built) - tx, % The built/signed transaction - proofs, % Map of offset => #proof{} records - start_time % The time the bundle was started -}). - -%%% Default options. --define(DISPATCHER_NAME, bundler_dispatcher). --define(DEFAULT_NUM_WORKERS, 20). --define(DEFAULT_RETRY_BASE_DELAY_MS, 1000). --define(DEFAULT_RETRY_MAX_DELAY_MS, 600000). % 10 minutes --define(DEFAULT_RETRY_JITTER, 0.25). % ±25% jitter - -%% @doc Dispatch the queue. -dispatch([], _Opts) -> - ok; -dispatch(Items, Opts) -> - PID = ensure_dispatcher(Opts), - PID ! {dispatch, Items}. - -%% @doc Return the PID of the dispatch server. If the server is not running, -%% it is started and registered with the name `?SERVER_NAME'. 
-ensure_dispatcher(Opts) -> - case hb_name:lookup(?DISPATCHER_NAME) of - undefined -> - PID = spawn(fun() -> init(Opts) end), - ?event(bundler_short, {starting_dispatcher, {pid, PID}}), - hb_name:register(?DISPATCHER_NAME, PID), - hb_name:lookup(?DISPATCHER_NAME); - PID -> PID - end. - -stop_dispatcher() -> - case hb_name:lookup(?DISPATCHER_NAME) of - undefined -> ok; - PID -> - PID ! stop, - hb_name:unregister(?DISPATCHER_NAME) - end. - -get_state() -> - case hb_name:lookup(?DISPATCHER_NAME) of - undefined -> undefined; - PID -> - PID ! {get_state, self(), Ref = make_ref()}, - receive - {state, Ref, State} -> State - after 1000 -> timeout - end - end. - -%% @doc Initialize the dispatcher with worker pool. -init(Opts) -> - NumWorkers = hb_opts:get(bundler_workers, ?DEFAULT_NUM_WORKERS, Opts), - Workers = lists:map( - fun(_) -> - WorkerPID = spawn_link(fun() -> worker_loop() end), - {WorkerPID, idle} - end, - lists:seq(1, NumWorkers) - ), - State = #state{ - workers = maps:from_list(Workers), - task_queue = queue:new(), - bundles = #{}, - opts = Opts - }, - % Recover any in-progress bundles from cache - State1 = recover_bundles(State), - dispatcher(assign_tasks(State1)). - -%% @doc The main loop of the dispatcher. Manages task queue and worker pool. 
-dispatcher(State) -> - receive - {dispatch, Items} -> - % Create a new bundle and queue the post_tx task - Opts = State#state.opts, - BundleID = make_ref(), - Bundle = #bundle{ - id = BundleID, - items = Items, - status = initializing, - tx = undefined, - proofs = #{}, - start_time = erlang:timestamp() - }, - State1 = State#state{ - bundles = maps:put(BundleID, Bundle, State#state.bundles) - }, - ?event(bundler_short, - {dispatching_bundle, - {timestamp, format_timestamp()}, - {bundle_id, BundleID}, - {num_items, length(Items)} - } - ), - Task = #task{bundle_id = BundleID, type = post_tx, data = Items, opts = Opts}, - State2 = enqueue_task(Task, State1), - % Assign tasks to idle workers - dispatcher(assign_tasks(State2)); - {task_complete, WorkerPID, Task, Result} -> - State1 = handle_task_complete(WorkerPID, Task, Result, State), - dispatcher(assign_tasks(State1)); - {task_failed, WorkerPID, Task, Reason} -> - State1 = handle_task_failed(WorkerPID, Task, Reason, State), - dispatcher(assign_tasks(State1)); - {retry_task, Task} -> - % Re-enqueue the task after backoff delay - State1 = enqueue_task(Task, State), - dispatcher(assign_tasks(State1)); - {get_state, From, Ref} -> - From ! {state, Ref, State}, - dispatcher(State); - stop -> - % Stop all workers - maps:foreach( - fun(WorkerPID, _) -> WorkerPID ! stop end, - State#state.workers - ), - exit(normal) - end. - -%% @doc Enqueue a task to the task queue. -enqueue_task(Task, State) -> - Queue = State#state.task_queue, - State#state{task_queue = queue:in(Task, Queue)}. - -%% @doc Format a task for logging. 
-format_task(#task{bundle_id = BundleID, type = post_tx, data = DataItems}) -> - {post_tx, {timestamp, format_timestamp()}, {bundle, BundleID}, - {num_items, length(DataItems)}}; -format_task(#task{bundle_id = BundleID, type = build_proofs, data = CommittedTX}) -> - {build_proofs, {timestamp, format_timestamp()}, {bundle, BundleID}, - {tx, {explicit, hb_message:id(CommittedTX, signed, #{})}}}; -format_task(#task{bundle_id = BundleID, type = post_proof, data = Proof}) -> - Offset = maps:get(offset, Proof), - {post_proof, {timestamp, format_timestamp()}, {bundle, BundleID}, - {offset, Offset}}. - -%% @doc Format erlang:timestamp() as a user-friendly RFC3339 string with milliseconds. -format_timestamp() -> - {MegaSecs, Secs, MicroSecs} = erlang:timestamp(), - Millisecs = (MegaSecs * 1000000 + Secs) * 1000 + (MicroSecs div 1000), - calendar:system_time_to_rfc3339(Millisecs, [{unit, millisecond}, {offset, "Z"}]). - -%% @doc Assign tasks to all idle workers until no idle workers -%% or no tasks remain. -assign_tasks(State) -> - IdleWorkers = maps:filter( - fun(_, Status) -> Status =:= idle end, - State#state.workers), - assign_tasks(maps:keys(IdleWorkers), State). - -assign_tasks([], State) -> - % No more idle workers - State; -assign_tasks([WorkerPID | Rest], State) -> - Workers = State#state.workers, - Queue = State#state.task_queue, - case queue:out(Queue) of - {{value, Task}, Queue1} -> - % Assign task to this worker - WorkerPID ! {execute_task, self(), Task}, - State1 = State#state{ - task_queue = Queue1, - workers = maps:put(WorkerPID, {busy, Task}, Workers) - }, - % Continue with remaining idle workers - assign_tasks(Rest, State1); - {empty, _} -> - % No more tasks, stop - State - end. 
- -handle_task_complete(WorkerPID, Task, Result, State) -> - Workers = State#state.workers, - Bundles = State#state.bundles, - #task{bundle_id = BundleID} = Task, - ?event(bundler_debug, {task_complete, format_task(Task)}), - % Update worker to idle - State1 = State#state{ - workers = maps:put(WorkerPID, idle, Workers) - }, - case maps:get(BundleID, Bundles, undefined) of - undefined -> - ?event(bundler_short, {bundle_not_found, BundleID}), - State1; - Bundle -> - task_completed(Task, Bundle, Result, State1) - end. - -handle_task_failed(WorkerPID, Task, Reason, State) -> - Workers = State#state.workers, - Opts = State#state.opts, - RetryCount = Task#task.retry_count, - % Calculate exponential backoff delay - BaseDelay = hb_opts:get(retry_base_delay_ms, ?DEFAULT_RETRY_BASE_DELAY_MS, Opts), - MaxDelay = hb_opts:get(retry_max_delay_ms, ?DEFAULT_RETRY_MAX_DELAY_MS, Opts), - Jitter = hb_opts:get(retry_jitter, ?DEFAULT_RETRY_JITTER, Opts), - % Compute base delay with exponential backoff: min(base * 2^retry_count, max_delay) - BaseDelayWithBackoff = min(BaseDelay * (1 bsl RetryCount), MaxDelay), - % Apply jitter: delay * (1 + random(-jitter, +jitter)) - % This distributes the delay across [delay * (1-jitter), delay * (1+jitter)] - JitterFactor = (rand:uniform() * 2 - 1) * Jitter, % Random value in [-jitter, +jitter] - Delay = round(BaseDelayWithBackoff * (1 + JitterFactor)), - ?event( - bundler_short, - {task_failed_retrying, format_task(Task), - {reason, {explicit, Reason}}, - {retry_count, RetryCount}, {delay_ms, Delay} - } - ), - % Update worker to idle - State1 = State#state{ - workers = maps:put(WorkerPID, idle, Workers) - }, - % Increment retry count and schedule delayed retry - Task1 = Task#task{retry_count = RetryCount + 1}, - erlang:send_after(Delay, self(), {retry_task, Task1}), - State1. 
- -task_completed(#task{bundle_id = BundleID, type = post_tx}, Bundle, CommittedTX, State) -> - Bundles = State#state.bundles, - Opts = State#state.opts, - Bundle1 = Bundle#bundle{status = tx_posted, tx = CommittedTX}, - State1 = State#state{ - bundles = maps:put(BundleID, Bundle1, Bundles) - }, - BuildProofsTask = #task{ - bundle_id = BundleID, type = build_proofs, - data = CommittedTX, opts = Opts}, - enqueue_task(BuildProofsTask, State1); - -task_completed(#task{bundle_id = BundleID, type = build_proofs}, Bundle, Proofs, State) -> - Bundles = State#state.bundles, - Opts = State#state.opts, - case Proofs of - [] -> - % No proofs, bundle complete - bundle_complete(Bundle, State); - _ -> - % Proofs built, wrap each in a proof record with offset as key - ProofsMap = maps:from_list([ - {maps:get(offset, P), #proof{proof = P, status = pending}} || P <- Proofs - ]), - Bundle1 = Bundle#bundle{ - proofs = ProofsMap, - status = proofs_built - }, - State1 = State#state{ - bundles = maps:put(BundleID, Bundle1, Bundles) - }, - % Enqueue all post_proof tasks - lists:foldl( - fun(ProofData, S) -> - ProofTask = #task{ - bundle_id = BundleID, - type = post_proof, - data = ProofData, - opts = Opts - }, - enqueue_task(ProofTask, S) - end, - State1, - Proofs - ) - end; - -task_completed(#task{bundle_id = BundleID, type = post_proof, data = ProofData}, Bundle, _Result, State) -> - Bundles = State#state.bundles, - Offset = maps:get(offset, ProofData), - Proofs = Bundle#bundle.proofs, - Proofs1 = maps:update_with( - Offset, - fun(P) -> P#proof{status = seeded} end, - Proofs - ), - Bundle1 = Bundle#bundle{proofs = Proofs1}, - State1 = State#state{ - bundles = maps:put(BundleID, Bundle1, Bundles) - }, - % Check if all proofs are seeded - AllSeeded = lists:all( - fun(#proof{status = Status}) -> Status =:= seeded end, - maps:values(Proofs1) - ), - case AllSeeded of - true -> - bundle_complete(Bundle, State1); - false -> - State1 - end. 
- -%% @doc Mark a bundle as complete and remove it from state. -bundle_complete(Bundle, State) -> - Opts = State#state.opts, - ok = dev_bundler_cache:complete_tx(Bundle#bundle.tx, Opts), - ElapsedTime = - timer:now_diff(erlang:timestamp(), Bundle#bundle.start_time) / 1000000, - ?event(bundler_short, {bundle_complete, {bundle_id, Bundle#bundle.id}, - {timestamp, format_timestamp()}, - {tx, {explicit, hb_message:id(Bundle#bundle.tx, signed, Opts)}}, - {elapsed_time_s, ElapsedTime}}), - State#state{bundles = maps:remove(Bundle#bundle.id, State#state.bundles)}. - -%%% Recovery - -%% @doc Recover in-progress bundles from cache after a crash. -recover_bundles(State) -> - Opts = State#state.opts, - % Reconstruct bundles and enqueue appropriate tasks - lists:foldl( - fun({TXID, Status}, StateAcc) -> - recover_bundle(TXID, Status, StateAcc) - end, - State, - dev_bundler_cache:load_bundle_states(Opts) - ). - -%% @doc Recover a single bundle based on its cached state. -recover_bundle(TXID, Status, State) -> - Opts = State#state.opts, - ?event(bundler_short, {recovering_bundle, - {tx_id, {explicit, TXID}}, - {status, Status} - }), - try - % Load the TX and its items - CommittedTX = dev_bundler_cache:load_tx(TXID, Opts), - Items = dev_bundler_cache:load_bundled_items(TXID, Opts), - % Create a new bundle record - BundleID = make_ref(), - Bundle = #bundle{ - id = BundleID, - items = Items, - status = tx_posted, - tx = CommittedTX, - proofs = #{}, - start_time = erlang:timestamp() - }, - % Add bundle to state - Bundles = State#state.bundles, - State1 = State#state{ - bundles = maps:put(BundleID, Bundle, Bundles) - }, - - % Enqueue appropriate task based on status - Task = #task{ - bundle_id = BundleID, type = build_proofs, - data = CommittedTX, opts = Opts}, - enqueue_task(Task, State1) - catch - _:Error:Stack -> - ?event(bundler_short, {failed_to_recover_bundle, - {tx_id, {explicit, TXID}}, - {error, Error}, - {stack, Stack} - }), - % Skip this bundle and continue - State - end. 
- -%%% Worker implementation - -%% @doc Worker loop - executes tasks and reports back to dispatcher. -worker_loop() -> - receive - {execute_task, DispatcherPID, Task} -> - case execute_task(Task) of - {ok, Value} -> - DispatcherPID ! {task_complete, self(), Task, Value}; - {error, Reason} -> - DispatcherPID ! {task_failed, self(), Task, Reason} - end, - - worker_loop(); - stop -> - exit(normal) - end. - -%% @doc Execute a specific task. -execute_task(#task{type = post_tx, data = Items, opts = Opts} = Task) -> - try - ?event(bundler_debug, {execute_task, format_task(Task)}), - % Get price and anchor - {ok, TX} = dev_codec_tx:to(lists:reverse(Items), #{}, #{}), - DataSize = TX#tx.data_size, - PriceResult = get_price(DataSize, Opts), - AnchorResult = get_anchor(Opts), - case {PriceResult, AnchorResult} of - {{ok, Price}, {ok, Anchor}} -> - % Sign the TX - Wallet = hb_opts:get(priv_wallet, no_viable_wallet, Opts), - SignedTX = ar_tx:sign(TX#tx{ anchor = Anchor, reward = Price }, Wallet), - % Convert and post - Committed = hb_message:convert( - SignedTX, - #{ <<"device">> => <<"structured@1.0">>, <<"bundle">> => true }, - #{ <<"device">> => <<"tx@1.0">>, <<"bundle">> => true }, - Opts), - ?event(bundler_short, {posting_tx, - {tx, {explicit, hb_message:id(Committed, signed, Opts)}}}), - PostTXResponse = hb_ao:resolve( - #{ <<"device">> => <<"arweave@2.9">> }, - Committed#{ - <<"path">> => <<"/tx">>, - <<"method">> => <<"POST">> - }, - Opts - ), - case PostTXResponse of - {ok, _Result} -> - dev_bundler_cache:write_tx( - Committed, - Items, - Opts - ), - {ok, Committed}; - {_, ErrorReason} -> {error, ErrorReason} - end; - {PriceErr, AnchorErr} -> - ?event(bundle_short, {post_tx_failed, - format_task(Task), - {price, PriceErr}, - {anchor, AnchorErr}}), - {error, {PriceErr, AnchorErr}} - end - catch - _:Err:_Stack -> - ?event(bundle_short, {post_tx_failed, - format_task(Task), - {error, Err}}), - {error, Err} - end; - -execute_task(#task{type = build_proofs, data = 
CommittedTX, opts = Opts} = Task) -> - try - ?event(bundler_debug, {execute_task, format_task(Task)}), - % Calculate chunks and proofs - TX = hb_message:convert( - CommittedTX, <<"tx@1.0">>, <<"structured@1.0">>, Opts), - Data = TX#tx.data, - DataRoot = TX#tx.data_root, - DataSize = TX#tx.data_size, - Mode = ar_tx:chunking_mode(TX#tx.format), - Chunks = ar_tx:chunk_binary(Mode, ?DATA_CHUNK_SIZE, Data), - ?event(bundler_short, {building_proofs, - {bundle, Task#task.bundle_id}, - {data_size, DataSize}, - {num_chunks, length(Chunks)}}), - SizeTaggedChunks = ar_tx:chunks_to_size_tagged_chunks(Chunks), - SizeTaggedChunkIDs = ar_tx:sized_chunks_to_sized_chunk_ids(SizeTaggedChunks), - {_Root, DataTree} = ar_merkle:generate_tree(SizeTaggedChunkIDs), - % Build proof list - Proofs = lists:filtermap( - fun({Chunk, Offset}) -> - case Chunk of - <<>> -> false; - _ -> - DataPath = ar_merkle:generate_path( - DataRoot, Offset - 1, DataTree), - Proof = #{ - chunk => Chunk, - data_path => DataPath, - offset => Offset - 1, - data_size => DataSize, - data_root => DataRoot - }, - {true, Proof} - end - end, - SizeTaggedChunks - ), - % -1 because the `?event(...)' macro increments the counter by 1. 
- hb_event:increment(bundler_short, built_proofs, length(Proofs) - 1), - ?event( - bundler_short, - {built_proofs, - {bundle, Task#task.bundle_id}, - {num_proofs, length(Proofs)} - }, - Opts - ), - {ok, Proofs} - catch - _:Err:_Stack -> - ?event(bundler_short, {build_proofs_failed, - format_task(Task), - {error, Err}}), - {error, Err} - end; - -execute_task(#task{type = post_proof, data = Proof, opts = Opts} = Task) -> - #{chunk := Chunk, data_path := DataPath, offset := Offset, - data_size := DataSize, data_root := DataRoot} = Proof, - ?event(bundler_debug, {execute_task, format_task(Task)}), - Request = #{ - <<"chunk">> => hb_util:encode(Chunk), - <<"data_path">> => hb_util:encode(DataPath), - <<"offset">> => integer_to_binary(Offset), - <<"data_size">> => integer_to_binary(DataSize), - <<"data_root">> => hb_util:encode(DataRoot) - }, - try - Serialized = hb_json:encode(Request), - Response = dev_arweave:post_json_chunk(Serialized, Opts), - case Response of - {ok, _} -> {ok, proof_posted}; - {error, Reason} -> {error, Reason} - end - catch - _:Err:_Stack -> - ?event(bundler_short, {post_proof_failed, - format_task(Task), - {error, Err}}), - {error, Err} - end. - -get_price(DataSize, Opts) -> - hb_ao:resolve( - #{ <<"device">> => <<"arweave@2.9">> }, - #{ <<"path">> => <<"/price">>, <<"size">> => DataSize }, - Opts - ). - -get_anchor(Opts) -> - hb_ao:resolve( - #{ <<"device">> => <<"arweave@2.9">> }, - #{ <<"path">> => <<"/tx_anchor">> }, - Opts - ). 
- -%%%=================================================================== -%%% Tests -%%%=================================================================== - -complete_task_sequence_test() -> - Anchor = rand:bytes(32), - Price = 12345, - {ServerHandle, NodeOpts} = start_mock_gateway(#{ - price => {200, integer_to_binary(Price)}, - tx_anchor => {200, hb_util:encode(Anchor)} - }), - try - Opts = NodeOpts#{ - priv_wallet => hb:wallet(), - store => hb_test_utils:test_store(), - retry_base_delay_ms => 100, - retry_jitter => 0 - }, - hb_http_server:start_node(Opts), - Items = [new_data_item(1, 10, Opts), new_data_item(2, 10, Opts)], - dispatch(Items, Opts), - % Wait for TX to be posted - TXs = hb_mock_server:get_requests(tx, 1, ServerHandle), - ?assertEqual(1, length(TXs)), - % Wait for chunk to be posted - Proofs = hb_mock_server:get_requests(chunk, 1, ServerHandle), - ?assertEqual(1, length(Proofs)), - % Verify dispatcher state - State = get_state(), - ?assertNotEqual(undefined, State), - ?assertNotEqual(timeout, State), - % All workers should be idle - Workers = State#state.workers, - IdleWorkers = [PID || {PID, Status} <- maps:to_list(Workers), Status =:= idle], - ?assertEqual(maps:size(Workers), length(IdleWorkers)), - % Task queue should be empty - Queue = State#state.task_queue, - ?assert(queue:is_empty(Queue)), - % Bundle should be completed and removed - Bundles = State#state.bundles, - ?assertEqual(0, maps:size(Bundles)), - ok - after - cleanup_dispatcher(ServerHandle) - end. 
- -post_tx_price_failure_retry_test() -> - Anchor = rand:bytes(32), - FailCount = 3, - setup_test_counter(price_attempts_counter), - {ServerHandle, NodeOpts} = start_mock_gateway(#{ - price => fun(_Req) -> - Count = increment_test_counter(price_attempts_counter) - 1, - case Count < FailCount of - true -> {500, <<"error">>}; - false -> {200, <<"12345">>} - end - end, - tx_anchor => {200, hb_util:encode(Anchor)} - }), - try - Opts = NodeOpts#{ - priv_wallet => hb:wallet(), - store => hb_test_utils:test_store(), - retry_base_delay_ms => 50, - retry_jitter => 0 - }, - hb_http_server:start_node(Opts), - Items = [new_data_item(1, 10, Opts)], - dispatch(Items, Opts), - % Wait for TX to eventually be posted - TXs = hb_mock_server:get_requests(tx, 1, ServerHandle), - ?assertEqual(1, length(TXs)), - % Verify it retried multiple times - FinalCount = get_test_counter(price_attempts_counter), - ?assertEqual(FailCount+1, FinalCount), - ok - after - cleanup_test_counter(price_attempts_counter), - cleanup_dispatcher(ServerHandle) - end. 
- -post_tx_anchor_failure_retry_test() -> - Price = 12345, - FailCount = 3, - setup_test_counter(anchor_attempts_counter), - {ServerHandle, NodeOpts} = start_mock_gateway(#{ - price => {200, integer_to_binary(Price)}, - tx_anchor => fun(_Req) -> - Count = increment_test_counter(anchor_attempts_counter) - 1, - case Count < FailCount of - true -> {500, <<"error">>}; - false -> {200, hb_util:encode(rand:bytes(32))} - end - end - }), - try - Opts = NodeOpts#{ - priv_wallet => hb:wallet(), - store => hb_test_utils:test_store(), - retry_base_delay_ms => 50, - retry_jitter => 0 - }, - hb_http_server:start_node(Opts), - Items = [new_data_item(1, 10, Opts)], - dispatch(Items, Opts), - % Wait for TX to eventually be posted - TXs = hb_mock_server:get_requests(tx, 1, ServerHandle), - ?assertEqual(1, length(TXs)), - % Verify it retried multiple times - FinalCount = get_test_counter(anchor_attempts_counter), - ?assertEqual(FailCount+1, FinalCount), - ok - after - cleanup_test_counter(anchor_attempts_counter), - cleanup_dispatcher(ServerHandle) - end. - -post_tx_post_failure_retry_test() -> - Anchor = rand:bytes(32), - Price = 12345, - FailCount = 4, - setup_test_counter(tx_attempts_counter), - {ServerHandle, NodeOpts} = start_mock_gateway(#{ - price => {200, integer_to_binary(Price)}, - tx_anchor => {200, hb_util:encode(Anchor)}, - tx => fun(_Req) -> - Count = increment_test_counter(tx_attempts_counter) - 1, - case Count < FailCount of - true -> {400, <<"Transaction verification failed">>}; - false -> {200, <<"OK">>} - end - end - }), - try - % Use short retry delays for testing. 
- Opts = NodeOpts#{ - priv_wallet => hb:wallet(), - store => hb_test_utils:test_store(), - retry_base_delay_ms => 50, - retry_jitter => 0 % Disable jitter for deterministic tests - }, - hb_http_server:start_node(Opts), - Items = [new_data_item(1, 10, Opts)], - dispatch(Items, Opts), - % Wait for TX to eventually succeed - TXs = hb_mock_server:get_requests(tx, FailCount+1, ServerHandle), - ?assertEqual(FailCount+1, length(TXs)), - % Verify final attempt succeeded - FinalCount = get_test_counter(tx_attempts_counter), - ?assertEqual(FailCount+1, FinalCount), - ok - after - cleanup_test_counter(tx_attempts_counter), - cleanup_dispatcher(ServerHandle) - end. - -post_proof_failure_retry_test() -> - Anchor = rand:bytes(32), - Price = 12345, - FailCount = 2, - setup_test_counter(chunk_attempts_counter), - {ServerHandle, NodeOpts} = start_mock_gateway(#{ - price => {200, integer_to_binary(Price)}, - tx_anchor => {200, hb_util:encode(Anchor)}, - chunk => fun(_Req) -> - Count = increment_test_counter(chunk_attempts_counter) - 1, - case Count < FailCount of - true -> {500, <<"error">>}; - false -> {200, <<"OK">>} - end - end - }), - try - Opts = NodeOpts#{ - priv_wallet => hb:wallet(), - store => hb_test_utils:test_store(), - retry_base_delay_ms => 50, - retry_jitter => 0 - }, - hb_http_server:start_node(Opts), - % Large enough for multiple chunks - Items = [new_data_item(1, floor(4.5 * ?DATA_CHUNK_SIZE), Opts)], - dispatch(Items, Opts), - % Wait for TX - TXs = hb_mock_server:get_requests(tx, 1, ServerHandle), - ?assertEqual(1, length(TXs)), - % Wait for chunks to eventually succeed - Chunks = hb_mock_server:get_requests(chunk, FailCount+5, ServerHandle), - ?assertEqual( FailCount+5, length(Chunks)), - % Verify retries happened - FinalCount = get_test_counter(chunk_attempts_counter), - ?assertEqual(FailCount+5, FinalCount), - ok - after - cleanup_test_counter(chunk_attempts_counter), - cleanup_dispatcher(ServerHandle) - end. 
- -empty_dispatch_test() -> - Opts = #{}, - dispatch([], Opts), - % Should not crash - ok. - -rapid_dispatch_test() -> - Anchor = rand:bytes(32), - Price = 12345, - {ServerHandle, NodeOpts} = start_mock_gateway(#{ - price => {200, integer_to_binary(Price)}, - tx_anchor => {200, hb_util:encode(Anchor)}, - tx => fun(_Req) -> - timer:sleep(100), - {200, <<"OK">>} - end - }), - try - Opts = NodeOpts#{ - priv_wallet => hb:wallet(), - store => hb_test_utils:test_store(), - bundler_workers => 3 - }, - hb_http_server:start_node(Opts), - % Dispatch 10 bundles rapidly - lists:foreach( - fun(I) -> - Items = [new_data_item(I, 10, Opts)], - dispatch(Items, Opts) - end, - lists:seq(1, 10) - ), - - % Wait for all 10 TXs - TXs = hb_mock_server:get_requests(tx, 10, ServerHandle), - ?assertEqual(10, length(TXs)), - ok - after - cleanup_dispatcher(ServerHandle) - end. - -one_bundle_fails_others_continue_test() -> - Anchor = rand:bytes(32), - Price = 12345, - setup_test_counter(mixed_attempts_counter), - {ServerHandle, NodeOpts} = start_mock_gateway(#{ - price => {200, integer_to_binary(Price)}, - tx_anchor => {200, hb_util:encode(Anchor)}, - tx => fun(_Req) -> - % First TX succeeds, all following attempts fail. 
- Count = increment_test_counter(mixed_attempts_counter) - 1, - case Count of - 0 -> {200, <<"OK">>}; - _ -> {400, <<"fail">>} - end - end - }), - try - % Use short retry delays for testing (100ms base, with exponential backoff) - Opts = NodeOpts#{ - priv_wallet => hb:wallet(), - store => hb_test_utils:test_store(), - retry_base_delay_ms => 100, - retry_jitter => 0 % Disable jitter for deterministic tests - }, - hb_http_server:start_node(Opts), - % Dispatch first bundle (will keep failing) - Items1 = [new_data_item(1, 10, Opts)], - dispatch(Items1, Opts), - % Dispatch second bundle (will succeed) - Items2 = [new_data_item(2, 10, Opts)], - dispatch(Items2, Opts), - % Wait for at least 5 TX attempts (1 success + multiple retries) - TXs = hb_mock_server:get_requests(tx, 5, ServerHandle), - ?assert(length(TXs) >= 5, length(TXs)), - ok - after - cleanup_test_counter(mixed_attempts_counter), - cleanup_dispatcher(ServerHandle) - end. - -parallel_task_execution_test() -> - Anchor = rand:bytes(32), - Price = 12345, - SleepTime = 120, - {ServerHandle, NodeOpts} = start_mock_gateway(#{ - price => {200, integer_to_binary(Price)}, - tx_anchor => {200, hb_util:encode(Anchor)}, - chunk => fun(_Req) -> - timer:sleep(SleepTime), - {200, <<"OK">>} - end - }), - try - Opts = NodeOpts#{ - priv_wallet => hb:wallet(), - store => hb_test_utils:test_store(), - bundler_workers => 5 - }, - hb_http_server:start_node(Opts), - % Dispatch 3 bundles, each with 2 chunks - lists:foreach( - fun(I) -> - Items = [new_data_item(I, 10, Opts)], - dispatch(Items, Opts) - end, - lists:seq(1, 10) - ), - % With 3 workers and 1s delay, 10 chunks should complete in ~2s not 9s - StartTime = erlang:system_time(millisecond), - Chunks = hb_mock_server:get_requests(chunk, 10, ServerHandle), - ElapsedTime = erlang:system_time(millisecond) - StartTime, - ?assertEqual(10, length(Chunks)), - % Should take ~2-3 seconds with parallelism, not 9+ - ?assert(ElapsedTime < 2000, "ElapsedTime: " ++ 
integer_to_list(ElapsedTime)), - ok - after - cleanup_dispatcher(ServerHandle) - end. - -exponential_backoff_timing_test() -> - Anchor = rand:bytes(32), - Price = 12345, - FailCount = 5, - setup_test_counter(backoff_cap_counter), - {ServerHandle, NodeOpts} = start_mock_gateway(#{ - price => {200, integer_to_binary(Price)}, - tx_anchor => {200, hb_util:encode(Anchor)}, - tx => fun(_Req) -> - Timestamp = erlang:system_time(millisecond), - Attempt = increment_test_counter(backoff_cap_counter), - Count = Attempt - 1, - % Store timestamp by attempt number. - add_test_attempt_timestamp(backoff_cap_counter, Attempt, Timestamp), - case Count < FailCount of - true -> {400, <<"fail">>}; - false -> {200, <<"OK">>} - end - end - }), - try - Opts = NodeOpts#{ - priv_wallet => hb:wallet(), - store => hb_test_utils:test_store(), - retry_base_delay_ms => 100, - retry_max_delay_ms => 500, % Cap at 500ms - retry_jitter => 0 % Disable jitter for deterministic tests - }, - hb_http_server:start_node(Opts), - Items = [new_data_item(1, 10, Opts)], - dispatch(Items, Opts), - % Wait for TX to eventually succeed - TXs = hb_mock_server:get_requests(tx, FailCount+1, ServerHandle, 5000), - ?assertEqual(FailCount+1, length(TXs)), - % Verify backoff respects cap - Timestamps = test_attempt_timestamps(backoff_cap_counter), - ?assertEqual(6, length(Timestamps)), - [T1, T2, T3, T4, T5, T6] = Timestamps, - % Calculate actual delays - Delay1 = T2 - T1, - Delay2 = T3 - T2, - Delay3 = T4 - T3, - Delay4 = T5 - T4, - Delay5 = T6 - T5, - % Expected: ~100ms, ~200ms, ~400ms, ~500ms (capped), ~500ms (capped) - ?assert(Delay1 >= 70 andalso Delay1 =< 200, Delay1), - ?assert(Delay2 >= 150 andalso Delay2 =< 300, Delay2), - ?assert(Delay3 >= 300 andalso Delay3 =< 500, Delay3), - ?assert(Delay4 >= 400 andalso Delay4 =< 700, Delay4), - ?assert(Delay5 >= 400 andalso Delay5 =< 700, Delay5), - ok - after - cleanup_test_counter(backoff_cap_counter), - cleanup_dispatcher(ServerHandle) - end. 
- -independent_task_retry_counts_test() -> - Anchor = rand:bytes(32), - Price = 12345, - setup_test_counter(independent_retry_counter), - {ServerHandle, NodeOpts} = start_mock_gateway(#{ - price => {200, integer_to_binary(Price)}, - tx_anchor => {200, hb_util:encode(Anchor)}, - tx => fun(_Req) -> - % Use request ordering to distinguish bundles - % First 3 requests are bundle1 (fail, fail, succeed) - % 4th request is bundle2 (succeed) - Count = increment_test_counter(independent_retry_counter) - 1, - case Count < 2 of - true -> {400, <<"fail">>}; % First 2 attempts fail - false -> {200, <<"OK">>} % Rest succeed - end - end - }), - try - Opts = NodeOpts#{ - priv_wallet => hb:wallet(), - store => hb_test_utils:test_store(), - retry_base_delay_ms => 100, - retry_jitter => 0 % Disable jitter for deterministic tests - }, - hb_http_server:start_node(Opts), - % Dispatch first bundle (will fail twice and retry) - Items1 = [new_data_item(1, 10, Opts)], - dispatch(Items1, Opts), - % Wait a bit for first bundle to start failing - hb_mock_server:get_requests(tx, 3, ServerHandle), - % Dispatch second bundle (will succeed on first try since we're past the 2 failures) - Items2 = [new_data_item(2, 10, Opts)], - dispatch(Items2, Opts), - % Verify we got all TX requests logged - TotalAttempts = 4, - TXs = hb_mock_server:get_requests(tx, TotalAttempts, ServerHandle), - ?assertEqual(TotalAttempts, length(TXs)), - ok - after - cleanup_test_counter(independent_retry_counter), - cleanup_dispatcher(ServerHandle) - end. 
- -recover_bundles_test() -> - Anchor = rand:bytes(32), - Price = 12345, - {ServerHandle, NodeOpts} = start_mock_gateway(#{ - price => {200, integer_to_binary(Price)}, - tx_anchor => {200, hb_util:encode(Anchor)} - }), - try - Opts = NodeOpts#{ - priv_wallet => hb:wallet(), - store => hb_test_utils:test_store() - }, - hb_http_server:start_node(Opts), - % Create some test items - Item1 = new_data_item(1, 10, Opts), - Item2 = new_data_item(2, 10, Opts), - Item3 = new_data_item(3, 10, Opts), - % Write items to cache as unbundled - ok = dev_bundler_cache:write_item(Item1, Opts), - ok = dev_bundler_cache:write_item(Item2, Opts), - ok = dev_bundler_cache:write_item(Item3, Opts), - % Create a bundle TX and cache it with posted status - {ok, TX} = dev_codec_tx:to(lists:reverse([Item1, Item2, Item3]), #{}, #{}), - CommittedTX = hb_message:convert(TX, <<"structured@1.0">>, <<"tx@1.0">>, Opts), - ok = dev_bundler_cache:write_tx(CommittedTX, [Item1, Item2, Item3], Opts), - % Create a second bundle that is already complete (should not be recovered) - Item4 = new_data_item(4, 10, Opts), - ok = dev_bundler_cache:write_item(Item4, Opts), - {ok, TX2} = dev_codec_tx:to(lists:reverse([Item4]), #{}, #{}), - CommittedTX2 = hb_message:convert(TX2, <<"structured@1.0">>, <<"tx@1.0">>, Opts), - ok = dev_bundler_cache:write_tx(CommittedTX2, [Item4], Opts), - ok = dev_bundler_cache:complete_tx(CommittedTX2, Opts), - % Now initialize dispatcher which should recover only the posted bundle - ensure_dispatcher(Opts), - State = get_state(), - % Get the recovered bundle (should only be 1, not the completed one) - ?assertEqual(1, maps:size(State#state.bundles)), - [Bundle] = maps:values(State#state.bundles), - ?assertNotEqual(undefined, Bundle#bundle.start_time), - ?assertEqual(#{}, Bundle#bundle.proofs), - RecoveredItems = [ - hb_message:with_commitments( - #{ <<"commitment-device">> => <<"ans104@1.0">> }, Item, Opts) - || Item <- Bundle#bundle.items], - ?assertEqual( - lists:sort([Item1, Item2, 
Item3]), - lists:sort(RecoveredItems)), - ?assertEqual(tx_posted, Bundle#bundle.status), - ?assert(hb_message:verify(Bundle#bundle.tx)), - ?assertEqual( - hb_message:id(CommittedTX, signed, Opts), - hb_message:id(Bundle#bundle.tx, signed, Opts)), - ok - after - cleanup_dispatcher(ServerHandle) - end. - -%%% Test Helper Functions - -new_data_item(Index, Size, Opts) -> - Data = rand:bytes(Size), - Tag = <<"tag", (integer_to_binary(Index))/binary>>, - Value = <<"value", (integer_to_binary(Index))/binary>>, - Item = ar_bundles:sign_item( - #tx{ - data = Data, - tags = [{Tag, Value}] - }, - hb:wallet() - ), - hb_message:convert(Item, <<"structured@1.0">>, <<"ans104@1.0">>, Opts). - -start_mock_gateway(Responses) -> - DefaultResponse = {200, <<>>}, - Endpoints = [ - {"/chunk", chunk, maps:get(chunk, Responses, DefaultResponse)}, - {"/tx", tx, maps:get(tx, Responses, DefaultResponse)}, - {"/price/:size", price, maps:get(price, Responses, DefaultResponse)}, - {"/tx_anchor", tx_anchor, maps:get(tx_anchor, Responses, DefaultResponse)} - ], - {ok, MockServer, ServerHandle} = hb_mock_server:start(Endpoints), - NodeOpts = #{ - gateway => MockServer, - routes => [ - #{ - <<"template">> => <<"/arweave">>, - <<"node">> => #{ - <<"match">> => <<"^/arweave">>, - <<"with">> => MockServer, - <<"opts">> => #{http_client => httpc, protocol => http2} - } - } - ] - }, - {ServerHandle, NodeOpts}. - -cleanup_dispatcher(ServerHandle) -> - stop_dispatcher(), - timer:sleep(10), % Ensure dispatcher fully stops - hb_mock_server:stop(ServerHandle). - -setup_test_counter(Table) -> - cleanup_test_counter(Table), - ets:new(Table, [named_table, public, set]), - ok. - -cleanup_test_counter(Table) -> - case ets:info(Table) of - undefined -> ok; - _ -> ets:delete(Table), ok - end. - -increment_test_counter(Table) -> - ets:update_counter(Table, Table, {2, 1}, {Table, 0}). - -get_test_counter(Table) -> - case ets:lookup(Table, Table) of - [{_, Value}] -> Value; - [] -> 0 - end. 
-
-add_test_attempt_timestamp(Table, Attempt, Timestamp) ->
-    ets:insert(Table, {{Table, Attempt}, Timestamp}).
-
-test_attempt_timestamps(Table) ->
-    TimestampEntries = [
-        {Attempt, Timestamp}
-        || {{Prefix1, Attempt}, Timestamp} <- ets:tab2list(Table),
-           Prefix1 =:= Table
-    ],
-    [Timestamp || {_, Timestamp} <- lists:sort(TimestampEntries)].
diff --git a/src/dev_bundler_recovery.erl b/src/dev_bundler_recovery.erl
new file mode 100644
index 000000000..5368fd70a
--- /dev/null
+++ b/src/dev_bundler_recovery.erl
@@ -0,0 +1,278 @@
+%%% @doc Logic for handling bundler recovery on node restart.
+%%%
+%%% When a bundler is running it will cache the state of each uploaded item
+%%% or bundle as it moves through the bundling and upload process. If the node
+%%% is restarted before it can finish including all uploaded items in a bundle,
+%%% or finish seeding all bundles in process, the recovery process will ensure
+%%% that the data in process is recovered and resumed.
+-module(dev_bundler_recovery).
+-export([
+    recover_unbundled_items/2,
+    recover_bundles/2
+]).
+-include("include/hb.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%% @doc Spawn a process to recover unbundled items.
+recover_unbundled_items(ServerPID, Opts) ->
+    spawn(fun() -> do_recover_unbundled_items(ServerPID, Opts) end).
+
+%% @doc Spawn a process to recover in-progress bundles.
+recover_bundles(ServerPID, Opts) ->
+    spawn(fun() -> do_recover_bundles(ServerPID, Opts) end).
+
+do_recover_unbundled_items(ServerPID, Opts) ->
+    try
+        ?event(bundler_short, {recover_unbundled_items_start}),
+        UnbundledItems = dev_bundler_cache:load_items(
+            <<>>,
+            Opts,
+            fun(ItemID, Item) ->
+                ?event(
+                    bundler_short,
+                    {recovered_unbundled_item,
+                        {id, {string, ItemID}}
+                    }
+                ),
+                ServerPID ! 
{enqueue_item, Item} + end, + fun(ItemID) -> + ?event( + bundler_short, + {failed_to_recover_unbundled_item, + {id, {string, ItemID}} + } + ) + end + ), + ?event(bundler_short, {recover_unbundled_items_complete, + {count, length(UnbundledItems)}}), + ok + catch + _:Error:Stack -> + ?event( + error, + {recover_unbundled_items_failed, + {error, Error}, + {stack, Stack} + }, + Opts + ) + end. + +do_recover_bundles(ServerPID, Opts) -> + try + BundleStates = dev_bundler_cache:load_bundle_states(Opts), + ?event(bundler_short, {recover_bundles_start, + {count, length(BundleStates)}}), + lists:foreach( + fun({TXID, Status}) -> + recover_bundle(ServerPID, TXID, Status, Opts) + end, + BundleStates + ), + ?event(bundler_short, {recover_bundles_complete, + {count, length(BundleStates)}}), + ok + catch + _:Error:Stack -> + ?event( + error, + {recover_bundles_failed, + {error, Error}, + {stack, Stack} + }, + Opts + ) + end. + +recover_bundle(ServerPID, TXID, Status, Opts) -> + ?event( + bundler_short, + {recovering_bundle, + {tx_id, {explicit, TXID}}, + {status, Status} + } + ), + try + CommittedTX = dev_bundler_cache:load_tx(TXID, Opts), + case CommittedTX of + not_found -> + throw(tx_not_found); + _ -> + Items = dev_bundler_cache:load_items( + TXID, + Opts, + fun(ItemID, _Item) -> + ?event( + bundler_debug, + {loaded_bundle_item, + {tx_id, {explicit, TXID}}, + {item_id, {explicit, ItemID}} + } + ) + end, + fun(ItemID) -> + ?event( + error, + {failed_to_load_bundle_item, + {tx_id, {explicit, TXID}}, + {item_id, {explicit, ItemID}} + }, + Opts + ), + throw({failed_to_load_bundle_item, ItemID}) + end + ), + ServerPID ! {recover_bundle, CommittedTX, Items} + end + catch + _:Error:Stack -> + ?event( + error, + {failed_to_recover_bundle, + {tx_id, {explicit, TXID}}, + {error, Error}, + {stack, Stack} + }, + Opts + ) + end. 
+ +%%%=================================================================== +%%% Tests +%%%=================================================================== + +recover_unbundled_items_test() -> + Opts = #{store => hb_test_utils:test_store()}, + Item1 = new_data_item(1, 10, Opts), + Item2 = new_data_item(2, 10, Opts), + Item3 = new_data_item(3, 10, Opts), + ok = dev_bundler_cache:write_item(Item1, Opts), + ok = dev_bundler_cache:write_item(Item2, Opts), + ok = dev_bundler_cache:write_item(Item3, Opts), + FakeTX = new_bundle_tx([Item2], Opts), + ok = dev_bundler_cache:write_tx(FakeTX, [Item2], Opts), + recover_unbundled_items(self(), Opts), + RecoveredItems = receive_enqueue_items(2), + RecoveredItems1 = normalize_items(RecoveredItems, Opts), + ?assertEqual( + lists:sort([Item1, Item3]), + lists:sort(RecoveredItems1) + ). + +recover_bundles_skips_complete_test() -> + Opts = #{store => hb_test_utils:test_store()}, + Item1 = new_data_item(1, 10, Opts), + Item2 = new_data_item(2, 10, Opts), + Item3 = new_data_item(3, 10, Opts), + ok = dev_bundler_cache:write_item(Item1, Opts), + ok = dev_bundler_cache:write_item(Item2, Opts), + ok = dev_bundler_cache:write_item(Item3, Opts), + PostedTX = new_bundle_tx([Item1, Item2], Opts), + CompletedTX = new_bundle_tx([Item3], Opts), + ok = dev_bundler_cache:write_tx(PostedTX, [Item1, Item2], Opts), + ok = dev_bundler_cache:write_tx(CompletedTX, [Item3], Opts), + ok = dev_bundler_cache:complete_tx(CompletedTX, Opts), + recover_bundles(self(), Opts), + {RecoveredTX, RecoveredItems} = receive_recovered_bundle(), + RecoveredItems1 = normalize_items(RecoveredItems, Opts), + ?assertEqual( + hb_message:id(PostedTX, signed, Opts), + hb_message:id(RecoveredTX, signed, Opts) + ), + ?assertEqual( + lists:sort([Item1, Item2]), + lists:sort(RecoveredItems1) + ), + receive + {recover_bundle, _, _} -> + erlang:error(unexpected_second_recovered_bundle) + after 200 -> + ok + end. 
+ +recover_bundles_failed_bundle_items_continue_test() -> + Opts = #{ + store => hb_test_utils:test_store(), + debug_print => false + }, + ValidItem = new_data_item(1, 10, Opts), + ok = dev_bundler_cache:write_item(ValidItem, Opts), + ValidTX = new_bundle_tx([ValidItem], Opts), + ok = dev_bundler_cache:write_tx(ValidTX, [ValidItem], Opts), + BrokenTX = new_bundle_tx([], Opts), + ok = dev_bundler_cache:write_tx(BrokenTX, [], Opts), + MissingItemID = <<"missing-item">>, + ok = write_missing_item_bundle(MissingItemID, BrokenTX, Opts), + recover_bundles(self(), Opts), + {RecoveredTX, RecoveredItems} = receive_recovered_bundle(), + RecoveredItems1 = normalize_items(RecoveredItems, Opts), + ?assertEqual( + hb_message:id(ValidTX, signed, Opts), + hb_message:id(RecoveredTX, signed, Opts) + ), + ?assertEqual([ValidItem], RecoveredItems1), + receive + {recover_bundle, _, _} -> + erlang:error(unexpected_broken_bundle_recovered) + after 200 -> + ok + end. + +receive_enqueue_items(Count) -> + receive_enqueue_items(Count, []). + +receive_enqueue_items(0, Items) -> + lists:reverse(Items); +receive_enqueue_items(Count, Items) -> + receive + {enqueue_item, Item} -> + receive_enqueue_items(Count - 1, [Item | Items]) + after 1000 -> + erlang:error({missing_enqueue_items, Count}) + end. + +receive_recovered_bundle() -> + receive + {recover_bundle, CommittedTX, Items} -> + {CommittedTX, Items} + after 1000 -> + erlang:error(missing_recovered_bundle) + end. + +normalize_items(Items, Opts) -> + [ + hb_message:with_commitments( + #{ <<"commitment-device">> => <<"ans104@1.0">> }, + Item, + Opts + ) + || Item <- Items + ]. + +write_missing_item_bundle(ItemID, TX, Opts) -> + Store = hb_opts:get(store, no_viable_store, Opts), + Path = hb_store:path(Store, [ + <<"~bundler@1.0">>, + <<"item">>, + ItemID, + <<"bundle">> + ]), + hb_store:write(Store, Path, hb_message:id(TX, signed, Opts)). 
+
+new_data_item(Index, Size, Opts) ->
+    Tag = <<"tag", (integer_to_binary(Index))/binary>>,
+    Value = <<"value", (integer_to_binary(Index))/binary>>,
+    Item = ar_bundles:sign_item(
+        #tx{
+            data = rand:bytes(Size),
+            tags = [{Tag, Value}]
+        },
+        hb:wallet()
+    ),
+    hb_message:convert(Item, <<"structured@1.0">>, <<"ans104@1.0">>, Opts).
+
+new_bundle_tx(Items, Opts) ->
+    {ok, TX} = dev_codec_tx:to(lists:reverse(Items), #{}, #{}),
+    hb_message:convert(TX, <<"structured@1.0">>, <<"tx@1.0">>, Opts).
diff --git a/src/dev_bundler_task.erl b/src/dev_bundler_task.erl
new file mode 100644
index 000000000..810b5dfa8
--- /dev/null
+++ b/src/dev_bundler_task.erl
@@ -0,0 +1,212 @@
+%%% @doc Implements the different bundling primitives:
+%%% - post_tx: Building and posting an L1 transaction
+%%% - build_proofs: Chunking up the bundle data and building the chunk proofs
+%%% - post_proof: Seeding the chunks to the Arweave network
+-module(dev_bundler_task).
+-export([worker_loop/0, log_task/3, format_timestamp/0]).
+-include("include/hb.hrl").
+-include("include/dev_bundler.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%% @doc Worker loop - executes tasks and reports back to dispatcher.
+worker_loop() ->
+    receive
+        {execute_task, DispatcherPID, Task} ->
+            case execute_task(Task) of
+                {ok, Value} ->
+                    DispatcherPID ! {task_complete, self(), Task, Value};
+                {error, Reason} ->
+                    DispatcherPID ! {task_failed, self(), Task, Reason}
+            end,
+
+            worker_loop();
+        stop ->
+            exit(normal)
+    end.
+
+%% @doc Execute a specific task.
+execute_task(#task{type = post_tx, data = Items, opts = Opts} = Task) -> + try + ?event(bundler_debug, log_task(executing_task, Task, [])), + % Get price and anchor + {ok, TX} = dev_codec_tx:to(lists:reverse(Items), #{}, #{}), + DataSize = TX#tx.data_size, + PriceResult = get_price(DataSize, Opts), + AnchorResult = get_anchor(Opts), + case {PriceResult, AnchorResult} of + {{ok, Price}, {ok, Anchor}} -> + % Sign the TX + Wallet = hb_opts:get(priv_wallet, no_viable_wallet, Opts), + SignedTX = ar_tx:sign(TX#tx{ anchor = Anchor, reward = Price }, Wallet), + % TODO: as a future improvement we should be able to recover + % from the TX header alone, but we have to be careful about + % how we rebuild the TX data to ensure it matches the already + % posted TX. + Committed = hb_message:convert( + SignedTX, + #{ <<"device">> => <<"structured@1.0">>, <<"bundle">> => true }, + #{ <<"device">> => <<"tx@1.0">>, <<"bundle">> => true }, + Opts), + ?event(bundler_short, log_task(posting_tx, + Task, + [{tx, {explicit, hb_message:id(Committed, signed, Opts)}}] + )), + PostTXResponse = dev_arweave:post_tx_header( + SignedTX, + Opts + ), + ?event(bundler_short, {post_tx_response, PostTXResponse}), + case PostTXResponse of + {ok, _Result} -> + dev_bundler_cache:write_tx( + Committed, + Items, + Opts + ), + {ok, Committed}; + {_, ErrorReason} -> {error, ErrorReason} + end; + {PriceErr, AnchorErr} -> + ?event(bundler_short, + log_task(task_failed, Task, [ + {price, PriceErr}, + {anchor, AnchorErr} + ])), + {error, {PriceErr, AnchorErr}} + end + catch + _:Err:_Stack -> + ?event(bundler_short, log_task(task_failed, Task, [{error, Err}])), + {error, Err} + end; + +execute_task(#task{type = build_proofs, data = CommittedTX, opts = Opts} = Task) -> + try + ?event(bundler_debug, log_task(executing_task, Task, [])), + % Calculate chunks and proofs + TX = hb_message:convert( + CommittedTX, <<"tx@1.0">>, <<"structured@1.0">>, Opts), + Data = TX#tx.data, + DataRoot = TX#tx.data_root, + DataSize = 
TX#tx.data_size, + Mode = ar_tx:chunking_mode(TX#tx.format), + Chunks = ar_tx:chunk_binary(Mode, ?DATA_CHUNK_SIZE, Data), + ?event(bundler_short, {building_proofs, + {bundle, Task#task.bundle_id}, + {data_size, DataSize}, + {num_chunks, length(Chunks)}}), + SizeTaggedChunks = ar_tx:chunks_to_size_tagged_chunks(Chunks), + SizeTaggedChunkIDs = ar_tx:sized_chunks_to_sized_chunk_ids(SizeTaggedChunks), + {_Root, DataTree} = ar_merkle:generate_tree(SizeTaggedChunkIDs), + % Build proof list + Proofs = lists:filtermap( + fun({Chunk, Offset}) -> + case Chunk of + <<>> -> false; + _ -> + DataPath = ar_merkle:generate_path( + DataRoot, Offset - 1, DataTree), + Proof = #{ + chunk => Chunk, + data_path => DataPath, + offset => Offset - 1, + data_size => DataSize, + data_root => DataRoot + }, + {true, Proof} + end + end, + SizeTaggedChunks + ), + % -1 because the `?event(...)' macro increments the counter by 1. + hb_event:increment(bundler_short, built_proofs, length(Proofs) - 1), + ?event( + bundler_short, + {built_proofs, + {bundle, Task#task.bundle_id}, + {num_proofs, length(Proofs)} + }, + Opts + ), + {ok, Proofs} + catch + _:Err:_Stack -> + ?event(bundler_short, log_task(task_failed, Task, [{error, Err}])), + {error, Err} + end; + +execute_task(#task{type = post_proof, data = Proof, opts = Opts} = Task) -> + #{chunk := Chunk, data_path := DataPath, offset := Offset, + data_size := DataSize, data_root := DataRoot} = Proof, + ?event(bundler_debug, log_task(executing_task, Task, [])), + Request = #{ + <<"chunk">> => hb_util:encode(Chunk), + <<"data_path">> => hb_util:encode(DataPath), + <<"offset">> => integer_to_binary(Offset), + <<"data_size">> => integer_to_binary(DataSize), + <<"data_root">> => hb_util:encode(DataRoot) + }, + try + Serialized = hb_json:encode(Request), + Response = dev_arweave:post_json_chunk(Serialized, Opts), + case Response of + {ok, _} -> {ok, proof_posted}; + {error, Reason} -> {error, Reason} + end + catch + _:Err:_Stack -> + ?event(bundler_short, 
log_task(task_failed, Task, [{error, Err}])), + {error, Err} + end. + +get_price(DataSize, Opts) -> + hb_ao:resolve( + #{ <<"device">> => <<"arweave@2.9">> }, + #{ <<"path">> => <<"/price">>, <<"size">> => DataSize }, + Opts + ). + +get_anchor(Opts) -> + hb_ao:resolve( + #{ <<"device">> => <<"arweave@2.9">> }, + #{ <<"path">> => <<"/tx_anchor">> }, + Opts + ). + +%%%=================================================================== +%%% Logging +%%%=================================================================== + +%% @doc Return a complete task event tuple for logging. +log_task(Event, Task, ExtraLogs) -> + erlang:list_to_tuple([Event | format_task(Task) ++ ExtraLogs]). + +%% @doc Format a task for logging. +format_task(#task{bundle_id = BundleID, type = post_tx, data = DataItems}) -> + [ + {task_type, post_tx}, + {timestamp, format_timestamp()}, + {bundle, BundleID}, + {num_items, length(DataItems)} + ]; +format_task(#task{bundle_id = BundleID, type = build_proofs, data = CommittedTX}) -> + [ + {task_type, build_proofs}, + {timestamp, format_timestamp()}, + {bundle, BundleID}, + {tx, {explicit, hb_message:id(CommittedTX, signed, #{})}} + ]; +format_task(#task{bundle_id = BundleID, type = post_proof, data = Proof}) -> + Offset = maps:get(offset, Proof), + [ + {task_type, post_proof}, + {timestamp, format_timestamp()}, + {bundle, BundleID}, + {offset, Offset} + ]. + +%% @doc Format erlang:timestamp() as a user-friendly RFC3339 string with milliseconds. +format_timestamp() -> + {MegaSecs, Secs, MicroSecs} = erlang:timestamp(), + Millisecs = (MegaSecs * 1000000 + Secs) * 1000 + (MicroSecs div 1000), + calendar:system_time_to_rfc3339(Millisecs, [{unit, millisecond}, {offset, "Z"}]). \ No newline at end of file diff --git a/src/include/dev_bundler.hrl b/src/include/dev_bundler.hrl new file mode 100644 index 000000000..c601785e0 --- /dev/null +++ b/src/include/dev_bundler.hrl @@ -0,0 +1,40 @@ +%%% Shared state and task records for the bundler server and workers. 
+ +-record(state, { + max_size, + max_idle_time, + max_items, + queue, + bytes, + workers, + task_queue, + bundles, + opts +}). + +-record(task, { + bundle_id, + type, + data, + opts, + retry_count = 0 +}). + +-record(proof, { + proof, + status +}). + +-record(bundle, { + id, + items, + status, + tx, + proofs, + start_time +}). + +-define(DEFAULT_NUM_WORKERS, 20). +-define(DEFAULT_RETRY_BASE_DELAY_MS, 1000). +-define(DEFAULT_RETRY_MAX_DELAY_MS, 600000). +-define(DEFAULT_RETRY_JITTER, 0.25). diff --git a/test/arbundles.js/upload-items.js b/test/arbundles.js/upload-items.js index f5e629f51..9bc63d475 100644 --- a/test/arbundles.js/upload-items.js +++ b/test/arbundles.js/upload-items.js @@ -4,11 +4,11 @@ const { ArweaveSigner, createData } = require("@dha-team/arbundles"); // Configuration const BUNDLER_URL = "http://localhost:8734"; -const WALLET_PATH = "../../hyperbeam-key.json"; +const DEFAULT_WALLET = "../../hyperbeam-key.json"; const CONCURRENT_UPLOADS = 100; // Number of parallel uploads -async function performanceTest(itemCount, bytesPerItem = 0) { - const wallet = require(WALLET_PATH); +async function performanceTest(walletPath, itemCount, bytesPerItem = 0) { + const wallet = require(path.resolve(walletPath)); const signer = new ArweaveSigner(wallet); const endpoint = `${BUNDLER_URL}/~bundler@1.0/item?codec-device=ans104@1.0`; @@ -68,6 +68,7 @@ async function performanceTest(itemCount, bytesPerItem = 0) { const uploadPromises = batch.map(async (item) => { try { + console.log(`Posting data item: ${item.id}`); const response = await fetch(endpoint, { method: "POST", headers: { @@ -132,24 +133,28 @@ async function performanceTest(itemCount, bytesPerItem = 0) { // Main execution if (require.main === module) { - const itemCount = parseInt(process.argv[2], 10); - const bytesPerItem = parseInt(process.argv[3], 10) || 0; - + // If the first arg looks like a number, treat it as itemCount and use the default wallet + const firstIsNumber = 
!isNaN(parseInt(process.argv[2], 10)); + const walletPath = firstIsNumber ? DEFAULT_WALLET : (process.argv[2] || DEFAULT_WALLET); + const itemCount = parseInt(firstIsNumber ? process.argv[2] : process.argv[3], 10); + const bytesPerItem = parseInt(firstIsNumber ? process.argv[3] : process.argv[4], 10) || 0; + if (!itemCount || itemCount < 1 || isNaN(itemCount)) { - console.error("Usage: node upload-items.js [bytes_per_item]"); + console.error("Usage: node upload-items.js [wallet_path] [bytes_per_item]"); console.error(""); console.error("Arguments:"); + console.error(" wallet_path - Path to Arweave wallet JSON (default: ../../hyperbeam-key.json)"); console.error(" number_of_items - Number of data items to create and upload"); console.error(" bytes_per_item - Minimum size of each item in bytes (optional)"); console.error(""); console.error("Examples:"); console.error(" node upload-items.js 100"); - console.error(" node upload-items.js 100 1024 # 100 items, ~1KB each"); - console.error(" node upload-items.js 50 10485760 # 50 items, ~10MB each"); + console.error(" node upload-items.js 100 1024"); + console.error(" node upload-items.js /path/to/wallet.json 100 1024"); process.exit(1); } - performanceTest(itemCount, bytesPerItem) + performanceTest(walletPath, itemCount, bytesPerItem) .then(() => { process.exit(0); })