Skip to content

Commit ffcd70e

Browse files
committed
chore: add accumulated fee for calculating fee averages and trends
1 parent 7dde6d1 commit ffcd70e

File tree

8 files changed

+200
-153
lines changed

8 files changed

+200
-153
lines changed

lib/ae_mdw/db/model.ex

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ defmodule AeMdw.Db.Model do
5555

5656
# index is timestamp (daylight saving order should be handle case by case)
5757
@typep timestamp :: pos_integer()
58-
@type async_task_type :: :update_aex9_state | :store_acc_balance | :migrate | :update_tx_stats
58+
@type async_task_type :: :update_aex9_state | :store_acc_balance | :migrate
5959
@type async_task_index :: {timestamp(), async_task_type()}
6060
@type async_task_args :: list()
6161

@@ -157,8 +157,13 @@ defmodule AeMdw.Db.Model do
157157
)
158158

159159
# txs table :
160-
# index = tx_index (0..), id = tx_id, block_index = {kbi, mbi} time = time, fee = fee
161-
@tx_defaults [index: nil, id: nil, block_index: nil, time: nil, fee: nil]
160+
# index = tx_index (0..),
161+
# id = tx_id
162+
# block_index = {kbi, mbi}
163+
# time = time
164+
# fee = fee
165+
# accumulated_fee = sum of previous fees
166+
@tx_defaults [index: nil, id: nil, block_index: nil, time: nil, fee: nil, accumulated_fee: nil]
162167
defrecord :tx, @tx_defaults
163168

164169
@type tx_index() :: txi()
@@ -168,7 +173,8 @@ defmodule AeMdw.Db.Model do
168173
id: Txs.tx_hash(),
169174
block_index: block_index(),
170175
time: Blocks.time(),
171-
fee: non_neg_integer()
176+
fee: non_neg_integer(),
177+
accumulated_fee: non_neg_integer()
172178
)
173179

174180
# txs time index :
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
defmodule AeMdw.Db.TransactionFeeMutation do
2+
@moduledoc """
3+
Add
4+
"""
5+
6+
alias AeMdw.Blocks
7+
alias AeMdw.Db.Model
8+
alias AeMdw.Db.State
9+
alias AeMdw.Txs
10+
11+
require Model
12+
13+
@derive AeMdw.Db.Mutation
14+
defstruct [:txi, :tx_hash, :block_index, :time, :fee]
15+
16+
@typep fee() :: non_neg_integer()
17+
18+
@opaque t() :: %__MODULE__{
19+
txi: Txs.txi(),
20+
tx_hash: Txs.tx_hash(),
21+
block_index: Blocks.block_index(),
22+
time: Blocks.time(),
23+
fee: fee()
24+
}
25+
26+
@spec new(Txs.txi(), Txs.tx_hash(), Blocks.block_index(), Blocks.time(), fee()) :: t()
27+
def new(txi, tx_hash, block_index, mb_time, fee) do
28+
%__MODULE__{txi: txi, tx_hash: tx_hash, block_index: block_index, time: mb_time, fee: fee}
29+
end
30+
31+
@spec execute(t(), State.t()) :: State.t()
32+
def execute(
33+
%__MODULE__{
34+
txi: txi,
35+
tx_hash: tx_hash,
36+
block_index: block_index,
37+
time: mb_time,
38+
fee: fee
39+
},
40+
state
41+
) do
42+
accumulated_fee =
43+
case State.get(state, Model.Tx, txi - 1) do
44+
{:ok, Model.tx(accumulated_fee: accumulated_fee)} -> accumulated_fee + fee
45+
:not_found -> fee
46+
end
47+
48+
m_tx =
49+
Model.tx(
50+
index: txi,
51+
id: tx_hash,
52+
block_index: block_index,
53+
time: mb_time,
54+
fee: fee,
55+
accumulated_fee: accumulated_fee
56+
)
57+
58+
State.put(state, Model.Tx, m_tx)
59+
end
60+
end

lib/ae_mdw/db/sync/transaction.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ defmodule AeMdw.Db.Sync.Transaction do
1818
alias AeMdw.Db.Sync.Name, as: SyncName
1919
alias AeMdw.Db.Sync.Oracle
2020
alias AeMdw.Db.Sync.Origin
21+
alias AeMdw.Db.TransactionFeeMutation
2122
alias AeMdw.Db.WriteFieldsMutation
2223
alias AeMdw.Db.WriteMutation
2324
alias AeMdw.Db.Mutation
@@ -85,7 +86,6 @@ defmodule AeMdw.Db.Sync.Transaction do
8586
{type, tx} = :aetx.specialize_type(:aetx_sign.tx(signed_tx))
8687
tx_hash = :aetx_sign.hash(signed_tx)
8788
fee = Db.get_tx_fee(tx_hash)
88-
m_tx = Model.tx(index: txi, id: tx_hash, block_index: block_index, time: mb_time, fee: fee)
8989

9090
tx_context =
9191
TxContext.new(
@@ -100,7 +100,7 @@ defmodule AeMdw.Db.Sync.Transaction do
100100
)
101101

102102
[
103-
WriteMutation.new(Model.Tx, m_tx),
103+
TransactionFeeMutation.new(txi, tx_hash, block_index, mb_time, fee),
104104
WriteMutation.new(Model.Type, Model.type(index: {type, txi})),
105105
WriteMutation.new(Model.Time, Model.time(index: {mb_time, txi})),
106106
WriteFieldsMutation.new(type, tx, block_index, txi)

lib/ae_mdw/stats.ex

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -787,25 +787,47 @@ defmodule AeMdw.Stats do
787787
end
788788

789789
defp last_24hs_txs_count_and_fee_with_trend(state) do
790-
state
791-
|> State.get(Model.Stat, :tx_stats)
792-
|> case do
793-
{:ok,
794-
Model.stat(
795-
payload:
796-
{_started_at, {{txs_count_24hs, trend_str}, {average_tx_fees_24hs_str, fee_trend_str}}}
797-
)} ->
798-
trend = String.to_float(trend_str)
799-
fees_trend = String.to_float(fee_trend_str)
800-
average_tx_fees_24hs = String.to_float(average_tx_fees_24hs_str)
801-
802-
{{txs_count_24hs, trend}, {average_tx_fees_24hs, fees_trend}}
803-
804-
:not_found ->
790+
time_24hs_ago = :aeu_time.now_in_msecs() - @seconds_per_day * 1_000
791+
792+
with {:ok, {_time, tx_index_24hs_ago}} <-
793+
State.next(state, Model.Time, {time_24hs_ago, -1}),
794+
{:ok, last_tx_index} <- State.prev(state, Model.Tx, nil),
795+
time_48hs_ago <- time_24hs_ago - @seconds_per_day * 1_000,
796+
{:ok, {_time, tx_index_48hs_ago}} <- State.next(state, Model.Time, {time_48hs_ago, -1}),
797+
txs_count_24hs when txs_count_24hs > 0 <- last_tx_index - tx_index_24hs_ago + 1,
798+
txs_count_48hs <- tx_index_24hs_ago - tx_index_48hs_ago,
799+
trend <- Float.round((txs_count_24hs - txs_count_48hs) / txs_count_24hs, 2),
800+
average_tx_fees_24hs when average_tx_fees_24hs > 0 <-
801+
average_tx_fees(state, tx_index_24hs_ago, last_tx_index) do
802+
average_tx_fees_48hs = average_tx_fees(state, tx_index_48hs_ago, tx_index_24hs_ago)
803+
804+
fee_trend =
805+
Float.round((average_tx_fees_24hs - average_tx_fees_48hs) / average_tx_fees_24hs, 2)
806+
807+
{{txs_count_24hs, trend}, {average_tx_fees_24hs, fee_trend}}
808+
else
809+
_error ->
805810
{{0, 0.0}, {0.0, 0.0}}
806811
end
807812
end
808813

814+
defp average_tx_fees(state, txi, txi) do
815+
Model.tx(fee: fee) = State.fetch!(state, Model.Tx, txi)
816+
817+
fee * 1.0
818+
end
819+
820+
defp average_tx_fees(state, start_txi, end_txi) do
821+
txs_count = end_txi - start_txi + 1
822+
823+
Model.tx(accumulated_fee: start_accumulated_fee, fee: fee) =
824+
State.fetch!(state, Model.Tx, start_txi)
825+
826+
Model.tx(accumulated_fee: end_accumulated_fee) = State.fetch!(state, Model.Tx, end_txi)
827+
828+
(end_accumulated_fee - start_accumulated_fee + fee) / txs_count
829+
end
830+
809831
defp months_to_iso(months) do
810832
year = div(months, 12)
811833
month = rem(months, 12) + 1
Lines changed: 3 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,101 +1,13 @@
11
defmodule AeMdw.Sync.AsyncTasks.UpdateTxStats do
22
@moduledoc """
3-
Async work to update tx count, fee and stats without blocking sync.
3+
Temporary module to get rid of pending tasks.
44
"""
55
@behaviour AeMdw.Sync.AsyncTasks.Work
66

7-
alias AeMdw.Db.Model
8-
alias AeMdw.Db.WriteMutation
9-
alias AeMdw.Log
10-
alias AeMdw.Db.RocksDbCF
11-
alias AeMdw.Db.State
12-
13-
alias AeMdw.Sync.AsyncStoreServer
14-
15-
require Model
16-
require Logger
17-
18-
@microsecs 1_000_000
19-
@seconds_per_day 24 * 3_600
20-
217
@spec process(args :: list(), done_fn :: fun()) :: :ok
22-
def process([started_at], done_fn) do
23-
state = State.mem_state()
24-
25-
state
26-
|> State.get(Model.Stat, :tx_stats)
27-
|> case do
28-
{:ok, Model.stat(payload: {old_started_at, _old_stats})} when old_started_at > started_at ->
29-
done_fn.()
30-
:ok
31-
32-
{:ok, _old_stats} ->
33-
update_stats(state, started_at, done_fn)
34-
35-
:not_found ->
36-
update_stats(state, started_at, done_fn)
37-
end
38-
end
39-
40-
defp update_stats(state, started_at, done_fn) do
41-
{time_delta, :ok} =
42-
:timer.tc(fn ->
43-
tx_stats =
44-
calculate_fees(state, started_at)
45-
46-
write_mutation =
47-
WriteMutation.new(
48-
Model.Stat,
49-
Model.stat(index: :tx_stats, payload: tx_stats)
50-
)
51-
52-
AsyncStoreServer.write_mutations(
53-
[write_mutation],
54-
done_fn
55-
)
56-
end)
57-
58-
Log.info("[update_tx_stats] after #{time_delta / @microsecs}s")
8+
def process(_args, done_fn) do
9+
_task = Task.async(done_fn)
5910

6011
:ok
6112
end
62-
63-
defp calculate_fees(state, started_at) do
64-
time_24hs_ago = started_at - @seconds_per_day * 1_000
65-
66-
with {:ok, {_time, tx_index_24hs_ago}} <- State.next(state, Model.Time, {time_24hs_ago, -1}),
67-
{:ok, last_tx_index} <- State.prev(state, Model.Tx, nil),
68-
time_48hs_ago <- time_24hs_ago - @seconds_per_day * 1_000,
69-
{:ok, {_time, tx_index_48hs_ago}} <- State.next(state, Model.Time, {time_48hs_ago, -1}),
70-
txs_count_24hs when txs_count_24hs > 0 <- last_tx_index - tx_index_24hs_ago + 1,
71-
txs_count_48hs <- tx_index_24hs_ago - tx_index_48hs_ago,
72-
trend <- Float.round((txs_count_24hs - txs_count_48hs) / txs_count_24hs, 2),
73-
average_tx_fees_24hs when average_tx_fees_24hs > 0 <-
74-
average_tx_fees(tx_index_24hs_ago, last_tx_index),
75-
average_tx_fees_48hs <- average_tx_fees(tx_index_48hs_ago, tx_index_24hs_ago),
76-
fee_trend <-
77-
Float.round((average_tx_fees_24hs - average_tx_fees_48hs) / average_tx_fees_24hs, 2) do
78-
{started_at,
79-
{{txs_count_24hs, Float.to_string(trend)},
80-
{Float.to_string(average_tx_fees_24hs), Float.to_string(fee_trend)}}}
81-
else
82-
_error ->
83-
{started_at, {{0, "0.0"}, {"0.0", "0.0"}}}
84-
end
85-
end
86-
87-
defp average_tx_fees(start_txi, end_txi) do
88-
txs_count = end_txi - start_txi + 1
89-
90-
if txs_count != 0 do
91-
Model.Tx
92-
|> RocksDbCF.stream(key_boundary: {start_txi, end_txi})
93-
|> Enum.reduce(0, fn Model.tx(fee: fee), acc ->
94-
acc + fee
95-
end)
96-
|> then(&(&1 / txs_count))
97-
else
98-
0
99-
end
100-
end
10113
end

lib/ae_mdw/sync/server.ex

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ defmodule AeMdw.Sync.Server do
329329

330330
:ok = profile_sync("sync_db", height, ts, blocks_mutations)
331331

332-
add_tx_fees_job(new_state)
332+
new_state
333333
end)
334334

335335
broadcast_blocks(gens_mutations)
@@ -421,9 +421,7 @@ defmodule AeMdw.Sync.Server do
421421
block_mutations
422422
end)
423423

424-
state
425-
|> State.commit_mem(mutations)
426-
|> add_tx_fees_job()
424+
State.commit_mem(state, mutations)
427425
end)
428426

429427
:ok = profile_sync("sync_mem", height, ts, gen_mutations)
@@ -525,9 +523,4 @@ defmodule AeMdw.Sync.Server do
525523

526524
:ok
527525
end
528-
529-
defp add_tx_fees_job(state) do
530-
now = :aeu_time.now_in_msecs()
531-
State.enqueue(state, :update_tx_stats, [now], [])
532-
end
533526
end
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
defmodule AeMdw.Migrations.AddAccumulatedFeeToTxs do
2+
@moduledoc """
3+
Adds the new accumulated fee field to tx.
4+
"""
5+
alias AeMdw.Db.WriteMutation
6+
alias AeMdw.Db.Util, as: DbUtil
7+
alias AeMdw.Db.State
8+
alias AeMdw.Db.Model
9+
10+
import Record, only: [defrecord: 2]
11+
12+
require Model
13+
require Logger
14+
15+
defrecord(:tx, index: nil, id: nil, block_index: nil, time: nil, fee: nil)
16+
17+
@spec run(State.t(), boolean()) :: {:ok, non_neg_integer()}
18+
def run(state, from_start?) do
19+
case DbUtil.last_txi(state) do
20+
{:ok, last_txi} -> run(state, from_start?, last_txi)
21+
:none -> {:ok, 0}
22+
end
23+
end
24+
25+
defp run(state, _from_start?, last_txi) do
26+
1..last_txi
27+
|> Stream.map(&State.fetch!(state, Model.Tx, &1))
28+
|> Stream.transform(0, fn
29+
tx(index: index, id: id, block_index: block_index, time: time, fee: fee), acc_fee ->
30+
acc_fee = acc_fee + fee
31+
32+
if rem(index, 10_000) == 0 do
33+
Logger.info("Processed #{index} out of #{last_txi}")
34+
end
35+
36+
tx =
37+
Model.tx(
38+
index: index,
39+
id: id,
40+
block_index: block_index,
41+
time: time,
42+
fee: fee,
43+
accumulated_fee: acc_fee
44+
)
45+
46+
{[WriteMutation.new(Model.Tx, tx)], acc_fee}
47+
48+
Model.tx(accumulated_fee: acc_fee), _acc_fee ->
49+
{[], acc_fee}
50+
end)
51+
|> Stream.chunk_every(10_000)
52+
|> Stream.map(fn mutations ->
53+
_state = State.commit_db(state, mutations)
54+
55+
length(mutations)
56+
end)
57+
|> Enum.sum()
58+
|> then(&{:ok, &1})
59+
end
60+
end

0 commit comments

Comments
 (0)