-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Feature/replay poison checks (#1254)
* ideas * [functions-app] add poison queue replay function (no bindings) * [functions-app] remove method that was not used * [functions-app] set to disabled and a valid cron expression (once per year: 12 Dec 14:30)
- Loading branch information
1 parent
50761de
commit 0c19015
Showing
7 changed files
with
244 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
'use strict' | ||
|
||
require('dotenv').config() | ||
const azureStorage = require('azure-storage') | ||
const bluebird = require('bluebird') | ||
|
||
let azureTableService | ||
let azureQueueService | ||
let azureBlobService | ||
|
||
const azureStorageHelper = { | ||
/** | ||
* Promisify and cache the azureTableService library as it still lacks Promise support | ||
*/ | ||
getPromisifiedAzureTableService: function getPromisifiedAzureTableService () { | ||
if (azureTableService) { | ||
return azureTableService | ||
} | ||
azureTableService = azureStorage.createTableService() | ||
bluebird.promisifyAll(azureTableService, { | ||
promisifier: (originalFunction) => function (...args) { | ||
return new Promise((resolve, reject) => { | ||
try { | ||
originalFunction.call(this, ...args, (error, result, response) => { | ||
if (error) { | ||
return reject(error) | ||
} | ||
resolve({ result, response }) | ||
}) | ||
} catch (error) { | ||
reject(error) | ||
} | ||
}) | ||
} | ||
}) | ||
|
||
return azureTableService | ||
}, | ||
|
||
/** | ||
* Promisify the azureQueueService | ||
* @return {*} | ||
*/ | ||
getPromisifiedAzureQueueService: function getPromisifiedAzureQueueService () { | ||
if (azureQueueService) { | ||
return azureQueueService | ||
} | ||
azureQueueService = azureStorage.createQueueService() | ||
bluebird.promisifyAll(azureQueueService, { | ||
promisifier: (originalFunction) => function (...args) { | ||
return new Promise((resolve, reject) => { | ||
try { | ||
originalFunction.call(this, ...args, (error, result, response) => { | ||
if (error) { | ||
return reject(error) | ||
} | ||
resolve({ result, response }) | ||
}) | ||
} catch (error) { | ||
reject(error) | ||
} | ||
}) | ||
} | ||
}) | ||
|
||
return azureQueueService | ||
}, | ||
|
||
/** | ||
* Promisify the azureBlobService | ||
* @return {*} | ||
*/ | ||
getPromisifiedAzureBlobService: function getPromisifiedAzureBlobService () { | ||
if (azureBlobService) { | ||
return azureBlobService | ||
} | ||
azureBlobService = azureStorage.createBlobService() | ||
bluebird.promisifyAll(azureBlobService, { | ||
promisifier: (originalFunction) => function (...args) { | ||
return new Promise((resolve, reject) => { | ||
try { | ||
originalFunction.call(this, ...args, (error, result, response) => { | ||
if (error) { | ||
return reject(error) | ||
} | ||
resolve({ result, response }) | ||
}) | ||
} catch (error) { | ||
reject(error) | ||
} | ||
}) | ||
} | ||
}) | ||
|
||
return azureBlobService | ||
} | ||
} | ||
|
||
module.exports = azureStorageHelper |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
11 changes: 11 additions & 0 deletions
11
functions-app/poison-replay-completed-checks/function.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
{ | ||
"disabled": true, | ||
"bindings": [ | ||
{ | ||
"schedule": "0 30 14 20 12 *", | ||
"name": "poisonReplayCompletedChecks", | ||
"type": "timerTrigger", | ||
"direction": "in" | ||
} | ||
] | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
'use strict' | ||
|
||
const fs = require('fs') | ||
const uuid = require('uuid/v4') | ||
const path = require('path') | ||
const compressionService = require('../../functions/lib/compression.service') | ||
|
||
module.exports = async function (context, check) { | ||
let version | ||
let message | ||
if (check.version && check.version === '2') { | ||
version = 2 | ||
context.log('decompressing v2 message') | ||
message = compressionService.decompress(check.archive) | ||
} else { | ||
version = 1 | ||
message = check | ||
} | ||
const id = uuid() | ||
const file = path.join('./', `v${version}`, `${id}.json`) | ||
context.log(`saving to ${file}`) | ||
try { | ||
const jsonString = JSON.stringify(message) | ||
fs.writeFileSync(file, jsonString) | ||
} catch (error) { | ||
context.log(error) | ||
} | ||
} | ||
|
||
/* | ||
1. understand which schools are affected | ||
2. | ||
*/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
'use strict' | ||
|
||
const { performance } = require('perf_hooks') | ||
|
||
const v1 = require('./v1') | ||
const name = 'poison-replay-completed-checks' | ||
|
||
module.exports = async function (context) { | ||
const start = performance.now() | ||
|
||
let meta | ||
try { | ||
meta = await v1.process(context.log) | ||
} catch (error) { | ||
context.log.error(`${name}: ERROR: ${error.message}`) | ||
throw error | ||
} | ||
|
||
const end = performance.now() | ||
const durationInMilliseconds = end - start | ||
const timeStamp = new Date().toISOString() | ||
context.log(`${name}: ${timeStamp} processed ${meta.processCount} checks, errors: ${meta.errorCount}; run took ${durationInMilliseconds} ms`) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
'use strict' | ||
const azureStorageHelper = require('../lib/azure-storage-helper') | ||
const R = require('ramda') | ||
|
||
const functionName = 'poison-replay-completed-checks' | ||
const completedCheckPoisonQueue = 'check-complete-poison' | ||
const completedCheckQueue = 'check-complete' | ||
let logger | ||
|
||
async function replayMessage (result) { | ||
const azureQueueService = azureStorageHelper.getPromisifiedAzureQueueService() | ||
const completedCheckMessage = JSON.parse(Buffer.from(result.messageText, 'base64').toString()) | ||
const checkCode = R.prop('checkCode', completedCheckMessage) | ||
|
||
if (checkCode) { | ||
logger.info(`${functionName}: replaying check [${checkCode}] onto queue [${completedCheckQueue}]`) | ||
} else { | ||
logger.info(`${functionName}: checkCode not found - replaying unlikely to work`) | ||
} | ||
|
||
// replay message | ||
const insertResult = await azureQueueService.createMessageAsync(completedCheckQueue, result.messageText) | ||
if (insertResult.response.isSuccessful === true) { | ||
// delete the message from the poison queue | ||
await azureQueueService.deleteMessageAsync(completedCheckPoisonQueue, result.messageId, result.popReceipt) | ||
} | ||
} | ||
|
||
const v1 = { | ||
|
||
/** | ||
* Replay multiple messages from the poison queue to the processing queue | ||
* See: https://azure.github.io/azure-storage-node/QueueService.html#getMessages__anchor | ||
* @param loggerArg Context.log | ||
* @return {Promise<{processCount: number, errorCount: number}>} | ||
*/ | ||
process: async function process (loggerArg) { | ||
logger = loggerArg | ||
let messages | ||
let done = false | ||
const meta = { processCount: 0, errorCount: 0 } | ||
const azureQueueService = azureStorageHelper.getPromisifiedAzureQueueService() | ||
|
||
while (!done) { | ||
try { | ||
const options = { numOfMessages: 32, visibilityTimeout: 120 } | ||
messages = await azureQueueService.getMessagesAsync(completedCheckPoisonQueue, options) | ||
logger.info(`${functionName}: Got ${messages.result.length} messages from the queue`) | ||
} catch (error) { | ||
logger.error(`${functionName}: Failed to fetch messages: ${error.message}`) | ||
throw error | ||
} | ||
|
||
for (let message of messages.result) { | ||
try { | ||
await replayMessage(message) | ||
meta.processCount += 1 | ||
} catch (error) { | ||
logger.error(`${functionName}: Failed to replay message: ${error.message}`) | ||
meta.errorCount += 1 | ||
} | ||
} | ||
if (messages.result.length === 0) { | ||
done = true | ||
} | ||
} | ||
|
||
return meta | ||
} | ||
} | ||
|
||
module.exports = v1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters