Skip to content

Commit

Permalink
Background workers and BullMQ (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
genzyy committed Nov 12, 2023
1 parent e7d4f40 commit 066b01d
Show file tree
Hide file tree
Showing 22 changed files with 731 additions and 36 deletions.
25 changes: 25 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Port number
PORT=3000

# Environment
ENVIRONMENT=development

# Postgres URL
DATABASE_URL=postgresql://user:[email protected]:5432/my-db

# Redis URL (for caching)
REDIS_URL=redis://user:[email protected]:6379

# Redis URL (for queue)
QUEUE_REDIS_HOST=127.0.0.1
QUEUE_REDIS_PORT=6380

# JWT
JWT_SECRET=<secret>
JWT_ACCESS_EXPIRATION_MINUTES=30
JWT_REFRESH_EXPIRATION_DAYS=6
JWT_RESET_PASSWORD_EXPIRATION_MINUTES=30
JWT_VERIFY_EMAIL_EXPIRATION_MINUTES=30

# Logging
SHOW_SQL_QUERIES=false
20 changes: 20 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,23 @@ services:
- '5432:5432'
volumes:
- ./pgdata:/var/lib/postgresql/data

redis:
image: redis:7.2.3
ports:
- '6379:6379'
volumes:
- redis:/data

redis-queue:
image: redis:7.2.3
ports:
- '6380:6379'
expose:
- '6380'
volumes:
- redis:/data

volumes:
redis:
redis-queue:
9 changes: 7 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"name": "express-prisma-boilerplate",
"version": "1.0.0",
"main": "index.ts",
"type": "module",
"license": "MIT",
"author": "Rishit",
"scripts": {
Expand All @@ -20,23 +19,28 @@
"build": "rimraf build && tsc -p tsconfig.json"
},
"dependencies": {
"@aws-sdk/types": "^3.449.0",
"@prisma/client": "^5.3.1",
"@sentry/integrations": "^7.79.0",
"@sentry/node": "^7.79.0",
"@types/bcryptjs": "^2.4.4",
"@types/nodemailer": "^6.4.11",
"aws-sdk": "^2.1494.0",
"bcryptjs": "^2.4.3",
"bullmq": "^4.13.2",
"compression": "^1.7.4",
"cors": "^2.8.5",
"dotenv": "^16.3.1",
"express": "^4.18.2",
"express-rate-limit": "^7.0.2",
"helmet": "^7.0.0",
"http-status": "^1.7.0",
"ioredis": "^5.3.2",
"joi": "^17.10.2",
"moment": "^2.29.4",
"morgan": "^1.10.0",
"nodemailer": "^6.9.5",
"node-cron": "^3.0.3",
"nodemailer": "^6.9.7",
"passport": "^0.6.0",
"passport-jwt": "^4.0.1",
"pm2": "^5.3.0",
Expand All @@ -53,6 +57,7 @@
"@types/jest": "^29.5.5",
"@types/morgan": "^1.9.6",
"@types/node": "^20.7.1",
"@types/node-cron": "^3.0.11",
"@types/passport": "^1.0.13",
"@types/passport-jwt": "^3.0.10",
"@types/supertest": "^2.0.13",
Expand Down
8 changes: 8 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ datasource db {
url = env("DATABASE_URL")
}

model ApiMetadata {
id Int @id @default(autoincrement())
key String @unique
value String
created DateTime @default(now())
updated DateTime @updatedAt
}

model User {
id Int @id @default(autoincrement())
uuid String @default(uuid()) @db.Uuid
Expand Down
9 changes: 7 additions & 2 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import morgan from './core/morgan';
import routes from './routes/v1';
import JwtStrategy from './core/passport';
import { NotFound } from './utils/ApiError';
import { StartAllJobs } from './jobs';
import { errorConverter, errorHandler } from './core/middlewares/error';
import { ValidateApiMetadata } from './core/middlewares/apiStatus';

const app = express();

Expand All @@ -28,13 +30,16 @@ app.use(compression());
app.use(passport.initialize());
passport.use('jwt', JwtStrategy);

if (config.environment != 'development') StartAllJobs();

app.use('/v1', routes);

app.use((req, res, next) => {
next(new NotFound());
});

app.use(errorConverter);
app.use(errorHandler);

app.use('/v1', routes);
app.use(ValidateApiMetadata);

export default app;
13 changes: 8 additions & 5 deletions src/controllers/auth.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Request, Response } from 'express';
import userRepository from '../repositories/user';
import catchAsync from '../utils/catchAsync';
import JwtService from '../services/jwt.service';
import EmailService from '../services/email.service';
import { NotFound, Unauthorized } from '../utils/ApiError';
import exclude from '../utils/exclude';
import { UserReturn } from '../types/user';
Expand All @@ -12,12 +13,14 @@ const register = catchAsync(async (req: Request, res: Response) => {
const user = await userRepository.createUser(username, email, password);
const tokens = JwtService.generateAuthTokenForUser(user.id);

await EmailService.sendOnboardingEmail(user.email, user.name ?? '');

return res.status(CREATED).send({ user: exclude(user, ['id', 'password', 'signedOut']), tokens });
});

const login = catchAsync(async (req: Request, res: Response) => {
const { email, password } = req.body;
const accessToken = getUserAccessToken(req);
const accessToken = await getUserAccessToken(req);
const userId = JwtService.verifyToken(accessToken);
const user = await userRepository.getUserByEmailAndId(email, userId);

Expand All @@ -30,14 +33,14 @@ const login = catchAsync(async (req: Request, res: Response) => {
return res.send(OK).send({ user: exclude(user, ['password', 'signedOut']) });
});

const getMe = async (req: Request, res: Response) => {
const accessToken = getUserAccessToken(req);
const getMe = catchAsync(async (req: Request, res: Response) => {
const accessToken = await getUserAccessToken(req);
const userId = JwtService.verifyToken(accessToken);
const dbUser = await userRepository.getUser(userId, UserReturn);
return res.status(OK).send({ ...dbUser });
};
});

const getUserAccessToken = (req: Request): string => {
const getUserAccessToken = async (req: Request): Promise<string> => {
const { authorization } = req.headers;
const values = authorization?.split(' ');
if (values && values.length >= 2) return values[1];
Expand Down
22 changes: 22 additions & 0 deletions src/core/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ const envVarsSchema = Joi.object()
SHOW_SQL_QUERIES: Joi.bool().default(false),
SENTRY_DSN: Joi.string().default(''),
SENTRY_ENVIRONMENT: Joi.string().default('development'),
COMPANY_SENDER_EMAIL: Joi.string().email().default('[email protected]'),
QUEUE_REDIS_HOST: Joi.string().default(''),
QUEUE_REDIS_PORT: Joi.number().default(6379),
QUEUE_CONCURRENCY: Joi.number().default(1),
AWS_REGION: Joi.string().default(''),
AWS_SES_API_VERSION: Joi.string().default(''),
API_STATUS: Joi.string().default(''),
})
.unknown();

Expand All @@ -32,6 +39,9 @@ export default {
environment: envVars.ENVIRONMENT,
port: envVars.PORT,
redisUrl: envVars.REDIS_URL,
api: {
status: envVars.API_STATUS,
},
logging: {
showSqlQueries: envVars.SHOW_SQL_QUERIES,
},
Expand All @@ -46,4 +56,16 @@ export default {
dsn: envVars.SENTRY_DSN,
environment: envVars.SENTRY_ENVIRONMENT,
},
emailService: {
companySenderEmail: envVars.COMPANY_SENDER_EMAIL,
queue: {
redisHost: envVars.QUEUE_REDIS_HOST,
redisPort: envVars.QUEUE_REDIS_PORT,
concurrency: envVars.QUEUE_CONCURRENCY,
},
},
aws: {
region: envVars.AWS_REGION,
ses_api_version: envVars.AWS_SES_API_VERSION,
},
};
14 changes: 14 additions & 0 deletions src/core/middlewares/apiStatus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Request, Response, NextFunction } from 'express';
import config from '../config';
import { ApiStatus } from '../../types/apiMetadata';
import { ApiUnavailable } from '../../utils/ApiError';

export const ValidateApiMetadata = async (req: Request, res: Response, next: NextFunction) => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
(async () => {
if (config.api.status === ApiStatus.Maintenance) throw new ApiUnavailable();
})()
.then(() => next())
// eslint-disable-next-line @typescript-eslint/no-unused-vars
.catch((error) => next(new ApiUnavailable()));
};
7 changes: 3 additions & 4 deletions src/core/middlewares/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ import ApiError from '../../utils/ApiError';

export const errorConverter: ErrorRequestHandler = (err, req, res, next) => {
let error = undefined;
const { statusCode: errStatusCode, message: errMessage, stack } = err;
const { statusCode: errStatusCode, message: errMessage } = err;
if (!(err instanceof ApiError)) {
const statusCode =
errStatusCode || err instanceof Prisma.PrismaClientKnownRequestError
? httpStatus.BAD_REQUEST
: httpStatus.SERVICE_UNAVAILABLE;
const message = errMessage || httpStatus[statusCode];
error = new ApiError(statusCode, message, false, stack);
error = new ApiError(statusCode, message, false);
}
next(error);
};

// eslint-disable-next-line no-unused-vars, @typescript-eslint/no-unused-vars
export const errorHandler: ErrorRequestHandler = (err, req, res, next) => {
let { statusCode, message } = err;
const { isOperational, stack } = err;
const { isOperational } = err;
if (config.environment === 'production' && !isOperational) {
statusCode = httpStatus.INTERNAL_SERVER_ERROR;
message = httpStatus[statusCode];
Expand All @@ -33,7 +33,6 @@ export const errorHandler: ErrorRequestHandler = (err, req, res, next) => {
const response = {
code: statusCode,
message,
...(config.environment === 'development' && { stack }),
};

if (config.environment === 'development') {
Expand Down
23 changes: 23 additions & 0 deletions src/jobs/apiMaintenanace.job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import * as cron from 'node-cron';
import apiMetadataRepository from '../repositories/apiMetadata';
import logger from '../core/logger';
import config from '../core/config';
import { ApiStatus } from '../types/apiMetadata';

export const ApiMaintenanceJob = cron.schedule(
'*/10 * * * * *',
async () => {
let apiStatus = await apiMetadataRepository.getApiMetadata('status');
if (!apiStatus) {
logger.warn('Api status is null, adding status immediately.');
apiStatus = await apiMetadataRepository.createApiMetadata('status', ApiStatus.Maintenance);
}
config.api.status = apiStatus.value;
if (config.api.status === ApiStatus.Live) logger.info('Api is live.');
else if (config.api.status === ApiStatus.Maintenance)
logger.warn('Api is in maintenance mode.');
},
{
scheduled: false,
},
);
11 changes: 11 additions & 0 deletions src/jobs/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import logger from '../core/logger';
import { ApiMaintenanceJob } from './apiMaintenanace.job';

export const StartAllJobs = () => {
ApiMaintenanceJob.start();
logger.info('API maintenance job started...');
};

export const StopAllJobs = () => {
ApiMaintenanceJob.stop();
};
15 changes: 15 additions & 0 deletions src/queues/email/email.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { Email } from '../../types/email';
import { Job } from 'bullmq';
import { SES } from 'aws-sdk';
import nodemailer from 'nodemailer';

import config from '../../core/config';

const emailTransporter = nodemailer.createTransport({
SES: new SES({
region: config.aws.region,
apiVersion: config.aws.ses_api_version,
}),
});

export default (job: Job<Email>) => emailTransporter.sendMail(job.data);
26 changes: 26 additions & 0 deletions src/queues/email/email.queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { EMAIL_QUEUE_IDENTIFIER, Email } from '../../types/email';
import { Queue, Job } from 'bullmq';
import { redisConfig } from './redisConfig';
import EmailWorker from './email.worker';
import logger from '../../core/logger';
import { captureException } from '../../core/sentry';

export const emailQueue = new Queue<Email>(EMAIL_QUEUE_IDENTIFIER, {
connection: redisConfig,
});

EmailWorker.on('ready', () => {
logger.info(`Email queue worker ready.`);
});

EmailWorker.on('completed', (job: Job<Email>) => {
logger.info(`Email sent to ${job.data.recipient} from ${job.data.sender}.`);
});

EmailWorker.on('failed', (job: Job<Email> | undefined, error) => {
logger.warn(`Sending email failed to ${job?.data.recipient} with error ${error}`);
captureException(error, {
recipient: job?.data.recipient,
jobName: job?.name,
});
});
9 changes: 9 additions & 0 deletions src/queues/email/email.worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Worker } from 'bullmq';
import { redisConfig } from './redisConfig';
import { EMAIL_QUEUE_IDENTIFIER } from '../../types/email';

const EmailWorker = new Worker(EMAIL_QUEUE_IDENTIFIER, __dirname + '/email.processor.ts', {
connection: redisConfig,
});

export default EmailWorker;
12 changes: 12 additions & 0 deletions src/queues/email/redisConfig.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import Redis from 'ioredis';
import logger from '../../core/logger';

export const redisConfig = new Redis({
port: 6380,
host: '127.0.0.1',
maxRetriesPerRequest: null,
});

redisConfig.connect(() => {
logger.info('Connected to queue redis.');
});
Loading

0 comments on commit 066b01d

Please sign in to comment.