Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f7f807a
feat(workflows): use SES tenants
havenbarnes Oct 30, 2025
ba1cda3
tweak
havenbarnes Oct 30, 2025
07365ae
tweak
havenbarnes Oct 30, 2025
6d1923a
Merge branch 'master' of https://github.com/PostHog/posthog into feat…
havenbarnes Nov 3, 2025
86b585d
Merge branch 'master' of https://github.com/PostHog/posthog into feat…
havenbarnes Nov 3, 2025
65ce6d6
Merge branch 'master' of https://github.com/PostHog/posthog into feat…
havenbarnes Nov 3, 2025
208603d
Merge branch 'master' into feat/ses-tenants
havenbarnes Nov 3, 2025
f05b079
Merge branch 'master' of https://github.com/PostHog/posthog into feat…
havenbarnes Nov 4, 2025
d2ad379
tweak
havenbarnes Nov 4, 2025
db83917
Merge branch 'feat/ses-tenants' of https://github.com/PostHog/posthog…
havenbarnes Nov 4, 2025
4a1f272
Merge branch 'master' of https://github.com/PostHog/posthog into feat…
havenbarnes Nov 4, 2025
7953344
verified everything e2e locally
havenbarnes Nov 4, 2025
fd8c8f5
fix test
havenbarnes Nov 4, 2025
31fc552
Merge branch 'master' of https://github.com/PostHog/posthog into feat…
havenbarnes Nov 4, 2025
fa2e96e
Merge branch 'master' into feat/ses-tenants
havenbarnes Nov 4, 2025
1ab96c3
cleanup
havenbarnes Nov 5, 2025
0dd8c8b
Merge branch 'feat/ses-tenants' of https://github.com/PostHog/posthog…
havenbarnes Nov 5, 2025
66e9b27
Merge branch 'master' of https://github.com/PostHog/posthog into feat…
havenbarnes Nov 5, 2025
d0ce5d0
fix test
havenbarnes Nov 5, 2025
5f3dc28
tweak
havenbarnes Nov 5, 2025
121d32f
tweak
havenbarnes Nov 5, 2025
335da28
tweak
havenbarnes Nov 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions posthog/management/commands/migrate_ses_tenants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import logging
from collections.abc import Iterable

from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.paginator import Paginator
from django.db.models import Q

import boto3
from botocore.exceptions import BotoCoreError, ClientError

from posthog.models.integration import Integration

logger = logging.getLogger(__name__)


def _batched(iterable: Iterable, size: int) -> Iterable[list]:
batch: list = []
for item in iterable:
batch.append(item)
if len(batch) >= size:
yield batch
batch = []
if batch:
yield batch


def migrate_ses_tenants(team_ids: list[int], domains: list[str], dry_run: bool = False):
"""
Ensure existing SES email identities have SES Tenants and Tenant Resource Associations.

The command is idempotent.
"""
if team_ids and domains:
print("Please provide either team_ids or domains, not both") # noqa: T201
return

query = (
Integration.objects.filter(kind="email")
.filter(Q(config__provider="ses") | Q(config__provider__isnull=True))
.order_by("id")
)

if team_ids:
print("Setting up SES tenants for teams:", team_ids) # noqa: T201
query = query.filter(team_id__in=team_ids)
elif domains:
print("Setting up SES tenants for domains:", domains) # noqa: T201
# Domains are stored in Integration.config["domain"]
query = query.filter(config__domain__in=domains)
else:
print("Setting up SES tenants for all SES email identities") # noqa: T201

# Collect unique (team_id, domain) pairs to avoid duplicate work per domain
pairs: list[tuple[int, str]] = []
paginator = Paginator(query, 200)

for page_num in paginator.page_range:
page = paginator.page(page_num)
for integration in page.object_list:
domain = integration.config.get("domain")
if not domain:
continue
provider = integration.config.get("provider", "mailjet")
if provider != "ses":
continue
pair = (integration.team_id, domain)
if pair not in pairs:
pairs.append(pair)

if not pairs:
print("No SES email identities found to migrate.") # noqa: T201
return

sts_client = boto3.client(
"sts",
)
tenant_client = boto3.client(
"sesv2",
)

try:
aws_account_id = sts_client.get_caller_identity()["Account"]
except (ClientError, BotoCoreError) as e:
logger.exception("Failed to get AWS account id for SES tenant association: %s", e)
print("Error determining AWS account ID. Aborting.") # noqa: T201
return

for batch in _batched(pairs, 50):
for team_id, domain in batch:
tenant_name = f"team-{team_id}"
identity_arn = f"arn:aws:ses:{settings.SES_REGION}:{aws_account_id}:identity/{domain}"

# Create tenant if missing
try:
if dry_run:
print(f"[DRY-RUN] Would ensure tenant '{tenant_name}' exists") # noqa: T201
else:
try:
tenant_client.create_tenant(
TenantName=tenant_name,
Tags=[{"Key": "team_id", "Value": str(team_id)}],
)
print(f"Created SES tenant '{tenant_name}'") # noqa: T201
except ClientError as e:
if e.response.get("Error", {}).get("Code") == "AlreadyExistsException":
print(f"Tenant '{tenant_name}' already exists") # noqa: T201
else:
raise
except (ClientError, BotoCoreError) as e:
logger.exception("Error creating SES tenant '%s': %s", tenant_name, e)
print(f"Error creating tenant '{tenant_name}': {e}") # noqa: T201
continue

# Create association if missing
try:
if dry_run:
print(f"[DRY-RUN] Would associate identity '{identity_arn}' with tenant '{tenant_name}'") # noqa: T201
else:
try:
tenant_client.create_tenant_resource_association(
TenantName=tenant_name,
ResourceArn=identity_arn,
)
print(f"Associated identity '{domain}' with tenant '{tenant_name}'") # noqa: T201
except ClientError as e:
if e.response.get("Error", {}).get("Code") == "AlreadyExistsException":
print(f"Association already exists for '{domain}' and tenant '{tenant_name}'") # noqa: T201
else:
raise
except (ClientError, BotoCoreError) as e:
logger.exception(
"Error creating SES tenant_resource_association for '%s' on '%s': %s",
domain,
tenant_name,
e,
)
print(f"Error creating tenant_resource_association for '{domain}' on '{tenant_name}': {e}") # noqa: T201
continue


class Command(BaseCommand):
help = "Migrate existing SES identities to use SES Tenants and resource associations"

def add_arguments(self, parser):
parser.add_argument(
"--dry-run",
action="store_true",
help="If set, will not perform changes, only print actions",
)
parser.add_argument(
"--team-ids",
type=str,
help="Comma separated list of team ids to migrate",
)
parser.add_argument(
"--domains",
type=str,
help="Comma separated list of email domains to migrate (e.g., example.com,foo.bar)",
)

def handle(self, *args, **options):
dry_run: bool = bool(options.get("dry_run"))
team_ids_opt = options.get("team_ids")
domains_opt = options.get("domains")

team_ids = [int(x) for x in team_ids_opt.split(",")] if team_ids_opt else []
domains = [x.strip() for x in domains_opt.split(",")] if domains_opt else []

migrate_ses_tenants(team_ids=team_ids, domains=domains, dry_run=dry_run)
122 changes: 122 additions & 0 deletions posthog/management/commands/test/test_migrate_ses_tenants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
from posthog.test.base import BaseTest
from unittest.mock import patch

from django.test import override_settings

from posthog.management.commands.migrate_ses_tenants import migrate_ses_tenants
from posthog.models.integration import Integration


class _FakeSESv2Client:
def __init__(self):
self.created_tenants: list[str] = []
self.associations: list[tuple[str, str]] = []

def get_caller_identity(self):
return {"Account": "123456789012"}

def create_tenant(self, TenantName: str, Tags: list[dict]): # noqa: N803
# emulate idempotency externally in test assertions
if TenantName in self.created_tenants:
from botocore.exceptions import ClientError

raise ClientError({"Error": {"Code": "AlreadyExistsException", "Message": "Tenant exists"}}, "CreateTenant")
self.created_tenants.append(TenantName)
return {"TenantName": TenantName}

def create_tenant_resource_association(self, TenantName: str, ResourceArn: str): # noqa: N803
# emulate idempotency externally in test assertions
pair = (TenantName, ResourceArn)
if pair in self.associations:
from botocore.exceptions import ClientError

raise ClientError(
{"Error": {"Code": "AlreadyExistsException", "Message": "Association exists"}},
"CreateTenantResourceAssociation",
)
self.associations.append(pair)
return {"TenantName": TenantName, "ResourceArn": ResourceArn}


class TestMigrateSESTenants(BaseTest):
def setUp(self):
super().setUp()
# Two SES email integrations on the same domain (should dedupe by (team, domain))
Integration.objects.create(
team=self.team,
kind="email",
integration_id="[email protected]",
config={"domain": "example.com", "provider": "ses"},
created_by=self.user,
)
Integration.objects.create(
team=self.team,
kind="email",
integration_id="[email protected]",
config={"domain": "example.com", "provider": "ses"},
created_by=self.user,
)
# Non-SES provider should be ignored
Integration.objects.create(
team=self.team,
kind="email",
integration_id="[email protected]",
config={"domain": "other.com", "provider": "mailjet"},
created_by=self.user,
)

@override_settings(SES_ACCESS_KEY_ID="test", SES_SECRET_ACCESS_KEY="test", SES_REGION="us-east-1", SES_ENDPOINT="")
@patch("posthog.management.commands.migrate_ses_tenants.boto3.client")
def test_dry_run(self, mock_boto_client):
# Arrange stub clients
sesv2 = _FakeSESv2Client()
mock_boto_client.side_effect = lambda service, **kwargs: sesv2

# Act: dry-run should not attempt create calls but will still resolve account id
migrate_ses_tenants(team_ids=[], domains=[], dry_run=True)

# Assert: no tenants/associations performed
assert sesv2.created_tenants == []
assert sesv2.associations == []

@override_settings(SES_ACCESS_KEY_ID="test", SES_SECRET_ACCESS_KEY="test", SES_REGION="us-east-1", SES_ENDPOINT="")
@patch("posthog.management.commands.migrate_ses_tenants.boto3.client")
def test_migrate_for_team(self, mock_boto_client):
sesv2 = _FakeSESv2Client()
mock_boto_client.side_effect = lambda service, **kwargs: sesv2

migrate_ses_tenants(team_ids=[self.team.id], domains=[], dry_run=False)

# Deduped: only one tenant and one association for (team, example.com)
assert sesv2.created_tenants == [f"team-{self.team.id}"]
expected_arn = f"arn:aws:ses:us-east-1:123456789012:identity/example.com"
assert sesv2.associations == [(f"team-{self.team.id}", expected_arn)]

@override_settings(SES_ACCESS_KEY_ID="test", SES_SECRET_ACCESS_KEY="test", SES_REGION="eu-west-1", SES_ENDPOINT="")
@patch("posthog.management.commands.migrate_ses_tenants.boto3.client")
def test_migrate_for_domain_filter(self, mock_boto_client):
sesv2 = _FakeSESv2Client()
mock_boto_client.side_effect = lambda service, **kwargs: sesv2

# Use domains filter; should match example.com only
migrate_ses_tenants(team_ids=[], domains=["example.com"], dry_run=False)

assert sesv2.created_tenants == [f"team-{self.team.id}"]
expected_arn = f"arn:aws:ses:eu-west-1:123456789012:identity/example.com"
assert sesv2.associations == [(f"team-{self.team.id}", expected_arn)]

@override_settings(SES_ACCESS_KEY_ID="test", SES_SECRET_ACCESS_KEY="test", SES_REGION="us-east-1", SES_ENDPOINT="")
@patch("posthog.management.commands.migrate_ses_tenants.boto3.client")
def test_idempotent_on_repeated_run(self, mock_boto_client):
sesv2 = _FakeSESv2Client()
mock_boto_client.side_effect = lambda service, **kwargs: sesv2

# First run creates
migrate_ses_tenants(team_ids=[self.team.id], domains=[], dry_run=False)
# Second run should hit AlreadyExistsException internally and not error
migrate_ses_tenants(team_ids=[self.team.id], domains=[], dry_run=False)

# Still only one tenant and association recorded
assert sesv2.created_tenants == [f"team-{self.team.id}"]
expected_arn = f"arn:aws:ses:us-east-1:123456789012:identity/example.com"
assert sesv2.associations == [(f"team-{self.team.id}", expected_arn)]
47 changes: 38 additions & 9 deletions products/workflows/backend/providers/ses.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,48 @@

class SESProvider:
def __init__(self):
# Initialize SES client
self.client = boto3.client(
# Initialize the boto3 clients
self.sts_client = boto3.client(
"sts",
aws_access_key_id=settings.SES_ACCESS_KEY_ID,
aws_secret_access_key=settings.SES_SECRET_ACCESS_KEY,
region_name=settings.SES_REGION,
)
self.ses_client = boto3.client(
"ses",
aws_access_key_id=settings.SES_ACCESS_KEY_ID,
aws_secret_access_key=settings.SES_SECRET_ACCESS_KEY,
region_name=settings.SES_REGION,
endpoint_url=settings.SES_ENDPOINT if settings.SES_ENDPOINT else None,
)
self.ses_v2_client = boto3.client(
"sesv2",
aws_access_key_id=settings.SES_ACCESS_KEY_ID,
aws_secret_access_key=settings.SES_SECRET_ACCESS_KEY,
region_name=settings.SES_REGION,
)

def create_email_domain(self, domain: str, team_id: int):
# NOTE: For sesv1 creation is done through verification
# NOTE: For sesv1, domain Identity creation is done through verification
self.verify_email_domain(domain, team_id)

# Create a tenant for the domain if not exists
tenant_name = f"team-{team_id}"
try:
self.ses_v2_client.create_tenant(TenantName=tenant_name, Tags=[{"Key": "team_id", "Value": str(team_id)}])
except ClientError as e:
if e.response["Error"]["Code"] != "AlreadyExistsException":
raise

# Associate the new domain identity with the tenant
try:
self.ses_v2_client.create_tenant_resource_association(
TenantName=tenant_name,
ResourceArn=f"arn:aws:ses:{settings.SES_REGION}:{self.sts_client.get_caller_identity()['Account']}:identity/{domain}",
)
except ClientError as e:
if e.response["Error"]["Code"] != "AlreadyExistsException":
raise

def verify_email_domain(self, domain: str, team_id: int):
# Validate the domain contains valid characters for a domain name
DOMAIN_REGEX = r"(?i)^([a-z0-9]+(-[a-z0-9]+)*\.)+[a-z]{2,}$"
Expand All @@ -36,7 +65,7 @@ def verify_email_domain(self, domain: str, team_id: int):
# Start/ensure domain verification (TXT at _amazonses.domain) ---
verification_token = None
try:
resp = self.client.verify_domain_identity(Domain=domain)
resp = self.ses_client.verify_domain_identity(Domain=domain)
verification_token = resp.get("VerificationToken")
except ClientError as e:
# If already requested/exists, carry on; SES v1 is idempotent-ish here
Expand All @@ -57,7 +86,7 @@ def verify_email_domain(self, domain: str, team_id: int):
# Start/ensure DKIM (three CNAMEs) ---
dkim_tokens: list[str] = []
try:
resp = self.client.verify_domain_dkim(Domain=domain)
resp = self.ses_client.verify_domain_dkim(Domain=domain)
dkim_tokens = resp.get("DkimTokens", []) or []
except ClientError as e:
if e.response["Error"]["Code"] not in ("InvalidParameterValue",):
Expand Down Expand Up @@ -86,15 +115,15 @@ def verify_email_domain(self, domain: str, team_id: int):

# Current verification / DKIM statuses to compute overall status & per-record statuses ---
try:
id_attrs = self.client.get_identity_verification_attributes(Identities=[domain])
id_attrs = self.ses_client.get_identity_verification_attributes(Identities=[domain])
verification_status = (
id_attrs["VerificationAttributes"].get(domain, {}).get("VerificationStatus", "Unknown")
)
except ClientError:
verification_status = "Unknown"

try:
dkim_attrs = self.client.get_identity_dkim_attributes(Identities=[domain])
dkim_attrs = self.ses_client.get_identity_dkim_attributes(Identities=[domain])
dkim_status = dkim_attrs["DkimAttributes"].get(domain, {}).get("DkimVerificationStatus", "Unknown")
except ClientError:
dkim_status = "Unknown"
Expand Down Expand Up @@ -131,7 +160,7 @@ def delete_identity(self, identity: str):
Delete an identity from SES
"""
try:
self.client.delete_identity(Identity=identity)
self.ses_client.delete_identity(Identity=identity)
logger.info(f"Identity {identity} deleted from SES")
except (ClientError, BotoCoreError) as e:
logger.exception(f"SES API error deleting identity: {e}")
Expand Down
Loading
Loading