Skip to content

Commit

Permalink
fix(core): Fix execution cancellation in scaling mode (#9841)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov committed Jun 28, 2024
1 parent 10f7d4b commit e613de2
Show file tree
Hide file tree
Showing 10 changed files with 408 additions and 134 deletions.
4 changes: 4 additions & 0 deletions packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ export class ActiveExecutions {
private readonly concurrencyControl: ConcurrencyControlService,
) {}

has(executionId: string) {
return this.activeExecutions[executionId] !== undefined;
}

/**
* Add a new active execution
*/
Expand Down
6 changes: 6 additions & 0 deletions packages/cli/src/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ export class Queue {
});
}

async findRunningJobBy({ executionId }: { executionId: string }) {
const activeOrWaitingJobs = await this.getJobs(['active', 'waiting']);

return activeOrWaitingJobs.find(({ data }) => data.executionId === executionId) ?? null;
}

decodeWebhookResponse(response: IExecuteResponsePromiseData): IExecuteResponsePromiseData {
if (
typeof response === 'object' &&
Expand Down
61 changes: 10 additions & 51 deletions packages/cli/src/WaitTracker.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import {
ApplicationError,
ErrorReporterProxy as ErrorReporter,
WorkflowOperationError,
} from 'n8n-workflow';
import { ApplicationError, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow';
import { Service } from 'typedi';
import type { ExecutionStopResult, IWorkflowExecutionDataProcess } from '@/Interfaces';
import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { OwnershipService } from '@/services/ownership.service';
Expand All @@ -30,6 +26,10 @@ export class WaitTracker {
private readonly orchestrationService: OrchestrationService,
) {}

has(executionId: string) {
return this.waitingExecutions[executionId] !== undefined;
}

/**
* @important Requires `OrchestrationService` to be initialized.
*/
Expand Down Expand Up @@ -101,53 +101,12 @@ export class WaitTracker {
}
}

async stopExecution(executionId: string): Promise<ExecutionStopResult> {
if (this.waitingExecutions[executionId] !== undefined) {
// The waiting execution was already scheduled to execute.
// So stop timer and remove.
clearTimeout(this.waitingExecutions[executionId].timer);
delete this.waitingExecutions[executionId];
}
async stopExecution(executionId: string) {
if (!this.waitingExecutions[executionId]) return;

// Also check in database
const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
clearTimeout(this.waitingExecutions[executionId].timer);

if (!fullExecutionData) {
throw new ApplicationError('Execution not found.', {
extra: { executionId },
});
}

if (!['new', 'unknown', 'waiting', 'running'].includes(fullExecutionData.status)) {
throw new WorkflowOperationError(
`Only running or waiting executions can be stopped and ${executionId} is currently ${fullExecutionData.status}.`,
);
}
// Set in execution in DB as failed and remove waitTill time
const error = new WorkflowOperationError('Workflow-Execution has been canceled!');

fullExecutionData.data.resultData.error = {
...error,
message: error.message,
stack: error.stack,
};

fullExecutionData.stoppedAt = new Date();
fullExecutionData.waitTill = null;
fullExecutionData.status = 'canceled';

await this.executionRepository.updateExistingExecution(executionId, fullExecutionData);

return {
mode: fullExecutionData.mode,
startedAt: new Date(fullExecutionData.startedAt),
stoppedAt: fullExecutionData.stoppedAt ? new Date(fullExecutionData.stoppedAt) : undefined,
finished: fullExecutionData.finished,
status: fullExecutionData.status,
};
delete this.waitingExecutions[executionId];
}

startExecution(executionId: string) {
Expand Down
9 changes: 9 additions & 0 deletions packages/cli/src/concurrency/concurrency-control.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ export class ConcurrencyControlService {
});
}

/**
* Check whether an execution is in the production queue.
*/
has(executionId: string) {
if (!this.isEnabled) return false;

return this.productionQueue.getAll().has(executionId);
}

/**
* Block or let through an execution based on concurrency capacity.
*/
Expand Down
31 changes: 29 additions & 2 deletions packages/cli/src/databases/repositories/execution.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import type {
import { parse, stringify } from 'flatted';
import {
ApplicationError,
WorkflowOperationError,
type ExecutionStatus,
type ExecutionSummary,
type IRunExecutionData,
Expand Down Expand Up @@ -609,8 +610,34 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
});
}

async cancel(executionId: string) {
await this.update({ id: executionId }, { status: 'canceled', stoppedAt: new Date() });
async stopBeforeRun(execution: IExecutionResponse) {
execution.status = 'canceled';
execution.stoppedAt = new Date();

await this.update(
{ id: execution.id },
{ status: execution.status, stoppedAt: execution.stoppedAt },
);

return execution;
}

async stopDuringRun(execution: IExecutionResponse) {
const error = new WorkflowOperationError('Workflow-Execution has been canceled!');

execution.data.resultData.error = {
...error,
message: error.message,
stack: error.stack,
};

execution.stoppedAt = new Date();
execution.waitTill = null;
execution.status = 'canceled';

await this.updateExistingExecution(execution.id, execution);

return execution;
}

async cancelMany(executionIds: string[]) {
Expand Down
7 changes: 7 additions & 0 deletions packages/cli/src/errors/missing-execution-stop.error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { ApplicationError } from 'n8n-workflow';

export class MissingExecutionStopError extends ApplicationError {
constructor(executionId: string) {
super('Failed to find execution to stop', { extra: { executionId } });
}
}
Loading

0 comments on commit e613de2

Please sign in to comment.