Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 102 additions & 3 deletions platform/src/components/aws/bus-queue-subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { ComponentResourceOptions, Input, output } from "@pulumi/pulumi";
import { Component, transform } from "../component";
import { BusBaseSubscriberArgs, createRule } from "./bus-base-subscriber";
import { cloudwatch, sqs } from "@pulumi/aws";
import { cloudwatch, sqs, iam } from "@pulumi/aws";
import { Queue } from "./queue";
import { parseArn } from "./helpers/arn";

export interface Args extends BusBaseSubscriberArgs {
/**
Expand All @@ -22,7 +23,7 @@ export interface Args extends BusBaseSubscriberArgs {
* You'll find this component returned by the `subscribeQueue` method of the `Bus` component.
*/
export class BusQueueSubscriber extends Component {
private readonly policy: sqs.QueuePolicy;
private readonly policy: Output<sqs.QueuePolicy>;
private readonly rule: cloudwatch.EventRule;
private readonly target: cloudwatch.EventTarget;

Expand All @@ -31,9 +32,66 @@ export class BusQueueSubscriber extends Component {

const self = this;
const bus = output(args.bus);
const busArn = bus.arn;
const queueArn = output(args.queue).apply((queue) =>
queue instanceof Queue ? queue.arn : output(queue),
);

// Detect cross-account scenario by comparing account IDs
const isCrossAccount = output(busArn).apply((busArnStr) =>
queueArn.apply((queueArnStr) => {
const busParsed = parseArn(busArnStr);
const queueParsed = parseArn(queueArnStr);
const crossAccount = busParsed.account !== queueParsed.account;
console.log("Cross-account detection:", {
busAccount: busParsed.account,
queueAccount: queueParsed.account,
isCrossAccount: crossAccount,
});
return crossAccount;
}),
);

// Create IAM role only for cross-account scenarios
// This role allows EventBridge in the bus's account to send messages to the queue
const targetRole = isCrossAccount.apply((crossAccount) => {
if (!crossAccount) return undefined;

// IAM role created in bus's account (using the provided provider)
const role = new iam.Role(
`${name}TargetRole`,
{
assumeRolePolicy: iam.assumeRolePolicyForPrincipal({
Service: "events.amazonaws.com",
}),
},
{ provider: opts?.provider },
);

// Inline policy granting sqs:SendMessage to the target queue
new iam.RolePolicy(
`${name}TargetRolePolicy`,
{
role: role.id,
policy: queueArn.apply((arn) =>
JSON.stringify({
Version: "2012-10-17",
Statement: [
{
Effect: "Allow",
Action: "sqs:SendMessage",
Resource: arn,
},
],
}),
),
},
{ provider: opts?.provider },
);

return role;
});

const policy = createPolicy();
const rule = createRule(name, bus.name, args, self);
const target = createTarget();
Expand All @@ -43,7 +101,46 @@ export class BusQueueSubscriber extends Component {
this.target = target;

function createPolicy() {
return Queue.createPolicy(`${name}Policy`, queueArn, { parent: self });
// For cross-account: create queue policy WITHOUT parent to force default provider
// For same-account: use Queue.createPolicy with normal parent relationship
return isCrossAccount.apply((crossAccount) => {
if (crossAccount) {
// Cross-account: Create policy directly with default provider (no parent)
// This is CRITICAL - the policy must be in the queue's account, not bus's account
// Get the role ARN for the queue policy principal
const roleArn = targetRole.apply((role) => role?.arn);

return new sqs.QueuePolicy(
`${name}Policy`,
{
queueUrl: queueArn.apply((arn) => {
// Parse SQS ARN: arn:aws:sqs:region:account-id:queue-name
// Queue URL: https://sqs.{region}.amazonaws.com/{account-id}/{queue-name}
const parsed = parseArn(arn);
return `https://sqs.${parsed.region}.amazonaws.com/${parsed.account}/${parsed.resource}`;
}),
policy: iam.getPolicyDocumentOutput({
statements: [
{
actions: ["sqs:SendMessage"],
resources: [queueArn],
principals: [
{
type: "AWS",
identifiers: [roleArn],
},
],
},
],
}).json,
},
{ retainOnDelete: true }, // No parent, no provider = default provider
);
} else {
// Same-account: Use normal Queue.createPolicy with parent
return Queue.createPolicy(`${name}Policy`, queueArn, { parent: self });
}
});
}

function createTarget() {
Expand All @@ -55,6 +152,8 @@ export class BusQueueSubscriber extends Component {
arn: queueArn,
rule: rule.name,
eventBusName: bus.name,
// roleArn is required only for cross-account scenarios
roleArn: targetRole?.arn,
},
{ parent: self },
),
Expand Down
19 changes: 13 additions & 6 deletions platform/src/components/aws/bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ export class Bus extends Component implements Link.Linkable {
this.nodes.bus.name,
queue,
args,
{ provider: this.constructorOpts.provider },
);
}

Expand Down Expand Up @@ -526,10 +527,11 @@ export class Bus extends Component implements Link.Linkable {
busArn: Input<string>,
queue: Input<string | Queue>,
args?: BusSubscriberArgs,
opts?: ComponentResourceOptions,
) {
return output(busArn).apply((busArn) => {
const busName = parseEventBusArn(busArn).busName;
return this._subscribeQueue(busName, name, busArn, busName, queue, args);
return this._subscribeQueue(busName, name, busArn, busName, queue, args, opts);
});
}

Expand All @@ -540,13 +542,18 @@ export class Bus extends Component implements Link.Linkable {
busName: Input<string>,
queue: Input<string | Queue>,
args: BusSubscriberArgs = {},
opts: ComponentResourceOptions = {},
) {
return output(args).apply((args) => {
return new BusQueueSubscriber(`${name}Subscriber${subscriberName}`, {
bus: { name: busName, arn: busArn },
queue,
...args,
});
return new BusQueueSubscriber(
`${name}Subscriber${subscriberName}`,
{
bus: { name: busName, arn: busArn },
queue,
...args,
},
opts,
);
});
}

Expand Down
33 changes: 33 additions & 0 deletions platform/src/components/aws/helpers/arn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,36 @@ export function parseDsqlPrivateEndpoint(
);
return privateDnsName.replace("*", clusterId);
}

/**
* Parses a generic AWS ARN and extracts its components.
* ARN format: arn:aws:service:region:account-id:resource
*
* @param arn - The ARN string to parse
* @returns Object with service, region, account, and resource components
* @throws VisibleError if the ARN format is invalid
*
* @example
* ```typescript
* parseArn("arn:aws:events:us-east-1:123456789012:event-bus/my-bus")
* // Returns: { service: "events", region: "us-east-1", account: "123456789012", resource: "event-bus/my-bus" }
* ```
*/
export function parseArn(arn: string): {
service: string;
region: string;
account: string;
resource: string;
} {
// ARN format: arn:aws:service:region:account-id:resource
const parts = arn.split(":");
if (parts[0] !== "arn" || parts.length < 6) {
throw new VisibleError(`Invalid ARN format: ${arn}`);
}
return {
service: parts[2],
region: parts[3],
account: parts[4],
resource: parts.slice(5).join(":"),
};
}