diff --git a/ee/clickhouse/queries/test/__snapshots__/test_cohort_query.ambr b/ee/clickhouse/queries/test/__snapshots__/test_cohort_query.ambr index 3b268d60a2ab4..69d66b4c12a66 100644 --- a/ee/clickhouse/queries/test/__snapshots__/test_cohort_query.ambr +++ b/ee/clickhouse/queries/test/__snapshots__/test_cohort_query.ambr @@ -534,6 +534,99 @@ join_algorithm = 'auto' ''' # --- +# name: TestCohortQuery.test_cohort_filter_with_extra.14 + ''' + ( + (SELECT persons.id AS id + FROM + (SELECT argMax(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(person.properties, 'name'), ''), 'null'), '^"|"$', ''), person.version) AS properties___name, + person.id AS id + FROM person + WHERE and(equals(person.team_id, 99999), in(id, + (SELECT where_optimization.id AS id + FROM person AS where_optimization + WHERE and(equals(where_optimization.team_id, 99999), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(where_optimization.properties, 'name'), ''), 'null'), '^"|"$', ''), 'test'), 0))))) + GROUP BY person.id + HAVING and(ifNull(equals(argMax(person.is_deleted, person.version), 0), 0), ifNull(less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))), 0))) AS persons + WHERE ifNull(equals(persons.properties___name, 'test'), 0) + ORDER BY persons.id ASC + LIMIT 1000000000 SETTINGS optimize_aggregation_in_order=1, + join_algorithm='auto')) + UNION DISTINCT ( + (SELECT source.id AS id + FROM + (SELECT actor_id AS actor_id, + count() AS event_count, + groupUniqArray(distinct_id) AS event_distinct_ids, + actor_id AS id + FROM + (SELECT if(not(empty(e__override.distinct_id)), e__override.person_id, e.person_id) AS actor_id, + toTimeZone(e.timestamp, 'UTC') AS timestamp, + e.uuid AS uuid, + e.distinct_id AS distinct_id + FROM events AS e + LEFT OUTER JOIN + (SELECT argMax(person_distinct_id_overrides.person_id, person_distinct_id_overrides.version) AS person_id, + person_distinct_id_overrides.distinct_id AS distinct_id + FROM 
person_distinct_id_overrides + WHERE equals(person_distinct_id_overrides.team_id, 99999) + GROUP BY person_distinct_id_overrides.distinct_id + HAVING ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0) SETTINGS optimize_aggregation_in_order=1) AS e__override ON equals(e.distinct_id, e__override.distinct_id) + WHERE and(equals(e.team_id, 99999), greaterOrEquals(timestamp, toDateTime64('explicit_redacted_timestamp', 6, 'UTC')), lessOrEquals(timestamp, toDateTime64('today', 6, 'UTC')), equals(e.event, '$pageview'))) + GROUP BY actor_id) AS source + ORDER BY source.id ASC + LIMIT 1000000000 SETTINGS optimize_aggregation_in_order=1, + join_algorithm='auto')) SETTINGS readonly=2, + max_execution_time=600, + allow_experimental_object_type=1, + format_csv_allow_double_quotes=0, + max_ast_elements=4000000, + max_expanded_ast_elements=4000000, + max_bytes_before_external_group_by=0, + transform_null_in=1, + optimize_min_equality_disjunction_chain_length=4294967295, + allow_experimental_join_condition=1 + ''' +# --- +# name: TestCohortQuery.test_cohort_filter_with_extra.15 + ''' + + SELECT if(behavior_query.person_id = '00000000-0000-0000-0000-000000000000', person.person_id, behavior_query.person_id) AS id + FROM + (SELECT if(not(empty(pdi.distinct_id)), pdi.person_id, e.person_id) AS person_id, + countIf(timestamp > now() - INTERVAL 1 week + AND timestamp < now() + AND event = '$pageview' + AND 1=1) > 0 AS performed_event_condition_None_level_level_0_level_1_level_0_0 + FROM events e + LEFT OUTER JOIN + (SELECT distinct_id, + argMax(person_id, version) as person_id + FROM person_distinct_id2 + WHERE team_id = 99999 + GROUP BY distinct_id + HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id + WHERE team_id = 99999 + AND event IN ['$pageview'] + AND timestamp <= now() + AND timestamp >= now() - INTERVAL 1 week + GROUP BY person_id) behavior_query + FULL OUTER JOIN + (SELECT *, + id AS 
def _get_failed_asset_info(assets: list[ExportedAsset], resource: Union[Subscription, SharingConfiguration]) -> dict:
    """Summarise the assets that ended up without exported content, for structured logging.

    An asset counts as failed when it has neither inline ``content`` nor a
    ``content_location``.

    Args:
        assets: Exported assets to inspect.
        resource: The subscription or sharing configuration the assets belong
            to. Only its ``team_id`` and ``dashboard_id`` attributes are read.

    Returns:
        A dict with the keys ``failed_asset_count``, ``failed_insight_ids``,
        ``failed_insight_urls`` and ``dashboard_url`` (``None`` when the
        resource has no dashboard).
    """
    team_id = resource.team_id
    failed_assets = [a for a in assets if not a.content and not a.content_location]

    failed_insight_ids = []
    failed_insight_urls = []
    for asset in failed_assets:
        # Gate on the raw FK column so assets without an insight never touch
        # the ``insight`` relation at all.
        if not asset.insight_id:
            continue
        failed_insight_ids.append(asset.insight_id)

        # Skip insights without a usable short id instead of emitting a
        # misleading ".../insights/None" URL.
        short_id = getattr(asset.insight, "short_id", None)
        if short_id:
            failed_insight_urls.append(f"/project/{team_id}/insights/{short_id}")

    # Use ``dashboard_id`` rather than ``resource.dashboard``: this helper is
    # called from async code, where lazily loading a relation is not allowed,
    # and the id alone is enough to build the URL.
    dashboard_id = resource.dashboard_id
    dashboard_url = f"/project/{team_id}/dashboard/{dashboard_id}" if dashboard_id is not None else None

    return {
        "failed_asset_count": len(failed_assets),
        "failed_insight_ids": failed_insight_ids,
        "failed_insight_urls": failed_insight_urls,
        "dashboard_url": dashboard_url,
    }
team_id=resource.team_id, ) # Save the exception but continue with other assets asset.exception = str(e) await database_sync_to_async(asset.save, thread_sensitive=False)() - # Run all exports concurrently - logger.info("generate_assets_async.starting_exports", asset_count=len(assets)) - await asyncio.gather(*[export_single_asset(asset) for asset in assets]) - logger.info("generate_assets_async.exports_complete", asset_count=len(assets)) + # Reserve buffer time for email/Slack delivery after exports + buffer_seconds = 120 # 2 minutes + export_timeout_seconds = (settings.TEMPORAL_TASK_TIMEOUT_MINUTES * 60) - buffer_seconds + + subscription_id = getattr(resource, "id", None) + + logger.info( + "generate_assets_async.starting_exports", + asset_count=len(assets), + subscription_id=subscription_id, + team_id=resource.team_id, + ) + + try: + await asyncio.wait_for( + asyncio.gather(*[export_single_asset(asset) for asset in assets]), timeout=export_timeout_seconds + ) + logger.info( + "generate_assets_async.exports_complete", + asset_count=len(assets), + subscription_id=subscription_id, + team_id=resource.team_id, + ) + except TimeoutError: + SUBSCRIPTION_ASSET_GENERATION_TIMEOUT_COUNTER.labels(execution_path="temporal").inc() + + # Get failure info for logging + failure_info = _get_failed_asset_info(assets, resource) + + logger.warning( + "generate_assets_async.exports_timeout", + asset_count=len(assets), + subscription_id=subscription_id, + dashboard_id=resource.dashboard_id if resource.dashboard else None, + team_id=resource.team_id, + **failure_info, + ) + # Continue with partial results - some assets may not have content return insights, assets diff --git a/ee/tasks/test/subscriptions/test_generate_assets_async.py b/ee/tasks/test/subscriptions/test_generate_assets_async.py index 0190389cca398..6e61da7e47fd6 100644 --- a/ee/tasks/test/subscriptions/test_generate_assets_async.py +++ b/ee/tasks/test/subscriptions/test_generate_assets_async.py @@ -1,4 +1,5 @@ import 
pytest +from unittest.mock import MagicMock, patch import pytest_asyncio from asgiref.sync import sync_to_async @@ -18,6 +19,18 @@ pytestmark = [pytest.mark.asyncio, pytest.mark.django_db(transaction=True)] +@pytest_asyncio.fixture(autouse=True) +async def mock_export_asset(): + """Mock export_asset_direct to avoid launching Chrome browser in tests.""" + + def set_content(asset: ExportedAsset) -> None: + asset.content = b"fake image data" + asset.save() + + with patch("ee.tasks.subscriptions.subscription_utils.exporter.export_asset_direct", side_effect=set_content): + yield + + @pytest_asyncio.fixture async def organization(): """Create a test organization.""" @@ -200,3 +213,100 @@ async def test_async_foreign_key_access_with_real_subscription(team, user, dashb pytest.fail( "deliver_subscription_report_async raised SynchronousOnlyOperation - foreign key access not properly handled in async context" ) + + +@patch("ee.tasks.subscriptions.subscription_utils.exporter.export_asset_direct") +async def test_async_generate_assets_basic(mock_export: MagicMock, team, user) -> None: + def export_success(asset: ExportedAsset) -> None: + asset.content = b"fake image data" + asset.save() + + mock_export.side_effect = export_success + + dashboard = await sync_to_async(Dashboard.objects.create)(team=team, name="test dashboard", created_by=user) + + for i in range(3): + insight = await sync_to_async(Insight.objects.create)(team=team, short_id=f"insight-{i}", name=f"Insight {i}") + await sync_to_async(DashboardTile.objects.create)(dashboard=dashboard, insight=insight) + + subscription = await sync_to_async(create_subscription)(team=team, dashboard=dashboard, created_by=user) + + subscription = await sync_to_async( + lambda: type(subscription) + .objects.select_related("team", "dashboard", "insight", "created_by") + .get(id=subscription.id) + )() + + insights, assets = await generate_assets_async(subscription) + + assert len(insights) == 3 + assert len(assets) == 3 + assert 
@patch("ee.tasks.subscriptions.subscription_utils.asyncio.wait_for")
@patch("posthog.tasks.exporter.export_asset_direct")
async def test_async_generate_assets_timeout_continues_with_partial_results(
    mock_export: MagicMock, mock_wait_for: MagicMock, team, user
) -> None:
    """A timeout while exporting must not raise: insights and (empty) assets are still returned."""
    mock_export.return_value = None

    async def time_out_immediately(awaitable, timeout=None):
        # Cancel the gathered export tasks before raising. Without this they
        # stay scheduled on the event loop and can run after the test body
        # has finished (for example during fixture teardown).
        awaitable.cancel()
        raise TimeoutError

    # NOTE: ``subscription_utils.asyncio`` is the real ``asyncio`` module, so
    # this patch replaces ``asyncio.wait_for`` for everything running in the
    # loop for the duration of the test.
    mock_wait_for.side_effect = time_out_immediately

    dashboard = await sync_to_async(Dashboard.objects.create)(team=team, name="test dashboard", created_by=user)

    for i in range(3):
        insight = await sync_to_async(Insight.objects.create)(team=team, short_id=f"insight-{i}", name=f"Insight {i}")
        await sync_to_async(DashboardTile.objects.create)(dashboard=dashboard, insight=insight)

    subscription = await sync_to_async(create_subscription)(team=team, dashboard=dashboard, created_by=user)

    # Re-fetch with the relations preloaded so the async code under test does
    # not need to lazy-load foreign keys.
    subscription = await sync_to_async(
        lambda: type(subscription)
        .objects.select_related("team", "dashboard", "insight", "created_by")
        .get(id=subscription.id)
    )()

    insights, assets = await generate_assets_async(subscription)

    assert len(insights) == 3
    assert len(assets) == 3
    # No export completed, so every asset is still empty.
    assert all(asset.content is None and asset.content_location is None for asset in assets)
    assert mock_wait_for.called
sync_to_async(DashboardTile.objects.create)(dashboard=dashboard, insight=insight) + + subscription = await sync_to_async(create_subscription)(team=team, dashboard=dashboard, created_by=user) + + subscription = await sync_to_async( + lambda: type(subscription) + .objects.select_related("team", "dashboard", "insight", "created_by") + .get(id=subscription.id) + )() + + insights, assets = await generate_assets_async(subscription) + + assert len(insights) == 3 + assert len(assets) == 3 + assets_with_content = [a for a in assets if a.content or a.content_location] + assets_without_content = [a for a in assets if not a.content and not a.content_location] + assert len(assets_with_content) == 2 + assert len(assets_without_content) == 1 diff --git a/posthog/clickhouse/query_tagging.py b/posthog/clickhouse/query_tagging.py index 05d3ef1da100f..07ad982057d6a 100644 --- a/posthog/clickhouse/query_tagging.py +++ b/posthog/clickhouse/query_tagging.py @@ -103,6 +103,7 @@ class QueryTags(BaseModel): workload: Optional[str] = None # enum connection.Workload dashboard_id: Optional[int] = None insight_id: Optional[int] = None + exported_asset_id: Optional[int] = None chargeable: Optional[int] = None request_name: Optional[str] = None name: Optional[str] = None diff --git a/posthog/tasks/exporter.py b/posthog/tasks/exporter.py index 8bfc3fc77f3f8..a019d17d52de1 100644 --- a/posthog/tasks/exporter.py +++ b/posthog/tasks/exporter.py @@ -84,6 +84,17 @@ def export_asset_direct(exported_asset: ExportedAsset, limit: Optional[int] = No if current_task and current_task.request and current_task.request.id else None, } + + logger.info( + "export_asset.starting", + exported_asset_id=exported_asset.id, + team_id=team.id, + ) + + from posthog.clickhouse.query_tagging import tag_queries + + tag_queries(exported_asset_id=exported_asset.id) + posthoganalytics.capture( distinct_id=distinct_id, event="export started", @@ -99,6 +110,11 @@ def export_asset_direct(exported_asset: ExportedAsset, limit: 
Optional[int] = No image_exporter.export_image(exported_asset) EXPORT_QUEUED_COUNTER.labels(type="image").inc() + logger.info( + "export_asset.succeeded", + exported_asset_id=exported_asset.id, + team_id=team.id, + ) posthoganalytics.capture( distinct_id=distinct_id, event="export succeeded", @@ -111,6 +127,13 @@ def export_asset_direct(exported_asset: ExportedAsset, limit: Optional[int] = No except Exception as e: is_retriable = isinstance(e, EXCEPTIONS_TO_RETRY) + logger.exception( + "export_asset.error", + exported_asset_id=exported_asset.id, + error=str(e), + might_retry=is_retriable, + team_id=team.id, + ) posthoganalytics.capture( distinct_id=distinct_id, event="export failed", diff --git a/posthog/temporal/subscriptions/subscription_scheduling_workflow.py b/posthog/temporal/subscriptions/subscription_scheduling_workflow.py index 2b3ce7a0133a7..f86eee220bd29 100644 --- a/posthog/temporal/subscriptions/subscription_scheduling_workflow.py +++ b/posthog/temporal/subscriptions/subscription_scheduling_workflow.py @@ -168,9 +168,7 @@ async def run(self, inputs: ScheduleAllSubscriptionsWorkflowInputs) -> None: task = temporalio.workflow.execute_activity( deliver_subscription_report_activity, DeliverSubscriptionReportActivityInputs(subscription_id=sub_id), - start_to_close_timeout=dt.timedelta( - minutes=settings.PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES * 1.5 - ), + start_to_close_timeout=dt.timedelta(minutes=settings.TEMPORAL_TASK_TIMEOUT_MINUTES), retry_policy=temporalio.common.RetryPolicy( initial_interval=dt.timedelta(seconds=10), maximum_interval=dt.timedelta(minutes=5), @@ -196,7 +194,7 @@ async def run(self, inputs: DeliverSubscriptionReportActivityInputs) -> None: await temporalio.workflow.execute_activity( deliver_subscription_report_activity, inputs, - start_to_close_timeout=dt.timedelta(minutes=settings.PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES * 1.5), + start_to_close_timeout=dt.timedelta(minutes=settings.TEMPORAL_TASK_TIMEOUT_MINUTES), 
retry_policy=temporalio.common.RetryPolicy( initial_interval=dt.timedelta(seconds=5), maximum_interval=dt.timedelta(minutes=2),