db: reconnect on connection errors #286

Open · wants to merge 30 commits into master

Changes from 1 commit (30 commits in total)

9012927
Add a recovering mechanism to reconnect and retry a query if the serv…
xiam Oct 4, 2016
cabedd7
Fix bug with ghost values in execer
xiam Oct 5, 2016
c93dbcf
Update retry mechanism.
xiam Oct 17, 2016
1de668f
Merge branch 'issue-226' of github.com:upper/db into issue-226
xiam Oct 17, 2016
6475aea
Make d.recoverFromErr only catch ErrBadConn.
xiam Oct 17, 2016
b3306f0
Merge pull request #282 from upper/issue-280
Oct 21, 2016
c267726
Merge pull request #281 from upper/issue-279
Oct 21, 2016
de7bff7
Manual merge
xiam Oct 24, 2016
7a86715
Add reconnect (WIP +debug messages)
xiam Oct 24, 2016
0f9bfe3
Better logic for handling connect/reconnect
xiam Oct 26, 2016
4af4871
Cleaning up remaining reconnection bits
xiam Oct 27, 2016
f08946e
Removing log messages and adding a test for bad connection.
xiam Nov 4, 2016
e523258
Update docstrings
xiam Nov 4, 2016
2a02240
Remove "could not read block" from db.ErrServerRefusedConnection
xiam Nov 4, 2016
cfb43e3
Ping within a transaction.
xiam Nov 4, 2016
3538ba3
Don't block Transaction() before connecting.
xiam Nov 4, 2016
13181c0
Merge pull request #269 from upper/issue-226
Nov 8, 2016
ec0678d
Use db.ErrTooManyReconnectionAttempts instead of db.ErrGivingUpTrying…
xiam Nov 8, 2016
6d206db
Merge branch 'master' into feature/err-too-many-reconnection-attempts
xiam Nov 8, 2016
53a97dd
Merge pull request #283 from upper/feature/err-too-many-reconnection-…
Nov 10, 2016
37e2780
Fixes after testing reconnect with a low number of available connecti…
xiam Nov 10, 2016
78d28bc
Make retry-on-error configurable and disable it by default.
xiam Nov 10, 2016
340d8eb
Add db.Conf.SetRetryQueryOnError to PostgreSQL test.
xiam Nov 10, 2016
118944d
Fix call to c.setBinaryOption
xiam Nov 10, 2016
7017914
Add missing db.Conf.RetryQueryOnError
xiam Nov 10, 2016
ed13348
Add d.recoverAttempts for counting recovery attempts up to discardCon…
xiam Nov 14, 2016
02b5a72
Use flushConnectionPoolAfterRecoverAttempts and remove ConnCheck.
xiam Nov 14, 2016
963d413
Merge branch 'master' into feature/test-reconnect-with-few-connections
xiam Nov 16, 2016
838332f
Removing flawed logic: don't close before having a valid new conn to …
xiam Nov 16, 2016
33b8d62
Mark prepared statements as ready to be closed after using them
xiam Nov 16, 2016
Removing log messages and adding a test for bad connection.
xiam committed Nov 4, 2016
commit f08946e8a17d5f66321a9f7d3603789a75cb0850
91 changes: 38 additions & 53 deletions internal/sqladapter/database.go
@@ -5,7 +5,6 @@ import (
"database/sql/driver"
"errors"
"io"
//"log"
"math"
"strconv"
"sync"
@@ -32,7 +31,7 @@ var (
// If a query fails with a recoverable error the connection is going to be
// re-established and the query can be retried, each retry adds a max wait
// time of maxConnectionRetryTime
maxQueryRetryAttempts = 2
maxQueryRetryAttempts = 3

// Minimum interval when trying to reconnect.
minConnectionRetryInterval = time.Millisecond * 100
@@ -43,8 +42,8 @@ var (
// Maximum time each connection retry attempt can take.
maxConnectionRetryTime = time.Second * 20

// Maximum reconnection attempts before the session gives up.
maxReconnectionAttempts uint64 = 3
// Maximum reconnection attempts per session before giving up.
maxReconnectionAttempts uint64 = 6
)

var (
@@ -163,7 +162,6 @@ var (
)

func (d *database) reconnect() error {
//log.Printf("reconnect: start")
if d.Transaction() != nil {
// Don't even attempt to recover from within a transaction, this is not
// possible.
@@ -172,22 +170,17 @@ func (d *database) reconnect() error {

err := d.PartialDatabase.Err(d.Ping())
if err == nil {
//log.Printf("reconnect: Conn is still there!")
return nil
}
//log.Printf("reconnect: Ping: %v", err)

return d.connect(d.connFn)
}

func (d *database) connect(connFn func() error) error {
//log.Printf("connect: start")

if connFn == nil {
return errors.New("Missing connect function")
}

//log.Printf("connect: wait...")
d.connectMu.Lock()
defer d.connectMu.Unlock()

@@ -198,33 +191,28 @@ func (d *database) connect(connFn func() error) error {

waitTime := minConnectionRetryInterval

for start, i := time.Now(), 0; time.Now().Sub(start) < maxConnectionRetryTime; i++ {
for start, i := time.Now(), 1; time.Now().Sub(start) < maxConnectionRetryTime; i++ {
waitTime = time.Duration(i) * minConnectionRetryInterval
if waitTime > maxConnectionRetryInterval {
waitTime = maxConnectionRetryInterval
}
// Wait a bit until retrying.
if waitTime > time.Duration(0) {
//log.Printf("connect[%d]: Wait %v", i, waitTime)
time.Sleep(waitTime)
}

err := connFn()
//log.Printf("connect[%d]: connFn: %v", i, err)
if err == nil {
atomic.StoreUint64(&d.connectAttempts, 0)
//log.Printf("connect[%d]: connected!", i)
d.connFn = connFn
return nil
}

if !d.isRecoverableError(err) {
//log.Printf("connect[%d]: We don't know how to handle: %v", i, err)
return err
}
}

//log.Printf("connect: Failed to connect")
return db.ErrGivingUpTryingToConnect
}
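The loop above waits i × minConnectionRetryInterval before attempt i, caps that wait at maxConnectionRetryInterval, and bounds the whole retry phase by maxConnectionRetryTime. A minimal standalone sketch of the resulting schedule — the 100ms and 20s values come from the var block above, while the cap is an assumption because maxConnectionRetryInterval's value is not part of this diff:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	minInterval := 100 * time.Millisecond // minConnectionRetryInterval (from the var block above)
	maxInterval := 3 * time.Second        // maxConnectionRetryInterval (assumed; not shown in this diff)
	maxTotal := 20 * time.Second          // maxConnectionRetryTime (from the var block above)

	elapsed := time.Duration(0)
	for i := 1; elapsed < maxTotal; i++ {
		wait := time.Duration(i) * minInterval
		if wait > maxInterval {
			wait = maxInterval
		}
		fmt.Printf("attempt %2d: wait %v\n", i, wait)
		// connect() bounds the loop by wall-clock time; summing the waits here
		// keeps the sketch deterministic while showing the same schedule.
		elapsed += wait
	}
}
```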

@@ -249,16 +237,11 @@ func (d *database) BindTx(t *sql.Tx) error {
d.sessMu.Lock()
defer d.sessMu.Unlock()

//log.Printf("BindTx")

d.baseTx = newTx(t)
if err := d.Ping(); err != nil {
//log.Printf("BindTx: err: %v", err)
return err
}

//log.Printf("BindTx: OK")

d.txID = newTxID()
return nil
}
@@ -294,6 +277,7 @@ func (d *database) BindSession(sess *sql.DB) error {

d.sessMu.Lock()
if d.sess != nil {
d.ClearCache()
d.sess.Close() // Close before rebind.
}
d.sess = sess
@@ -333,7 +317,6 @@ func (d *database) recoverFromErr(err error) error {

if d.isRecoverableError(err) {
err := d.reconnect()
//log.Printf("recoverFromErr: %v", err)
return err
}

@@ -345,7 +328,17 @@ func (d *database) recoverFromErr(err error) error {
// it
func (d *database) Ping() error {
if sess := d.Session(); sess != nil {
return sess.Ping()
if err := sess.Ping(); err != nil {
return err
}

_, err := sess.Exec("SELECT 1")
if err != nil {
return err
}

return nil

}
return db.ErrNotConnected
}
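The extra SELECT 1 exists because, at the time of this change, database/sql had no driver-level ping (driver.Pinger only arrived in Go 1.8), so (*sql.DB).Ping could return nil without ever touching the server. The same liveness check, sketched against database/sql directly rather than the adapter:

```go
package liveness

import "database/sql"

// isAlive mirrors the updated Ping() above: acquire a connection, then force
// a round trip with a trivial query so that a dropped backend surfaces as an
// error (typically driver.ErrBadConn) instead of a silent nil.
func isAlive(sess *sql.DB) error {
	if err := sess.Ping(); err != nil {
		return err
	}
	if _, err := sess.Exec("SELECT 1"); err != nil {
		return err
	}
	return nil
}
```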
@@ -458,22 +451,19 @@ func (d *database) StatementExec(stmt *exql.Statement, args ...interface{}) (res
}(time.Now())
}

//log.Printf("Exec[%d] start", queryID)
for i := 0; i < maxQueryRetryAttempts; i++ {
for i := 0; ; i++ {
query, res, err = d.prepareAndExec(stmt, args...)
if err == nil {
return res, nil // success
if err == nil || i >= maxQueryRetryAttempts {
return res, err
}

//log.Printf("exec[%d] (%d): %v", queryID, i, err)
if d.recoverFromErr(err) == nil {
continue // retry
// Try to recover
if recoverErr := d.recoverFromErr(err); recoverErr != nil {
return nil, err // Unable to recover.
}
return nil, err
}

// All retry attempts failed, return res and err.
return nil, err
panic("reached")
}

func (d *database) prepareAndQuery(stmt *exql.Statement, args ...interface{}) (string, *sql.Rows, error) {
@@ -483,7 +473,7 @@ func (d *database) prepareAndQuery(stmt *exql.Statement, args ...interface{}) (s
}

rows, err := p.Query(args...)
return query, rows, nil
return query, rows, err
}

// StatementQuery compiles and executes a statement that returns rows.
@@ -507,22 +497,19 @@ func (d *database) StatementQuery(stmt *exql.Statement, args ...interface{}) (ro
}(time.Now())
}

//log.Printf("Query[%d] start", queryID)
for i := 0; i < maxQueryRetryAttempts; i++ {
for i := 0; ; i++ {
query, rows, err = d.prepareAndQuery(stmt, args...)
if err == nil {
return rows, err // success
if err == nil || i >= maxQueryRetryAttempts {
return rows, err
}

//log.Printf("query[%d] (%d): %v", queryID, i, err)
if d.recoverFromErr(err) == nil {
continue // retry
// Try to recover
if recoverErr := d.recoverFromErr(err); recoverErr != nil {
return nil, err // Unable to recover.
}
return nil, err
}

// All retry attempts failed, return rows and err.
return nil, err
panic("reached")
}

func (d *database) prepareAndQueryRow(stmt *exql.Statement, args ...interface{}) (string, *sql.Row, error) {
@@ -559,21 +546,19 @@ func (d *database) StatementQueryRow(stmt *exql.Statement, args ...interface{})
}(time.Now())
}

//log.Printf("QueryRow[%d] start", queryID)
for i := 0; i < maxQueryRetryAttempts; i++ {
for i := 0; ; i++ {
query, row, err = d.prepareAndQueryRow(stmt, args...)
if err == nil {
return row, nil // success
if err == nil || i >= maxQueryRetryAttempts {
return row, err
}

//log.Printf("queryRow[%d] (%d): %v", queryID, i, err)
if d.recoverFromErr(err) == nil {
continue // retry
// Try to recover
if recoverErr := d.recoverFromErr(err); recoverErr != nil {
return nil, err // Unable to recover.
}
return nil, err
}

return nil, err
panic("reached")
}

// Driver returns the underlying *sql.DB or *sql.Tx instance.
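StatementExec, StatementQuery and StatementQueryRow above now share one retry shape: run the query, return on success or once maxQueryRetryAttempts is exhausted, otherwise ask recoverFromErr to re-establish the connection and go around again. Reduced to a standalone sketch (the names are illustrative, not the adapter's API):

```go
package retrysketch

// retryOnRecoverableError condenses the loop used by the three Statement*
// methods: run fn, and when it fails, let recoverFn (recoverFromErr in the
// adapter) try to re-establish the connection before retrying, allowing up
// to maxAttempts extra tries. If recovery fails, the original error is
// returned unchanged.
func retryOnRecoverableError(maxAttempts int, recoverFn func(error) error, fn func() error) error {
	for i := 0; ; i++ {
		err := fn()
		if err == nil || i >= maxAttempts {
			return err
		}
		if recoverErr := recoverFn(err); recoverErr != nil {
			return err // could not recover; give up with the original error
		}
	}
}
```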
3 changes: 3 additions & 0 deletions lib/sqlbuilder/fetch.go
@@ -81,6 +81,9 @@ func fetchRow(rows *sql.Rows, dst interface{}) error {
// slice of structs given by the pointer `dst`.
func fetchRows(rows *sql.Rows, dst interface{}) error {
var err error
if rows == nil {
panic("rows cannot be nil")
}

defer rows.Close()

43 changes: 43 additions & 0 deletions postgresql/local_test.go
@@ -2,6 +2,8 @@ package postgresql

import (
"database/sql"
"os"
"sync"
"testing"

"github.com/stretchr/testify/assert"
@@ -117,3 +119,44 @@ func TestIssue210(t *testing.T) {
_, err = sess.Collection("hello").Find().Count()
assert.NoError(t, err)
}

// The "driver: bad connection" problem (driver.ErrBadConn) happens when the
// database process is abnormally interrupted, this can happen for a variety of
// reasons, for instance when the database process is OOM killed by the OS.
func TestDriverBadConnection(t *testing.T) {
sess := mustOpen()
defer sess.Close()

var tMu sync.Mutex
tFatal := func(err error) {
tMu.Lock()
defer tMu.Unlock()
t.Fatal(err)
}

const concurrentOpts = 20
var wg sync.WaitGroup

for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
_, err := sess.Collection("artist").Find().Count()
if err != nil {
tFatal(err)
}
}(i)
if i%concurrentOpts == (concurrentOpts - 1) {
// This triggers the "bad connection" problem, if you want to see that
// instead of retrying the query set maxQueryRetryAttempts to 0.
wg.Add(1)
go func() {
defer wg.Done()
sess.Query("SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND datname = ?", os.Getenv("DB_NAME"))
}()
wg.Wait()
}
}

wg.Wait()
}
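Per commits 78d28bc, 340d8eb and 7017914 in this PR, retrying queries on recoverable errors is disabled by default and is switched on through db.Conf.SetRetryQueryOnError. A hedged usage sketch — the exact signature is not shown in this diff, so a boolean setter and the v2 import path are assumptions:

```go
package main

import (
	"upper.io/db.v2" // import path assumed for the upper/db v2 era
)

func main() {
	// Query retry on recoverable errors (e.g. driver.ErrBadConn) is off by
	// default per commit 78d28bc; opt in before opening a session. A boolean
	// setter is assumed here, based on the commit messages that reference
	// db.Conf.SetRetryQueryOnError.
	db.Conf.SetRetryQueryOnError(true)

	// ...open a session with an adapter (e.g. postgresql.Open) and run
	// queries as usual; on a recoverable failure the adapter reconnects and
	// re-runs the query up to maxQueryRetryAttempts times.
}
```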