forked from pokt-network/gateway-server
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnode_selector_service.go
134 lines (115 loc) · 4.85 KB
/
node_selector_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package node_selector_service
import (
"github.com/pokt-network/gateway-server/internal/chain_configurations_registry"
"github.com/pokt-network/gateway-server/internal/global_config"
"github.com/pokt-network/gateway-server/internal/node_selector_service/checks"
"github.com/pokt-network/gateway-server/internal/node_selector_service/checks/evm_data_integrity_check"
"github.com/pokt-network/gateway-server/internal/node_selector_service/checks/evm_height_check"
"github.com/pokt-network/gateway-server/internal/node_selector_service/checks/pokt_data_integrity_check"
"github.com/pokt-network/gateway-server/internal/node_selector_service/checks/pokt_height_check"
"github.com/pokt-network/gateway-server/internal/node_selector_service/checks/solana_data_integrity_check"
"github.com/pokt-network/gateway-server/internal/node_selector_service/checks/solana_height_check"
"github.com/pokt-network/gateway-server/internal/node_selector_service/models"
"github.com/pokt-network/gateway-server/internal/session_registry"
"github.com/pokt-network/gateway-server/pkg/common"
"github.com/pokt-network/gateway-server/pkg/pokt/pokt_v0"
"go.uber.org/zap"
"sort"
"time"
)
const (
jobCheckInterval = time.Second
)
type NodeSelectorService interface {
FindNode(chainId string) (*models.QosNode, bool)
}
type NodeSelectorClient struct {
sessionRegistry session_registry.SessionRegistryService
pocketRelayer pokt_v0.PocketRelayer
logger *zap.Logger
checkJobs []checks.CheckJob
}
func NewNodeSelectorService(sessionRegistry session_registry.SessionRegistryService, pocketRelayer pokt_v0.PocketRelayer, chainConfiguration chain_configurations_registry.ChainConfigurationsService, networkProvider global_config.ChainNetworkProvider, logger *zap.Logger) *NodeSelectorClient {
// base checks will share same node list and pocket relayer
baseCheck := checks.NewCheck(pocketRelayer, chainConfiguration, networkProvider)
// enabled checks
enabledChecks := []checks.CheckJob{
evm_height_check.NewEvmHeightCheck(baseCheck, logger.Named("evm_height_checker")),
evm_data_integrity_check.NewEvmDataIntegrityCheck(baseCheck, logger.Named("evm_data_integrity_checker")),
solana_height_check.NewSolanaHeightCheck(baseCheck, logger.Named("solana_height_check")),
solana_data_integrity_check.NewSolanaDataIntegrityCheck(baseCheck, logger.Named("solana_data_integrity_check")),
pokt_height_check.NewPoktHeightCheck(baseCheck, logger.Named("pokt_height_check")),
pokt_data_integrity_check.NewPoktDataIntegrityCheck(baseCheck, logger.Named("pokt_data_integrity_check")),
}
selectorService := &NodeSelectorClient{
sessionRegistry: sessionRegistry,
logger: logger,
checkJobs: enabledChecks,
}
selectorService.startJobChecker()
return selectorService
}
func (q NodeSelectorClient) FindNode(chainId string) (*models.QosNode, bool) {
nodes := q.sessionRegistry.GetNodesByChain(chainId)
if len(nodes) == 0 {
return nil, false
}
// Filter nodes by health
healthyNodes := filterByHealthyNodes(nodes)
// Find a node that's closer to session height
sortedSessionHeights, nodeMap := filterBySessionHeightNodes(healthyNodes)
for _, sessionHeight := range sortedSessionHeights {
node, ok := common.GetRandomElement(nodeMap[sessionHeight])
if ok {
return node, true
}
}
return nil, false
}
// filterBySessionHeightNodes - filter by session height descending. This allows node selector to send relays with
// latest session height which nodes are more likely to serve vs session rollover relays.
func filterBySessionHeightNodes(nodes []*models.QosNode) ([]uint, map[uint][]*models.QosNode) {
nodesBySessionHeight := map[uint][]*models.QosNode{}
// Create map to retrieve nodes by session height
for _, r := range nodes {
sessionHeight := r.MorseSession.SessionHeader.SessionHeight
nodesBySessionHeight[sessionHeight] = append(nodesBySessionHeight[sessionHeight], r)
}
// Create slice to hold sorted session heights
var sortedSessionHeights []uint
for sessionHeight := range nodesBySessionHeight {
sortedSessionHeights = append(sortedSessionHeights, sessionHeight)
}
// Sort the slice of session heights by descending order
sort.Slice(sortedSessionHeights, func(i, j int) bool {
return sortedSessionHeights[i] > sortedSessionHeights[j]
})
return sortedSessionHeights, nodesBySessionHeight
}
func filterByHealthyNodes(nodes []*models.QosNode) []*models.QosNode {
var healthyNodes []*models.QosNode
for _, r := range nodes {
if r.IsHealthy() {
healthyNodes = append(healthyNodes, r)
}
}
return healthyNodes
}
func (q NodeSelectorClient) startJobChecker() {
ticker := time.Tick(jobCheckInterval)
go func() {
for {
select {
case <-ticker:
for _, job := range q.checkJobs {
if job.ShouldRun() {
for _, nodes := range q.sessionRegistry.GetNodesMap() {
job.SetNodes(nodes.Value())
job.Perform()
}
}
}
}
}
}()
}