Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions lib/ae_mdw/db/model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ defmodule AeMdw.Db.Model do

# index is timestamp (daylight saving order should be handle case by case)
@typep timestamp :: pos_integer()
@type async_task_type :: :update_aex9_state | :store_acc_balance | :migrate | :update_tx_stats
@type async_task_type :: :update_aex9_state | :store_acc_balance | :migrate
@type async_task_index :: {timestamp(), async_task_type()}
@type async_task_args :: list()

Expand Down Expand Up @@ -157,8 +157,13 @@ defmodule AeMdw.Db.Model do
)

# txs table :
# index = tx_index (0..), id = tx_id, block_index = {kbi, mbi} time = time, fee = fee
@tx_defaults [index: nil, id: nil, block_index: nil, time: nil, fee: nil]
# index = tx_index (0..),
# id = tx_id
# block_index = {kbi, mbi}
# time = time
# fee = fee
# accumulated_fee = sum of previous fees
@tx_defaults [index: nil, id: nil, block_index: nil, time: nil, fee: nil, accumulated_fee: nil]
defrecord :tx, @tx_defaults

@type tx_index() :: txi()
Expand All @@ -168,7 +173,8 @@ defmodule AeMdw.Db.Model do
id: Txs.tx_hash(),
block_index: block_index(),
time: Blocks.time(),
fee: non_neg_integer()
fee: non_neg_integer(),
accumulated_fee: non_neg_integer()
)

# txs time index :
Expand Down
60 changes: 60 additions & 0 deletions lib/ae_mdw/db/mutations/transaction_fee_mutation.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
defmodule AeMdw.Db.TransactionFeeMutation do
@moduledoc """
Add
"""

alias AeMdw.Blocks
alias AeMdw.Db.Model
alias AeMdw.Db.State
alias AeMdw.Txs

require Model

@derive AeMdw.Db.Mutation
defstruct [:txi, :tx_hash, :block_index, :time, :fee]

@typep fee() :: non_neg_integer()

@opaque t() :: %__MODULE__{
txi: Txs.txi(),
tx_hash: Txs.tx_hash(),
block_index: Blocks.block_index(),
time: Blocks.time(),
fee: fee()
}

@spec new(Txs.txi(), Txs.tx_hash(), Blocks.block_index(), Blocks.time(), fee()) :: t()
def new(txi, tx_hash, block_index, mb_time, fee) do
%__MODULE__{txi: txi, tx_hash: tx_hash, block_index: block_index, time: mb_time, fee: fee}
end

@spec execute(t(), State.t()) :: State.t()
def execute(
%__MODULE__{
txi: txi,
tx_hash: tx_hash,
block_index: block_index,
time: mb_time,
fee: fee
},
state
) do
accumulated_fee =
case State.get(state, Model.Tx, txi - 1) do
{:ok, Model.tx(accumulated_fee: accumulated_fee)} -> accumulated_fee + fee
:not_found -> fee
end

m_tx =
Model.tx(
index: txi,
id: tx_hash,
block_index: block_index,
time: mb_time,
fee: fee,
accumulated_fee: accumulated_fee
)

State.put(state, Model.Tx, m_tx)
end
end
4 changes: 2 additions & 2 deletions lib/ae_mdw/db/sync/transaction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule AeMdw.Db.Sync.Transaction do
alias AeMdw.Db.Sync.Name, as: SyncName
alias AeMdw.Db.Sync.Oracle
alias AeMdw.Db.Sync.Origin
alias AeMdw.Db.TransactionFeeMutation
alias AeMdw.Db.WriteFieldsMutation
alias AeMdw.Db.WriteMutation
alias AeMdw.Db.Mutation
Expand Down Expand Up @@ -85,7 +86,6 @@ defmodule AeMdw.Db.Sync.Transaction do
{type, tx} = :aetx.specialize_type(:aetx_sign.tx(signed_tx))
tx_hash = :aetx_sign.hash(signed_tx)
fee = Db.get_tx_fee(tx_hash)
m_tx = Model.tx(index: txi, id: tx_hash, block_index: block_index, time: mb_time, fee: fee)

tx_context =
TxContext.new(
Expand All @@ -100,7 +100,7 @@ defmodule AeMdw.Db.Sync.Transaction do
)

[
WriteMutation.new(Model.Tx, m_tx),
TransactionFeeMutation.new(txi, tx_hash, block_index, mb_time, fee),
WriteMutation.new(Model.Type, Model.type(index: {type, txi})),
WriteMutation.new(Model.Time, Model.time(index: {mb_time, txi})),
WriteFieldsMutation.new(type, tx, block_index, txi)
Expand Down
52 changes: 37 additions & 15 deletions lib/ae_mdw/stats.ex
Original file line number Diff line number Diff line change
Expand Up @@ -787,25 +787,47 @@ defmodule AeMdw.Stats do
end

defp last_24hs_txs_count_and_fee_with_trend(state) do
state
|> State.get(Model.Stat, :tx_stats)
|> case do
{:ok,
Model.stat(
payload:
{_started_at, {{txs_count_24hs, trend_str}, {average_tx_fees_24hs_str, fee_trend_str}}}
)} ->
trend = String.to_float(trend_str)
fees_trend = String.to_float(fee_trend_str)
average_tx_fees_24hs = String.to_float(average_tx_fees_24hs_str)

{{txs_count_24hs, trend}, {average_tx_fees_24hs, fees_trend}}

:not_found ->
time_24hs_ago = :aeu_time.now_in_msecs() - @seconds_per_day * 1_000

with {:ok, {_time, tx_index_24hs_ago}} <-
State.next(state, Model.Time, {time_24hs_ago, -1}),
{:ok, last_tx_index} <- State.prev(state, Model.Tx, nil),
time_48hs_ago <- time_24hs_ago - @seconds_per_day * 1_000,
{:ok, {_time, tx_index_48hs_ago}} <- State.next(state, Model.Time, {time_48hs_ago, -1}),
txs_count_24hs when txs_count_24hs > 0 <- last_tx_index - tx_index_24hs_ago + 1,
txs_count_48hs <- tx_index_24hs_ago - tx_index_48hs_ago,
trend <- Float.round((txs_count_24hs - txs_count_48hs) / txs_count_24hs, 2),
average_tx_fees_24hs when average_tx_fees_24hs > 0 <-
average_tx_fees(state, tx_index_24hs_ago, last_tx_index) do
average_tx_fees_48hs = average_tx_fees(state, tx_index_48hs_ago, tx_index_24hs_ago)

fee_trend =
Float.round((average_tx_fees_24hs - average_tx_fees_48hs) / average_tx_fees_24hs, 2)

{{txs_count_24hs, trend}, {average_tx_fees_24hs, fee_trend}}
else
_error ->
{{0, 0.0}, {0.0, 0.0}}
end
end

defp average_tx_fees(state, txi, txi) do
Model.tx(fee: fee) = State.fetch!(state, Model.Tx, txi)

fee * 1.0
end

defp average_tx_fees(state, start_txi, end_txi) do
txs_count = end_txi - start_txi + 1

Model.tx(accumulated_fee: start_accumulated_fee, fee: fee) =
State.fetch!(state, Model.Tx, start_txi)

Model.tx(accumulated_fee: end_accumulated_fee) = State.fetch!(state, Model.Tx, end_txi)

(end_accumulated_fee - start_accumulated_fee + fee) / txs_count
end

defp months_to_iso(months) do
year = div(months, 12)
month = rem(months, 12) + 1
Expand Down
93 changes: 3 additions & 90 deletions lib/ae_mdw/sync/async_tasks/update_tx_stats.ex
Original file line number Diff line number Diff line change
@@ -1,100 +1,13 @@
defmodule AeMdw.Sync.AsyncTasks.UpdateTxStats do
@moduledoc """
Async work to update tx count, fee and stats without blocking sync.
Temporary module to get rid of pending tasks.
"""
@behaviour AeMdw.Sync.AsyncTasks.Work

alias AeMdw.Db.Model
alias AeMdw.Db.WriteMutation
alias AeMdw.Db.RocksDbCF
alias AeMdw.Db.State

alias AeMdw.Sync.AsyncStoreServer

require Model
require Logger

@microsecs 1_000_000
@seconds_per_day 24 * 3_600

@spec process(args :: list(), done_fn :: fun()) :: :ok
def process([started_at], done_fn) do
state = State.mem_state()

state
|> State.get(Model.Stat, :tx_stats)
|> case do
{:ok, Model.stat(payload: {old_started_at, _old_stats})} when old_started_at > started_at ->
done_fn.()
:ok

{:ok, _old_stats} ->
update_stats(state, started_at, done_fn)

:not_found ->
update_stats(state, started_at, done_fn)
end
end

defp update_stats(state, started_at, done_fn) do
{time_delta, :ok} =
:timer.tc(fn ->
tx_stats =
calculate_fees(state, started_at)

write_mutation =
WriteMutation.new(
Model.Stat,
Model.stat(index: :tx_stats, payload: tx_stats)
)

AsyncStoreServer.write_mutations(
[write_mutation],
done_fn
)
end)

Logger.debug("[update_tx_stats] after #{time_delta / @microsecs}s")
def process(_args, done_fn) do
_task = Task.async(done_fn)

:ok
end

defp calculate_fees(state, started_at) do
time_24hs_ago = started_at - @seconds_per_day * 1_000

with {:ok, {_time, tx_index_24hs_ago}} <- State.next(state, Model.Time, {time_24hs_ago, -1}),
{:ok, last_tx_index} <- State.prev(state, Model.Tx, nil),
time_48hs_ago <- time_24hs_ago - @seconds_per_day * 1_000,
{:ok, {_time, tx_index_48hs_ago}} <- State.next(state, Model.Time, {time_48hs_ago, -1}),
txs_count_24hs when txs_count_24hs > 0 <- last_tx_index - tx_index_24hs_ago + 1,
txs_count_48hs <- tx_index_24hs_ago - tx_index_48hs_ago,
trend <- Float.round((txs_count_24hs - txs_count_48hs) / txs_count_24hs, 2),
average_tx_fees_24hs when average_tx_fees_24hs > 0 <-
average_tx_fees(tx_index_24hs_ago, last_tx_index),
average_tx_fees_48hs <- average_tx_fees(tx_index_48hs_ago, tx_index_24hs_ago),
fee_trend <-
Float.round((average_tx_fees_24hs - average_tx_fees_48hs) / average_tx_fees_24hs, 2) do
{started_at,
{{txs_count_24hs, Float.to_string(trend)},
{Float.to_string(average_tx_fees_24hs), Float.to_string(fee_trend)}}}
else
_error ->
{started_at, {{0, "0.0"}, {"0.0", "0.0"}}}
end
end

defp average_tx_fees(start_txi, end_txi) do
txs_count = end_txi - start_txi + 1

if txs_count != 0 do
Model.Tx
|> RocksDbCF.stream(key_boundary: {start_txi, end_txi})
|> Enum.reduce(0, fn Model.tx(fee: fee), acc ->
acc + fee
end)
|> then(&(&1 / txs_count))
else
0
end
end
end
11 changes: 2 additions & 9 deletions lib/ae_mdw/sync/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ defmodule AeMdw.Sync.Server do

:ok = profile_sync("sync_db", height, ts, blocks_mutations)

add_tx_fees_job(new_state)
new_state
end)

broadcast_blocks(gens_mutations)
Expand Down Expand Up @@ -421,9 +421,7 @@ defmodule AeMdw.Sync.Server do
block_mutations
end)

state
|> State.commit_mem(mutations)
|> add_tx_fees_job()
State.commit_mem(state, mutations)
end)

:ok = profile_sync("sync_mem", height, ts, gen_mutations)
Expand Down Expand Up @@ -525,9 +523,4 @@ defmodule AeMdw.Sync.Server do

:ok
end

defp add_tx_fees_job(state) do
now = :aeu_time.now_in_msecs()
State.enqueue(state, :update_tx_stats, [now], [])
end
end
60 changes: 60 additions & 0 deletions priv/migrations/20250421120001_add_accumulated_fee_to_txs.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
defmodule AeMdw.Migrations.AddAccumulatedFeeToTxs do
@moduledoc """
Adds the new accumulated fee field to tx.
"""
alias AeMdw.Db.WriteMutation
alias AeMdw.Db.Util, as: DbUtil
alias AeMdw.Db.State
alias AeMdw.Db.Model

import Record, only: [defrecord: 2]

require Model
require Logger

defrecord(:tx, index: nil, id: nil, block_index: nil, time: nil, fee: nil)

@spec run(State.t(), boolean()) :: {:ok, non_neg_integer()}
def run(state, from_start?) do
case DbUtil.last_txi(state) do
{:ok, last_txi} -> run(state, from_start?, last_txi)
:none -> {:ok, 0}
end
end

defp run(state, _from_start?, last_txi) do
1..last_txi
|> Stream.map(&State.fetch!(state, Model.Tx, &1))
|> Stream.transform(0, fn
tx(index: index, id: id, block_index: block_index, time: time, fee: fee), acc_fee ->
acc_fee = acc_fee + fee

if rem(index, 10_000) == 0 do
Logger.info("Processed #{index} out of #{last_txi}")
end

tx =
Model.tx(
index: index,
id: id,
block_index: block_index,
time: time,
fee: fee,
accumulated_fee: acc_fee
)

{[WriteMutation.new(Model.Tx, tx)], acc_fee}

Model.tx(accumulated_fee: acc_fee), _acc_fee ->
{[], acc_fee}
end)
|> Stream.chunk_every(10_000)
|> Stream.map(fn mutations ->
_state = State.commit_db(state, mutations)

length(mutations)
end)
|> Enum.sum()
|> then(&{:ok, &1})
end
end
Loading
Loading