Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added async queue #466

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,6 @@
}
},
"API_DOC_ENDPOINTS_ENABLED": true,
"FEATURE_ENABLE_EXTENDED_PARTY_ID_TYPE": false
"FEATURE_ENABLE_EXTENDED_PARTY_ID_TYPE": false,
"ASYNC_CONCURRENCY": 5
}
38 changes: 15 additions & 23 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "account-lookup-service",
"description": "Account Lookup Service is used to validate Party and Participant lookups.",
"version": "15.2.0",
"version": "15.3.0-snapshot.2",
"license": "Apache-2.0",
"author": "ModusBox",
"contributors": [
Expand Down Expand Up @@ -94,6 +94,7 @@
"@now-ims/hapi-now-auth": "2.1.0",
"ajv": "8.12.0",
"ajv-keywords": "5.1.0",
"async": "^3.2.5",
"blipp": "4.0.2",
"commander": "11.1.0",
"hapi-auth-bearer-token": "8.0.0",
Expand All @@ -107,15 +108,15 @@
"uuid4": "2.0.3"
},
"devDependencies": {
"@types/jest": "29.5.10",
"@types/jest": "29.5.11",
"audit-ci": "^6.6.1",
"axios": "1.6.2",
"docdash": "2.0.2",
"get-port": "5.1.1",
"jest": "29.7.0",
"jest-junit": "16.0.0",
"jsdoc": "4.0.2",
"nodemon": "3.0.1",
"nodemon": "3.0.2",
"npm-audit-resolver": "3.0.0-RC.0",
"npm-check-updates": "16.14.11",
"nyc": "15.1.0",
Expand Down
48 changes: 44 additions & 4 deletions src/handlers/parties/{Type}/{ID}.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,24 @@ const EventSdk = require('@mojaloop/event-sdk')
const LibUtil = require('../../../lib/util')
const parties = require('../../../domain/parties')
const Metrics = require('@mojaloop/central-services-metrics')
const Async = require('async')
const Config = require('../../../lib/config')

const asyncQueue1 = Async.queue(function (task, callback) {
parties.getPartiesByTypeAndID(task.headers, task.params, task.method, task.query, task.span).then(() => {
callback(null)
}).catch((err) => {
callback(err)
})
}, Config.ASYNC_CONCURRENCY)

const asyncQueue2 = Async.queue(function (task, callback) {
parties.putPartiesByTypeAndID(task.headers, task.params, task.method, task.payload, task.dataUri).then(() => {
callback(null)
}).catch((err) => {
callback(err)
})
}, Config.ASYNC_CONCURRENCY)

/**
* Operations on /parties/{Type}/{ID}
Expand Down Expand Up @@ -56,9 +74,20 @@ module.exports = {
}, EventSdk.AuditEventAction.start)
// Here we call an async function- but as we send an immediate sync response, _all_ errors
// _must_ be handled by getPartiesByTypeAndID.
parties.getPartiesByTypeAndID(request.headers, request.params, request.method, request.query, span, request.server.app.cache).catch(err => {
request.server.log(['error'], `ERROR - getPartiesByTypeAndID: ${LibUtil.getStackOrInspect(err)}`)
asyncQueue1.push({
headers: { ...request.headers },
params: { ...request.params },
method: request.method,
query: { ...request.query },
span
}, function (err) {
if (err) {
request.server.log(['error'], `ERROR - getPartiesByTypeAndID: ${LibUtil.getStackOrInspect(err)}`)
}
})
// parties.getPartiesByTypeAndID(request.headers, request.params, request.method, request.query, span).catch(err => {
// request.server.log(['error'], `ERROR - getPartiesByTypeAndID: ${LibUtil.getStackOrInspect(err)}`)
// })
histTimerEnd({ success: true })
return h.response().code(202)
},
Expand All @@ -85,9 +114,20 @@ module.exports = {
}, EventSdk.AuditEventAction.start)
// Here we call an async function- but as we send an immediate sync response, _all_ errors
// _must_ be handled by putPartiesByTypeAndID.
parties.putPartiesByTypeAndID(request.headers, request.params, request.method, request.payload, request.dataUri).catch(err => {
request.server.log(['error'], `ERROR - putPartiesByTypeAndID: ${LibUtil.getStackOrInspect(err)}`)
asyncQueue2.push({
headers: { ...request.headers },
params: { ...request.params },
method: request.method,
payload: { ...request.payload },
dataUri: request.dataUri
}, function (err) {
if (err) {
request.server.log(['error'], `ERROR - putPartiesByTypeAndID: ${LibUtil.getStackOrInspect(err)}`)
}
})
// parties.putPartiesByTypeAndID(request.headers, request.params, request.method, request.payload, request.dataUri).catch(err => {
// request.server.log(['error'], `ERROR - putPartiesByTypeAndID: ${LibUtil.getStackOrInspect(err)}`)
// })
histTimerEnd({ success: true })
return h.response().code(200)
}
Expand Down
3 changes: 2 additions & 1 deletion src/lib/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ const config = {
JWS_SIGNING_KEY_PATH: RC.ENDPOINT_SECURITY.JWS.JWS_SIGNING_KEY_PATH,
API_DOC_ENDPOINTS_ENABLED: RC.API_DOC_ENDPOINTS_ENABLED || false,
FEATURE_ENABLE_EXTENDED_PARTY_ID_TYPE: RC.FEATURE_ENABLE_EXTENDED_PARTY_ID_TYPE || false,
PROTOCOL_VERSIONS: getProtocolVersions(DEFAULT_PROTOCOL_VERSION, RC.PROTOCOL_VERSIONS)
PROTOCOL_VERSIONS: getProtocolVersions(DEFAULT_PROTOCOL_VERSION, RC.PROTOCOL_VERSIONS),
ASYNC_CONCURRENCY: RC.ASYNC_CONCURRENCY || 5
}

if (config.JWS_SIGN) {
Expand Down
3 changes: 2 additions & 1 deletion src/models/oracle/oracleEndpointCached.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ const getOracleEndpointCached = async (params) => {
exports.initialize = async () => {
/* Register as cache client */
const oracleEndpointCacheClientMeta = {
id: 'oracleEndpoints'
id: 'oracleEndpoints',
preloadCache: async () => Promise.resolve()
}

cacheClient = Cache.registerCacheClient(oracleEndpointCacheClientMeta)
Expand Down
6 changes: 5 additions & 1 deletion src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const Migrator = require('./lib/migrator')
const Handlers = require('./handlers')
const Routes = require('./handlers/routes')
const Cache = require('./lib/cache')
const OracleEndpointCache = require('./models/oracle/oracleEndpointCached')

const connectDatabase = async () => {
return Db.connect(Config.DATABASE)
Expand Down Expand Up @@ -77,7 +78,8 @@ const createServer = async (port, api, routes, isAdmin) => {
}
})
server.app.cache = Cache.registerCacheClient({
id: 'serverGeneralCache'
id: 'serverGeneralCache',
preloadCache: async () => Promise.resolve()
})
await Plugins.registerPlugins(server, api, isAdmin)
await server.ext([
Expand Down Expand Up @@ -119,6 +121,8 @@ const initializeApi = async (port = Config.API_PORT) => {
Logger.isInfoEnabled && Logger.info(`Server running on ${server.info.host}:${server.info.port}`)
await ParticipantEndpointCache.initializeCache(Config.CENTRAL_SHARED_ENDPOINT_CACHE_CONFIG)
await ParticipantCache.initializeCache(Config.CENTRAL_SHARED_PARTICIPANT_CACHE_CONFIG)
await OracleEndpointCache.initialize()
await Cache.initCache()
return server
}

Expand Down