Skip to content

Commit

Permalink
Merge pull request #363 from stellar/horizon-v0.12.2-fixes
Browse files Browse the repository at this point in the history
Horizon v0.12.2 fixes
  • Loading branch information
nullstyle authored Mar 14, 2018
2 parents f8a5cca + 30e4167 commit 66cbd5f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 30 deletions.
9 changes: 9 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
As this project is pre 1.0, breaking changes may happen for minor version
bumps. A breaking change will get clearly notified in this log.

## v0.12.2 - 2017-03-14

This release is a bug fix release for v0.12.1 and v0.12.2. *Please see the upgrade notes below if you did not already migrate your db for v0.12.0*

### Changes

- Remove strict validation on the `resolution` parameter for trade aggregations endpoint. We will add this feature back in to the next major release.


## v0.12.1 - 2017-03-13

This release is a bug fix release for v0.12.0. *Please see the upgrade notes below if you did not already migrate your db for v0.12.0*
Expand Down
11 changes: 7 additions & 4 deletions services/horizon/internal/actions_trade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,16 @@ func TestTradeActions_Aggregation(t *testing.T) {
q.Add("order", "asc")

//test illegal resolution
q.Add("resolution", strconv.FormatInt(hour/2, 10))
w := ht.GetWithParams(aggregationPath, q)
ht.Assert.Equal(500, w.Code)

if history.StrictResolutionFiltering {
q.Add("resolution", strconv.FormatInt(hour/2, 10))
w := ht.GetWithParams(aggregationPath, q)
ht.Assert.Equal(500, w.Code)
}

//test one bucket for all trades
q.Set("resolution", strconv.FormatInt(hour, 10))
w = ht.GetWithParams(aggregationPath, q)
w := ht.GetWithParams(aggregationPath, q)
if ht.Assert.Equal(200, w.Code) {
ht.Assert.PageOf(1, w.Body)
ht.UnmarshalPage(w.Body, &records)
Expand Down
60 changes: 34 additions & 26 deletions services/horizon/internal/db2/history/trade_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ package history

import (
"fmt"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/support/errors"
. "github.com/stellar/go/support/time"
strtime "github.com/stellar/go/support/time"
"github.com/stellar/go/xdr"
"time"
)

// AllowedResolutions is the set of trade aggregation time windows allowed to be used as the
// `resolution` parameter.
var AllowedResolutions = map[time.Duration]struct{}{
time.Minute: {}, //1 minute
time.Minute * 15: {}, //15 minutes
Expand All @@ -19,7 +21,11 @@ var AllowedResolutions = map[time.Duration]struct{}{
time.Hour * 24 * 7: {}, //week
}

// Trade aggregation represents an aggregation of trades from the trades table
// StrictResolutionFiltering represents a simple feature flag to determine whether only
// predetermined resolutions of trade aggregations are allowed.
var StrictResolutionFiltering = false

// TradeAggregation represents an aggregation of trades from the trades table
type TradeAggregation struct {
Timestamp int64 `db:"timestamp"`
TradeCount int64 `db:"count"`
Expand All @@ -35,70 +41,72 @@ type TradeAggregation struct {
// TradeAggregationsQ is a helper struct to aid in configuring queries to
// bucket and aggregate trades
type TradeAggregationsQ struct {
baseAssetId int64
counterAssetId int64
baseAssetID int64
counterAssetID int64
resolution int64
startTime Millis
endTime Millis
startTime strtime.Millis
endTime strtime.Millis
pagingParams db2.PageQuery
}

// GetTradeAggregationsQ initializes a TradeAggregationsQ query builder based on the required parameters
func (q Q) GetTradeAggregationsQ(baseAssetId int64, counterAssetId int64, resolution int64, pagingParams db2.PageQuery) (*TradeAggregationsQ, error) {
func (q Q) GetTradeAggregationsQ(baseAssetID int64, counterAssetID int64, resolution int64, pagingParams db2.PageQuery) (*TradeAggregationsQ, error) {

//convert resolution to a duration struct
resolutionDuration := time.Duration(resolution)*time.Millisecond
resolutionDuration := time.Duration(resolution) * time.Millisecond

//check if resolution allowed
if _, ok := AllowedResolutions[resolutionDuration]; !ok {
return &TradeAggregationsQ{}, errors.New("resolution is not allowed")
if StrictResolutionFiltering {
if _, ok := AllowedResolutions[resolutionDuration]; !ok {
return &TradeAggregationsQ{}, errors.New("resolution is not allowed")
}
}

return &TradeAggregationsQ{
baseAssetId: baseAssetId,
counterAssetId: counterAssetId,
baseAssetID: baseAssetID,
counterAssetID: counterAssetID,
resolution: resolution,
pagingParams: pagingParams,
}, nil
}

// WithStartTime adds an optional lower time boundary filter to the trades being aggregated
func (q *TradeAggregationsQ) WithStartTime(startTime Millis) *TradeAggregationsQ {
func (q *TradeAggregationsQ) WithStartTime(startTime strtime.Millis) *TradeAggregationsQ {
// Round lower boundary up, if start time is in the middle of a bucket
q.startTime = startTime.RoundUp(q.resolution)
return q
}

// WithEndTime adds an upper optional time boundary filter to the trades being aggregated
func (q *TradeAggregationsQ) WithEndTime(endTime Millis) *TradeAggregationsQ {
func (q *TradeAggregationsQ) WithEndTime(endTime strtime.Millis) *TradeAggregationsQ {
// Round upper boundary down, to not deliver partial bucket
q.endTime = endTime.RoundDown(q.resolution)
return q
}

// Generate a sql statement to aggregate Trades based on given parameters
// GetSql generates a sql statement to aggregate Trades based on given parameters
func (q *TradeAggregationsQ) GetSql() sq.SelectBuilder {
var orderPreserved bool
orderPreserved, q.baseAssetId, q.counterAssetId = getCanonicalAssetOrder(q.baseAssetId, q.counterAssetId)
orderPreserved, q.baseAssetID, q.counterAssetID = getCanonicalAssetOrder(q.baseAssetID, q.counterAssetID)

var bucketSql sq.SelectBuilder
var bucketSQL sq.SelectBuilder
if orderPreserved {
bucketSql = bucketTrades(q.resolution)
bucketSQL = bucketTrades(q.resolution)
} else {
bucketSql = reverseBucketTrades(q.resolution)
bucketSQL = reverseBucketTrades(q.resolution)
}

bucketSql = bucketSql.From("history_trades").
Where(sq.Eq{"base_asset_id": q.baseAssetId, "counter_asset_id": q.counterAssetId})
bucketSQL = bucketSQL.From("history_trades").
Where(sq.Eq{"base_asset_id": q.baseAssetID, "counter_asset_id": q.counterAssetID})

//adjust time range and apply time filters
bucketSql = bucketSql.Where(sq.GtOrEq{"ledger_closed_at": q.startTime.ToTime()})
bucketSQL = bucketSQL.Where(sq.GtOrEq{"ledger_closed_at": q.startTime.ToTime()})
if !q.endTime.IsNil() {
bucketSql = bucketSql.Where(sq.Lt{"ledger_closed_at": q.endTime.ToTime()})
bucketSQL = bucketSQL.Where(sq.Lt{"ledger_closed_at": q.endTime.ToTime()})
}

//ensure open/close order for cases when multiple trades occur in the same ledger
bucketSql = bucketSql.OrderBy("history_operation_id ", "\"order\"")
bucketSQL = bucketSQL.OrderBy("history_operation_id ", "\"order\"")

return sq.Select(
"timestamp",
Expand All @@ -111,7 +119,7 @@ func (q *TradeAggregationsQ) GetSql() sq.SelectBuilder {
"first(price) as open",
"last(price) as close",
).
FromSelect(bucketSql, "htrd").
FromSelect(bucketSQL, "htrd").
GroupBy("timestamp").
Limit(q.pagingParams.Limit).
OrderBy("timestamp " + q.pagingParams.Order)
Expand Down

0 comments on commit 66cbd5f

Please sign in to comment.