(feat/extract) Merge into 1 main queue #1048

Open · wants to merge 2 commits into main
8 changes: 4 additions & 4 deletions apps/api/src/controllers/v1/extract.ts
@@ -5,7 +5,7 @@ import {
   extractRequestSchema,
   ExtractResponse,
 } from "./types";
-import { getExtractQueue } from "../../services/queue-service";
+import { getScrapeQueue } from "../../services/queue-service";
 import * as Sentry from "@sentry/node";
 import { saveExtract } from "../../lib/extract/extract-redis";
 import { getTeamIdSyncB } from "../../lib/extract/team-id-sync";
@@ -73,12 +73,12 @@ export async function extractController(
         op: "queue.publish",
         attributes: {
           "messaging.message.id": extractId,
-          "messaging.destination.name": getExtractQueue().name,
+          "messaging.destination.name": getScrapeQueue().name,
           "messaging.message.body.size": size,
         },
       },
       async (span) => {
-        await getExtractQueue().add(extractId, {
+        await getScrapeQueue().add(extractId, {
           ...jobData,
           sentry: {
             trace: Sentry.spanToTraceHeader(span),
@@ -89,7 +89,7 @@ export async function extractController(
       },
     );
   } else {
-    await getExtractQueue().add(extractId, jobData, {
+    await getScrapeQueue().add(extractId, jobData, {
       jobId: extractId,
     });
   }
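With this change, extract requests ride the same BullMQ queue as scrape jobs; nothing in the request schema changes, only the destination queue. A minimal sketch of the enqueue path, assuming a hypothetical ExtractJobData shape inferred from the fields the worker reads (extractId, request, teamId, plan, subId) rather than the repo's actual types:

import { Queue } from "bullmq";

// Hypothetical payload shape; the real jobData is built earlier in
// extractController and may carry more fields.
interface ExtractJobData {
  extractId: string;
  request: unknown;
  teamId: string;
  plan: string;
  subId?: string;
}

// Stand-in for getScrapeQueue() from queue-service.ts.
declare function getScrapeQueue(): Queue;

async function enqueueExtract(jobData: ExtractJobData): Promise<void> {
  // Same queue as scrape jobs; the worker routes on job.data.extractId.
  await getScrapeQueue().add(jobData.extractId, jobData, {
    jobId: jobData.extractId, // reuse the extract id as the BullMQ job id
  });
}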
16 changes: 3 additions & 13 deletions apps/api/src/index.ts
@@ -4,7 +4,7 @@ import * as Sentry from "@sentry/node";
 import express, { NextFunction, Request, Response } from "express";
 import bodyParser from "body-parser";
 import cors from "cors";
-import { getExtractQueue, getScrapeQueue } from "./services/queue-service";
+import { getScrapeQueue } from "./services/queue-service";
 import { v0Router } from "./routes/v0";
 import os from "os";
 import { logger } from "./lib/logger";
@@ -45,8 +45,8 @@ const serverAdapter = new ExpressAdapter();
 serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
 
 const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
-  queues: [new BullAdapter(getScrapeQueue()), new BullAdapter(getExtractQueue())],
-  serverAdapter: serverAdapter,
+  queues: [new BullAdapter(getScrapeQueue())],
+  serverAdapter,
 });
 
 app.use(
@@ -245,13 +245,3 @@ app.use(
 );
 
 logger.info(`Worker ${process.pid} started`);
-
-// const sq = getScrapeQueue();
-
-// sq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
-// sq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
-// sq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
-// sq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
-// sq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
-// sq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
-//
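With the extract queue gone, the Bull Board dashboard only needs one adapter. A self-contained sketch of the same wiring under the packages this file appears to use (@bull-board/api, @bull-board/express); note the diff pairs BullAdapter with a BullMQ queue, while the sketch uses BullMQAdapter, the adapter bull-board documents for BullMQ:

import express from "express";
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { ExpressAdapter } from "@bull-board/express";
import { Queue } from "bullmq";
import IORedis from "ioredis";

const connection = new IORedis(process.env.REDIS_URL ?? "redis://localhost:6379", {
  maxRetriesPerRequest: null,
});
const mainQueue = new Queue("{mainQueue}", { connection });

const app = express();
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/admin/queues");

createBullBoard({
  queues: [new BullMQAdapter(mainQueue)], // single queue after this PR
  serverAdapter,
});

// Mount the dashboard router on the same base path.
app.use("/admin/queues", serverAdapter.getRouter());
app.listen(3000);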
50 changes: 18 additions & 32 deletions apps/api/src/services/queue-service.ts
@@ -2,22 +2,20 @@ import { Queue } from "bullmq";
 import { logger } from "../lib/logger";
 import IORedis from "ioredis";
 
-let scrapeQueue: Queue;
-let extractQueue: Queue;
+let mainQueue: Queue;
 let loggingQueue: Queue;
 
 export const redisConnection = new IORedis(process.env.REDIS_URL!, {
   maxRetriesPerRequest: null,
 });
 
-export const scrapeQueueName = "{scrapeQueue}";
-export const extractQueueName = "{extractQueue}";
+export const mainQueueName = "{mainQueue}";
 export const loggingQueueName = "{loggingQueue}";
 
-export function getScrapeQueue() {
-  if (!scrapeQueue) {
-    scrapeQueue = new Queue(
-      scrapeQueueName,
+export function getMainQueue() {
+  if (!mainQueue) {
+    mainQueue = new Queue(
+      mainQueueName,
       {
         connection: redisConnection,
         defaultJobOptions: {
@@ -30,33 +28,21 @@ export function getScrapeQueue() {
         },
       }
     );
-    logger.info("Web scraper queue created");
+    logger.info("Main queue created");
   }
-  return scrapeQueue;
+  return mainQueue;
 }
 
-export function getExtractQueue() {
-  if (!extractQueue) {
-    extractQueue = new Queue(
-      extractQueueName,
-      {
-        connection: redisConnection,
-        defaultJobOptions: {
-          removeOnComplete: {
-            age: 90000, // 25 hours
-          },
-          removeOnFail: {
-            age: 90000, // 25 hours
-          },
-        },
-      }
-    );
-    logger.info("Extraction queue created");
+export function getLoggingQueue() {
+  if (!loggingQueue) {
+    loggingQueue = new Queue(loggingQueueName, {
+      connection: redisConnection,
+    });
+    logger.info("Logging queue created");
   }
-  return extractQueue;
+  return loggingQueue;
 }
 
-
-// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
-// import { QueueEvents } from 'bullmq';
-// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });
+// Backwards compatibility exports
+export const getScrapeQueue = getMainQueue;
+export const getExtractQueue = getMainQueue;
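Two details worth calling out in this file. First, the braces in "{mainQueue}" are a Redis Cluster hash tag: only the text inside {} is hashed, so every key BullMQ derives for the queue lands in one cluster slot. Second, the alias exports keep every existing import site compiling while routing all callers to one queue. A minimal self-contained sketch of the pattern, assuming a local Redis fallback URL for illustration:

import { Queue } from "bullmq";
import IORedis from "ioredis";

const connection = new IORedis(process.env.REDIS_URL ?? "redis://localhost:6379", {
  maxRetriesPerRequest: null, // BullMQ requires this setting
});

let mainQueue: Queue | undefined;

// Hash-tagged name: keeps all of this queue's keys on one cluster slot.
export const mainQueueName = "{mainQueue}";

export function getMainQueue(): Queue {
  if (!mainQueue) {
    mainQueue = new Queue(mainQueueName, { connection });
  }
  return mainQueue;
}

// Old call sites importing either name now share the same instance:
export const getScrapeQueue = getMainQueue;
export const getExtractQueue = getMainQueue;

Since both aliases return the identical singleton, getScrapeQueue() === getExtractQueue() holds, which is exactly what lets queue-worker.ts keep its old imports unchanged.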
85 changes: 28 additions & 57 deletions apps/api/src/services/queue-worker.ts
@@ -6,8 +6,7 @@ import {
   getScrapeQueue,
   getExtractQueue,
   redisConnection,
-  scrapeQueueName,
-  extractQueueName,
+  mainQueueName,
 } from "./queue-service";
 import { startWebScraperPipeline } from "../main/runWebScraper";
 import { callWebhook } from "./webhook";
@@ -269,7 +268,23 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => {
   await addJobPriority(job.data.team_id, job.id);
   let err = null;
   try {
-    if (job.data?.mode === "kickoff") {
+    // Check job type and process accordingly
+    if (job.data.extractId) {
+      // This is an extract job
+      const result = await performExtraction(job.data.extractId, {
+        request: job.data.request,
+        teamId: job.data.teamId,
+        plan: job.data.plan,
+        subId: job.data.subId,
+      });
+
+      if (result.success) {
+        await job.moveToCompleted(result, token, false);
+      } else {
+        throw new Error(result.error || "Unknown error during extraction");
+      }
+    } else if (job.data?.mode === "kickoff") {
+      // This is a kickoff job
       const result = await processKickoffJob(job, token);
       if (result.success) {
         try {
@@ -280,6 +295,7 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => {
         await job.moveToFailed((result as any).error, token, false);
       }
     } else {
+      // This is a scrape job
       const result = await processJob(job, token);
       if (result.success) {
         try {
@@ -306,6 +322,14 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => {
     Sentry.captureException(error);
     err = error;
     await job.moveToFailed(error, token, false);
+
+    // Handle extract job specific error updates
+    if (job.data.extractId) {
+      await updateExtract(job.data.extractId, {
+        status: "failed",
+        error: error.error ?? error ?? "Unknown error, please contact [email protected]. Extract id: " + job.data.extractId,
+      });
+    }
   } finally {
     await deleteJobPriority(job.data.team_id, job.id);
     clearInterval(extendLockInterval);
@@ -314,58 +338,6 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => {
   return err;
 };
 
-const processExtractJobInternal = async (token: string, job: Job & { id: string }) => {
-  const logger = _logger.child({
-    module: "extract-worker",
-    method: "processJobInternal",
-    jobId: job.id,
-    extractId: job.data.extractId,
-    teamId: job.data?.teamId ?? undefined,
-  });
-
-  const extendLockInterval = setInterval(async () => {
-    logger.info(`🔄 Worker extending lock on job ${job.id}`);
-    await job.extendLock(token, jobLockExtensionTime);
-  }, jobLockExtendInterval);
-
-  try {
-    const result = await performExtraction(job.data.extractId, {
-      request: job.data.request,
-      teamId: job.data.teamId,
-      plan: job.data.plan,
-      subId: job.data.subId,
-    });
-
-    if (result.success) {
-      // Move job to completed state in Redis
-      await job.moveToCompleted(result, token, false);
-      return result;
-    } else {
-      throw new Error(result.error || "Unknown error during extraction");
-    }
-  } catch (error) {
-    logger.error(`🚫 Job errored ${job.id} - ${error}`, { error });
-
-    Sentry.captureException(error, {
-      data: {
-        job: job.id,
-      },
-    });
-
-    // Move job to failed state in Redis
-    await job.moveToFailed(error, token, false);
-
-    await updateExtract(job.data.extractId, {
-      status: "failed",
-      error: error.error ?? error ?? "Unknown error, please contact [email protected]. Extract id: " + job.data.extractId,
-    });
-    // throw error;
-  } finally {
-    clearInterval(extendLockInterval);
-  }
-};
-
 let isShuttingDown = false;
 
 process.on("SIGINT", () => {
@@ -522,9 +494,8 @@ const workerFun = async (
   }
 };
 
-// Start both workers
+// Start single worker for all jobs
 workerFun(getScrapeQueue(), processJobInternal);
-workerFun(getExtractQueue(), processExtractJobInternal);
 
 async function processKickoffJob(job: Job & { id: string }, token: string) {
   const logger = _logger.child({
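The heart of the consolidation is the dispatch at the top of processJobInternal: a single worker now distinguishes extract, kickoff, and scrape jobs purely by payload shape. A stripped-down sketch of that routing, with declare stubs standing in for the real handlers in this file:

import { Job } from "bullmq";

// Stand-ins for the real handlers (performExtraction, processKickoffJob,
// processJob); signatures are simplified for illustration.
declare function performExtraction(
  extractId: string,
  opts: { request: unknown; teamId: string; plan: string; subId?: string },
): Promise<{ success: boolean; error?: string }>;
declare function processKickoffJob(job: Job, token: string): Promise<unknown>;
declare function processJob(job: Job, token: string): Promise<unknown>;

// Route a job from the unified queue by payload shape: extract jobs carry
// an extractId, kickoff jobs carry mode === "kickoff", anything else is a
// plain scrape.
async function routeJob(job: Job & { id: string }, token: string) {
  if (job.data.extractId) {
    return performExtraction(job.data.extractId, {
      request: job.data.request,
      teamId: job.data.teamId,
      plan: job.data.plan,
      subId: job.data.subId,
    });
  }
  if (job.data?.mode === "kickoff") {
    return processKickoffJob(job, token);
  }
  return processJob(job, token);
}

One reviewable consequence of this design: extract and scrape jobs now share the same worker concurrency and lock settings, so a burst of one job type can delay the other, and job.data.extractId becomes a load-bearing discriminator that scrape payloads must never set.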