Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
8386889
feat(array): add sandbox snapshots
joshsny Sep 30, 2025
bf71939
Merge branch 'master' into array/sandbox-snapshots
joshsny Sep 30, 2025
4c5fe76
make integration nullable
joshsny Sep 30, 2025
2065db5
Merge branch 'array/sandbox-snapshots' of https://github.com/posthog/…
joshsny Sep 30, 2025
bb8a824
fix lint issues
joshsny Sep 30, 2025
66c2e92
(wip) add sandbox related activities
joshsny Sep 30, 2025
509e4e4
wip splitting out repo setup in temporal
joshsny Sep 30, 2025
a471b21
wip moving to sandbox flow
joshsny Sep 30, 2025
76b0b0a
wip - new temporal workflow
joshsny Oct 1, 2025
5c0c756
wip testing temporal flow
joshsny Oct 1, 2025
029bba8
wip activities tests
joshsny Oct 1, 2025
7546364
wip get sandbox for setup tests
joshsny Oct 1, 2025
000b55e
tests for get sandbox for setup
joshsny Oct 1, 2025
990fcf7
add default ttl, fix clone repo output, setup repo activity tests
joshsny Oct 1, 2025
dacaaa5
add tests for setup repository
joshsny Oct 1, 2025
1a68c7e
add more tests, add snapshot deletion functionality
joshsny Oct 1, 2025
844f118
add tests for cleanup and task execution
joshsny Oct 1, 2025
d02efa2
wip tests for creating sandbox from a snapshot
joshsny Oct 2, 2025
da488d5
add some workflow tests, update output of workflow
joshsny Oct 2, 2025
392be71
add support for sandbox environment flow based off a falg
joshsny Oct 2, 2025
2b282ef
add asyncify util for sync code in temporal, use it in some activities
joshsny Oct 2, 2025
5725649
add sandbox metadata
joshsny Oct 2, 2025
39a442b
add task for fetching latest github token and injecting
joshsny Oct 2, 2025
3037fea
add steps for injecting and cleaning up personal api key, fix issues …
joshsny Oct 3, 2025
ca85a5d
update tests for injecting api key
joshsny Oct 3, 2025
97fe254
update conftest
joshsny Oct 3, 2025
2ab71f9
update workflow tests, update test snapshots
joshsny Oct 3, 2025
b2bc69f
drop sandbox agent tests as they are covered elsewhere
joshsny Oct 3, 2025
3cf3c48
full workflow
joshsny Oct 3, 2025
b6cb346
Merge branch 'master' into array/sandbox-snapshot-activities
joshsny Oct 4, 2025
2bdd4b1
add better error handling and observability
joshsny Oct 4, 2025
6fdc34c
more activity logging
joshsny Oct 4, 2025
8c28c0e
merge migrations
joshsny Oct 4, 2025
e5d4eec
update exceptions to be non retriable for fatal errors, add track wo…
joshsny Oct 4, 2025
62ddb7e
drop index migration
joshsny Oct 4, 2025
bd756ef
update tests to use accurate errors
joshsny Oct 4, 2025
5a817a2
add temporal tests
joshsny Oct 4, 2025
4c07fc4
type fixes
joshsny Oct 4, 2025
7ef6322
fix test errors
joshsny Oct 4, 2025
22bc092
type fixes, test cleanup
joshsny Oct 4, 2025
d9bbdf5
feat: add created_by to task
joshsny Oct 4, 2025
ba9b2da
please ci
joshsny Oct 4, 2025
3557103
Merge branch 'master' into array/sandbox-snapshot-activities
joshsny Oct 4, 2025
f0a948e
suggested fixes
joshsny Oct 7, 2025
a3d7b08
fix task details
joshsny Oct 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/ci-backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ jobs:
backend_files: ${{ steps.filter.outputs.backend_files }}
migrations: ${{ steps.filter.outputs.migrations }}
migrations_files: ${{ steps.filter.outputs.migrations_files }}
tasks_temporal: ${{ steps.filter.outputs.tasks_temporal }}
steps:
# For pull requests it's not necessary to checkout the code, but we
# also want this to run on master so we need to checkout
Expand Down Expand Up @@ -96,6 +97,8 @@ jobs:
- 'posthog/migrations/*.py'
- 'products/*/backend/migrations/*.py'
- 'products/*/migrations/*.py' # Legacy structure
tasks_temporal:
- 'products/tasks/backend/temporal/**/*'

check-migrations:
needs: [changes]
Expand Down Expand Up @@ -722,9 +725,10 @@ jobs:
shell: bash
env:
AWS_S3_ALLOW_UNSAFE_RENAME: 'true'
RUNLOOP_API_KEY: ${{ needs.changes.outputs.tasks_temporal == 'true' && secrets.RUNLOOP_API_KEY || '' }}
run: |
set +e
pytest posthog/temporal products/batch_exports/backend/tests/temporal -m "not async_migrations" \
pytest posthog/temporal products/batch_exports/backend/tests/temporal products/tasks/backend/temporal -m "not async_migrations" \
--splits ${{ matrix.concurrency }} --group ${{ matrix.group }} \
--durations=100 --durations-min=1.0 --store-durations \
--splitting-algorithm=duration_based_chunks \
Expand Down
33 changes: 33 additions & 0 deletions posthog/temporal/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,40 @@
import inspect
from collections.abc import Callable, Coroutine
from datetime import datetime
from functools import wraps
from typing import Any, ParamSpec, TypeVar

from asgiref.sync import sync_to_async
from temporalio import workflow

P = ParamSpec("P")
T = TypeVar("T")


def asyncify(fn: Callable[P, T]) -> Callable[P, Coroutine[Any, Any, T]]:
"""Decorator to convert a sync function using sync_to_async - this preserves type hints for Temporal's serialization while allowing sync Django ORM code.

This preserves type hints for Temporal's serialization while allowing
sync Django ORM code.

Usage:
@activity.defn
@asyncify
def my_activity(task_id: str) -> TaskDetails:
task = Task.objects.get(id=task_id)
return TaskDetails(...)
"""
if inspect.iscoroutinefunction(fn):
raise TypeError(
f"@asyncify should only be used on sync functions. " f"'{fn.__name__}' is already async. Remove @asyncify."
)

@wraps(fn)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
return await sync_to_async(fn)(*args, **kwargs)

return wrapper


def get_scheduled_start_time():
"""Return the start time of a workflow.
Expand Down
19 changes: 19 additions & 0 deletions products/tasks/backend/lib/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
SETUP_REPOSITORY_PROMPT = """
Your goal is to setup the repository in the current environment.
You are operating in a sandbox environment. You must install all dependencies necessary and setup the environment such that it is ready for executing code tasks.
CONTEXT:
CWD: {cwd}
REPOSITORY: {repository}
INSTRUCTIONS:
1. Install all dependencies necessary to run the repository
2. Run any setup scripts that are available
3. Verify the setup by running tests or build if available
DO NOT make any code changes to the repository. The final state of the disk of this sandbox is what will be used for subsequent tasks, so do not leave any cruft behind, and make sure the repository is in a ready to use state.
"""
26 changes: 26 additions & 0 deletions products/tasks/backend/migrations/0009_task_created_by.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Generated by Django 4.2.22 on 2025-10-04 16:57

import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
("tasks", "0008_task_task_number"),
]

operations = [
migrations.AddField(
model_name="task",
name="created_by",
field=models.ForeignKey(
blank=True,
db_index=False,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to=settings.AUTH_USER_MODEL,
),
),
]
2 changes: 1 addition & 1 deletion products/tasks/backend/migrations/max_migration.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0008_task_task_number
0009_task_created_by
4 changes: 3 additions & 1 deletion products/tasks/backend/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ class OriginProduct(models.TextChoices):

id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
team = models.ForeignKey("posthog.Team", on_delete=models.CASCADE)
created_by = models.ForeignKey("posthog.User", on_delete=models.SET_NULL, null=True, blank=True, db_index=False)
task_number = models.IntegerField(null=True, blank=True)
title = models.CharField(max_length=255)
description = models.TextField()
Expand Down Expand Up @@ -331,12 +332,13 @@ def repository_list(self) -> list[dict]:
"""
config = self.repository_config
if config.get("organization") and config.get("repository"):
full_name = f"{config.get('organization')}/{config.get('repository')}".lower()
return [
{
"org": config.get("organization"),
"repo": config.get("repository"),
"integration_id": self.github_integration_id,
"full_name": f"{config.get('organization')}/{config.get('repository')}",
"full_name": full_name,
}
]
return []
Expand Down
3 changes: 3 additions & 0 deletions products/tasks/backend/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ def validate_repository_config(self, value):
def create(self, validated_data):
validated_data["team"] = self.context["team"]

if "request" in self.context and hasattr(self.context["request"], "user"):
validated_data["created_by"] = self.context["request"].user

# Set default GitHub integration if not provided
if not validated_data.get("github_integration"):
default_integration = Integration.objects.filter(team=self.context["team"], kind="github").first()
Expand Down
121 changes: 56 additions & 65 deletions products/tasks/backend/services/sandbox_agent.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import logging
from typing import Optional

from django.conf import settings

from pydantic import BaseModel

from .sandbox_environment import ExecutionResult, SandboxEnvironment, SandboxEnvironmentConfig
from products.tasks.backend.lib.constants import SETUP_REPOSITORY_PROMPT

from .sandbox_environment import ExecutionResult, SandboxEnvironment

logger = logging.getLogger(__name__)

Expand All @@ -11,7 +16,8 @@
DEFAULT_TASK_TIMEOUT_SECONDS = 20 * 60 # 20 minutes


class SandboxAgentConfig(BaseModel):
class SandboxAgentCreateConfig(BaseModel):
name: str
repository_url: str
github_token: str
task_id: str
Expand All @@ -20,83 +26,76 @@ class SandboxAgentConfig(BaseModel):


class SandboxAgent:
"""
Agent that uses sandbox environments to execute tasks.
"""
"""Agent that uses sandbox environments to execute tasks."""

config: SandboxAgentConfig
sandbox: SandboxEnvironment

def __init__(self, sandbox: SandboxEnvironment, config: SandboxAgentConfig):
def __init__(self, sandbox: SandboxEnvironment):
self.sandbox = sandbox
self.config = config

@classmethod
async def create(
cls,
sandbox: SandboxEnvironment,
config: SandboxAgentConfig,
) -> "SandboxAgent":
environment_variables = {
"REPOSITORY_URL": config.repository_url,
"POSTHOG_CLI_TOKEN": config.posthog_personal_api_key,
"POSTHOG_CLI_ENV_ID": config.posthog_project_id,
}

sandbox_config = SandboxEnvironmentConfig(
name=sandbox.config.name,
template=sandbox.config.template,
environment_variables=environment_variables,
entrypoint=sandbox.config.entrypoint,
)

sandbox = await SandboxEnvironment.create(sandbox_config)
agent = cls(sandbox, config)

return agent

async def setup_repository(self) -> ExecutionResult:
async def clone_repository(self, repository: str, github_token: Optional[str] = "") -> ExecutionResult:
if not self.sandbox.is_running:
raise RuntimeError(f"Sandbox not in running state. Current status: {self.sandbox.status}")

return await self.clone_repository(self.config.repository_url)
org, repo = repository.lower().split("/")
repo_url = (
f"https://x-access-token:{github_token}@github.com/{org}/{repo}.git"
if github_token
else f"https://github.com/{org}/{repo}.git"
)

target_path = f"/tmp/workspace/repos/{org}/{repo}"

# Wipe existing directory if present, then clone
clone_command = (
f"rm -rf {target_path} && "
f"mkdir -p /tmp/workspace/repos/{org} && "
f"cd /tmp/workspace/repos/{org} && "
f"git clone {repo_url} {repo}"
)

logger.info(f"Cloning repository {repository} to {target_path} in sandbox {self.sandbox.id}")
return await self.sandbox.execute(clone_command, timeout_seconds=5 * 60)

async def clone_repository(self, repo_url: str) -> ExecutionResult:
async def setup_repository(self, repository: str) -> ExecutionResult:
"""Setup a repository for snapshotting using the PostHog Code Agent."""
if not self.sandbox.is_running:
raise RuntimeError(f"Sandbox not in running state. Current status: {self.sandbox.status}")

if repo_url.startswith("https://github.com/"):
auth_url = repo_url.replace(
"https://github.com/",
f"https://x-access-token:{self.config.github_token}@github.com/",
)
else:
raise ValueError("Only GitHub is supported")
org, repo = repository.lower().split("/")
repo_path = f"/tmp/workspace/repos/{org}/{repo}"

check_result = await self.sandbox.execute(f"test -d {repo_path} && echo 'exists' || echo 'missing'")
if "missing" in check_result.stdout:
raise RuntimeError(f"Repository path {repo_path} does not exist. Clone the repository first.")

clone_command = f"git clone {auth_url} {WORKING_DIR}/{REPOSITORY_TARGET_DIR}"
setup_command = f"cd {repo_path} && {self._get_setup_command(repo_path)}"

logger.info(f"Cloning repository {repo_url} to {self.repository_dir} in sandbox {self.sandbox.id}")
return await self.sandbox.execute(clone_command)
logger.info(f"Running code agent setup for {repository} in sandbox {self.sandbox.id}")
return await self.sandbox.execute(setup_command, timeout_seconds=15 * 60)

async def execute_task(self) -> ExecutionResult:
"""Execute Claude Code commands in the sandbox."""
async def execute_task(self, task_id: str, repository: str) -> ExecutionResult:
if not self.sandbox.is_running:
raise RuntimeError(f"Sandbox not in running state. Current status: {self.sandbox.status}")

full_command = f"cd {self.repository_dir} && {self.get_task_command()}"
org, repo = repository.lower().split("/")
repo_path = f"/tmp/workspace/repos/{org}/{repo}"

logger.info(
f"Executing task {self.config.task_id} in directory {self.repository_dir} in sandbox {self.sandbox.id}"
)
return await self.sandbox.execute(full_command, timeout_seconds=DEFAULT_TASK_TIMEOUT_SECONDS)
command = f"cd {repo_path} && {self._get_task_command(task_id)}"

logger.info(f"Executing task {task_id} in {repo_path} in sandbox {self.sandbox.id}")
return await self.sandbox.execute(command, timeout_seconds=DEFAULT_TASK_TIMEOUT_SECONDS)

# TODO: Replace these once our coding agent is ready
def _get_task_command(self, task_id: str) -> str:
# return f"npx @posthog/code-agent@latest --yes --task-id {task_id}"
return f"export ANTHROPIC_API_KEY={settings.ANTHROPIC_API_KEY} && claude --dangerously-skip-permissions -p 'replace the readme with an ice cream cone'"

def get_task_command(self) -> str:
"""Get the command to execute the task."""
# TODO: Replace with actual task execution: posthog-cli task run --task-id {self.config.task_id}
return "posthog-cli --help"
def _get_setup_command(self, repo_path: str) -> str:
# return f"npx @posthog/code-agent@latest --yes --prompt '{SETUP_REPOSITORY_PROMPT.format(cwd=repo_path, repository=repo_path)}'"
return f"export ANTHROPIC_API_KEY={settings.ANTHROPIC_API_KEY} && claude --dangerously-skip-permissions -p '{SETUP_REPOSITORY_PROMPT.format(cwd=repo_path, repository=repo_path)}'"

async def destroy(self) -> None:
"""Destroy the underlying sandbox."""
await self.sandbox.destroy()

async def __aenter__(self):
Expand All @@ -105,14 +104,6 @@ async def __aenter__(self):
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.destroy()

@property
def working_dir(self) -> str:
return WORKING_DIR

@property
def repository_dir(self) -> str:
return f"{WORKING_DIR}/{REPOSITORY_TARGET_DIR}"

@property
def is_running(self) -> bool:
return self.sandbox.is_running
Expand Down
Loading
Loading