Replace infinity timeouts with 10m timeouts in gen_server calls
shizzard committed Feb 27, 2025
1 parent 680ecb4 commit 77d8c35
Showing 11 changed files with 92 additions and 93 deletions.
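The change is mechanical: every gen_server:call/3 (and ar_semaphore:acquire/2) that previously passed infinity as its timeout now passes 600000 ms, i.e. 10 minutes. The semantics differ: with infinity the caller blocks forever if the server never replies, while with a finite timeout the call exits with a {timeout, {gen_server, call, _}} reason once the deadline elapses. A minimal sketch of the pattern, assuming a hypothetical ?CALL_TIMEOUT macro (the commit itself inlines the literal 600000 at each call site):

    -module(timeout_sketch).
    -export([get_state/0]).

    %% Hypothetical macro name; 600000 ms = 10 minutes.
    -define(CALL_TIMEOUT, 600000).

    get_state() ->
        %% Before: gen_server:call(?MODULE, get_state, infinity).
        gen_server:call(?MODULE, get_state, ?CALL_TIMEOUT).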
10 changes: 5 additions & 5 deletions apps/arweave/src/ar_coordination.erl
@@ -78,7 +78,7 @@ send_h1_batch_to_peer() ->
 
 %% @doc Compute h2 for a remote peer
 compute_h2_for_peer(Peer, Candidate) ->
-    gen_server:cast(?MODULE, {compute_h2_for_peer, 
+    gen_server:cast(?MODULE, {compute_h2_for_peer,
         Candidate#mining_candidate{ cm_lead_peer = Peer }}).
 
 computed_h2_for_peer(Candidate) ->
@@ -155,15 +155,15 @@ get_self_plus_external_partitions_list() ->
 %% {pdiff, PackingDifficulty}
 %% ]}
 get_cluster_partitions_list() ->
-    gen_server:call(?MODULE, get_cluster_partitions_list, infinity).
+    gen_server:call(?MODULE, get_cluster_partitions_list, 600000).
 
 %%%===================================================================
 %%% Generic server callbacks.
 %%%===================================================================
 
 init([]) ->
     {ok, Config} = application:get_env(arweave, config),
 
     ar_util:cast_after(?BATCH_POLL_INTERVAL_MS, ?MODULE, check_batches),
     State = #state{
         last_peer_response = #{}
@@ -363,7 +363,7 @@ terminate(_Reason, _State) ->
 %% @doc Return the list of the partitions of the given Peer, to the best
 %% of our knowledge.
 get_peer_partitions(Peer) ->
-    gen_server:call(?MODULE, {get_peer_partitions, Peer}, infinity).
+    gen_server:call(?MODULE, {get_peer_partitions, Peer}, 600000).
 
 check_out_batches(#state{out_batches = OutBatches}) when map_size(OutBatches) == 0 ->
     OutBatches;
@@ -460,7 +460,7 @@ remove_mining_peer(Peer, State) ->
 
 refetch_peer_partitions(Peers) ->
     spawn(fun() ->
-        
+
         ar_util:pmap(
             fun(Peer) ->
                 case ar_http_iface_client:get_cm_partition_table(Peer) of
20 changes: 10 additions & 10 deletions apps/arweave/src/ar_data_sync.erl
@@ -2,7 +2,7 @@
 
 -behaviour(gen_server).
 
--export([name/1, start_link/2, register_workers/0, join/1, add_tip_block/2, add_block/2, 
+-export([name/1, start_link/2, register_workers/0, join/1, add_tip_block/2, add_block/2,
     invalidate_bad_data_record/4, is_chunk_proof_ratio_attractive/3,
     add_chunk/5, add_data_root_to_disk_pool/3, maybe_drop_data_root_from_disk_pool/3,
     get_chunk/2, get_chunk_data/2, get_chunk_proof/2, get_tx_data/1, get_tx_data/2,
@@ -270,7 +270,7 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) ->
 -spec put_chunk_data(
     ChunkDataKey :: binary(),
     StoreID :: term(),
-    Value :: DataPath :: binary() | {Chunk :: binary(), DataPath :: binary()}) -> 
+    Value :: DataPath :: binary() | {Chunk :: binary(), DataPath :: binary()}) ->
         ok | {error, term()}.
 put_chunk_data(ChunkDataKey, StoreID, Value) ->
     ar_kv:put({chunk_data_db, StoreID}, ChunkDataKey, term_to_binary(Value)).
@@ -285,7 +285,7 @@ delete_chunk_data(ChunkDataKey, StoreID) ->
     AbsoluteOffset :: non_neg_integer(),
     StoreID :: term(),
     Metadata :: term()) -> ok | {error, term()}.
-put_chunk_metadata(AbsoluteOffset, StoreID, 
+put_chunk_metadata(AbsoluteOffset, StoreID,
         {_ChunkDataKey, _TXRoot, _DataRoot, _TXPath, _Offset, _ChunkSize} = Metadata) ->
     Key = << AbsoluteOffset:?OFFSET_KEY_BITSIZE >>,
     ar_kv:put({chunks_index, StoreID}, Key, term_to_binary(Metadata)).
@@ -550,7 +550,7 @@ has_data_root(DataRoot, DataSize) ->
 
 %% @doc Record the metadata of the given block.
 add_block(B, SizeTaggedTXs) ->
-    gen_server:call(ar_data_sync_default, {add_block, B, SizeTaggedTXs}, infinity).
+    gen_server:call(ar_data_sync_default, {add_block, B, SizeTaggedTXs}, 600000).
 
 %% @doc Request the removal of the transaction data.
 request_tx_data_removal(TXID, Ref, ReplyTo) ->
@@ -622,7 +622,7 @@ get_chunk_by_byte(Byte, StoreID) ->
 
 %% @doc: handle situation where get_chunks_by_byte returns invalid_iterator, so we can't
 %% use the chunk's end offset to advance the cursor.
-%% 
+%%
 %% get_chunk_by_byte looks for a key with the same prefix or the next prefix.
 %% Therefore, if there is no such key, it does not make sense to look for any
 %% key smaller than the prefix + 2 in the next iteration.
@@ -887,7 +887,7 @@ handle_cast({add_tip_block, BlockTXPairs, BI}, State) ->
         State#sync_data_state{
             weave_size = WeaveSize,
             block_index = BI,
-            disk_pool_threshold = DiskPoolThreshold 
+            disk_pool_threshold = DiskPoolThreshold
         }),
     {noreply, State2};
 
@@ -1109,7 +1109,7 @@ handle_cast({store_chunk, ChunkArgs, Args} = Cast,
     end;
 
 handle_cast({store_fetched_chunk, Peer, Byte, Proof} = Cast, State) ->
-    {store_fetched_chunk, Peer, Byte, Proof} = Cast, 
+    {store_fetched_chunk, Peer, Byte, Proof} = Cast,
     #{ data_path := DataPath, tx_path := TXPath, chunk := Chunk, packing := Packing } = Proof,
     SeekByte = get_chunk_seek_offset(Byte + 1) - 1,
     {BlockStartOffset, BlockEndOffset, TXRoot} = ar_block_index:get_block_bounds(SeekByte),
@@ -1849,7 +1849,7 @@ read_chunk_with_metadata(
                 {storeID, StoreID},
                 {modules_covering_seek_offset, ModuleIDs},
                 {root_sync_records, RootRecords},
-                {stored_packing, 
+                {stored_packing,
                     ar_serialize:encode_packing(StoredPacking, true)}]),
             %% The chunk should have been re-packed
             %% in the meantime - very unlucky timing.
@@ -3191,7 +3191,7 @@ get_required_chunk_packing(_Offset, _ChunkSize, #sync_data_state{ store_id = "de
     unpacked;
 get_required_chunk_packing(Offset, ChunkSize, State) ->
     #sync_data_state{ store_id = StoreID } = State,
-    IsEarlySmallChunk = 
+    IsEarlySmallChunk =
         Offset =< ?STRICT_DATA_SPLIT_THRESHOLD andalso ChunkSize < ?DATA_CHUNK_SIZE,
     case IsEarlySmallChunk of
         true ->
@@ -3284,7 +3284,7 @@ parse_disk_pool_chunk(Bin) ->
     end.
 
 delete_disk_pool_chunk(Iterator, Args, State) ->
-    #sync_data_state{ 
+    #sync_data_state{
         disk_pool_chunks_index = DiskPoolChunksIndex, store_id = StoreID } = State,
     {Offset, _, ChunkSize, _, _, ChunkDataKey, DiskPoolKey, _, _, _} = Args,
     case data_root_index_next_v2(Iterator, 10) of
6 changes: 3 additions & 3 deletions apps/arweave/src/ar_disksup.erl
@@ -51,13 +51,13 @@ get_disk_space_check_frequency() ->
     Config#config.disk_space_check_frequency.
 
 get_disk_data() ->
-    gen_server:call(?MODULE, get_disk_data, infinity).
+    gen_server:call(?MODULE, get_disk_data, 600000).
 
 pause() ->
-    gen_server:call(?MODULE, pause, infinity).
+    gen_server:call(?MODULE, pause, 600000).
 
 resume() ->
-    gen_server:call(?MODULE, resume, infinity).
+    gen_server:call(?MODULE, resume, 600000).
 
 %%%===================================================================
 %%% Generic server callbacks.
8 changes: 4 additions & 4 deletions apps/arweave/src/ar_entropy_storage.erl
@@ -39,7 +39,7 @@ store_entropy(
         Entropies, BucketEndOffset2, RangeStart, RangeEnd, Keys, RewardAddr}).
 
 is_ready(StoreID) ->
-    case catch gen_server:call(name(StoreID), is_ready, infinity) of
+    case catch gen_server:call(name(StoreID), is_ready, 600000) of
         {'EXIT', {Reason, {gen_server, call, _}}} ->
             ?LOG_WARNING([{event, is_ready_error}, {module, ?MODULE},
                 {name, name(StoreID)}, {store_id, StoreID}, {reason, Reason}]),
@@ -378,7 +378,7 @@ reset_entropy_offset(BucketEndOffset) ->
     %% End sanity checks
     SliceIndex = ar_replica_2_9:get_slice_index(BucketEndOffset),
     shift_entropy_offset(BucketEndOffset, -SliceIndex).
-
+
 %% @doc Take the first slice of each entropy and combine into a single binary. This binary
 %% can be used to encipher a single chunk.
 -spec take_and_combine_entropy_slices(Entropies :: [binary()]) ->
@@ -561,7 +561,7 @@ test_replica_2_9() ->
     after
         ok = application:set_env(arweave, config, Config)
     end.
-
+
 
 assert_get(Expected, Offset) ->
     assert_get(Expected, Offset, "default").
@@ -586,4 +586,4 @@ assert_get(Expected, Offset, StoreID) ->
     ?assertEqual(ExpectedResult,
         ar_chunk_storage:get(Offset - ?DATA_CHUNK_SIZE div 2 - 1, StoreID)),
     ?assertEqual(ExpectedResult,
-        ar_chunk_storage:get(Offset - ?DATA_CHUNK_SIZE div 3, StoreID)).
+        ar_chunk_storage:get(Offset - ?DATA_CHUNK_SIZE div 3, StoreID)).
42 changes: 21 additions & 21 deletions apps/arweave/src/ar_http_iface_middleware.erl
@@ -190,7 +190,7 @@ handle(<<"GET">>, [<<"recent">>], Req, _Pid) ->
         true ->
             {200, #{}, ar_serialize:jsonify(ar_info:get_recent()), Req}
     end;
-
+
 handle(<<"GET">>, [<<"is_tx_blacklisted">>, EncodedTXID], Req, _Pid) ->
     case ar_util:safe_decode(EncodedTXID) of
         {error, invalid} ->
@@ -304,7 +304,7 @@ handle(<<"GET">>, [<<"sync_buckets">>], Req, _Pid) ->
         false ->
             not_joined(Req);
         true ->
-            ok = ar_semaphore:acquire(get_sync_record, infinity),
+            ok = ar_semaphore:acquire(get_sync_record, 600000),
             case ar_global_sync_record:get_serialized_sync_buckets() of
                 {ok, Binary} ->
                     {200, #{}, Binary, Req};
@@ -327,7 +327,7 @@ handle(<<"GET">>, [<<"data_sync_record">>], Req, _Pid) ->
             _ ->
                 etf
         end,
-    ok = ar_semaphore:acquire(get_sync_record, infinity),
+    ok = ar_semaphore:acquire(get_sync_record, 600000),
     Options = #{ format => Format, random_subset => true },
     case ar_global_sync_record:get_serialized_sync_record(Options) of
         {ok, Binary} ->
@@ -350,7 +350,7 @@ handle(<<"GET">>, [<<"data_sync_record">>, EncodedStart, EncodedLimit], Req, _Pi
                 true ->
                     {400, #{}, jiffy:encode(#{ error => limit_too_big }), Req};
                 false ->
-                    ok = ar_semaphore:acquire(get_sync_record, infinity),
+                    ok = ar_semaphore:acquire(get_sync_record, 600000),
                     handle_get_data_sync_record(Start, Limit, Req)
             end
     end
@@ -847,7 +847,7 @@ handle(<<"GET">>, [<<"reward_history">>, EncodedBH], Req, _Pid) ->
         false ->
             not_joined(Req);
         true ->
-            ok = ar_semaphore:acquire(get_reward_history, infinity),
+            ok = ar_semaphore:acquire(get_reward_history, 600000),
             case ar_util:safe_decode(EncodedBH) of
                 {ok, BH} ->
                     Fork_2_6 = ar_fork:height_2_6(),
@@ -896,7 +896,7 @@ handle(<<"GET">>, [<<"hash_list">>], Req, _Pid) ->
     handle(<<"GET">>, [<<"block_index">>], Req, _Pid);
 
 handle(<<"GET">>, [<<"block_index">>], Req, _Pid) ->
-    ok = ar_semaphore:acquire(get_block_index, infinity),
+    ok = ar_semaphore:acquire(get_block_index, 600000),
     case ar_node:is_joined() of
         false ->
             not_joined(Req);
@@ -919,7 +919,7 @@ handle(<<"GET">>, [<<"block_index">>], Req, _Pid) ->
 %% Return the current binary-encoded block index held by the node.
 %% GET request to endpoint /block_index2.
 handle(<<"GET">>, [<<"block_index2">>], Req, _Pid) ->
-    ok = ar_semaphore:acquire(get_block_index, infinity),
+    ok = ar_semaphore:acquire(get_block_index, 600000),
     case ar_node:is_joined() of
         false ->
             not_joined(Req);
@@ -945,7 +945,7 @@ handle(<<"GET">>, [<<"block_index2">>, From, To], Req, _Pid) ->
     handle(<<"GET">>, [<<"block_index">>, From, To], Req, _Pid);
 
 handle(<<"GET">>, [<<"block_index">>, From, To], Req, _Pid) ->
-    ok = ar_semaphore:acquire(get_block_index, infinity),
+    ok = ar_semaphore:acquire(get_block_index, 600000),
     case ar_node:is_joined() of
         false ->
             not_joined(Req);
@@ -1018,7 +1018,7 @@ handle(<<"GET">>, [<<"total_supply">>], Req, _Pid) ->
         false ->
             not_joined(Req);
         true ->
-            ok = ar_semaphore:acquire(get_wallet_list, infinity),
+            ok = ar_semaphore:acquire(get_wallet_list, 600000),
             B = ar_node:get_current_block(),
             TotalSupply = get_total_supply(B#block.wallet_list, first, 0,
                     B#block.denomination),
@@ -1185,7 +1185,7 @@ handle(<<"GET">>, [<<"block">>, Type, ID, Field], Req, _Pid)
 %% Return the balance of the given wallet at the given block.
 handle(<<"GET">>, [<<"block">>, <<"height">>, Height, <<"wallet">>, Addr, <<"balance">>], Req,
         _Pid) ->
-    ok = ar_semaphore:acquire(get_wallet_list, infinity),
+    ok = ar_semaphore:acquire(get_wallet_list, 600000),
     handle_get_block_wallet_balance(Height, Addr, Req);
 
 %% Return the current block.
@@ -1599,7 +1599,7 @@ handle_get_tx(Hash, Req, Encoding) ->
         {error, invalid} ->
             {400, #{}, <<"Invalid hash.">>, Req};
         {ok, ID} ->
-            ok = ar_semaphore:acquire(get_tx, infinity),
+            ok = ar_semaphore:acquire(get_tx, 600000),
             case ar_storage:read_tx(ID) of
                 unavailable ->
                     maybe_tx_is_pending_response(ID, Req);
@@ -1657,7 +1657,7 @@ serve_tx_data(Req, #tx{ format = 2, id = ID, data_size = DataSize } = TX) ->
         true ->
             {200, #{}, sendfile(DataFilename), Req};
         false ->
-            ok = ar_semaphore:acquire(get_tx_data, infinity),
+            ok = ar_semaphore:acquire(get_tx_data, 600000),
             case ar_data_sync:get_tx_data(ID) of
                 {ok, Data} ->
                     {200, #{}, ar_util:encode(Data), Req};
@@ -1691,7 +1691,7 @@ serve_format_2_html_data(Req, ContentType, TX) ->
         {ok, Data} ->
             {200, #{ <<"content-type">> => ContentType }, Data, Req};
         {error, enoent} ->
-            ok = ar_semaphore:acquire(get_tx_data, infinity),
+            ok = ar_semaphore:acquire(get_tx_data, 600000),
             case ar_data_sync:get_tx_data(TX#tx.id) of
                 {ok, Data} ->
                     {200, #{ <<"content-type">> => ContentType }, Data, Req};
@@ -2011,14 +2011,14 @@ handle_get_chunk(OffsetBinary, Req, Encoding) ->
                     %% Chunk is recorded but packing is unknown.
                     {none, {reply, {404, #{}, <<>>, Req}}};
                 {{true, RequestedPacking}, _StoreID} ->
-                    ok = ar_semaphore:acquire(get_chunk, infinity),
+                    ok = ar_semaphore:acquire(get_chunk, 600000),
                     {RequestedPacking, ok};
                 {{true, {replica_2_9, _}}, _StoreID} when ?BLOCK_2_9_SYNCING ->
                     %% Don't serve replica 2.9 chunks as they are expensive to
                     %% unpack.
                     {none, {reply, {404, #{}, <<>>, Req}}};
                 {{true, Packing}, _StoreID} when RequestedPacking == any ->
-                    ok = ar_semaphore:acquire(get_chunk, infinity),
+                    ok = ar_semaphore:acquire(get_chunk, 600000),
                     {Packing, ok};
                 {{true, _}, _StoreID} ->
                     {ok, Config} = application:get_env(arweave, config),
@@ -2027,7 +2027,7 @@ handle_get_chunk(OffsetBinary, Req, Encoding) ->
                         {none, {reply, {404, #{}, <<>>, Req}}};
                     true ->
                         ok = ar_semaphore:acquire(get_and_pack_chunk,
-                            infinity),
+                            600000),
                         {RequestedPacking, ok}
                 end
         end,
@@ -2064,9 +2064,9 @@ handle_get_chunk(OffsetBinary, Req, Encoding) ->
             not_joined(Req);
         {error, Error} ->
             ?LOG_ERROR([{event, get_chunk_error}, {offset, Offset},
-                {requested_packing, 
+                {requested_packing,
                     ar_serialize:encode_packing(RequestedPacking, false)},
-                {read_packing, 
+                {read_packing,
                     ar_serialize:encode_packing(ReadPacking, false)},
                 {error, Error}]),
             {500, #{}, <<>>, Req}
@@ -2101,7 +2101,7 @@ handle_get_chunk_proof2(Offset, Req, Encoding) ->
             _ ->
                 true
         end,
-    ok = ar_semaphore:acquire(get_chunk, infinity),
+    ok = ar_semaphore:acquire(get_chunk, 600000),
     CheckRecords =
         case ar_sync_record:is_recorded(Offset, ar_data_sync) of
             false ->
@@ -2722,7 +2722,7 @@ process_request(get_block, [Type, ID, <<"hash_list">>], Req) ->
         unavailable ->
             {404, #{}, <<"Not Found.">>, Req};
         B ->
-            ok = ar_semaphore:acquire(get_block_index, infinity),
+            ok = ar_semaphore:acquire(get_block_index, 600000),
             case ar_node:get_height() >= ar_fork:height_2_6() of
                 true ->
                     {400, #{}, jiffy:encode(#{ error => not_supported_since_fork_2_6 }), Req};
Expand All @@ -2748,7 +2748,7 @@ process_request(get_block, [Type, ID, <<"wallet_list">>], Req) ->
jiffy:encode(#{ error => does_not_serve_blocks_after_2_2_fork }),
Req};
{true, _} ->
ok = ar_semaphore:acquire(get_wallet_list, infinity),
ok = ar_semaphore:acquire(get_wallet_list, 600000),
case ar_storage:read_wallet_list(B#block.wallet_list) of
{ok, Tree} ->
{200, #{}, ar_serialize:jsonify(
Expand Down
6 changes: 3 additions & 3 deletions apps/arweave/src/ar_kv.erl
@@ -80,7 +80,7 @@ create_ets() ->
     ets:new(?MODULE, [set, public, named_table, {keypos, #db.name}]).
 
 
-
+
 %% @doc Open a key-value store located at the given filesystem path relative to
 %% the data directory and identified by the given Name.
 open(DataDirRelativePath, Name) ->
@@ -91,15 +91,15 @@ open(DataDirRelativePath, Name) ->
 %% @doc Open a key-value store with the given options located at the given filesystem path
 %% relative to the data directory and identified by the given Name.
 open(DataDirRelativePath, UserOptions, Name) ->
-    gen_server:call(?MODULE, {open, {DataDirRelativePath, UserOptions, Name}}, infinity).
+    gen_server:call(?MODULE, {open, {DataDirRelativePath, UserOptions, Name}}, 600000).
 
 
 
 %% @doc Open a key-value store with the column families located at the given filesystem path
 %% relative to the data directory and identified by the given Name.
 open(DataDirRelativePath, CfDescriptors, UserOptions, CfNames) ->
     gen_server:call(
-        ?MODULE, {open, {DataDirRelativePath, CfDescriptors, UserOptions, CfNames}}, infinity
+        ?MODULE, {open, {DataDirRelativePath, CfDescriptors, UserOptions, CfNames}}, 600000
     ).
 
 
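With a finite deadline, callers that previously could only block must now be prepared for a timeout exit. The is_ready/1 hunk in ar_entropy_storage.erl above shows the pattern the codebase uses: wrap the call in catch and match the {'EXIT', {Reason, {gen_server, call, _}}} tuple. A sketch of the same idea, with hypothetical module and function names:

    -module(caller_sketch).
    -export([safe_call/2]).

    safe_call(Server, Request) ->
        %% A 600000 ms deadline means the call can now exit where an
        %% infinity call would simply block; catch turns that exit into
        %% a value we can match on.
        case catch gen_server:call(Server, Request, 600000) of
            {'EXIT', {timeout, {gen_server, call, _}}} ->
                {error, timeout};
            {'EXIT', {Reason, {gen_server, call, _}}} ->
                {error, Reason};
            Reply ->
                {ok, Reply}
        end.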
