4 changes: 4 additions & 0 deletions README.md
@@ -145,6 +145,7 @@ Commands with JSON output support:
- `captcha-solver` - Template demonstrating Kernel's auto-CAPTCHA solver
- `stagehand` - Template with Stagehand SDK (TypeScript only)
- `browser-use` - Template with Browser Use SDK (Python only)
- `lead-scraper` - Google Maps lead scraper using Anthropic computer use (Python only)
- `anthropic-computer-use` - Anthropic Computer Use prompt loop
- `openai-computer-use` - OpenAI Computer Use Agent sample
- `gemini-computer-use` - Implements a Gemini computer use agent (TypeScript only)
@@ -449,6 +450,9 @@ kernel create --name my-cu-app --language py --template anthropic-computer-use

# Create a Claude Agent SDK app (TypeScript or Python)
kernel create --name my-claude-agent --language ts --template claude-agent-sdk

# Create a Google Maps lead scraper app (Python)
kernel create --name my-lead-scraper --language py --template lead-scraper
```

### Deploy with environment variables
4 changes: 4 additions & 0 deletions pkg/templates/python/lead-scraper/.env.example
@@ -0,0 +1,4 @@
# Lead Scraper Environment Variables
# Copy this to .env and fill in the values

ANTHROPIC_API_KEY=your_anthropic_api_key_here
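
For reference, a minimal sketch of reading this variable at startup, assuming the template pulls it straight from the process environment (the check and error message are illustrative, not part of the template):

import os

# Fail fast if the key was never copied into .env / exported,
# rather than erroring deep inside the Anthropic client.
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
    raise RuntimeError(
        "ANTHROPIC_API_KEY is not set; copy .env.example to .env and fill it in"
    )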
336 changes: 336 additions & 0 deletions pkg/templates/python/lead-scraper/loop.py
@@ -0,0 +1,336 @@
"""
Agentic sampling loop that calls the Anthropic API and local implementation of anthropic-defined computer use tools.
From https://github.com/anthropics/anthropic-quickstarts/blob/main/computer-use-demo/computer_use_demo/loop.py
Modified to use Kernel Computer Controls API instead of Playwright.
"""

import os
from datetime import datetime
from enum import StrEnum
from typing import Any, cast

from kernel import Kernel
from anthropic import Anthropic
from anthropic.types.beta import (
    BetaCacheControlEphemeralParam,
    BetaContentBlockParam,
    BetaImageBlockParam,
    BetaMessage,
    BetaMessageParam,
    BetaTextBlock,
    BetaTextBlockParam,
    BetaToolResultBlockParam,
    BetaToolUseBlockParam,
)

from tools import (
    TOOL_GROUPS_BY_VERSION,
    ToolCollection,
    ToolResult,
    ToolVersion,
)

PROMPT_CACHING_BETA_FLAG = "prompt-caching-2024-07-31"


class APIProvider(StrEnum):
    ANTHROPIC = "anthropic"


# This system prompt is optimized for the Docker environment in this repository and
# specific tool combinations enabled.
# We encourage modifying this system prompt to ensure the model has context for the
# environment it is running in, and to provide any additional information that may be
# helpful for the task at hand.
SYSTEM_PROMPT = f"""<SYSTEM_CAPABILITY>
* You are utilising an Ubuntu virtual machine using {os.uname().machine} architecture with internet access.
* When you connect to the display, CHROMIUM IS ALREADY OPEN. The url bar is not visible but it is there.
* If you need to navigate to a new page, use ctrl+l to focus the url bar and then enter the url.
* You won't be able to see the url bar from the screenshot but ctrl-l still works.
* As the initial step, click on the search bar.
* When viewing a page, it can be helpful to zoom out so that you can see everything on the page.
* Either that, or make sure you scroll down to see everything before deciding something isn't available.
* Your computer function calls take a while to run and send their results back to you.
* Where possible, chain multiple of these calls into one function-call request.
* The current date is {datetime.now().strftime("%A, %B %d, %Y")}.
* After each step, take a screenshot and carefully evaluate if you have achieved the right outcome.
* Explicitly show your thinking: "I have evaluated step X..." If not correct, try again.
* Only when you confirm a step was executed correctly should you move on to the next one.
</SYSTEM_CAPABILITY>

<IMPORTANT>
* When using Chromium, if a startup wizard appears, IGNORE IT. Do not even click "skip this step".
* Instead, click on the search bar on the center of the screen where it says "Search or enter address", and enter the appropriate search term or URL there.
</IMPORTANT>"""


async def sampling_loop(
    *,
    model: str,
    messages: list[BetaMessageParam],
    api_key: str,
    kernel: Kernel,
    session_id: str,
    system_prompt_suffix: str = "",
    only_n_most_recent_images: int | None = None,
    max_tokens: int = 4096,
    tool_version: ToolVersion = "computer_use_20250124",
    thinking_budget: int | None = None,
    token_efficient_tools_beta: bool = False,
):
"""
Agentic sampling loop for the assistant/tool interaction of computer use.

Args:
model: The model to use for the API call
messages: The conversation history
api_key: The API key for authentication
kernel: The Kernel client instance
session_id: The Kernel browser session ID
provider: The API provider (defaults to ANTHROPIC)
system_prompt_suffix: Additional system prompt text (defaults to empty string)
only_n_most_recent_images: Optional limit on number of recent images to keep
max_tokens: Maximum tokens for the response (defaults to 4096)
tool_version: Version of tools to use (defaults to V20250124)
thinking_budget: Optional token budget for thinking
token_efficient_tools_beta: Whether to use token efficient tools beta
"""
    tool_group = TOOL_GROUPS_BY_VERSION[tool_version]
    tool_collection = ToolCollection(
        *(
            ToolCls(kernel=kernel, session_id=session_id)
            if ToolCls.__name__.startswith("ComputerTool")
            else ToolCls()
            for ToolCls in tool_group.tools
        )
    )
    system = BetaTextBlockParam(
        type="text",
        text=f"{SYSTEM_PROMPT}{' ' + system_prompt_suffix if system_prompt_suffix else ''}",
    )

    while True:
        betas = [tool_group.beta_flag] if tool_group.beta_flag else []
        if token_efficient_tools_beta:
            betas.append("token-efficient-tools-2025-02-19")
        image_truncation_threshold = only_n_most_recent_images or 0
        client = Anthropic(api_key=api_key, max_retries=4)

        betas.append(PROMPT_CACHING_BETA_FLAG)
        _inject_prompt_caching(messages)
        # Because cached reads are 10% of the price, we don't think it's
        # ever sensible to break the cache by truncating images, so zeroing
        # only_n_most_recent_images here disables the filtering below.
        only_n_most_recent_images = 0
        # Use type ignore to bypass TypedDict check until SDK types are updated
        system["cache_control"] = {"type": "ephemeral"}  # type: ignore

        if only_n_most_recent_images:
            _maybe_filter_to_n_most_recent_images(
                messages,
                only_n_most_recent_images,
                min_removal_threshold=image_truncation_threshold,
            )
        extra_body = {}
        if thinking_budget:
            # Ensure we only send the required fields for thinking
            extra_body = {
                "thinking": {"type": "enabled", "budget_tokens": thinking_budget}
            }

        # Call the API
        response = client.beta.messages.create(
            max_tokens=max_tokens,
            messages=messages,
            model=model,
            system=[system],
            tools=tool_collection.to_params(),
            betas=betas,
            extra_body=extra_body,
        )

        response_params = _response_to_params(response)
        messages.append(
            {
                "role": "assistant",
                "content": response_params,
            }
        )

        loggable_content = [
            {
                "text": block.get("text", "") or block.get("thinking", ""),
                "input": block.get("input", ""),
            }
            for block in response_params
        ]
        print("=== LLM RESPONSE ===")
        print("Stop reason:", response.stop_reason)
        print(loggable_content)
        print("===")

        if response.stop_reason == "end_turn":
            print("LLM has completed its task, ending loop")
            return messages

        tool_result_content: list[BetaToolResultBlockParam] = []
        for content_block in response_params:
            if content_block["type"] == "tool_use":
                result = await tool_collection.run(
                    name=content_block["name"],
                    tool_input=cast(dict[str, Any], content_block["input"]),
                )
                tool_result_content.append(
                    _make_api_tool_result(result, content_block["id"])
                )

        if not tool_result_content:
            return messages

        messages.append({"content": tool_result_content, "role": "user"})


def _maybe_filter_to_n_most_recent_images(
    messages: list[BetaMessageParam],
    images_to_keep: int,
    min_removal_threshold: int,
):
    """
    With the assumption that images are screenshots of diminishing value as
    the conversation progresses, remove all but the final `images_to_keep`
    tool_result images in place, removing in chunks of `min_removal_threshold`
    to reduce how often we break the implicit prompt cache.
    """
    if images_to_keep is None:
        return messages

    tool_result_blocks = cast(
        list[BetaToolResultBlockParam],
        [
            item
            for message in messages
            for item in (
                message["content"] if isinstance(message["content"], list) else []
            )
            if isinstance(item, dict) and item.get("type") == "tool_result"
        ],
    )

    total_images = sum(
        1
        for tool_result in tool_result_blocks
        for content in tool_result.get("content", [])
        if isinstance(content, dict) and content.get("type") == "image"
    )

    images_to_remove = total_images - images_to_keep
    # for better cache behavior, we want to remove in chunks
    images_to_remove -= images_to_remove % min_removal_threshold

    for tool_result in tool_result_blocks:
        if isinstance(tool_result.get("content"), list):
            new_content = []
            for content in tool_result.get("content", []):
                if isinstance(content, dict) and content.get("type") == "image":
                    if images_to_remove > 0:
                        images_to_remove -= 1
                        continue
                new_content.append(content)
            tool_result["content"] = new_content


def _response_to_params(
    response: BetaMessage,
) -> list[BetaContentBlockParam]:
    res: list[BetaContentBlockParam] = []
    for block in response.content:
        block_type = getattr(block, "type", None)

        if block_type == "thinking":
            thinking_block = {
                "type": "thinking",
                "thinking": getattr(block, "thinking", None),
            }
            if hasattr(block, "signature"):
                thinking_block["signature"] = getattr(block, "signature", None)
            res.append(cast(BetaContentBlockParam, thinking_block))
        elif block_type == "text" or isinstance(block, BetaTextBlock):
            if getattr(block, "text", None):
                res.append(BetaTextBlockParam(type="text", text=block.text))
        elif block_type == "tool_use":
            tool_use_block: BetaToolUseBlockParam = {
                "type": "tool_use",
                "id": block.id,
                "name": block.name,
                "input": block.input,
            }
            res.append(tool_use_block)
        else:
            # Preserve unexpected block types to avoid silently dropping content
            if hasattr(block, "model_dump"):
                res.append(cast(BetaContentBlockParam, block.model_dump()))
    return res


def _inject_prompt_caching(
    messages: list[BetaMessageParam],
):
    """
    Set cache breakpoints for the 3 most recent turns;
    one cache breakpoint is left for tools/system prompt, to be shared across sessions.
    """

    breakpoints_remaining = 3
    for message in reversed(messages):
        if message["role"] == "user" and isinstance(
            content := message["content"], list
        ):
            if breakpoints_remaining:
                breakpoints_remaining -= 1
                # Use type ignore to bypass TypedDict check until SDK types are updated
                content[-1]["cache_control"] = BetaCacheControlEphemeralParam(  # type: ignore
                    {"type": "ephemeral"}
                )
            else:
                content[-1].pop("cache_control", None)
                # we'll only ever have one extra turn per loop
                break


def _make_api_tool_result(
    result: ToolResult, tool_use_id: str
) -> BetaToolResultBlockParam:
    """Convert an agent ToolResult to an API ToolResultBlockParam."""
    tool_result_content: list[BetaTextBlockParam | BetaImageBlockParam] | str = []
    is_error = False
    if result.error:
        is_error = True
        tool_result_content = _maybe_prepend_system_tool_result(result, result.error)
    else:
        if result.output:
            tool_result_content.append(
                {
                    "type": "text",
                    "text": _maybe_prepend_system_tool_result(result, result.output),
                }
            )
        if result.base64_image:
            tool_result_content.append(
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": "image/png",
                        "data": result.base64_image,
                    },
                }
            )
    return {
        "type": "tool_result",
        "content": tool_result_content,
        "tool_use_id": tool_use_id,
        "is_error": is_error,
    }


def _maybe_prepend_system_tool_result(result: ToolResult, result_text: str):
    if result.system:
        result_text = f"<system>{result.system}</system>\n{result_text}"
    return result_text
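
For orientation, a hedged sketch of driving sampling_loop end to end. The Kernel client construction, the KERNEL_SESSION_ID placeholder, and the model ID are assumptions for illustration, not part of this diff; the template's real entrypoint wires these up itself:

import asyncio
import os

from kernel import Kernel

from loop import sampling_loop


async def main() -> None:
    # Assumption: a Kernel client and an existing browser session ID are
    # available; creating the session is outside this file's scope.
    kernel = Kernel()
    session_id = os.environ["KERNEL_SESSION_ID"]  # hypothetical placeholder

    messages = [
        {
            "role": "user",
            "content": (
                "Open Google Maps and collect the names and phone numbers "
                "of coffee shops in Austin, TX."
            ),
        }
    ]
    await sampling_loop(
        model="claude-3-7-sonnet-20250219",  # example model with computer use support
        messages=messages,
        api_key=os.environ["ANTHROPIC_API_KEY"],
        kernel=kernel,
        session_id=session_id,
    )


if __name__ == "__main__":
    asyncio.run(main())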