From fb0b773d6bdb159eccb48e22cb8540903c32b454 Mon Sep 17 00:00:00 2001 From: Ragot Geoffrey Date: Wed, 22 Nov 2023 16:37:10 +0100 Subject: [PATCH] feat(ledger): add some columns on transactions table to speed up read queries (#883) --- .../ledgerstore/migrations/0-init-schema.sql | 48 +++++++++++++++-- .../storage/ledgerstore/transactions.go | 7 ++- .../storage/ledgerstore/transactions_test.go | 2 + .../internal/storage/ledgerstore/utils.go | 54 +++++++++++++++++++ 4 files changed, 104 insertions(+), 7 deletions(-) diff --git a/components/ledger/internal/storage/ledgerstore/migrations/0-init-schema.sql b/components/ledger/internal/storage/ledgerstore/migrations/0-init-schema.sql index 6d03402ba1..ba96b42a91 100644 --- a/components/ledger/internal/storage/ledgerstore/migrations/0-init-schema.sql +++ b/components/ledger/internal/storage/ledgerstore/migrations/0-init-schema.sql @@ -55,7 +55,11 @@ create table transactions ( timestamp timestamp without time zone not null, reference varchar, reverted_at timestamp without time zone, - postings varchar not null + postings varchar not null, + sources jsonb, + destinations jsonb, + sources_arrays jsonb, + destinations_arrays jsonb ); create table transactions_metadata ( @@ -127,6 +131,10 @@ create index moves_range_dates on moves (account_address, asset, effective_date) create index transactions_date on transactions (timestamp); create index transactions_metadata_metadata on transactions_metadata using gin (metadata); --create unique index transactions_revisions on transactions_metadata(id desc, revision desc); +create index transactions_sources on transactions using gin (sources jsonb_path_ops); +create index transactions_destinations on transactions using gin (destinations jsonb_path_ops); +create index transactions_sources_arrays on transactions using gin (sources_arrays jsonb_path_ops); +create index transactions_destinations_arrays on transactions using gin (destinations_arrays jsonb_path_ops); create index moves_account_address on moves (account_address); create index moves_account_address_array on moves using gin (account_address_array jsonb_ops); @@ -160,6 +168,23 @@ as $$ end; $$; +-- given the input : "a:b:c", the function will produce : '{"0": "a", "1": "b", "2": "c", "3": null}' +create function explode_address(_address varchar) + returns jsonb + language sql + immutable +as $$ + select aggregate_objects(jsonb_build_object(data.number - 1, data.value)) + from ( + select row_number() over () as number, v.value + from ( + select unnest(string_to_array(_address, ':')) as value + union all + select null + ) v + ) data +$$; + create function get_account(_account_address varchar, _before timestamp default null) returns setof accounts_metadata language sql @@ -405,11 +430,28 @@ as $$ declare posting jsonb; begin - insert into transactions (id, timestamp, reference, postings) + insert into transactions (id, timestamp, reference, postings, sources, destinations, sources_arrays, destinations_arrays) values ((data->>'id')::numeric, (data->>'timestamp')::timestamp without time zone, data->>'reference', - jsonb_pretty(data->'postings')); + jsonb_pretty(data->'postings'), + ( + select to_jsonb(array_agg(v->>'source')) as value + from jsonb_array_elements(data->'postings') v + ), + ( + select to_jsonb(array_agg(v->>'destination')) as value + from jsonb_array_elements(data->'postings') v + ), + ( + select to_jsonb(array_agg(explode_address(v->>'source'))) as value + from jsonb_array_elements(data->'postings') v + ), + ( + select to_jsonb(array_agg(explode_address(v->>'destination'))) as value + from jsonb_array_elements(data->'postings') v + ) + ); for posting in (select jsonb_array_elements(data->'postings')) loop -- todo: sometimes the balance is known at commit time (for sources != world), we need to forward the value to populate the pre_commit_aggregated_input and output diff --git a/components/ledger/internal/storage/ledgerstore/transactions.go b/components/ledger/internal/storage/ledgerstore/transactions.go index cb9dd8243a..3f79ef1e79 100644 --- a/components/ledger/internal/storage/ledgerstore/transactions.go +++ b/components/ledger/internal/storage/ledgerstore/transactions.go @@ -155,7 +155,6 @@ func (store *Store) buildTransactionQuery(p PITFilterWithVolumes, query *bun.Sel query = query. Table("transactions"). ColumnExpr("distinct on(transactions.id) transactions.*, transactions_metadata.metadata"). - Join("join moves m on transactions.id = m.transaction_id"). Join(fmt.Sprintf(`left join lateral (%s) as transactions_metadata on true`, selectMetadata.String())) if p.PIT != nil && !p.PIT.IsZero() { @@ -186,7 +185,7 @@ func (store *Store) transactionQueryContext(qb query.Builder) (string, []any, er } switch address := value.(type) { case string: - return filterAccountAddress(address, "m.account_address"), nil, nil + return filterAccountAddressOnTransactions(address, true, true), nil, nil default: return "", nil, newErrInvalidQuery("unexpected type %T for column 'account'", address) } @@ -197,7 +196,7 @@ func (store *Store) transactionQueryContext(qb query.Builder) (string, []any, er } switch address := value.(type) { case string: - return fmt.Sprintf("(%s) and m.is_source", filterAccountAddress(address, "m.account_address")), nil, nil + return filterAccountAddressOnTransactions(address, true, false), nil, nil default: return "", nil, newErrInvalidQuery("unexpected type %T for column 'source'", address) } @@ -208,7 +207,7 @@ func (store *Store) transactionQueryContext(qb query.Builder) (string, []any, er } switch address := value.(type) { case string: - return fmt.Sprintf("(%s) and not m.is_source", filterAccountAddress(address, "m.account_address")), nil, nil + return filterAccountAddressOnTransactions(address, false, true), nil, nil default: return "", nil, newErrInvalidQuery("unexpected type %T for column 'destination'", address) } diff --git a/components/ledger/internal/storage/ledgerstore/transactions_test.go b/components/ledger/internal/storage/ledgerstore/transactions_test.go index 329d05aa62..532001d4e7 100644 --- a/components/ledger/internal/storage/ledgerstore/transactions_test.go +++ b/components/ledger/internal/storage/ledgerstore/transactions_test.go @@ -1020,6 +1020,7 @@ func TestGetTransactions(t *testing.T) { tc := tc t.Run(tc.name, func(t *testing.T) { t.Parallel() + tc.query.Options.ExpandVolumes = true tc.query.Options.ExpandEffectiveVolumes = false cursor, err := store.GetTransactions(ctx, NewGetTransactionsQuery(tc.query)) @@ -1027,6 +1028,7 @@ func TestGetTransactions(t *testing.T) { require.True(t, errors.Is(err, tc.expectError)) } else { require.NoError(t, err) + require.Len(t, cursor.Data, len(tc.expected.Data)) internaltesting.RequireEqual(t, *tc.expected, *cursor) count, err := store.CountTransactions(ctx, NewGetTransactionsQuery(tc.query)) diff --git a/components/ledger/internal/storage/ledgerstore/utils.go b/components/ledger/internal/storage/ledgerstore/utils.go index 0ac30f9fd8..e7d09a9cd5 100644 --- a/components/ledger/internal/storage/ledgerstore/utils.go +++ b/components/ledger/internal/storage/ledgerstore/utils.go @@ -2,6 +2,7 @@ package ledgerstore import ( "context" + "encoding/json" "fmt" "reflect" "strings" @@ -94,6 +95,59 @@ func filterAccountAddress(address, key string) string { return strings.Join(parts, " and ") } +func filterAccountAddressOnTransactions(address string, source, destination bool) string { + src := strings.Split(address, ":") + + needSegmentCheck := false + for _, segment := range src { + needSegmentCheck = segment == "" + if needSegmentCheck { + break + } + } + + if needSegmentCheck { + m := map[string]any{ + fmt.Sprint(len(src)): nil, + } + parts := make([]string, 0) + + for i, segment := range src { + if len(segment) == 0 { + continue + } + m[fmt.Sprint(i)] = segment + } + + data, err := json.Marshal([]any{m}) + if err != nil { + panic(err) + } + + if source { + parts = append(parts, fmt.Sprintf("sources_arrays @> '%s'", string(data))) + } + if destination { + parts = append(parts, fmt.Sprintf("destinations_arrays @> '%s'", string(data))) + } + return strings.Join(parts, " or ") + } else { + data, err := json.Marshal([]string{address}) + if err != nil { + panic(err) + } + + parts := make([]string, 0) + if source { + parts = append(parts, fmt.Sprintf("sources @> '%s'", string(data))) + } + if destination { + parts = append(parts, fmt.Sprintf("destinations @> '%s'", string(data))) + } + return strings.Join(parts, " or ") + } +} + func filterPIT(pit *ledger.Time, column string) func(query *bun.SelectQuery) *bun.SelectQuery { return func(query *bun.SelectQuery) *bun.SelectQuery { if pit == nil || pit.IsZero() {