feat: introduce balances table
gfyrag committed Sep 9, 2024
1 parent bd7eaa0 commit 3429e6d
Showing 10 changed files with 289 additions and 101 deletions.
2 changes: 2 additions & 0 deletions components/ledger/internal/controller/ledger/writer/store.go
@@ -6,6 +6,7 @@ import (
ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/stack/libs/go-libs/metadata"
"github.com/uptrace/bun"
"math/big"
)

//go:generate mockgen -source store.go -destination store_generated.go -package writer . TX
@@ -28,6 +29,7 @@ type TX interface {
Commit(ctx context.Context) error
Rollback(ctx context.Context) error
QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
AddToBalance(ctx context.Context, addr, asset string, amount *big.Int) (*big.Int, error)
}

//go:generate mockgen -source store.go -destination store_generated.go -package writer . Store
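
The TX interface above is mocked via the mockgen directives shown. Purely as an illustration of how the new method surfaces in writer tests, here is a minimal sketch of stubbing it with gomock; the MockTX name, the gomock import path and the test harness are assumptions, since the regenerated store_generated.go is not shown in this excerpt.

package writer

// Hypothetical test fragment, not part of this commit: it only illustrates
// stubbing the new TX.AddToBalance method with the mockgen-generated mock.

import (
	"context"
	"math/big"
	"testing"

	"github.com/golang/mock/gomock"
)

func TestAddToBalanceStub(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	tx := NewMockTX(ctrl)
	// The writer is expected to push each move's delta and read back the new balance.
	tx.EXPECT().
		AddToBalance(gomock.Any(), "world", "USD/2", big.NewInt(-100)).
		Return(big.NewInt(-100), nil)

	newBalance, err := tx.AddToBalance(context.Background(), "world", "USD/2", big.NewInt(-100))
	if err != nil || newBalance.Int64() != -100 {
		t.Fatalf("unexpected result: balance=%v err=%v", newBalance, err)
	}
}
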
53 changes: 39 additions & 14 deletions components/ledger/internal/controller/ledger/writer/writer.go
@@ -12,6 +12,7 @@ import (
"github.com/pkg/errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"math/big"
"slices"
)

@@ -94,6 +95,7 @@ func (w *Writer) CreateTransaction(ctx context.Context, parameters Parameters, r
log, err := w.withTX(ctx, parameters, func(sqlTX TX) (*ledger.Log, error) {

if len(machineInit.BoundedSources) > 0 {
// TODO: remove lock, and use SELECT FOR UPDATE
_, latency, err := tracer.TraceWithLatency(ctx, "LockAccounts", func(ctx context.Context) (*struct{}, error) {
return nil, sqlTX.LockAccounts(ctx, machineInit.BoundedSources...)
}, func(ctx context.Context, _ *struct{}) {
@@ -167,29 +169,52 @@ }
}
}

for _, account := range transaction.GetMoves().InvolvedAccounts() {
_, latency, err = tracer.TraceWithLatency(ctx, "LockAccounts", func(ctx context.Context) (struct{}, error) {
return struct{}{}, sqlTX.LockAccounts(ctx, account)
}, func(ctx context.Context, _ struct{}) {
trace.SpanFromContext(ctx).SetAttributes(
attribute.StringSlice("accounts", transaction.GetMoves().InvolvedAccounts()),
)
})
if err != nil {
return nil, errors.Wrapf(err, "failed to acquire account lock on %s", account)
}
logger.WithField("latency", latency.String()).Debugf("account locked: %s", account)
}
//for _, account := range transaction.GetMoves().InvolvedAccounts() {
// _, latency, err = tracer.TraceWithLatency(ctx, "LockAccounts", func(ctx context.Context) (struct{}, error) {
// return struct{}{}, sqlTX.LockAccounts(ctx, account)
// }, func(ctx context.Context, _ struct{}) {
// trace.SpanFromContext(ctx).SetAttributes(
// attribute.StringSlice("accounts", transaction.GetMoves().InvolvedAccounts()),
// )
// })
// if err != nil {
// return nil, errors.Wrapf(err, "failed to acquire account lock on %s", account)
// }
// logger.WithField("latency", latency.String()).Debugf("account locked: %s", account)
//}

_, latency, err = tracer.TraceWithLatency(ctx, "InsertMoves", func(ctx context.Context) (struct{}, error) {
return struct{}{}, sqlTX.InsertMoves(ctx, transaction.GetMoves()...)
})
if err != nil {
return nil, errors.Wrap(err, "failed to insert moves")
}

logger.WithField("latency", latency.String()).Debugf("moves inserted")

// TODO: merge moves for the same account
for _, move := range transaction.GetMoves() {
newBalance, latency, err := tracer.TraceWithLatency(ctx, "AddToBalance", func(ctx context.Context) (*big.Int, error) {
return sqlTX.AddToBalance(ctx, move.Account, move.Asset, move.Amount)
}, func(ctx context.Context, newBalance *big.Int) {
trace.SpanFromContext(ctx).SetAttributes(
attribute.String("new_balance", newBalance.String()),
attribute.String("account", move.Account),
attribute.String("asset", move.Asset),
attribute.String("amount", move.Amount.String()),
)
})
if err != nil {
return nil, errors.Wrap(err, "failed to update balance")
}
logger.
WithField("latency", latency.String()).
WithField("account", move.Account).
WithField("asset", move.Asset).
WithField("amount", move.Amount).
WithField("new_balance", newBalance.String()).
Debugf("balance updated")
}

// notes(gfyrag): force date to be zero to let postgres fill it
// todo: clean that
return pointer.For(ledger.NewTransactionLogWithDate(*transaction, result.AccountMetadata, time.Time{})), err
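
The "merge moves for the same account" TODO above suggests coalescing deltas before hitting the store: a transaction touching the same account/asset pair in several moves currently pays one AddToBalance round-trip per move. A minimal sketch of that merge, assuming only the Move fields already used in this diff (Account, Asset, Amount) and keeping their sign convention; the mergedDeltas helper itself is hypothetical, not part of this commit.

package writer

import (
	"math/big"

	ledger "github.com/formancehq/ledger/internal"
)

// balanceKey identifies one balances row touched by a transaction.
type balanceKey struct {
	account string
	asset   string
}

// mergedDeltas sums move amounts per (account, asset) so each pair needs a
// single AddToBalance call instead of one call per move.
func mergedDeltas(moves []ledger.Move) map[balanceKey]*big.Int {
	deltas := make(map[balanceKey]*big.Int)
	for _, move := range moves {
		key := balanceKey{account: move.Account, asset: move.Asset}
		if _, ok := deltas[key]; !ok {
			deltas[key] = new(big.Int)
		}
		deltas[key].Add(deltas[key], move.Amount)
	}
	return deltas
}

The per-move loop above would then iterate over this map instead of over transaction.GetMoves() directly.
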
132 changes: 63 additions & 69 deletions components/ledger/internal/storage/bucket/migrations/11-stateless.sql
@@ -12,86 +12,80 @@ alter table "{{.Bucket}}".transactions
alter column id
type bigint;

-- create function "{{.Bucket}}".insert_moves_from_transaction() returns trigger
-- create function "{{.Bucket}}".set_effective_volumes()
-- returns trigger
-- security definer
-- language plpgsql
-- as
-- $$
-- declare
-- posting jsonb;
-- begin
-- for posting in (select jsonb_array_elements(new.postings::jsonb)) loop
-- perform "{{.Bucket}}".insert_posting(new.seq, new.ledger, new.inserted_at, new.timestamp, posting, '{}'::jsonb);
-- end loop;
-- new.post_commit_effective_volumes = coalesce((
-- select (
-- (post_commit_effective_volumes).inputs + case when new.is_source then 0 else new.amount end,
-- (post_commit_effective_volumes).outputs + case when new.is_source then new.amount else 0 end
-- )
-- from "{{.Bucket}}".moves
-- where accounts_seq = new.accounts_seq
-- and asset = new.asset
-- and ledger = new.ledger
-- and (effective_date < new.effective_date or (effective_date = new.effective_date and seq < new.seq))
-- order by effective_date desc, seq desc
-- limit 1
-- ), (
-- case when new.is_source then 0 else new.amount end,
-- case when new.is_source then new.amount else 0 end
-- ));
--
-- return new;
-- end;
-- $$;
--
-- create trigger "{{.Bucket}}_project_moves_for_transaction"
-- after insert
-- on "{{.Bucket}}"."transactions"
-- create trigger "{{.Bucket}}_set_effective_volumes"
-- before insert
-- on "{{.Bucket}}"."moves"
-- for each row
-- execute procedure "{{.Bucket}}".insert_moves_from_transaction();

create function "{{.Bucket}}".set_effective_volumes()
returns trigger
security definer
language plpgsql
as
$$
begin
new.post_commit_effective_volumes = coalesce((
select (
(post_commit_effective_volumes).inputs + case when new.is_source then 0 else new.amount end,
(post_commit_effective_volumes).outputs + case when new.is_source then new.amount else 0 end
)
from "{{.Bucket}}".moves
where accounts_seq = new.accounts_seq
and asset = new.asset
and ledger = new.ledger
and (effective_date < new.effective_date or (effective_date = new.effective_date and seq < new.seq))
order by effective_date desc, seq desc
limit 1
), (
case when new.is_source then 0 else new.amount end,
case when new.is_source then new.amount else 0 end
));

return new;
end;
$$;

create trigger "{{.Bucket}}_set_effective_volumes"
before insert
on "{{.Bucket}}"."moves"
for each row
execute procedure "{{.Bucket}}".set_effective_volumes();
-- execute procedure "{{.Bucket}}".set_effective_volumes();
--
-- create function "{{.Bucket}}".update_effective_volumes()
-- returns trigger
-- security definer
-- language plpgsql
-- as
-- $$
-- begin
-- update "{{.Bucket}}".moves
-- set post_commit_effective_volumes =
-- (
-- (post_commit_effective_volumes).inputs + case when new.is_source then 0 else new.amount end,
-- (post_commit_effective_volumes).outputs + case when new.is_source then new.amount else 0 end
-- )
-- where accounts_seq = new.accounts_seq
-- and asset = new.asset
-- and effective_date > new.effective_date
-- and ledger = new.ledger;
--
-- return new;
-- end;
-- $$;
--
-- create trigger "{{.Bucket}}_update_effective_volumes"
-- after insert
-- on "{{.Bucket}}"."moves"
-- for each row
-- execute procedure "{{.Bucket}}".update_effective_volumes();

create function "{{.Bucket}}".update_effective_volumes()
returns trigger
security definer
language plpgsql
as
$$
begin
update "{{.Bucket}}".moves
set post_commit_effective_volumes =
(
(post_commit_effective_volumes).inputs + case when new.is_source then 0 else new.amount end,
(post_commit_effective_volumes).outputs + case when new.is_source then new.amount else 0 end
)
where accounts_seq = new.accounts_seq
and asset = new.asset
and effective_date > new.effective_date
and ledger = new.ledger;
-- todo: need to populate balances with existing data
create table "{{.Bucket}}".balances (
ledger varchar,
account varchar,
asset varchar,
balance numeric,

return new;
end;
$$;
primary key (ledger, account, asset)
);

create trigger "{{.Bucket}}_update_effective_volumes"
after insert
on "{{.Bucket}}"."moves"
for each row
execute procedure "{{.Bucket}}".update_effective_volumes();
alter table "{{.Bucket}}".moves
alter column post_commit_volumes
drop not null,
alter column post_commit_effective_volumes
drop not null;
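
The "todo: need to populate balances with existing data" comment above flags a backfill that this migration does not yet perform. One possible shape for it, written as a Go helper running a single aggregate upsert over historical moves; the function, its placement and the exact SQL are illustrative assumptions (the column names come from the moves usage elsewhere in this commit), not part of the migration.

package ledgerstore // hypothetical package name

import (
	"context"
	"fmt"

	"github.com/uptrace/bun"
)

// backfillBalances derives the current balance of every (ledger, account, asset)
// from existing moves and upserts it into the new balances table.
// Illustrative sketch only; not part of this commit.
func backfillBalances(ctx context.Context, db *bun.DB, bucket string) error {
	_, err := db.ExecContext(ctx, fmt.Sprintf(`
		insert into "%s".balances (ledger, account, asset, balance)
		select ledger,
		       account_address,
		       asset,
		       sum(case when is_source then -amount else amount end)
		from "%s".moves
		group by ledger, account_address, asset
		on conflict (ledger, account, asset) do update
			set balance = excluded.balance`,
		bucket, bucket))
	return err
}
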
30 changes: 30 additions & 0 deletions components/ledger/internal/storage/ledger/balances.go
@@ -13,6 +13,15 @@ import (
"github.com/uptrace/bun"
)

type Balances struct {
bun.BaseModel `bun:"balances"`

Ledger string `bun:"ledger,type:varchar"`
Account string `bun:"account,type:varchar"`
Asset string `bun:"asset,type:varchar"`
Balance *big.Int `bun:"balance,type:numeric"`
}

func (s *Store) GetAggregatedBalances(ctx context.Context, q ledgercontroller.GetAggregatedBalanceQuery) (ledger.BalancesByAssets, error) {

var (
@@ -155,3 +164,24 @@ func (s *Store) GetBalance(ctx context.Context, address, asset string) (*big.Int

return v.Balance, nil
}

func (s *Store) AddToBalance(ctx context.Context, addr, asset string, amount *big.Int) (*big.Int, error) {
r := &Balances{
Ledger: s.ledgerName,
Account: addr,
Asset: asset,
Balance: amount,
}
_, err := s.db.NewInsert().
Model(r).
ModelTableExpr(s.PrefixWithBucket("balances")).
On("conflict (ledger, account, asset) do update").
Set("balance = balances.balance + excluded.balance").
Returning("balance").
Exec(ctx)
if err != nil {
return nil, err
}

return r.Balance, err
}
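
For readers less familiar with bun's query builder, the upsert above should expand to roughly the SQL shown below. The hand-rolled database/sql variant is an approximation for illustration only: the helper is hypothetical and the bucket prefixing done by PrefixWithBucket is elided.

package ledgerstore // hypothetical package name

import (
	"context"
	"database/sql"
	"fmt"
	"math/big"
)

// addToBalanceRaw mirrors what the bun query above is expected to produce:
// an insert that atomically increments the existing row on conflict and
// returns the resulting balance. Illustrative only.
func addToBalanceRaw(ctx context.Context, db *sql.DB, ledgerName, addr, asset string, amount *big.Int) (*big.Int, error) {
	var raw string
	err := db.QueryRowContext(ctx, `
		insert into balances (ledger, account, asset, balance)
		values ($1, $2, $3, $4)
		on conflict (ledger, account, asset) do update
			set balance = balances.balance + excluded.balance
		returning balance`,
		ledgerName, addr, asset, amount.String(),
	).Scan(&raw)
	if err != nil {
		return nil, err
	}
	newBalance, ok := new(big.Int).SetString(raw, 10)
	if !ok {
		return nil, fmt.Errorf("unexpected balance returned: %q", raw)
	}
	return newBalance, nil
}

Because the increment happens inside a single statement, concurrent writers to the same (ledger, account, asset) row are serialized by the row lock the upsert takes, which is presumably what allows the explicit per-account locking in writer.go to be commented out.
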
15 changes: 15 additions & 0 deletions components/ledger/internal/storage/ledger/balances_test.go
@@ -159,3 +159,18 @@ func TestGetBalancesAggregated(t *testing.T) {
}, ret)
})
}

func TestAddToBalance(t *testing.T) {
t.Parallel()

store := newLedgerStore(t)
ctx := logging.TestingContext()

balance, err := store.AddToBalance(ctx, "world", "USD/2", big.NewInt(-100))
require.NoError(t, err)
require.Equal(t, int64(-100), balance.Int64())

balance, err = store.AddToBalance(ctx, "world", "USD/2", big.NewInt(50))
require.NoError(t, err)
require.Equal(t, int64(-50), balance.Int64())
}
34 changes: 18 additions & 16 deletions components/ledger/internal/storage/ledger/moves.go
@@ -42,8 +42,10 @@ func (s *Store) InsertMoves(ctx context.Context, moves ...ledger.Move) error {
}
}))

// todo: rewrite, we just need to basically insert moves
_, err := s.db.NewInsert().
With("_rows", s.db.NewValues(mappedMoves)).
//todo: we should already have the sequence when using UpsertAccount
With("_account_sequences",
s.db.NewSelect().
Column("seq", "address").
@@ -54,22 +56,22 @@ func (s *Store) InsertMoves(ctx context.Context, moves ...ledger.Move) error {
s.db.NewSelect().
ColumnExpr("_rows.*").
ColumnExpr("_account_sequences.seq as accounts_seq").
ColumnExpr("("+
"coalesce(((last_move_by_seq.post_commit_volumes).inputs), 0) + case when is_source then 0 else amount end, "+
"coalesce(((last_move_by_seq.post_commit_volumes).outputs), 0) + case when is_source then amount else 0 end"+
")::"+s.PrefixWithBucket("volumes")+" as post_commit_volumes").
//ColumnExpr("("+
// "coalesce(((last_move_by_seq.post_commit_volumes).inputs), 0) + case when is_source then 0 else amount end, "+
// "coalesce(((last_move_by_seq.post_commit_volumes).outputs), 0) + case when is_source then amount else 0 end"+
// ")::"+s.PrefixWithBucket("volumes")+" as post_commit_volumes").
Join("join _account_sequences on _account_sequences.address = address").
Join("left join lateral ("+
s.db.NewSelect().
ColumnExpr("last_move.post_commit_volumes").
ModelTableExpr(s.PrefixWithBucketUsingModel(Move{})+" as last_move").
Where("_rows.account_address = last_move.account_address").
Where("_rows.asset = last_move.asset").
Where("_rows.ledger = last_move.ledger").
Order("seq desc").
Limit(1).
String()+
") last_move_by_seq on true").
//Join("left join lateral ("+
// s.db.NewSelect().
// ColumnExpr("last_move.post_commit_volumes").
// ModelTableExpr(s.PrefixWithBucketUsingModel(Move{})+" as last_move").
// Where("_rows.account_address = last_move.account_address").
// Where("_rows.asset = last_move.asset").
// Where("_rows.ledger = last_move.ledger").
// Order("seq desc").
// Limit(1).
// String()+
// ") last_move_by_seq on true").
Table("_rows"),
).
Model(&Move{}).
@@ -84,7 +86,7 @@ func (s *Store) InsertMoves(ctx context.Context, moves ...ledger.Move) error {
"insertion_date",
"effective_date",
"accounts_seq",
"post_commit_volumes",
//"post_commit_volumes",
).
ModelTableExpr(s.PrefixWithBucketUsingModel(Move{})).
Table("_computed_rows").