diff --git a/apps/arweave/include/ar.hrl b/apps/arweave/include/ar.hrl index d9b6db566..1146f69c9 100644 --- a/apps/arweave/include/ar.hrl +++ b/apps/arweave/include/ar.hrl @@ -7,8 +7,6 @@ %% (e.g. bin/test or bin/shell) -define(IS_TEST, erlang:get_cookie() == test). --define(DATA_SIZE(Term), erlang:byte_size(term_to_binary(Term))). - %% The mainnet name. Does not change at the hard forks. -ifndef(NETWORK_NAME). -ifdef(DEBUG). diff --git a/apps/arweave/include/ar_config.hrl b/apps/arweave/include/ar_config.hrl index eae1f3d00..1f57f5b05 100644 --- a/apps/arweave/include/ar_config.hrl +++ b/apps/arweave/include/ar_config.hrl @@ -191,6 +191,7 @@ = ?DEFAULT_MAX_NONCE_LIMITER_LAST_STEP_VALIDATION_THREAD_COUNT, nonce_limiter_server_trusted_peers = [], nonce_limiter_client_peers = [], + p3_server_peers = [], debug = false, repair_rocksdb = [], run_defragmentation = false, diff --git a/apps/arweave/include/ar_peers.hrl b/apps/arweave/include/ar_peers.hrl new file mode 100644 index 000000000..0d2c2200f --- /dev/null +++ b/apps/arweave/include/ar_peers.hrl @@ -0,0 +1,21 @@ +-ifndef(AR_PEERS_HRL). +-define(AR_PEERS_HRL, true). + +-include_lib("ar.hrl"). + +%% factor to scale the average throughput by when rating gossiped data - lower is better +-define(GOSSIP_ADVANTAGE, 0.5). + +-record(performance, { + version = 3, + release = -1, + average_bytes = 0.0, + total_bytes = 0, + average_latency = 0.0, + total_latency = 0.0, + transfers = 0, + average_success = 1.0, + rating = 0 +}). + +-endif. \ No newline at end of file diff --git a/apps/arweave/src/ar.erl b/apps/arweave/src/ar.erl index a0e26b932..8d7d5c056 100644 --- a/apps/arweave/src/ar.erl +++ b/apps/arweave/src/ar.erl @@ -541,6 +541,9 @@ parse_cli_args(["vdf_server_trusted_peer", Peer | Rest], C) -> parse_cli_args(["vdf_client_peer", RawPeer | Rest], C = #config{ nonce_limiter_client_peers = Peers }) -> parse_cli_args(Rest, C#config{ nonce_limiter_client_peers = [RawPeer | Peers] }); +parse_cli_args(["p3_server_peer", RawPeer | Rest], + C = #config{ p3_server_peers = Peers }) -> + parse_cli_args(Rest, C#config{ p3_server_peers = [RawPeer | Peers] }); parse_cli_args(["debug" | Rest], C) -> parse_cli_args(Rest, C#config{ debug = true }); parse_cli_args(["repair_rocksdb", Path | Rest], #config{ repair_rocksdb = L } = C) -> diff --git a/apps/arweave/src/ar_blacklist_middleware.erl b/apps/arweave/src/ar_blacklist_middleware.erl index 86a93220f..84d86273f 100644 --- a/apps/arweave/src/ar_blacklist_middleware.erl +++ b/apps/arweave/src/ar_blacklist_middleware.erl @@ -45,7 +45,6 @@ start() -> ban_peer(Peer, TTLSeconds) -> Key = {ban, peer_to_ip_addr(Peer)}, Expires = os:system_time(seconds) + TTLSeconds, - ar_events:send(peer, {banned, Peer}), ets:insert(?MODULE, {Key, Expires}). is_peer_banned(Peer) -> diff --git a/apps/arweave/src/ar_block_pre_validator.erl b/apps/arweave/src/ar_block_pre_validator.erl index ce534398f..a8d3b806a 100644 --- a/apps/arweave/src/ar_block_pre_validator.erl +++ b/apps/arweave/src/ar_block_pre_validator.erl @@ -2,7 +2,7 @@ -behaviour(gen_server). --export([start_link/2, pre_validate/5]). +-export([start_link/2, pre_validate/4]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -39,14 +39,21 @@ start_link(Name, Workers) -> %% is called. Afterwards, the block is put in a limited-size priority queue. %% Bigger-height blocks from better-rated peers have higher priority. Additionally, %% the processing is throttled by IP and solution hash. 
-pre_validate(B, Peer, Timestamp, ReadBodyTime, BodySize) -> +%% Returns: ok, invalid, skipped +pre_validate(B, Peer, QueryBlockTime, ReceiveTimestamp) -> #block{ indep_hash = H } = B, - case ar_ignore_registry:member(H) of + ValidationStatus = case ar_ignore_registry:member(H) of true -> - ok; + skipped; false -> - pre_validate_is_peer_banned(B, Peer, Timestamp, ReadBodyTime, BodySize) - end. + pre_validate_is_peer_banned(B, Peer, QueryBlockTime) + end, + case ValidationStatus of + ok -> record_block_pre_validation_time(ReceiveTimestamp); + _ -> ok + end, + ValidationStatus. + %%%=================================================================== %%% gen_server callbacks. @@ -71,9 +78,10 @@ handle_cast(pre_validate, #state{ pqueue = Q, size = Size, ip_timestamps = IPTim ar_util:cast_after(50, ?MODULE, pre_validate), {noreply, State}; false -> - {{_, {B, PrevB, SolutionResigned, Peer, Timestamp, ReadBodyTime, BodySize}}, + {{_, {B, PrevB, SolutionResigned, Peer, QueryBlockTime}}, Q2} = gb_sets:take_largest(Q), - Size2 = Size - BodySize, + BlockSize = byte_size(term_to_binary(B)), + Size2 = Size - BlockSize, case ar_ignore_registry:permanent_member(B#block.indep_hash) of true -> gen_server:cast(?MODULE, pre_validate), @@ -102,8 +110,7 @@ handle_cast(pre_validate, #state{ pqueue = Q, size = Size, ip_timestamps = IPTim {previous_block, ar_util:encode(PrevB#block.indep_hash)}]), pre_validate_nonce_limiter_seed_data(B, PrevB, - SolutionResigned, Peer, Timestamp, - ReadBodyTime, BodySize), + SolutionResigned, Peer, QueryBlockTime), {IPTimestamps2, HashTimestamps2}; false -> {IPTimestamps2, HashTimestamps} @@ -115,13 +122,13 @@ handle_cast(pre_validate, #state{ pqueue = Q, size = Size, ip_timestamps = IPTim end end; -handle_cast({enqueue, {B, PrevB, SolutionResigned, Peer, Timestamp, ReadBodyTime, BodySize}}, +handle_cast({enqueue, {B, PrevB, SolutionResigned, Peer, QueryBlockTime}}, State) -> #state{ pqueue = Q, size = Size } = State, Priority = priority(B, Peer), - Size2 = Size + BodySize, - Q2 = gb_sets:add_element({Priority, {B, PrevB, SolutionResigned, Peer, Timestamp, - ReadBodyTime, BodySize}}, Q), + BlockSize = byte_size(term_to_binary(B)), + Size2 = Size + BlockSize, + Q2 = gb_sets:add_element({Priority, {B, PrevB, SolutionResigned, Peer, QueryBlockTime}}, Q), {Q3, Size3} = case Size2 > ?MAX_PRE_VALIDATION_QUEUE_SIZE of true -> @@ -177,15 +184,15 @@ terminate(_Reason, _State) -> %%% Private functions. %%%=================================================================== -pre_validate_is_peer_banned(B, Peer, Timestamp, ReadBodyTime, BodySize) -> +pre_validate_is_peer_banned(B, Peer, QueryBlockTime) -> case ar_blacklist_middleware:is_peer_banned(Peer) of not_banned -> - pre_validate_previous_block(B, Peer, Timestamp, ReadBodyTime, BodySize); + pre_validate_previous_block(B, Peer, QueryBlockTime); banned -> - ok + skipped end. -pre_validate_previous_block(B, Peer, Timestamp, ReadBodyTime, BodySize) -> +pre_validate_previous_block(B, Peer, QueryBlockTime) -> PrevH = B#block.previous_block, case ar_node:get_block_shadow_from_cache(PrevH) of not_found -> @@ -193,73 +200,67 @@ pre_validate_previous_block(B, Peer, Timestamp, ReadBodyTime, BodySize) -> %% successive blocks are distributed at the same time. Do not %% ban the peer as the block might be valid. If the network adopts %% this block, ar_poller will catch up. 
- ok; + skipped; #block{ height = PrevHeight } = PrevB -> case B#block.height == PrevHeight + 1 of false -> - ok; + invalid; true -> case B#block.height >= ar_fork:height_2_6() of true -> PrevCDiff = B#block.previous_cumulative_diff, case PrevB#block.cumulative_diff == PrevCDiff of true -> - pre_validate_indep_hash(B, PrevB, Peer, Timestamp, - ReadBodyTime, BodySize); + pre_validate_indep_hash(B, PrevB, Peer, QueryBlockTime); false -> - ok + invalid end; false -> - pre_validate_may_be_fetch_chunk(B, PrevB, Peer, Timestamp, - ReadBodyTime, BodySize) + pre_validate_may_be_fetch_chunk(B, PrevB, Peer, QueryBlockTime) end end end. -pre_validate_indep_hash(#block{ indep_hash = H } = B, PrevB, Peer, Timestamp, ReadBodyTime, - BodySize) -> +pre_validate_indep_hash(#block{ indep_hash = H } = B, PrevB, Peer, QueryBlockTime) -> case catch compute_hash(B, PrevB#block.cumulative_diff) of {ok, {BDS, H}} -> ar_ignore_registry:add_temporary(H, 5000), - pre_validate_timestamp(B, BDS, PrevB, Peer, Timestamp, ReadBodyTime, BodySize); + pre_validate_timestamp(B, BDS, PrevB, Peer, QueryBlockTime); {ok, H} -> case ar_ignore_registry:permanent_member(H) of true -> - ok; + skipped; false -> ar_ignore_registry:add_temporary(H, 5000), - pre_validate_timestamp(B, none, PrevB, Peer, Timestamp, ReadBodyTime, - BodySize) + pre_validate_timestamp(B, none, PrevB, Peer, QueryBlockTime) end; {error, invalid_signature} -> post_block_reject_warn(B, check_signature, Peer), ar_events:send(block, {rejected, invalid_signature, B#block.indep_hash, Peer}), - ok; + invalid; {ok, _DifferentH} -> post_block_reject_warn(B, check_indep_hash, Peer), ar_events:send(block, {rejected, invalid_hash, B#block.indep_hash, Peer}), - ok + invalid end. -pre_validate_timestamp(B, BDS, PrevB, Peer, Timestamp, ReadBodyTime, BodySize) -> +pre_validate_timestamp(B, BDS, PrevB, Peer, QueryBlockTime) -> #block{ indep_hash = H } = B, case ar_block:verify_timestamp(B, PrevB) of true -> - pre_validate_existing_solution_hash(B, BDS, PrevB, Peer, Timestamp, ReadBodyTime, - BodySize); + pre_validate_existing_solution_hash(B, BDS, PrevB, Peer, QueryBlockTime); false -> post_block_reject_warn(B, check_timestamp, Peer, [{block_time, B#block.timestamp}, {current_time, os:system_time(seconds)}]), ar_events:send(block, {rejected, invalid_timestamp, H, Peer}), ar_ignore_registry:remove_temporary(B#block.indep_hash), - ok + invalid end. -pre_validate_existing_solution_hash(B, BDS, PrevB, Peer, Timestamp, ReadBodyTime, BodySize) -> +pre_validate_existing_solution_hash(B, BDS, PrevB, Peer, QueryBlockTime) -> case B#block.height >= ar_fork:height_2_6() of false -> - pre_validate_last_retarget(B, BDS, PrevB, false, Peer, Timestamp, ReadBodyTime, - BodySize); + pre_validate_last_retarget(B, BDS, PrevB, false, Peer, QueryBlockTime); true -> SolutionH = B#block.hash, #block{ hash = SolutionH, nonce = Nonce, reward_addr = RewardAddr, @@ -316,15 +317,15 @@ pre_validate_existing_solution_hash(B, BDS, PrevB, Peer, Timestamp, ReadBodyTime case ValidatedCachedSolutionDiff of not_found -> pre_validate_nonce_limiter_global_step_number(B, BDS, PrevB, false, Peer, - Timestamp, ReadBodyTime, BodySize); + QueryBlockTime); invalid -> post_block_reject_warn(B, check_resigned_solution_hash, Peer), ar_events:send(block, {rejected, invalid_resigned_solution_hash, B#block.indep_hash, Peer}), - ok; + invalid; {valid, B3} -> pre_validate_nonce_limiter_global_step_number(B3, BDS, PrevB, true, Peer, - Timestamp, ReadBodyTime, BodySize) + QueryBlockTime) end end. 
@@ -358,8 +359,8 @@ get_last_step_prev_output(B) -> PrevOutput end. -pre_validate_nonce_limiter_global_step_number(B, BDS, PrevB, SolutionResigned, Peer, Timestamp, - ReadBodyTime, BodySize) -> +pre_validate_nonce_limiter_global_step_number(B, BDS, PrevB, SolutionResigned, Peer, + QueryBlockTime) -> BlockInfo = B#block.nonce_limiter_info, StepNumber = BlockInfo#nonce_limiter_info.global_step_number, PrevBlockInfo = PrevB#block.nonce_limiter_info, @@ -389,49 +390,41 @@ pre_validate_nonce_limiter_global_step_number(B, BDS, PrevB, SolutionResigned, P ar_events:send(block, {rejected, invalid_nonce_limiter_global_step_number, H, Peer}), ar_ignore_registry:remove_temporary(B#block.indep_hash), - ok; + invalid; true -> prometheus_gauge:set(block_vdf_advance, StepNumber - CurrentStepNumber), pre_validate_previous_solution_hash(B, BDS, PrevB, SolutionResigned, Peer, - Timestamp, ReadBodyTime, BodySize) + QueryBlockTime) end. -pre_validate_previous_solution_hash(B, BDS, PrevB, SolutionResigned, Peer, Timestamp, - ReadBodyTime, BodySize) -> +pre_validate_previous_solution_hash(B, BDS, PrevB, SolutionResigned, Peer, QueryBlockTime) -> case B#block.previous_solution_hash == PrevB#block.hash of false -> post_block_reject_warn(B, check_previous_solution_hash, Peer), - ar_blacklist_middleware:ban_peer(Peer, ?BAD_BLOCK_BAN_TIME), ar_events:send(block, {rejected, invalid_previous_solution_hash, B#block.indep_hash, Peer}), - ok; + invalid; true -> - pre_validate_last_retarget(B, BDS, PrevB, SolutionResigned, Peer, Timestamp, - ReadBodyTime, BodySize) + pre_validate_last_retarget(B, BDS, PrevB, SolutionResigned, Peer, QueryBlockTime) end. -pre_validate_last_retarget(B, BDS, PrevB, SolutionResigned, Peer, Timestamp, ReadBodyTime, - BodySize) -> +pre_validate_last_retarget(B, BDS, PrevB, SolutionResigned, Peer, QueryBlockTime) -> case B#block.height >= ar_fork:height_2_6() of false -> - pre_validate_difficulty(B, BDS, PrevB, SolutionResigned, Peer, Timestamp, - ReadBodyTime, BodySize); + pre_validate_difficulty(B, BDS, PrevB, SolutionResigned, Peer, QueryBlockTime); true -> case ar_block:verify_last_retarget(B, PrevB) of true -> - pre_validate_difficulty(B, BDS, PrevB, SolutionResigned, Peer, Timestamp, - ReadBodyTime, BodySize); + pre_validate_difficulty(B, BDS, PrevB, SolutionResigned, Peer, QueryBlockTime); false -> post_block_reject_warn(B, check_last_retarget, Peer), - ar_blacklist_middleware:ban_peer(Peer, ?BAD_BLOCK_BAN_TIME), ar_events:send(block, {rejected, invalid_last_retarget, B#block.indep_hash, Peer}), - ok + invalid end end. -pre_validate_difficulty(B, BDS, PrevB, SolutionResigned, Peer, Timestamp, ReadBodyTime, - BodySize) -> +pre_validate_difficulty(B, BDS, PrevB, SolutionResigned, Peer, QueryBlockTime) -> DiffValid = case B#block.height >= ar_fork:height_2_6() of true -> @@ -442,40 +435,37 @@ pre_validate_difficulty(B, BDS, PrevB, SolutionResigned, Peer, Timestamp, ReadBo case DiffValid of true -> pre_validate_cumulative_difficulty(B, BDS, PrevB, SolutionResigned, Peer, - Timestamp, ReadBodyTime, BodySize); + QueryBlockTime); _ -> post_block_reject_warn(B, check_difficulty, Peer), - ar_blacklist_middleware:ban_peer(Peer, ?BAD_BLOCK_BAN_TIME), ar_events:send(block, {rejected, invalid_difficulty, B#block.indep_hash, Peer}), - ok + invalid end. 
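Every step in this chain now resolves to one of the three status atoms documented on pre_validate/4 (ok, invalid, skipped). A minimal sketch of a caller branching on the result; the function and the replies are illustrative assumptions, not the actual handler (ar_http_iface_middleware, further down, always replies 200 OK after handing the block over):

	%% Illustrative only: the status atoms come from pre_validate/4,
	%% the HTTP replies are assumptions for this sketch.
	reply_for_status(ok) -> {200, <<"OK">>};
	reply_for_status(skipped) -> {200, <<"Block already processed.">>};
	reply_for_status(invalid) -> {400, <<"Invalid block.">>}.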
-pre_validate_cumulative_difficulty(B, BDS, PrevB, SolutionResigned, Peer, Timestamp, - ReadBodyTime, BodySize) -> +pre_validate_cumulative_difficulty(B, BDS, PrevB, SolutionResigned, Peer, QueryBlockTime) -> case B#block.height >= ar_fork:height_2_6() of true -> case ar_block:verify_cumulative_diff(B, PrevB) of false -> post_block_reject_warn(B, check_cumulative_difficulty, Peer), - ar_blacklist_middleware:ban_peer(Peer, ?BAD_BLOCK_BAN_TIME), ar_events:send(block, {rejected, invalid_cumulative_difficulty, B#block.indep_hash, Peer}), - ok; + invalid; true -> case SolutionResigned of true -> gen_server:cast(?MODULE, {enqueue, {B, PrevB, true, Peer, - Timestamp, ReadBodyTime, BodySize}}); + QueryBlockTime}}), + ok; false -> - pre_validate_quick_pow(B, PrevB, false, Peer, Timestamp, - ReadBodyTime, BodySize) + pre_validate_quick_pow(B, PrevB, false, Peer, QueryBlockTime) end end; false -> - pre_validate_pow(B, BDS, PrevB, Peer, Timestamp, ReadBodyTime, BodySize) + pre_validate_pow(B, BDS, PrevB, Peer, QueryBlockTime) end. -pre_validate_quick_pow(B, PrevB, SolutionResigned, Peer, Timestamp, ReadBodyTime, BodySize) -> +pre_validate_quick_pow(B, PrevB, SolutionResigned, Peer, QueryBlockTime) -> #block{ hash_preimage = HashPreimage, diff = Diff, nonce_limiter_info = NonceLimiterInfo, partition_number = PartitionNumber, reward_addr = RewardAddr } = B, PrevNonceLimiterInfo = get_prev_nonce_limiter_info(PrevB), @@ -494,18 +484,18 @@ pre_validate_quick_pow(B, PrevB, SolutionResigned, Peer, Timestamp, ReadBodyTime not_found -> %% The new blocks should have been applied in the meantime since we %% looked for the previous block in the block cache. - ok; + skipped; _ -> case binary:decode_unsigned(SolutionHash, big) > Diff of false -> post_block_reject_warn(B, check_hash_preimage, Peer), - ar_blacklist_middleware:ban_peer(Peer, ?BAD_BLOCK_BAN_TIME), ar_events:send(block, {rejected, invalid_hash_preimage, B#block.indep_hash, Peer}), - ok; + invalid; true -> gen_server:cast(?MODULE, {enqueue, {B, PrevB, SolutionResigned, Peer, - Timestamp, ReadBodyTime, BodySize}}) + QueryBlockTime}}), + ok end end. @@ -527,8 +517,7 @@ get_prev_nonce_limiter_info(#block{ indep_hash = PrevH, height = PrevHeight } = PrevB#block.nonce_limiter_info end. -pre_validate_nonce_limiter_seed_data(B, PrevB, SolutionResigned, Peer, Timestamp, ReadBodyTime, - BodySize) -> +pre_validate_nonce_limiter_seed_data(B, PrevB, SolutionResigned, Peer, QueryBlockTime) -> Info = B#block.nonce_limiter_info, #nonce_limiter_info{ global_step_number = StepNumber, seed = Seed, next_seed = NextSeed, partition_upper_bound = PartitionUpperBound, @@ -538,7 +527,7 @@ pre_validate_nonce_limiter_seed_data(B, PrevB, SolutionResigned, Peer, Timestamp not_found -> %% The new blocks should have been applied in the meantime since we %% looked for the previous block in the block cache. 
- ok; + skipped; PrevNonceLimiterInfo -> ExpectedSeedData = ar_nonce_limiter:get_seed_data(StepNumber, PrevNonceLimiterInfo, PrevB#block.indep_hash, PrevB#block.weave_size), @@ -546,53 +535,49 @@ pre_validate_nonce_limiter_seed_data(B, PrevB, SolutionResigned, Peer, Timestamp NextPartitionUpperBound} of true -> pre_validate_partition_number(B, PrevB, PartitionUpperBound, - SolutionResigned, Peer, Timestamp, ReadBodyTime, BodySize); + SolutionResigned, Peer, QueryBlockTime); false -> post_block_reject_warn(B, check_nonce_limiter_seed_data, Peer), - ar_blacklist_middleware:ban_peer(Peer, ?BAD_BLOCK_BAN_TIME), ar_events:send(block, {rejected, invalid_nonce_limiter_seed_data, B#block.indep_hash, Peer}), - ok + invalid end end. -pre_validate_partition_number(B, PrevB, PartitionUpperBound, SolutionResigned, Peer, Timestamp, - ReadBodyTime, BodySize) -> +pre_validate_partition_number(B, PrevB, PartitionUpperBound, SolutionResigned, Peer, + QueryBlockTime) -> Max = max(0, PartitionUpperBound div ?PARTITION_SIZE - 1), case B#block.partition_number > Max of true -> post_block_reject_warn(B, check_partition_number, Peer), - ar_blacklist_middleware:ban_peer(Peer, ?BAD_BLOCK_BAN_TIME), ar_events:send(block, {rejected, invalid_partition_number, B#block.indep_hash, Peer}), - ok; + invalid; false -> pre_validate_nonce(B, PrevB, PartitionUpperBound, SolutionResigned, Peer, - Timestamp, ReadBodyTime, BodySize) + QueryBlockTime) end. -pre_validate_nonce(B, PrevB, PartitionUpperBound, SolutionResigned, Peer, Timestamp, - ReadBodyTime, BodySize) -> +pre_validate_nonce(B, PrevB, PartitionUpperBound, SolutionResigned, Peer, QueryBlockTime) -> Max = max(0, (?RECALL_RANGE_SIZE) div ?DATA_CHUNK_SIZE - 1), case B#block.nonce > Max of true -> post_block_reject_warn(B, check_nonce, Peer), - ar_blacklist_middleware:ban_peer(Peer, ?BAD_BLOCK_BAN_TIME), ar_events:send(block, {rejected, invalid_nonce, B#block.indep_hash, Peer}), - ok; + invalid; false -> case SolutionResigned of true -> - accept_block(B, Peer, ReadBodyTime, BodySize, Timestamp, false); + accept_block(B, Peer, QueryBlockTime, false); false -> pre_validate_may_be_fetch_first_chunk(B, PrevB, PartitionUpperBound, Peer, - Timestamp, ReadBodyTime, BodySize) + QueryBlockTime) end end. pre_validate_may_be_fetch_first_chunk(#block{ recall_byte = RecallByte, - poa = #poa{ chunk = <<>> } } = B, PrevB, PartitionUpperBound, Peer, Timestamp, - ReadBodyTime, BodySize) when RecallByte /= undefined -> + poa = #poa{ chunk = <<>> } } = B, PrevB, PartitionUpperBound, Peer, QueryBlockTime) + when RecallByte /= undefined -> case ar_data_sync:get_chunk(RecallByte + 1, #{ pack => true, packing => {spora_2_6, B#block.reward_addr}, bucket_based_offset => true }) of {ok, #{ chunk := Chunk, data_path := DataPath, tx_path := TXPath }} -> @@ -600,39 +585,34 @@ pre_validate_may_be_fetch_first_chunk(#block{ recall_byte = RecallByte, B2 = B#block{ poa = #poa{ chunk = Chunk, data_path = DataPath, tx_path = TXPath } }, pre_validate_may_be_fetch_second_chunk(B2, PrevB, PartitionUpperBound, - Peer, Timestamp, ReadBodyTime, BodySize); + Peer, QueryBlockTime); _ -> ar_events:send(block, {rejected, failed_to_fetch_first_chunk, B#block.indep_hash, Peer}), - ok + invalid end; -pre_validate_may_be_fetch_first_chunk(B, PrevB, PartitionUpperBound, Peer, Timestamp, - ReadBodyTime, BodySize) -> - pre_validate_may_be_fetch_second_chunk(B, PrevB, PartitionUpperBound, Peer, Timestamp, - ReadBodyTime, BodySize). 
+pre_validate_may_be_fetch_first_chunk(B, PrevB, PartitionUpperBound, Peer, QueryBlockTime) -> + pre_validate_may_be_fetch_second_chunk(B, PrevB, PartitionUpperBound, Peer, QueryBlockTime). pre_validate_may_be_fetch_second_chunk(#block{ recall_byte2 = RecallByte2, - poa2 = #poa{ chunk = <<>> } } = B, PrevB, PartitionUpperBound, Peer, Timestamp, - ReadBodyTime, BodySize) when RecallByte2 /= undefined -> + poa2 = #poa{ chunk = <<>> } } = B, PrevB, PartitionUpperBound, Peer, QueryBlockTime) + when RecallByte2 /= undefined -> case ar_data_sync:get_chunk(RecallByte2 + 1, #{ pack => true, packing => {spora_2_6, B#block.reward_addr}, bucket_based_offset => true }) of {ok, #{ chunk := Chunk, data_path := DataPath, tx_path := TXPath }} -> prometheus_counter:inc(block2_fetched_chunks), B2 = B#block{ poa2 = #poa{ chunk = Chunk, data_path = DataPath, tx_path = TXPath } }, - pre_validate_pow_2_6(B2, PrevB, PartitionUpperBound, Peer, Timestamp, - ReadBodyTime, BodySize); + pre_validate_pow_2_6(B2, PrevB, PartitionUpperBound, Peer, QueryBlockTime); _ -> ar_events:send(block, {rejected, failed_to_fetch_second_chunk, B#block.indep_hash, Peer}), - ok + invalid end; -pre_validate_may_be_fetch_second_chunk(B, PrevB, PartitionUpperBound, Peer, Timestamp, - ReadBodyTime, BodySize) -> - pre_validate_pow_2_6(B, PrevB, PartitionUpperBound, Peer, Timestamp, ReadBodyTime, - BodySize). +pre_validate_may_be_fetch_second_chunk(B, PrevB, PartitionUpperBound, Peer, QueryBlockTime) -> + pre_validate_pow_2_6(B, PrevB, PartitionUpperBound, Peer, QueryBlockTime). -pre_validate_pow_2_6(B, PrevB, PartitionUpperBound, Peer, Timestamp, ReadBodyTime, BodySize) -> +pre_validate_pow_2_6(B, PrevB, PartitionUpperBound, Peer, QueryBlockTime) -> NonceLimiterInfo = B#block.nonce_limiter_info, NonceLimiterOutput = NonceLimiterInfo#nonce_limiter_info.output, PrevNonceLimiterInfo = get_prev_nonce_limiter_info(PrevB), @@ -645,26 +625,22 @@ pre_validate_pow_2_6(B, PrevB, PartitionUpperBound, Peer, Timestamp, ReadBodyTim andalso Preimage1 == B#block.hash_preimage andalso B#block.recall_byte2 == undefined of true -> - pre_validate_poa(B, PrevB, PartitionUpperBound, H0, H1, Peer, Timestamp, - ReadBodyTime, BodySize); + pre_validate_poa(B, PrevB, PartitionUpperBound, H0, H1, Peer, QueryBlockTime); false -> Chunk2 = (B#block.poa2)#poa.chunk, {H2, Preimage2} = ar_block:compute_h2(H1, Chunk2, H0), case H2 == B#block.hash andalso binary:decode_unsigned(H2, big) > B#block.diff andalso Preimage2 == B#block.hash_preimage of true -> - pre_validate_poa(B, PrevB, PartitionUpperBound, H0, H1, Peer, Timestamp, - ReadBodyTime, BodySize); + pre_validate_poa(B, PrevB, PartitionUpperBound, H0, H1, Peer, QueryBlockTime); false -> post_block_reject_warn(B, check_pow, Peer), - ar_blacklist_middleware:ban_peer(Peer, ?BAD_BLOCK_BAN_TIME), ar_events:send(block, {rejected, invalid_pow, B#block.indep_hash, Peer}), - ok + invalid end end. 
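A block passes pre_validate_pow_2_6 above if either the one-chunk hash H1 or the two-chunk hash H2 matches B#block.hash. A condensed sketch of that decision using the same ar_block:compute_h2/3 call; the preimage, difficulty, and recall-byte checks are omitted, so this is illustrative rather than a drop-in replacement:

	%% Sketch: classify a 2.6 solution as one-chunk or two-chunk.
	solution_kind(B, H0, H1) ->
		case H1 == B#block.hash of
			true ->
				one_chunk;
			false ->
				{H2, _Preimage2} = ar_block:compute_h2(H1, (B#block.poa2)#poa.chunk, H0),
				case H2 == B#block.hash of
					true -> two_chunk;
					false -> invalid
				end
		end.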
-pre_validate_poa(B, PrevB, PartitionUpperBound, H0, H1, Peer, Timestamp, ReadBodyTime, - BodySize) -> +pre_validate_poa(B, PrevB, PartitionUpperBound, H0, H1, Peer, QueryBlockTime) -> {RecallRange1Start, RecallRange2Start} = ar_block:get_recall_range(H0, B#block.partition_number, PartitionUpperBound), RecallByte1 = RecallRange1Start + B#block.nonce * ?DATA_CHUNK_SIZE, @@ -675,17 +651,16 @@ pre_validate_poa(B, PrevB, PartitionUpperBound, H0, H1, Peer, Timestamp, ReadBod andalso RecallByte1 == B#block.recall_byte of error -> ?LOG_ERROR([{event, failed_to_validate_proof_of_access}, - {block, ar_util:encode(B#block.indep_hash)}]); + {block, ar_util:encode(B#block.indep_hash)}]), + invalid; false -> post_block_reject_warn(B, check_poa, Peer), - ar_blacklist_middleware:ban_peer(Peer, ?BAD_BLOCK_BAN_TIME), ar_events:send(block, {rejected, invalid_poa, B#block.indep_hash, Peer}), - ok; + invalid; true -> case B#block.hash == H1 of true -> - pre_validate_nonce_limiter(B, PrevB, Peer, Timestamp, ReadBodyTime, - BodySize); + pre_validate_nonce_limiter(B, PrevB, Peer, QueryBlockTime); false -> RecallByte2 = RecallRange2Start + B#block.nonce * ?DATA_CHUNK_SIZE, {BlockStart2, BlockEnd2, TXRoot2} = ar_block_index:get_block_bounds( @@ -697,65 +672,62 @@ pre_validate_poa(B, PrevB, PartitionUpperBound, H0, H1, Peer, Timestamp, ReadBod andalso RecallByte2 == B#block.recall_byte2 of error -> ?LOG_ERROR([{event, failed_to_validate_proof_of_access}, - {block, ar_util:encode(B#block.indep_hash)}]); + {block, ar_util:encode(B#block.indep_hash)}]), + invalid; false -> post_block_reject_warn(B, check_poa2, Peer), - ar_blacklist_middleware:ban_peer(Peer, ?BAD_BLOCK_BAN_TIME), ar_events:send(block, {rejected, invalid_poa2, B#block.indep_hash, Peer}), - ok; + invalid; true -> - pre_validate_nonce_limiter(B, PrevB, Peer, Timestamp, ReadBodyTime, - BodySize) + pre_validate_nonce_limiter(B, PrevB, Peer, QueryBlockTime) end end end. -pre_validate_nonce_limiter(B, PrevB, Peer, Timestamp, ReadBodyTime, BodySize) -> +pre_validate_nonce_limiter(B, PrevB, Peer, QueryBlockTime) -> PrevOutput = get_last_step_prev_output(B), case ar_nonce_limiter:validate_last_step_checkpoints(B, PrevB, PrevOutput) of {false, cache_mismatch} -> ar_ignore_registry:add(B#block.indep_hash), post_block_reject_warn(B, check_nonce_limiter, Peer), - ar_events:send(block, {rejected, invalid_nonce_limiter, B#block.indep_hash, Peer}), - ok; + ar_events:send(block, {rejected, invalid_nonce_limiter_cache_mismatch, B#block.indep_hash, Peer}), + invalid; false -> post_block_reject_warn(B, check_nonce_limiter, Peer), - ar_blacklist_middleware:ban_peer(Peer, ?BAD_BLOCK_BAN_TIME), ar_events:send(block, {rejected, invalid_nonce_limiter, B#block.indep_hash, Peer}), - ok; + invalid; {true, cache_match} -> - accept_block(B, Peer, ReadBodyTime, BodySize, Timestamp, true); + accept_block(B, Peer, QueryBlockTime, true); true -> - accept_block(B, Peer, ReadBodyTime, BodySize, Timestamp, false) + accept_block(B, Peer, QueryBlockTime, false) end. 
-accept_block(B, Peer, ReadBodyTime, BodySize, Timestamp, Gossip) -> +accept_block(B, Peer, QueryBlockTime, Gossip) -> ar_ignore_registry:add(B#block.indep_hash), - ar_events:send(block, {new, B, #{ source => {peer, Peer}, gossip => Gossip }}), - ar_events:send(peer, {gossiped_block, Peer, ReadBodyTime, BodySize}), - record_block_pre_validation_time(Timestamp), + ar_events:send(block, {new, B, + #{ source => {peer, Peer}, query_block_time => QueryBlockTime, gossip => Gossip }}), ?LOG_INFO([{event, accepted_block}, {height, B#block.height}, - {indep_hash, ar_util:encode(B#block.indep_hash)}]). + {indep_hash, ar_util:encode(B#block.indep_hash)}]), + ok. pre_validate_may_be_fetch_chunk(#block{ recall_byte = RecallByte, - poa = #poa{ chunk = <<>> } } = B, PrevB, Peer, Timestamp, ReadBodyTime, - BodySize) when RecallByte /= undefined -> + poa = #poa{ chunk = <<>> } } = B, PrevB, Peer, QueryBlockTime) when RecallByte /= undefined -> Options = #{ pack => false, packing => spora_2_5, bucket_based_offset => true }, case ar_data_sync:get_chunk(RecallByte + 1, Options) of {ok, #{ chunk := Chunk, data_path := DataPath, tx_path := TXPath }} -> prometheus_counter:inc(block2_fetched_chunks), B2 = B#block{ poa = #poa{ chunk = Chunk, tx_path = TXPath, data_path = DataPath } }, - pre_validate_indep_hash(B2, PrevB, Peer, Timestamp, ReadBodyTime, BodySize); + pre_validate_indep_hash(B2, PrevB, Peer, QueryBlockTime); _ -> ar_events:send(block, {rejected, failed_to_fetch_chunk, B#block.indep_hash, Peer}), - ok + invalid end; -pre_validate_may_be_fetch_chunk(B, PrevB, Peer, Timestamp, ReadBodyTime, BodySize) -> - pre_validate_indep_hash(B, PrevB, Peer, Timestamp, ReadBodyTime, BodySize). +pre_validate_may_be_fetch_chunk(B, PrevB, Peer, QueryBlockTime) -> + pre_validate_indep_hash(B, PrevB, Peer, QueryBlockTime). -pre_validate_pow(B, BDS, PrevB, Peer, Timestamp, ReadBodyTime, BodySize) -> +pre_validate_pow(B, BDS, PrevB, Peer, QueryBlockTime) -> #block{ indep_hash = PrevH } = PrevB, MaybeValid = case ar_node:get_recent_partition_upper_bound_by_prev_h(PrevH) of @@ -768,26 +740,24 @@ pre_validate_pow(B, BDS, PrevB, Peer, Timestamp, ReadBodyTime, BodySize) -> not_found -> %% The new blocks should have been applied in the meantime since we %% looked for the previous block in the block cache. - ok; + skipped; {true, RecallByte} -> H = B#block.indep_hash, %% Include all transactions found in the mempool in place of the %% corresponding transaction identifiers so that we can gossip them to %% peers who miss them along with the block. B2 = B#block{ txs = include_transactions(B#block.txs) }, - ar_events:send(block, {new, B2, #{ source => {peer, Peer}, - recall_byte => RecallByte }}), - ar_events:send(peer, {gossiped_block, Peer, ReadBodyTime, BodySize}), - record_block_pre_validation_time(Timestamp), + ar_events:send(block, {new, B2, #{ + source => {peer, Peer}, query_block_time => QueryBlockTime, + recall_byte => RecallByte }}), prometheus_counter:inc(block2_received_transactions, count_received_transactions(B#block.txs)), ?LOG_INFO([{event, accepted_block}, {indep_hash, ar_util:encode(H)}]), ok; false -> post_block_reject_warn(B, check_pow, Peer), - ar_blacklist_middleware:ban_peer(Peer, ?BAD_BLOCK_BAN_TIME), ar_events:send(block, {rejected, invalid_pow, B#block.indep_hash, Peer}), - ok + invalid end. 
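Both accept paths now attach query_block_time to the block event instead of emitting a separate gossiped_block peer event. A sketch of a subscriber reading it; the handler shape is an assumption, and query_block_time is undefined for blocks that arrive via POST /block (see post_block/4 further down):

	%% Sketch of an ar_events block subscriber; illustrative only.
	handle_info({event, block, {new, B, #{ source := {peer, Peer} } = Opts}}, State) ->
		QueryBlockTime = maps:get(query_block_time, Opts, undefined),
		?LOG_DEBUG([{event, block_received_from_peer}, {height, B#block.height},
				{peer, ar_util:format_peer(Peer)}, {query_block_time, QueryBlockTime}]),
		{noreply, State};
	handle_info(_Message, State) ->
		{noreply, State}.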
compute_hash(B, PrevCDiff) ->
diff --git a/apps/arweave/src/ar_block_propagation_worker.erl b/apps/arweave/src/ar_block_propagation_worker.erl
index 12faac972..eab815901 100644
--- a/apps/arweave/src/ar_block_propagation_worker.erl
+++ b/apps/arweave/src/ar_block_propagation_worker.erl
@@ -49,12 +49,10 @@ handle_cast({send_block2, Peer, SendAnnouncementFun, SendFun, RetryCount, From},
 		{ok, {{<<"200">>, _}, _, Body, _, _}} ->
 			case catch ar_serialize:binary_to_block_announcement_response(Body) of
 				{'EXIT', Reason} ->
-					ar_events:send(peer, {bad_response,
-							{Peer, block_announcement, Reason}}),
+					ar_peers:issue_warning(Peer, block_announcement, Reason),
 					From ! {worker_sent_block, self()};
 				{error, Reason} ->
-					ar_events:send(peer, {bad_response,
-							{Peer, block_announcement, Reason}}),
+					ar_peers:issue_warning(Peer, block_announcement, Reason),
 					From ! {worker_sent_block, self()};
 				{ok, #block_announcement_response{ missing_tx_indices = L,
 						missing_chunk = MissingChunk, missing_chunk2 = MissingChunk2 }} ->
diff --git a/apps/arweave/src/ar_config.erl b/apps/arweave/src/ar_config.erl
index 5c8c45323..992abe6c8 100644
--- a/apps/arweave/src/ar_config.erl
+++ b/apps/arweave/src/ar_config.erl
@@ -473,18 +473,27 @@ parse_options([{<<"max_nonce_limiter_last_step_validation_thread_count">>, D} |
 parse_options([{<<"vdf_server_trusted_peer">>, <<>>} | Rest], Config) ->
 	parse_options(Rest, Config);
 parse_options([{<<"vdf_server_trusted_peer">>, Peer} | Rest], Config) ->
-	parse_options(Rest, parse_vdf_server_trusted_peer(Peer, Config));
+	#config{ nonce_limiter_server_trusted_peers = Peers } = Config,
+	parse_options(Rest,
+		Config#config{ nonce_limiter_server_trusted_peers = Peers ++ parse_peers([Peer]) });
 parse_options([{<<"vdf_server_trusted_peers">>, Peers} | Rest], Config) when is_list(Peers) ->
-	parse_options(Rest, parse_vdf_server_trusted_peers(Peers, Config));
+	#config{ nonce_limiter_server_trusted_peers = ExistingPeers } = Config,
+	parse_options(Rest,
+		Config#config{ nonce_limiter_server_trusted_peers = ExistingPeers ++ parse_peers(Peers) });
 parse_options([{<<"vdf_server_trusted_peers">>, Peers} | _], _) ->
 	{error, {bad_type, vdf_server_trusted_peers, array}, Peers};
 parse_options([{<<"vdf_client_peers">>, Peers} | Rest], Config) when is_list(Peers) ->
-	parse_options(Rest, Config#config{ nonce_limiter_client_peers = Peers });
+	parse_options(Rest, Config#config{ nonce_limiter_client_peers = parse_peers(Peers) });
 parse_options([{<<"vdf_client_peers">>, Peers} | _], _) ->
 	{error, {bad_type, vdf_client_peers, array}, Peers};
+parse_options([{<<"p3_server_peers">>, Peers} | Rest], Config) when is_list(Peers) ->
+	parse_options(Rest, Config#config{ p3_server_peers = parse_peers(Peers) });
+parse_options([{<<"p3_server_peers">>, Peers} | _], _) ->
+	{error, {bad_type, p3_server_peers, array}, Peers};
+
 parse_options([{<<"debug">>, B} | Rest], Config) when is_boolean(B) ->
 	parse_options(Rest, Config#config{ debug = B });
 
@@ -631,17 +640,8 @@ parse_requests_per_minute_limit_by_ip({[]}, Parsed) ->
 parse_requests_per_minute_limit_by_ip(_, _) ->
 	error.
 
-parse_vdf_server_trusted_peers([Peer | Rest], Config) ->
-	Config2 = parse_vdf_server_trusted_peer(Peer, Config),
-	parse_vdf_server_trusted_peers(Rest, Config2);
-parse_vdf_server_trusted_peers([], Config) ->
-	Config.
- -parse_vdf_server_trusted_peer(Peer, Config) when is_binary(Peer) -> - parse_vdf_server_trusted_peer(binary_to_list(Peer), Config); -parse_vdf_server_trusted_peer(Peer, Config) -> - #config{ nonce_limiter_server_trusted_peers = Peers } = Config, - Config#config{ nonce_limiter_server_trusted_peers = Peers ++ [Peer] }. +parse_peers(Peers) -> + [ar_util:peer_to_str(Peer) || Peer <- Peers]. format_config(Config) -> Fields = record_info(fields, config), diff --git a/apps/arweave/src/ar_data_discovery.erl b/apps/arweave/src/ar_data_discovery.erl index 8ed48be30..5dde7510d 100644 --- a/apps/arweave/src/ar_data_discovery.erl +++ b/apps/arweave/src/ar_data_discovery.erl @@ -163,7 +163,7 @@ handle_info({'EXIT', _, normal}, State) -> handle_info({'DOWN', _, process, _, _}, #state{ peers_pending = N } = State) -> {noreply, State#state{ peers_pending = N - 1 }}; -handle_info({event, peer, {bad_response, {Peer, _Resource, _Reason}}}, State) -> +handle_info({event, peer, {removed, Peer}}, State) -> gen_server:cast(?MODULE, {remove_peer, Peer}), {noreply, State}; diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl index 96aed7423..79d721776 100644 --- a/apps/arweave/src/ar_data_sync.erl +++ b/apps/arweave/src/ar_data_sync.erl @@ -913,7 +913,7 @@ handle_cast(sync_intervals, State) -> sync_intervals_queue_intervals = I2 }} end; -handle_cast({store_fetched_chunk, Peer, Time, TransferSize, Byte, Proof} = Cast, State) -> +handle_cast({store_fetched_chunk, Peer, Byte, Proof} = Cast, State) -> #sync_data_state{ packing_map = PackingMap } = State, #{ data_path := DataPath, tx_path := TXPath, chunk := Chunk, packing := Packing } = Proof, SeekByte = get_chunk_seek_offset(Byte + 1) - 1, @@ -944,7 +944,6 @@ handle_cast({store_fetched_chunk, Peer, Time, TransferSize, Byte, Proof} = Cast, ar_util:cast_after(1000, self(), Cast), {noreply, State}; false -> - ar_events:send(peer, {served_chunk, Peer, Time, TransferSize}), ar_packing_server:request_unpack(AbsoluteOffset, ChunkArgs), ?LOG_DEBUG([{event, requested_fetched_chunk_unpacking}, {data_path_hash, ar_util:encode(crypto:hash(sha256, @@ -964,7 +963,6 @@ handle_cast({store_fetched_chunk, Peer, Time, TransferSize, Byte, Proof} = Cast, decrement_chunk_cache_size(), process_invalid_fetched_chunk(Peer, Byte, State); {true, DataRoot, TXStartOffset, ChunkEndOffset, TXSize, ChunkSize, ChunkID} -> - ar_events:send(peer, {served_chunk, Peer, Time, TransferSize}), AbsoluteTXStartOffset = BlockStartOffset + TXStartOffset, AbsoluteEndOffset = AbsoluteTXStartOffset + ChunkEndOffset, ChunkArgs = {unpacked, Chunk, AbsoluteEndOffset, TXRoot, ChunkSize}, @@ -1369,7 +1367,7 @@ get_chunk(Offset, SeekOffset, Pack, Packing, StoredPacking, StoreID) -> {error, Reason}; {ok, {Chunk, DataPath}, AbsoluteOffset, TXRoot, ChunkSize, TXPath} -> ChunkID = - case validate_served_chunk({AbsoluteOffset, DataPath, TXPath, TXRoot, + case validate_fetched_chunk({AbsoluteOffset, DataPath, TXPath, TXRoot, ChunkSize, StoreID}) of {true, ID} -> ID; @@ -1498,7 +1496,7 @@ invalidate_bad_data_record({Start, End, ChunksIndex, StoreID, Case}) -> end end. 
-validate_served_chunk(Args) ->
+validate_fetched_chunk(Args) ->
 	{Offset, DataPath, TXPath, TXRoot, ChunkSize, StoreID} = Args,
 	[{_, T}] = ets:lookup(ar_data_sync_state, disk_pool_threshold),
 	case Offset > T orelse not ar_node:is_joined() of
diff --git a/apps/arweave/src/ar_data_sync_worker.erl b/apps/arweave/src/ar_data_sync_worker.erl
index 5b88a11ee..02e410a99 100644
--- a/apps/arweave/src/ar_data_sync_worker.erl
+++ b/apps/arweave/src/ar_data_sync_worker.erl
@@ -54,7 +54,6 @@ handle_cast({read_range, Args}, State) ->
 	{noreply, State};
 
 handle_cast({sync_range, Args}, State) ->
-	{_Start, _End, Peer, _TargetStoreID, _RetryCount} = Args,
 	StartTime = erlang:monotonic_time(),
 	SyncResult = sync_range(Args),
 	EndTime = erlang:monotonic_time(),
@@ -62,8 +61,8 @@ handle_cast({sync_range, Args}, State) ->
 		recast ->
 			ok;
 		_ ->
-			gen_server:cast(ar_data_sync_worker_master,
-				{task_completed, {sync_range, {State#state.name, SyncResult, Peer, EndTime-StartTime}}})
+			gen_server:cast(ar_data_sync_worker_master, {task_completed,
+				{sync_range, {State#state.name, SyncResult, Args, EndTime-StartTime}}})
 	end,
 	{noreply, State};
 
diff --git a/apps/arweave/src/ar_data_sync_worker_master.erl b/apps/arweave/src/ar_data_sync_worker_master.erl
index 6f6492ec7..a26e8352b 100644
--- a/apps/arweave/src/ar_data_sync_worker_master.erl
+++ b/apps/arweave/src/ar_data_sync_worker_master.erl
@@ -12,23 +12,23 @@
 -include_lib("arweave/include/ar_consensus.hrl").
 -include_lib("arweave/include/ar_config.hrl").
 -include_lib("arweave/include/ar_data_sync.hrl").
+-include_lib("arweave/include/ar_peers.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
+-define(REBALANCE_FREQUENCY_MS, 60*1000).
 -define(READ_RANGE_CHUNKS, 10).
 -define(MIN_MAX_ACTIVE, 8).
 -define(LATENCY_ALPHA, 0.1).
 -define(SUCCESS_ALPHA, 0.1).
--define(STARTING_LATENCY_EMA, 1000). %% initial value to avoid over-weighting the first response
--define(STARTING_LATENCY_TARGET, 2000). %% initial value to avoid over-weighting the first response
 
 -record(peer_tasks, {
 	peer = undefined,
 	task_queue = queue:new(),
 	task_queue_len = 0,
 	active_count = 0,
-	max_active = ?MIN_MAX_ACTIVE,
-	latency_ema = ?STARTING_LATENCY_EMA,
-	success_ema = 1.0
+	max_active = ?MIN_MAX_ACTIVE
 }).
 
 -record(state, {
@@ -39,7 +39,7 @@
 	workers = queue:new(),
 	worker_count = 0,
 	worker_loads = #{},
-	latency_target = ?STARTING_LATENCY_TARGET,
+	throughput_target = 0,
 	peer_tasks = #{}
 }).
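With latency_ema and success_ema gone from #peer_tasks, the per-peer statistics live in the #performance record owned by ar_peers (see ar_peers.hrl above). A sketch of reading one peer's rating back; it assumes, as the rebalance handler below does, that get_peer_performances/1 returns one #performance per requested peer, in order:

	%% Sketch: look up a peer's throughput rating; illustrative only.
	peer_rating(Peer) ->
		[Performance] = ar_peers:get_peer_performances([Peer]),
		Performance#performance.rating.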
@@ -73,6 +73,7 @@ ready_for_work() ->
 init(Workers) ->
 	process_flag(trap_exit, true),
 	gen_server:cast(?MODULE, process_main_queue),
+	ar_util:cast_after(?REBALANCE_FREQUENCY_MS, ?MODULE, rebalance_peers),
 	{ok, #state{
 		workers = queue:from_list(Workers),
@@ -109,13 +110,24 @@ handle_cast({task_completed, {read_range, {Worker, _, _}}}, State) ->
 	State2 = update_scheduled_task_count(Worker, read_range, "localhost", -1, State),
 	{noreply, State2};
 
-handle_cast({task_completed, {sync_range, {Worker, Result, Peer, Duration}}}, State) ->
+handle_cast({task_completed, {sync_range, {Worker, Result, Args, ElapsedNative}}}, State) ->
+	{Start, End, Peer, _} = Args,
+	DataSize = End - Start,
 	State2 = update_scheduled_task_count(Worker, sync_range, ar_util:format_peer(Peer), -1, State),
 	PeerTasks = get_peer_tasks(Peer, State2),
-	{PeerTasks2, State3} = complete_sync_range(PeerTasks, Result, Duration, State2),
+	{PeerTasks2, State3} = complete_sync_range(PeerTasks, Result, ElapsedNative, DataSize, State2),
 	{PeerTasks3, State4} = process_peer_queue(PeerTasks2, State3),
 	{noreply, set_peer_tasks(PeerTasks3, State4)};
 
+handle_cast(rebalance_peers, State) ->
+	ar_util:cast_after(?REBALANCE_FREQUENCY_MS, ?MODULE, rebalance_peers),
+	?LOG_DEBUG([{event, rebalance_peers}]),
+	Peers = maps:keys(State#state.peer_tasks),
+	AllPeerTasks = [maps:get(Peer, State#state.peer_tasks) || Peer <- Peers],
+	AllPeerPerformances = ar_peers:get_peer_performances(Peers),
+	%% Guard against division by zero before any peer has tasks.
+	ThroughputTarget =
+		case Peers of
+			[] ->
+				0;
+			_ ->
+				lists:sum([Performance#performance.rating
+						|| Performance <- AllPeerPerformances]) / length(Peers)
+		end,
+	{noreply, rebalance_peers(AllPeerTasks, AllPeerPerformances, ThroughputTarget, State)};
+
 handle_cast(Cast, State) ->
 	?LOG_WARNING("event: unhandled_cast, cast: ~p", [Cast]),
 	{noreply, State}.
 
@@ -200,20 +212,20 @@ max_tasks() ->
 	Config#config.sync_jobs * 50.
 
 %% @doc The maximum number of tasks we can have queued for a given peer.
-max_peer_queue(_PeerTasks, #state{ scheduled_task_count = 0 } = _State) ->
+max_peer_queue(_PeerTasks, _Performance, #state{ scheduled_task_count = 0 } = _State) ->
 	undefined;
-max_peer_queue(_PeerTasks, #state{ latency_target = 0 } = _State) ->
+max_peer_queue(_PeerTasks, _Performance, #state{ throughput_target = 0 } = _State) ->
 	undefined;
-max_peer_queue(_PeerTasks, #state{ latency_target = 0.0 } = _State) ->
+max_peer_queue(_PeerTasks, _Performance, #state{ throughput_target = 0.0 } = _State) ->
 	undefined;
-max_peer_queue(#peer_tasks{ latency_ema = 0 } = _PeerTasks, _State) ->
+max_peer_queue(_PeerTasks, #performance{ average_latency = 0 } = _Performance, _State) ->
 	undefined;
-max_peer_queue(#peer_tasks{ latency_ema = 0.0 } = _PeerTasks, _State) ->
+max_peer_queue(_PeerTasks, #performance{ average_latency = 0.0 } = _Performance, _State) ->
 	undefined;
-max_peer_queue(PeerTasks, State) ->
+max_peer_queue(PeerTasks, Performance, State) ->
 	CurActive = PeerTasks#peer_tasks.active_count,
-	LatencyEMA = PeerTasks#peer_tasks.latency_ema,
-	SuccessEMA = PeerTasks#peer_tasks.success_ema,
-	LatencyTarget = State#state.latency_target,
+	LatencyEMA = Performance#performance.average_latency,
+	SuccessEMA = Performance#performance.average_success,
+	LatencyTarget = State#state.throughput_target,
 	ScheduledTasks = State#state.scheduled_task_count,
 	%% estimate of our current total throughput
@@ -246,8 +258,6 @@ cut_peer_queue(MaxQueue, PeerTasks, State) ->
 				{peer, ar_util:format_peer(Peer)},
 				{active_count, PeerTasks#peer_tasks.active_count},
 				{scheduled_tasks, State#state.scheduled_task_count},
-				{success_ema, PeerTasks#peer_tasks.success_ema},
-				{latency_ema, PeerTasks#peer_tasks.latency_ema},
-				{latency_target, State#state.latency_target},
+				{throughput_target, State#state.throughput_target},
 				{max_queue, MaxQueue}, {tasks_to_cut, TasksToCut}]),
 			{TaskQueue2, _} = queue:split(MaxQueue, TaskQueue),
@@ -316,29 +326,45 @@ schedule_task(Task, Args, State) ->
-%% Stage 3: record a completed task and update related values (i.e.
-%% EMA, max_active, peer queue length)
+%% Stage 3: record a completed task, report the result to ar_peers, and
+%% decrement the peer's active count; max_active and queue lengths are
+%% now adjusted periodically by rebalance_peers.
 %%--------------------------------------------------------------------
-complete_sync_range(PeerTasks, Result, Duration, State) ->
-	Milliseconds = erlang:convert_time_unit(Duration, native, millisecond) / 1.0,
-
-	IsOK = (Result == ok andalso Milliseconds > 10),
-	LatencyEMA = trunc(calculate_ema(
-		PeerTasks#peer_tasks.latency_ema, IsOK, Milliseconds, ?LATENCY_ALPHA)),
-	SuccessEMA = calculate_ema(
-		PeerTasks#peer_tasks.success_ema, true, ar_util:bool_to_int(IsOK) / 1.0,
-		?SUCCESS_ALPHA),
-	%% Target Latency is the EMA of all peers' latencies
-	LatencyTargetAlpha = 2.0 / (State#state.worker_count + 1), %% heuristic - update as needed.
-	LatencyTarget = trunc(calculate_ema(
-		State#state.latency_target, IsOK, Milliseconds, LatencyTargetAlpha)),
-
-	PeerTasks2 = PeerTasks#peer_tasks{ latency_ema = LatencyEMA, success_ema = SuccessEMA },
-	{PeerTasks3, State2} = cut_peer_queue(
-		max_peer_queue(PeerTasks2, State),
-		PeerTasks2,
-		State),
-	PeerTasks4 = update_active(
-		PeerTasks3, IsOK, Milliseconds, State2#state.worker_count, LatencyTarget),
-	{PeerTasks4, State2#state{ latency_target = LatencyTarget }}.
+complete_sync_range(PeerTasks, Result, ElapsedNative, DataSize, State) ->
+	PeerTasks2 = PeerTasks#peer_tasks{
+		active_count = PeerTasks#peer_tasks.active_count - 1
+	},
+	ar_peers:rate_fetched_data(
+		PeerTasks2#peer_tasks.peer, chunk, Result,
+		erlang:convert_time_unit(ElapsedNative, native, microsecond), DataSize,
+		PeerTasks2#peer_tasks.max_active),
+	{PeerTasks2, State}.
+rebalance_peers([], [], _, State) ->
+	State;
+rebalance_peers(
+	[PeerTasks | AllPeerTasks],
+	[Performance | AllPeerPerformances],
+	ThroughputTarget,
+	State) ->
+	{PeerTasks2, State2} = rebalance_peer(PeerTasks, Performance, ThroughputTarget, State),
+	State3 = set_peer_tasks(PeerTasks2, State2),
+	rebalance_peers(AllPeerTasks, AllPeerPerformances, ThroughputTarget, State3).
+
+rebalance_peer(PeerTasks, Performance, ThroughputTarget, State) ->
+	{PeerTasks2, State2} = cut_peer_queue(
+		max_peer_queue(PeerTasks, Performance, State),
+		PeerTasks,
+		State),
+	WorkerCount = State2#state.worker_count,
+	PeerTasks3 = update_active(PeerTasks2, Performance, WorkerCount, ThroughputTarget),
+	?LOG_DEBUG([
+		{event, update_active},
+		{peer, ar_util:format_peer(PeerTasks3#peer_tasks.peer)},
+		{before_max, PeerTasks2#peer_tasks.max_active},
+		{after_max, PeerTasks3#peer_tasks.max_active},
+		{worker_count, WorkerCount},
+		{active_count, PeerTasks2#peer_tasks.active_count},
+		{throughput_target, ThroughputTarget},
+		{latency_ema, Performance#performance.average_latency}
+	]),
+	{PeerTasks3, State2}.
 
 %%--------------------------------------------------------------------
 %% Helpers
 %%--------------------------------------------------------------------
@@ -355,11 +381,6 @@ update_scheduled_task_count(Worker, Task, FormattedPeer, N, State) ->
 	},
 	State2.
 
-calculate_ema(OldEMA, false, _Value, _Alpha) ->
-	OldEMA;
-calculate_ema(OldEMA, true, Value, Alpha) ->
-	Alpha * Value + (1 - Alpha) * OldEMA.
-
 get_peer_tasks(Peer, State) ->
 	maps:get(Peer, State#state.peer_tasks, #peer_tasks{peer = Peer}).
 
@@ -390,9 +411,9 @@ format_peer(Task, Args) ->
 			ar_util:format_peer(element(3, Args))
 	end.
 
-update_active(PeerTasks, IsOK, Milliseconds, WorkerCount, LatencyTarget) ->
+update_active(PeerTasks, Performance, WorkerCount, ThroughputTarget) ->
 	%% Determine target max_active:
-	%% 1. Increase max_active when the EMA is less than the threshold
-	%% 2. Decrease max_active if the most recent request was slower than the threshold - this
-	%% allows us to respond more quickly to a sudden drop in performance
+	%% 1. Increase max_active when the peer's throughput rating is below the target
+	%% 2. Decrease max_active when the rating is at or above the target
 	%%
@@ -401,23 +422,15 @@
 	%% This prevents situations where we have a low number of active tasks and no queue which
 	%% causes each request to complete fast and hikes up the max_active. Then we get a new
 	%% batch of queued tasks and since the max_active is so high we overwhelm the peer.
-	LatencyEMA = PeerTasks#peer_tasks.latency_ema,
 	MaxActive = PeerTasks#peer_tasks.max_active,
-	ActiveCount = PeerTasks#peer_tasks.active_count - 1,
-	TargetMaxActive = case {
-		IsOK, Milliseconds < LatencyTarget, LatencyEMA < LatencyTarget} of
-		{false, _, _} ->
-			%% Always reduce if there was an error
-			MaxActive-1;
-		{true, false, _} ->
-			%% Milliseconds > threshold, decrease max_active
+	ActiveCount = PeerTasks#peer_tasks.active_count,
+	TargetMaxActive = case Performance#performance.rating < ThroughputTarget of
+		false ->
+			%% throughput > target, decrease max_active
 			MaxActive-1;
-		{true, true, true} ->
-			%% Milliseconds < threshold and EMA < threshold, increase max_active.
-			MaxActive+1;
-		_ ->
-			%% Milliseconds < threshold and EMA > threshold, do nothing.
-			MaxActive
+		true ->
+			%% throughput < target, increase max_active.
+			MaxActive+1
 	end,
 
 	%% Can't have more active tasks than workers.
@@ -429,7 +442,6 @@
 	),
 	%% Can't have less than the minimum.
 	PeerTasks#peer_tasks{
-		active_count = ActiveCount,
 		max_active = max(TaskLimitedMaxActive, ?MIN_MAX_ACTIVE)
 	}.
diff --git a/apps/arweave/src/ar_events_sup.erl b/apps/arweave/src/ar_events_sup.erl
index 634029f66..f5022b031 100644
--- a/apps/arweave/src/ar_events_sup.erl
+++ b/apps/arweave/src/ar_events_sup.erl
@@ -40,8 +40,7 @@ init([]) ->
 		?CHILD(ar_events, block, worker),
 		%% Events: unpacked, packed.
 		?CHILD(ar_events, chunk, worker),
-		%% Events: made_request, bad_response, served_tx, served_block, served_chunk,
-		%% gossiped_tx, gossiped_block, banned
+		%% Events: made_request, bad_response, fetched_tx, fetched_block, fetched_chunk, banned
 		?CHILD(ar_events, peer, worker),
 		%% Events: initializing, initialized, validated_pre_fork_2_6_block, new_tip,
 		%% checkpoint_block, search_space_upper_bound.
diff --git a/apps/arweave/src/ar_header_sync.erl b/apps/arweave/src/ar_header_sync.erl
index 62119d821..4c7c4297b 100644
--- a/apps/arweave/src/ar_header_sync.erl
+++ b/apps/arweave/src/ar_header_sync.erl
@@ -511,10 +511,10 @@ download_block(Peers, H, H2, TXRoot) ->
 	end,
 	case BH of
 		H when Height >= Fork_2_0 ->
-			ar_events:send(peer, {served_block, Peer, Time, Size}),
+			ar_peers:rate_fetched_data(Peer, block, Time, Size),
 			download_txs(Peers, B, TXRoot);
 		H2 when Height < Fork_2_0 ->
-			ar_events:send(peer, {served_block, Peer, Time, Size}),
+			ar_peers:rate_fetched_data(Peer, block, Time, Size),
 			download_txs(Peers, B, TXRoot);
 		_ ->
 			?LOG_WARNING([
diff --git a/apps/arweave/src/ar_http.erl b/apps/arweave/src/ar_http.erl
index 7862e28bd..f8392bacb 100644
--- a/apps/arweave/src/ar_http.erl
+++ b/apps/arweave/src/ar_http.erl
@@ -63,6 +63,7 @@ req(Args) ->
 req(Args, ReestablishedConnection) ->
 	StartTime = erlang:monotonic_time(),
 	#{ peer := Peer, path := Path, method := Method } = Args,
+	PathLabel = ar_http_iface_server:label_http_path(list_to_binary(Path)),
 	Response = case catch gen_server:call(?MODULE, {get_connection, Args}, infinity) of
 		{ok, PID} ->
 			ar_rate_limiter:throttle(Peer, Path),
@@ -89,6 +90,8 @@ req(Args, ReestablishedConnection) ->
 		true ->
 			ok;
 		false ->
+			Status = ar_metrics:get_status_class(Response),
+			ElapsedNative = EndTime - StartTime,
 			%% NOTE: the erlang prometheus client looks at the metric name to determine units.
 			%% If it sees _duration_ it assumes the observed value is in
 			%% native units and it converts it to seconds. To query native units, use:
 			%% See: https://github.com/deadtrickster/prometheus.erl/blob/6dd56bf321e99688108bb976283a80e4d82b3d30/src/prometheus_time.erl#L2-L84
 			prometheus_histogram:observe(ar_http_request_duration_seconds, [
 				method_to_list(Method),
-				ar_http_iface_server:label_http_path(list_to_binary(Path)),
-				ar_metrics:get_status_class(Response)
-			], EndTime - StartTime)
+				PathLabel,
+				Status
+			], ElapsedNative)
 	end,
 	Response.
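The histogram above is fed the raw native-unit difference because prometheus.erl converts _duration_ metrics itself. When a duration is needed in ordinary units, the conversion is explicit, as complete_sync_range does with microseconds earlier in this diff; a minimal sketch:

	%% Sketch: converting a native-unit interval by hand; illustrative only.
	elapsed_ms(StartTime, EndTime) ->
		erlang:convert_time_unit(EndTime - StartTime, native, millisecond).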
%%% ================================================================== diff --git a/apps/arweave/src/ar_http_iface_client.erl b/apps/arweave/src/ar_http_iface_client.erl index edf74898a..7310bb4c0 100644 --- a/apps/arweave/src/ar_http_iface_client.erl +++ b/apps/arweave/src/ar_http_iface_client.erl @@ -10,7 +10,7 @@ get_wallet_list_chunk/2, get_wallet_list_chunk/3, get_wallet_list/2, add_peer/1, get_info/1, get_info/2, get_peers/1, get_time/2, get_height/1, get_block_index/3, get_sync_record/1, get_sync_record/3, - get_chunk_json/3, get_chunk_binary/3, get_mempool/1, get_sync_buckets/1, + get_chunk_binary/3, get_mempool/1, get_sync_buckets/1, get_recent_hash_list/1, get_recent_hash_list_diff/2, get_reward_history/3, push_nonce_limiter_update/2, get_vdf_update/1, get_vdf_session/1, get_previous_vdf_session/1]). @@ -302,13 +302,7 @@ get_sync_record(Peer, Start, Limit) -> headers => Headers }), Start, Limit). -get_chunk_json(Peer, Offset, RequestedPacking) -> - get_chunk(Peer, Offset, RequestedPacking, json). - get_chunk_binary(Peer, Offset, RequestedPacking) -> - get_chunk(Peer, Offset, RequestedPacking, binary). - -get_chunk(Peer, Offset, RequestedPacking, Encoding) -> PackingBinary = case RequestedPacking of any -> @@ -337,7 +331,7 @@ get_chunk(Peer, Offset, RequestedPacking, Encoding) -> Response = ar_http:req(#{ peer => Peer, method => get, - path => get_chunk_path(Offset, Encoding), + path => "/chunk2/" ++ integer_to_binary(Offset), timeout => 120 * 1000, connect_timeout => 5000, limit => ?MAX_SERIALIZED_CHUNK_PROOF_SIZE, @@ -351,12 +345,7 @@ get_chunk(Peer, Offset, RequestedPacking, Encoding) -> ], erlang:monotonic_time() - StartTime), - handle_chunk_response(Encoding, Response). - -get_chunk_path(Offset, json) -> - "/chunk/" ++ integer_to_binary(Offset); -get_chunk_path(Offset, binary) -> - "/chunk2/" ++ integer_to_binary(Offset). + handle_chunk_response(Response). get_mempool(Peer) -> handle_mempool_response(ar_http:req(#{ @@ -562,41 +551,25 @@ handle_sync_record_response({ok, {{<<"200">>, _}, _, Body, _, _}}, Start, Limit) handle_sync_record_response(Reply, _, _) -> {error, Reply}. -handle_chunk_response(Encoding, {ok, {{<<"200">>, _}, _, Body, Start, End}}) -> - DecodeFun = - case Encoding of - json -> - fun(Bin) -> - ar_serialize:json_map_to_chunk_proof(jiffy:decode(Bin, [return_maps])) - end; - binary -> - fun(Bin) -> - case ar_serialize:binary_to_poa(Bin) of - {ok, Reply} -> - Reply; - {error, Reason} -> - {error, Reason} - end - end - end, - case catch DecodeFun(Body) of +handle_chunk_response({ok, {{<<"200">>, _}, _, Body, _, _}}) -> + case catch ar_serialize:binary_to_poa(Body) of {'EXIT', Reason} -> {error, Reason}; {error, Reason} -> {error, Reason}; - Proof -> + {ok, Proof} -> case maps:get(chunk, Proof) of <<>> -> {error, empty_chunk}; Chunk when byte_size(Chunk) > ?DATA_CHUNK_SIZE -> {error, chunk_bigger_than_256kib}; _ -> - {ok, Proof, End - Start, byte_size(term_to_binary(Proof))} + {ok, Proof} end end; -handle_chunk_response(_Encoding, {error, _} = Response) -> +handle_chunk_response({error, _} = Response) -> Response; -handle_chunk_response(_Encoding, Response) -> +handle_chunk_response(Response) -> {error, Response}. 
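handle_chunk_response/1 now returns a bare {ok, Proof}; the old {ok, Proof, TransferSize, ProofSize} shape is gone because transfer rating moved into ar_peers. A sketch of a caller written against that contract; the function name is an assumption:

	%% Sketch: fetch a chunk proof and unwrap the chunk; illustrative only.
	fetch_chunk(Peer, Offset) ->
		case ar_http_iface_client:get_chunk_binary(Peer, Offset, any) of
			{ok, Proof} ->
				{ok, maps:get(chunk, Proof)};
			{error, Reason} ->
				{error, Reason}
		end.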
handle_mempool_response({ok, {{<<"200">>, _}, _, Body, _, _}}) -> @@ -669,10 +642,10 @@ handle_get_recent_hash_list_response(Response) -> handle_get_recent_hash_list_diff_response({ok, {{<<"200">>, _}, _, Body, _, _}}, HL, Peer) -> case parse_recent_hash_list_diff(Body, HL) of {error, invalid_input} -> - ar_events:send(peer, {bad_response, {Peer, recent_hash_list_diff, invalid_input}}), + ar_peers:issue_warning(Peer, recent_hash_list_diff, invalid_input), {error, invalid_input}; {error, unknown_base} -> - ar_events:send(peer, {bad_response, {Peer, recent_hash_list_diff, unknown_base}}), + ar_peers:issue_warning(Peer, recent_hash_list_diff, unknown_base), {error, unknown_base}; {ok, Reply} -> {ok, Reply} @@ -837,10 +810,10 @@ get_tx_from_remote_peer(Peer, TXID) -> {peer, ar_util:format_peer(Peer)}, {tx, ar_util:encode(TXID)} ]), - ar_events:send(peer, {bad_response, {Peer, tx, invalid}}), + ar_peers:issue_warning(Peer, tx, invalid), {error, invalid_tx}; true -> - ar_events:send(peer, {served_tx, Peer, Time, Size}), + ar_peers:rate_fetched_data(Peer, tx, Time, Size), TX end; Error -> @@ -992,7 +965,7 @@ handle_block_response(Peer, Encoding, {ok, {{<<"200">>, _}, _, Body, Start, End} ?LOG_INFO( "event: failed_to_parse_block_response, peer: ~s, reason: ~p", [ar_util:format_peer(Peer), Reason]), - ar_events:send(peer, {bad_response, {Peer, block, Reason}}), + ar_peers:issue_warning(Peer, block, Reason), not_found; {ok, B} -> {ok, B, End - Start, byte_size(term_to_binary(B))}; @@ -1002,11 +975,11 @@ handle_block_response(Peer, Encoding, {ok, {{<<"200">>, _}, _, Body, Start, End} ?LOG_INFO( "event: failed_to_parse_block_response, peer: ~s, error: ~p", [ar_util:format_peer(Peer), Error]), - ar_events:send(peer, {bad_response, {Peer, block, Error}}), + ar_peers:issue_warning(Peer, block, Error), not_found end; handle_block_response(Peer, _Encoding, Response) -> - ar_events:send(peer, {bad_response, {Peer, block, Response}}), + ar_peers:issue_warning(Peer, block, Response), not_found. %% @doc Process the response of a GET /unconfirmed_tx call. @@ -1037,14 +1010,14 @@ handle_tx_response(Peer, Encoding, {ok, {{<<"200">>, _}, _, Body, Start, End}}) {ok, TX#tx{ data = <<>> }, End - Start, Size - DataSize} end; {'EXIT', Reason} -> - ar_events:send(peer, {bad_response, {Peer, tx, Reason}}), + ar_peers:issue_warning(Peer, tx, Reason), {error, Reason}; Reply -> - ar_events:send(peer, {bad_response, {Peer, tx, Reply}}), + ar_peers:issue_warning(Peer, tx, Reply), Reply end; handle_tx_response(Peer, _Encoding, Response) -> - ar_events:send(peer, {bad_response, {Peer, tx, Response}}), + ar_peers:issue_warning(Peer, tx, Response), {error, Response}. p2p_headers() -> diff --git a/apps/arweave/src/ar_http_iface_middleware.erl b/apps/arweave/src/ar_http_iface_middleware.erl index 92dd6962f..06a1d0801 100644 --- a/apps/arweave/src/ar_http_iface_middleware.erl +++ b/apps/arweave/src/ar_http_iface_middleware.erl @@ -1830,8 +1830,7 @@ handle_post_tx_accepted(Req, TX, Peer) -> %% of excessive transaction volumes. 
 	{A, B, C, D, _} = Peer,
 	ar_blacklist_middleware:decrement_ip_addr({A, B, C, D}, Req),
-	ar_events:send(peer, {gossiped_tx, Peer, erlang:get(read_body_time),
-			erlang:get(body_size)}),
+	ar_peers:rate_gossiped_data(Peer, tx, byte_size(term_to_binary(TX))),
 	ar_events:send(tx, {new, TX, Peer}),
 	TXID = TX#tx.id,
 	ar_ignore_registry:remove_temporary(TXID),
@@ -1916,7 +1915,7 @@ handle_get_chunk(OffsetBinary, Req, Encoding) ->
 			{Packing, ok};
 		{{true, _}, _StoreID} ->
 			{ok, Config} = application:get_env(arweave, config),
-			case lists:member(pack_served_chunks, Config#config.enable) of
+			case lists:member(pack_fetched_chunks, Config#config.enable) of
 				false ->
 					{none, {reply, {404, #{}, <<>>, Req}}};
 				true ->
@@ -2318,9 +2317,6 @@ post_block(read_body, Peer, {Req, Pid, Encoding}, ReceiveTimestamp) ->
 		{error, _} ->
 			{400, #{}, <<"Invalid block.">>, Req2};
 		{ok, BShadow} ->
-			ReadBodyTime = timer:now_diff(erlang:timestamp(), ReceiveTimestamp),
-			erlang:put(read_body_time, ReadBodyTime),
-			erlang:put(body_size, byte_size(term_to_binary(BShadow))),
 			post_block(check_transactions_are_present, {BShadow, Peer}, Req2,
 					ReceiveTimestamp)
 	end;
@@ -2343,7 +2339,7 @@ post_block(check_transactions_are_present, {BShadow, Peer}, Req, ReceiveTimestam
 		_ -> % POST /block; do not reject for backwards-compatibility
 			post_block(enqueue_block, {BShadow, Peer}, Req, ReceiveTimestamp)
 	end;
-post_block(enqueue_block, {B, Peer}, Req, Timestamp) ->
+post_block(enqueue_block, {B, Peer}, Req, ReceiveTimestamp) ->
 	B2 =
 		case B#block.height >= ar_fork:height_2_6() of
 			true ->
@@ -2362,8 +2358,7 @@
 			end
 		end,
 	?LOG_INFO([{event, received_block}, {block, ar_util:encode(B#block.indep_hash)}]),
-	ar_block_pre_validator:pre_validate(B2, Peer, Timestamp, erlang:get(read_body_time),
-			erlang:get(body_size)),
+	ar_block_pre_validator:pre_validate(B2, Peer, undefined, ReceiveTimestamp),
 	{200, #{}, <<"OK">>, Req}.
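For reference, the p3_server_peers option added to ar_config.erl above is configured like the VDF peer lists; a hypothetical config.json fragment (the peer addresses are placeholders):

	{
		"p3_server_peers": ["203.0.113.10:1984", "203.0.113.11:1984"],
		"vdf_client_peers": ["203.0.113.12:1984"]
	}

A single peer can also be supplied on the command line through the p3_server_peer argument parsed in ar.erl at the top of this diff.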
encode_txids([]) -> @@ -2706,21 +2701,20 @@ post_tx_parse_id(check_ignore_list, {TXID, Req, Pid, Encoding}) -> post_tx_parse_id(read_body, {TXID, Req, Pid, Encoding}) end; post_tx_parse_id(read_body, {TXID, Req, Pid, Encoding}) -> - Timestamp = erlang:timestamp(), case read_complete_body(Req, Pid) of {ok, Body, Req2} -> case Encoding of json -> - post_tx_parse_id(parse_json, {TXID, Req2, Body, Timestamp}); + post_tx_parse_id(parse_json, {TXID, Req2, Body}); binary -> - post_tx_parse_id(parse_binary, {TXID, Req2, Body, Timestamp}) + post_tx_parse_id(parse_binary, {TXID, Req2, Body}) end; {error, body_size_too_large} -> {error, body_size_too_large, Req}; {error, timeout} -> {error, timeout} end; -post_tx_parse_id(parse_json, {TXID, Req, Body, Timestamp}) -> +post_tx_parse_id(parse_json, {TXID, Req, Body}) -> case catch ar_serialize:json_struct_to_tx(Body) of {'EXIT', _} -> case TXID of @@ -2748,12 +2742,9 @@ post_tx_parse_id(parse_json, {TXID, Req, Body, Timestamp}) -> end, {error, invalid_json, Req}; TX -> - Time = timer:now_diff(erlang:timestamp(), Timestamp), - erlang:put(read_body_time, Time), - erlang:put(body_size, byte_size(term_to_binary(TX))), post_tx_parse_id(verify_id_match, {TXID, Req, TX}) end; -post_tx_parse_id(parse_binary, {TXID, Req, Body, Timestamp}) -> +post_tx_parse_id(parse_binary, {TXID, Req, Body}) -> case catch ar_serialize:binary_to_tx(Body) of {'EXIT', _} -> case TXID of @@ -2772,9 +2763,6 @@ post_tx_parse_id(parse_binary, {TXID, Req, Body, Timestamp}) -> end, {error, invalid_json, Req}; {ok, TX} -> - Time = timer:now_diff(erlang:timestamp(), Timestamp), - erlang:put(read_body_time, Time), - erlang:put(body_size, byte_size(term_to_binary(TX))), post_tx_parse_id(verify_id_match, {TXID, Req, TX}) end; post_tx_parse_id(verify_id_match, {MaybeTXID, Req, TX}) -> diff --git a/apps/arweave/src/ar_http_iface_server.erl b/apps/arweave/src/ar_http_iface_server.erl index cdb6cc2a1..e8430775a 100644 --- a/apps/arweave/src/ar_http_iface_server.erl +++ b/apps/arweave/src/ar_http_iface_server.erl @@ -5,7 +5,7 @@ -module(ar_http_iface_server). -export([start/0, stop/0]). --export([split_path/1, label_http_path/1]). +-export([split_path/1, label_http_path/1, label_req/1]). -include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_config.hrl"). @@ -45,6 +45,10 @@ label_http_path(Path) when is_list(Path) -> label_http_path(Path) -> label_http_path(split_path(Path)). +label_req(Req) -> + SplitPath = ar_http_iface_server:split_path(cowboy_req:path(Req)), + ar_http_iface_server:label_http_path(SplitPath). + %%%=================================================================== %%% Private functions. 
%%%=================================================================== @@ -309,10 +313,14 @@ name_route([<<"hash_list">>, _From, _To]) -> name_route([<<"hash_list2">>, _From, _To]) -> "/hash_list2/{from}/{to}"; +name_route([<<"block">>]) -> + "/block"; name_route([<<"block">>, <<"hash">>, _IndepHash]) -> "/block/hash/{indep_hash}"; name_route([<<"block">>, <<"height">>, _Height]) -> "/block/height/{height}"; +name_route([<<"block2">>]) -> + "/block2"; name_route([<<"block2">>, <<"hash">>, _IndepHash]) -> "/block2/hash/{indep_hash}"; name_route([<<"block2">>, <<"height">>, _Height]) -> diff --git a/apps/arweave/src/ar_http_req.erl b/apps/arweave/src/ar_http_req.erl index ffe3a03c6..158ad871e 100644 --- a/apps/arweave/src/ar_http_req.erl +++ b/apps/arweave/src/ar_http_req.erl @@ -10,6 +10,7 @@ body(Req, SizeLimit) -> not_set -> read_complete_body(Req, #{ acc => [], counter => 0, size_limit => SizeLimit }); Body -> + ?LOG_DEBUG([{event, cached_completed_body}, {size, byte_size(Body)}, {path, ar_http_iface_server:label_req(Req)}]), {ok, Body, Req} end. @@ -42,6 +43,7 @@ read_complete_body(more, Data, Req) -> read_complete_body(Req, Data); read_complete_body(ok, #{ acc := Acc }, Req) -> Body = iolist_to_binary(Acc), + ?LOG_DEBUG([{event, read_completed_body}, {size, byte_size(Body)}, {path, ar_http_iface_server:label_req(Req)}]), {ok, Body, with_body_req_field(Req, Body)}. with_body_req_field(Req, Body) -> diff --git a/apps/arweave/src/ar_network_middleware.erl b/apps/arweave/src/ar_network_middleware.erl index 34ab96416..7b0402b65 100644 --- a/apps/arweave/src/ar_network_middleware.erl +++ b/apps/arweave/src/ar_network_middleware.erl @@ -31,7 +31,7 @@ maybe_add_peer(Peer, Req) -> not_set -> ok; _ -> - ar_events:send(peer, {made_request, Peer, get_release(Req)}) + ar_peers:add_peer(Peer, get_release(Req)) end. wrong_network(Req) -> diff --git a/apps/arweave/src/ar_p3_config.erl b/apps/arweave/src/ar_p3_config.erl index 1ab5388b5..7ae3034fc 100644 --- a/apps/arweave/src/ar_p3_config.erl +++ b/apps/arweave/src/ar_p3_config.erl @@ -80,8 +80,7 @@ get_payments_value(P3Config, Asset, Field) when end. get_service_config(P3Config, Req) -> - SplitPath = ar_http_iface_server:split_path(cowboy_req:path(Req)), - Path = ar_http_iface_server:label_http_path(SplitPath), + Path = ar_http_iface_server:label_req(Req), case Path of undefined -> undefined; diff --git a/apps/arweave/src/ar_peers.erl b/apps/arweave/src/ar_peers.erl index c2e50a0fa..4ffbe2e2d 100644 --- a/apps/arweave/src/ar_peers.erl +++ b/apps/arweave/src/ar_peers.erl @@ -5,12 +5,15 @@ -include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_config.hrl"). +-include_lib("arweave/include/ar_peers.hrl"). -include_lib("eunit/include/eunit.hrl"). --export([start_link/0, get_peers/0, get_trusted_peers/0, is_public_peer/1, - get_peer_release/1, stats/0, discover_peers/0, rank_peers/1, - resolve_and_cache_peer/2]). +-export([start_link/0, get_peers/0, get_peer_performances/1, get_trusted_peers/0, is_public_peer/1, + get_peer_release/1, stats/0, discover_peers/0, add_peer/2, rank_peers/1, + resolve_and_cache_peer/2, rate_response/4, rate_fetched_data/4, rate_fetched_data/6, + rate_gossiped_data/3, issue_warning/3 +]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -36,21 +39,44 @@ %% The number of failed requests in a row we tolerate before dropping the peer. -define(TOLERATE_FAILURE_COUNT, 20). +-define(MINIMUM_SUCCESS, 0.5). +-define(THROUGHPUT_ALPHA, 0.1). +-define(SUCCESS_ALPHA, 0.01).
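For intuition about these smoothing constants (an illustrative aside; the arithmetic mirrors the calculate_ema/3 helper added further down): with an alpha of 0.1 each new sample pulls the average 10% of the way toward itself, so the throughput averages react within roughly ten transfers, while the 0.01 success alpha needs on the order of a hundred samples to move appreciably.

	%% calculate_ema(OldEMA, Value, Alpha) = Alpha * Value + (1 - Alpha) * OldEMA
	%% Throughput-style update: calculate_ema(200.0, 500.0, 0.1) ≈ 230.0
	%% Success-style update:    calculate_ema(1.0, 0, 0.01)      ≈ 0.99
	%% (a single failure barely dents a perfect success score)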
+ +%% When processing block rejected events for blocks received from a peer, we handle rejections +%% differently based on the rejection reason. +-define(BLOCK_REJECTION_WARNING, [ + failed_to_fetch_first_chunk, + failed_to_fetch_second_chunk, + failed_to_fetch_chunk +]). +-define(BLOCK_REJECTION_BAN, [ + invalid_previous_solution_hash, + invalid_last_retarget, + invalid_difficulty, + invalid_cumulative_difficulty, + invalid_hash_preimage, + invalid_nonce_limiter_seed_data, + invalid_partition_number, + invalid_nonce, + invalid_pow, + invalid_poa, + invalid_poa2, + invalid_nonce_limiter +]). +-define(BLOCK_REJECTION_IGNORE, [ + invalid_signature, + invalid_hash, + invalid_timestamp, + invalid_resigned_solution_hash, + invalid_nonce_limiter_cache_mismatch +]). %% We only do scoring of this many TCP ports per IP address. When there are not enough slots, %% we remove the peer from the first slot. -define(DEFAULT_PEER_PORT_MAP, {empty_slot, empty_slot, empty_slot, empty_slot, empty_slot, empty_slot, empty_slot, empty_slot, empty_slot, empty_slot}). --record(performance, { - bytes = 0, - time = 0, - transfers = 0, - failures = 0, - rating = 0, - release = -1 -}). - -record(state, {}). %%%=================================================================== @@ -71,6 +97,9 @@ get_peers() -> Peers end. +get_peer_performances(Peers) -> + [get_or_init_performance(Peer) || Peer <- Peers]. + -if(?NETWORK_NAME == "arweave.N.1"). get_trusted_peers() -> {ok, Config} = application:get_env(arweave, config), @@ -78,7 +107,8 @@ get_trusted_peers() -> [] -> ArweavePeers = ["sfo-1.na-west-1.arweave.net", "ams-1.eu-central-1.arweave.net", "fra-1.eu-central-2.arweave.net", "blr-1.ap-central-1.arweave.net", - "sgp-1.ap-central-2.arweave.net"], + "sgp-1.ap-central-2.arweave.net" + ], resolve_peers(ArweavePeers); Peers -> Peers @@ -96,8 +126,7 @@ resolve_peers([RawPeer | Peers]) -> {ok, Peer} -> [Peer | resolve_peers(Peers)]; {error, invalid} -> - ?LOG_WARNING([{event, failed_to_resolve_trusted_peer}, - {peer, RawPeer}]), + ?LOG_WARNING([{event, failed_to_resolve_trusted_peer}, {peer, RawPeer}]), resolve_peers(Peers) end. @@ -148,18 +177,53 @@ get_peer_release(Peer) -> -1 end. +rate_response({_Host, _Port}, _, _, _) -> + %% Only track requests for IP-based peers as the rest of the stack assumes an IP-based peer. + ok; +rate_response(Peer, PathLabel, get, Response) -> + gen_server:cast( + ?MODULE, {rate_response, Peer, PathLabel, get, ar_metrics:get_status_class(Response)} + ); +rate_response(_Peer, _PathLabel, _Method, _Response) -> + ok. + +rate_fetched_data(Peer, DataType, LatencyMicroseconds, DataSize) -> + rate_fetched_data(Peer, DataType, ok, LatencyMicroseconds, DataSize, 1). +rate_fetched_data(Peer, DataType, ok, LatencyMicroseconds, DataSize, Concurrency) -> + gen_server:cast(?MODULE, + {fetched_data, Peer, DataType, LatencyMicroseconds, DataSize, Concurrency}); +rate_fetched_data(Peer, DataType, _, _LatencyMicroseconds, _DataSize, _Concurrency) -> + gen_server:cast(?MODULE, {invalid_fetched_data, Peer, DataType}). + +rate_gossiped_data(Peer, DataType, DataSize) -> + gen_server:cast(?MODULE, {gossiped_data, Peer, DataType, DataSize}). + +issue_warning(Peer, _Type, _Reason) -> + gen_server:cast(?MODULE, {warning, Peer}). + +add_peer(Peer, Release) -> + gen_server:cast(?MODULE, {add_peer, Peer, Release}). + %% @doc Print statistics about the current peers. 
stats() -> Connected = get_peers(), io:format("Connected peers, in preference order:~n"), stats(Connected), io:format("Other known peers:~n"), - All = ets:foldl(fun({{peer, Peer}, _}, Acc) -> [Peer | Acc]; - (_, Acc) -> Acc end, [], ?MODULE), + All = ets:foldl( + fun + ({{peer, Peer}, _}, Acc) -> [Peer | Acc]; + (_, Acc) -> Acc + end, + [], + ?MODULE + ), stats(All -- Connected). stats(Peers) -> - lists:foreach(fun(Peer) -> format_stats(Peer, get_or_init_performance(Peer)) end, - Peers). + lists:foreach( + fun(Peer) -> format_stats(Peer, get_or_init_performance(Peer)) end, + Peers + ). discover_peers() -> case ets:lookup(?MODULE, peers) of @@ -201,7 +265,7 @@ resolve_and_cache_peer(RawPeer, Type) -> init([]) -> process_flag(trap_exit, true), - [ok, ok] = ar_events:subscribe([peer, block]), + ok = ar_events:subscribe(block), load_peers(), gen_server:cast(?MODULE, rank_peers), gen_server:cast(?MODULE, ping_peers), @@ -213,36 +277,23 @@ handle_call(Request, _From, State) -> {reply, ok, State}. handle_cast({add_peer, Peer, Release}, State) -> - may_be_rotate_peer_ports(Peer), - case ets:lookup(?MODULE, {peer, Peer}) of - [{_, #performance{ release = Release }}] -> - ok; - [{_, Performance}] -> - ets:insert(?MODULE, {{peer, Peer}, - Performance#performance{ release = Release }}); - [] -> - ets:insert(?MODULE, {{peer, Peer}, #performance{ release = Release }}) - end, + maybe_add_peer(Peer, Release), {noreply, State}; handle_cast(rank_peers, State) -> - Total = - case ets:lookup(?MODULE, rating_total) of - [] -> - 0; - [{_, T}] -> - T - end, + Total = get_total_rating(), Peers = ets:foldl( - fun ({{peer, Peer}, Performance}, Acc) -> + fun + ({{peer, Peer}, Performance}, Acc) -> %% Bigger score increases the chances to end up on the top %% of the peer list, but at the same time the ranking is %% probabilistic to always give everyone a chance to improve %% in the competition (i.e., reduce the advantage gained by %% being the first to earn a reputation). - Score = rand:uniform() * Performance#performance.rating - / (Total + 0.0001), + Score = + rand:uniform() * Performance#performance.rating / + (Total + 0.0001), [{Peer, Score} | Acc]; (_, Acc) -> Acc @@ -253,6 +304,7 @@ handle_cast(rank_peers, State) -> prometheus_gauge:set(arweave_peer_count, length(Peers)), ets:insert(?MODULE, {peers, lists:sublist(rank_peers(Peers), ?MAX_PEERS)}), ar_util:cast_after(?RANK_PEERS_FREQUENCY_MS, ?MODULE, rank_peers), + stats(), {noreply, State}; handle_cast(ping_peers, State) -> @@ -260,81 +312,125 @@ handle_cast(ping_peers, State) -> ping_peers(lists:sublist(Peers, 100)), {noreply, State}; -handle_cast(Cast, State) -> - ?LOG_WARNING("event: unhandled_cast, cast: ~p", [Cast]), - {noreply, State}.
- -handle_info({event, peer, {made_request, Peer, Release}}, State) -> - may_be_rotate_peer_ports(Peer), - case ets:lookup(?MODULE, {peer, Peer}) of - [{_, #performance{ release = Release }}] -> +handle_cast({rate_response, Peer, PathLabel, get, Status}, State) -> + case Status of + "success" -> + update_rating(Peer, true); + "redirection" -> + %% don't update rating ok; - [{_, Performance}] -> - ets:insert(?MODULE, {{peer, Peer}, - Performance#performance{ release = Release }}); - [] -> - case check_external_peer(Peer) of - ok -> - ets:insert(?MODULE, {{peer, Peer}, - #performance{ release = Release }}); - _ -> - ok - end + "client-error" -> + %% don't update rating + ok; + _ -> + update_rating(Peer, false) end, + ?LOG_DEBUG([ + {event, update_rating}, + {update_type, response}, + {path, PathLabel}, + {status, Status}, + {peer, ar_util:format_peer(Peer)} + ]), {noreply, State}; -handle_info({event, peer, {served_tx, Peer, TimeDelta, Size}}, State) -> - update_rating(Peer, TimeDelta, Size), +handle_cast({fetched_data, Peer, DataType, LatencyMicroseconds, DataSize, Concurrency}, State) -> + ?LOG_DEBUG([ + {event, update_rating}, + {update_type, fetched_data}, + {data_type, DataType}, + {peer, ar_util:format_peer(Peer)}, + {latency, LatencyMicroseconds / 1000}, + {data_size, DataSize}, + {concurrency, Concurrency} + ]), + update_rating(Peer, LatencyMicroseconds, DataSize, Concurrency, true), {noreply, State}; -handle_info({event, peer, {served_block, Peer, TimeDelta, Size}}, State) -> - update_rating(Peer, TimeDelta, Size), - {noreply, State}; -handle_info({event, peer, {gossiped_tx, Peer, TimeDelta, Size}}, State) -> - %% Only the first peer who sent the given transaction is rated. - %% Otherwise, one may exploit the endpoint to gain reputation. - case check_external_peer(Peer) of - ok -> - update_rating(Peer, TimeDelta, Size); - _ -> - ok - end, +handle_cast({invalid_fetched_data, Peer, DataType}, State) -> + ?LOG_DEBUG([ + {event, update_rating}, + {update_type, invalid_fetched_data}, + {data_type, DataType}, + {peer, ar_util:format_peer(Peer)} + ]), + update_rating(Peer, false), {noreply, State}; -handle_info({event, peer, {gossiped_block, Peer, TimeDelta, Size}}, State) -> - %% Only the first peer who sent the given block is rated. - %% Otherwise, one may exploit the endpoint to gain reputation. - case check_external_peer(Peer) of +handle_cast({gossiped_data, Peer, DataType, DataSize}, State) -> + case check_peer(Peer) of ok -> - update_rating(Peer, TimeDelta, Size); + %% Since gossiped data is pushed to us we don't know the latency, but we do want + %% to incentivize peers to gossip data quickly and frequently, so we will assign + %% a latency that is guaranteed to improve the peer's rating: + %% 1. Calculate the latency that would be required to transfer DataSize bytes at the + %% peer's current average rate. + %% 2. 
Scale that latency by ?GOSSIP_ADVANTAGE and rate using the scaled latency. + Performance = get_or_init_performance(Peer), + #performance{ + average_bytes = AverageBytes, + average_latency = AverageLatency + } = Performance, + %% max/2 guards a fresh peer (whose averages are still 0.0) against a + %% division by zero; such a peer receives a conservative initial credit. + AverageThroughput = AverageBytes / max(AverageLatency, 1.0), + GossipLatency = (DataSize / max(AverageThroughput, 1.0)) * ?GOSSIP_ADVANTAGE, + LatencyMicroseconds = GossipLatency * 1000, + ?LOG_DEBUG([ + {event, update_rating}, + {update_type, gossiped_data}, + {data_type, DataType}, + {peer, ar_util:format_peer(Peer)}, + {latency, LatencyMicroseconds / 1000}, + {data_size, DataSize} + ]), + update_rating(Peer, LatencyMicroseconds, DataSize, 1, true); _ -> ok end, - {noreply, State}; -handle_info({event, peer, {served_chunk, Peer, TimeDelta, Size}}, State) -> - update_rating(Peer, TimeDelta, Size), {noreply, State}; -handle_info({event, peer, {bad_response, {Peer, _Type, _Reason}}}, State) -> - issue_warning(Peer), +handle_cast({warning, Peer}, State) -> + Performance = update_rating(Peer, false), + case Performance#performance.average_success < ?MINIMUM_SUCCESS of + true -> + remove_peer(Peer); + false -> + ok + end, {noreply, State}; -handle_info({event, peer, {banned, BannedPeer}}, State) -> - remove_peer(BannedPeer), - {noreply, State}; +handle_cast(Cast, State) -> + ?LOG_WARNING("event: unhandled_cast, cast: ~p", [Cast]), + {noreply, State}. -handle_info({event, block, {rejected, failed_to_fetch_first_chunk, _H, Peer}}, State) -> - issue_warning(Peer), - {noreply, State}; +handle_info({event, block, {rejected, Reason, _H, Peer}}, State) when Peer /= no_peer -> + IssueBan = lists:member(Reason, ?BLOCK_REJECTION_BAN), + IssueWarning = lists:member(Reason, ?BLOCK_REJECTION_WARNING), + Ignore = lists:member(Reason, ?BLOCK_REJECTION_IGNORE), -handle_info({event, block, {rejected, failed_to_fetch_second_chunk, _H, Peer}}, State) -> - issue_warning(Peer), + case {IssueBan, IssueWarning, Ignore} of + {true, false, false} -> + ar_blacklist_middleware:ban_peer(Peer, ?BAD_BLOCK_BAN_TIME), + remove_peer(Peer); + {false, true, false} -> + issue_warning(Peer, block_rejected, Reason); + {false, false, true} -> + %% ignore + ok; + _ -> + %% Every reason should be in exactly one list. + error("invalid block rejection reason") + end, {noreply, State}; -handle_info({event, block, {rejected, failed_to_fetch_chunk, _H, Peer}}, State) -> - issue_warning(Peer), +handle_info({event, block, {new, B, + #{ source := {peer, Peer}, query_block_time := QueryBlockTime }}}, State) -> + DataSize = byte_size(term_to_binary(B)), + case QueryBlockTime of + undefined -> ar_peers:rate_gossiped_data(Peer, block, DataSize); + _ -> ar_peers:rate_fetched_data(Peer, block, QueryBlockTime, DataSize) + end, {noreply, State}; handle_info({event, block, _}, State) -> @@ -360,6 +456,28 @@ get_peer_peers(Peer) -> Peers -> Peers end. +get_or_init_performance(Peer) -> + case ets:lookup(?MODULE, {peer, Peer}) of + [] -> + #performance{}; + [{_, Performance}] -> + Performance + end. + +set_performance(Peer, Performance) -> + ets:insert(?MODULE, [{{peer, Peer}, Performance}]). + +get_total_rating() -> + case ets:lookup(?MODULE, rating_total) of + [] -> + 0; + [{_, Total}] -> + Total + end. + +set_total_rating(Total) -> + ets:insert(?MODULE, {rating_total, Total}).
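Worked example for the synthetic gossip latency above (made-up numbers, following the document's own formula):

	%% A peer with average_bytes = 1048576.0 and average_latency = 500.0 ms:
	%% AverageThroughput = 1048576.0 / 500.0         = 2097.152 bytes/ms
	%% For a gossiped 102400-byte block:
	%% GossipLatency     = (102400 / 2097.152) * 0.5 = 24.4140625 ms
	%% i.e. the peer is credited as though it served the data at twice its
	%% measured average speed, which is what guarantees its rating improves.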
+ discover_peers([]) -> ok; discover_peers([Peer | Peers]) -> @@ -367,28 +485,27 @@ discover_peers([Peer | Peers]) -> true -> ok; false -> - IsPublic = is_public_peer(Peer), - IsBanned = ar_blacklist_middleware:is_peer_banned(Peer) == banned, - IsBlacklisted = lists:member(Peer, ?PEER_PERMANENT_BLACKLIST), - case IsPublic andalso not IsBanned andalso not IsBlacklisted of - false -> - ok; - true -> + case check_peer(Peer, is_public_peer(Peer)) of + ok -> case ar_http_iface_client:get_info(Peer, release) of {<<"release">>, Release} when is_integer(Release) -> - gen_server:cast(?MODULE, {add_peer, Peer, Release}); + maybe_add_peer(Peer, Release); _ -> ok - end + end; + _ -> + ok end end, discover_peers(Peers). format_stats(Peer, Perf) -> - io:format("\t~s ~.2f kB/s (~p transfers, ~B failures)~n", - [string:pad(ar_util:format_peer(Peer), 20, trailing, $ ), - (Perf#performance.bytes / 1024) / ((Perf#performance.time + 1) / 1000000), - Perf#performance.transfers, Perf#performance.failures]). + KB = Perf#performance.average_bytes / 1024, + io:format( + "\t~s ~.2f kB/s (~.2f kB, ~B ms latency, ~.2f success, ~p transfers)~n", + [string:pad(ar_util:format_peer(Peer), 21, trailing, $\s), + float(Perf#performance.rating), KB, trunc(Perf#performance.average_latency), + Perf#performance.average_success, Perf#performance.transfers]). load_peers() -> case ar_storage:read_term(peers) of @@ -400,10 +517,10 @@ load_peers() -> load_peers(Records), TotalRating = ets:foldl( - fun ({{peer_ip, _IP}, _}, Acc) -> - Acc; - ({{peer, _Peer}, Performance}, Acc) -> - Acc + Performance#performance.rating + fun ({{peer, _Peer}, Performance}, Acc) -> + Acc + Performance#performance.rating; + (_, Acc) -> + Acc end, 0, ?MODULE @@ -428,14 +545,32 @@ load_peer({Peer, Performance}) -> <> -> may_be_rotate_peer_ports(Peer), case Performance of - {performance, Bytes, Time, Transfers, Failures, Rating} -> + {performance, TotalBytes, TotalLatency, Transfers, _Failures, Rating} -> %% For compatibility with a few nodes already storing the records %% without the release field. - ets:insert(?MODULE, {{peer, Peer}, #performance{ bytes = Bytes, - time = Time, transfers = Transfers, failures = Failures, - rating = Rating, release = -1 }}); - _ -> - ets:insert(?MODULE, {{peer, Peer}, Performance}) + set_performance(Peer, #performance{ + total_bytes = TotalBytes, + total_latency = TotalLatency, + transfers = Transfers, + rating = Rating + }); + {performance, TotalBytes, TotalLatency, Transfers, _Failures, Rating, Release} -> + %% For compatibility with nodes storing records from before the introduction of + %% the version field. + set_performance(Peer, #performance{ + release = Release, + total_bytes = TotalBytes, + total_latency = TotalLatency, + transfers = Transfers, + rating = Rating + }); + {performance, 3, + _Release, _AverageBytes, _TotalBytes, _AverageLatency, _TotalLatency, + _Transfers, _AverageSuccess, _Rating} -> + %% Going forward, whenever we change the #performance record we should increment the + %% version field so we can match on it when loading. This clause handles the + %% version 3 format.
+ set_performance(Peer, Performance) end, ok; Network -> @@ -449,7 +584,8 @@ may_be_rotate_peer_ports(Peer) -> case ets:lookup(?MODULE, {peer_ip, IP}) of [] -> ets:insert(?MODULE, {{peer_ip, IP}, - {erlang:setelement(1, ?DEFAULT_PEER_PORT_MAP, Port), 1}}); + {erlang:setelement(1, ?DEFAULT_PEER_PORT_MAP, Port), 1}} + ); [{_, {PortMap, Position}}] -> case is_in_port_map(Port, PortMap) of {true, _} -> @@ -460,7 +596,7 @@ may_be_rotate_peer_ports(Peer) -> true -> ets:insert(?MODULE, {{peer_ip, IP}, {erlang:setelement(Position + 1, PortMap, Port), - Position + 1}}); + Position + 1}}); false -> RemovedPeer = construct_peer(IP, element(1, PortMap)), PortMap2 = shift_port_map_left(PortMap), @@ -527,7 +663,8 @@ is_loopback_ip({_, _, _, _}) -> false. %% @doc Return a ranked list of peers. rank_peers(ScoredPeers) -> SortedReversed = lists:reverse( - lists:sort(fun({_, S1}, {_, S2}) -> S1 >= S2 end, ScoredPeers)), + lists:sort(fun({_, S1}, {_, S2}) -> S1 >= S2 end, ScoredPeers) + ), GroupedBySubnet = lists:foldl( fun({{A, B, _C, _D, _Port}, _Score} = Peer, Acc) -> @@ -552,59 +689,110 @@ rank_peers(ScoredPeers) -> [], GroupedBySubnet ), - [Peer || {Peer, _} <- lists:sort(fun({_, S1}, {_, S2}) -> S1 >= S2 end, - ScoredSubnetPeers)]. - -check_external_peer(Peer) -> - IsLoopbackIP = is_loopback_ip(Peer), + [Peer || {Peer, _} <- lists:sort( + fun({_, S1}, {_, S2}) -> S1 >= S2 end, + ScoredSubnetPeers + )]. + +check_peer(Peer) -> + check_peer(Peer, not is_loopback_ip(Peer)). +check_peer(Peer, IsPeerScopeValid) -> IsBlacklisted = lists:member(Peer, ?PEER_PERMANENT_BLACKLIST), IsBanned = ar_blacklist_middleware:is_peer_banned(Peer) == banned, - case {IsLoopbackIP, IsBlacklisted, IsBanned} of - {true, _, _} -> - reject; - {_, true, _} -> - reject; - {_, _, true} -> - reject; - _ -> - ok + case IsPeerScopeValid andalso not IsBlacklisted andalso not IsBanned of + true -> + ok; + false -> + reject end. -update_rating(Peer, TimeDelta, Size) -> +update_rating(Peer, IsSuccess) -> + update_rating(Peer, undefined, undefined, 1, IsSuccess). +update_rating(Peer, LatencyMicroseconds, DataSize, Concurrency, IsSuccess) -> Performance = get_or_init_performance(Peer), Total = get_total_rating(), - #performance{ bytes = Bytes, time = Time, - rating = Rating, transfers = N } = Performance, - Bytes2 = Bytes + Size, - Time2 = Time + TimeDelta / 1000, - Performance2 = Performance#performance{ bytes = Bytes2, time = Time2, - rating = Rating2 = Bytes2 / (Time2 + 1), failures = 0, transfers = N + 1 }, + %% Latency is undefined when only a success/failure signal is being recorded. + LatencyMilliseconds = case LatencyMicroseconds of + undefined -> undefined; + _ -> LatencyMicroseconds / 1000 + end, + #performance{ + average_bytes = AverageBytes, + total_bytes = TotalBytes, + average_latency = AverageLatency, + total_latency = TotalLatency, + average_success = AverageSuccess, + rating = Rating, + transfers = Transfers + } = Performance, + TotalBytes2 = case DataSize of + undefined -> TotalBytes; + _ -> TotalBytes + DataSize + end, + %% AverageBytes is the average number of bytes transferred during the AverageLatency time + %% period. In order to approximate the impact of multiple concurrent requests we multiply + %% DataSize by the Concurrency value. We do this *only* when updating the AverageBytes + %% value so that it doesn't distort the TotalBytes.
+ AverageBytes2 = case DataSize of + undefined -> AverageBytes; + _ -> calculate_ema(AverageBytes, (DataSize * Concurrency), ?THROUGHPUT_ALPHA) + end, + TotalLatency2 = case LatencyMilliseconds of + undefined -> TotalLatency; + _ -> TotalLatency + LatencyMilliseconds + end, + AverageLatency2 = case LatencyMilliseconds of + undefined -> AverageLatency; + _ -> calculate_ema(AverageLatency, LatencyMilliseconds, ?THROUGHPUT_ALPHA) + end, + Transfers2 = case DataSize of + undefined -> Transfers; + _ -> Transfers + 1 + end, + AverageSuccess2 = calculate_ema(AverageSuccess, ar_util:bool_to_int(IsSuccess), ?SUCCESS_ALPHA), + %% Rating is an estimate of the peer's effective throughput in bytes per millisecond + %% (roughly kB/s), discounted by the average success rate. The guard keeps a peer + %% with no latency samples yet from triggering a division by zero. + Rating2 = case AverageLatency2 > 0.0 of + true -> (AverageBytes2 / AverageLatency2) * AverageSuccess2; + false -> 0.0 + end, + Performance2 = Performance#performance{ + average_bytes = AverageBytes2, + total_bytes = TotalBytes2, + average_latency = AverageLatency2, + total_latency = TotalLatency2, + average_success = AverageSuccess2, + rating = Rating2, + transfers = Transfers2 + }, Total2 = Total - Rating + Rating2, may_be_rotate_peer_ports(Peer), - ets:insert(?MODULE, [{{peer, Peer}, Performance2}, {rating_total, Total2}]). + set_performance(Peer, Performance2), + set_total_rating(Total2), + Performance2. -get_or_init_performance(Peer) -> +calculate_ema(OldEMA, Value, Alpha) -> + Alpha * Value + (1 - Alpha) * OldEMA. + +maybe_add_peer(Peer, Release) -> + may_be_rotate_peer_ports(Peer), case ets:lookup(?MODULE, {peer, Peer}) of - [] -> - #performance{}; + [{_, #performance{ release = Release }}] -> + ok; [{_, Performance}] -> - Performance - end. - -get_total_rating() -> - case ets:lookup(?MODULE, rating_total) of + set_performance(Peer, Performance#performance{ release = Release }); [] -> - 0; - [{_, Total}] -> - Total + case check_peer(Peer) of + ok -> + set_performance(Peer, #performance{ release = Release }); + _ -> + ok + end end. remove_peer(RemovedPeer) -> - Total = get_total_rating(), + ?LOG_DEBUG([ + {event, remove_peer}, + {peer, ar_util:format_peer(RemovedPeer)} + ]), Performance = get_or_init_performance(RemovedPeer), - ets:insert(?MODULE, {rating_total, Total - Performance#performance.rating}), + Total = get_total_rating(), + set_total_rating(Total - Performance#performance.rating), ets:delete(?MODULE, {peer, RemovedPeer}), - remove_peer_port(RemovedPeer). + remove_peer_port(RemovedPeer), + ar_events:send(peer, {removed, RemovedPeer}). remove_peer_port(Peer) -> {IP, Port} = get_ip_port(Peer), @@ -640,33 +828,22 @@ is_port_map_empty(PortMap, Max, N) -> end. store_peers() -> - case ets:lookup(?MODULE, rating_total) of + Records = + ets:foldl( + fun + ({{peer, Peer}, Performance}, Acc) -> + [{Peer, Performance} | Acc]; + (_, Acc) -> + Acc + end, + [], + ?MODULE + ), + case Records of [] -> ok; - [{_, Total}] -> - Records = - ets:foldl( - fun ({{peer, Peer}, Performance}, Acc) -> - [{Peer, Performance} | Acc]; - (_, Acc) -> - Acc - end, - [], - ?MODULE - ), - ar_storage:write_term(peers, {Total, Records}) - end. - -issue_warning(Peer) -> - Performance = get_or_init_performance(Peer), - Failures = Performance#performance.failures, - case Failures + 1 > ?TOLERATE_FAILURE_COUNT of - true -> - remove_peer(Peer); - false -> - Performance2 = Performance#performance{ failures = Failures + 1 }, - may_be_rotate_peer_ports(Peer), - ets:insert(?MODULE, {{peer, Peer}, Performance2}) + _ -> + ar_storage:write_term(peers, Records) end.
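As a sanity check on the update_rating/5 arithmetic above (illustrative numbers, starting from a fresh #performance{} whose averages are 0.0): after one successful 262144-byte fetch taking 250 ms, both EMAs have moved only 10% of the way from zero, yet their ratio already equals the sample's own throughput, and a run of successes keeps average_success pinned near 1.0.

	%% AverageBytes   = calculate_ema(0.0, 262144, 0.1) = 26214.4
	%% AverageLatency = calculate_ema(0.0, 250.0, 0.1)  = 25.0
	%% AverageSuccess = calculate_ema(1.0, 1, 0.01)     = 1.0
	%% Rating         = (26214.4 / 25.0) * 1.0          = 1048.576 bytes/ms (~1 MB/s)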
%%%=================================================================== diff --git a/apps/arweave/src/ar_poller.erl b/apps/arweave/src/ar_poller.erl index 95fb7cd07..1416b388c 100644 --- a/apps/arweave/src/ar_poller.erl +++ b/apps/arweave/src/ar_poller.erl @@ -133,7 +133,7 @@ handle_cast(Msg, State) -> ?LOG_ERROR([{event, unhandled_cast}, {module, ?MODULE}, {message, Msg}]), {noreply, State}. -handle_info({event, block, {discovered, Peer, B, Time, Size}}, State) -> +handle_info({event, block, {discovered, Peer, B, QueryBlockTime}}, State) -> case ar_ignore_registry:member(B#block.indep_hash) of false -> ?LOG_INFO([{event, fetched_block_for_validation}, @@ -142,7 +142,7 @@ handle_info({event, block, {discovered, Peer, B, Time, Size}}, State) -> true -> ok end, - ar_block_pre_validator:pre_validate(B, Peer, erlang:timestamp(), Time, Size), + ar_block_pre_validator:pre_validate(B, Peer, QueryBlockTime, erlang:timestamp()), {noreply, State}; handle_info({event, block, _}, State) -> {noreply, State}; diff --git a/apps/arweave/src/ar_poller_worker.erl b/apps/arweave/src/ar_poller_worker.erl index 92138ff06..c893f406a 100644 --- a/apps/arweave/src/ar_poller_worker.erl +++ b/apps/arweave/src/ar_poller_worker.erl @@ -97,7 +97,7 @@ handle_cast({poll, Ref}, #state{ ref = Ref, peer = Peer, {ok, TXs} -> B2 = B#block{ txs = TXs }, ar_ignore_registry:remove_temporary(H), - ar_events:send(block, {discovered, Peer, B2, Time, Size}), + ar_events:send(block, {discovered, Peer, B2, Time}), ok; failed -> ?LOG_WARNING([{event, failed_to_get_block_txs_from_peer}, diff --git a/apps/arweave/src/ar_randomx_state.erl b/apps/arweave/src/ar_randomx_state.erl index 89dca9096..6ad2e33f9 100644 --- a/apps/arweave/src/ar_randomx_state.erl +++ b/apps/arweave/src/ar_randomx_state.erl @@ -331,7 +331,7 @@ get_block2(BH, Peers, RetryCount) -> {Peer, B, Time, Size} -> case ar_block:indep_hash(B) of BH -> - ar_events:send(peer, {served_block, Peer, Time, Size}), + ar_peers:rate_fetched_data(Peer, block, Time, Size), {ok, B}; InvalidBH -> ?LOG_WARNING([ diff --git a/apps/arweave/test/ar_config_tests.erl b/apps/arweave/test/ar_config_tests.erl index 3497e42e4..437b461d4 100644 --- a/apps/arweave/test/ar_config_tests.erl +++ b/apps/arweave/test/ar_config_tests.erl @@ -110,7 +110,8 @@ parse_config() -> max_nonce_limiter_validation_thread_count = 2, max_nonce_limiter_last_step_validation_thread_count = 3, nonce_limiter_server_trusted_peers = ["127.0.0.1", "2.3.4.5", "6.7.8.9:1982"], - nonce_limiter_client_peers = [<<"2.3.6.7:1984">>, <<"4.7.3.1:1983">>, <<"3.3.3.3">>], + nonce_limiter_client_peers = ["2.3.6.7:1984", "4.7.3.1:1983", "3.3.3.3"], + p3_server_peers = ["10.1.2.3:1985", "10.4.5.6"], run_defragmentation = true, defragmentation_trigger_threshold = 1_000, defragmentation_modules = [ diff --git a/apps/arweave/test/ar_config_tests_config_fixture.json b/apps/arweave/test/ar_config_tests_config_fixture.json index 73c6a5a49..8658317fe 100644 --- a/apps/arweave/test/ar_config_tests_config_fixture.json +++ b/apps/arweave/test/ar_config_tests_config_fixture.json @@ -115,6 +115,7 @@ "vdf_server_trusted_peer": "127.0.0.1", "vdf_server_trusted_peers": ["2.3.4.5", "6.7.8.9:1982"], "vdf_client_peers": ["2.3.6.7:1984", "4.7.3.1:1983", "3.3.3.3"], + "p3_server_peers": ["10.1.2.3:1985", "10.4.5.6"], "run_defragmentation": true, "defragmentation_trigger_threshold": 1000, "defragment_modules": [ diff --git a/apps/arweave/test/ar_data_sync_tests.erl b/apps/arweave/test/ar_data_sync_tests.erl index f9e7482ac..2c84289f9 100644 --- 
a/apps/arweave/test/ar_data_sync_tests.erl +++ b/apps/arweave/test/ar_data_sync_tests.erl @@ -682,8 +682,9 @@ test_mines_off_only_second_last_chunks() -> ). packs_chunks_depending_on_packing_threshold_test_() -> - test_with_mocked_functions([{ar_fork, height_2_6, fun() -> 10 end}, - {ar_fork, height_2_6_8, fun() -> 15 end}], + test_with_mocked_functions([{ar_fork, height_2_6, fun() -> 0 end}, + {ar_fork, height_2_6_8, fun() -> 0 end}, + {ar_fork, height_2_7, fun() -> 10 end}], fun test_packs_chunks_depending_on_packing_threshold/0). test_packs_chunks_depending_on_packing_threshold() -> @@ -754,66 +755,37 @@ test_packs_chunks_depending_on_packing_threshold() -> B = read_block_when_stored(H), PoA = B#block.poa, BI = lists:reverse(lists:sublist(lists:reverse(BILast), Height)), - {RecallByte, PartitionUpperBound} = - case B#block.height >= ar_fork:height_2_6() of + PrevNonceLimiterInfo = PrevB#block.nonce_limiter_info, + PrevSeed = + case B#block.height == ar_fork:height_2_6() of true -> - PrevNonceLimiterInfo = PrevB#block.nonce_limiter_info, - PrevSeed = - case B#block.height == ar_fork:height_2_6() of - true -> - element(1, lists:nth(?SEARCH_SPACE_UPPER_BOUND_DEPTH, BI)); - false -> - PrevNonceLimiterInfo#nonce_limiter_info.seed - end, - NonceLimiterInfo = B#block.nonce_limiter_info, - Output = NonceLimiterInfo#nonce_limiter_info.output, - UpperBound = - NonceLimiterInfo#nonce_limiter_info.partition_upper_bound, - H0 = ar_block:compute_h0(Output, B#block.partition_number, PrevSeed, - B#block.reward_addr), - {RecallRange1Start, _} = ar_block:get_recall_range(H0, - B#block.partition_number, UpperBound), - Byte = RecallRange1Start + B#block.nonce * ?DATA_CHUNK_SIZE, - {Byte, UpperBound}; + element(1, lists:nth(?SEARCH_SPACE_UPPER_BOUND_DEPTH, BI)); false -> - UpperBound = element(2, - lists:nth(?SEARCH_SPACE_UPPER_BOUND_DEPTH, BI)), - BDS = ar_block:generate_block_data_segment(B), - {H0, _Entropy} = ar_mine:spora_h0_with_entropy(BDS, B#block.nonce, - Height), - {ok, Byte} = ar_mine:pick_recall_byte(H0, PrevB#block.indep_hash, - UpperBound), - {Byte, UpperBound} + PrevNonceLimiterInfo#nonce_limiter_info.seed end, + NonceLimiterInfo = B#block.nonce_limiter_info, + Output = NonceLimiterInfo#nonce_limiter_info.output, + PartitionUpperBound = + NonceLimiterInfo#nonce_limiter_info.partition_upper_bound, + H0 = ar_block:compute_h0(Output, B#block.partition_number, PrevSeed, + B#block.reward_addr), + {RecallRange1Start, _} = ar_block:get_recall_range(H0, + B#block.partition_number, PartitionUpperBound), + RecallByte = RecallRange1Start + B#block.nonce * ?DATA_CHUNK_SIZE, {BlockStart, BlockEnd, TXRoot} = ar_block_index:get_block_bounds(RecallByte), - case B#block.height >= ar_fork:height_2_6() of - true -> - ?debugFmt("Mined a 2.6 block. " - "Computed recall byte: ~B, block's recall byte: ~p. " - "Height: ~B. Previous block: ~s. " - "Computed search space upper bound: ~B. " - "Block start: ~B. Block end: ~B. TX root: ~s.", - [RecallByte, B#block.recall_byte, Height, - ar_util:encode(PrevB#block.indep_hash), PartitionUpperBound, - BlockStart, BlockEnd, ar_util:encode(TXRoot)]), - ?assertEqual(RecallByte, B#block.recall_byte), - ?assertEqual(true, ar_poa:validate({BlockStart, RecallByte, TXRoot, - BlockEnd - BlockStart, PoA, B#block.strict_data_split_threshold, - {spora_2_6, B#block.reward_addr}})); - false -> - ?debugFmt("Mined a 2.5 block. " - "Computed recall byte: ~B, block's recall byte: ~p. " - "Height: ~B. Previous block: ~s. " - "Computed search space upper bound: ~B. " - "Block start: ~B. 
Block end: ~B. TX root: ~s.", - [RecallByte, B#block.recall_byte, Height, - ar_util:encode(PrevB#block.indep_hash), PartitionUpperBound, - BlockStart, BlockEnd, ar_util:encode(TXRoot)]), - ?assertEqual(RecallByte, B#block.recall_byte), - ?assertEqual(true, ar_poa:validate({BlockStart, RecallByte, TXRoot, - BlockEnd - BlockStart, PoA, B#block.strict_data_split_threshold, - spora_2_5})) - end, + ?debugFmt("Mined a block. " + "Computed recall byte: ~B, block's recall byte: ~p. " + "Height: ~B. Previous block: ~s. " + "Computed search space upper bound: ~B. " + "Block start: ~B. Block end: ~B. TX root: ~s.", + [RecallByte, B#block.recall_byte, Height, + ar_util:encode(PrevB#block.indep_hash), PartitionUpperBound, + BlockStart, BlockEnd, ar_util:encode(TXRoot)]), + ?assertEqual(RecallByte, B#block.recall_byte), + ?assertEqual(true, ar_poa:validate({BlockStart, RecallByte, TXRoot, + BlockEnd - BlockStart, PoA, B#block.strict_data_split_threshold, + {spora_2_6, B#block.reward_addr}, + B#block.merkle_rebase_support_threshold})), B end, LastB, @@ -888,7 +860,7 @@ setup_nodes() -> setup_nodes(MasterAddr, SlaveAddr) -> Wallet = {_, Pub} = ar_wallet:new(), - [B0] = ar_weave:init([{ar_wallet:to_address(Pub), ?AR(200), <<>>}]), + [B0] = ar_weave:init([{ar_wallet:to_address(Pub), ?AR(20000), <<>>}]), {ok, Config} = application:get_env(arweave, config), {Master, _} = start(B0, MasterAddr, Config), {ok, SlaveConfig} = slave_call(application, get_env, [arweave, config]), diff --git a/apps/arweave/test/ar_test_node.erl b/apps/arweave/test/ar_test_node.erl index 315d4beb0..819f490f0 100644 --- a/apps/arweave/test/ar_test_node.erl +++ b/apps/arweave/test/ar_test_node.erl @@ -84,7 +84,7 @@ start(B0, RewardAddr, Config, StorageModules) -> header_sync_jobs = 2, enable = [search_in_rocksdb_when_mining, serve_tx_data_without_limits, double_check_nonce_limiter, legacy_storage_repacking, serve_wallet_lists, - pack_served_chunks | Config#config.enable], + pack_fetched_chunks | Config#config.enable], mining_server_chunk_cache_size_limit = 4, debug = true }),