-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathpool.go
142 lines (120 loc) · 2.45 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package riago
import (
"errors"
"sync"
"sync/atomic"
"time"
)
// Sentinel errors returned by Pool operations.
var (
	// ErrPoolWaitTimeout is returned by Get when no connection became
	// available before the pool's wait timeout elapsed.
	ErrPoolWaitTimeout = errors.New("pool wait timed out")

	// ErrPoolClosing is returned by Get once the pool has started closing.
	ErrPoolClosing = errors.New("pool closing")
)
// Pool represents a pool of connections to Riak hosts.
//
// Idle connections are held in a buffered channel: Get receives from it
// and Put sends to it.
type Pool struct {
	// count is the number of connections the pool was created with.
	count int
	// closing is set to 1 by Close and read via sync/atomic.
	closing int32
	// conns holds the idle connections; NewPool buffers it to count.
	conns chan *Conn
	// NOTE(review): mutex is not referenced by any method shown in this
	// file - confirm whether it is still needed.
	mutex sync.Mutex
	// waitTimeout bounds how long Get waits for an idle connection.
	// NewPool sets it to five seconds.
	waitTimeout time.Duration
}
// NewPool creates a Pool of count connections for the given address.
//
// Every connection is set up, one after another, before NewPool returns
// so that callers do not cause a stampede. A connection whose initial
// Recover fails is handed to Fail, which keeps retrying in the
// background; all other connections are placed in the pool immediately.
// The background pinger is started just before returning.
func NewPool(addr string, count int) (p *Pool) {
	p = new(Pool)
	p.count = count
	p.conns = make(chan *Conn, count)
	p.waitTimeout = 5 * time.Second

	for remaining := count; remaining > 0; remaining-- {
		conn := NewConn(addr)
		if dialErr := conn.Recover(); dialErr == nil {
			p.Put(conn)
			continue
		}
		// Not usable yet: let Fail retry it off the caller's goroutine.
		p.Fail(conn)
	}

	go p.pinger()
	return p
}
// Close shuts the pool down after waiting for connections to be
// gracefully returned.
//
// It marks the pool as closing, then receives count connections from the
// pool and closes each one. The call therefore blocks until every
// connection has been handed back, either by a caller through Put or by
// a Fail goroutine that notices the pool is closing.
//
// Only the first call performs the shutdown. Any later or concurrent
// call returns ErrPoolClosing immediately instead of blocking forever on
// a pool that has already been drained.
func (p *Pool) Close() (err error) {
	// Claim the shutdown atomically so exactly one caller drains the pool.
	if !atomic.CompareAndSwapInt32(&p.closing, 0, 1) {
		return ErrPoolClosing
	}
	for i := 0; i < p.count; i++ {
		c := <-p.conns
		// Any result of the connection's own Close is not inspected here.
		c.Close()
	}
	return nil
}
// Get takes an idle connection from the pool.
//
// It returns ErrPoolClosing straight away if the pool is closing. If a
// connection is idle it is returned without waiting. Otherwise Get waits
// up to the pool's wait timeout for one to be returned and gives up with
// ErrPoolWaitTimeout. The returned connection is nil whenever err is
// non-nil.
func (p *Pool) Get() (c *Conn, err error) {
	if p.isClosing() {
		return nil, ErrPoolClosing
	}

	// Fast path: a non-blocking receive avoids creating a timer at all.
	select {
	case c = <-p.conns:
		return c, nil
	default:
	}

	// Slow path: wait with an explicit timer that is stopped on return, so
	// it is released as soon as a connection arrives instead of lingering
	// until it fires, as a time.After timer may.
	timer := time.NewTimer(p.waitTimeout)
	defer timer.Stop()

	select {
	case c = <-p.conns:
		return c, nil
	case <-timer.C:
		return nil, ErrPoolWaitTimeout
	}
}
// Put hands a connection back to the pool so a later Get can receive it.
// The send blocks while the pool's channel is full.
func (p *Pool) Put(c *Conn) {
	p.conns <- c
}
// Fail marks a connection as failed and recovers it in the background.
//
// A goroutine is spawned that calls Recover on the connection until it
// succeeds, sleeping one second after each failed attempt. The connection
// is not available to the pool during that time and is put back once
// Recover succeeds. If the pool starts closing first, the connection is
// put back as it is, without a further Recover attempt.
func (p *Pool) Fail(c *Conn) {
	go func() {
		var err error
		for !p.isClosing() {
			if err = c.Recover(); err == nil {
				break
			}
			time.Sleep(1 * time.Second)
		}
		// Reached either after a successful Recover or because the pool is
		// closing; in both cases the connection goes back to the pool.
		p.Put(c)
	}()
}
// pinger periodically health-checks pooled connections until the pool
// starts closing.
//
// Ticks are spaced 10000/count milliseconds apart (integer division). On
// each tick one connection is taken with Get and pinged: a connection
// that answers is put back, one that fails is handed to Fail. A tick on
// which Get returns an error is skipped.
//
// An empty pool has nothing to ping, so pinger returns immediately when
// count is not positive. The interval is clamped to at least one
// millisecond because time.NewTicker panics on a non-positive duration,
// which would otherwise happen for pools of more than 10000 connections.
func (p *Pool) pinger() {
	if p.count <= 0 {
		// Also avoids an integer divide-by-zero below.
		return
	}

	interval := time.Duration(10000/p.count) * time.Millisecond
	if interval <= 0 {
		interval = time.Millisecond
	}

	t := time.NewTicker(interval)
	defer t.Stop()

	for range t.C {
		if p.isClosing() {
			return
		}
		c, err := p.Get()
		if err != nil {
			continue
		}
		if err = c.Ping(); err != nil {
			p.Fail(c)
		} else {
			p.Put(c)
		}
	}
}
// isClosing reports whether the pool's closing flag is set, that is,
// whether an atomic load of closing yields 1.
func (p *Pool) isClosing() bool {
	flag := atomic.LoadInt32(&p.closing)
	return flag == 1
}