From 463b8850447c9aff74d278826f046bf9bf138437 Mon Sep 17 00:00:00 2001 From: Max Wang Date: Sat, 1 Feb 2025 15:34:12 -0800 Subject: [PATCH] fix grade datapuller and add index (#769) * fix grade datapuller and add index * plurarize function --- .../backend/src/modules/catalog/controller.ts | 4 +- .../modules/grade-distribution/controller.ts | 4 +- .../src/scripts/update-grade-distributions.ts | 237 ------------------ .../datapuller/src/lib/grade-distributions.ts | 56 ++++- apps/datapuller/src/lib/terms.ts | 15 +- .../src/pullers/grade-distributions.ts | 40 +-- .../common/src/models/grade-distribution.ts | 67 ++++- 7 files changed, 141 insertions(+), 282 deletions(-) delete mode 100644 apps/backend/src/scripts/update-grade-distributions.ts diff --git a/apps/backend/src/modules/catalog/controller.ts b/apps/backend/src/modules/catalog/controller.ts index c57239639..2140fcddc 100644 --- a/apps/backend/src/modules/catalog/controller.ts +++ b/apps/backend/src/modules/catalog/controller.ts @@ -7,7 +7,7 @@ import { CourseModel, CourseType, GradeDistributionModel, - GradeDistributionType, + IGradeDistributionItem, SectionModel, SectionType, TermModel, @@ -460,7 +460,7 @@ export const getCatalog = async ( return accumulator; }, - {} as Record + {} as Record ); const entries = Object.entries(reducedGradeDistributions); diff --git a/apps/backend/src/modules/grade-distribution/controller.ts b/apps/backend/src/modules/grade-distribution/controller.ts index c58766169..ec376fdef 100644 --- a/apps/backend/src/modules/grade-distribution/controller.ts +++ b/apps/backend/src/modules/grade-distribution/controller.ts @@ -1,6 +1,6 @@ import { GradeDistributionModel, - GradeDistributionType, + IGradeDistributionItem, SectionModel, TermModel, } from "@repo/common"; @@ -33,7 +33,7 @@ interface Grade { count: number; } -export const getDistribution = (distributions: GradeDistributionType[]) => { +export const getDistribution = (distributions: IGradeDistributionItem[]) => { const distribution = distributions.reduce( ( acc, diff --git a/apps/backend/src/scripts/update-grade-distributions.ts b/apps/backend/src/scripts/update-grade-distributions.ts deleted file mode 100644 index 5a4ad94ab..000000000 --- a/apps/backend/src/scripts/update-grade-distributions.ts +++ /dev/null @@ -1,237 +0,0 @@ -import { - AthenaClient, - GetQueryExecutionCommand, - StartQueryExecutionCommand, -} from "@aws-sdk/client-athena"; -import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3"; -import Papa from "papaparse"; - -import { GradeDistributionModel, GradeDistributionType } from "@repo/common"; - -import mongooseLoader from "../bootstrap/loaders/mongoose"; - -interface RawDistribution { - course_id: string; - course_offer_nbr: string; - semester_year_term_cd: string; - class_number: string; - subject_cd: string; - course_number: string; - session_code: string; - class_section_cd: string; - grade_count: string; - distinct_grades: string; - grade_count_a_plus: string; - grade_count_a: string; - grade_count_a_minus: string; - grade_count_b_plus: string; - grade_count_b: string; - grade_count_b_minus: string; - grade_count_c_plus: string; - grade_count_c: string; - grade_count_c_minus: string; - grade_count_d_plus: string; - grade_count_d: string; - grade_count_d_minus: string; - grade_count_f: string; - grade_count_p: string; - grade_count_np: string; - grade_count_s: string; - grade_count_u: string; - grade_count_cr: string; - grade_count_nc: string; - grade_count_hh: string; - grade_count_h: string; - grade_count_pc: string; -} - -const formatDistribution = (distribution: RawDistribution) => { - return { - subject: distribution.subject_cd, - courseNumber: distribution.course_number, - courseOfferingNumber: parseInt(distribution.course_offer_nbr), - termId: distribution.semester_year_term_cd, - session: distribution.session_code, - classNumber: distribution.class_number, - sectionNumber: distribution.class_section_cd, - count: parseInt(distribution.grade_count), - distinct: parseInt(distribution.distinct_grades), - countAPlus: parseInt(distribution.grade_count_a_plus), - countA: parseInt(distribution.grade_count_a), - countAMinus: parseInt(distribution.grade_count_a_minus), - countBPlus: parseInt(distribution.grade_count_b_plus), - countB: parseInt(distribution.grade_count_b), - countBMinus: parseInt(distribution.grade_count_b_minus), - countCPlus: parseInt(distribution.grade_count_c_plus), - countC: parseInt(distribution.grade_count_c), - countCMinus: parseInt(distribution.grade_count_c_minus), - countDPlus: parseInt(distribution.grade_count_d_plus), - countD: parseInt(distribution.grade_count_d), - countDMinus: parseInt(distribution.grade_count_d_minus), - countF: parseInt(distribution.grade_count_f), - countP: parseInt(distribution.grade_count_p), - countNP: parseInt(distribution.grade_count_np), - countS: parseInt(distribution.grade_count_s), - countU: parseInt(distribution.grade_count_u), - countCR: parseInt(distribution.grade_count_cr), - countNC: parseInt(distribution.grade_count_nc), - countHH: parseInt(distribution.grade_count_hh), - countH: parseInt(distribution.grade_count_h), - countPC: parseInt(distribution.grade_count_pc), - }; -}; - -class StudentEnrollment { - private database = process.env.AWS_DATABASE; - private s3Output = process.env.AWS_S3_OUTPUT; - private regionName = process.env.AWS_REGION_NAME; - // private filename = process.env.AWS_FILENAME; - private workGroup = process.env.AWS_WORKGROUP; - - private query: string; - private athenaClient: AthenaClient; - private s3Client: S3Client; - - constructor(query: string) { - this.query = query; - this.athenaClient = new AthenaClient({ region: this.regionName }); - this.s3Client = new S3Client({ region: this.regionName }); - } - - // Load the config details and start query execution - async loadConf(query: string) { - try { - const command = new StartQueryExecutionCommand({ - QueryString: query, - QueryExecutionContext: { - Database: this.database, - }, - ResultConfiguration: { - OutputLocation: this.s3Output, - }, - WorkGroup: this.workGroup, - }); - - const response = await this.athenaClient.send(command); - - console.log("Execution ID: " + response.QueryExecutionId); - - return response.QueryExecutionId; - } catch (error) { - console.error(error); - - return; - } - } - - // Run the Athena query and wait for the result - async runQuery() { - const queries = [this.query]; - - for (const query of queries) { - const queryExecutionId = await this.loadConf(query); - if (!queryExecutionId) return; - - let queryStatus: string | undefined = undefined; - - try { - // Check the status of the query execution - while ( - queryStatus === "QUEUED" || - queryStatus === "RUNNING" || - !queryStatus - ) { - const command = new GetQueryExecutionCommand({ - QueryExecutionId: queryExecutionId, - }); - - const execution = await this.athenaClient.send(command); - - queryStatus = execution.QueryExecution?.Status?.State; - console.log(queryStatus); - - if (queryStatus === "FAILED" || queryStatus === "CANCELLED") { - throw new Error( - `Athena query "${this.query}" failed or was cancelled: ${execution.QueryExecution?.Status?.StateChangeReason}` - ); - } - - // Sleep for 10 seconds - await new Promise((resolve) => setTimeout(resolve, 10000)); - } - - console.log(`Query "${this.query}" finished.`); - - // Download the result from S3 - await this.downloadDataFile(queryExecutionId); - } catch (err) { - console.error(err); - } - } - } - - // Download the result file from S3 - async downloadDataFile(queryId: string) { - try { - const parsedS3Output = new URL(this.s3Output as string); - - const bucket = parsedS3Output.host || ""; - - const path = parsedS3Output.pathname - ? parsedS3Output.pathname.replace(/^\//, "") - : ""; - - const objectKey = `${path}${queryId}.csv`; - - const command = new GetObjectCommand({ - Bucket: bucket, - Key: objectKey, - }); - - const response = await this.s3Client.send(command); - - const body = await response.Body?.transformToString(); - - if (!body) { - throw new Error("No data received from S3"); - } - - const parsedData = Papa.parse(body, { - header: true, - skipEmptyLines: true, - }); - - const formattedData = parsedData.data.map( - (row) => formatDistribution(row) as GradeDistributionType - ); - - const bulkOperations = formattedData.map((data) => ({ - updateOne: { - filter: { classNumber: data.classNumber }, - update: { $set: data }, - upsert: true, - }, - })); - - await GradeDistributionModel.bulkWrite(bulkOperations); - - console.log( - `Inserted ${formattedData.length} records into GradeDistribution` - ); - } catch (err) { - console.error(err); - } - } -} - -const main = async () => { - await mongooseLoader(); - - const enrollment = new StudentEnrollment( - `SELECT course_id, course_offer_nbr, semester_year_term_cd, class_number, subject_cd, course_number, session_code, class_section_cd, grade_count, distinct_grades, grade_count_a_plus, grade_count_a, grade_count_a_minus, grade_count_b_plus, grade_count_b, grade_count_b_minus, grade_count_c_plus, grade_count_c, grade_count_c_minus, grade_count_d_plus, grade_count_d, grade_count_d_minus, grade_count_f, grade_count_p, grade_count_np, grade_count_s, grade_count_u, grade_count_cr, grade_count_nc, grade_count_hh, grade_count_h, grade_count_pc FROM "lf_cs_curated"."student_grade_distribution_data" WHERE semester_year_term_cd = '2242'` - ); - - await enrollment.runQuery(); -}; - -main().catch(console.error); diff --git a/apps/datapuller/src/lib/grade-distributions.ts b/apps/datapuller/src/lib/grade-distributions.ts index 0722894ab..54d032360 100644 --- a/apps/datapuller/src/lib/grade-distributions.ts +++ b/apps/datapuller/src/lib/grade-distributions.ts @@ -1,3 +1,5 @@ +import { IGradeDistributionItem } from "@repo/common"; + import { QueryExecutor } from "./api/aws-athena"; export interface GradeDistributionRow { @@ -37,16 +39,20 @@ export interface GradeDistributionRow { export const formatDistribution = (distribution: GradeDistributionRow) => { // TODO: Pivot the data - return { + const output: IGradeDistributionItem = { + courseId: distribution.course_id, subject: distribution.subject_cd, courseNumber: distribution.course_number, courseOfferingNumber: parseInt(distribution.course_offer_nbr), termId: distribution.semester_year_term_cd, - session: distribution.session_code, + sessionId: distribution.session_code, + classNumber: distribution.class_number, sectionNumber: distribution.class_section_cd, + count: parseInt(distribution.grade_count), distinct: parseInt(distribution.distinct_grades), + countAPlus: parseInt(distribution.grade_count_a_plus), countA: parseInt(distribution.grade_count_a), countAMinus: parseInt(distribution.grade_count_a_minus), @@ -70,19 +76,54 @@ export const formatDistribution = (distribution: GradeDistributionRow) => { countH: parseInt(distribution.grade_count_h), countPC: parseInt(distribution.grade_count_pc), }; + + return output; }; /** * Get grade distribution rows for a specific term */ -export const getGradeDistributionDataByTerm = async ( +export const getGradeDistributionDataByTerms = async ( database: string, s3Output: string, regionName: string, workGroup: string, - termId: string + termIds: string[] ) => { - const query = `SELECT course_id, course_offer_nbr, semester_year_term_cd, class_number, subject_cd, course_number, session_code, class_section_cd, grade_count, distinct_grades, grade_count_a_plus, grade_count_a, grade_count_a_minus, grade_count_b_plus, grade_count_b, grade_count_b_minus, grade_count_c_plus, grade_count_c, grade_count_c_minus, grade_count_d_plus, grade_count_d, grade_count_d_minus, grade_count_f, grade_count_p, grade_count_np, grade_count_s, grade_count_u, grade_count_cr, grade_count_nc, grade_count_hh, grade_count_h, grade_count_pc FROM "lf_cs_curated"."student_grade_distribution_data" WHERE semester_year_term_cd = '${termId}'`; + const query = `SELECT + course_id, + course_offer_nbr, + semester_year_term_cd, + class_number, + subject_cd, + course_number, + session_code, + class_section_cd, + grade_count, + distinct_grades, + grade_count_a_plus, + grade_count_a, + grade_count_a_minus, + grade_count_b_plus, + grade_count_b, + grade_count_b_minus, + grade_count_c_plus, + grade_count_c, + grade_count_c_minus, + grade_count_d_plus, + grade_count_d, + grade_count_d_minus, + grade_count_f, + grade_count_p, + grade_count_np, + grade_count_s, + grade_count_u, + grade_count_cr, + grade_count_nc, + grade_count_hh, + grade_count_h, + grade_count_pc + FROM "lf_cs_curated"."student_grade_distribution_data" WHERE semester_year_term_cd IN ('${termIds.join("', '")}')`; const enrollment = new QueryExecutor( database, @@ -94,5 +135,8 @@ export const getGradeDistributionDataByTerm = async ( const data = await enrollment.execute(); - return data; + if (!data) { + return []; + } + return data.map(formatDistribution); }; diff --git a/apps/datapuller/src/lib/terms.ts b/apps/datapuller/src/lib/terms.ts index 7b46d3bb1..554f8b296 100644 --- a/apps/datapuller/src/lib/terms.ts +++ b/apps/datapuller/src/lib/terms.ts @@ -143,21 +143,22 @@ export const getTerms = async ( }; /** - * Fetch the current term denoted by the "Current" temporal position. + * Fetch all previous terms denoted by the "Previous" temporal position. */ -export const getCurrentTerm = async ( +export const getPreviousTerms = async ( logger: Logger, id: string, key: string ) => { const termsAPI = new TermsAPI(); - logger.info(`Fetching current term...`); + logger.info(`Fetching previous terms...`); try { const response = await termsAPI.v2.getByTermsUsingGet( { - "temporal-position": "Current", + "temporal-position": "Previous", + "as-of-date": new Date().toISOString().split("T")[0], // format as yyyy-mm-dd }, { headers: { @@ -167,12 +168,12 @@ export const getCurrentTerm = async ( } ); - const currentTerm = response.data.response.terms; - if (!currentTerm) throw new Error("No current term found"); + const previousTerms = response.data.response.terms; + if (!previousTerms) throw new Error("No previous terms found"); logger.info(`Fetched current term...`); - return currentTerm[0]; + return previousTerms; } catch (error: unknown) { const parsedError = error as Error; diff --git a/apps/datapuller/src/pullers/grade-distributions.ts b/apps/datapuller/src/pullers/grade-distributions.ts index a371bea62..14232e3c4 100644 --- a/apps/datapuller/src/pullers/grade-distributions.ts +++ b/apps/datapuller/src/pullers/grade-distributions.ts @@ -1,40 +1,50 @@ import { GradeDistributionModel } from "@repo/common"; -import { getGradeDistributionDataByTerm } from "../lib/grade-distributions"; -import { getCurrentTerm } from "../lib/terms"; +import { getGradeDistributionDataByTerms } from "../lib/grade-distributions"; +import { getPreviousTerms } from "../lib/terms"; import { Config } from "../shared/config"; -// TODO: Transaction const updateGradeDistributions = async ({ aws: { DATABASE, S3_OUTPUT, REGION_NAME, WORKGROUP }, sis: { TERM_APP_ID, TERM_APP_KEY }, log, }: Config) => { - log.info("Fetching current term"); + log.info("Fetching previous term"); - // Get current term - const currentTerm = await getCurrentTerm(log, TERM_APP_ID, TERM_APP_KEY); + // Get previous term + const previousTerms = await getPreviousTerms(log, TERM_APP_ID, TERM_APP_KEY); - // TODO: Error for no current term - if (!currentTerm) return; + if (!previousTerms) { + log.error("No previous term found, skipping update"); + return; + } - log.info("Querying grade distributions for current term"); + log.info( + `Querying grade distributions for previous terms: ${previousTerms.map((term) => term.name + " " + term.academicCareer?.description).join(", ")}` + ); + const previousTermIds = previousTerms.map((term) => term.id!); - // Query grade distributions for current term - const gradeDistributions = await getGradeDistributionDataByTerm( + const gradeDistributions = await getGradeDistributionDataByTerms( DATABASE, S3_OUTPUT, REGION_NAME, WORKGROUP, - currentTerm.id as string + previousTermIds ); // TODO: Error for no grade distributions - if (!gradeDistributions) return; + if (!gradeDistributions) { + log.error("No grade distributions found, skipping update"); + return; + } + + log.info( + `Fetched ${gradeDistributions.length.toLocaleString()} grade distributions.` + ); - // Delete existing grade distributions for current term + // Delete existing grade distributions for previous terms await GradeDistributionModel.deleteMany({ - termId: currentTerm.id, + termId: { $in: previousTermIds }, }); // Insert grade distributions in batches of 5000 diff --git a/packages/common/src/models/grade-distribution.ts b/packages/common/src/models/grade-distribution.ts index 3f37abf63..c05f2f7db 100644 --- a/packages/common/src/models/grade-distribution.ts +++ b/packages/common/src/models/grade-distribution.ts @@ -1,21 +1,63 @@ -import mongoose, { InferSchemaType, Schema } from "mongoose"; +import mongoose, { Document, Schema } from "mongoose"; -const gradeDistributionSchema = new Schema( +export interface IGradeDistributionItem { + // Identifiers + courseId: string; + subject: string; + courseNumber: string; + courseOfferingNumber: number; + termId: string; + sessionId: string; + + // TODO: CCN? + classNumber: string; + sectionNumber: string; + + // Grade distribution + count: number; + distinct: number; + + countAPlus: number; + countA: number; + countAMinus: number; + countBPlus: number; + countB: number; + countBMinus: number; + countCPlus: number; + countC: number; + countCMinus: number; + countDPlus: number; + countD: number; + countDMinus: number; + countF: number; + countP: number; + countNP: number; + countS: number; + countU: number; + countCR: number; + countNC: number; + countHH: number; + countH: number; + countPC: number; +} + +export interface IGradeDistributionItemDocument + extends IGradeDistributionItem, + Document {} + +const gradeDistributionSchema = new Schema( { - // Identifiers courseId: { type: String, required: true }, subject: { type: String, required: true }, courseNumber: { type: String, required: true }, courseOfferingNumber: { type: Number, required: true }, termId: { type: String, required: true }, - session: { type: String, required: true }, + sessionId: { type: String, required: true }, - // TODO: CCN? classNumber: { type: String, required: true }, sectionNumber: { type: String, required: true }, - // Grade distribution count: { type: Number, required: true }, distinct: { type: Number, required: true }, @@ -46,13 +88,12 @@ const gradeDistributionSchema = new Schema( timestamps: true, } ); +gradeDistributionSchema.index( + { termId: 1, classNumber: 1, sectionNumber: 1 }, + { unique: true } +); export const GradeDistributionModel = mongoose.model( - "gradeDistribution", - gradeDistributionSchema, - "grade-distributions" + "GradeDistributions", + gradeDistributionSchema ); - -export type GradeDistributionType = InferSchemaType< - typeof gradeDistributionSchema ->;