From 901292765a0d206ae43ec95539b38f87f3fc9770 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Tue, 4 Oct 2016 11:09:32 -0500 Subject: [PATCH 01/21] Add a recovering mechanism to reconnect and retry a query if the server is restarting or refusing connections. --- errors.go | 1 + internal/sqladapter/database.go | 85 +++++++++++++++++++--- internal/sqladapter/testing/adapter.go.tpl | 4 +- postgresql/database.go | 13 +++- 4 files changed, 87 insertions(+), 16 deletions(-) diff --git a/errors.go b/errors.go index afccfc4c..5fc07915 100644 --- a/errors.go +++ b/errors.go @@ -49,4 +49,5 @@ var ( ErrMissingConnURL = errors.New(`upper: missing DSN`) ErrNotImplemented = errors.New(`upper: call not implemented`) ErrAlreadyWithinTransaction = errors.New(`upper: already within a transaction`) + ErrServerRefusedConnection = errors.New(`upper: database server refused connection`) ) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index 26e7291f..37e09989 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -2,6 +2,7 @@ package sqladapter import ( "database/sql" + "database/sql/driver" "math" "strconv" "sync" @@ -14,6 +15,15 @@ import ( "upper.io/db.v2/lib/sqlbuilder" ) +var ( + maxQueryRetryAttempts = 2 // Each retry adds a max wait time of maxConnectionRetryTime + + minConnectionRetryInterval = time.Millisecond * 10 + maxConnectionRetryInterval = time.Millisecond * 500 + + maxConnectionRetryTime = time.Second * 10 +) + var ( lastSessID uint64 lastTxID uint64 @@ -90,6 +100,8 @@ type database struct { PartialDatabase baseTx BaseTx + recoverFromErrMu sync.Mutex + collectionMu sync.Mutex databaseMu sync.Mutex @@ -168,6 +180,38 @@ func (d *database) BindSession(sess *sql.DB) error { return nil } +// recoverFromErr attempts to reestablish a connection after a temporary error. +func (d *database) recoverFromErr(err error) error { + d.recoverFromErrMu.Lock() + defer d.recoverFromErrMu.Unlock() + + if d.Transaction() != nil { + // Don't even attempt to recover from within a transaction. + return err + } + + waitTime := minConnectionRetryInterval + + // Attempt to stablish a new connection using the current session. + lastErr := d.PartialDatabase.Err(err) + for ts := time.Now(); time.Now().Sub(ts) < maxConnectionRetryTime; { + switch lastErr { + case nil: + return nil + case db.ErrTooManyClients, db.ErrServerRefusedConnection, driver.ErrBadConn: + time.Sleep(waitTime) + if waitTime < maxConnectionRetryInterval { + waitTime = waitTime * 2 + } + default: + break // Other error we don't know how to deal with. + } + lastErr = d.PartialDatabase.Err(d.Ping()) + } + + return err +} + // Ping checks whether a connection to the database is still alive by pinging // it func (d *database) Ping() error { @@ -271,12 +315,20 @@ func (d *database) StatementExec(stmt *exql.Statement, args ...interface{}) (res return nil, err } - if execer, ok := d.PartialDatabase.(HasStatementExec); ok { - res, err = execer.StatementExec(p, args...) - return + for i := 0; i < maxQueryRetryAttempts; i++ { + if execer, ok := d.PartialDatabase.(HasStatementExec); ok { + res, err = execer.StatementExec(p, args...) + } else { + res, err = p.Exec(args...) + } + if err == nil { + return + } + if err = d.recoverFromErr(err); err != nil { + return + } } - res, err = p.Exec(args...) return } @@ -303,7 +355,16 @@ func (d *database) StatementQuery(stmt *exql.Statement, args ...interface{}) (ro return nil, err } - rows, err = p.Query(args...) + for i := 0; i < maxQueryRetryAttempts; i++ { + rows, err = p.Query(args...) + if err == nil { + return + } + if err = d.recoverFromErr(err); err != nil { + return + } + } + return } @@ -353,7 +414,6 @@ func (d *database) prepareStatement(stmt *exql.Statement) (*sql.Stmt, string, er } pc, ok := d.cachedStatements.ReadRaw(stmt) - if ok { // The statement was cached. ps := pc.(*cachedStatement) @@ -394,21 +454,22 @@ func (d *database) WaitForConnection(connectFn func() error) error { defer waitForConnMu.Unlock() // Minimum waiting time. - waitTime := time.Millisecond * 10 + waitTime := minConnectionRetryInterval // Waitig 5 seconds for a successful connection. - for timeStart := time.Now(); time.Now().Sub(timeStart) < time.Second*5; { + for timeStart := time.Now(); time.Now().Sub(timeStart) < maxConnectionRetryTime; { err := connectFn() if err == nil { return nil // Connected! } // Only attempt to reconnect if the error is too many clients. - if d.PartialDatabase.Err(err) == db.ErrTooManyClients { - // Sleep and try again if, and only if, the server replied with a "too - // many clients" error. + switch d.PartialDatabase.Err(err) { + case db.ErrTooManyClients, db.ErrServerRefusedConnection, driver.ErrBadConn: + // Sleep and try again if, and only if, the server replied with a + // temporary error. time.Sleep(waitTime) - if waitTime < time.Millisecond*500 { + if waitTime < maxConnectionRetryInterval { // Wait a bit more next time. waitTime = waitTime * 2 } diff --git a/internal/sqladapter/testing/adapter.go.tpl b/internal/sqladapter/testing/adapter.go.tpl index 92b2e899..5ea67cdf 100644 --- a/internal/sqladapter/testing/adapter.go.tpl +++ b/internal/sqladapter/testing/adapter.go.tpl @@ -1370,7 +1370,7 @@ func TestBuilder(t *testing.T) { assert.NotZero(t, all) } -func TestExhaustConnectionPool(t *testing.T) { +func TestExhaustConnectionPoolWithTransactions(t *testing.T) { if Adapter == "ql" { t.Skip("Currently not supported.") } @@ -1402,7 +1402,7 @@ func TestExhaustConnectionPool(t *testing.T) { // Requesting a new transaction session. start := time.Now() - tLogf("Tx: %d: NewTx") + tLogf("Tx: %d: NewTx", i) tx, err := sess.NewTx() if err != nil { tFatal(err) diff --git a/postgresql/database.go b/postgresql/database.go index d7807d99..91c238fd 100644 --- a/postgresql/database.go +++ b/postgresql/database.go @@ -153,10 +153,19 @@ func (d *database) CompileStatement(stmt *exql.Statement) string { // Err allows sqladapter to translate some known errors into generic errors. func (d *database) Err(err error) error { if err != nil { + // These errors are not exported so we have to check them by comparing + // string values. s := err.Error() - // These errors are not exported so we have to check them by they string value. - if strings.Contains(s, `too many clients`) || strings.Contains(s, `remaining connection slots are reserved`) || strings.Contains(s, `too many open`) { + switch { + case strings.Contains(s, `too many clients`), + strings.Contains(s, `remaining connection slots are reserved`), + strings.Contains(s, `too many open`): return db.ErrTooManyClients + case strings.Contains(s, `connection refused`), + strings.Contains(s, `reset by peer`), + strings.Contains(s, `is starting up`), + strings.Contains(s, `is shutting down`): + return db.ErrServerRefusedConnection } } return err From cabedd7ec27cddfe978a2925c849467f9d5b87f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Wed, 5 Oct 2016 10:51:29 -0500 Subject: [PATCH 02/21] Fix bug with ghost values in execer --- internal/sqladapter/database.go | 39 +++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index 37e09989..526c2e06 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -3,6 +3,7 @@ package sqladapter import ( "database/sql" "database/sql/driver" + "errors" "math" "strconv" "sync" @@ -24,6 +25,10 @@ var ( maxConnectionRetryTime = time.Second * 10 ) +var ( + errNothingToRecoverFrom = errors.New("Nothing to recover from") +) + var ( lastSessID uint64 lastTxID uint64 @@ -180,35 +185,40 @@ func (d *database) BindSession(sess *sql.DB) error { return nil } -// recoverFromErr attempts to reestablish a connection after a temporary error. +// recoverFromErr attempts to reestablish a connection after a temporary error, +// returns nil if the connection was reestablished and the query can be retried. func (d *database) recoverFromErr(err error) error { - d.recoverFromErrMu.Lock() - defer d.recoverFromErrMu.Unlock() + if err == nil { + return errNothingToRecoverFrom + } if d.Transaction() != nil { // Don't even attempt to recover from within a transaction. return err } + d.recoverFromErrMu.Lock() + defer d.recoverFromErrMu.Unlock() + waitTime := minConnectionRetryInterval // Attempt to stablish a new connection using the current session. lastErr := d.PartialDatabase.Err(err) for ts := time.Now(); time.Now().Sub(ts) < maxConnectionRetryTime; { switch lastErr { - case nil: - return nil case db.ErrTooManyClients, db.ErrServerRefusedConnection, driver.ErrBadConn: time.Sleep(waitTime) if waitTime < maxConnectionRetryInterval { waitTime = waitTime * 2 } default: - break // Other error we don't know how to deal with. + // We don't know how to deal with this error. + return err } lastErr = d.PartialDatabase.Err(d.Ping()) } + // Return original error return err } @@ -322,13 +332,15 @@ func (d *database) StatementExec(stmt *exql.Statement, args ...interface{}) (res res, err = p.Exec(args...) } if err == nil { - return + return // successful query } - if err = d.recoverFromErr(err); err != nil { - return + if d.recoverFromErr(err) == nil { + continue // retry } + return } + // All retry attempts failed, return res and err. return } @@ -358,13 +370,16 @@ func (d *database) StatementQuery(stmt *exql.Statement, args ...interface{}) (ro for i := 0; i < maxQueryRetryAttempts; i++ { rows, err = p.Query(args...) if err == nil { - return + return // successful query } - if err = d.recoverFromErr(err); err != nil { - return + + if d.recoverFromErr(err) == nil { + continue // retry } + return } + // All retry attempts failed, return rows and err. return } From 6475aea1ab186c29bf3179b1b0452cc574c4286b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Mon, 17 Oct 2016 09:42:08 -0500 Subject: [PATCH 03/21] Make d.recoverFromErr only catch ErrBadConn. --- internal/sqladapter/database.go | 57 +++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index a3c8cf02..dd9dff62 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -17,12 +17,15 @@ import ( ) var ( - maxQueryRetryAttempts = 2 // Each retry adds a max wait time of maxConnectionRetryTime + // Needs to be multiplied by maxConnectionRetryTime to get the total time + // upper-db will use to retry before giving up and returning an error. + maxQueryRetryAttempts = 2 minConnectionRetryInterval = time.Millisecond * 10 maxConnectionRetryInterval = time.Millisecond * 500 - maxConnectionRetryTime = time.Second * 10 + // The amount of time each retry can take. + maxConnectionRetryTime = time.Second * 5 ) var ( @@ -186,8 +189,9 @@ func (d *database) BindSession(sess *sql.DB) error { return nil } -// recoverFromErr attempts to reestablish a connection after a temporary error, -// returns nil if the connection was reestablished and the query can be retried. +// recoverFromErr attempts to reestablish a connection after a recoverable +// error (like "bad driver"). Returns nil if the connection was reestablished, +// afther this the query can be retried. func (d *database) recoverFromErr(err error) error { if err == nil { return errNothingToRecoverFrom @@ -207,16 +211,25 @@ func (d *database) recoverFromErr(err error) error { lastErr := d.PartialDatabase.Err(err) for ts := time.Now(); time.Now().Sub(ts) < maxConnectionRetryTime; { switch lastErr { - case db.ErrTooManyClients, db.ErrServerRefusedConnection, driver.ErrBadConn: - time.Sleep(waitTime) - if waitTime < maxConnectionRetryInterval { - waitTime = waitTime * 2 + // According to database/sql, this is returned by an sql driver when the + // connection is in bad state and it should not be returned if there's a + // possibility that the database server might have performed the operation. + case driver.ErrBadConn: + time.Sleep(waitTime) // Wait a bit before retrying. + waitTime = waitTime * 2 // Double the previous waiting time. + if waitTime > maxConnectionRetryInterval { + // Won't wait more than maxConnectionRetryInterval + waitTime = maxConnectionRetryInterval } default: - // We don't know how to deal with this error. + // We don't know how to deal with this error, nor if we can recover + // from it, return the original error. return err } lastErr = d.PartialDatabase.Err(d.Ping()) + if lastErr == nil { + return nil // Connection was reestablished. + } } // Return original error @@ -334,16 +347,18 @@ func (d *database) StatementExec(stmt *exql.Statement, args ...interface{}) (res res, err = p.Exec(args...) } if err == nil { - return // successful query + return res, nil // successful query } if d.recoverFromErr(err) == nil { - continue // retry + continue // Connection was reestablished, retry. } - return + // We got another error from recoverFromErr that means it could not + // recover. + return res, err } - // All retry attempts failed, return res and err. - return + // All retry attempts failed. + return res, err } // StatementQuery compiles and executes a statement that returns rows. @@ -373,17 +388,17 @@ func (d *database) StatementQuery(stmt *exql.Statement, args ...interface{}) (ro for i := 0; i < maxQueryRetryAttempts; i++ { rows, err = p.Query(args...) if err == nil { - return // successful query + return rows, err // this was a successful query } if d.recoverFromErr(err) == nil { - continue // retry + continue // we can retry } - return + return rows, err } - // All retry attempts failed, return rows and err. - return + // All retry attempts failed. + return rows, err } // StatementQueryRow compiles and executes a statement that returns at most one @@ -411,8 +426,8 @@ func (d *database) StatementQueryRow(stmt *exql.Statement, args ...interface{}) } defer p.Close() - row, err = p.QueryRow(args...), nil - return + row, err = p.QueryRow(args...), nil // We're setting row and err like this because they're going to be logged. + return row, err } // Driver returns the underlying *sql.DB or *sql.Tx instance. From 7a867154f10d8d10782519055552e53c3b6a401f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Mon, 24 Oct 2016 09:45:33 -0500 Subject: [PATCH 04/21] Add reconnect (WIP +debug messages) --- internal/sqladapter/database.go | 248 ++++++++++++++++++++------------ postgresql/database.go | 3 + 2 files changed, 163 insertions(+), 88 deletions(-) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index addd5a33..8836baee 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -4,6 +4,8 @@ import ( "database/sql" "database/sql/driver" "errors" + "io" + "log" "math" "strconv" "sync" @@ -17,28 +19,28 @@ import ( ) var ( - // Needs to be multiplied by maxConnectionRetryTime to get the total time - // upper-db will use to retry before giving up and returning an error. - maxQueryRetryAttempts = 2 + maxQueryRetryAttempts = 6 // Each retry adds a max wait time of maxConnectionRetryTime - minConnectionRetryInterval = time.Millisecond * 10 - maxConnectionRetryInterval = time.Millisecond * 500 + minConnectionRetryInterval = time.Millisecond * 100 + maxConnectionRetryInterval = time.Millisecond * 1000 - // The amount of time each retry can take. - maxConnectionRetryTime = time.Second * 5 + maxConnectionRetryTime = time.Second * 20 ) var ( - errNothingToRecoverFrom = errors.New("Nothing to recover from") + errNothingToRecoverFrom = errors.New("Nothing to recover from") + errUnableToRecover = errors.New("Unable to recover from this error") + errAllAttemptsHaveFailed = errors.New("All attempts to recover have failed") ) var ( - lastSessID uint64 - lastTxID uint64 + lastSessID uint64 + lastTxID uint64 + lastOperationID uint64 ) -// HasCleanUp is implemented by structs that have a clean up routine that needs -// to be called before Close(). +// HasCleanUp is implemented by structs that have a clean up routine that +// needs to be called before Close(). type HasCleanUp interface { CleanUp() error } @@ -109,14 +111,19 @@ type database struct { PartialDatabase baseTx BaseTx - recoverFromErrMu sync.Mutex + reconnectMu sync.Mutex collectionMu sync.Mutex databaseMu sync.Mutex - name string - sess *sql.DB - sessMu sync.Mutex + connFn func() error + + name string + + sess *sql.DB + sessErr error + sessMu sync.Mutex + reconnecting int32 sessID uint64 txID uint64 @@ -131,8 +138,61 @@ var ( _ = db.Database(&database{}) ) +func (d *database) reconnect() error { + log.Printf("reconnect: wait...") + + d.reconnectMu.Lock() + defer d.reconnectMu.Unlock() + + lastErr := d.PartialDatabase.Err(d.Ping()) + if lastErr == nil { + log.Printf("reconnect: Conn is still there!") + return nil + } + log.Printf("reconnect: Ping: %v", lastErr) + + waitTime := minConnectionRetryInterval + + for start, i := time.Now(), 0; time.Now().Sub(start) < maxConnectionRetryTime; i++ { + switch lastErr { + case io.EOF, db.ErrTooManyClients, db.ErrServerRefusedConnection, driver.ErrBadConn, db.ErrGivingUpTryingToConnect: + log.Printf("reconnect[%d]: Sleeping... %v", i, waitTime) + time.Sleep(waitTime) + + waitTime = waitTime * 2 + if waitTime > maxConnectionRetryInterval { + waitTime = maxConnectionRetryInterval + } + default: + // We don't know how to deal with this error. + log.Printf("reconnect[%d]: We don't know how to handle: %v (%T), %v (%T)", i, lastErr, lastErr, driver.ErrBadConn, driver.ErrBadConn) + return lastErr + } + log.Printf("reconnect[%d]: Attempt to reconnect...", i) + + // Attempt to reconnect. + atomic.StoreInt32(&d.reconnecting, 1) + err := d.connFn() + log.Printf("reconnect[%d]: connFn: %v", i, err) + if err == nil { + atomic.StoreInt32(&d.reconnecting, 0) + log.Printf("reconnect[%d]: Reconnected!", i) + return nil + } + atomic.StoreInt32(&d.reconnecting, 0) + + lastErr = d.PartialDatabase.Err(err) + } + + log.Printf("reconnect: Failed to reconnect") + return errAllAttemptsHaveFailed +} + // Session returns the underlying *sql.DB func (d *database) Session() *sql.DB { + d.reconnectMu.Lock() + defer d.reconnectMu.Unlock() + return d.sess } @@ -170,7 +230,12 @@ func (d *database) Name() string { // BindSession binds a *sql.DB into *database func (d *database) BindSession(sess *sql.DB) error { + log.Printf("bindSession") + d.sessMu.Lock() + if d.sess != nil { + d.sess.Close() + } d.sess = sess d.sessMu.Unlock() @@ -178,20 +243,22 @@ func (d *database) BindSession(sess *sql.DB) error { return err } + if atomic.LoadInt32(&d.reconnecting) == 1 { + return nil + } + d.sessID = newSessionID() name, err := d.PartialDatabase.FindDatabaseName() if err != nil { return err } - d.name = name return nil } -// recoverFromErr attempts to reestablish a connection after a recoverable -// error (like "bad driver"). Returns nil if the connection was reestablished, -// afther this the query can be retried. +// recoverFromErr attempts to reestablish a connection after a temporary error, +// returns nil if the connection was reestablished and the query can be retried. func (d *database) recoverFromErr(err error) error { if err == nil { return errNothingToRecoverFrom @@ -202,38 +269,14 @@ func (d *database) recoverFromErr(err error) error { return err } - d.recoverFromErrMu.Lock() - defer d.recoverFromErrMu.Unlock() - - waitTime := minConnectionRetryInterval - - // Attempt to stablish a new connection using the current session. - lastErr := d.PartialDatabase.Err(err) - for ts := time.Now(); time.Now().Sub(ts) < maxConnectionRetryTime; { - switch lastErr { - // According to database/sql, this is returned by an sql driver when the - // connection is in bad state and it should not be returned if there's a - // possibility that the database server might have performed the operation. - case driver.ErrBadConn: - time.Sleep(waitTime) // Wait a bit before retrying. - waitTime = waitTime * 2 // Double the previous waiting time. - if waitTime > maxConnectionRetryInterval { - // Won't wait more than maxConnectionRetryInterval - waitTime = maxConnectionRetryInterval - } - default: - // We don't know how to deal with this error, nor if we can recover - // from it, return the original error. - return err - } - lastErr = d.PartialDatabase.Err(d.Ping()) - if lastErr == nil { - return nil // Connection was reestablished. - } + switch err { + case io.EOF, db.ErrTooManyClients, db.ErrServerRefusedConnection, driver.ErrBadConn, db.ErrGivingUpTryingToConnect: + err = d.reconnect() + log.Printf("recoverFromErr: %v", err) + return err } - // Return original error - return err + return errUnableToRecover } // Ping checks whether a connection to the database is still alive by pinging @@ -307,6 +350,8 @@ func (d *database) Collection(name string) db.Collection { func (d *database) StatementExec(stmt *exql.Statement, args ...interface{}) (res sql.Result, err error) { var query string + queryID := newOperationID() + if db.Conf.LoggingEnabled() { defer func(start time.Time) { @@ -334,37 +379,41 @@ func (d *database) StatementExec(stmt *exql.Statement, args ...interface{}) (res }(time.Now()) } - var p *Stmt - if p, query, err = d.prepareStatement(stmt); err != nil { - return nil, err - } - defer p.Close() - + log.Printf("Exec[%d] start", queryID) for i := 0; i < maxQueryRetryAttempts; i++ { + var p *Stmt + if p, query, err = d.prepareStatement(stmt); err != nil { + log.Printf("prepareStatement[%d] (%d): %v", queryID, i, err) + goto fail + } + if execer, ok := d.PartialDatabase.(HasStatementExec); ok { res, err = execer.StatementExec(p.Stmt, args...) } else { res, err = p.Exec(args...) } + log.Printf("Exec[%d] (%d): %v", queryID, i, err) if err == nil { return res, nil // successful query } + + fail: if d.recoverFromErr(err) == nil { - continue // Connection was reestablished, retry. + continue // retry } - // We got another error from recoverFromErr that means it could not - // recover. - return res, err + return nil, err } - // All retry attempts failed. - return res, err + // All retry attempts failed, return res and err. + return nil, err } // StatementQuery compiles and executes a statement that returns rows. func (d *database) StatementQuery(stmt *exql.Statement, args ...interface{}) (rows *sql.Rows, err error) { var query string + queryID := newOperationID() + if db.Conf.LoggingEnabled() { defer func(start time.Time) { db.Log(&db.QueryStatus{ @@ -379,26 +428,29 @@ func (d *database) StatementQuery(stmt *exql.Statement, args ...interface{}) (ro }(time.Now()) } - var p *Stmt - if p, query, err = d.prepareStatement(stmt); err != nil { - return nil, err - } - defer p.Close() - + log.Printf("Query[%d] start", queryID) for i := 0; i < maxQueryRetryAttempts; i++ { + var p *Stmt + if p, query, err = d.prepareStatement(stmt); err != nil { + log.Printf("prepareStatement[%d] (%d): %v", queryID, i, err) + goto fail + } + rows, err = p.Query(args...) + log.Printf("Query[%d] (%d): %v", queryID, i, err) if err == nil { - return rows, err // this was a successful query + return rows, nil // successful query } + fail: if d.recoverFromErr(err) == nil { - continue // we can retry + continue // retry } - return rows, err + return nil, err } - // All retry attempts failed. - return rows, err + // All retry attempts failed, return rows and err. + return nil, err } // StatementQueryRow compiles and executes a statement that returns at most one @@ -406,6 +458,8 @@ func (d *database) StatementQuery(stmt *exql.Statement, args ...interface{}) (ro func (d *database) StatementQueryRow(stmt *exql.Statement, args ...interface{}) (row *sql.Row, err error) { var query string + queryID := newOperationID() + if db.Conf.LoggingEnabled() { defer func(start time.Time) { db.Log(&db.QueryStatus{ @@ -420,14 +474,24 @@ func (d *database) StatementQueryRow(stmt *exql.Statement, args ...interface{}) }(time.Now()) } - var p *Stmt - if p, query, err = d.prepareStatement(stmt); err != nil { + log.Printf("QueryRow[%d] start", queryID) + for i := 0; i < maxQueryRetryAttempts; i++ { + var p *Stmt + if p, query, err = d.prepareStatement(stmt); err != nil { + log.Printf("prepareStatement [%d] (%d): %v", queryID, i, err) + goto fail + } + + row, err = p.QueryRow(args...), nil + return row, err + fail: + if d.recoverFromErr(err) == nil { + continue // retry + } return nil, err } - defer p.Close() - row, err = p.QueryRow(args...), nil // We're setting row and err like this because they're going to be logged. - return row, err + return nil, err } // Driver returns the underlying *sql.DB or *sql.Tx instance. @@ -443,7 +507,7 @@ func (d *database) Driver() interface{} { // *sql.Stmt. This method will attempt to used a cached prepared statement, if // available. func (d *database) prepareStatement(stmt *exql.Statement) (*Stmt, string, error) { - if d.sess == nil && d.Transaction() == nil { + if d.Session() == nil && d.Transaction() == nil { return nil, "", db.ErrNotConnected } @@ -476,11 +540,10 @@ func (d *database) prepareStatement(stmt *exql.Statement) (*Stmt, string, error) var waitForConnMu sync.Mutex -// WaitForConnection tries to execute the given connectFn function, if -// connectFn returns an error, then WaitForConnection will keep trying until -// connectFn returns nil. Maximum waiting time is 5s after having acquired the -// lock. -func (d *database) WaitForConnection(connectFn func() error) error { +// WaitForConnection tries to execute the given connFn function, if connFn +// returns an error, then WaitForConnection will keep trying until connFn +// returns nil. Maximum waiting time is 5s after having acquired the lock. +func (d *database) WaitForConnection(connFn func() error) error { // This lock ensures first-come, first-served and prevents opening too many // file descriptors. waitForConnMu.Lock() @@ -491,8 +554,9 @@ func (d *database) WaitForConnection(connectFn func() error) error { // Waitig 5 seconds for a successful connection. for timeStart := time.Now(); time.Now().Sub(timeStart) < maxConnectionRetryTime; { - err := connectFn() + err := connFn() if err == nil { + d.connFn = connFn return nil // Connected! } @@ -502,9 +566,9 @@ func (d *database) WaitForConnection(connectFn func() error) error { // Sleep and try again if, and only if, the server replied with a // temporary error. time.Sleep(waitTime) - if waitTime < maxConnectionRetryInterval { - // Wait a bit more next time. - waitTime = waitTime * 2 + waitTime = waitTime * 2 + if waitTime > maxConnectionRetryInterval { + waitTime = maxConnectionRetryInterval } continue } @@ -553,3 +617,11 @@ func newTxID() uint64 { } return atomic.AddUint64(&lastTxID, 1) } + +func newOperationID() uint64 { + if atomic.LoadUint64(&lastOperationID) == math.MaxUint64 { + atomic.StoreUint64(&lastOperationID, 0) + return 0 + } + return atomic.AddUint64(&lastOperationID, 1) +} diff --git a/postgresql/database.go b/postgresql/database.go index 91c238fd..77a55c76 100644 --- a/postgresql/database.go +++ b/postgresql/database.go @@ -164,6 +164,9 @@ func (d *database) Err(err error) error { case strings.Contains(s, `connection refused`), strings.Contains(s, `reset by peer`), strings.Contains(s, `is starting up`), + strings.Contains(s, `is in recovery mode`), + strings.Contains(s, `could not read block`), + strings.Contains(s, `is closed`), strings.Contains(s, `is shutting down`): return db.ErrServerRefusedConnection } From 0f9bfe3ea7b06dffcdd24fcccdbf8a5f183e3198 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Tue, 25 Oct 2016 20:51:15 -0500 Subject: [PATCH 05/21] Better logic for handling connect/reconnect --- internal/sqladapter/database.go | 264 ++++++++++++++++++-------------- logger.go | 5 +- postgresql/database.go | 1 - 3 files changed, 150 insertions(+), 120 deletions(-) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index 8836baee..03721f5a 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -18,8 +18,18 @@ import ( "upper.io/db.v2/lib/sqlbuilder" ) +// A list of errors that mean the server is not working and that we should +// try to connect and retry the query. +var recoverableErrors = []error{ + io.EOF, + driver.ErrBadConn, + db.ErrNotConnected, + db.ErrTooManyClients, + db.ErrServerRefusedConnection, +} + var ( - maxQueryRetryAttempts = 6 // Each retry adds a max wait time of maxConnectionRetryTime + maxQueryRetryAttempts = 2 // Each retry adds a max wait time of maxConnectionRetryTime minConnectionRetryInterval = time.Millisecond * 100 maxConnectionRetryInterval = time.Millisecond * 1000 @@ -28,9 +38,8 @@ var ( ) var ( - errNothingToRecoverFrom = errors.New("Nothing to recover from") - errUnableToRecover = errors.New("Unable to recover from this error") - errAllAttemptsHaveFailed = errors.New("All attempts to recover have failed") + errNothingToRecoverFrom = errors.New("Nothing to recover from") + errUnableToRecover = errors.New("Unable to recover from this error") ) var ( @@ -98,20 +107,26 @@ type BaseDatabase interface { // NewBaseDatabase provides a BaseDatabase given a PartialDatabase func NewBaseDatabase(p PartialDatabase) BaseDatabase { d := &database{ - PartialDatabase: p, + PartialDatabase: p, + cachedCollections: cache.NewCache(), cachedStatements: cache.NewCache(), + connFn: defaultConnFn, } return d } +var defaultConnFn = func() error { + return errors.New("No connection function was defined.") +} + // database is the actual implementation of Database and joins methods from // BaseDatabase and PartialDatabase type database struct { PartialDatabase baseTx BaseTx - reconnectMu sync.Mutex + connectMu sync.Mutex collectionMu sync.Mutex databaseMu sync.Mutex @@ -120,10 +135,9 @@ type database struct { name string - sess *sql.DB - sessErr error - sessMu sync.Mutex - reconnecting int32 + sess *sql.DB + sessErr error + sessMu sync.Mutex sessID uint64 txID uint64 @@ -139,59 +153,76 @@ var ( ) func (d *database) reconnect() error { - log.Printf("reconnect: wait...") - - d.reconnectMu.Lock() - defer d.reconnectMu.Unlock() + log.Printf("reconnect: start") + if d.Transaction() != nil { + // Don't even attempt to recover from within a transaction, this is not + // possible. + return errors.New("Can't recover from a transaction.") + } - lastErr := d.PartialDatabase.Err(d.Ping()) - if lastErr == nil { + err := d.PartialDatabase.Err(d.ping()) + if err == nil { log.Printf("reconnect: Conn is still there!") return nil } - log.Printf("reconnect: Ping: %v", lastErr) + log.Printf("reconnect: Ping: %v", err) + + return d.connect(d.connFn) +} + +func (d *database) connect(connFn func() error) error { + log.Printf("connect: start") + + if connFn == nil { + return errors.New("Missing connect function") + } + + log.Printf("connect: wait...") + + d.connectMu.Lock() + defer d.connectMu.Unlock() waitTime := minConnectionRetryInterval for start, i := time.Now(), 0; time.Now().Sub(start) < maxConnectionRetryTime; i++ { - switch lastErr { - case io.EOF, db.ErrTooManyClients, db.ErrServerRefusedConnection, driver.ErrBadConn, db.ErrGivingUpTryingToConnect: - log.Printf("reconnect[%d]: Sleeping... %v", i, waitTime) + waitTime = time.Duration(i) * minConnectionRetryInterval + if waitTime > maxConnectionRetryInterval { + waitTime = maxConnectionRetryInterval + } + // Wait a bit until retrying. + if waitTime > time.Duration(0) { + log.Printf("connect[%d]: Wait %v", i, waitTime) time.Sleep(waitTime) - - waitTime = waitTime * 2 - if waitTime > maxConnectionRetryInterval { - waitTime = maxConnectionRetryInterval - } - default: - // We don't know how to deal with this error. - log.Printf("reconnect[%d]: We don't know how to handle: %v (%T), %v (%T)", i, lastErr, lastErr, driver.ErrBadConn, driver.ErrBadConn) - return lastErr } - log.Printf("reconnect[%d]: Attempt to reconnect...", i) - // Attempt to reconnect. - atomic.StoreInt32(&d.reconnecting, 1) - err := d.connFn() - log.Printf("reconnect[%d]: connFn: %v", i, err) + // Attempt to (re)connect + err := connFn() + log.Printf("connect[%d]: connFn: %v", i, err) if err == nil { - atomic.StoreInt32(&d.reconnecting, 0) - log.Printf("reconnect[%d]: Reconnected!", i) + log.Printf("connect[%d]: connected!", i) + d.connFn = connFn return nil } - atomic.StoreInt32(&d.reconnecting, 0) - lastErr = d.PartialDatabase.Err(err) + if !d.isRecoverableError(err) { + log.Printf("connect[%d]: We don't know how to handle: %v", i, err) + return err + } } - log.Printf("reconnect: Failed to reconnect") - return errAllAttemptsHaveFailed + log.Printf("connect: Failed to connect") + return db.ErrGivingUpTryingToConnect } // Session returns the underlying *sql.DB func (d *database) Session() *sql.DB { - d.reconnectMu.Lock() - defer d.reconnectMu.Unlock() + if atomic.LoadUint64(&d.sessID) == 0 { + return d.sess + } + + // Can't return while connecting. + d.connectMu.Lock() + defer d.connectMu.Unlock() return d.sess } @@ -201,11 +232,16 @@ func (d *database) BindTx(t *sql.Tx) error { d.sessMu.Lock() defer d.sessMu.Unlock() + log.Printf("BindTx") + d.baseTx = newTx(t) if err := d.Ping(); err != nil { + log.Printf("BindTx: err: %v", err) return err } + log.Printf("BindTx: OK") + d.txID = newTxID() return nil } @@ -228,35 +264,53 @@ func (d *database) Name() string { return d.name } +func (d *database) getDBName() error { + name, err := d.PartialDatabase.FindDatabaseName() + if err != nil { + return err + } + d.name = name + return nil +} + // BindSession binds a *sql.DB into *database func (d *database) BindSession(sess *sql.DB) error { - log.Printf("bindSession") + if err := sess.Ping(); err != nil { + return err + } d.sessMu.Lock() if d.sess != nil { - d.sess.Close() + d.sess.Close() // Close before rebind. } d.sess = sess d.sessMu.Unlock() - if err := d.Ping(); err != nil { - return err - } - - if atomic.LoadInt32(&d.reconnecting) == 1 { + // Does this session already have a session ID? + if atomic.LoadUint64(&d.sessID) != 0 { return nil } - d.sessID = newSessionID() - name, err := d.PartialDatabase.FindDatabaseName() - if err != nil { + // Is this connection really working? + if err := d.getDBName(); err != nil { return err } - d.name = name + // Assign an ID if everyting was OK. + d.sessID = newSessionID() return nil } +func (d *database) isRecoverableError(err error) bool { + err = d.PartialDatabase.Err(err) + for i := 0; i < len(recoverableErrors); i++ { + if err == recoverableErrors[i] { + return true + } + } + return false +} + // recoverFromErr attempts to reestablish a connection after a temporary error, // returns nil if the connection was reestablished and the query can be retried. func (d *database) recoverFromErr(err error) error { @@ -264,28 +318,30 @@ func (d *database) recoverFromErr(err error) error { return errNothingToRecoverFrom } - if d.Transaction() != nil { - // Don't even attempt to recover from within a transaction. - return err - } - - switch err { - case io.EOF, db.ErrTooManyClients, db.ErrServerRefusedConnection, driver.ErrBadConn, db.ErrGivingUpTryingToConnect: - err = d.reconnect() + if d.isRecoverableError(err) { + err := d.reconnect() log.Printf("recoverFromErr: %v", err) return err } + // This is not an error we can recover from. return errUnableToRecover } // Ping checks whether a connection to the database is still alive by pinging // it func (d *database) Ping() error { - if d.sess != nil { - return d.sess.Ping() + if sess := d.Session(); sess != nil { + return sess.Ping() } - return nil + return db.ErrNotConnected +} + +func (d *database) ping() error { + if d.sess == nil { + return db.ErrNotConnected + } + return d.sess.Ping() } // ClearCache removes all caches. @@ -356,13 +412,14 @@ func (d *database) StatementExec(stmt *exql.Statement, args ...interface{}) (res defer func(start time.Time) { status := db.QueryStatus{ - TxID: d.txID, - SessID: d.sessID, - Query: query, - Args: args, - Err: err, - Start: start, - End: time.Now(), + TxID: d.txID, + SessID: d.sessID, + QueryID: queryID, + Query: query, + Args: args, + Err: err, + Start: start, + End: time.Now(), } if res != nil { @@ -417,13 +474,14 @@ func (d *database) StatementQuery(stmt *exql.Statement, args ...interface{}) (ro if db.Conf.LoggingEnabled() { defer func(start time.Time) { db.Log(&db.QueryStatus{ - TxID: d.txID, - SessID: d.sessID, - Query: query, - Args: args, - Err: err, - Start: start, - End: time.Now(), + TxID: d.txID, + SessID: d.sessID, + QueryID: queryID, + Query: query, + Args: args, + Err: err, + Start: start, + End: time.Now(), }) }(time.Now()) } @@ -463,13 +521,14 @@ func (d *database) StatementQueryRow(stmt *exql.Statement, args ...interface{}) if db.Conf.LoggingEnabled() { defer func(start time.Time) { db.Log(&db.QueryStatus{ - TxID: d.txID, - SessID: d.sessID, - Query: query, - Args: args, - Err: err, - Start: start, - End: time.Now(), + TxID: d.txID, + SessID: d.sessID, + QueryID: queryID, + Query: query, + Args: args, + Err: err, + Start: start, + End: time.Now(), }) }(time.Now()) } @@ -544,40 +603,11 @@ var waitForConnMu sync.Mutex // returns an error, then WaitForConnection will keep trying until connFn // returns nil. Maximum waiting time is 5s after having acquired the lock. func (d *database) WaitForConnection(connFn func() error) error { - // This lock ensures first-come, first-served and prevents opening too many - // file descriptors. - waitForConnMu.Lock() - defer waitForConnMu.Unlock() - - // Minimum waiting time. - waitTime := minConnectionRetryInterval - - // Waitig 5 seconds for a successful connection. - for timeStart := time.Now(); time.Now().Sub(timeStart) < maxConnectionRetryTime; { - err := connFn() - if err == nil { - d.connFn = connFn - return nil // Connected! - } - - // Only attempt to reconnect if the error is too many clients. - switch d.PartialDatabase.Err(err) { - case db.ErrTooManyClients, db.ErrServerRefusedConnection, driver.ErrBadConn: - // Sleep and try again if, and only if, the server replied with a - // temporary error. - time.Sleep(waitTime) - waitTime = waitTime * 2 - if waitTime > maxConnectionRetryInterval { - waitTime = maxConnectionRetryInterval - } - continue - } - - // Return any other error immediately. + if err := d.connect(connFn); err != nil { return err } - - return db.ErrGivingUpTryingToConnect + // Success. + return nil } // ReplaceWithDollarSign turns a SQL statament with '?' placeholders into diff --git a/logger.go b/logger.go index 1703fb58..4f8d949f 100644 --- a/logger.go +++ b/logger.go @@ -47,8 +47,9 @@ var ( // QueryStatus represents the status of a query after being executed. type QueryStatus struct { - SessID uint64 - TxID uint64 + SessID uint64 + TxID uint64 + QueryID uint64 RowsAffected *int64 LastInsertID *int64 diff --git a/postgresql/database.go b/postgresql/database.go index 77a55c76..2fa7f74a 100644 --- a/postgresql/database.go +++ b/postgresql/database.go @@ -206,7 +206,6 @@ func (d *database) NewLocalTransaction() (sqladapter.DatabaseTx, error) { if err := d.BaseDatabase.WaitForConnection(connFn); err != nil { return nil, err } - return sqladapter.NewTx(clone), nil } From 4af4871c6c061d86776b48fe8c9fac5f605292bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Thu, 27 Oct 2016 12:20:03 -0500 Subject: [PATCH 06/21] Cleaning up remaining reconnection bits --- internal/sqladapter/database.go | 201 ++++++++++++++++++-------------- 1 file changed, 112 insertions(+), 89 deletions(-) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index 03721f5a..73f8b552 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -5,7 +5,7 @@ import ( "database/sql/driver" "errors" "io" - "log" + //"log" "math" "strconv" "sync" @@ -29,12 +29,22 @@ var recoverableErrors = []error{ } var ( - maxQueryRetryAttempts = 2 // Each retry adds a max wait time of maxConnectionRetryTime + // If a query fails with a recoverable error the connection is going to be + // re-estalished and the query can be retried, each retry adds a max wait + // time of maxConnectionRetryTime + maxQueryRetryAttempts = 2 + // Minimum interval when trying to reconnect. minConnectionRetryInterval = time.Millisecond * 100 - maxConnectionRetryInterval = time.Millisecond * 1000 + // Maximum interval when trying to reconnect. + maxConnectionRetryInterval = time.Millisecond * 2500 + + // Maximun time each connection retry attempt can take. maxConnectionRetryTime = time.Second * 20 + + // Maximun reconnection attempts before the session gives up. + maxReconnectionAttempts uint64 = 3 ) var ( @@ -126,10 +136,8 @@ type database struct { PartialDatabase baseTx BaseTx - connectMu sync.Mutex - + connectMu sync.Mutex collectionMu sync.Mutex - databaseMu sync.Mutex connFn func() error @@ -142,6 +150,8 @@ type database struct { sessID uint64 txID uint64 + connectAttempts uint64 + cachedStatements *cache.Cache cachedCollections *cache.Cache @@ -153,35 +163,39 @@ var ( ) func (d *database) reconnect() error { - log.Printf("reconnect: start") + //log.Printf("reconnect: start") if d.Transaction() != nil { // Don't even attempt to recover from within a transaction, this is not // possible. - return errors.New("Can't recover from a transaction.") + return errors.New("Can't recover from within a bad transaction.") } - err := d.PartialDatabase.Err(d.ping()) + err := d.PartialDatabase.Err(d.Ping()) if err == nil { - log.Printf("reconnect: Conn is still there!") + //log.Printf("reconnect: Conn is still there!") return nil } - log.Printf("reconnect: Ping: %v", err) + //log.Printf("reconnect: Ping: %v", err) return d.connect(d.connFn) } func (d *database) connect(connFn func() error) error { - log.Printf("connect: start") + //log.Printf("connect: start") if connFn == nil { return errors.New("Missing connect function") } - log.Printf("connect: wait...") - + //log.Printf("connect: wait...") d.connectMu.Lock() defer d.connectMu.Unlock() + // Attempt to (re)connect + if atomic.AddUint64(&d.connectAttempts, 1) >= maxReconnectionAttempts { + return db.ErrGivingUpTryingToConnect + } + waitTime := minConnectionRetryInterval for start, i := time.Now(), 0; time.Now().Sub(start) < maxConnectionRetryTime; i++ { @@ -191,36 +205,39 @@ func (d *database) connect(connFn func() error) error { } // Wait a bit until retrying. if waitTime > time.Duration(0) { - log.Printf("connect[%d]: Wait %v", i, waitTime) + //log.Printf("connect[%d]: Wait %v", i, waitTime) time.Sleep(waitTime) } - // Attempt to (re)connect err := connFn() - log.Printf("connect[%d]: connFn: %v", i, err) + //log.Printf("connect[%d]: connFn: %v", i, err) if err == nil { - log.Printf("connect[%d]: connected!", i) + atomic.StoreUint64(&d.connectAttempts, 0) + //log.Printf("connect[%d]: connected!", i) d.connFn = connFn return nil } if !d.isRecoverableError(err) { - log.Printf("connect[%d]: We don't know how to handle: %v", i, err) + //log.Printf("connect[%d]: We don't know how to handle: %v", i, err) return err } } - log.Printf("connect: Failed to connect") + //log.Printf("connect: Failed to connect") return db.ErrGivingUpTryingToConnect } // Session returns the underlying *sql.DB func (d *database) Session() *sql.DB { if atomic.LoadUint64(&d.sessID) == 0 { + // This means the session is connecting for the first time, in this case we + // don't block because the session hasn't been returned yet. return d.sess } - // Can't return while connecting. + // Prevents goroutines from using the session until the connection is + // re-established. d.connectMu.Lock() defer d.connectMu.Unlock() @@ -232,15 +249,15 @@ func (d *database) BindTx(t *sql.Tx) error { d.sessMu.Lock() defer d.sessMu.Unlock() - log.Printf("BindTx") + //log.Printf("BindTx") d.baseTx = newTx(t) if err := d.Ping(); err != nil { - log.Printf("BindTx: err: %v", err) + //log.Printf("BindTx: err: %v", err) return err } - log.Printf("BindTx: OK") + //log.Printf("BindTx: OK") d.txID = newTxID() return nil @@ -249,18 +266,14 @@ func (d *database) BindTx(t *sql.Tx) error { // Tx returns a BaseTx, which, if not nil, means that this session is within a // transaction func (d *database) Transaction() BaseTx { + d.sessMu.Lock() + defer d.sessMu.Unlock() + return d.baseTx } // Name returns the database named func (d *database) Name() string { - d.databaseMu.Lock() - defer d.databaseMu.Unlock() - - if d.name == "" { - d.name, _ = d.PartialDatabase.FindDatabaseName() - } - return d.name } @@ -320,7 +333,7 @@ func (d *database) recoverFromErr(err error) error { if d.isRecoverableError(err) { err := d.reconnect() - log.Printf("recoverFromErr: %v", err) + //log.Printf("recoverFromErr: %v", err) return err } @@ -337,17 +350,11 @@ func (d *database) Ping() error { return db.ErrNotConnected } -func (d *database) ping() error { - if d.sess == nil { - return db.ErrNotConnected - } - return d.sess.Ping() -} - // ClearCache removes all caches. func (d *database) ClearCache() { d.collectionMu.Lock() defer d.collectionMu.Unlock() + d.cachedCollections.Clear() d.cachedStatements.Clear() if d.template != nil { @@ -363,7 +370,7 @@ func (d *database) Close() error { d.baseTx = nil d.sessMu.Unlock() }() - if d.sess != nil { + if sess := d.Session(); sess != nil { if cleaner, ok := d.PartialDatabase.(HasCleanUp); ok { cleaner.CleanUp() } @@ -376,6 +383,7 @@ func (d *database) Close() error { return d.sess.Close() } + // Don't close the parent session if within a transaction. if !tx.Committed() { tx.Rollback() } @@ -390,9 +398,9 @@ func (d *database) Collection(name string) db.Collection { h := cache.String(name) - ccol, ok := d.cachedCollections.ReadRaw(h) + cachedCollection, ok := d.cachedCollections.ReadRaw(h) if ok { - return ccol.(db.Collection) + return cachedCollection.(db.Collection) } col := d.PartialDatabase.NewLocalCollection(name) @@ -401,6 +409,21 @@ func (d *database) Collection(name string) db.Collection { return col } +func (d *database) prepareAndExec(stmt *exql.Statement, args ...interface{}) (string, sql.Result, error) { + p, query, err := d.prepareStatement(stmt) + if err != nil { + return query, nil, err + } + + if execer, ok := d.PartialDatabase.(HasStatementExec); ok { + res, err := execer.StatementExec(p.Stmt, args...) + return query, res, err + } + + res, err := p.Exec(args...) + return query, res, err +} + // StatementExec compiles and executes a statement that does not return any // rows. func (d *database) StatementExec(stmt *exql.Statement, args ...interface{}) (res sql.Result, err error) { @@ -410,7 +433,6 @@ func (d *database) StatementExec(stmt *exql.Statement, args ...interface{}) (res if db.Conf.LoggingEnabled() { defer func(start time.Time) { - status := db.QueryStatus{ TxID: d.txID, SessID: d.sessID, @@ -436,25 +458,14 @@ func (d *database) StatementExec(stmt *exql.Statement, args ...interface{}) (res }(time.Now()) } - log.Printf("Exec[%d] start", queryID) + //log.Printf("Exec[%d] start", queryID) for i := 0; i < maxQueryRetryAttempts; i++ { - var p *Stmt - if p, query, err = d.prepareStatement(stmt); err != nil { - log.Printf("prepareStatement[%d] (%d): %v", queryID, i, err) - goto fail - } - - if execer, ok := d.PartialDatabase.(HasStatementExec); ok { - res, err = execer.StatementExec(p.Stmt, args...) - } else { - res, err = p.Exec(args...) - } - log.Printf("Exec[%d] (%d): %v", queryID, i, err) + query, res, err = d.prepareAndExec(stmt, args...) if err == nil { - return res, nil // successful query + return res, nil // success } - fail: + //log.Printf("exec[%d] (%d): %v", queryID, i, err) if d.recoverFromErr(err) == nil { continue // retry } @@ -465,6 +476,16 @@ func (d *database) StatementExec(stmt *exql.Statement, args ...interface{}) (res return nil, err } +func (d *database) prepareAndQuery(stmt *exql.Statement, args ...interface{}) (string, *sql.Rows, error) { + p, query, err := d.prepareStatement(stmt) + if err != nil { + return query, nil, err + } + + rows, err := p.Query(args...) + return query, rows, nil +} + // StatementQuery compiles and executes a statement that returns rows. func (d *database) StatementQuery(stmt *exql.Statement, args ...interface{}) (rows *sql.Rows, err error) { var query string @@ -486,21 +507,14 @@ func (d *database) StatementQuery(stmt *exql.Statement, args ...interface{}) (ro }(time.Now()) } - log.Printf("Query[%d] start", queryID) + //log.Printf("Query[%d] start", queryID) for i := 0; i < maxQueryRetryAttempts; i++ { - var p *Stmt - if p, query, err = d.prepareStatement(stmt); err != nil { - log.Printf("prepareStatement[%d] (%d): %v", queryID, i, err) - goto fail - } - - rows, err = p.Query(args...) - log.Printf("Query[%d] (%d): %v", queryID, i, err) + query, rows, err = d.prepareAndQuery(stmt, args...) if err == nil { - return rows, nil // successful query + return rows, err // success } - fail: + //log.Printf("query[%d] (%d): %v", queryID, i, err) if d.recoverFromErr(err) == nil { continue // retry } @@ -511,6 +525,18 @@ func (d *database) StatementQuery(stmt *exql.Statement, args ...interface{}) (ro return nil, err } +func (d *database) prepareAndQueryRow(stmt *exql.Statement, args ...interface{}) (string, *sql.Row, error) { + p, query, err := d.prepareStatement(stmt) + if err != nil { + return query, nil, err + } + + // Would be nice to find a way to check if this succeeded before using + // Scan. + rows, err := p.QueryRow(args...), nil + return query, rows, nil +} + // StatementQueryRow compiles and executes a statement that returns at most one // row. func (d *database) StatementQueryRow(stmt *exql.Statement, args ...interface{}) (row *sql.Row, err error) { @@ -533,17 +559,14 @@ func (d *database) StatementQueryRow(stmt *exql.Statement, args ...interface{}) }(time.Now()) } - log.Printf("QueryRow[%d] start", queryID) + //log.Printf("QueryRow[%d] start", queryID) for i := 0; i < maxQueryRetryAttempts; i++ { - var p *Stmt - if p, query, err = d.prepareStatement(stmt); err != nil { - log.Printf("prepareStatement [%d] (%d): %v", queryID, i, err) - goto fail + query, row, err = d.prepareAndQueryRow(stmt, args...) + if err == nil { + return row, nil // success } - row, err = p.QueryRow(args...), nil - return row, err - fail: + //log.Printf("queryRow[%d] (%d): %v", queryID, i, err) if d.recoverFromErr(err) == nil { continue // retry } @@ -556,10 +579,9 @@ func (d *database) StatementQueryRow(stmt *exql.Statement, args ...interface{}) // Driver returns the underlying *sql.DB or *sql.Tx instance. func (d *database) Driver() interface{} { if tx := d.Transaction(); tx != nil { - // A transaction return tx.(*sqlTx).Tx } - return d.sess + return d.Session() } // prepareStatement converts a *exql.Statement representation into an actual @@ -572,19 +594,20 @@ func (d *database) prepareStatement(stmt *exql.Statement) (*Stmt, string, error) pc, ok := d.cachedStatements.ReadRaw(stmt) if ok { - // The statement was cached. + // This prepared statement was cached, no need to build or to prepare + // again. ps, err := pc.(*Stmt).Open() if err == nil { return ps, ps.query, nil } } - // Plain SQL query. + // Building the actual SQL query. query := d.PartialDatabase.CompileStatement(stmt) sqlStmt, err := func() (*sql.Stmt, error) { - if d.Transaction() != nil { - return d.Transaction().(*sqlTx).Prepare(query) + if tx := d.Transaction(); tx != nil { + return tx.(*sqlTx).Prepare(query) } return d.sess.Prepare(query) }() @@ -634,24 +657,24 @@ func ReplaceWithDollarSign(in string) string { func newSessionID() uint64 { if atomic.LoadUint64(&lastSessID) == math.MaxUint64 { - atomic.StoreUint64(&lastSessID, 0) - return 0 + atomic.StoreUint64(&lastSessID, 1) + return 1 } return atomic.AddUint64(&lastSessID, 1) } func newTxID() uint64 { if atomic.LoadUint64(&lastTxID) == math.MaxUint64 { - atomic.StoreUint64(&lastTxID, 0) - return 0 + atomic.StoreUint64(&lastTxID, 1) + return 1 } return atomic.AddUint64(&lastTxID, 1) } func newOperationID() uint64 { if atomic.LoadUint64(&lastOperationID) == math.MaxUint64 { - atomic.StoreUint64(&lastOperationID, 0) - return 0 + atomic.StoreUint64(&lastOperationID, 1) + return 1 } return atomic.AddUint64(&lastOperationID, 1) } From f08946e8a17d5f66321a9f7d3603789a75cb0850 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Thu, 3 Nov 2016 19:03:53 -0600 Subject: [PATCH 07/21] Removing log messages and adding a test for bad connection. --- internal/sqladapter/database.go | 91 ++++++++++++++------------------- lib/sqlbuilder/fetch.go | 3 ++ postgresql/local_test.go | 43 ++++++++++++++++ 3 files changed, 84 insertions(+), 53 deletions(-) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index 73f8b552..4531f73f 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -5,7 +5,6 @@ import ( "database/sql/driver" "errors" "io" - //"log" "math" "strconv" "sync" @@ -32,7 +31,7 @@ var ( // If a query fails with a recoverable error the connection is going to be // re-estalished and the query can be retried, each retry adds a max wait // time of maxConnectionRetryTime - maxQueryRetryAttempts = 2 + maxQueryRetryAttempts = 3 // Minimum interval when trying to reconnect. minConnectionRetryInterval = time.Millisecond * 100 @@ -43,8 +42,8 @@ var ( // Maximun time each connection retry attempt can take. maxConnectionRetryTime = time.Second * 20 - // Maximun reconnection attempts before the session gives up. - maxReconnectionAttempts uint64 = 3 + // Maximun reconnection attempts per session before giving up. + maxReconnectionAttempts uint64 = 6 ) var ( @@ -163,7 +162,6 @@ var ( ) func (d *database) reconnect() error { - //log.Printf("reconnect: start") if d.Transaction() != nil { // Don't even attempt to recover from within a transaction, this is not // possible. @@ -172,22 +170,17 @@ func (d *database) reconnect() error { err := d.PartialDatabase.Err(d.Ping()) if err == nil { - //log.Printf("reconnect: Conn is still there!") return nil } - //log.Printf("reconnect: Ping: %v", err) return d.connect(d.connFn) } func (d *database) connect(connFn func() error) error { - //log.Printf("connect: start") - if connFn == nil { return errors.New("Missing connect function") } - //log.Printf("connect: wait...") d.connectMu.Lock() defer d.connectMu.Unlock() @@ -198,33 +191,28 @@ func (d *database) connect(connFn func() error) error { waitTime := minConnectionRetryInterval - for start, i := time.Now(), 0; time.Now().Sub(start) < maxConnectionRetryTime; i++ { + for start, i := time.Now(), 1; time.Now().Sub(start) < maxConnectionRetryTime; i++ { waitTime = time.Duration(i) * minConnectionRetryInterval if waitTime > maxConnectionRetryInterval { waitTime = maxConnectionRetryInterval } // Wait a bit until retrying. if waitTime > time.Duration(0) { - //log.Printf("connect[%d]: Wait %v", i, waitTime) time.Sleep(waitTime) } err := connFn() - //log.Printf("connect[%d]: connFn: %v", i, err) if err == nil { atomic.StoreUint64(&d.connectAttempts, 0) - //log.Printf("connect[%d]: connected!", i) d.connFn = connFn return nil } if !d.isRecoverableError(err) { - //log.Printf("connect[%d]: We don't know how to handle: %v", i, err) return err } } - //log.Printf("connect: Failed to connect") return db.ErrGivingUpTryingToConnect } @@ -249,16 +237,11 @@ func (d *database) BindTx(t *sql.Tx) error { d.sessMu.Lock() defer d.sessMu.Unlock() - //log.Printf("BindTx") - d.baseTx = newTx(t) if err := d.Ping(); err != nil { - //log.Printf("BindTx: err: %v", err) return err } - //log.Printf("BindTx: OK") - d.txID = newTxID() return nil } @@ -294,6 +277,7 @@ func (d *database) BindSession(sess *sql.DB) error { d.sessMu.Lock() if d.sess != nil { + d.ClearCache() d.sess.Close() // Close before rebind. } d.sess = sess @@ -333,7 +317,6 @@ func (d *database) recoverFromErr(err error) error { if d.isRecoverableError(err) { err := d.reconnect() - //log.Printf("recoverFromErr: %v", err) return err } @@ -345,7 +328,17 @@ func (d *database) recoverFromErr(err error) error { // it func (d *database) Ping() error { if sess := d.Session(); sess != nil { - return sess.Ping() + if err := sess.Ping(); err != nil { + return err + } + + _, err := sess.Exec("SELECT 1") + if err != nil { + return err + } + + return nil + } return db.ErrNotConnected } @@ -458,22 +451,19 @@ func (d *database) StatementExec(stmt *exql.Statement, args ...interface{}) (res }(time.Now()) } - //log.Printf("Exec[%d] start", queryID) - for i := 0; i < maxQueryRetryAttempts; i++ { + for i := 0; ; i++ { query, res, err = d.prepareAndExec(stmt, args...) - if err == nil { - return res, nil // success + if err == nil || i >= maxQueryRetryAttempts { + return res, err } - //log.Printf("exec[%d] (%d): %v", queryID, i, err) - if d.recoverFromErr(err) == nil { - continue // retry + // Try to recover + if recoverErr := d.recoverFromErr(err); recoverErr != nil { + return nil, err // Unable to recover. } - return nil, err } - // All retry attempts failed, return res and err. - return nil, err + panic("reached") } func (d *database) prepareAndQuery(stmt *exql.Statement, args ...interface{}) (string, *sql.Rows, error) { @@ -483,7 +473,7 @@ func (d *database) prepareAndQuery(stmt *exql.Statement, args ...interface{}) (s } rows, err := p.Query(args...) - return query, rows, nil + return query, rows, err } // StatementQuery compiles and executes a statement that returns rows. @@ -507,22 +497,19 @@ func (d *database) StatementQuery(stmt *exql.Statement, args ...interface{}) (ro }(time.Now()) } - //log.Printf("Query[%d] start", queryID) - for i := 0; i < maxQueryRetryAttempts; i++ { + for i := 0; ; i++ { query, rows, err = d.prepareAndQuery(stmt, args...) - if err == nil { - return rows, err // success + if err == nil || i >= maxQueryRetryAttempts { + return rows, err } - //log.Printf("query[%d] (%d): %v", queryID, i, err) - if d.recoverFromErr(err) == nil { - continue // retry + // Try to recover + if recoverErr := d.recoverFromErr(err); recoverErr != nil { + return nil, err // Unable to recover. } - return nil, err } - // All retry attempts failed, return rows and err. - return nil, err + panic("reached") } func (d *database) prepareAndQueryRow(stmt *exql.Statement, args ...interface{}) (string, *sql.Row, error) { @@ -559,21 +546,19 @@ func (d *database) StatementQueryRow(stmt *exql.Statement, args ...interface{}) }(time.Now()) } - //log.Printf("QueryRow[%d] start", queryID) - for i := 0; i < maxQueryRetryAttempts; i++ { + for i := 0; ; i++ { query, row, err = d.prepareAndQueryRow(stmt, args...) - if err == nil { - return row, nil // success + if err == nil || i >= maxQueryRetryAttempts { + return row, err } - //log.Printf("queryRow[%d] (%d): %v", queryID, i, err) - if d.recoverFromErr(err) == nil { - continue // retry + // Try to recover + if recoverErr := d.recoverFromErr(err); recoverErr != nil { + return nil, err // Unable to recover. } - return nil, err } - return nil, err + panic("reached") } // Driver returns the underlying *sql.DB or *sql.Tx instance. diff --git a/lib/sqlbuilder/fetch.go b/lib/sqlbuilder/fetch.go index b5bf577f..e3f6b1fa 100644 --- a/lib/sqlbuilder/fetch.go +++ b/lib/sqlbuilder/fetch.go @@ -81,6 +81,9 @@ func fetchRow(rows *sql.Rows, dst interface{}) error { // slice of structs given by the pointer `dst`. func fetchRows(rows *sql.Rows, dst interface{}) error { var err error + if rows == nil { + panic("rows cannot be nil") + } defer rows.Close() diff --git a/postgresql/local_test.go b/postgresql/local_test.go index b09e86ad..b54682bf 100644 --- a/postgresql/local_test.go +++ b/postgresql/local_test.go @@ -2,6 +2,8 @@ package postgresql import ( "database/sql" + "os" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -117,3 +119,44 @@ func TestIssue210(t *testing.T) { _, err = sess.Collection("hello").Find().Count() assert.NoError(t, err) } + +// The "driver: bad connection" problem (driver.ErrBadConn) happens when the +// database process is abnormally interrupted, this can happen for a variety of +// reasons, for instance when the database process is OOM killed by the OS. +func TestDriverBadConnection(t *testing.T) { + sess := mustOpen() + defer sess.Close() + + var tMu sync.Mutex + tFatal := func(err error) { + tMu.Lock() + defer tMu.Unlock() + t.Fatal(err) + } + + const concurrentOpts = 20 + var wg sync.WaitGroup + + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + _, err := sess.Collection("artist").Find().Count() + if err != nil { + tFatal(err) + } + }(i) + if i%concurrentOpts == (concurrentOpts - 1) { + // This triggers the "bad connection" problem, if you want to see that + // instead of retrying the query set maxQueryRetryAttempts to 0. + wg.Add(1) + go func() { + defer wg.Done() + sess.Query("SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND datname = ?", os.Getenv("DB_NAME")) + }() + wg.Wait() + } + } + + wg.Wait() +} From e523258139cd66f8d56cde33da9d21dd7379b097 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Thu, 3 Nov 2016 19:12:19 -0600 Subject: [PATCH 08/21] Update docstrings --- internal/sqladapter/database.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index 4531f73f..908a16f4 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -33,17 +33,17 @@ var ( // time of maxConnectionRetryTime maxQueryRetryAttempts = 3 - // Minimum interval when trying to reconnect. + // Minimum interval when waiting before trying to reconnect. minConnectionRetryInterval = time.Millisecond * 100 - // Maximum interval when trying to reconnect. + // Maximum interval when waiting before trying to reconnect. maxConnectionRetryInterval = time.Millisecond * 2500 // Maximun time each connection retry attempt can take. - maxConnectionRetryTime = time.Second * 20 + maxConnectionRetryTime = time.Second * 5 // Maximun reconnection attempts per session before giving up. - maxReconnectionAttempts uint64 = 6 + maxReconnectionAttempts uint64 = 12 ) var ( From 2a02240baae94ba041a783da873f72e902c74a67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Thu, 3 Nov 2016 19:17:07 -0600 Subject: [PATCH 09/21] Remove "could not read block" from db.ErrServerRefusedConnection --- postgresql/database.go | 1 - 1 file changed, 1 deletion(-) diff --git a/postgresql/database.go b/postgresql/database.go index 2fa7f74a..2b89919b 100644 --- a/postgresql/database.go +++ b/postgresql/database.go @@ -165,7 +165,6 @@ func (d *database) Err(err error) error { strings.Contains(s, `reset by peer`), strings.Contains(s, `is starting up`), strings.Contains(s, `is in recovery mode`), - strings.Contains(s, `could not read block`), strings.Contains(s, `is closed`), strings.Contains(s, `is shutting down`): return db.ErrServerRefusedConnection From cfb43e3e098dc07e4a22f25182d611e57603273e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Thu, 3 Nov 2016 20:15:01 -0600 Subject: [PATCH 10/21] Ping within a transaction. --- internal/sqladapter/database.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index 908a16f4..a26ce922 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -336,9 +336,11 @@ func (d *database) Ping() error { if err != nil { return err } - return nil - + } + if tx := d.Transaction(); tx != nil { + // When upper wraps a transaction with no original session. + return nil } return db.ErrNotConnected } From 3538ba3ad98430dfce3c46b5b8ed27d8d63ea272 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Thu, 3 Nov 2016 20:25:49 -0600 Subject: [PATCH 11/21] Don't block Transaction() before connecting. --- internal/sqladapter/database.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index a26ce922..13af8e22 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -249,6 +249,12 @@ func (d *database) BindTx(t *sql.Tx) error { // Tx returns a BaseTx, which, if not nil, means that this session is within a // transaction func (d *database) Transaction() BaseTx { + if atomic.LoadUint64(&d.sessID) == 0 { + // This means the session is connecting for the first time, in this case we + // don't block because the session hasn't been returned yet. + return d.baseTx + } + d.sessMu.Lock() defer d.sessMu.Unlock() From ec0678db890e109ecb365e11f3c99e4abe7c619d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Tue, 8 Nov 2016 16:17:55 -0600 Subject: [PATCH 12/21] Use db.ErrTooManyReconnectionAttempts instead of db.ErrGivingUpTryingToConnect --- errors.go | 47 +++++++++++++++++---------------- internal/sqladapter/database.go | 2 +- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/errors.go b/errors.go index b140efbe..4314c91b 100644 --- a/errors.go +++ b/errors.go @@ -27,27 +27,28 @@ import ( // Error messages. var ( - ErrNoMoreRows = errors.New(`upper: no more rows in this result set`) - ErrNotConnected = errors.New(`upper: you're currently not connected`) - ErrMissingDatabaseName = errors.New(`upper: missing database name`) - ErrMissingCollectionName = errors.New(`upper: missing collection name`) - ErrCollectionDoesNotExist = errors.New(`upper: collection does not exist`) - ErrSockerOrHost = errors.New(`upper: you may connect either to a unix socket or a tcp address, but not both`) - ErrQueryLimitParam = errors.New(`upper: a query can accept only one limit parameter`) - ErrQuerySortParam = errors.New(`upper: a query can accept only one order by parameter`) - ErrQueryOffsetParam = errors.New(`upper: a query can accept only one offset parameter`) - ErrMissingConditions = errors.New(`upper: missing selector conditions`) - ErrUnsupported = errors.New(`upper: this action is currently unsupported on this database`) - ErrUndefined = errors.New(`upper: this value is undefined`) - ErrQueryIsPending = errors.New(`upper: can't execute this instruction while the result set is still open`) - ErrUnsupportedDestination = errors.New(`upper: unsupported destination type`) - ErrUnsupportedType = errors.New(`upper: this type does not support marshaling`) - ErrUnsupportedValue = errors.New(`upper: this value does not support unmarshaling`) - ErrUnknownConditionType = errors.New(`upper: arguments of type %T can't be used as constraints`) - ErrTooManyClients = errors.New(`upper: can't connect to database server: too many clients`) - ErrGivingUpTryingToConnect = errors.New(`upper: giving up trying to connect: too many clients`) - ErrMissingConnURL = errors.New(`upper: missing DSN`) - ErrNotImplemented = errors.New(`upper: call not implemented`) - ErrAlreadyWithinTransaction = errors.New(`upper: already within a transaction`) - ErrServerRefusedConnection = errors.New(`upper: database server refused connection`) + ErrNoMoreRows = errors.New(`upper: no more rows in this result set`) + ErrNotConnected = errors.New(`upper: you're currently not connected`) + ErrMissingDatabaseName = errors.New(`upper: missing database name`) + ErrMissingCollectionName = errors.New(`upper: missing collection name`) + ErrCollectionDoesNotExist = errors.New(`upper: collection does not exist`) + ErrSockerOrHost = errors.New(`upper: you may connect either to a unix socket or a tcp address, but not both`) + ErrQueryLimitParam = errors.New(`upper: a query can accept only one limit parameter`) + ErrQuerySortParam = errors.New(`upper: a query can accept only one order by parameter`) + ErrQueryOffsetParam = errors.New(`upper: a query can accept only one offset parameter`) + ErrMissingConditions = errors.New(`upper: missing selector conditions`) + ErrUnsupported = errors.New(`upper: this action is currently unsupported on this database`) + ErrUndefined = errors.New(`upper: this value is undefined`) + ErrQueryIsPending = errors.New(`upper: can't execute this instruction while the result set is still open`) + ErrUnsupportedDestination = errors.New(`upper: unsupported destination type`) + ErrUnsupportedType = errors.New(`upper: this type does not support marshaling`) + ErrUnsupportedValue = errors.New(`upper: this value does not support unmarshaling`) + ErrUnknownConditionType = errors.New(`upper: arguments of type %T can't be used as constraints`) + ErrTooManyClients = errors.New(`upper: can't connect to database server: too many clients`) + ErrGivingUpTryingToConnect = errors.New(`upper: giving up trying to connect: too many clients`) + ErrTooManyReconnectionAttempts = errors.New(`upper: too many reconnection attempts`) + ErrMissingConnURL = errors.New(`upper: missing DSN`) + ErrNotImplemented = errors.New(`upper: call not implemented`) + ErrAlreadyWithinTransaction = errors.New(`upper: already within a transaction`) + ErrServerRefusedConnection = errors.New(`upper: database server refused connection`) ) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index 13af8e22..d56e87d1 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -186,7 +186,7 @@ func (d *database) connect(connFn func() error) error { // Attempt to (re)connect if atomic.AddUint64(&d.connectAttempts, 1) >= maxReconnectionAttempts { - return db.ErrGivingUpTryingToConnect + return db.ErrTooManyReconnectionAttempts } waitTime := minConnectionRetryInterval From 37e278023fe8f23b457ca9504452e8a30d10bf7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Thu, 10 Nov 2016 12:06:45 -0600 Subject: [PATCH 13/21] Fixes after testing reconnect with a low number of available connections. --- internal/sqladapter/database.go | 35 +++++++++++++------------------ mysql/database.go | 37 +++++++++++++++++++++++++++++---- postgresql/database.go | 36 ++++++++++++++++++++++++++++---- ql/database.go | 25 +++++++++++++++++++--- sqlite/database.go | 29 ++++++++++++++++++++++++-- 5 files changed, 128 insertions(+), 34 deletions(-) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index 13af8e22..feacf926 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -37,13 +37,13 @@ var ( minConnectionRetryInterval = time.Millisecond * 100 // Maximum interval when waiting before trying to reconnect. - maxConnectionRetryInterval = time.Millisecond * 2500 + maxConnectionRetryInterval = time.Millisecond * 1500 // Maximun time each connection retry attempt can take. maxConnectionRetryTime = time.Second * 5 // Maximun reconnection attempts per session before giving up. - maxReconnectionAttempts uint64 = 12 + maxReconnectionAttempts uint64 = 4 ) var ( @@ -89,6 +89,7 @@ type PartialDatabase interface { NewLocalCollection(name string) db.Collection CompileStatement(stmt *exql.Statement) (query string) ConnectionURL() db.ConnectionURL + ConnCheck() error Err(in error) (out error) NewLocalTransaction() (DatabaseTx, error) @@ -104,7 +105,7 @@ type BaseDatabase interface { Collection(string) db.Collection Driver() interface{} - WaitForConnection(func() error) error + WaitForConnection(func() error, bool) error BindSession(*sql.DB) error Session() *sql.DB @@ -168,7 +169,7 @@ func (d *database) reconnect() error { return errors.New("Can't recover from within a bad transaction.") } - err := d.PartialDatabase.Err(d.Ping()) + err := d.PartialDatabase.Err(d.ConnCheck()) if err == nil { return nil } @@ -204,7 +205,6 @@ func (d *database) connect(connFn func() error) error { err := connFn() if err == nil { atomic.StoreUint64(&d.connectAttempts, 0) - d.connFn = connFn return nil } @@ -238,7 +238,7 @@ func (d *database) BindTx(t *sql.Tx) error { defer d.sessMu.Unlock() d.baseTx = newTx(t) - if err := d.Ping(); err != nil { + if err := d.ConnCheck(); err != nil { return err } @@ -334,19 +334,7 @@ func (d *database) recoverFromErr(err error) error { // it func (d *database) Ping() error { if sess := d.Session(); sess != nil { - if err := sess.Ping(); err != nil { - return err - } - - _, err := sess.Exec("SELECT 1") - if err != nil { - return err - } - return nil - } - if tx := d.Transaction(); tx != nil { - // When upper wraps a transaction with no original session. - return nil + return sess.Ping() } return db.ErrNotConnected } @@ -618,11 +606,16 @@ var waitForConnMu sync.Mutex // WaitForConnection tries to execute the given connFn function, if connFn // returns an error, then WaitForConnection will keep trying until connFn // returns nil. Maximum waiting time is 5s after having acquired the lock. -func (d *database) WaitForConnection(connFn func() error) error { +func (d *database) WaitForConnection(connFn func() error, isDefault bool) error { if err := d.connect(connFn); err != nil { return err } - // Success. + + if isDefault { + d.connectMu.Lock() + d.connFn = connFn + d.connectMu.Unlock() + } return nil } diff --git a/mysql/database.go b/mysql/database.go index 8263e40c..4c01de91 100644 --- a/mysql/database.go +++ b/mysql/database.go @@ -120,7 +120,7 @@ func (d *database) open() error { return err } - if err := d.BaseDatabase.WaitForConnection(connFn); err != nil { + if err := d.BaseDatabase.WaitForConnection(connFn, true); err != nil { return err } @@ -141,7 +141,10 @@ func (d *database) clone() (*database, error) { } clone.Builder = b - clone.BaseDatabase.BindSession(d.BaseDatabase.Session()) + if err := clone.BaseDatabase.BindSession(d.BaseDatabase.Session()); err != nil { + return nil, err + } + return clone, nil } @@ -168,6 +171,27 @@ func (d *database) NewLocalCollection(name string) db.Collection { return newTable(d, name) } +// ConnCheck tests whether a connection is valid or not. A connection is valid +// when the server can be reached, login details are correct and an a simple +// operation can be actually carried out. +func (d *database) ConnCheck() error { + if sess := d.Session(); sess != nil { + if err := sess.Ping(); err != nil { + return err + } + + _, err := d.Session().Exec("SELECT 1") + if err != nil { + return err + } + return nil + } + if tx := d.Transaction(); tx != nil { + return nil + } + return db.ErrNotConnected +} + // Tx creates a transaction and passes it to the given function, if if the // function returns no error then the transaction is commited. func (d *database) Tx(fn func(tx sqlbuilder.Tx) error) error { @@ -182,14 +206,19 @@ func (d *database) NewLocalTransaction() (sqladapter.DatabaseTx, error) { } connFn := func() error { - sqlTx, err := clone.BaseDatabase.Session().Begin() + sess := clone.BaseDatabase.Session() + if sess == nil { + return db.ErrNotConnected + } + + sqlTx, err := sess.Begin() if err == nil { return clone.BindTx(sqlTx) } return err } - if err := d.BaseDatabase.WaitForConnection(connFn); err != nil { + if err := d.BaseDatabase.WaitForConnection(connFn, false); err != nil { return nil, err } diff --git a/postgresql/database.go b/postgresql/database.go index 2b89919b..42ab36a5 100644 --- a/postgresql/database.go +++ b/postgresql/database.go @@ -119,7 +119,7 @@ func (d *database) open() error { return err } - if err := d.BaseDatabase.WaitForConnection(connFn); err != nil { + if err := d.BaseDatabase.WaitForConnection(connFn, true); err != nil { return err } @@ -140,7 +140,10 @@ func (d *database) clone() (*database, error) { } clone.Builder = b - clone.BaseDatabase.BindSession(d.BaseDatabase.Session()) + if err := clone.BaseDatabase.BindSession(d.BaseDatabase.Session()); err != nil { + return nil, err + } + return clone, nil } @@ -178,6 +181,27 @@ func (d *database) NewLocalCollection(name string) db.Collection { return newTable(d, name) } +// ConnCheck tests whether a connection is valid or not. A connection is valid +// when the server can be reached, login details are correct and an a simple +// operation can be actually carried out. +func (d *database) ConnCheck() error { + if sess := d.Session(); sess != nil { + if err := sess.Ping(); err != nil { + return err + } + + _, err := d.Session().Exec("SELECT 1") + if err != nil { + return err + } + return nil + } + if tx := d.Transaction(); tx != nil { + return nil + } + return db.ErrNotConnected +} + // Tx creates a transaction and passes it to the given function, if if the // function returns no error then the transaction is commited. func (d *database) Tx(fn func(tx sqlbuilder.Tx) error) error { @@ -195,14 +219,18 @@ func (d *database) NewLocalTransaction() (sqladapter.DatabaseTx, error) { defer clone.txMu.Unlock() connFn := func() error { - sqlTx, err := clone.BaseDatabase.Session().Begin() + sess := clone.BaseDatabase.Session() + if sess == nil { + return db.ErrNotConnected + } + sqlTx, err := sess.Begin() if err == nil { return clone.BindTx(sqlTx) } return err } - if err := d.BaseDatabase.WaitForConnection(connFn); err != nil { + if err := d.BaseDatabase.WaitForConnection(connFn, false); err != nil { return nil, err } return sqladapter.NewTx(clone), nil diff --git a/ql/database.go b/ql/database.go index 3a5134cc..8409d931 100644 --- a/ql/database.go +++ b/ql/database.go @@ -24,6 +24,7 @@ package ql import ( "database/sql" "errors" + "log" "sync" "sync/atomic" @@ -198,7 +199,7 @@ func (d *database) open() error { return errTooManyOpenFiles } - if err := d.BaseDatabase.WaitForConnection(openFn); err != nil { + if err := d.BaseDatabase.WaitForConnection(openFn, true); err != nil { return err } @@ -227,6 +228,7 @@ func (d *database) CompileStatement(stmt *exql.Statement) string { // Err allows sqladapter to translate some known errors into generic errors. func (d *database) Err(err error) error { if err != nil { + log.Printf("Err: %v", err) if err == errTooManyOpenFiles { return db.ErrTooManyClients } @@ -265,6 +267,19 @@ func (d *database) NewLocalCollection(name string) db.Collection { return newTable(d, name) } +// ConnCheck tests whether a connection is valid or not. A connection is valid +// when the server can be reached, login details are correct and an a simple +// operation can be actually carried out. +func (d *database) ConnCheck() error { + if sess := d.Session(); sess != nil { + return sess.Ping() + } + if tx := d.Transaction(); tx != nil { + return nil + } + return db.ErrNotConnected +} + // Tx creates a transaction and passes it to the given function, if if the // function returns no error then the transaction is commited. func (d *database) Tx(fn func(tx sqlbuilder.Tx) error) error { @@ -282,14 +297,18 @@ func (d *database) NewLocalTransaction() (sqladapter.DatabaseTx, error) { defer clone.txMu.Unlock() openFn := func() error { - sqlTx, err := clone.BaseDatabase.Session().Begin() + sess := clone.BaseDatabase.Session() + if sess == nil { + return db.ErrNotConnected + } + sqlTx, err := sess.Begin() if err == nil { return clone.BindTx(sqlTx) } return err } - if err := d.BaseDatabase.WaitForConnection(openFn); err != nil { + if err := d.BaseDatabase.WaitForConnection(openFn, false); err != nil { return nil, err } diff --git a/sqlite/database.go b/sqlite/database.go index db824671..20b48d3f 100644 --- a/sqlite/database.go +++ b/sqlite/database.go @@ -140,7 +140,7 @@ func (d *database) open() error { return errTooManyOpenFiles } - if err := d.BaseDatabase.WaitForConnection(openFn); err != nil { + if err := d.BaseDatabase.WaitForConnection(openFn, true); err != nil { return err } @@ -181,6 +181,27 @@ func (d *database) NewLocalCollection(name string) db.Collection { return newTable(d, name) } +// ConnCheck tests whether a connection is valid or not. A connection is valid +// when the server can be reached, login details are correct and an a simple +// operation can be actually carried out. +func (d *database) ConnCheck() error { + if sess := d.Session(); sess != nil { + if err := sess.Ping(); err != nil { + return err + } + + _, err := d.Session().Exec("SELECT 1") + if err != nil { + return err + } + return nil + } + if tx := d.Transaction(); tx != nil { + return nil + } + return db.ErrNotConnected +} + // Tx creates a transaction and passes it to the given function, if if the // function returns no error then the transaction is commited. func (d *database) Tx(fn func(tx sqlbuilder.Tx) error) error { @@ -195,6 +216,10 @@ func (d *database) NewLocalTransaction() (sqladapter.DatabaseTx, error) { } openFn := func() error { + sess := clone.BaseDatabase.Session() + if sess == nil { + return db.ErrNotConnected + } sqlTx, err := clone.BaseDatabase.Session().Begin() if err == nil { return clone.BindTx(sqlTx) @@ -202,7 +227,7 @@ func (d *database) NewLocalTransaction() (sqladapter.DatabaseTx, error) { return err } - if err := d.BaseDatabase.WaitForConnection(openFn); err != nil { + if err := d.BaseDatabase.WaitForConnection(openFn, true); err != nil { return nil, err } From 78d28bced0760c06562588a236026e0959e44907 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Thu, 10 Nov 2016 12:25:04 -0600 Subject: [PATCH 14/21] Make retry-on-error configurable and disable it by default. --- config.go | 39 ++++++++++++++++++++++++++------- internal/sqladapter/database.go | 4 ++-- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/config.go b/config.go index f0e34e82..84079ded 100644 --- a/config.go +++ b/config.go @@ -28,6 +28,11 @@ import ( // Settings defines methods to get or set configuration values. type Settings interface { + // SetQueryRetryOnError enables or disable query retry-on-error features. + SetQueryRetryOnError(bool) + // Returns true if query retry is enabled. + QueryRetryOnError() bool + // SetLogging enables or disables logging. SetLogging(bool) // LoggingEnabled returns true if logging is enabled, false otherwise. @@ -45,6 +50,8 @@ type conf struct { queryLogger Logger queryLoggerMu sync.RWMutex defaultLogger defaultLogger + + queryRetryOnError uint32 } func (c *conf) Logger() Logger { @@ -58,6 +65,14 @@ func (c *conf) Logger() Logger { return c.queryLogger } +func (c *conf) SetQueryRetryOnError(v bool) { + c.setBinaryOption(&c.queryRetryOnError, true) +} + +func (c *conf) QueryRetryOnError() bool { + return c.binaryOption(&c.queryRetryOnError) +} + func (c *conf) SetLogger(lg Logger) { c.queryLoggerMu.Lock() defer c.queryLoggerMu.Unlock() @@ -65,20 +80,28 @@ func (c *conf) SetLogger(lg Logger) { c.queryLogger = lg } -func (c *conf) SetLogging(value bool) { +func (c *conf) binaryOption(dest *uint32) bool { + if v := atomic.LoadUint32(dest); v == 1 { + return true + } + return false +} + +func (c *conf) setBinaryOption(dest *uint32, value bool) { if value { - atomic.StoreUint32(&c.loggingEnabled, 1) + atomic.StoreUint32(dest, 1) return } - atomic.StoreUint32(&c.loggingEnabled, 0) + atomic.StoreUint32(dest, 0) +} + +func (c *conf) SetLogging(value bool) { + c.setBinaryOption(&c.loggingEnabled, true) } func (c *conf) LoggingEnabled() bool { - if v := atomic.LoadUint32(&c.loggingEnabled); v == 1 { - return true - } - return false + return c.binaryOption(&c.loggingEnabled) } -// Conf provides global configuration settings for upper-db. +// Conf provides default global configuration settings for upper-db. var Conf Settings = &conf{} diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index feacf926..3e8d72b7 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -495,7 +495,7 @@ func (d *database) StatementQuery(stmt *exql.Statement, args ...interface{}) (ro for i := 0; ; i++ { query, rows, err = d.prepareAndQuery(stmt, args...) - if err == nil || i >= maxQueryRetryAttempts { + if err == nil || i >= maxQueryRetryAttempts || !db.Conf.QueryRetryOnError() { return rows, err } @@ -544,7 +544,7 @@ func (d *database) StatementQueryRow(stmt *exql.Statement, args ...interface{}) for i := 0; ; i++ { query, row, err = d.prepareAndQueryRow(stmt, args...) - if err == nil || i >= maxQueryRetryAttempts { + if err == nil || i >= maxQueryRetryAttempts || !db.Conf.QueryRetryOnError() { return row, err } From 340d8ebe07a8d4c6b9a2959ea62a1e2f4caf37b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Thu, 10 Nov 2016 12:28:49 -0600 Subject: [PATCH 15/21] Add db.Conf.SetRetryQueryOnError to PostgreSQL test. --- config.go | 10 +++++----- internal/sqladapter/database.go | 4 ++-- postgresql/local_test.go | 3 +++ 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/config.go b/config.go index 84079ded..1017feb9 100644 --- a/config.go +++ b/config.go @@ -28,10 +28,10 @@ import ( // Settings defines methods to get or set configuration values. type Settings interface { - // SetQueryRetryOnError enables or disable query retry-on-error features. - SetQueryRetryOnError(bool) + // SetRetryQueryOnError enables or disable query retry-on-error features. + SetRetryQueryOnError(bool) // Returns true if query retry is enabled. - QueryRetryOnError() bool + RetryQueryOnError() bool // SetLogging enables or disables logging. SetLogging(bool) @@ -65,11 +65,11 @@ func (c *conf) Logger() Logger { return c.queryLogger } -func (c *conf) SetQueryRetryOnError(v bool) { +func (c *conf) SetRetryQueryOnError(v bool) { c.setBinaryOption(&c.queryRetryOnError, true) } -func (c *conf) QueryRetryOnError() bool { +func (c *conf) RetryQueryOnError() bool { return c.binaryOption(&c.queryRetryOnError) } diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index 3e8d72b7..1bb30e06 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -495,7 +495,7 @@ func (d *database) StatementQuery(stmt *exql.Statement, args ...interface{}) (ro for i := 0; ; i++ { query, rows, err = d.prepareAndQuery(stmt, args...) - if err == nil || i >= maxQueryRetryAttempts || !db.Conf.QueryRetryOnError() { + if err == nil || i >= maxQueryRetryAttempts || !db.Conf.RetryQueryOnError() { return rows, err } @@ -544,7 +544,7 @@ func (d *database) StatementQueryRow(stmt *exql.Statement, args ...interface{}) for i := 0; ; i++ { query, row, err = d.prepareAndQueryRow(stmt, args...) - if err == nil || i >= maxQueryRetryAttempts || !db.Conf.QueryRetryOnError() { + if err == nil || i >= maxQueryRetryAttempts || !db.Conf.RetryQueryOnError() { return row, err } diff --git a/postgresql/local_test.go b/postgresql/local_test.go index b54682bf..47d68b76 100644 --- a/postgresql/local_test.go +++ b/postgresql/local_test.go @@ -127,6 +127,9 @@ func TestDriverBadConnection(t *testing.T) { sess := mustOpen() defer sess.Close() + db.Conf.SetRetryQueryOnError(true) + defer db.Conf.SetRetryQueryOnError(false) + var tMu sync.Mutex tFatal := func(err error) { tMu.Lock() From 118944dd4ddd89c7a4af5f1ea59c7897d6896fd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Thu, 10 Nov 2016 12:59:22 -0600 Subject: [PATCH 16/21] Fix call to c.setBinaryOption --- config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config.go b/config.go index 1017feb9..c85a476a 100644 --- a/config.go +++ b/config.go @@ -66,7 +66,7 @@ func (c *conf) Logger() Logger { } func (c *conf) SetRetryQueryOnError(v bool) { - c.setBinaryOption(&c.queryRetryOnError, true) + c.setBinaryOption(&c.queryRetryOnError, v) } func (c *conf) RetryQueryOnError() bool { @@ -96,7 +96,7 @@ func (c *conf) setBinaryOption(dest *uint32, value bool) { } func (c *conf) SetLogging(value bool) { - c.setBinaryOption(&c.loggingEnabled, true) + c.setBinaryOption(&c.loggingEnabled, value) } func (c *conf) LoggingEnabled() bool { From 7017914cdccc2d363407cc28506a386a73f096df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Thu, 10 Nov 2016 13:13:43 -0600 Subject: [PATCH 17/21] Add missing db.Conf.RetryQueryOnError --- internal/sqladapter/database.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index 1bb30e06..f0c22979 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -449,7 +449,7 @@ func (d *database) StatementExec(stmt *exql.Statement, args ...interface{}) (res for i := 0; ; i++ { query, res, err = d.prepareAndExec(stmt, args...) - if err == nil || i >= maxQueryRetryAttempts { + if err == nil || i >= maxQueryRetryAttempts || !db.Conf.RetryQueryOnError() { return res, err } From ed133487c02a7cce2ab44c07de22f2b48ae778cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Mon, 14 Nov 2016 15:00:40 -0600 Subject: [PATCH 18/21] Add d.recoverAttempts for counting recovery attempts up to discardConnectionAfterFailedRecoverAttempts times --- internal/sqladapter/database.go | 46 +++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index f0c22979..1ffd7e5e 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -31,7 +31,7 @@ var ( // If a query fails with a recoverable error the connection is going to be // re-estalished and the query can be retried, each retry adds a max wait // time of maxConnectionRetryTime - maxQueryRetryAttempts = 3 + maxQueryRetryAttempts = 6 // Minimum interval when waiting before trying to reconnect. minConnectionRetryInterval = time.Millisecond * 100 @@ -39,11 +39,16 @@ var ( // Maximum interval when waiting before trying to reconnect. maxConnectionRetryInterval = time.Millisecond * 1500 - // Maximun time each connection retry attempt can take. + // Maximum time each connection retry attempt can take. maxConnectionRetryTime = time.Second * 5 - // Maximun reconnection attempts per session before giving up. + // Maximum reconnection attempts per session before giving up. maxReconnectionAttempts uint64 = 4 + + // If this session failed to recover more than + // discardConnectionAfterFailedRecoverAttempts times, assume the entire pool + // is borked and force a clean reconnection. + discardConnectionAfterFailedRecoverAttempts = uint64(maxQueryRetryAttempts / 2) ) var ( @@ -151,6 +156,7 @@ type database struct { txID uint64 connectAttempts uint64 + recoverAttempts uint64 cachedStatements *cache.Cache cachedCollections *cache.Cache @@ -162,21 +168,6 @@ var ( _ = db.Database(&database{}) ) -func (d *database) reconnect() error { - if d.Transaction() != nil { - // Don't even attempt to recover from within a transaction, this is not - // possible. - return errors.New("Can't recover from within a bad transaction.") - } - - err := d.PartialDatabase.Err(d.ConnCheck()) - if err == nil { - return nil - } - - return d.connect(d.connFn) -} - func (d *database) connect(connFn func() error) error { if connFn == nil { return errors.New("Missing connect function") @@ -205,6 +196,7 @@ func (d *database) connect(connFn func() error) error { err := connFn() if err == nil { atomic.StoreUint64(&d.connectAttempts, 0) + atomic.StoreUint64(&d.recoverAttempts, 0) return nil } @@ -321,9 +313,23 @@ func (d *database) recoverFromErr(err error) error { return errNothingToRecoverFrom } + if d.Transaction() != nil { + // Don't even attempt to recover from within a transaction, this is not + // possible. + return errors.New("Can't recover from within a bad transaction.") + } + + if atomic.AddUint64(&d.recoverAttempts, 1) == discardConnectionAfterFailedRecoverAttempts { + d.sess.Close() + } + if d.isRecoverableError(err) { - err := d.reconnect() - return err + err := d.PartialDatabase.Err(d.ConnCheck()) // Let's see if database/sql recovered itself. + if err == nil { + return nil + } + // Let's attempt to connect + return d.connect(d.connFn) } // This is not an error we can recover from. From 02b5a72d2dd20314db92292164bd619adfcedd10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Mon, 14 Nov 2016 17:15:29 -0600 Subject: [PATCH 19/21] Use flushConnectionPoolAfterRecoverAttempts and remove ConnCheck. --- internal/sqladapter/database.go | 21 +++++++++++++++------ mysql/database.go | 21 --------------------- postgresql/database.go | 21 --------------------- ql/database.go | 13 ------------- sqlite/database.go | 21 --------------------- 5 files changed, 15 insertions(+), 82 deletions(-) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index 1ffd7e5e..b56e2757 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -46,9 +46,9 @@ var ( maxReconnectionAttempts uint64 = 4 // If this session failed to recover more than - // discardConnectionAfterFailedRecoverAttempts times, assume the entire pool + // flushConnectionPoolAfterRecoverAttempts times, assume the entire pool // is borked and force a clean reconnection. - discardConnectionAfterFailedRecoverAttempts = uint64(maxQueryRetryAttempts / 2) + flushConnectionPoolAfterRecoverAttempts = uint64(maxQueryRetryAttempts / 2) ) var ( @@ -94,7 +94,6 @@ type PartialDatabase interface { NewLocalCollection(name string) db.Collection CompileStatement(stmt *exql.Statement) (query string) ConnectionURL() db.ConnectionURL - ConnCheck() error Err(in error) (out error) NewLocalTransaction() (DatabaseTx, error) @@ -230,7 +229,7 @@ func (d *database) BindTx(t *sql.Tx) error { defer d.sessMu.Unlock() d.baseTx = newTx(t) - if err := d.ConnCheck(); err != nil { + if err := d.Ping(); err != nil { return err } @@ -319,12 +318,17 @@ func (d *database) recoverFromErr(err error) error { return errors.New("Can't recover from within a bad transaction.") } - if atomic.AddUint64(&d.recoverAttempts, 1) == discardConnectionAfterFailedRecoverAttempts { + if atomic.AddUint64(&d.recoverAttempts, 1) == flushConnectionPoolAfterRecoverAttempts { + // This happens when d.Ping() says everything is OK but the next query + // fails, and this probably means that a high number of connections in the + // pool are in bad state. Since we don't have any way to check the + // connection pool for valid connections we'll close the database, this + // makes d.Ping() fail and forces a reconnection. d.sess.Close() } if d.isRecoverableError(err) { - err := d.PartialDatabase.Err(d.ConnCheck()) // Let's see if database/sql recovered itself. + err := d.PartialDatabase.Err(d.Ping()) // Let's see if database/sql recovered itself. if err == nil { return nil } @@ -342,6 +346,11 @@ func (d *database) Ping() error { if sess := d.Session(); sess != nil { return sess.Ping() } + if tx := d.Transaction(); tx != nil { + // This is a wrapped transaction, let's assume we had a working + // connection in the first place. + return nil + } return db.ErrNotConnected } diff --git a/mysql/database.go b/mysql/database.go index 4c01de91..ac4f596e 100644 --- a/mysql/database.go +++ b/mysql/database.go @@ -171,27 +171,6 @@ func (d *database) NewLocalCollection(name string) db.Collection { return newTable(d, name) } -// ConnCheck tests whether a connection is valid or not. A connection is valid -// when the server can be reached, login details are correct and an a simple -// operation can be actually carried out. -func (d *database) ConnCheck() error { - if sess := d.Session(); sess != nil { - if err := sess.Ping(); err != nil { - return err - } - - _, err := d.Session().Exec("SELECT 1") - if err != nil { - return err - } - return nil - } - if tx := d.Transaction(); tx != nil { - return nil - } - return db.ErrNotConnected -} - // Tx creates a transaction and passes it to the given function, if if the // function returns no error then the transaction is commited. func (d *database) Tx(fn func(tx sqlbuilder.Tx) error) error { diff --git a/postgresql/database.go b/postgresql/database.go index 42ab36a5..e87799f5 100644 --- a/postgresql/database.go +++ b/postgresql/database.go @@ -181,27 +181,6 @@ func (d *database) NewLocalCollection(name string) db.Collection { return newTable(d, name) } -// ConnCheck tests whether a connection is valid or not. A connection is valid -// when the server can be reached, login details are correct and an a simple -// operation can be actually carried out. -func (d *database) ConnCheck() error { - if sess := d.Session(); sess != nil { - if err := sess.Ping(); err != nil { - return err - } - - _, err := d.Session().Exec("SELECT 1") - if err != nil { - return err - } - return nil - } - if tx := d.Transaction(); tx != nil { - return nil - } - return db.ErrNotConnected -} - // Tx creates a transaction and passes it to the given function, if if the // function returns no error then the transaction is commited. func (d *database) Tx(fn func(tx sqlbuilder.Tx) error) error { diff --git a/ql/database.go b/ql/database.go index 8409d931..60e91f77 100644 --- a/ql/database.go +++ b/ql/database.go @@ -267,19 +267,6 @@ func (d *database) NewLocalCollection(name string) db.Collection { return newTable(d, name) } -// ConnCheck tests whether a connection is valid or not. A connection is valid -// when the server can be reached, login details are correct and an a simple -// operation can be actually carried out. -func (d *database) ConnCheck() error { - if sess := d.Session(); sess != nil { - return sess.Ping() - } - if tx := d.Transaction(); tx != nil { - return nil - } - return db.ErrNotConnected -} - // Tx creates a transaction and passes it to the given function, if if the // function returns no error then the transaction is commited. func (d *database) Tx(fn func(tx sqlbuilder.Tx) error) error { diff --git a/sqlite/database.go b/sqlite/database.go index 20b48d3f..f6aa7380 100644 --- a/sqlite/database.go +++ b/sqlite/database.go @@ -181,27 +181,6 @@ func (d *database) NewLocalCollection(name string) db.Collection { return newTable(d, name) } -// ConnCheck tests whether a connection is valid or not. A connection is valid -// when the server can be reached, login details are correct and an a simple -// operation can be actually carried out. -func (d *database) ConnCheck() error { - if sess := d.Session(); sess != nil { - if err := sess.Ping(); err != nil { - return err - } - - _, err := d.Session().Exec("SELECT 1") - if err != nil { - return err - } - return nil - } - if tx := d.Transaction(); tx != nil { - return nil - } - return db.ErrNotConnected -} - // Tx creates a transaction and passes it to the given function, if if the // function returns no error then the transaction is commited. func (d *database) Tx(fn func(tx sqlbuilder.Tx) error) error { From 838332f2cce71362a955a1cd6dab199461a6668a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Wed, 16 Nov 2016 06:56:04 -0600 Subject: [PATCH 20/21] Removing flawed logic: don't close before having a valid new conn to replace the old one --- internal/sqladapter/database.go | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index cb74ddd8..f304d6e1 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -27,7 +27,7 @@ var recoverableErrors = []error{ db.ErrServerRefusedConnection, } -var ( +const ( // If a query fails with a recoverable error the connection is going to be // re-estalished and the query can be retried, each retry adds a max wait // time of maxConnectionRetryTime @@ -318,26 +318,31 @@ func (d *database) recoverFromErr(err error) error { return errors.New("Can't recover from within a bad transaction.") } - if atomic.AddUint64(&d.recoverAttempts, 1) == flushConnectionPoolAfterRecoverAttempts { - // This happens when d.Ping() says everything is OK but the next query - // fails, and this probably means that a high number of connections in the + if !d.isRecoverableError(err) { + // This is not an error we can recover from. + return errUnableToRecover + } + + reconnect := false + + if err := d.PartialDatabase.Err(d.Ping()); err != nil { // Let's see if database/sql recovered itself. + reconnect = true + } + + if !reconnect && atomic.AddUint64(&d.recoverAttempts, 1) == flushConnectionPoolAfterRecoverAttempts { + // This happens when d.Ping() says everything is OK but queries keep + // failing, that probably means that a high number of connections in the // pool are in bad state. Since we don't have any way to check the - // connection pool for valid connections we'll close the database, this - // makes d.Ping() fail and forces a reconnection. - d.sess.Close() + // connection pool for valid connections we'll force a reconnection (once). + reconnect = true } - if d.isRecoverableError(err) { - err := d.PartialDatabase.Err(d.Ping()) // Let's see if database/sql recovered itself. - if err == nil { - return nil - } + if reconnect { // Let's attempt to connect return d.connect(d.connFn) } - // This is not an error we can recover from. - return errUnableToRecover + return nil } // Ping checks whether a connection to the database is still alive by pinging From 33b8d628319ec6daddde25eaae557d7eb74cafe8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Wed, 16 Nov 2016 07:49:46 -0600 Subject: [PATCH 21/21] Mark prepared statements as ready to be closed after using them --- internal/sqladapter/database.go | 3 +++ internal/sqladapter/testing/adapter.go.tpl | 14 ++++++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index f304d6e1..a7136e6f 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -423,6 +423,7 @@ func (d *database) prepareAndExec(stmt *exql.Statement, args ...interface{}) (st if err != nil { return query, nil, err } + defer p.Close() if execer, ok := d.PartialDatabase.(HasStatementExec); ok { res, err := execer.StatementExec(p.Stmt, args...) @@ -487,6 +488,7 @@ func (d *database) prepareAndQuery(stmt *exql.Statement, args ...interface{}) (s if err != nil { return query, nil, err } + defer p.Close() rows, err := p.Query(args...) return query, rows, err @@ -533,6 +535,7 @@ func (d *database) prepareAndQueryRow(stmt *exql.Statement, args ...interface{}) if err != nil { return query, nil, err } + defer p.Close() // Would be nice to find a way to check if this succeeded before using // Scan. diff --git a/internal/sqladapter/testing/adapter.go.tpl b/internal/sqladapter/testing/adapter.go.tpl index 2d1fba1b..99c64405 100644 --- a/internal/sqladapter/testing/adapter.go.tpl +++ b/internal/sqladapter/testing/adapter.go.tpl @@ -91,7 +91,11 @@ func TestStressPreparedStatementsCache(t *testing.T) { // The max number of elements we can have on our LRU is 128, if an statement // is evicted it will be marked as dead and will be closed only when no other // queries are using it. - const maxPreparedStatements = 128 * 2 + const preparedStatementsLRU = 128 + + // This number can be a bit greater than preparedStatementsLRU when executing + // a lot of concurrent statements. + const maxPreparedStatements = preparedStatementsLRU + 40 var wg sync.WaitGroup for i := 0; i < 1000; i++ { @@ -106,7 +110,8 @@ func TestStressPreparedStatementsCache(t *testing.T) { if err != nil { tFatal(err) } - if activeStatements := sqladapter.NumActiveStatements(); activeStatements > maxPreparedStatements { + activeStatements := sqladapter.NumActiveStatements() + if activeStatements > maxPreparedStatements { tFatal(fmt.Errorf("The number of active statements cannot exceed %d (got %d).", maxPreparedStatements, activeStatements)) } }(i) @@ -116,6 +121,11 @@ func TestStressPreparedStatementsCache(t *testing.T) { } wg.Wait() + + activeStatements := sqladapter.NumActiveStatements() + if activeStatements > preparedStatementsLRU { + t.Fatal(fmt.Errorf("The number of active statements cannot exceed %d (got %d).", preparedStatementsLRU, activeStatements)) + } } func TestTruncateAllCollections(t *testing.T) {