Skip to content

Commit

Permalink
Refactor revalidate: compute key intersection
Browse files Browse the repository at this point in the history
This makes code a bit more readable by decreasing the nestedness.
  • Loading branch information
georgeee committed Dec 5, 2024
1 parent fa1c32c commit 3e405df
Showing 1 changed file with 107 additions and 111 deletions.
218 changes: 107 additions & 111 deletions src/lib/network_pool/indexed_pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -678,132 +678,128 @@ let revalidate :
-> [ `Entire_pool | `Subset of Account_id.Set.t ]
-> (Account_id.t -> Account.t)
-> t * Transaction_hash.User_command_with_valid_signature.t Sequence.t =
fun t ~logger scope f ->
fun t ~logger scope get_account_by_id ->
let requires_revalidation =
match scope with
| `Entire_pool ->
Fn.const true
t.all_by_sender
| `Subset subset ->
Set.mem subset
(* intersection of scope and all_by_sender *)
Account_id.Map.merge t.all_by_sender
(Account_id.Set.to_map subset ~f:(const ()))
~f:(fun ~key:_ -> function `Both (v, ()) -> Some v | _ -> None)
in
Map.fold t.all_by_sender ~init:(t, Sequence.empty)
Map.fold requires_revalidation ~init:(t, Sequence.empty)
~f:(fun
~key:sender
~data:(queue, currency_reserved)
((t', dropped_acc) as acc)
->
if not (requires_revalidation sender) then acc
let account : Account.t = get_account_by_id sender in
let current_balance =
Currency.Balance.to_amount
(Account.liquid_balance_at_slot
~global_slot:(global_slot_since_genesis t.config)
account )
in
[%log debug]
"Revalidating account $account in transaction pool ($account_nonce, \
$account_balance)"
~metadata:
[ ("account", `String (Sexp.to_string @@ Account_id.sexp_of_t sender))
; ("account_nonce", `Int (Account_nonce.to_int account.nonce))
; ( "account_balance"
, `String (Currency.Amount.to_mina_string current_balance) )
] ;
let first_cmd = F_sequence.head_exn queue in
let first_nonce =
first_cmd |> Transaction_hash.User_command_with_valid_signature.command
|> User_command.applicable_at_nonce
in
if
not
( Account.has_permission_to_send account
&& Account.has_permission_to_increment_nonce account )
then (
[%log debug] "Account no longer has permission to send; dropping queue" ;
let dropped, t'' = remove_with_dependents_exn' t first_cmd in
(t'', Sequence.append dropped_acc dropped) )
else if Account_nonce.(account.nonce < first_nonce) then (
[%log debug]
"Current account nonce precedes first nonce in queue; dropping queue" ;
let dropped, t'' = remove_with_dependents_exn' t first_cmd in
(t'', Sequence.append dropped_acc dropped) )
else
let account : Account.t = f sender in
let current_balance =
Currency.Balance.to_amount
(Account.liquid_balance_at_slot
~global_slot:(global_slot_since_genesis t.config)
account )
(* current_nonce >= first_nonce *)
let first_applicable_nonce_index =
F_sequence.findi queue ~f:(fun cmd' ->
let nonce =
Transaction_hash.User_command_with_valid_signature.command cmd'
|> User_command.applicable_at_nonce
in
Account_nonce.equal nonce account.nonce )
|> Option.value ~default:(F_sequence.length queue)
in
[%log debug]
"Revalidating account $account in transaction pool ($account_nonce, \
$account_balance)"
~metadata:
[ ( "account"
, `String (Sexp.to_string @@ Account_id.sexp_of_t sender) )
; ("account_nonce", `Int (Account_nonce.to_int account.nonce))
; ( "account_balance"
, `String (Currency.Amount.to_mina_string current_balance) )
] ;
let first_cmd = F_sequence.head_exn queue in
let first_nonce =
first_cmd
|> Transaction_hash.User_command_with_valid_signature.command
|> User_command.applicable_at_nonce
"Current account nonce succeeds first nonce in queue; splitting \
queue at $index"
~metadata:[ ("index", `Int first_applicable_nonce_index) ] ;
let drop_queue, keep_queue =
F_sequence.split_at queue first_applicable_nonce_index
in
if
not
( Account.has_permission_to_send account
&& Account.has_permission_to_increment_nonce account )
then (
[%log debug]
"Account no longer has permission to send; dropping queue" ;
let dropped, t'' = remove_with_dependents_exn' t first_cmd in
(t'', Sequence.append dropped_acc dropped) )
else if Account_nonce.(account.nonce < first_nonce) then (
[%log debug]
"Current account nonce precedes first nonce in queue; dropping \
queue" ;
let dropped, t'' = remove_with_dependents_exn' t first_cmd in
(t'', Sequence.append dropped_acc dropped) )
else
(* current_nonce >= first_nonce *)
let first_applicable_nonce_index =
F_sequence.findi queue ~f:(fun cmd' ->
let nonce =
Transaction_hash.User_command_with_valid_signature.command
cmd'
|> User_command.applicable_at_nonce
in
Account_nonce.equal nonce account.nonce )
|> Option.value ~default:(F_sequence.length queue)
in
[%log debug]
"Current account nonce succeeds first nonce in queue; splitting \
queue at $index"
~metadata:[ ("index", `Int first_applicable_nonce_index) ] ;
let drop_queue, keep_queue =
F_sequence.split_at queue first_applicable_nonce_index
in
let currency_reserved' =
F_sequence.foldl
(fun c cmd ->
Option.value_exn
Currency.Amount.(c - Option.value_exn (currency_consumed cmd))
)
currency_reserved drop_queue
in
let keep_queue', currency_reserved'', dropped_for_balance =
drop_until_sufficient_balance
(keep_queue, currency_reserved')
current_balance
in
let to_drop =
Sequence.append (F_sequence.to_seq drop_queue) dropped_for_balance
in
match Sequence.next to_drop with
| None ->
acc
| Some (head, tail) ->
let t'' =
Sequence.fold tail
~init:
(remove_all_by_fee_and_hash_and_expiration_exn
(remove_applicable_exn t' head)
head )
~f:remove_all_by_fee_and_hash_and_expiration_exn
in
let t''' =
match F_sequence.uncons keep_queue' with
| None ->
{ t'' with
all_by_sender = Map.remove t''.all_by_sender sender
}
| Some (first_kept, _) ->
let first_kept_unchecked =
Transaction_hash.User_command_with_valid_signature.command
let currency_reserved' =
F_sequence.foldl
(fun c cmd ->
Option.value_exn
Currency.Amount.(c - Option.value_exn (currency_consumed cmd))
)
currency_reserved drop_queue
in
let keep_queue', currency_reserved'', dropped_for_balance =
drop_until_sufficient_balance
(keep_queue, currency_reserved')
current_balance
in
let to_drop =
Sequence.append (F_sequence.to_seq drop_queue) dropped_for_balance
in
match Sequence.next to_drop with
| None ->
acc
| Some (head, tail) ->
let t'' =
Sequence.fold tail
~init:
(remove_all_by_fee_and_hash_and_expiration_exn
(remove_applicable_exn t' head)
head )
~f:remove_all_by_fee_and_hash_and_expiration_exn
in
let t''' =
match F_sequence.uncons keep_queue' with
| None ->
{ t'' with
all_by_sender = Map.remove t''.all_by_sender sender
}
| Some (first_kept, _) ->
let first_kept_unchecked =
Transaction_hash.User_command_with_valid_signature.command
first_kept
in
{ t'' with
all_by_sender =
Map.set t''.all_by_sender ~key:sender
~data:(keep_queue', currency_reserved'')
; applicable_by_fee =
Map_set.insert
( module Transaction_hash
.User_command_with_valid_signature )
t''.applicable_by_fee
(User_command.fee_per_wu first_kept_unchecked)
first_kept
in
{ t'' with
all_by_sender =
Map.set t''.all_by_sender ~key:sender
~data:(keep_queue', currency_reserved'')
; applicable_by_fee =
Map_set.insert
( module Transaction_hash
.User_command_with_valid_signature )
t''.applicable_by_fee
(User_command.fee_per_wu first_kept_unchecked)
first_kept
}
in
(t''', Sequence.append dropped_acc to_drop) )
}
in
(t''', Sequence.append dropped_acc to_drop) )

let expired_by_global_slot (t : t) :
Transaction_hash.User_command_with_valid_signature.t Sequence.t =
Expand Down

0 comments on commit 3e405df

Please sign in to comment.