Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Fix execution cancellation in scaling mode #9841

Merged
merged 4 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ export class ActiveExecutions {
private readonly concurrencyControl: ConcurrencyControlService,
) {}

has(executionId: string) {
return this.activeExecutions[executionId] !== undefined;
}

/**
* Add a new active execution
*/
Expand Down
6 changes: 6 additions & 0 deletions packages/cli/src/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ export class Queue {
});
}

async findRunningJobBy({ executionId }: { executionId: string }) {
const activeOrWaitingJobs = await this.getJobs(['active', 'waiting']);

return activeOrWaitingJobs.find(({ data }) => data.executionId === executionId) ?? null;
}

decodeWebhookResponse(response: IExecuteResponsePromiseData): IExecuteResponsePromiseData {
if (
typeof response === 'object' &&
Expand Down
61 changes: 10 additions & 51 deletions packages/cli/src/WaitTracker.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import {
ApplicationError,
ErrorReporterProxy as ErrorReporter,
WorkflowOperationError,
} from 'n8n-workflow';
import { ApplicationError, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow';
import { Service } from 'typedi';
import type { ExecutionStopResult, IWorkflowExecutionDataProcess } from '@/Interfaces';
import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { OwnershipService } from '@/services/ownership.service';
Expand All @@ -30,6 +26,10 @@ export class WaitTracker {
private readonly orchestrationService: OrchestrationService,
) {}

has(executionId: string) {
return this.waitingExecutions[executionId] !== undefined;
}

/**
* @important Requires `OrchestrationService` to be initialized.
*/
Expand Down Expand Up @@ -101,53 +101,12 @@ export class WaitTracker {
}
}

async stopExecution(executionId: string): Promise<ExecutionStopResult> {
if (this.waitingExecutions[executionId] !== undefined) {
// The waiting execution was already scheduled to execute.
// So stop timer and remove.
clearTimeout(this.waitingExecutions[executionId].timer);
delete this.waitingExecutions[executionId];
}
async stopExecution(executionId: string) {
if (!this.waitingExecutions[executionId]) return;

// Also check in database
const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
clearTimeout(this.waitingExecutions[executionId].timer);

if (!fullExecutionData) {
throw new ApplicationError('Execution not found.', {
extra: { executionId },
});
}

if (!['new', 'unknown', 'waiting', 'running'].includes(fullExecutionData.status)) {
throw new WorkflowOperationError(
`Only running or waiting executions can be stopped and ${executionId} is currently ${fullExecutionData.status}.`,
);
}
// Set in execution in DB as failed and remove waitTill time
const error = new WorkflowOperationError('Workflow-Execution has been canceled!');

fullExecutionData.data.resultData.error = {
...error,
message: error.message,
stack: error.stack,
};

fullExecutionData.stoppedAt = new Date();
fullExecutionData.waitTill = null;
fullExecutionData.status = 'canceled';

await this.executionRepository.updateExistingExecution(executionId, fullExecutionData);

return {
mode: fullExecutionData.mode,
startedAt: new Date(fullExecutionData.startedAt),
stoppedAt: fullExecutionData.stoppedAt ? new Date(fullExecutionData.stoppedAt) : undefined,
finished: fullExecutionData.finished,
status: fullExecutionData.status,
};
delete this.waitingExecutions[executionId];
}

startExecution(executionId: string) {
Expand Down
9 changes: 9 additions & 0 deletions packages/cli/src/concurrency/concurrency-control.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ export class ConcurrencyControlService {
});
}

/**
* Check whether an execution is in the production queue.
*/
has(executionId: string) {
if (!this.isEnabled) return false;

return this.productionQueue.getAll().has(executionId);
}

/**
* Block or let through an execution based on concurrency capacity.
*/
Expand Down
31 changes: 29 additions & 2 deletions packages/cli/src/databases/repositories/execution.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import type {
import { parse, stringify } from 'flatted';
import {
ApplicationError,
WorkflowOperationError,
type ExecutionStatus,
type ExecutionSummary,
type IRunExecutionData,
Expand Down Expand Up @@ -609,8 +610,34 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
});
}

async cancel(executionId: string) {
await this.update({ id: executionId }, { status: 'canceled', stoppedAt: new Date() });
async stopBeforeRun(execution: IExecutionResponse) {
execution.status = 'canceled';
execution.stoppedAt = new Date();

await this.update(
{ id: execution.id },
{ status: execution.status, stoppedAt: execution.stoppedAt },
);

return execution;
}

async stopDuringRun(execution: IExecutionResponse) {
const error = new WorkflowOperationError('Workflow-Execution has been canceled!');

execution.data.resultData.error = {
...error,
message: error.message,
stack: error.stack,
};

execution.stoppedAt = new Date();
execution.waitTill = null;
execution.status = 'canceled';

await this.updateExistingExecution(execution.id, execution);

return execution;
}

async cancelMany(executionIds: string[]) {
Expand Down
7 changes: 7 additions & 0 deletions packages/cli/src/errors/missing-execution-stop.error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { ApplicationError } from 'n8n-workflow';

export class MissingExecutionStopError extends ApplicationError {
constructor(executionId: string) {
super('Failed to find execution to stop', { extra: { executionId } });
}
}
Loading
Loading