Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ class ManagedSupervisor {
snapshotId: message.snapshot.id,
snapshotFriendlyId: message.snapshot.friendlyId,
placementTags: message.placementTags,
envVars: message.envVars,
});

// Disabled for now
Expand Down
6 changes: 6 additions & 0 deletions apps/supervisor/src/workloadManager/docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ export class DockerWorkloadManager implements WorkloadManager {
});
}

if (opts.envVars) {
Object.entries(opts.envVars).forEach(([key, value]) => {
envVars.push(`${key}=${value}`);
});
}

const hostConfig: Docker.HostConfig = {
AutoRemove: !!this.opts.dockerAutoremove,
};
Expand Down
6 changes: 6 additions & 0 deletions apps/supervisor/src/workloadManager/kubernetes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ export class KubernetesWorkloadManager implements WorkloadManager {
value: value,
}))
: []),
...(opts.envVars
? Object.entries(opts.envVars).map(([key, value]) => ({
name: key,
value: value,
}))
: []),
],
},
],
Expand Down
1 change: 1 addition & 0 deletions apps/supervisor/src/workloadManager/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export interface WorkloadManagerCreateOptions {
nextAttemptNumber?: number;
dequeuedAt: Date;
placementTags?: PlacementTag[];
envVars?: Record<string, string>;
// identifiers
envId: string;
envType: EnvironmentType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,12 +370,44 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
}

async dequeue({ runnerId }: { runnerId?: string }): Promise<DequeuedMessage[]> {
return await this._engine.dequeueFromWorkerQueue({
const messages = await this._engine.dequeueFromWorkerQueue({
consumerId: this.workerInstanceId,
workerQueue: this.masterQueue,
workerId: this.workerInstanceId,
runnerId,
});

// Fetch and inject environment variables for each message
const messagesWithEnvVars = await Promise.all(
messages.map(async (message) => {
const defaultMachinePreset = machinePresetFromName(defaultMachine);

const environment = await this._prisma.runtimeEnvironment.findFirst({
where: {
id: message.environment.id,
},
include: {
parentEnvironment: true,
},
});

const envVars = environment
? await this.getEnvVars(
environment,
message.run.id,
message.run.machine ?? defaultMachinePreset,
environment.parentEnvironment ?? undefined
)
: {};

return {
...message,
envVars,
};
})
);
Comment on lines +381 to +408
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Missing taskEventStore parameter in getEnvVars call.

The getEnvVars method expects an optional taskEventStore parameter (line 582), which is used to set OTEL_RESOURCE_ATTRIBUTES for observability (lines 599-610). The startRunAttempt method correctly passes this parameter (line 481), but the dequeue enrichment does not.

Apply this diff to include the taskEventStore parameter:

         const envVars = environment
           ? await this.getEnvVars(
               environment,
               message.run.id,
               message.run.machine ?? defaultMachinePreset,
-              environment.parentEnvironment ?? undefined
+              environment.parentEnvironment ?? undefined,
+              message.run.taskEventStore ?? undefined
             )
           : {};
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const messagesWithEnvVars = await Promise.all(
messages.map(async (message) => {
const defaultMachinePreset = machinePresetFromName(defaultMachine);
const environment = await this._prisma.runtimeEnvironment.findFirst({
where: {
id: message.environment.id,
},
include: {
parentEnvironment: true,
},
});
const envVars = environment
? await this.getEnvVars(
environment,
message.run.id,
message.run.machine ?? defaultMachinePreset,
environment.parentEnvironment ?? undefined
)
: {};
return {
...message,
envVars,
};
})
);
const messagesWithEnvVars = await Promise.all(
messages.map(async (message) => {
const defaultMachinePreset = machinePresetFromName(defaultMachine);
const environment = await this._prisma.runtimeEnvironment.findFirst({
where: {
id: message.environment.id,
},
include: {
parentEnvironment: true,
},
});
const envVars = environment
? await this.getEnvVars(
environment,
message.run.id,
message.run.machine ?? defaultMachinePreset,
environment.parentEnvironment ?? undefined,
message.run.taskEventStore ?? undefined
)
: {};
return {
...message,
envVars,
};
})
);
🤖 Prompt for AI Agents
In apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts around
lines 381-408, the call to getEnvVars is missing the optional taskEventStore
parameter; update the call to pass the same taskEventStore used elsewhere in
this class (the same variable passed in startRunAttempt at ~line 481) — e.g. add
, taskEventStore (or this._taskEventStore if it's a class member) as the last
argument so getEnvVars(environment, message.run.id, message.run.machine ??
defaultMachinePreset, environment.parentEnvironment ?? undefined,
taskEventStore).


return messagesWithEnvVars;
}

async heartbeatWorkerInstance() {
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/schemas/runEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,5 +270,6 @@ export const DequeuedMessage = z.object({
id: z.string(),
}),
placementTags: z.array(PlacementTag).optional(),
envVars: z.record(z.string()).optional(),
});
export type DequeuedMessage = z.infer<typeof DequeuedMessage>;