diff --git a/deploy/service-bus/deploy.config.js b/deploy/service-bus/deploy.config.js index 76f17a3d79..f6dbc8bc91 100644 --- a/deploy/service-bus/deploy.config.js +++ b/deploy/service-bus/deploy.config.js @@ -92,7 +92,8 @@ const config = { { name: 'ps-report-staging-complete', defaultMessageTimeToLive: sixHours, - maxSizeInMegabytes: {}.hasOwnProperty.call(process.env, 'SERVICE_BUS_QUEUE_MAX_SIZE_MEGABYTES_PS_REPORT_EXPORT') ? parseInt(process.env.SERVICE_BUS_QUEUE_MAX_SIZE_MEGABYTES_PS_REPORT_EXPORT, 10) : eightyGigabytes + maxSizeInMegabytes: {}.hasOwnProperty.call(process.env, 'SERVICE_BUS_QUEUE_MAX_SIZE_MEGABYTES_PS_REPORT_EXPORT') ? parseInt(process.env.SERVICE_BUS_QUEUE_MAX_SIZE_MEGABYTES_PS_REPORT_EXPORT, 10) : eightyGigabytes, + requiresDuplicateDetection: true }, { name: 'pupil-login', diff --git a/tslib/src/functions-ps-report/ps-report-3b-stage-csv-file/index.ts b/tslib/src/functions-ps-report/ps-report-3b-stage-csv-file/index.ts index e4c810eb3f..c37fc5f2fd 100644 --- a/tslib/src/functions-ps-report/ps-report-3b-stage-csv-file/index.ts +++ b/tslib/src/functions-ps-report/ps-report-3b-stage-csv-file/index.ts @@ -10,6 +10,7 @@ import { CsvTransformer } from './csv-transformer' import type { PsReportStagingStartMessage, PsReportStagingCompleteMessage } from '../common/ps-report-service-bus-messages' const functionName = 'ps-report-3b-stage-csv-file' +let logPrefix = functionName const receiveQueueName = 'ps-report-export' const sleep = async (ms: number): Promise => { return new Promise((resolve) => setTimeout(resolve, ms)) } let emptyPollTime: undefined | number @@ -24,11 +25,12 @@ let psReportStagingDataService: PsReportStagingDataService * */ const PsReportStageCsvFile: AzureFunction = async function (context: Context, incomingMessage: PsReportStagingStartMessage): Promise { - context.log(`${functionName}: starting`) + logPrefix = functionName + ': ' + context.invocationId + context.log(`${logPrefix}: starting`) const start = performance.now() if (config.ServiceBus.ConnectionString === undefined) { - throw new Error(`${functionName}: ServiceBusConnection env var is missing`) + throw new Error(`${logPrefix}: ServiceBusConnection env var is missing`) } let busClient: sb.ServiceBusClient @@ -43,18 +45,18 @@ const PsReportStageCsvFile: AzureFunction = async function (context: Context, in // connect to service bus... try { - context.log(`${functionName}: connecting to service bus...`) + context.log(`${logPrefix}: connecting to service bus...`) busClient = new sb.ServiceBusClient(config.ServiceBus.ConnectionString) receiver = busClient.createReceiver(receiveQueueName, { receiveMode: 'peekLock' }) - context.log(`${functionName}: connected to service bus instance ${busClient.fullyQualifiedNamespace}`) + context.log(`${logPrefix}: connected to service bus instance ${busClient.fullyQualifiedNamespace}`) } catch (error) { let errorMessage = 'unknown error' if (error instanceof Error) { errorMessage = error.message } - context.log.error(`${functionName}: unable to connect to service bus at this time: ${errorMessage}`) + context.log.error(`${logPrefix}: unable to connect to service bus at this time: ${errorMessage}`) throw error } @@ -69,7 +71,7 @@ const PsReportStageCsvFile: AzureFunction = async function (context: Context, in if (error instanceof Error) { errorMessage = error.message } - context.log.error(`${functionName}: unable to connect to service bus at this time: ${errorMessage}`) + context.log.error(`${logPrefix}: unable to connect to service bus at this time: ${errorMessage}`) throw error } @@ -77,20 +79,22 @@ const PsReportStageCsvFile: AzureFunction = async function (context: Context, in let batchIndex = 0 while (!done) { - context.log(`${functionName}: starting batch ${batchIndex + 1}`) + context.log(`${logPrefix}: starting batch ${batchIndex + 1}`) const messageBatch = await receiver.receiveMessages(config.PsReport.StagingFile.ReadMessagesPerBatch) if (RA.isNilOrEmpty(messageBatch)) { - context.log(`${functionName}: no messages to process`) + context.log(`${logPrefix}: no messages to process`) if (emptyPollTime === undefined) { - context.log(`${functionName}: Setting flag to mark the time since no messages were found.`) + context.log(`${logPrefix}: Setting flag to mark the time since no messages were found.`) emptyPollTime = getEpoch() } const nowEpoch = getEpoch() const timeSinceLastMessage = nowEpoch - emptyPollTime - context.log(`${functionName}: nowEpoch: ${nowEpoch} emptyPollTime: ${emptyPollTime} timeSinceLastMessage: ${timeSinceLastMessage} target wait time is ${config.PsReport.StagingFile.WaitTimeToTriggerStagingComplete}`) + context.log(`${logPrefix}: nowEpoch: ${nowEpoch} emptyPollTime: ${emptyPollTime} timeSinceLastMessage: ${timeSinceLastMessage} target wait time is ${config.PsReport.StagingFile.WaitTimeToTriggerStagingComplete}`) if (timeSinceLastMessage >= config.PsReport.StagingFile.WaitTimeToTriggerStagingComplete) { - context.log(`${functionName}: exiting as no new messages in ${config.PsReport.StagingFile.WaitTimeToTriggerStagingComplete} seconds.`) + context.log(`${logPrefix}: exiting (and sending output binding message) as no new messages in ${config.PsReport.StagingFile.WaitTimeToTriggerStagingComplete} seconds.`) done = true + + // This message should be delivered once - duplicatePrevention is on the sb queue. const completeMessage: PsReportStagingCompleteMessage = { filename: incomingMessage.filename, jobUuid: incomingMessage.jobUuid @@ -99,7 +103,7 @@ const PsReportStageCsvFile: AzureFunction = async function (context: Context, in await disconnect() return finish(start, context) } else { - context.log(`${functionName}: waiting for messages...`) + // context.log(`${logPrefix}: waiting for messages...`) await sleep(config.PsReport.StagingFile.PollInterval) // wait n milliseconds before polling again. Default is 10. } } else { @@ -107,7 +111,7 @@ const PsReportStageCsvFile: AzureFunction = async function (context: Context, in // so, 1st reset the timer, as it may have been set previously emptyPollTime = undefined // and process the messages - context.log(`${functionName}: received batch of ${messageBatch.length} messages`) + context.log(`${logPrefix}: received batch of ${messageBatch.length} messages`) await process(context, messageBatch, receiver) } @@ -122,7 +126,7 @@ function finish (start: number, context: Context): void { const end = performance.now() const durationInMilliseconds = end - start const timeStamp = new Date().toISOString() - context.log(`${functionName}: ${timeStamp} run complete: ${durationInMilliseconds} ms`) + context.log(`${logPrefix}: ${timeStamp} run complete: ${durationInMilliseconds} ms`) } async function completeMessages (messageBatch: sb.ServiceBusReceivedMessage[], receiver: sb.ServiceBusReceiver, context: Context): Promise { @@ -130,7 +134,7 @@ async function completeMessages (messageBatch: sb.ServiceBusReceivedMessage[], r // if any completes fail, just abandon. // the sql updates are idempotent and as such replaying a message // will not have an adverse effect. - context.log(`${functionName}: batch processed successfully, completing all messages in batch`) + context.log(`${logPrefix}: batch processed successfully, completing all messages in batch`) for (let index = 0; index < messageBatch.length; index++) { const msg = messageBatch[index] try { @@ -143,7 +147,7 @@ async function completeMessages (messageBatch: sb.ServiceBusReceivedMessage[], r if (error instanceof Error) { errorMessage = error.message } - context.log.error(`${functionName}: unable to abandon message:${errorMessage}`) + context.log.error(`${logPrefix}: unable to abandon message:${errorMessage}`) // do nothing. // the lock will expire and message reprocessed at a later time } @@ -161,7 +165,7 @@ async function abandonMessages (messageBatch: sb.ServiceBusReceivedMessage[], re if (error instanceof Error) { errorMessage = error.message } - context.log.error(`${functionName}: unable to abandon message:${errorMessage}`) + context.log.error(`${logPrefix}: unable to abandon message:${errorMessage}`) } } } diff --git a/tslib/src/functions-ps-report/ps-report-3b-stage-csv-file/ps-report-staging.data.service.ts b/tslib/src/functions-ps-report/ps-report-3b-stage-csv-file/ps-report-staging.data.service.ts index 0e940a5f2d..7087e947fe 100644 --- a/tslib/src/functions-ps-report/ps-report-3b-stage-csv-file/ps-report-staging.data.service.ts +++ b/tslib/src/functions-ps-report/ps-report-3b-stage-csv-file/ps-report-staging.data.service.ts @@ -24,9 +24,11 @@ export class PsReportStagingDataService { */ public async createAppendBlock (): Promise { const containerService = this.blobService.getContainerClient(this.containerName) + // Create the container if missing await containerService.createIfNotExists() const appendBlobService = containerService.getAppendBlobClient(this.blobName) - await appendBlobService.create() + // Create the CSV file. + await appendBlobService.createIfNotExists() } /** diff --git a/tslib/src/functions-ps-report/ps-report-4-writer/index.ts b/tslib/src/functions-ps-report/ps-report-4-writer/index.ts index ccd65f12e0..3adeb12f91 100644 --- a/tslib/src/functions-ps-report/ps-report-4-writer/index.ts +++ b/tslib/src/functions-ps-report/ps-report-4-writer/index.ts @@ -4,7 +4,7 @@ import { PsReportWriterService } from './ps-report-writer.service' import { type PsReportStagingCompleteMessage } from '../common/ps-report-service-bus-messages' import { JobDataService } from '../../services/data/job.data.service' import { JobStatusCode } from '../../common/job-status-code' -const funcName = 'ps-report-4-writer' +let funcName = 'ps-report-4-writer' const serviceBusQueueTrigger: AzureFunction = async function (context: Context, incomingMessage: PsReportStagingCompleteMessage): Promise { const start = performance.now() @@ -16,12 +16,14 @@ const serviceBusQueueTrigger: AzureFunction = async function (context: Context, async function bulkUpload (context: Context, incomingMessage: PsReportStagingCompleteMessage): Promise { let dbTable: string = '' - const service = new PsReportWriterService(context.log) + const service = new PsReportWriterService(context.log, context.invocationId) const jobDataService = new JobDataService() + funcName = funcName + ': ' + context.invocationId try { context.log.verbose(`${funcName}: creating new destination table in SQL Server`) - dbTable = await service.createDestinationTableAndView(incomingMessage) + dbTable = await service.createDestinationTableAndViewIfNotExists(incomingMessage) context.log.verbose(`${funcName}: new table created ${dbTable}`) + await service.prepareForUpload(incomingMessage.filename) context.log(`${funcName}: starting bulk upload from ${incomingMessage.filename} into table ${dbTable}`) await service.bulkUpload(incomingMessage, dbTable) // the container is *known* and is stored in the location path of the database 'EXTERNAL DATA SOURCE'. @@ -36,7 +38,8 @@ async function bulkUpload (context: Context, incomingMessage: PsReportStagingCom await jobDataService.setJobComplete(incomingMessage.jobUuid, JobStatusCode.Failed, JSON.stringify(error)) } - await service.cleanup(incomingMessage.filename) + // Live debugging: 2024-05-31 for PS report. To be re-enabled. + // await service.cleanup(incomingMessage.filename) } } diff --git a/tslib/src/functions-ps-report/ps-report-4-writer/ps-report-writer.service.ts b/tslib/src/functions-ps-report/ps-report-4-writer/ps-report-writer.service.ts index 6efae4cdd3..41a5b0dd0e 100644 --- a/tslib/src/functions-ps-report/ps-report-4-writer/ps-report-writer.service.ts +++ b/tslib/src/functions-ps-report/ps-report-4-writer/ps-report-writer.service.ts @@ -5,30 +5,51 @@ import { BlobService } from '../../azure/blob-service' import * as mssql from 'mssql' import config from '../../config' import type { PsReportStagingCompleteMessage } from '../common/ps-report-service-bus-messages' +import * as R from 'ramda' + const containerName = 'ps-report-bulk-upload' export class PsReportWriterService { private readonly sqlService: ISqlService private readonly logger: ILogger + private readonly invocationId: string private readonly logServiceName = 'PsReportWriterService' - constructor (logger?: ILogger, sqlService?: ISqlService) { - if (logger === undefined) { - logger = new ConsoleLogger() - } - this.logger = logger + constructor (logger?: ILogger, invocationId?: string, sqlService?: ISqlService) { + this.logger = logger ?? new ConsoleLogger() + this.invocationId = invocationId ?? 'n/a' + this.sqlService = sqlService ?? new SqlService(this.logger) + } + + public logPrefix (): string { + return `${this.logServiceName}: ${this.invocationId}` + } - if (sqlService === undefined) { - sqlService = new SqlService(this.logger) + public async tableExists (tableName: string): Promise { + const sql = ` + SELECT + * + FROM + information_schema.tables + WHERE + table_name = @name` + + const params = [ + { name: 'name', value: tableName, type: mssql.NVarChar } + ] + const res = this.sqlService.query(sql, params) + // empty: res => {} + if (R.isEmpty(res)) { + return true } - this.sqlService = sqlService + return false } /** * * @returns Create a new ps report table and return the table name */ - public async createDestinationTableAndView (incomingMessage: PsReportStagingCompleteMessage): Promise { + public async createDestinationTableAndViewIfNotExists (incomingMessage: PsReportStagingCompleteMessage): Promise { let ds = moment().format('YYYY_MM_DDTHHmm') // default // Match 'ps-report-staging-2024-02-27-1510.csv' const matches = incomingMessage.filename.match(/\d\d\d\d-\d\d-\d\d-\d\d\d\d/) @@ -36,6 +57,12 @@ export class PsReportWriterService { ds = matches[0].replaceAll('-', '_') } const newTableName = `psychometricReport_${ds}` + + const tableExists = await this.tableExists(newTableName) + if (tableExists) { + throw new Error('Table already exists: ' + newTableName) + } + const sql1 = ` CREATE TABLE mtc_results.${newTableName} ( PupilId int NOT NULL CONSTRAINT [PK_${newTableName}] PRIMARY KEY, @@ -120,19 +147,19 @@ export class PsReportWriterService { PupilUPN ASC ) WITH ( PAD_INDEX = OFF,FILLFACTOR = 100,SORT_IN_TEMPDB = OFF, IGNORE_DUP_KEY = OFF, STATISTICS_NORECOMPUTE = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON ) ON [PRIMARY] ` - this.logger.verbose(`${this.logServiceName}: creating table ${newTableName}`) + this.logger.verbose(`${this.logPrefix()}: creating table ${newTableName}`) await this.sqlService.modify(fullSql, []) - this.logger.verbose(`${this.logServiceName}: Creating trigger`) + this.logger.verbose(`${this.logPrefix()}: Creating trigger`) await this.sqlService.modify(triggerSql, []) - this.logger.verbose(`${this.logServiceName}: Creating IX1`) + this.logger.verbose(`${this.logPrefix()}: Creating IX1`) await this.sqlService.modify(ix1Sql, []) - this.logger.verbose(`${this.logServiceName}: Creating IX2`) + this.logger.verbose(`${this.logPrefix()}: Creating IX2`) await this.sqlService.modify(ix2Sql, []) - this.logger.info(`${this.logServiceName}: new table ${newTableName} created`) + this.logger.info(`${this.logPrefix()}: new table ${newTableName} created`) const viewSql = ` CREATE OR ALTER VIEW [mtc_results].[vewPsychometricReport] @@ -140,7 +167,7 @@ export class PsReportWriterService { SELECT * FROM [mtc_results].[${newTableName}] ` await this.sqlService.modify(viewSql, []) - this.logger.info(`${this.logServiceName}: psychometricReport view recreated`) + this.logger.info(`${this.logPrefix()}: psychometricReport view recreated`) return newTableName } @@ -151,7 +178,7 @@ export class PsReportWriterService { public async prepareForUpload (blobFile: string): Promise { const blobService = new BlobService() const containerUrl = await blobService.getContainerUrl(containerName) - this.logger.verbose(`${this.logServiceName}: container url is ${containerUrl}`) + this.logger.verbose(`${this.logPrefix()}: container url is ${containerUrl}`) const sasToken = await blobService.getBlobReadWriteSasToken(containerName, blobFile) const sql = ` IF (SELECT COUNT(*) FROM sys.database_scoped_credentials WHERE name = 'PsReportBulkUploadCredential') = 0 @@ -209,7 +236,7 @@ export class PsReportWriterService { * error in this block means we are running in Azure. */ const result = await this.sqlService.query(sql2, []) - this.logger.verbose(`${this.logServiceName} OS is ${JSON.stringify(result)}`) + this.logger.verbose(`${this.logPrefix()}: OS is ${JSON.stringify(result)}`) let sqlConfig: mssql.config if (result[0].engineEdition === 'Enterprise' || result[0].engineEdition === 'Azure SQL Edge') { @@ -255,14 +282,15 @@ export class PsReportWriterService { ROWTERMINATOR = '\r\n') ;` await mssql.query(sql) - await this.cleanup(sFilename) + // tmp + // await this.cleanup(sFilename) } public async cleanup (filename: string): Promise { - this.logger.info(`${this.logServiceName}: cleanup() called`) + this.logger.info(`${this.logPrefix()}: cleanup() called`) // Remove CSV file const blobService = new BlobService() await blobService.deleteBlob(filename, containerName) - this.logger.info(`${this.logServiceName}: csv file ${filename} deleted.`) + this.logger.info(`${this.logPrefix()}: csv file ${filename} deleted.`) } }