Skip to content

Commit

Permalink
feat: index affiliate fees across multiple addresses (#1037)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaladinlight authored Sep 16, 2024
1 parent 930b824 commit a7dfa1d
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 77 deletions.
150 changes: 82 additions & 68 deletions go/coinstacks/thorchain/api/affiliateFees.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ import (
)

const (
blockWorkers = 10
resultWorkers = 100
pageSize = 50
affiliateAddress = "thor1xmaggkcln5m5fnha2780xrdrulmplvfrz6wj3l"
blockWorkers = 10
resultWorkers = 100
pageSize = 50
)

type AffiliateFeeIndexer struct {
AffiliateFees []*AffiliateFee
conf cosmos.Config
httpClients []*cosmos.HTTPClient
mu sync.Mutex
pageChs []chan int
resultChs []chan *coretypes.ResultBlockSearch
wg sync.WaitGroup
AffiliateAddresses []string
AffiliateFees []*AffiliateFee
conf cosmos.Config
httpClients []*cosmos.HTTPClient
mu sync.Mutex
pageChs []chan int
resultChs []chan *coretypes.ResultBlockSearch
wg sync.WaitGroup
}

type AffiliateFee struct {
Expand All @@ -50,22 +50,25 @@ type AffiliateFee struct {
}

func NewAffiliateFeeIndexer(conf cosmos.Config, httpClients []*cosmos.HTTPClient) *AffiliateFeeIndexer {
pageChs := make([]chan int, len(httpClients))
affiliateAddresses := []string{"thor1xmaggkcln5m5fnha2780xrdrulmplvfrz6wj3l", "thor1crs0y53jfg224mettqeg883e6ume49tllktg2s"}

pageChs := make([]chan int, len(httpClients)*len(affiliateAddresses))
for i := range pageChs {
pageChs[i] = make(chan int)
}

resultChs := make([]chan *coretypes.ResultBlockSearch, len(httpClients))
resultChs := make([]chan *coretypes.ResultBlockSearch, len(httpClients)*len(affiliateAddresses))
for i := range resultChs {
resultChs[i] = make(chan *coretypes.ResultBlockSearch)
}

return &AffiliateFeeIndexer{
AffiliateFees: []*AffiliateFee{},
conf: conf,
httpClients: httpClients,
pageChs: pageChs,
resultChs: resultChs,
AffiliateAddresses: affiliateAddresses,
AffiliateFees: []*AffiliateFee{},
conf: conf,
httpClients: httpClients,
pageChs: pageChs,
resultChs: resultChs,
}
}

Expand All @@ -85,56 +88,64 @@ func (i *AffiliateFeeIndexer) Sync() error {

g := new(errgroup.Group)

for j, httpClient := range i.httpClients {
httpClient := httpClient
resultCh := i.resultChs[j]
pageCh := i.pageChs[j]
idx := 0
for _, affiliateAddress := range i.AffiliateAddresses {
affiliateAddress := affiliateAddress

g.Go(func() error {
for w := 0; w < resultWorkers; w++ {
go func() {
for result := range resultCh {
for _, b := range result.Blocks {
block := b.Block
i.handleBlock(httpClient, block)
for _, httpClient := range i.httpClients {
httpClient := httpClient

resultCh := i.resultChs[idx]
pageCh := i.pageChs[idx]

idx++

g.Go(func() error {
for w := 0; w < resultWorkers; w++ {
go func() {
for result := range resultCh {
for _, b := range result.Blocks {
block := b.Block
i.handleBlock(httpClient, block, affiliateAddress)
}
}
}
}()
}
}()
}

result, err := httpClient.BlockSearch(fmt.Sprintf(`"outbound.to='%s'"`, "thor1xmaggkcln5m5fnha2780xrdrulmplvfrz6wj3l"), 1, pageSize)
if err != nil {
return err
}
result, err := httpClient.BlockSearch(fmt.Sprintf(`"outbound.to='%s'"`, affiliateAddress), 1, pageSize)
if err != nil {
return err
}

maxPages := int(math.Ceil(float64(result.TotalCount) / float64(pageSize)))
resultCh <- result
maxPages := int(math.Ceil(float64(result.TotalCount) / float64(pageSize)))
resultCh <- result

i.wg.Add(blockWorkers)
for w := 0; w < blockWorkers; w++ {
go i.fetchBlocks(httpClient, pageCh, resultCh)
}
i.wg.Add(blockWorkers)
for w := 0; w < blockWorkers; w++ {
go i.fetchBlocks(httpClient, affiliateAddress, pageCh, resultCh)
}

go func() {
page := 2
for {
if page > maxPages {
close(pageCh)
break
go func() {
page := 2
for {
if page > maxPages {
close(pageCh)
break
}
pageCh <- page
page++
}
pageCh <- page
page++
}
}()
}()

i.wg.Wait()
i.wg.Wait()

return nil
})
}
return nil
})
}

if err := g.Wait(); err != nil {
return err
if err := g.Wait(); err != nil {
return err
}
}

logger.Infof("Finished indexing affiliate fees (%s)", time.Since(start))
Expand Down Expand Up @@ -211,11 +222,11 @@ func (i *AffiliateFeeIndexer) listen() error {
return nil
}

func (i *AffiliateFeeIndexer) fetchBlocks(httpClient *cosmos.HTTPClient, pageCh <-chan int, resultCh chan<- *coretypes.ResultBlockSearch) {
func (i *AffiliateFeeIndexer) fetchBlocks(httpClient *cosmos.HTTPClient, affiliateAddress string, pageCh <-chan int, resultCh chan<- *coretypes.ResultBlockSearch) {
defer i.wg.Done()

for page := range pageCh {
result, err := httpClient.BlockSearch(fmt.Sprintf(`"outbound.to='%s'"`, "thor1xmaggkcln5m5fnha2780xrdrulmplvfrz6wj3l"), page, pageSize)
result, err := httpClient.BlockSearch(fmt.Sprintf(`"outbound.to='%s'"`, affiliateAddress), page, pageSize)
if err != nil {
logger.Panicf("failed to fetch blocks for page: %d: %+v", page, err)
}
Expand All @@ -224,7 +235,7 @@ func (i *AffiliateFeeIndexer) fetchBlocks(httpClient *cosmos.HTTPClient, pageCh
}
}

func (i *AffiliateFeeIndexer) handleBlock(httpClient *cosmos.HTTPClient, block *tmtypes.Block) {
func (i *AffiliateFeeIndexer) handleBlock(httpClient *cosmos.HTTPClient, block *tmtypes.Block, affiliateAddress string) {
blockResult, err := httpClient.BlockResults(int(block.Height))
if err != nil {
logger.Panicf("failed to handle block: %d: %+v", block.Height, err)
Expand All @@ -234,18 +245,18 @@ func (i *AffiliateFeeIndexer) handleBlock(httpClient *cosmos.HTTPClient, block *
Block: block,
}

i.processAffiliateFees(b, blockResult.EndBlockEvents)
i.processAffiliateFees(b, blockResult.EndBlockEvents, []string{affiliateAddress})
}

func (i *AffiliateFeeIndexer) handleNewBlockHeader(newBlockHeader types.EventDataNewBlockHeader) {
b := &thorchain.NewBlockHeader{
EventDataNewBlockHeader: newBlockHeader,
}

i.processAffiliateFees(b, newBlockHeader.ResultEndBlock.Events)
i.processAffiliateFees(b, newBlockHeader.ResultEndBlock.Events, i.AffiliateAddresses)
}

func (i *AffiliateFeeIndexer) processAffiliateFees(block thorchain.Block, endBlockEvents []abci.Event) {
func (i *AffiliateFeeIndexer) processAffiliateFees(block thorchain.Block, endBlockEvents []abci.Event, affiliateAddresses []string) {
_, typedEvents, err := thorchain.ParseBlockEvents(endBlockEvents)
if err != nil {
logger.Panicf("failed to parse block events for block: %d: %+v", block.Height(), err)
Expand All @@ -269,10 +280,13 @@ func (i *AffiliateFeeIndexer) processAffiliateFees(block thorchain.Block, endBlo
continue
}

if affiliateFee.Address == affiliateAddress {
i.mu.Lock()
i.AffiliateFees = append(i.AffiliateFees, affiliateFee)
i.mu.Unlock()
for _, affiliateAddress := range affiliateAddresses {
if affiliateFee.Address == affiliateAddress {
i.mu.Lock()
i.AffiliateFees = append(i.AffiliateFees, affiliateFee)
i.mu.Unlock()
break
}
}
}
}
8 changes: 4 additions & 4 deletions go/coinstacks/thorchain/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ func (h *Handler) GetTxHistory(pubkey string, cursor string, pageSize int) (api.
// Contains info about the affiliate revenue earned
// swagger:model AffiliateRevenue
type AffiliateRevenue struct {
// Affiliate address
// Affiliate addresses
// required: true
Address string `json:"address"`
Addresses []string `json:"addresses"`
// Amount earned (RUNE)
// required: true
Amount string `json:"amount"`
Expand All @@ -104,8 +104,8 @@ func (h *Handler) GetAffiliateRevenue(start int, end int) (*AffiliateRevenue, er
}

a := &AffiliateRevenue{
Address: affiliateAddress,
Amount: total.String(),
Addresses: h.indexer.AffiliateAddresses,
Amount: total.String(),
}

return a, nil
Expand Down
13 changes: 8 additions & 5 deletions go/coinstacks/thorchain/api/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -334,14 +334,17 @@
"description": "Contains info about the affiliate revenue earned",
"type": "object",
"required": [
"address",
"addresses",
"amount"
],
"properties": {
"address": {
"description": "Affiliate address",
"type": "string",
"x-go-name": "Address"
"addresses": {
"description": "Affiliate addresses",
"type": "array",
"items": {
"type": "string"
},
"x-go-name": "Addresses"
},
"amount": {
"description": "Amount earned (RUNE)",
Expand Down

0 comments on commit a7dfa1d

Please sign in to comment.