From 3e405df9a102f6c2b33e8a129313cfb468d2cc6e Mon Sep 17 00:00:00 2001 From: georgeee Date: Thu, 5 Dec 2024 12:14:43 +0000 Subject: [PATCH] Refactor revalidate: compute key intersection This makes code a bit more readable by decreasing the nestedness. --- src/lib/network_pool/indexed_pool.ml | 218 +++++++++++++-------------- 1 file changed, 107 insertions(+), 111 deletions(-) diff --git a/src/lib/network_pool/indexed_pool.ml b/src/lib/network_pool/indexed_pool.ml index 1dbc0c6ec6e..3ea4914122c 100644 --- a/src/lib/network_pool/indexed_pool.ml +++ b/src/lib/network_pool/indexed_pool.ml @@ -678,132 +678,128 @@ let revalidate : -> [ `Entire_pool | `Subset of Account_id.Set.t ] -> (Account_id.t -> Account.t) -> t * Transaction_hash.User_command_with_valid_signature.t Sequence.t = - fun t ~logger scope f -> + fun t ~logger scope get_account_by_id -> let requires_revalidation = match scope with | `Entire_pool -> - Fn.const true + t.all_by_sender | `Subset subset -> - Set.mem subset + (* intersection of scope and all_by_sender *) + Account_id.Map.merge t.all_by_sender + (Account_id.Set.to_map subset ~f:(const ())) + ~f:(fun ~key:_ -> function `Both (v, ()) -> Some v | _ -> None) in - Map.fold t.all_by_sender ~init:(t, Sequence.empty) + Map.fold requires_revalidation ~init:(t, Sequence.empty) ~f:(fun ~key:sender ~data:(queue, currency_reserved) ((t', dropped_acc) as acc) -> - if not (requires_revalidation sender) then acc + let account : Account.t = get_account_by_id sender in + let current_balance = + Currency.Balance.to_amount + (Account.liquid_balance_at_slot + ~global_slot:(global_slot_since_genesis t.config) + account ) + in + [%log debug] + "Revalidating account $account in transaction pool ($account_nonce, \ + $account_balance)" + ~metadata: + [ ("account", `String (Sexp.to_string @@ Account_id.sexp_of_t sender)) + ; ("account_nonce", `Int (Account_nonce.to_int account.nonce)) + ; ( "account_balance" + , `String (Currency.Amount.to_mina_string current_balance) ) + ] ; + let first_cmd = F_sequence.head_exn queue in + let first_nonce = + first_cmd |> Transaction_hash.User_command_with_valid_signature.command + |> User_command.applicable_at_nonce + in + if + not + ( Account.has_permission_to_send account + && Account.has_permission_to_increment_nonce account ) + then ( + [%log debug] "Account no longer has permission to send; dropping queue" ; + let dropped, t'' = remove_with_dependents_exn' t first_cmd in + (t'', Sequence.append dropped_acc dropped) ) + else if Account_nonce.(account.nonce < first_nonce) then ( + [%log debug] + "Current account nonce precedes first nonce in queue; dropping queue" ; + let dropped, t'' = remove_with_dependents_exn' t first_cmd in + (t'', Sequence.append dropped_acc dropped) ) else - let account : Account.t = f sender in - let current_balance = - Currency.Balance.to_amount - (Account.liquid_balance_at_slot - ~global_slot:(global_slot_since_genesis t.config) - account ) + (* current_nonce >= first_nonce *) + let first_applicable_nonce_index = + F_sequence.findi queue ~f:(fun cmd' -> + let nonce = + Transaction_hash.User_command_with_valid_signature.command cmd' + |> User_command.applicable_at_nonce + in + Account_nonce.equal nonce account.nonce ) + |> Option.value ~default:(F_sequence.length queue) in [%log debug] - "Revalidating account $account in transaction pool ($account_nonce, \ - $account_balance)" - ~metadata: - [ ( "account" - , `String (Sexp.to_string @@ Account_id.sexp_of_t sender) ) - ; ("account_nonce", `Int (Account_nonce.to_int account.nonce)) - ; ( "account_balance" - , `String (Currency.Amount.to_mina_string current_balance) ) - ] ; - let first_cmd = F_sequence.head_exn queue in - let first_nonce = - first_cmd - |> Transaction_hash.User_command_with_valid_signature.command - |> User_command.applicable_at_nonce + "Current account nonce succeeds first nonce in queue; splitting \ + queue at $index" + ~metadata:[ ("index", `Int first_applicable_nonce_index) ] ; + let drop_queue, keep_queue = + F_sequence.split_at queue first_applicable_nonce_index in - if - not - ( Account.has_permission_to_send account - && Account.has_permission_to_increment_nonce account ) - then ( - [%log debug] - "Account no longer has permission to send; dropping queue" ; - let dropped, t'' = remove_with_dependents_exn' t first_cmd in - (t'', Sequence.append dropped_acc dropped) ) - else if Account_nonce.(account.nonce < first_nonce) then ( - [%log debug] - "Current account nonce precedes first nonce in queue; dropping \ - queue" ; - let dropped, t'' = remove_with_dependents_exn' t first_cmd in - (t'', Sequence.append dropped_acc dropped) ) - else - (* current_nonce >= first_nonce *) - let first_applicable_nonce_index = - F_sequence.findi queue ~f:(fun cmd' -> - let nonce = - Transaction_hash.User_command_with_valid_signature.command - cmd' - |> User_command.applicable_at_nonce - in - Account_nonce.equal nonce account.nonce ) - |> Option.value ~default:(F_sequence.length queue) - in - [%log debug] - "Current account nonce succeeds first nonce in queue; splitting \ - queue at $index" - ~metadata:[ ("index", `Int first_applicable_nonce_index) ] ; - let drop_queue, keep_queue = - F_sequence.split_at queue first_applicable_nonce_index - in - let currency_reserved' = - F_sequence.foldl - (fun c cmd -> - Option.value_exn - Currency.Amount.(c - Option.value_exn (currency_consumed cmd)) - ) - currency_reserved drop_queue - in - let keep_queue', currency_reserved'', dropped_for_balance = - drop_until_sufficient_balance - (keep_queue, currency_reserved') - current_balance - in - let to_drop = - Sequence.append (F_sequence.to_seq drop_queue) dropped_for_balance - in - match Sequence.next to_drop with - | None -> - acc - | Some (head, tail) -> - let t'' = - Sequence.fold tail - ~init: - (remove_all_by_fee_and_hash_and_expiration_exn - (remove_applicable_exn t' head) - head ) - ~f:remove_all_by_fee_and_hash_and_expiration_exn - in - let t''' = - match F_sequence.uncons keep_queue' with - | None -> - { t'' with - all_by_sender = Map.remove t''.all_by_sender sender - } - | Some (first_kept, _) -> - let first_kept_unchecked = - Transaction_hash.User_command_with_valid_signature.command + let currency_reserved' = + F_sequence.foldl + (fun c cmd -> + Option.value_exn + Currency.Amount.(c - Option.value_exn (currency_consumed cmd)) + ) + currency_reserved drop_queue + in + let keep_queue', currency_reserved'', dropped_for_balance = + drop_until_sufficient_balance + (keep_queue, currency_reserved') + current_balance + in + let to_drop = + Sequence.append (F_sequence.to_seq drop_queue) dropped_for_balance + in + match Sequence.next to_drop with + | None -> + acc + | Some (head, tail) -> + let t'' = + Sequence.fold tail + ~init: + (remove_all_by_fee_and_hash_and_expiration_exn + (remove_applicable_exn t' head) + head ) + ~f:remove_all_by_fee_and_hash_and_expiration_exn + in + let t''' = + match F_sequence.uncons keep_queue' with + | None -> + { t'' with + all_by_sender = Map.remove t''.all_by_sender sender + } + | Some (first_kept, _) -> + let first_kept_unchecked = + Transaction_hash.User_command_with_valid_signature.command + first_kept + in + { t'' with + all_by_sender = + Map.set t''.all_by_sender ~key:sender + ~data:(keep_queue', currency_reserved'') + ; applicable_by_fee = + Map_set.insert + ( module Transaction_hash + .User_command_with_valid_signature ) + t''.applicable_by_fee + (User_command.fee_per_wu first_kept_unchecked) first_kept - in - { t'' with - all_by_sender = - Map.set t''.all_by_sender ~key:sender - ~data:(keep_queue', currency_reserved'') - ; applicable_by_fee = - Map_set.insert - ( module Transaction_hash - .User_command_with_valid_signature ) - t''.applicable_by_fee - (User_command.fee_per_wu first_kept_unchecked) - first_kept - } - in - (t''', Sequence.append dropped_acc to_drop) ) + } + in + (t''', Sequence.append dropped_acc to_drop) ) let expired_by_global_slot (t : t) : Transaction_hash.User_command_with_valid_signature.t Sequence.t =