Skip to content

Commit

Permalink
Bug/64760 ps report run not populating the table (#2803)
Browse files Browse the repository at this point in the history
* Leave the csv staging file intact (tmp)

* Disable cleanup of staging file

* Instrument ps-report-writer function

* add invocationId to logs

* Comment out cleanup() call for ps report

* Instrument ps report 3 with invocation id

* Add duplicate message protection to the ps-report-writer-4 function

* PS report staging 3b: only create a new CSV file if one doesn't exist

This is in case of multiple workers overwriting each others work.

* Add comment

* ps report writer - check for existing before create

* add
  • Loading branch information
jon-shipley authored May 31, 2024
1 parent 7d14085 commit 5bea4f8
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 43 deletions.
3 changes: 2 additions & 1 deletion deploy/service-bus/deploy.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ const config = {
{
name: 'ps-report-staging-complete',
defaultMessageTimeToLive: sixHours,
maxSizeInMegabytes: {}.hasOwnProperty.call(process.env, 'SERVICE_BUS_QUEUE_MAX_SIZE_MEGABYTES_PS_REPORT_EXPORT') ? parseInt(process.env.SERVICE_BUS_QUEUE_MAX_SIZE_MEGABYTES_PS_REPORT_EXPORT, 10) : eightyGigabytes
maxSizeInMegabytes: {}.hasOwnProperty.call(process.env, 'SERVICE_BUS_QUEUE_MAX_SIZE_MEGABYTES_PS_REPORT_EXPORT') ? parseInt(process.env.SERVICE_BUS_QUEUE_MAX_SIZE_MEGABYTES_PS_REPORT_EXPORT, 10) : eightyGigabytes,
requiresDuplicateDetection: true
},
{
name: 'pupil-login',
Expand Down
38 changes: 21 additions & 17 deletions tslib/src/functions-ps-report/ps-report-3b-stage-csv-file/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { CsvTransformer } from './csv-transformer'
import type { PsReportStagingStartMessage, PsReportStagingCompleteMessage } from '../common/ps-report-service-bus-messages'

const functionName = 'ps-report-3b-stage-csv-file'
let logPrefix = functionName
const receiveQueueName = 'ps-report-export'
const sleep = async (ms: number): Promise<void> => { return new Promise((resolve) => setTimeout(resolve, ms)) }
let emptyPollTime: undefined | number
Expand All @@ -24,11 +25,12 @@ let psReportStagingDataService: PsReportStagingDataService
*
*/
const PsReportStageCsvFile: AzureFunction = async function (context: Context, incomingMessage: PsReportStagingStartMessage): Promise<void> {
context.log(`${functionName}: starting`)
logPrefix = functionName + ': ' + context.invocationId
context.log(`${logPrefix}: starting`)
const start = performance.now()

if (config.ServiceBus.ConnectionString === undefined) {
throw new Error(`${functionName}: ServiceBusConnection env var is missing`)
throw new Error(`${logPrefix}: ServiceBusConnection env var is missing`)
}

let busClient: sb.ServiceBusClient
Expand All @@ -43,18 +45,18 @@ const PsReportStageCsvFile: AzureFunction = async function (context: Context, in

// connect to service bus...
try {
context.log(`${functionName}: connecting to service bus...`)
context.log(`${logPrefix}: connecting to service bus...`)
busClient = new sb.ServiceBusClient(config.ServiceBus.ConnectionString)
receiver = busClient.createReceiver(receiveQueueName, {
receiveMode: 'peekLock'
})
context.log(`${functionName}: connected to service bus instance ${busClient.fullyQualifiedNamespace}`)
context.log(`${logPrefix}: connected to service bus instance ${busClient.fullyQualifiedNamespace}`)
} catch (error) {
let errorMessage = 'unknown error'
if (error instanceof Error) {
errorMessage = error.message
}
context.log.error(`${functionName}: unable to connect to service bus at this time: ${errorMessage}`)
context.log.error(`${logPrefix}: unable to connect to service bus at this time: ${errorMessage}`)
throw error
}

Expand All @@ -69,28 +71,30 @@ const PsReportStageCsvFile: AzureFunction = async function (context: Context, in
if (error instanceof Error) {
errorMessage = error.message
}
context.log.error(`${functionName}: unable to connect to service bus at this time: ${errorMessage}`)
context.log.error(`${logPrefix}: unable to connect to service bus at this time: ${errorMessage}`)
throw error
}

let done = false
let batchIndex = 0

while (!done) {
context.log(`${functionName}: starting batch ${batchIndex + 1}`)
context.log(`${logPrefix}: starting batch ${batchIndex + 1}`)
const messageBatch = await receiver.receiveMessages(config.PsReport.StagingFile.ReadMessagesPerBatch)
if (RA.isNilOrEmpty(messageBatch)) {
context.log(`${functionName}: no messages to process`)
context.log(`${logPrefix}: no messages to process`)
if (emptyPollTime === undefined) {
context.log(`${functionName}: Setting flag to mark the time since no messages were found.`)
context.log(`${logPrefix}: Setting flag to mark the time since no messages were found.`)
emptyPollTime = getEpoch()
}
const nowEpoch = getEpoch()
const timeSinceLastMessage = nowEpoch - emptyPollTime
context.log(`${functionName}: nowEpoch: ${nowEpoch} emptyPollTime: ${emptyPollTime} timeSinceLastMessage: ${timeSinceLastMessage} target wait time is ${config.PsReport.StagingFile.WaitTimeToTriggerStagingComplete}`)
context.log(`${logPrefix}: nowEpoch: ${nowEpoch} emptyPollTime: ${emptyPollTime} timeSinceLastMessage: ${timeSinceLastMessage} target wait time is ${config.PsReport.StagingFile.WaitTimeToTriggerStagingComplete}`)
if (timeSinceLastMessage >= config.PsReport.StagingFile.WaitTimeToTriggerStagingComplete) {
context.log(`${functionName}: exiting as no new messages in ${config.PsReport.StagingFile.WaitTimeToTriggerStagingComplete} seconds.`)
context.log(`${logPrefix}: exiting (and sending output binding message) as no new messages in ${config.PsReport.StagingFile.WaitTimeToTriggerStagingComplete} seconds.`)
done = true

// This message should be delivered once - duplicatePrevention is on the sb queue.
const completeMessage: PsReportStagingCompleteMessage = {
filename: incomingMessage.filename,
jobUuid: incomingMessage.jobUuid
Expand All @@ -99,15 +103,15 @@ const PsReportStageCsvFile: AzureFunction = async function (context: Context, in
await disconnect()
return finish(start, context)
} else {
context.log(`${functionName}: waiting for messages...`)
// context.log(`${logPrefix}: waiting for messages...`)
await sleep(config.PsReport.StagingFile.PollInterval) // wait n milliseconds before polling again. Default is 10.
}
} else {
// Messages were received
// so, 1st reset the timer, as it may have been set previously
emptyPollTime = undefined
// and process the messages
context.log(`${functionName}: received batch of ${messageBatch.length} messages`)
context.log(`${logPrefix}: received batch of ${messageBatch.length} messages`)
await process(context, messageBatch, receiver)
}

Expand All @@ -122,15 +126,15 @@ function finish (start: number, context: Context): void {
const end = performance.now()
const durationInMilliseconds = end - start
const timeStamp = new Date().toISOString()
context.log(`${functionName}: ${timeStamp} run complete: ${durationInMilliseconds} ms`)
context.log(`${logPrefix}: ${timeStamp} run complete: ${durationInMilliseconds} ms`)
}

async function completeMessages (messageBatch: sb.ServiceBusReceivedMessage[], receiver: sb.ServiceBusReceiver, context: Context): Promise<void> {
// the sql updates are committed, complete the messages.
// if any completes fail, just abandon.
// the sql updates are idempotent and as such replaying a message
// will not have an adverse effect.
context.log(`${functionName}: batch processed successfully, completing all messages in batch`)
context.log(`${logPrefix}: batch processed successfully, completing all messages in batch`)
for (let index = 0; index < messageBatch.length; index++) {
const msg = messageBatch[index]
try {
Expand All @@ -143,7 +147,7 @@ async function completeMessages (messageBatch: sb.ServiceBusReceivedMessage[], r
if (error instanceof Error) {
errorMessage = error.message
}
context.log.error(`${functionName}: unable to abandon message:${errorMessage}`)
context.log.error(`${logPrefix}: unable to abandon message:${errorMessage}`)
// do nothing.
// the lock will expire and message reprocessed at a later time
}
Expand All @@ -161,7 +165,7 @@ async function abandonMessages (messageBatch: sb.ServiceBusReceivedMessage[], re
if (error instanceof Error) {
errorMessage = error.message
}
context.log.error(`${functionName}: unable to abandon message:${errorMessage}`)
context.log.error(`${logPrefix}: unable to abandon message:${errorMessage}`)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ export class PsReportStagingDataService {
*/
public async createAppendBlock (): Promise<void> {
const containerService = this.blobService.getContainerClient(this.containerName)
// Create the container if missing
await containerService.createIfNotExists()
const appendBlobService = containerService.getAppendBlobClient(this.blobName)
await appendBlobService.create()
// Create the CSV file.
await appendBlobService.createIfNotExists()
}

/**
Expand Down
11 changes: 7 additions & 4 deletions tslib/src/functions-ps-report/ps-report-4-writer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { PsReportWriterService } from './ps-report-writer.service'
import { type PsReportStagingCompleteMessage } from '../common/ps-report-service-bus-messages'
import { JobDataService } from '../../services/data/job.data.service'
import { JobStatusCode } from '../../common/job-status-code'
const funcName = 'ps-report-4-writer'
let funcName = 'ps-report-4-writer'

const serviceBusQueueTrigger: AzureFunction = async function (context: Context, incomingMessage: PsReportStagingCompleteMessage): Promise<void> {
const start = performance.now()
Expand All @@ -16,12 +16,14 @@ const serviceBusQueueTrigger: AzureFunction = async function (context: Context,

async function bulkUpload (context: Context, incomingMessage: PsReportStagingCompleteMessage): Promise<void> {
let dbTable: string = ''
const service = new PsReportWriterService(context.log)
const service = new PsReportWriterService(context.log, context.invocationId)
const jobDataService = new JobDataService()
funcName = funcName + ': ' + context.invocationId
try {
context.log.verbose(`${funcName}: creating new destination table in SQL Server`)
dbTable = await service.createDestinationTableAndView(incomingMessage)
dbTable = await service.createDestinationTableAndViewIfNotExists(incomingMessage)
context.log.verbose(`${funcName}: new table created ${dbTable}`)

await service.prepareForUpload(incomingMessage.filename)
context.log(`${funcName}: starting bulk upload from ${incomingMessage.filename} into table ${dbTable}`)
await service.bulkUpload(incomingMessage, dbTable) // the container is *known* and is stored in the location path of the database 'EXTERNAL DATA SOURCE'.
Expand All @@ -36,7 +38,8 @@ async function bulkUpload (context: Context, incomingMessage: PsReportStagingCom
await jobDataService.setJobComplete(incomingMessage.jobUuid,
JobStatusCode.Failed, JSON.stringify(error))
}
await service.cleanup(incomingMessage.filename)
// Live debugging: 2024-05-31 for PS report. To be re-enabled.
// await service.cleanup(incomingMessage.filename)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,64 @@ import { BlobService } from '../../azure/blob-service'
import * as mssql from 'mssql'
import config from '../../config'
import type { PsReportStagingCompleteMessage } from '../common/ps-report-service-bus-messages'
import * as R from 'ramda'

const containerName = 'ps-report-bulk-upload'

export class PsReportWriterService {
private readonly sqlService: ISqlService
private readonly logger: ILogger
private readonly invocationId: string
private readonly logServiceName = 'PsReportWriterService'

constructor (logger?: ILogger, sqlService?: ISqlService) {
if (logger === undefined) {
logger = new ConsoleLogger()
}
this.logger = logger
constructor (logger?: ILogger, invocationId?: string, sqlService?: ISqlService) {
this.logger = logger ?? new ConsoleLogger()
this.invocationId = invocationId ?? 'n/a'
this.sqlService = sqlService ?? new SqlService(this.logger)
}

public logPrefix (): string {
return `${this.logServiceName}: ${this.invocationId}`
}

if (sqlService === undefined) {
sqlService = new SqlService(this.logger)
public async tableExists (tableName: string): Promise<boolean> {
const sql = `
SELECT
*
FROM
information_schema.tables
WHERE
table_name = @name`

const params = [
{ name: 'name', value: tableName, type: mssql.NVarChar }
]
const res = this.sqlService.query(sql, params)
// empty: res => {}
if (R.isEmpty(res)) {
return true
}
this.sqlService = sqlService
return false
}

/**
*
* @returns Create a new ps report table and return the table name
*/
public async createDestinationTableAndView (incomingMessage: PsReportStagingCompleteMessage): Promise<string> {
public async createDestinationTableAndViewIfNotExists (incomingMessage: PsReportStagingCompleteMessage): Promise<string> {
let ds = moment().format('YYYY_MM_DDTHHmm') // default
// Match 'ps-report-staging-2024-02-27-1510.csv'
const matches = incomingMessage.filename.match(/\d\d\d\d-\d\d-\d\d-\d\d\d\d/)
if (matches !== null) {
ds = matches[0].replaceAll('-', '_')
}
const newTableName = `psychometricReport_${ds}`

const tableExists = await this.tableExists(newTableName)
if (tableExists) {
throw new Error('Table already exists: ' + newTableName)
}

const sql1 = `
CREATE TABLE mtc_results.${newTableName} (
PupilId int NOT NULL CONSTRAINT [PK_${newTableName}] PRIMARY KEY,
Expand Down Expand Up @@ -120,27 +147,27 @@ export class PsReportWriterService {
PupilUPN ASC
) WITH ( PAD_INDEX = OFF,FILLFACTOR = 100,SORT_IN_TEMPDB = OFF, IGNORE_DUP_KEY = OFF, STATISTICS_NORECOMPUTE = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON ) ON [PRIMARY]
`
this.logger.verbose(`${this.logServiceName}: creating table ${newTableName}`)
this.logger.verbose(`${this.logPrefix()}: creating table ${newTableName}`)
await this.sqlService.modify(fullSql, [])

this.logger.verbose(`${this.logServiceName}: Creating trigger`)
this.logger.verbose(`${this.logPrefix()}: Creating trigger`)
await this.sqlService.modify(triggerSql, [])

this.logger.verbose(`${this.logServiceName}: Creating IX1`)
this.logger.verbose(`${this.logPrefix()}: Creating IX1`)
await this.sqlService.modify(ix1Sql, [])

this.logger.verbose(`${this.logServiceName}: Creating IX2`)
this.logger.verbose(`${this.logPrefix()}: Creating IX2`)
await this.sqlService.modify(ix2Sql, [])

this.logger.info(`${this.logServiceName}: new table ${newTableName} created`)
this.logger.info(`${this.logPrefix()}: new table ${newTableName} created`)

const viewSql = `
CREATE OR ALTER VIEW [mtc_results].[vewPsychometricReport]
AS
SELECT * FROM [mtc_results].[${newTableName}]
`
await this.sqlService.modify(viewSql, [])
this.logger.info(`${this.logServiceName}: psychometricReport view recreated`)
this.logger.info(`${this.logPrefix()}: psychometricReport view recreated`)
return newTableName
}

Expand All @@ -151,7 +178,7 @@ export class PsReportWriterService {
public async prepareForUpload (blobFile: string): Promise<void> {
const blobService = new BlobService()
const containerUrl = await blobService.getContainerUrl(containerName)
this.logger.verbose(`${this.logServiceName}: container url is ${containerUrl}`)
this.logger.verbose(`${this.logPrefix()}: container url is ${containerUrl}`)
const sasToken = await blobService.getBlobReadWriteSasToken(containerName, blobFile)
const sql = `
IF (SELECT COUNT(*) FROM sys.database_scoped_credentials WHERE name = 'PsReportBulkUploadCredential') = 0
Expand Down Expand Up @@ -209,7 +236,7 @@ export class PsReportWriterService {
* error in this block means we are running in Azure.
*/
const result = await this.sqlService.query(sql2, [])
this.logger.verbose(`${this.logServiceName} OS is ${JSON.stringify(result)}`)
this.logger.verbose(`${this.logPrefix()}: OS is ${JSON.stringify(result)}`)
let sqlConfig: mssql.config

if (result[0].engineEdition === 'Enterprise' || result[0].engineEdition === 'Azure SQL Edge') {
Expand Down Expand Up @@ -255,14 +282,15 @@ export class PsReportWriterService {
ROWTERMINATOR = '\r\n')
;`
await mssql.query(sql)
await this.cleanup(sFilename)
// tmp
// await this.cleanup(sFilename)
}

public async cleanup (filename: string): Promise<void> {
this.logger.info(`${this.logServiceName}: cleanup() called`)
this.logger.info(`${this.logPrefix()}: cleanup() called`)
// Remove CSV file
const blobService = new BlobService()
await blobService.deleteBlob(filename, containerName)
this.logger.info(`${this.logServiceName}: csv file ${filename} deleted.`)
this.logger.info(`${this.logPrefix()}: csv file ${filename} deleted.`)
}
}

0 comments on commit 5bea4f8

Please sign in to comment.