Merged
93 changes: 93 additions & 0 deletions ee/clickhouse/queries/test/__snapshots__/test_cohort_query.ambr
@@ -534,6 +534,99 @@
join_algorithm = 'auto'
'''
# ---
# name: TestCohortQuery.test_cohort_filter_with_extra.14
'''
(
(SELECT persons.id AS id
FROM
(SELECT argMax(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(person.properties, 'name'), ''), 'null'), '^"|"$', ''), person.version) AS properties___name,
person.id AS id
FROM person
WHERE and(equals(person.team_id, 99999), in(id,
(SELECT where_optimization.id AS id
FROM person AS where_optimization
WHERE and(equals(where_optimization.team_id, 99999), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(where_optimization.properties, 'name'), ''), 'null'), '^"|"$', ''), 'test'), 0)))))
GROUP BY person.id
HAVING and(ifNull(equals(argMax(person.is_deleted, person.version), 0), 0), ifNull(less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))), 0))) AS persons
WHERE ifNull(equals(persons.properties___name, 'test'), 0)
ORDER BY persons.id ASC
LIMIT 1000000000 SETTINGS optimize_aggregation_in_order=1,
join_algorithm='auto'))
UNION DISTINCT (
(SELECT source.id AS id
FROM
(SELECT actor_id AS actor_id,
count() AS event_count,
groupUniqArray(distinct_id) AS event_distinct_ids,
actor_id AS id
FROM
(SELECT if(not(empty(e__override.distinct_id)), e__override.person_id, e.person_id) AS actor_id,
toTimeZone(e.timestamp, 'UTC') AS timestamp,
e.uuid AS uuid,
e.distinct_id AS distinct_id
FROM events AS e
LEFT OUTER JOIN
(SELECT argMax(person_distinct_id_overrides.person_id, person_distinct_id_overrides.version) AS person_id,
person_distinct_id_overrides.distinct_id AS distinct_id
FROM person_distinct_id_overrides
WHERE equals(person_distinct_id_overrides.team_id, 99999)
GROUP BY person_distinct_id_overrides.distinct_id
HAVING ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0) SETTINGS optimize_aggregation_in_order=1) AS e__override ON equals(e.distinct_id, e__override.distinct_id)
WHERE and(equals(e.team_id, 99999), greaterOrEquals(timestamp, toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), lessOrEquals(timestamp, toDateTime64('today', 6, 'UTC')), equals(e.event, '$pageview')))
GROUP BY actor_id) AS source
ORDER BY source.id ASC
LIMIT 1000000000 SETTINGS optimize_aggregation_in_order=1,
join_algorithm='auto')) SETTINGS readonly=2,
max_execution_time=600,
allow_experimental_object_type=1,
format_csv_allow_double_quotes=0,
max_ast_elements=4000000,
max_expanded_ast_elements=4000000,
max_bytes_before_external_group_by=0,
transform_null_in=1,
optimize_min_equality_disjunction_chain_length=4294967295,
allow_experimental_join_condition=1
'''
# ---
# name: TestCohortQuery.test_cohort_filter_with_extra.15
'''

SELECT if(behavior_query.person_id = '00000000-0000-0000-0000-000000000000', person.person_id, behavior_query.person_id) AS id
FROM
(SELECT if(not(empty(pdi.distinct_id)), pdi.person_id, e.person_id) AS person_id,
countIf(timestamp > now() - INTERVAL 1 week
AND timestamp < now()
AND event = '$pageview'
AND 1=1) > 0 AS performed_event_condition_None_level_level_0_level_1_level_0_0
FROM events e
LEFT OUTER JOIN
(SELECT distinct_id,
argMax(person_id, version) as person_id
FROM person_distinct_id2
WHERE team_id = 99999
GROUP BY distinct_id
HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id
WHERE team_id = 99999
AND event IN ['$pageview']
AND timestamp <= now()
AND timestamp >= now() - INTERVAL 1 week
GROUP BY person_id) behavior_query
FULL OUTER JOIN
(SELECT *,
id AS person_id
FROM
(SELECT id,
argMax(properties, version) as person_props
FROM person
WHERE team_id = 99999
GROUP BY id
HAVING max(is_deleted) = 0 SETTINGS optimize_aggregation_in_order = 1)) person ON person.person_id = behavior_query.person_id
WHERE 1 = 1
AND ((((has(['test'], replaceRegexpAll(JSONExtractRaw(person_props, 'name'), '^"|"$', ''))))
OR ((coalesce(performed_event_condition_None_level_level_0_level_1_level_0_0, false))))) SETTINGS optimize_aggregation_in_order = 1,
join_algorithm = 'auto'
'''
# ---
# name: TestCohortQuery.test_cohort_filter_with_extra.2
'''

1 change: 1 addition & 0 deletions ee/settings.py
@@ -80,6 +80,7 @@
PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES = get_from_env(
"PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES", 10.0, type_cast=float
)
TEMPORAL_TASK_TIMEOUT_MINUTES = PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES * 1.5
Contributor commented:

What is this supposed to cover?

It looks like this is used for deliver_subscription_report_activity which has to wait to generate all the assets?

In the worst case, what happens if one of those asset generations takes the full 10 minutes and fails and has to be retried?

@andyzzhao (Contributor, Author) replied on Oct 7, 2025:

> It looks like this is used for deliver_subscription_report_activity which has to wait to generate all the assets?

Correct

If one asset generation takes the full time, we've configured temporal to retry 2 more times. However, on each retry, we regenerate all the assets (potentially up to 6) again.

Contributor replied:

Cool yeah, I guess we don't want to do that eh? Maybe better to use temporal for the asset generation workflows too so we can run them more independently. Sgtm for now.


# Assistant
ANTHROPIC_API_KEY = get_from_env("ANTHROPIC_API_KEY", "")
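To make the timeout/retry interaction discussed in the thread above concrete, here is a minimal sketch of how the activity could be scheduled with the temporalio Python SDK. The activity name, the workflow shape, and the 3-attempt retry policy are assumptions inferred from the comments ("retry 2 more times"), not the repository's actual workflow code.

# Hedged sketch: scheduling the subscription delivery activity with the
# 15-minute timeout and the retry behavior described above. Names and
# retry counts are assumptions, not the repo's actual configuration.
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

TEMPORAL_TASK_TIMEOUT_MINUTES = 15  # PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES (10) * 1.5


@workflow.defn
class SubscriptionReportWorkflow:
    @workflow.run
    async def run(self, subscription_id: int) -> None:
        # Each attempt gets the full 15-minute budget; a timed-out attempt
        # is retried from scratch, regenerating all (up to 6) assets again.
        await workflow.execute_activity(
            "deliver_subscription_report_activity",  # assumed activity name
            subscription_id,
            start_to_close_timeout=timedelta(minutes=TEMPORAL_TASK_TIMEOUT_MINUTES),
            retry_policy=RetryPolicy(maximum_attempts=3),  # initial attempt + 2 retries
        )

Under a policy like this, a single slow asset can hold the workflow for up to roughly 45 minutes across attempts, which is what the in-activity export timeout added below is designed to avoid.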
88 changes: 81 additions & 7 deletions ee/tasks/subscriptions/subscription_utils.py
@@ -6,7 +6,7 @@

import structlog
from celery import chain
from prometheus_client import Histogram
from prometheus_client import Counter, Histogram

from posthog.models.exported_asset import ExportedAsset
from posthog.models.insight import Insight
@@ -21,13 +21,39 @@
UTM_TAGS_BASE = "utm_source=posthog&utm_campaign=subscription_report"
DEFAULT_MAX_ASSET_COUNT = 6


def _get_failed_asset_info(assets: list[ExportedAsset], resource: Union[Subscription, SharingConfiguration]) -> dict:
failed_assets = [a for a in assets if not a.content and not a.content_location]
failed_insight_ids = [a.insight_id for a in failed_assets if a.insight_id]
failed_insight_urls = [
f"/project/{resource.team_id}/insights/{a.insight.short_id}"
for a in failed_assets
if a.insight and hasattr(a.insight, "short_id")
]

dashboard_url = f"/project/{resource.team_id}/dashboard/{resource.dashboard_id}" if resource.dashboard else None

return {
"failed_asset_count": len(failed_assets),
"failed_insight_ids": failed_insight_ids,
"failed_insight_urls": failed_insight_urls,
"dashboard_url": dashboard_url,
}


SUBSCRIPTION_ASSET_GENERATION_TIMER = Histogram(
"subscription_asset_generation_duration_seconds",
"Time spent generating assets for a subscription",
labelnames=["execution_path"],
buckets=(1, 5, 10, 30, 60, 120, 240, 300, 360, 420, 480, 540, 600, float("inf")),
)

SUBSCRIPTION_ASSET_GENERATION_TIMEOUT_COUNTER = Counter(
"subscription_asset_generation_timeout_total",
"Number of times asset generation timed out during subscription delivery",
labelnames=["execution_path"],
)


def generate_assets(
resource: Union[Subscription, SharingConfiguration],
@@ -123,24 +123,72 @@ async def generate_assets_async(
# Create async tasks for each asset export
async def export_single_asset(asset: ExportedAsset) -> None:
try:
logger.info("generate_assets_async.exporting_asset", asset_id=asset.id)
logger.info(
"generate_assets_async.exporting_asset",
asset_id=asset.id,
insight_id=asset.insight_id,
subscription_id=getattr(resource, "id", None),
team_id=resource.team_id,
)
await database_sync_to_async(exporter.export_asset_direct, thread_sensitive=False)(asset)
logger.info("generate_assets_async.asset_exported", asset_id=asset.id)
logger.info(
"generate_assets_async.asset_exported",
asset_id=asset.id,
insight_id=asset.insight_id,
subscription_id=getattr(resource, "id", None),
team_id=resource.team_id,
)
except Exception as e:
logger.error(
"generate_assets_async.export_failed",
asset_id=asset.id,
insight_id=asset.insight_id,
subscription_id=getattr(resource, "id", None),
error=str(e),
exc_info=True,
team_id=resource.team_id,
)
# Save the exception but continue with other assets
asset.exception = str(e)
await database_sync_to_async(asset.save, thread_sensitive=False)()

# Run all exports concurrently
logger.info("generate_assets_async.starting_exports", asset_count=len(assets))
await asyncio.gather(*[export_single_asset(asset) for asset in assets])
logger.info("generate_assets_async.exports_complete", asset_count=len(assets))
# Reserve buffer time for email/Slack delivery after exports
buffer_seconds = 120 # 2 minutes
export_timeout_seconds = (settings.TEMPORAL_TASK_TIMEOUT_MINUTES * 60) - buffer_seconds

subscription_id = getattr(resource, "id", None)

logger.info(
"generate_assets_async.starting_exports",
asset_count=len(assets),
subscription_id=subscription_id,
team_id=resource.team_id,
)

try:
await asyncio.wait_for(
asyncio.gather(*[export_single_asset(asset) for asset in assets]), timeout=export_timeout_seconds
)
logger.info(
"generate_assets_async.exports_complete",
asset_count=len(assets),
subscription_id=subscription_id,
team_id=resource.team_id,
)
except TimeoutError:
SUBSCRIPTION_ASSET_GENERATION_TIMEOUT_COUNTER.labels(execution_path="temporal").inc()

# Get failure info for logging
failure_info = _get_failed_asset_info(assets, resource)

logger.warning(
"generate_assets_async.exports_timeout",
asset_count=len(assets),
subscription_id=subscription_id,
dashboard_id=resource.dashboard_id if resource.dashboard else None,
team_id=resource.team_id,
**failure_info,
)
# Continue with partial results - some assets may not have content

return insights, assets
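As a self-contained illustration of the pattern added here — asyncio.wait_for wrapping a gather, then continuing with whatever finished — the following sketch uses only stand-in names; it is not PostHog code.

# Minimal sketch of the timeout-with-partial-results pattern used in
# generate_assets_async. All names are illustrative stand-ins.
import asyncio


async def export_one(results: dict[int, bytes], asset_id: int, delay: float) -> None:
    await asyncio.sleep(delay)  # stand-in for exporter.export_asset_direct
    results[asset_id] = b"fake image data"


async def export_all(timeout_seconds: float) -> dict[int, bytes]:
    results: dict[int, bytes] = {}
    exports = [export_one(results, i, d) for i, d in enumerate([0.1, 0.2, 10.0])]
    try:
        await asyncio.wait_for(asyncio.gather(*exports), timeout=timeout_seconds)
    except asyncio.TimeoutError:  # alias of built-in TimeoutError since Python 3.11
        pass  # pending exports are cancelled; keep the partial results
    return results


if __name__ == "__main__":
    # With a 1-second budget only the first two "assets" finish.
    print(asyncio.run(export_all(timeout_seconds=1.0)))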
110 changes: 110 additions & 0 deletions ee/tasks/test/subscriptions/test_generate_assets_async.py
@@ -1,4 +1,5 @@
import pytest
from unittest.mock import MagicMock, patch

import pytest_asyncio
from asgiref.sync import sync_to_async
Expand All @@ -18,6 +19,18 @@
pytestmark = [pytest.mark.asyncio, pytest.mark.django_db(transaction=True)]


@pytest_asyncio.fixture(autouse=True)
@andyzzhao (Contributor, Author) commented on Oct 6, 2025:

Mocking this because export_asset_direct makes the tests run for 2 min vs 14s with the mock.

============================= test session starts ==============================
PASSED [ 12%]
PASSED [ 25%]
ee/tasks/test/subscriptions/test_generate_assets_async.py::test_generate_assets_async_excludes_deleted_insights PASSED [ 37%]
ee/tasks/test/subscriptions/test_generate_assets_async.py::test_generate_assets_async_raises_if_missing_resource PASSED [ 50%]
PASSED [ 62%]
ee/tasks/test/subscriptions/test_generate_assets_async.py::test_generate_assets_async_handles_empty_dashboard PASSED [ 75%]
PASSED [ 87%]
PASSED [100%]
=============================== inline snapshot ================================
======================== 8 passed in 160.56s (0:02:40) =========================

we already have tests for export_asset which calls export_asset_direct in test_exporter.py

async def mock_export_asset():
"""Mock export_asset_direct to avoid launching Chrome browser in tests."""

def set_content(asset: ExportedAsset) -> None:
asset.content = b"fake image data"
asset.save()

with patch("ee.tasks.subscriptions.subscription_utils.exporter.export_asset_direct", side_effect=set_content):
yield


@pytest_asyncio.fixture
async def organization():
"""Create a test organization."""
@@ -200,3 +213,100 @@ async def test_async_foreign_key_access_with_real_subscription(team, user, dashb
pytest.fail(
"deliver_subscription_report_async raised SynchronousOnlyOperation - foreign key access not properly handled in async context"
)


@patch("ee.tasks.subscriptions.subscription_utils.exporter.export_asset_direct")
async def test_async_generate_assets_basic(mock_export: MagicMock, team, user) -> None:
def export_success(asset: ExportedAsset) -> None:
asset.content = b"fake image data"
asset.save()

mock_export.side_effect = export_success

dashboard = await sync_to_async(Dashboard.objects.create)(team=team, name="test dashboard", created_by=user)

for i in range(3):
insight = await sync_to_async(Insight.objects.create)(team=team, short_id=f"insight-{i}", name=f"Insight {i}")
await sync_to_async(DashboardTile.objects.create)(dashboard=dashboard, insight=insight)

subscription = await sync_to_async(create_subscription)(team=team, dashboard=dashboard, created_by=user)

subscription = await sync_to_async(
lambda: type(subscription)
.objects.select_related("team", "dashboard", "insight", "created_by")
.get(id=subscription.id)
)()

insights, assets = await generate_assets_async(subscription)

assert len(insights) == 3
assert len(assets) == 3
assert mock_export.call_count == 3
assert all(asset.content for asset in assets)


@patch("ee.tasks.subscriptions.subscription_utils.asyncio.wait_for")
@patch("posthog.tasks.exporter.export_asset_direct")
async def test_async_generate_assets_timeout_continues_with_partial_results(
mock_export: MagicMock, mock_wait_for: MagicMock, team, user
) -> None:
mock_export.return_value = None
mock_wait_for.side_effect = TimeoutError()

dashboard = await sync_to_async(Dashboard.objects.create)(team=team, name="test dashboard", created_by=user)

for i in range(3):
insight = await sync_to_async(Insight.objects.create)(team=team, short_id=f"insight-{i}", name=f"Insight {i}")
await sync_to_async(DashboardTile.objects.create)(dashboard=dashboard, insight=insight)

subscription = await sync_to_async(create_subscription)(team=team, dashboard=dashboard, created_by=user)

subscription = await sync_to_async(
lambda: type(subscription)
.objects.select_related("team", "dashboard", "insight", "created_by")
.get(id=subscription.id)
)()

insights, assets = await generate_assets_async(subscription)

assert len(insights) == 3
assert len(assets) == 3
assert all(asset.content is None and asset.content_location is None for asset in assets)
assert mock_wait_for.called


@patch("posthog.tasks.exporter.export_asset_direct")
async def test_async_generate_assets_partial_success(mock_export: MagicMock, team, user) -> None:
call_count = 0

def export_with_partial_success(asset: ExportedAsset) -> None:
nonlocal call_count
call_count += 1
if call_count <= 2:
asset.content = b"fake image data"
asset.save()

mock_export.side_effect = export_with_partial_success

dashboard = await sync_to_async(Dashboard.objects.create)(team=team, name="test dashboard", created_by=user)

for i in range(3):
insight = await sync_to_async(Insight.objects.create)(team=team, short_id=f"insight-{i}", name=f"Insight {i}")
await sync_to_async(DashboardTile.objects.create)(dashboard=dashboard, insight=insight)

subscription = await sync_to_async(create_subscription)(team=team, dashboard=dashboard, created_by=user)

subscription = await sync_to_async(
lambda: type(subscription)
.objects.select_related("team", "dashboard", "insight", "created_by")
.get(id=subscription.id)
)()

insights, assets = await generate_assets_async(subscription)

assert len(insights) == 3
assert len(assets) == 3
assets_with_content = [a for a in assets if a.content or a.content_location]
assets_without_content = [a for a in assets if not a.content and not a.content_location]
assert len(assets_with_content) == 2
assert len(assets_without_content) == 1
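One behavior the tests above leave unpinned is the new Prometheus timeout counter. A hedged sketch of how it could be read in a test using prometheus_client's registry API — this helper is illustrative, not part of the PR:

# Illustrative helper (not part of the PR): read the timeout counter so a
# test can assert it increments when asyncio.wait_for raises TimeoutError.
from prometheus_client import REGISTRY


def _timeout_count() -> float:
    value = REGISTRY.get_sample_value(
        "subscription_asset_generation_timeout_total",
        {"execution_path": "temporal"},
    )
    return value or 0.0

Capturing _timeout_count() before and after a run with wait_for patched to raise (as in test_async_generate_assets_timeout_continues_with_partial_results) would verify the counter increments by one.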
1 change: 1 addition & 0 deletions posthog/clickhouse/query_tagging.py
@@ -103,6 +103,7 @@ class QueryTags(BaseModel):
workload: Optional[str] = None # enum connection.Workload
dashboard_id: Optional[int] = None
insight_id: Optional[int] = None
exported_asset_id: Optional[int] = None
chargeable: Optional[int] = None
request_name: Optional[str] = None
name: Optional[str] = None
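The new exported_asset_id field lets ClickHouse queries issued during an export be traced back to the asset that triggered them. A hedged sketch, assuming this module's tag_queries helper accepts QueryTags fields as keyword arguments:

# Hedged sketch: tagging queries run during an asset export. Assumes
# tag_queries(**kwargs) from this module; the call site is illustrative.
from posthog.clickhouse.query_tagging import tag_queries


def export_with_tags(asset) -> None:
    tag_queries(exported_asset_id=asset.id, team_id=asset.team_id)
    ...  # run the insight queries for the export; they inherit the tags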