Skip to content

Commit

Permalink
feat(ledger): add capability to sort transactions by timestamp (#1606)
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Jul 19, 2024
1 parent d706e50 commit 7a8cb98
Show file tree
Hide file tree
Showing 13 changed files with 213 additions and 66 deletions.
11 changes: 10 additions & 1 deletion components/ledger/internal/api/v2/controllers_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,16 @@ func getTransactions(w http.ResponseWriter, r *http.Request) {
if err != nil {
return nil, err
}
return pointer.For(ledgerstore.NewGetTransactionsQuery(*options)), nil
q := ledgerstore.NewGetTransactionsQuery(*options)

if r.URL.Query().Get("order") == "effective" {
q.Column = "timestamp"
}
if r.URL.Query().Get("reverse") == "true" {
q.Order = bunpaginate.OrderAsc
}

return pointer.For(q), nil
})
if err != nil {
sharedapi.BadRequest(w, ErrValidation, err)
Expand Down
60 changes: 35 additions & 25 deletions components/ledger/internal/api/v2/controllers_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ func TestGetTransactions(t *testing.T) {
name string
queryParams url.Values
body string
expectQuery ledgerstore.PaginatedQueryOptions[ledgerstore.PITFilterWithVolumes]
expectQuery ledgerstore.GetTransactionsQuery
expectStatusCode int
expectedErrorCode string
}
Expand All @@ -576,90 +576,90 @@ func TestGetTransactions(t *testing.T) {
testCases := []testCase{
{
name: "nominal",
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
expectQuery: ledgerstore.NewGetTransactionsQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
PITFilter: ledgerstore.PITFilter{
PIT: &now,
},
}),
})),
},
{
name: "using metadata",
body: `{"$match": {"metadata[roles]": "admin"}}`,
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
expectQuery: ledgerstore.NewGetTransactionsQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
PITFilter: ledgerstore.PITFilter{
PIT: &now,
},
}).
WithQueryBuilder(query.Match("metadata[roles]", "admin")),
WithQueryBuilder(query.Match("metadata[roles]", "admin"))),
},
{
name: "using startTime",
body: fmt.Sprintf(`{"$gte": {"start_time": "%s"}}`, now.Format(time.DateFormat)),
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
expectQuery: ledgerstore.NewGetTransactionsQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
PITFilter: ledgerstore.PITFilter{
PIT: &now,
},
}).
WithQueryBuilder(query.Gte("start_time", now.Format(time.DateFormat))),
WithQueryBuilder(query.Gte("start_time", now.Format(time.DateFormat)))),
},
{
name: "using endTime",
body: fmt.Sprintf(`{"$lte": {"end_time": "%s"}}`, now.Format(time.DateFormat)),
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
expectQuery: ledgerstore.NewGetTransactionsQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
PITFilter: ledgerstore.PITFilter{
PIT: &now,
},
}).
WithQueryBuilder(query.Lte("end_time", now.Format(time.DateFormat))),
WithQueryBuilder(query.Lte("end_time", now.Format(time.DateFormat)))),
},
{
name: "using account",
body: `{"$match": {"account": "xxx"}}`,
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
expectQuery: ledgerstore.NewGetTransactionsQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
PITFilter: ledgerstore.PITFilter{
PIT: &now,
},
}).
WithQueryBuilder(query.Match("account", "xxx")),
WithQueryBuilder(query.Match("account", "xxx"))),
},
{
name: "using reference",
body: `{"$match": {"reference": "xxx"}}`,
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
expectQuery: ledgerstore.NewGetTransactionsQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
PITFilter: ledgerstore.PITFilter{
PIT: &now,
},
}).
WithQueryBuilder(query.Match("reference", "xxx")),
WithQueryBuilder(query.Match("reference", "xxx"))),
},
{
name: "using destination",
body: `{"$match": {"destination": "xxx"}}`,
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
expectQuery: ledgerstore.NewGetTransactionsQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
PITFilter: ledgerstore.PITFilter{
PIT: &now,
},
}).
WithQueryBuilder(query.Match("destination", "xxx")),
WithQueryBuilder(query.Match("destination", "xxx"))),
},
{
name: "using source",
body: `{"$match": {"source": "xxx"}}`,
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
expectQuery: ledgerstore.NewGetTransactionsQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
PITFilter: ledgerstore.PITFilter{
PIT: &now,
},
}).
WithQueryBuilder(query.Match("source", "xxx")),
WithQueryBuilder(query.Match("source", "xxx"))),
},
{
name: "using empty cursor",
queryParams: url.Values{
"cursor": []string{bunpaginate.EncodeCursor(ledgerstore.NewGetTransactionsQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{})))},
},
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
expectQuery: ledgerstore.NewGetTransactionsQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
PITFilter: ledgerstore.PITFilter{},
}),
})),
},
{
name: "using invalid cursor",
Expand All @@ -682,29 +682,39 @@ func TestGetTransactions(t *testing.T) {
queryParams: url.Values{
"pageSize": []string{"1000000"},
},
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
expectQuery: ledgerstore.NewGetTransactionsQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
PITFilter: ledgerstore.PITFilter{
PIT: &now,
},
}).
WithPageSize(v2.MaxPageSize),
WithPageSize(v2.MaxPageSize)),
},
{
name: "using cursor",
queryParams: url.Values{
"cursor": []string{"eyJwYWdlU2l6ZSI6MTUsImJvdHRvbSI6bnVsbCwiY29sdW1uIjoiaWQiLCJwYWdpbmF0aW9uSUQiOm51bGwsIm9yZGVyIjoxLCJmaWx0ZXJzIjp7InFiIjp7fSwicGFnZVNpemUiOjE1LCJvcHRpb25zIjp7InBpdCI6bnVsbCwidm9sdW1lcyI6ZmFsc2UsImVmZmVjdGl2ZVZvbHVtZXMiOmZhbHNlfX0sInJldmVyc2UiOmZhbHNlfQ"},
},
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}),
expectQuery: ledgerstore.NewGetTransactionsQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{})),
},
{
name: "using $exists metadata filter",
body: `{"$exists": {"metadata": "foo"}}`,
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
expectQuery: ledgerstore.NewGetTransactionsQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
PITFilter: ledgerstore.PITFilter{
PIT: &now,
},
}).
WithQueryBuilder(query.Exists("metadata", "foo")),
WithQueryBuilder(query.Exists("metadata", "foo"))),
},
{
name: "paginate using effective order",
queryParams: map[string][]string{"order": {"effective"}},
expectQuery: ledgerstore.NewGetTransactionsQuery(ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{
PITFilter: ledgerstore.PITFilter{
PIT: &now,
},
})).
WithColumn("timestamp"),
},
}
for _, testCase := range testCases {
Expand All @@ -729,7 +739,7 @@ func TestGetTransactions(t *testing.T) {
backend, mockLedger := newTestingBackend(t, true)
if testCase.expectStatusCode < 300 && testCase.expectStatusCode >= 200 {
mockLedger.EXPECT().
GetTransactions(gomock.Any(), ledgerstore.NewGetTransactionsQuery(testCase.expectQuery)).
GetTransactions(gomock.Any(), testCase.expectQuery).
Return(&expectedCursor, nil)
}

Expand Down
11 changes: 10 additions & 1 deletion components/ledger/internal/storage/ledgerstore/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"regexp"
"strings"

"github.com/formancehq/stack/libs/go-libs/pointer"

"github.com/formancehq/stack/libs/go-libs/time"

"github.com/formancehq/stack/libs/go-libs/bun/bunpaginate"
Expand Down Expand Up @@ -161,7 +163,7 @@ func (store *Store) buildTransactionQuery(p PITFilterWithVolumes, query *bun.Sel
if p.PIT != nil && !p.PIT.IsZero() {
query = query.
Where("timestamp <= ?", p.PIT).
ColumnExpr("distinct on(transactions.id) transactions.*").
ColumnExpr("transactions.*").
Column("transactions_metadata.metadata").
Join(fmt.Sprintf(`left join lateral (%s) as transactions_metadata on true`, selectMetadata.String())).
ColumnExpr(fmt.Sprintf("case when reverted_at is not null and reverted_at > '%s' then null else reverted_at end", p.PIT.Format(time.DateFormat)))
Expand Down Expand Up @@ -401,6 +403,13 @@ func (q GetTransactionsQuery) WithExpandEffectiveVolumes() GetTransactionsQuery
return q
}

func (q GetTransactionsQuery) WithColumn(column string) GetTransactionsQuery {
ret := pointer.For((bunpaginate.ColumnPaginatedQuery[PaginatedQueryOptions[PITFilterWithVolumes]])(q))
ret = ret.WithColumn(column)

return GetTransactionsQuery(*ret)
}

func NewGetTransactionsQuery(options PaginatedQueryOptions[PITFilterWithVolumes]) GetTransactionsQuery {
return GetTransactionsQuery{
PageSize: options.PageSize,
Expand Down
12 changes: 12 additions & 0 deletions components/ledger/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1757,6 +1757,18 @@ paths:
schema:
type: string
format: date-time
- name: order
in: query
required: false
schema:
type: string
enum:
- effective
- name: reverse
in: query
required: false
schema:
type: boolean
requestBody:
content:
application/json:
Expand Down
12 changes: 12 additions & 0 deletions components/ledger/openapi/v2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,18 @@ paths:
schema:
type: string
format: date-time
- name: order
in: query
required: false
schema:
type: string
enum:
- effective
- name: reverse
in: query
required: false
schema:
type: boolean
requestBody:
content:
application/json:
Expand Down
6 changes: 6 additions & 0 deletions libs/go-libs/bun/bunpaginate/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ func (a *ColumnPaginatedQuery[PAYLOAD]) WithPageSize(pageSize uint64) *ColumnPag
return a
}

func (a *ColumnPaginatedQuery[PAYLOAD]) WithColumn(column string) *ColumnPaginatedQuery[PAYLOAD] {
a.Column = column

return a
}

type OffsetPaginatedQuery[OPTIONS any] struct {
Offset uint64 `json:"offset"`
Order Order `json:"order"`
Expand Down
29 changes: 23 additions & 6 deletions libs/go-libs/bun/bunpaginate/pagination_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"math/big"
"reflect"
"strings"
"time"

libtime "github.com/formancehq/stack/libs/go-libs/time"
"github.com/uptrace/bun"
)

Expand Down Expand Up @@ -59,14 +61,29 @@ func UsingColumn[FILTERS any, ENTITY any](ctx context.Context,
}

var (
paginationIDs = make([]*BigInt, 0)
paginationIDs = make([]*big.Int, 0)
)
for _, t := range ret {
paginationID := reflect.ValueOf(t).
rawPaginationID := reflect.ValueOf(t).
Field(paginatedColumnIndex).
Interface().(*BigInt)
Interface()
var paginationID *big.Int
switch rawPaginationID := rawPaginationID.(type) {
case time.Time:
paginationID = big.NewInt(rawPaginationID.UTC().UnixMicro())
case libtime.Time:
paginationID = big.NewInt(rawPaginationID.UTC().UnixMicro())
case *BigInt:
paginationID = (*big.Int)(rawPaginationID)
case *big.Int:
paginationID = rawPaginationID
case int64:
paginationID = big.NewInt(rawPaginationID)
default:
panic(fmt.Sprintf("invalid paginationID, type %T not handled", rawPaginationID))
}
if query.Bottom == nil {
query.Bottom = (*big.Int)(paginationID)
query.Bottom = paginationID
}
paginationIDs = append(paginationIDs, paginationID)
}
Expand All @@ -90,13 +107,13 @@ func UsingColumn[FILTERS any, ENTITY any](ctx context.Context,

if hasMore {
cp := query
cp.PaginationID = (*big.Int)(paginationIDs[len(paginationIDs)-2])
cp.PaginationID = paginationIDs[len(paginationIDs)-2]
previous = &cp
}
} else {
if hasMore {
cp := query
cp.PaginationID = (*big.Int)(paginationIDs[len(paginationIDs)-1])
cp.PaginationID = paginationIDs[len(paginationIDs)-1]
next = &cp
}
if query.PaginationID != nil {
Expand Down
4 changes: 0 additions & 4 deletions libs/go-libs/bun/bunpaginate/pagination_offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,9 @@ func usingOffset[Q any, T any](ctx context.Context, sb *bun.SelectQuery, query O
}

func UsingOffset[Q any, T any](ctx context.Context, sb *bun.SelectQuery, query OffsetPaginatedQuery[Q], builders ...func(query *bun.SelectQuery) *bun.SelectQuery) (*Cursor[T], error) {

return usingOffset[Q, T](ctx, sb, query, true, builders...)

}

func UsingOffsetWithoutModel[Q any, T any](ctx context.Context, sb *bun.SelectQuery, query OffsetPaginatedQuery[Q], builders ...func(query *bun.SelectQuery) *bun.SelectQuery) (*Cursor[T], error) {

return usingOffset[Q, T](ctx, sb, query, false, builders...)

}
3 changes: 2 additions & 1 deletion releases/sdks/go/.speakeasy/gen.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
lockVersion: 2.0.0
id: 7eac0a45-60a2-40bb-9e85-26bd77ec2a6d
management:
docChecksum: f349c78f846ac290e8a10eff3984042d
docChecksum: 8a9ef86467c00955c6ddee01c0a79bcb
docVersion: v0.0.0
speakeasyVersion: 1.292.0
generationVersion: 2.332.4
Expand Down Expand Up @@ -673,6 +673,7 @@ generatedFiles:
- docs/pkg/models/operations/v2listledgersresponse.md
- docs/pkg/models/operations/v2listlogsrequest.md
- docs/pkg/models/operations/v2listlogsresponse.md
- docs/pkg/models/operations/order.md
- docs/pkg/models/operations/v2listtransactionsrequest.md
- docs/pkg/models/operations/v2listtransactionsresponse.md
- docs/pkg/models/operations/v2readstatsrequest.md
Expand Down
8 changes: 8 additions & 0 deletions releases/sdks/go/docs/pkg/models/operations/order.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Order


## Values

| Name | Value |
| ---------------- | ---------------- |
| `OrderEffective` | effective |
Loading

0 comments on commit 7a8cb98

Please sign in to comment.