Skip to content

Commit

Permalink
Merge pull request #89 from MinaFoundation/feature/ocv-vote-async-worker
Browse files Browse the repository at this point in the history
Feature/ocv vote async worker
  • Loading branch information
iluxonchik authored Dec 19, 2024
2 parents e72b1ff + 04ecf9e commit 6ea1e0d
Show file tree
Hide file tree
Showing 12 changed files with 1,578 additions and 836 deletions.
1,767 changes: 968 additions & 799 deletions package-lock.json

Large diffs are not rendered by default.

14 changes: 9 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
{
"name": "pgt-web-app",
"version": "0.1.19",
"version": "0.1.20",
"private": true,
"type": "module",
"scripts": {
"dev": "npm run build:workers && next dev",
"dev": "npm run build:workers && (next dev & npm run bree:cron:dev & wait)",
"dev:https": "NODE_TLS_REJECT_UNAUTHORIZED=0 next dev --experimental-https --experimental-https-key ./localhost-key.pem --experimental-https-cert ./localhost.pem",
"build": "next build && npm run internal:copy-public && npm run build:workers",
"internal:copy-public": "cp -r public .next/standalone/",
"build": "next build && npm run internal:copy-public && npm run build:workers && npm run internal:copy-dist",
"build:workers": "npx tsx src/scripts/build-workers.ts",
"start": "npx prisma migrate deploy && next start",
"start:prod": "prisma migrate deploy && node server.js",
"start:prod": "prisma migrate deploy && (node server.js & npm run bree:cron:prod & wait)",
"bree:cron:dev": "npx tsx src/scripts/bree-runner.ts",
"bree:cron:prod": "node dist/scripts/bree-runner.js",
"internal:copy-public": "cp -r public .next/standalone/",
"internal:copy-dist": "cp -r dist .next/standalone/",
"lint": "next lint",
"lint:full": "next lint --max-warnings=0",
"lint:types": "tsc --noEmit",
"lint:all": "npm run lint:full && npm run lint:types",
"admin:status": "npx tsx src/scripts/toggle-admin-status.ts"
},
"dependencies": {
"@ladjs/graceful": "^4.1.0",
"@prisma/client": "^5.22.0",
"@radix-ui/react-accordion": "^1.2.1",
"@radix-ui/react-alert-dialog": "^1.1.2",
Expand Down
38 changes: 38 additions & 0 deletions prisma/migrations/20241213201554_add_ocv_votes/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
-- CreateEnum
CREATE TYPE "WorkerStatus" AS ENUM ('RUNNING', 'COMPLETED', 'FAILED', 'NOT_STARTED');

-- AlterTable
ALTER TABLE "User" ADD COLUMN "oCVConsiderationVoteId" INTEGER;

-- CreateTable
CREATE TABLE "OCVConsiderationVote" (
"id" SERIAL NOT NULL,
"proposalId" INTEGER NOT NULL,
"voteData" JSONB NOT NULL,
"updatedAt" TIMESTAMP(3) NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,

CONSTRAINT "OCVConsiderationVote_pkey" PRIMARY KEY ("id")
);

-- CreateTable
CREATE TABLE "WorkerHeartbeat" (
"jobId" TEXT NOT NULL,
"lastHeartbeat" TIMESTAMP(3) NOT NULL,
"status" "WorkerStatus" NOT NULL DEFAULT 'RUNNING',
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,

CONSTRAINT "WorkerHeartbeat_pkey" PRIMARY KEY ("jobId")
);

-- CreateIndex
CREATE UNIQUE INDEX "OCVConsiderationVote_proposalId_key" ON "OCVConsiderationVote"("proposalId");

-- CreateIndex
CREATE INDEX "OCVConsiderationVote_proposalId_idx" ON "OCVConsiderationVote"("proposalId");

-- AddForeignKey
ALTER TABLE "User" ADD CONSTRAINT "User_oCVConsiderationVoteId_fkey" FOREIGN KEY ("oCVConsiderationVoteId") REFERENCES "OCVConsiderationVote"("id") ON DELETE SET NULL ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "OCVConsiderationVote" ADD CONSTRAINT "OCVConsiderationVote_proposalId_fkey" FOREIGN KEY ("proposalId") REFERENCES "Proposal"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
Warnings:
- The primary key for the `WorkerHeartbeat` table will be changed. If it partially fails, the table could be left without primary key constraint.
- You are about to drop the column `jobId` on the `WorkerHeartbeat` table. All the data in the column will be lost.
- The required column `id` was added to the `WorkerHeartbeat` table with a prisma-level default value. This is not possible if the table is not empty. Please add this column as optional, then populate it before making it required.
- Added the required column `name` to the `WorkerHeartbeat` table without a default value. This is not possible if the table is not empty.
*/
-- AlterTable
ALTER TABLE "WorkerHeartbeat" DROP CONSTRAINT "WorkerHeartbeat_pkey",
DROP COLUMN "jobId",
ADD COLUMN "id" UUID NOT NULL,
ADD COLUMN "name" VARCHAR(100) NOT NULL,
ADD CONSTRAINT "WorkerHeartbeat_pkey" PRIMARY KEY ("id");

-- CreateIndex
CREATE INDEX "WorkerHeartbeat_name_status_idx" ON "WorkerHeartbeat"("name", "status");
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "WorkerHeartbeat" ADD COLUMN "metadata" JSONB;
33 changes: 33 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ enum FundingRoundStatus {
CANCELLED
}

enum WorkerStatus {
RUNNING
COMPLETED
FAILED
NOT_STARTED
}

model User {
id String @id @db.Uuid // UUID v5 derived from authSource
linkId String @db.Uuid // UUID v4 for account linking
Expand All @@ -35,6 +42,8 @@ model User {
considerationVotes ConsiderationVote[]
deliberationCommunityVotes CommunityDeliberationVote[]
deliberationReviewerVotes ReviewerDeliberationVote[]
ocvConsiderationVote OCVConsiderationVote? @relation(fields: [oCVConsiderationVoteId], references: [id])
oCVConsiderationVoteId Int?
@@index([id])
@@index([linkId])
Expand Down Expand Up @@ -64,6 +73,7 @@ model Proposal {
considerationVotes ConsiderationVote[]
deliberationCommunityVotes CommunityDeliberationVote[]
deliberationReviewerVotes ReviewerDeliberationVote[]
OCVConsiderationVote OCVConsiderationVote?
// Indexes
@@index([userId])
Expand Down Expand Up @@ -277,3 +287,26 @@ model ReviewerDeliberationVote {
@@unique([proposalId, userId])
@@index([userId])
}

model OCVConsiderationVote {
id Int @id @default(autoincrement())
proposalId Int @unique
proposal Proposal @relation(fields: [proposalId], references: [id])
voteData Json
updatedAt DateTime @updatedAt
createdAt DateTime @default(now())
User User[]
@@index([proposalId])
}

model WorkerHeartbeat {
id String @id @default(uuid()) @db.Uuid
name String @db.VarChar(100) // For identifying job type
lastHeartbeat DateTime @updatedAt
status WorkerStatus @default(RUNNING)
metadata Json? // Optional JSON column for storing job-specific data
createdAt DateTime @default(now())
@@index([name, status])
}
31 changes: 31 additions & 0 deletions src/scripts/bree-runner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import Bree from 'bree';
import Graceful from '@ladjs/graceful';
import logger from '@/logging';
import path from 'path';

const bree = new Bree({
root: path.join(process.cwd(), 'dist', 'tasks'),
jobs: [
{
name: 'ocv-vote-counting',
path: path.join(process.cwd(), 'dist', 'tasks', 'ocv-vote-counting.js'),
interval: '10m', // run every 10 minutes
timeout: 0, // start immediatly when this script is run
closeWorkerAfterMs: 9 * 60 * 1000 // Kill after 9 minutes if stuck
}
],
errorHandler: (error, workerMetadata) => {
logger.error(`[Bree Runner] Worker ${workerMetadata.name} encountered an error:`, error);
},
workerMessageHandler: (name, message) => {
logger.error(`[Bree Runner] Message from worker ${name}`);
}
});

const graceful = new Graceful({ brees: [bree] });
graceful.listen();

(async () => {
await bree.start();
logger.info('Bree started successfully');
})();
6 changes: 3 additions & 3 deletions src/scripts/build-workers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ import * as esbuild from 'esbuild'
async function buildWorkers() {
try {
await esbuild.build({
entryPoints: ['src/tasks/*.ts'],
entryPoints: ['src/tasks/*.ts', 'src/scripts/bree-runner.ts'],
bundle: true,
platform: 'node',
target: 'node20',
outdir: 'dist/tasks',
outdir: 'dist/',
format: 'esm',
banner: {
js: `
Expand All @@ -57,4 +57,4 @@ async function buildWorkers() {
}
}

buildWorkers()
buildWorkers()
111 changes: 88 additions & 23 deletions src/services/ConsiderationVotingService.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// src/services/ConsiderationVotingService.ts

import { PrismaClient, ConsiderationDecision, ProposalStatus, ConsiderationVote } from "@prisma/client";
import { PrismaClient, ConsiderationDecision, ProposalStatus, ConsiderationVote, Prisma } from "@prisma/client";
import { ProposalStatusMoveService } from "./ProposalStatusMoveService";
import { FundingRoundService } from "./FundingRoundService";
import logger from "@/logging";
Expand Down Expand Up @@ -56,7 +56,7 @@ const voteIncludeQuery = {
metadata: true
}
}
} as const;
} satisfies Prisma.ConsiderationVoteInclude;

export class ConsiderationVotingService {
private statusMoveService: ProposalStatusMoveService;
Expand Down Expand Up @@ -86,24 +86,48 @@ export class ConsiderationVotingService {
): Promise<VoteEligibility> {
const fundingRound = await this.fundingRoundService.getFundingRoundById(fundingRoundId);

if (!fundingRound || fundingRound === null) {
if (!fundingRound) {
return { eligible: false, message: "Funding round not found" };
}

// Ensure all required phases exist
if (!fundingRound.submissionPhase ||
!fundingRound.considerationPhase ||
!fundingRound.deliberationPhase ||
!fundingRound.votingPhase) {
// Ensure all required phases exist and have proper dates
if (!fundingRound.submissionPhase?.startDate ||
!fundingRound.submissionPhase?.endDate ||
!fundingRound.considerationPhase?.startDate ||
!fundingRound.considerationPhase?.endDate ||
!fundingRound.deliberationPhase?.startDate ||
!fundingRound.deliberationPhase?.endDate ||
!fundingRound.votingPhase?.startDate ||
!fundingRound.votingPhase?.endDate) {
return {
eligible: false,
message: "Funding round is not properly configured"
};
}

// Now we can safely pass the funding round with its phases
const currentPhase = this.fundingRoundService.getCurrentPhase({
...fundingRound,
submissionPhase: {
startDate: fundingRound.submissionPhase.startDate,
endDate: fundingRound.submissionPhase.endDate
},
considerationPhase: {
startDate: fundingRound.considerationPhase.startDate,
endDate: fundingRound.considerationPhase.endDate
},
deliberationPhase: {
startDate: fundingRound.deliberationPhase.startDate,
endDate: fundingRound.deliberationPhase.endDate
},
votingPhase: {
startDate: fundingRound.votingPhase.startDate,
endDate: fundingRound.votingPhase.endDate
}
});

// Check current phase
const currentPhase = this.fundingRoundService.getCurrentPhase(fundingRound);
if (currentPhase !== 'consideration') {
if (currentPhase.toUpperCase() !== ProposalStatus.CONSIDERATION) {
return {
eligible: false,
message: "Voting is only allowed during the consideration phase"
Expand All @@ -120,8 +144,8 @@ export class ConsiderationVotingService {
}

// Allow voting if proposal is in CONSIDERATION or DELIBERATION status
if (proposal.status !== ProposalStatus.CONSIDERATION &&
proposal.status !== ProposalStatus.DELIBERATION) {
if (proposal.status.toUpperCase() !== ProposalStatus.CONSIDERATION &&
proposal.status.toUpperCase() !== ProposalStatus.DELIBERATION) {
return {
eligible: false,
message: "Proposal is not eligible for consideration votes"
Expand Down Expand Up @@ -170,20 +194,37 @@ export class ConsiderationVotingService {
proposalId: number,
voterId: string
): Promise<VoteQueryResult | null> {
return this.prisma.considerationVote.findUnique({
const vote = await this.prisma.considerationVote.findUnique({
where: {
proposalId_voterId: { proposalId, voterId }
},
include: voteIncludeQuery
});

if (!vote) return null;

return {
...vote,
proposal: {
...vote.proposal,
user: {
metadata: vote.proposal.user.metadata as Prisma.JsonValue as UserMetadata
}
},
voter: {
metadata: vote.voter.metadata as Prisma.JsonValue as UserMetadata
}
};
}

private async createOrUpdateVote(
input: VoteInput,
existingVote: VoteQueryResult | null
): Promise<VoteQueryResult> {
let returnValue;

if (existingVote) {
return this.prisma.considerationVote.update({
returnValue = await this.prisma.considerationVote.update({
where: {
proposalId_voterId: {
proposalId: input.proposalId,
Expand All @@ -196,17 +237,30 @@ export class ConsiderationVotingService {
},
include: voteIncludeQuery
});
} else {
returnValue = await this.prisma.considerationVote.create({
data: {
proposalId: input.proposalId,
voterId: input.voterId,
decision: input.decision,
feedback: input.feedback,
},
include: voteIncludeQuery
});
}

return this.prisma.considerationVote.create({
data: {
proposalId: input.proposalId,
voterId: input.voterId,
decision: input.decision,
feedback: input.feedback,
return {
...returnValue,
proposal: {
...returnValue.proposal,
user: {
metadata: returnValue.proposal.user.metadata as Prisma.JsonValue as UserMetadata
}
},
include: voteIncludeQuery
});
voter: {
metadata: returnValue.voter.metadata as Prisma.JsonValue as UserMetadata
}
};
}

private async refreshVoteData(
Expand All @@ -224,6 +278,17 @@ export class ConsiderationVotingService {
`Vote refreshed for proposal ${proposalId}. Current status: ${vote.proposal.status}`
);

return vote;
return {
...vote,
proposal: {
...vote.proposal,
user: {
metadata: vote.proposal.user.metadata as UserMetadata
}
},
voter: {
metadata: vote.voter.metadata as UserMetadata
}
};
}
}
Loading

0 comments on commit 6ea1e0d

Please sign in to comment.