cluster.go

/*
   Copyright 2020 YANDEX LLC

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

// Package hasql provides a simple and reliable way to access high-availability database setups with multiple hosts.
package hasql

import (
	"context"
	"errors"
	"io"
	"sync"
	"sync/atomic"
	"time"
)

// Cluster consists of a number of 'nodes' of a single SQL database.
type Cluster[T Querier] struct {
	// configuration
	updateInterval time.Duration
	updateTimeout  time.Duration
	discoverer     NodeDiscoverer[T]
	checker        NodeChecker
	picker         NodePicker[T]
	tracer         Tracer[T]

	// status
	checkedNodes atomic.Value
	stop         context.CancelFunc

	// broadcast
	subscribersMu sync.Mutex
	subscribers   []updateSubscriber[T]
}

// NewCluster returns an object representing a single 'cluster' of SQL databases.
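//
// A minimal construction sketch; myDiscoverer and myChecker are placeholders
// for whatever NodeDiscoverer and NodeChecker implementations the caller
// provides:
//
//	// myDiscoverer and myChecker are assumed to satisfy
//	// NodeDiscoverer[*sql.DB] and NodeChecker respectively
//	cl, err := NewCluster(myDiscoverer, myChecker)
//	if err != nil {
//		// handle construction error
//	}
//	defer cl.Close()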
func NewCluster[T Querier](discoverer NodeDiscoverer[T], checker NodeChecker, opts ...ClusterOpt[T]) (*Cluster[T], error) {
	if discoverer == nil {
		return nil, errors.New("node discoverer required")
	}

	// prepare internal 'stop' context
	ctx, stopFn := context.WithCancel(context.Background())

	cl := &Cluster[T]{
		updateInterval: 5 * time.Second,
		updateTimeout:  time.Second,
		discoverer:     discoverer,
		checker:        checker,
		picker:         new(RandomNodePicker[T]),

		stop: stopFn,
	}

	// apply options
	for _, opt := range opts {
		opt(cl)
	}

	// store initial nodes state
	cl.checkedNodes.Store(CheckedNodes[T]{})

	// start background update routine
	go cl.backgroundNodesUpdate(ctx)

	return cl, nil
}

// Close stops node updates.
// It must be called when the cluster is no longer needed.
// It returns a combined error if multiple nodes return errors on close.
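//
// A typical shutdown sketch (error handling is illustrative):
//
//	if err := cl.Close(); err != nil {
//		// err may combine close errors from several nodes
//		log.Printf("cluster close: %v", err)
//	}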
func (cl *Cluster[T]) Close() (err error) {
	cl.stop()

	// close all nodes' underlying connection pools
	discovered := cl.checkedNodes.Load().(CheckedNodes[T]).discovered
	for _, node := range discovered {
		if closer, ok := any(node.DB()).(io.Closer); ok {
			err = errors.Join(err, closer.Close())
		}
	}

	// discard any collected state of nodes
	cl.checkedNodes.Store(CheckedNodes[T]{})

	return err
}

// Err returns the cause of the most recent node check failures.
// In most cases the error is a list of errors of type NodeCheckErrors; the
// original errors can be extracted using `errors.As`.
// Example:
//
//	var cerrs NodeCheckErrors
//	if errors.As(err, &cerrs) {
//		for _, cerr := range cerrs {
//			fmt.Printf("node: %s, err: %s\n", cerr.Node(), cerr.Err())
//		}
//	}
func (cl *Cluster[T]) Err() error {
	return cl.checkedNodes.Load().(CheckedNodes[T]).Err()
}

// Node returns a cluster node with the specified status.
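//
// A short usage sketch, assuming T is *sql.DB and Primary is one of the
// package's NodeStateCriterion values; Node returns nil when no node
// currently matches the criterion:
//
//	if node := cl.Node(Primary); node != nil {
//		rows, err := node.DB().QueryContext(ctx, "SELECT 1")
//		// use rows, handle err
//	}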
func (cl *Cluster[T]) Node(criterion NodeStateCriterion) *Node[T] {
	return pickNodeByCriterion(cl.checkedNodes.Load().(CheckedNodes[T]), cl.picker, criterion)
}

// WaitForNode waits for a node with the specified status to appear or until the context is canceled.
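//
// A waiting sketch with a bounded timeout; Standby is assumed to be one of the
// package's NodeStateCriterion values and the timeout is illustrative:
//
//	waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
//	defer cancel()
//	node, err := cl.WaitForNode(waitCtx, Standby)
//	if err != nil {
//		// context canceled or deadline exceeded before a suitable node appeared
//	}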
func (cl *Cluster[T]) WaitForNode(ctx context.Context, criterion NodeStateCriterion) (*Node[T], error) {
	// Node already exists?
	node := cl.Node(criterion)
	if node != nil {
		return node, nil
	}

	ch := cl.addUpdateSubscriber(criterion)

	// Node might have appeared while we were adding the waiter, recheck
	node = cl.Node(criterion)
	if node != nil {
		return node, nil
	}

	// If the channel is unbuffered and we are right here when nodes are updated,
	// the update code won't be able to write into the channel and will 'forget' it.
	// Then we will report nil to the caller, either because the update code
	// closes the channel or because the context is canceled.
	//
	// In both cases it's not what the user wants.
	//
	// We could solve this by calling cl.Node(criterion) if/when we are about to return nil.
	// But if another update runs between the channel read and cl.Node(criterion) AND no
	// nodes have the requested status, we would still return nil.
	//
	// It would also make the code more complex.
	//
	// Wait for a node to appear...
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case node := <-ch:
		return node, nil
	}
}

// backgroundNodesUpdate periodically checks the list of registered nodes
func (cl *Cluster[T]) backgroundNodesUpdate(ctx context.Context) {
	// initial update
	cl.updateNodes(ctx)

	ticker := time.NewTicker(cl.updateInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			cl.updateNodes(ctx)
		}
	}
}

// updateNodes performs a new round of cluster state checks
// and notifies all subscribers afterwards
func (cl *Cluster[T]) updateNodes(ctx context.Context) {
	if cl.tracer.UpdateNodes != nil {
		cl.tracer.UpdateNodes()
	}

	ctx, cancel := context.WithTimeout(ctx, cl.updateTimeout)
	defer cancel()

	checked := checkNodes(ctx, cl.discoverer, cl.checker, cl.picker.CompareNodes, cl.tracer)
	cl.checkedNodes.Store(checked)

	if cl.tracer.NodesUpdated != nil {
		cl.tracer.NodesUpdated(checked)
	}

	cl.notifyUpdateSubscribers(checked)

	if cl.tracer.WaitersNotified != nil {
		cl.tracer.WaitersNotified()
	}
}