Skip to content

Commit

Permalink
basic retry mechanism (#6)
Browse files Browse the repository at this point in the history
* basic retry mechanism

* WIP integration tests for retry handler

* integration tests for retry working with basic functionality 💪🏼

* async all the namings

* add documentation

* implement retry sql methods WIP

* query retry logic WIP

* express example

* tests passing on new retry implementation

* update sql methods to include retry

* update docs and remove comment
  • Loading branch information
GuyHarwood authored Mar 25, 2019
1 parent b8eb44f commit 931d3cf
Show file tree
Hide file tree
Showing 11 changed files with 576 additions and 248 deletions.
41 changes: 38 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,45 @@
# less-tedious

Helper package for the node.js tedious SQL Server connectivity module.
Helper package for the node.js tedious SQL Server library, and accompanying mssql client library.

This package provides the following...

- easy to use `query` and `modify` methods that simplifies common SQL calls.
- easy to use `query` and `modify` methods that simplifies common SQL calls
- automatic connection management and pool initialisation
- promise support. No more wrapping requests in callbacks
- verbose debugging info, enabled by `winston`
- retry logic which tolerates timeouts and other custom conditions of your choosing


## Async Retry mechanism

The async retry handler was created to support the retry of database calls under certain conditions.
For example, when all database connections are saturated, you may want to wait 5 seconds before re-attempting.

### Example

```javascript
const retry = require('./retry-async')
// actual call to database wrapped in an async function...
const callDatabase = async () => {
const data = await sql.query('SELECT * FROM [SomeTable]')
return data
}
// we only want to retry timeout errors...
const onlyRetryTimeoutsPredicate = (error) => {
return error.message.contains('timeout')
}

const retryPolicy = {
attempts: 3,
pauseTimeMs: 5000,
pauseMultiplier: 1.5
}

const attemptCount = 3
try {
const data = await retry(callDatabase, retryPolicy, onlyRetryTimeoutsPredicate)
console.log(`we got some data:${data}`)
} catch (error) {
console.error(`attempted to query database ${retryPolicy.attempts} times, but all calls were unsuccessful`)
}
```
2 changes: 0 additions & 2 deletions date.service.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict'
const moment = require('moment')
const logger = require('winston')

const gdsFullFormat = 'D MMMM YYYY'
const gdsShortFormat = 'D MMM YYYY'
Expand Down Expand Up @@ -63,7 +62,6 @@ const dateService = {

checkAndFormat: function (date, format) {
if (!(date instanceof Date || moment.isMoment(date))) {
logger.warn(`Date parameter is not a Date or Moment object: ${date}`)
return ''
}
const m = moment(date)
Expand Down
2 changes: 2 additions & 0 deletions example-config.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
'use strict'

require('dotenv').config()
const oneMinuteInMilliseconds = 60000

module.exports = {
Expand Down
20 changes: 20 additions & 0 deletions http-example.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
'use strict'

require('dotenv').config()
const express = require('express')
const sqlService = require('./index')
const sqlConfig = require('./example-config')

sqlService.initPool(sqlConfig)

const app = express()
const port = 3000

const callDatabase = async (res) => {
const sql = 'SELECT TOP 1 * FROM [mtc_admin].[settings]'
await sqlService.queryWithRetry(sql)
res.sendStatus(200)
}

app.get('/', (req, res) => callDatabase(res))
app.listen(port, () => console.log(`express listening on port ${port}`))
130 changes: 52 additions & 78 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const R = require('ramda')
const moment = require('moment')
let cache = {}
const mssql = require('mssql')
const logger = require('winston')
const dateService = require('./date.service')
let pool

Expand All @@ -18,7 +17,6 @@ let pool
* @return {string | undefined}
*/
const findDataType = (type) => Object.keys(sqlService.TYPES).find(k => {
logger.debug(`findDataType('${type}'): called`)
if (type.toUpperCase() === k.toUpperCase()) {
return k
}
Expand Down Expand Up @@ -150,7 +148,6 @@ async function generateParams (tableName, data) {
options.length = cacheData.maxLength
}

logger.debug(`sql.service: generateParams: options set for [${column}]`, options)
params.push({
name: column,
value,
Expand Down Expand Up @@ -203,7 +200,6 @@ sqlService.initPool = async (sqlConfig) => {
throw new Error('sqlConfig is required')
}
if (pool) {
logger.warn('The connection pool has already been initialised')
return
}
validateSqlConfig(sqlConfig)
Expand All @@ -218,19 +214,21 @@ sqlService.initPool = async (sqlConfig) => {
max: sqlConfig.Pooling.MaxCount || 5,
min: sqlConfig.Pooling.MinCount || 0,
idleTimeoutMillis: sqlConfig.Pooling.IdleTimeout || 30000
},
options: {
encrypt: sqlConfig.Encrypt
}
}

pool = new mssql.ConnectionPool(config)
pool.on('error', err => {
logger.error('SQL Pool Error:', err)
})
// TODO emit error
pool.on('error', () => {})
return pool.connect()
}

sqlService.drainPool = async () => {
await pool
if (!pool) {
logger.warn('The connection pool is not initialised')
return
}
return pool.close()
Expand Down Expand Up @@ -262,39 +260,36 @@ function addParamsToRequestSimple (params, request) {
}
}

const retry = require('./retry-async')
const retryConfig = {
attempts: 3,
pauseTimeMs: 5000,
pauseMultiplier: 1.5
}
const connectionLimitReachedErrorCode = 10928

const dbLimitReached = (error) => {
// https://docs.microsoft.com/en-us/azure/sql-database/sql-database-develop-error-messages
return error.number === connectionLimitReachedErrorCode // || error.message.indexOf('request limit') !== -1
}

/**
* Query data from SQL Server via mssql
* @param {string} sql - The SELECT statement to execute
* @param {array} params - Array of parameters for SQL statement
* @return {Promise<*>}
*/
sqlService.query = async (sql, params = []) => {
logger.debug(`sql.service.query(): ${sql}`)
logger.debug('sql.service.query(): Params ', R.map(R.pick(['name', 'value']), params))
await pool

const request = new mssql.Request(pool)
addParamsToRequestSimple(params, request)

let result
try {
result = await request.query(sql)
} catch (error) {
logger.error('sqlService.query(): SQL Query threw an error', error)
if (error.code && (error.code === 'ECONNCLOSED' || error.code === 'ESOCKET')) {
logger.error('sqlService.query(): An SQL request was attempted but the connection is closed', error)
}
try {
logger.error('sqlService.query(): SQL RETRY', error)
const retryRequest = new mssql.Request(pool)
addParamsToRequestSimple(params, retryRequest)
result = await retryRequest.query(sql)
} catch (error2) {
logger.error('sqlService.query(): SQL RETRY FAILED', error2)
throw error2
}
const query = async () => {
const request = new mssql.Request(pool)
addParamsToRequestSimple(params, request)
const result = await request.query(sql)
return sqlService.transformResult(result)
}
return sqlService.transformResult(result)

return retry(query, retryConfig, dbLimitReached)
}

/**
Expand All @@ -317,9 +312,6 @@ function addParamsToRequest (params, request) {
options.scale = param.scale || 5
}
const opts = param.options ? param.options : options
if (opts && Object.keys(opts).length) {
logger.debug('sql.service: addParamsToRequest(): opts to addParameter are: ', opts)
}

if (opts.precision) {
request.input(param.name, param.type(opts.precision, opts.scale), param.value)
Expand All @@ -339,39 +331,24 @@ function addParamsToRequest (params, request) {
* @return {Promise}
*/
sqlService.modify = async (sql, params = []) => {
logger.debug('sql.service.modify(): SQL: ' + sql)
logger.debug('sql.service.modify(): Params ', R.map(R.pick(['name', 'value']), params))
await pool

const request = new mssql.Request(pool)
addParamsToRequest(params, request)
const modify = async () => {
const request = new mssql.Request(pool)
addParamsToRequest(params, request)
return request.query(sql)
}

const returnValue = {}
const insertIds = []
let rawResponse

try {
rawResponse = await request.query(sql)
logger.debug('sql.service.modify(): result:', rawResponse)
} catch (error) {
logger.error('sqlService.modify(): SQL Query threw an error', error)
if (error.code && (error.code === 'ECONNCLOSED' || error.code === 'ESOCKET')) {
logger.error('sqlService.modify(): An SQL request was attempted but the connection is closed', error)
}
try {
logger.error('sqlService.modify(): attempting SQL retry', error)
const retryRequest = new mssql.Request(pool)
addParamsToRequest(params, retryRequest)
rawResponse = await retryRequest.query(sql)
logger.error('sqlService.modify(): SQL retry success', error)
logger.debug('sql.service: modify: result:', rawResponse)
} catch (error2) {
logger.error('sqlService.modify(): SQL RETRY FAILED', error2)
throw error2
}
}
rawResponse = await retry(modify, retryConfig, dbLimitReached)

if (rawResponse && rawResponse.recordset) {
for (let obj of rawResponse.recordset) {
/* TODO remove this strict column name limitation and
extract column value regardless of name */
if (obj && obj.SCOPE_IDENTITY) {
insertIds.push(obj.SCOPE_IDENTITY)
}
Expand Down Expand Up @@ -404,7 +381,12 @@ sqlService.findOneById = async (table, id, schema = '[mtc_admin]') => {
FROM ${schema}.${table}
WHERE id = @id
`
const rows = await sqlService.query(sql, [paramId])

const query = async () => {
return sqlService.query(sql, [paramId])
}

const rows = await retry(query, retryConfig, dbLimitReached)
return R.head(rows)
}

Expand All @@ -423,12 +405,9 @@ sqlService.getCacheEntryForColumn = async function (table, column) {
await sqlService.updateDataTypeCache()
}
if (!cache.hasOwnProperty(key)) {
logger.debug(`sql.service: cache miss for ${key}`)
return undefined
}
const cacheData = cache[key]
logger.debug(`sql.service: cache hit for ${key}`)
logger.debug('sql.service: cache', cacheData)
return cacheData
}

Expand All @@ -440,7 +419,6 @@ sqlService.getCacheEntryForColumn = async function (table, column) {
*/
sqlService.generateInsertStatement = async (table, data, schema = '[mtc_admin]') => {
const params = await generateParams(table, data)
logger.debug('sql.service: Params ', R.compose(R.map(R.pick(['name', 'value'])))(params))
const sql = `
INSERT INTO ${schema}.${table} ( ${extractColumns(data)} ) VALUES ( ${createParamIdentifiers(data)} );
SELECT SCOPE_IDENTITY() AS [SCOPE_IDENTITY]`
Expand Down Expand Up @@ -476,7 +454,6 @@ sqlService.generateMultipleInsertStatements = async (table, data, schema = '[mtc
)
})
params = R.flatten(params)
logger.debug('sql.service: Params ', R.compose(R.map(R.pick(['name', 'value'])))(params))
const sql = `
INSERT INTO ${schema}.${table} ( ${headers} ) VALUES ( ${values} );
SELECT SCOPE_IDENTITY()`
Expand Down Expand Up @@ -520,13 +497,11 @@ sqlService.create = async (tableName, data) => {
params,
outputParams
} = await sqlService.generateInsertStatement(tableName, preparedData)
try {
const res = await sqlService.modify(sql, params, outputParams)
return res
} catch (error) {
logger.warn('sql.service: Failed to INSERT', error)
throw error

const create = async () => {
return sqlService.modify(sql, params, outputParams)
}
return retry(create, retryConfig, dbLimitReached)
}

/**
Expand Down Expand Up @@ -563,7 +538,6 @@ sqlService.updateDataTypeCache = async function () {
maxLength: row.CHARACTER_MAXIMUM_LENGTH && row.CHARACTER_MAXIMUM_LENGTH > 0 ? row.CHARACTER_MAXIMUM_LENGTH : undefined
}
})
logger.debug('sql.service: updateDataTypeCache() complete')
}

/**
Expand All @@ -584,13 +558,12 @@ sqlService.update = async function (tableName, data) {
sql,
params
} = await sqlService.generateUpdateStatement(tableName, preparedData)
try {
const res = await sqlService.modify(sql, params)
return res
} catch (error) {
logger.warn('sql.service: Failed to UPDATE', error)
throw error

const update = async () => {
return sqlService.modify(sql, params)
}

return retry(update, retryConfig, dbLimitReached)
}

/**
Expand Down Expand Up @@ -647,7 +620,8 @@ BEGIN CATCH
);
END CATCH
`
return sqlService.modify(wrappedSQL, params)
const modify = async () => sqlService.modify(wrappedSQL, params)
return retry(modify, retryConfig, dbLimitReached)
}

module.exports = sqlService
8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@
"author": "Guy Harwood, Jon Shipley, George Chatzigiannis",
"license": "GPL-3.0",
"dependencies": {
"dotenv": "^7.0.0",
"express": "^4.16.4",
"moment": "^2.22.2",
"mssql": "^5.0.3",
"ramda": "^0.25.0",
"uuid": "^3.3.2",
"winston": "^3.0.0"
"uuid": "^3.3.2"
},
"devDependencies": {
"jasmine-console-reporter": "^3.0.0",
"standard": "^12.0.1"
},
"scripts": {
"lint": "standard"
"lint": "standard",
"http": "node ./http-example.js"
}
}
Loading

0 comments on commit 931d3cf

Please sign in to comment.