Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(feat/conc) Move fully to a concurrency limit system #1045

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion apps/api/src/controllers/auth.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { parseApi } from "../lib/parseApi";
import { getRateLimiter } from "../services/rate-limiter";
import { getRateLimiter, isTestSuiteToken } from "../services/rate-limiter";
import {
AuthResponse,
NotificationType,
Expand Down Expand Up @@ -319,6 +319,16 @@ export async function supaAuthenticateUser(
// return { success: false, error: "Unauthorized: Invalid token", status: 401 };
}

if (token && isTestSuiteToken(token)) {
return {
success: true,
team_id: teamId ?? undefined,
// Now we have a test suite plan
plan: "testSuite",
chunk
};
}

return {
success: true,
team_id: teamId ?? undefined,
Expand Down
18 changes: 14 additions & 4 deletions apps/api/src/controllers/v1/concurrency-check.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import {
ConcurrencyCheckResponse,
RequestWithAuth,
} from "./types";
import { RateLimiterMode } from "../../types";
import { RateLimiterMode, PlanType } from "../../types";
import { Response } from "express";
import { redisConnection } from "../../services/queue-service";
import { getConcurrencyLimitMax } from "../../services/rate-limiter";

// Basically just middleware and error wrapping
export async function concurrencyCheckController(
req: RequestWithAuth<ConcurrencyCheckParams, undefined, undefined>,
Expand All @@ -19,7 +21,15 @@ export async function concurrencyCheckController(
now,
Infinity,
);
return res
.status(200)
.json({ success: true, concurrency: activeJobsOfTeam.length });

const maxConcurrency = getConcurrencyLimitMax(
req.auth.plan as PlanType,
req.auth.team_id,
);

return res.status(200).json({
success: true,
concurrency: activeJobsOfTeam.length,
maxConcurrency: maxConcurrency,
});
}
1 change: 1 addition & 0 deletions apps/api/src/controllers/v1/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ export type ConcurrencyCheckResponse =
| {
success: true;
concurrency: number;
maxConcurrency: number;
};

export type CrawlStatusResponse =
Expand Down
2 changes: 1 addition & 1 deletion apps/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,4 @@ logger.info(`Worker ${process.pid} started`);
// sq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
// sq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
// sq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
//
//
14 changes: 9 additions & 5 deletions apps/api/src/lib/concurrency-limit.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import { getRateLimiterPoints } from "../services/rate-limiter";
import { CONCURRENCY_LIMIT } from "../services/rate-limiter";
import { redisConnection } from "../services/queue-service";
import { RateLimiterMode } from "../types";
import { PlanType } from "../types";
import { JobsOptions } from "bullmq";

const constructKey = (team_id: string) => "concurrency-limiter:" + team_id;
const constructQueueKey = (team_id: string) =>
"concurrency-limit-queue:" + team_id;
const stalledJobTimeoutMs = 2 * 60 * 1000;

/**
 * Resolves the maximum concurrency for a plan.
 *
 * The value is whatever `getRateLimiterPoints` reports for the scrape rate
 * limiter mode of the given plan (no token is passed).
 *
 * @param plan - Plan identifier forwarded to the rate limiter lookup.
 * @returns The scrape-mode rate limiter points for `plan`.
 */
export function getConcurrencyLimitMax(plan: string): number {
  const scrapePoints = getRateLimiterPoints(
    RateLimiterMode.Scrape,
    undefined,
    plan,
  );
  return scrapePoints;
}


export async function cleanOldConcurrencyLimitEntries(
team_id: string,
Expand Down Expand Up @@ -77,3 +75,9 @@ export async function pushConcurrencyLimitedJob(
JSON.stringify(job),
);
}


/**
 * Counts the entries currently stored in a team's concurrency-limit queue.
 *
 * @param team_id - Team whose queue is inspected.
 * @returns The cardinality (`ZCARD`) of the sorted set under the team's queue key.
 */
export async function getConcurrencyQueueJobsCount(
  team_id: string,
): Promise<number> {
  const queueKey = constructQueueKey(team_id);
  return redisConnection.zcard(queueKey);
}
4 changes: 4 additions & 0 deletions apps/api/src/lib/job-priority.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ export async function getJobPriority({
let bucketLimit = 0;

switch (plan) {
case "testSuite":
bucketLimit = 1000;
planModifier = 0.25;
break;
case "free":
bucketLimit = 25;
planModifier = 0.5;
Expand Down
94 changes: 94 additions & 0 deletions apps/api/src/services/notification/email_notification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ const emailTemplates: Record<
subject: "Auto recharge failed - Firecrawl",
html: "Hey there,<br/><p>Your auto recharge failed. Please try again manually. If the issue persists, please reach out to us at <a href='mailto:[email protected]'>[email protected]</a></p><br/>Thanks,<br/>Firecrawl Team<br/>",
},
[NotificationType.CONCURRENCY_LIMIT_REACHED]: {
subject: "You could be scraping faster - Firecrawl",
html: `Hey there,
<br/>
<p>We've improved our system by transitioning to concurrency limits, allowing faster scraping by default and eliminating* the often rate limit errors.</p>
<p>You're hitting the concurrency limit for your plan quite often, which means Firecrawl can't scrape as fast as it could. But don't worry, it is not failing your requests and you are still getting your results.</p>
<p>This is just to let you know that you could be scraping more pages faster. Consider upgrading your plan at <a href='https://firecrawl.dev/pricing'>firecrawl.dev/pricing</a>.</p><br/>Thanks,<br/>Firecrawl Team<br/>`,
},
};

export async function sendNotification(
Expand Down Expand Up @@ -183,3 +191,89 @@ export async function sendNotificationInternal(
},
);
}


/**
 * Emails every user of a team about `notificationType`, throttled so the same
 * notification type is sent at most once per `daysBetweenEmails` days.
 *
 * Steps, in order:
 *  1. Look up `user_notifications` rows for this team/type with a `sent_date`
 *     inside the throttle window.
 *  2. If any exist and `bypassRecentChecks` is false, return without sending.
 *  3. Fetch the team's user emails and send the notification to each, one at
 *     a time.
 *  4. Insert a `user_notifications` row recording the send.
 *  5. If `SLACK_ADMIN_WEBHOOK_URL` is set and at least one email was found,
 *     post a Slack message (fire-and-forget; failures are only logged).
 *
 * The whole operation is wrapped in `withAuth` (defined elsewhere) with
 * `undefined` as its second argument.
 *
 * @param team_id - Team to notify.
 * @param notificationType - Which notification template to send.
 * @param daysBetweenEmails - Size of the throttle window, in days.
 * @param bypassRecentChecks - When true, send even if a notification was
 *   already recorded inside the window. Defaults to false.
 * @returns `{ success: false }` when a Supabase query or the insert fails,
 *   `{ success: true }` otherwise (including the throttled no-op case).
 */
export async function sendNotificationWithCustomDays(
  team_id: string,
  notificationType: NotificationType,
  daysBetweenEmails: number,
  bypassRecentChecks: boolean = false,
) {
  return withAuth(
    async (
      team_id: string,
      notificationType: NotificationType,
      daysBetweenEmails: number,
      bypassRecentChecks: boolean,
    ) => {
      const now = new Date();
      const pastDate = new Date(
        now.getTime() - daysBetweenEmails * 24 * 60 * 60 * 1000,
      );

      const { data: recentNotifications, error: recentNotificationsError } =
        await supabase_service
          .from("user_notifications")
          .select("*")
          .eq("team_id", team_id)
          .eq("notification_type", notificationType)
          .gte("sent_date", pastDate.toISOString());

      if (recentNotificationsError) {
        // Interpolating the error object itself would log "[object Object]".
        logger.debug(
          `Error fetching recent notifications: ${recentNotificationsError.message}`,
        );
        return { success: false };
      }

      if (recentNotifications.length > 0 && !bypassRecentChecks) {
        logger.debug(
          `Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`,
        );
        return { success: true };
      }

      // Use the shared logger (not console.log) like the rest of this function.
      logger.info(
        `Sending notification for team_id: ${team_id} and notificationType: ${notificationType}`,
      );

      // Get the emails of the users that belong to the team.
      const { data: emails, error: emailsError } = await supabase_service
        .from("users")
        .select("email")
        .eq("team_id", team_id);

      if (emailsError) {
        logger.debug(`Error fetching emails: ${emailsError.message}`);
        return { success: false };
      }

      // Sent sequentially on purpose: preserves the original delivery order
      // and pacing.
      for (const email of emails) {
        await sendEmailNotification(email.email, notificationType);
      }

      const { error: insertError } = await supabase_service
        .from("user_notifications")
        .insert([
          {
            team_id: team_id,
            notification_type: notificationType,
            sent_date: new Date().toISOString(),
            timestamp: new Date().toISOString(),
          },
        ]);

      // The Slack ping is best-effort and is attempted even if the insert
      // above failed; its own failure never affects the returned result.
      if (process.env.SLACK_ADMIN_WEBHOOK_URL && emails.length > 0) {
        sendSlackWebhook(
          `${getNotificationString(notificationType)}: Team ${team_id}, with email ${emails[0].email}.`,
          false,
          process.env.SLACK_ADMIN_WEBHOOK_URL,
        ).catch((error) => {
          logger.debug(`Error sending slack notification: ${error}`);
        });
      }

      if (insertError) {
        logger.debug(
          `Error inserting notification record: ${insertError.message}`,
        );
        return { success: false };
      }

      return { success: true };
    },
    undefined,
  )(team_id, notificationType, daysBetweenEmails, bypassRecentChecks);
}
49 changes: 38 additions & 11 deletions apps/api/src/services/queue-jobs.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import { Job, JobsOptions } from "bullmq";
import { getScrapeQueue } from "./queue-service";
import { v4 as uuidv4 } from "uuid";
import { WebScraperOptions } from "../types";
import { NotificationType, PlanType, WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node";
import {
cleanOldConcurrencyLimitEntries,
getConcurrencyLimitActiveJobs,
getConcurrencyLimitMax,
getConcurrencyQueueJobsCount,
pushConcurrencyLimitActiveJob,
pushConcurrencyLimitedJob,
} from "../lib/concurrency-limit";
import { logger } from "../lib/logger";
import { getConcurrencyLimitMax } from "./rate-limiter";
import { sendNotificationWithCustomDays } from "./notification/email_notification";

async function _addScrapeJobToConcurrencyQueue(
webScraperOptions: any,
Expand Down Expand Up @@ -57,21 +60,34 @@ async function addScrapeJobRaw(
jobPriority: number,
) {
let concurrencyLimited = false;
let currentActiveConcurrency = 0;
let maxConcurrency = 0;

console.log("Concurrency check: ", webScraperOptions.team_id);
if (
webScraperOptions &&
webScraperOptions.team_id &&
webScraperOptions.plan
webScraperOptions.team_id
) {
const now = Date.now();
const limit = await getConcurrencyLimitMax(webScraperOptions.plan);
maxConcurrency = getConcurrencyLimitMax(webScraperOptions.plan ?? "free", webScraperOptions.team_id);
cleanOldConcurrencyLimitEntries(webScraperOptions.team_id, now);
concurrencyLimited =
(await getConcurrencyLimitActiveJobs(webScraperOptions.team_id, now))
.length >= limit;
currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(webScraperOptions.team_id, now)).length;
concurrencyLimited = currentActiveConcurrency >= maxConcurrency;
}

const concurrencyQueueJobs = await getConcurrencyQueueJobsCount(webScraperOptions.team_id);

if (concurrencyLimited) {
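// Detect whether the team has hit its concurrency limit.
// If demand is above 2x the limit, send them an email.
// No explicit 2x multiplication is needed: the active set is already full (>= maxConcurrency), so more than maxConcurrency jobs waiting in the concurrency queue means total demand already exceeds 2x the limit.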
if(concurrencyQueueJobs > maxConcurrency) {
logger.info("Concurrency limited 2x (single) - ", "Concurrency queue jobs: ", concurrencyQueueJobs, "Max concurrency: ", maxConcurrency);
sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 10, false).catch((error) => {
logger.error("Error sending notification (concurrency limit reached): ", error);
});
}

await _addScrapeJobToConcurrencyQueue(
webScraperOptions,
options,
Expand Down Expand Up @@ -134,22 +150,33 @@ export async function addScrapeJobs(
if (jobs.length === 0) return true;

let countCanBeDirectlyAdded = Infinity;
let currentActiveConcurrency = 0;
let maxConcurrency = 0;

if (jobs[0].data && jobs[0].data.team_id && jobs[0].data.plan) {
const now = Date.now();
const limit = await getConcurrencyLimitMax(jobs[0].data.plan);
maxConcurrency = getConcurrencyLimitMax(jobs[0].data.plan as PlanType, jobs[0].data.team_id);
cleanOldConcurrencyLimitEntries(jobs[0].data.team_id, now);

currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(jobs[0].data.team_id, now)).length;

countCanBeDirectlyAdded = Math.max(
limit -
(await getConcurrencyLimitActiveJobs(jobs[0].data.team_id, now)).length,
maxConcurrency - currentActiveConcurrency,
0,
);
}

const addToBull = jobs.slice(0, countCanBeDirectlyAdded);
const addToCQ = jobs.slice(countCanBeDirectlyAdded);

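// More than maxConcurrency jobs overflowing into the concurrency queue means this batch pushes total demand past 2x the max concurrency.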
if(addToCQ.length > maxConcurrency) {
logger.info("Concurrency limited 2x (multiple) - ", "Concurrency queue jobs: ", addToCQ.length, "Max concurrency: ", maxConcurrency);
sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 10, false).catch((error) => {
logger.error("Error sending notification (concurrency limit reached): ", error);
});
}

await Promise.all(
addToBull.map(async (job) => {
const size = JSON.stringify(job.data).length;
Expand Down
Loading
Loading