diff --git a/01-tutorials/13-AgentCore-payments/02-use-cases/README.md b/01-tutorials/13-AgentCore-payments/02-use-cases/README.md index 859e3eba2..05c2294ee 100644 --- a/01-tutorials/13-AgentCore-payments/02-use-cases/README.md +++ b/01-tutorials/13-AgentCore-payments/02-use-cases/README.md @@ -15,6 +15,16 @@ An AI agent built with **Strands Agents** and **AgentCoreBrowser** autonomously - Deployable CDK content-provider stack included for end-to-end testing - Tested end-to-end on Base Sepolia testnet +### [Pay for Data (Heurist)](pay-for-data/) + +A finance research agent built with **Strands Agents** that calls paid [Heurist](https://heurist.xyz) x402 endpoints for real-time prices, SEC filings, and macro indicators. The `AgentCorePaymentsPlugin` intercepts HTTP 402 responses, asks the AgentCore payment manager to generate a payment proof against the configured payment instrument and payment session, and retries — tool code stays an ordinary `http_request` call. Data is analyzed in AgentCore Code Interpreter and exported as charts and reports. + +**Highlights** +- HTTP 402 payment processing via `AgentCorePaymentsPlugin` — no manual payment code in tools +- Embedded wallet (Coinbase CDP) with USDC as the settlement asset +- AgentCore Code Interpreter for pandas/matplotlib analysis and artifact export +- Targets x402 on Base mainnet (Heurist endpoints settle on Base) + --- More use cases coming soon. diff --git a/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/.env.example b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/.env.example new file mode 100644 index 000000000..aed76ac56 --- /dev/null +++ b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/.env.example @@ -0,0 +1,30 @@ +# AgentCore payments credentials for the Heurist finance agent. +# See ../../01-tutorials/ for setup instructions. 
+ +AWS_REGION=us-west-2 +AWS_PROFILE=payments-admin +BEDROCK_PROFILE=payments-admin + +# Payment manager ARN (created during setup) +PAYMENT_MANAGER_ARN=arn:aws:bedrock-agentcore:us-west-2:123456789012:payment-manager/your-manager-id + +# Payment session and payment instrument (created per-user) +PAYMENT_SESSION_ID=payment-session-REPLACE_ME +PAYMENT_INSTRUMENT_ID=payment-instrument-REPLACE_ME + +# User identifier for payment tracking +USER_ID=demo-user + +# Bedrock model for the agent +BEDROCK_MODEL_ID=us.anthropic.claude-sonnet-4-20250514-v1:0 + +# Heurist catalog configuration +HEURIST_CATALOG_URL=https://mesh.heurist.xyz/x402/agents?details=true +HEURIST_AGENT_IDS=ExaSearchDigestAgent,YahooFinanceAgent,FredMacroAgent,SecEdgarAgent + +# Code Interpreter session name +CODE_INTERPRETER_SESSION_NAME=heurist-finance + +# Agent limits +AGENT_TIMEOUT_SECONDS=300 +AGENT_MAX_TOKENS=64000 diff --git a/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/.gitignore b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/.gitignore new file mode 100644 index 000000000..c56c03c61 --- /dev/null +++ b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/.gitignore @@ -0,0 +1,12 @@ +.venv/ +dist/ +*.egg-info/ +__pycache__/ +*.py[cod] +.pytest_cache/ +.ruff_cache/ +.env +.ipynb_checkpoints/ +heurist_finance_agent/artifacts/* +!heurist_finance_agent/artifacts/.gitkeep +heurist_finance_agent/catalog_live_cache.json diff --git a/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/README.md b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/README.md new file mode 100644 index 000000000..0c5d336fc --- /dev/null +++ b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/README.md @@ -0,0 +1,197 @@ +# Pay for Data — Heurist Finance Agent + +## Overview + +A finance research agent that pays for real-time market data using **Amazon Bedrock AgentCore payments**. 
The agent calls paid [Heurist](https://heurist.xyz) endpoints for live prices, SEC filings, and macro indicators, analyzes the data with AgentCore Code Interpreter, and returns charts and reports as S3 presigned URLs — all without any manual payment code in the tools. + +The agent is deployed to **AgentCore Runtime**: a managed container endpoint with HTTPS invocation, SigV4 auth, and automatic observability via CloudWatch. + +Heurist endpoints use the [x402 protocol](https://x402.org) — they return HTTP 402 until a valid payment proof is attached. The `AgentCorePaymentsPlugin` handles payment end-to-end: it intercepts 402 responses, generates a USDC proof via the AgentCore payment manager, attaches it, and retries. Your tool code stays a plain `http_request` call. + +## Architecture + +``` +App Backend (ManagementRole) AgentCore Runtime + | +------------------------------+ + | create_session(budget=$X) | runtime_agent.py | + | | BedrockAgentCoreApp | + |-- invoke(manager_arn, session_id, --> | + AgentCorePaymentsPlugin | + | instrument_id, prompt) | | + | | http_request -> 402 | + |<-- {response, artifacts: [{url}]} --- | -> ProcessPayment -> retry | + | | -> Code Interpreter | + | get_session(check spend) | -> export to S3 | + +------------------------------+ + | + v + CloudWatch GenAI Observability + (automatic via OpenTelemetry) +``` + +## How It Works + +`AgentCorePaymentsPlugin` handles the entire x402 payment lifecycle: + +```python +from bedrock_agentcore.payments.integrations.strands import ( + AgentCorePaymentsPlugin, + AgentCorePaymentsPluginConfig, +) + +payment_plugin = AgentCorePaymentsPlugin( + config=AgentCorePaymentsPluginConfig( + payment_manager_arn=PAYMENT_MANAGER_ARN, + user_id=USER_ID, + payment_instrument_id=PAYMENT_INSTRUMENT_ID, + payment_session_id=PAYMENT_SESSION_ID, + region="us-west-2", + ) +) + +agent = Agent( + model=BedrockModel(model_id=MODEL_ID), + tools=[http_request, code_interpreter, export_artifact_to_s3, ...], + 
plugins=[payment_plugin], +) +``` + +See [`runtime_agent.py`](heurist_finance_agent/runtime_agent.py) for the full implementation. + +## Sample Details + +| | | +|---|---| +| AgentCore components | AgentCore payments, AgentCore Code Interpreter, AgentCore Runtime | +| Agent framework | [Strands Agents](https://strandsagents.com/) | +| Model | Claude Sonnet 4 on Amazon Bedrock (configurable) | +| Payment protocol | [x402](https://x402.org) | +| Payment network | Base (USDC) | + +## Data Sources + +Fetched at runtime from the [Heurist mesh registry](https://mesh.heurist.xyz/x402/agents?details=true). By default the sample loads tools from four agents: + +| Agent | Representative tools | Typical price | +|-------|----------------------|---------------| +| `YahooFinanceAgent` | `price_history`, `quote_snapshot`, `futures_snapshot` | $0.002 | +| `FredMacroAgent` | `macro_series_snapshot`, `macro_regime_context` | $0.003 | +| `SecEdgarAgent` | `filing_timeline`, `filing_diff`, `xbrl_fact_trends` | $0.002 | +| `ExaSearchDigestAgent` | `exa_web_search`, `exa_scrape_url` | $0.005 | + +Override with the `HEURIST_AGENT_IDS` environment variable. 
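
A minimal sketch of how such an override could be parsed, assuming the comma-separated format shown in `.env.example`. The helper name `parse_agent_ids` is illustrative and not necessarily the function the sample uses; an unset or blank value falls back to the four default agents listed above.

```python
import os

# Default agents, as listed in the table above.
DEFAULT_AGENT_IDS = (
    "ExaSearchDigestAgent",
    "YahooFinanceAgent",
    "FredMacroAgent",
    "SecEdgarAgent",
)


def parse_agent_ids(raw, default=DEFAULT_AGENT_IDS):
    """Split a comma-separated string into a tuple of agent IDs (hypothetical helper)."""
    if not raw:
        return default
    # Drop surrounding whitespace and empty entries such as a trailing comma.
    ids = tuple(item.strip() for item in raw.split(",") if item.strip())
    return ids or default


# Reads the override from the environment; when it is unset the defaults apply.
selected = parse_agent_ids(os.environ.get("HEURIST_AGENT_IDS"))
print(selected)
```

For example, setting `HEURIST_AGENT_IDS=YahooFinanceAgent,FredMacroAgent` would restrict the catalog to price and macro tools under this sketch.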
+ +## Prerequisites + +- An **AgentCore payment manager** created in your AWS account +- A **payment instrument** created and funded (embedded crypto wallet, USDC on Base mainnet) +- Python 3.11+ +- AWS credentials with Bedrock and AgentCore access in `us-west-2` +- Node.js 20+ (for the `@aws/agentcore` CLI) +- Docker (running, for `agentcore deploy` container build) +- [AWS CDK](https://docs.aws.amazon.com/cdk/v2/guide/getting_started.html) installed globally + +## Layout + +``` +pay-for-data/ +├── README.md +├── requirements.txt +├── .env.example +├── pay-for-data.ipynb # notebook: deploy and invoke via AgentCore Runtime +└── heurist_finance_agent/ + ├── runtime_agent.py # AgentCore Runtime entry point (BedrockAgentCoreApp) + ├── catalog.py # fetches Heurist registry, formats for system prompt + ├── catalog_live_cache.json # synced catalog (bundled in Runtime image) + ├── config.py # loads .env + └── scripts/ + └── sync_registry.py # CLI: refreshes cached Heurist catalog +``` + +## Quick Start + +Open [`pay-for-data.ipynb`](pay-for-data.ipynb) and run the cells in order: + +| Step | What happens | +|------|-------------| +| 1 | Configure credentials and confirm AWS identity | +| 2 | Sync the Heurist tool catalog (bundled in the container image) | +| 3 | Create the S3 artifacts bucket | +| 4 | Install the AgentCore CLI, scaffold and deploy | +| 5 | Add IAM permissions to the execution role | +| 6 | Invoke the deployed agent and inspect results | +| 7 | View observability traces in CloudWatch | +| 8 | Cleanup | + +## Payment Flow + +When the agent calls a paid Heurist endpoint: + +1. `http_request` sends a POST to the endpoint URL. +2. Heurist returns HTTP 402 with x402 payment terms (network, asset, amount, recipient). +3. `AgentCorePaymentsPlugin` intercepts the response. +4. The plugin asks the AgentCore payment manager to generate a payment proof. +5. The payment manager uses the payment instrument to sign a USDC transfer and returns a proof. +6.
The plugin attaches the proof as `X-PAYMENT` and retries — Heurist validates and returns the data. + +The plugin retries up to 3 times per tool call. Payment limits are enforced at the session scope — the agent cannot exceed `maxSpendAmount`. + +## How the Runtime Agent Works + +`runtime_agent.py` implements the AgentCore Runtime service contract with full feature parity: + +**Stateless, payload-driven** +All payment config (manager ARN, session ID, instrument ID) comes from the invocation payload. The container holds no credentials. The app backend (ManagementRole) creates payment sessions with spending limits before each invocation. The Runtime execution role (ProcessPaymentRole) can only spend within those limits. + +**AgentCore Code Interpreter** +Code Interpreter is a remote AWS API — it works identically from a Runtime container as from any other environment. The agent uses it for pandas/matplotlib analysis and chart generation. + +**S3 artifact storage** +Artifacts produced by Code Interpreter are uploaded to S3 and returned as presigned download URLs. The response shape is: + +```json +{ + "response": "", + "artifacts": [ + {"name": "chart.png", "url": "https://...", "expires_in": 3600} + ] +} +``` + +If `CI_ARTIFACTS_BUCKET` is not configured, the agent degrades gracefully: charts become markdown tables, text returns inline. + +**Observability** +The `agentcore deploy` CLI configures the container to run under `opentelemetry-instrument`. Combined with `aws-opentelemetry-distro` (included in `pyproject.toml`), this provides: +- Strands agent spans (LLM calls, tool calls, agent turns) → CloudWatch GenAI Observability +- Code Interpreter calls stitched as child spans via W3C `traceparent` botocore instrumentation +- Payment calls (`ProcessPayment`, `GetPaymentInstrument`) as boto3 child spans + +No instrumentation code required in `runtime_agent.py`. 
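
On the caller side, the response shape shown under **S3 artifact storage** could be consumed roughly as follows. This is a sketch under stated assumptions: `summarize_artifacts` is a hypothetical helper, the sample payload and its URL are placeholders, and a real caller would obtain the JSON from the Runtime invocation rather than from a literal string.

```python
import json


def summarize_artifacts(raw_response):
    """Return (text, artifacts) from a response in the documented shape (hypothetical helper)."""
    body = json.loads(raw_response)
    text = body.get("response", "")
    artifacts = []
    for item in body.get("artifacts", []):
        # Skip malformed entries rather than failing the whole response.
        if not isinstance(item, dict) or "url" not in item:
            continue
        artifacts.append(
            {
                "name": item.get("name", "artifact"),
                "url": item["url"],
                # Presigned URLs are short-lived; 3600 matches the example above.
                "expires_in": int(item.get("expires_in", 3600)),
            }
        )
    return text, artifacts


# Illustrative payload only; the URL is a placeholder, not a real presigned URL.
sample = json.dumps(
    {
        "response": "Report ready.",
        "artifacts": [
            {"name": "chart.png", "url": "https://example.com/chart.png", "expires_in": 3600}
        ],
    }
)
text, artifacts = summarize_artifacts(sample)
for artifact in artifacts:
    print(f"{artifact['name']}: {artifact['url']} (expires in {artifact['expires_in']}s)")
```

An empty `artifacts` list is treated as valid here, which matches the degraded mode where no artifacts bucket is configured and results come back inline in `response`.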
+ +**Execution role permissions** (attached by the notebook, Step 5): + +| Permission set | Actions | Resource scope | +|---|---|---| +| Payment data-plane | `ProcessPayment`, `GetPaymentInstrument`, `GetPaymentSession` | `payment-manager/*` | +| Code Interpreter | `StartCodeInterpreterSession`, `InvokeCodeInterpreter`, `StopCodeInterpreterSession` | `code-interpreter/*` | +| S3 artifacts | `PutObject`, `GetObject` | `/heurist-finance-artifacts/*` | + +## Environment Variables + +See [`.env.example`](.env.example). Required: + +| Variable | Description | +|----------|-------------| +| `PAYMENT_MANAGER_ARN` | ARN of the AgentCore payment manager | +| `PAYMENT_SESSION_ID` | ID of an active payment session | +| `PAYMENT_INSTRUMENT_ID` | ID of a funded payment instrument (embedded crypto wallet) | +| `USER_ID` | User identifier for payment tracking | +| `BEDROCK_MODEL_ID` | Bedrock model (default: Claude Sonnet 4) | +| `HEURIST_AGENT_IDS` | Comma-separated Heurist agents to load | + +These values are passed in the invocation payload at runtime. The `.env` bundled in the container image contains only non-sensitive service config: `CI_ARTIFACTS_BUCKET`, `AWS_REGION`, `BEDROCK_MODEL_ID`. + +## Notes + +- Payment sessions expire. Create a fresh session before each invocation in automated workflows. +- Each paid call settles USDC on Base. Ensure your payment instrument is funded. +- Sync the catalog cache before building the container image (`sync_registry.py`). The cache is bundled in the image — the container does not call the Heurist registry at startup. +- Presigned artifact URLs expire after `CI_ARTIFACTS_TTL` seconds (default: 1 hour). Download or forward the URL to the end user promptly. 
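
As a mental model for the Payment Flow section, the intercept-pay-retry loop can be simulated without any network or AWS calls. This is only a sketch: `fake_endpoint`, `fake_generate_proof`, and `call_with_payment` are hypothetical stand-ins, the payment terms are made-up values, and the real `AgentCorePaymentsPlugin` internals may differ. The only details taken from this README are the `X-PAYMENT` header name and the limit of 3 retries per tool call.

```python
MAX_PAYMENT_RETRIES = 3  # the README states up to 3 retries per tool call


def fake_endpoint(headers):
    """Stand-in for a paid endpoint: answers 402 until an X-PAYMENT header is present."""
    if "X-PAYMENT" not in headers:
        # Illustrative payment terms; real x402 terms carry more fields.
        return 402, {"network": "base", "asset": "USDC", "amount": "0.002"}
    return 200, {"data": "quote snapshot"}


def fake_generate_proof(terms):
    """Stand-in for the payment manager; returns an opaque proof string."""
    return f"proof-for-{terms['amount']}-{terms['asset']}"


def call_with_payment(endpoint, generate_proof):
    """Call the endpoint, paying on 402 and retrying at most MAX_PAYMENT_RETRIES times."""
    status, body = endpoint({})
    retries = 0
    while status == 402 and retries < MAX_PAYMENT_RETRIES:
        # Attach a fresh proof built from the latest payment terms, then retry.
        status, body = endpoint({"X-PAYMENT": generate_proof(body)})
        retries += 1
    if status == 402:
        raise RuntimeError("Payment was not accepted after retries")
    return body, retries


result, retries = call_with_payment(fake_endpoint, fake_generate_proof)
print(result, retries)
```

The tool-facing call stays a single function call in this model; the payment handling lives entirely in the wrapper, which is the property the plugin is described as providing for `http_request`.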
diff --git a/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/__init__.py b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/__init__.py new file mode 100644 index 000000000..bce9c27dc --- /dev/null +++ b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/__init__.py @@ -0,0 +1 @@ +"""Heurist x402 finance agent package.""" diff --git a/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/artifact_export.py b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/artifact_export.py new file mode 100644 index 000000000..fd162320e --- /dev/null +++ b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/artifact_export.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python3 +"""Export files from an AgentCore Code Interpreter session into the local workspace.""" + +from __future__ import annotations + +import ast +import base64 +import json +import mimetypes +import re +from pathlib import Path +from typing import Any + +from heurist_finance_agent.config import PROJECT_DIR + +ARTIFACTS_DIR = PROJECT_DIR / "artifacts" + +# Cap the amount of base64 payload we accept from the code interpreter so a +# runaway script cannot fill the local disk via this helper. +MAX_EXPORT_PAYLOAD_BYTES = 50 * 1024 * 1024 # 50 MiB base64 blob +_SAFE_FILENAME_RE = re.compile(r"[^A-Za-z0-9._-]+") + + +def safe_artifact_path(raw_name: str) -> Path: + """Return a path inside ``ARTIFACTS_DIR`` that cannot escape it. + + The LLM agent supplies ``local_filename``. Without sanitization a value + like ``../../etc/passwd`` would resolve outside the artifacts directory. + We strip any directory components, reject empty / dot-only names, and + allow-list a small set of characters safe for cross-platform filenames. 
+ """ + if not raw_name: + raise ValueError("Artifact filename must not be empty") + base = Path(raw_name).name # drops any directory components + base = _SAFE_FILENAME_RE.sub("_", base).strip("._") or "" + if not base or base in {".", ".."}: + raise ValueError(f"Unsafe artifact filename: {raw_name!r}") + candidate = (ARTIFACTS_DIR / base).resolve() + artifacts_root = ARTIFACTS_DIR.resolve() + # Belt-and-suspenders: make sure the resolved path is still inside + # the artifacts directory. + try: + candidate.relative_to(artifacts_root) + except ValueError as exc: + raise ValueError( + f"Artifact path escapes artifacts directory: {raw_name!r}" + ) from exc + return candidate + + +def _extract_text_payload(tool_result: dict[str, Any]) -> str: + content = tool_result.get("content", []) + if not content: + raise ValueError(f"Missing tool content: {tool_result}") + text_blob = content[0].get("text") + if not text_blob: + raise ValueError(f"Missing text payload: {tool_result}") + parsed = ast.literal_eval(text_blob) + if not parsed or "text" not in parsed[0]: + raise ValueError(f"Unexpected tool payload: {tool_result}") + return parsed[0]["text"] + + +def _as_dict(value: Any) -> dict[str, Any] | None: + """Return ``value`` if it is a dict, otherwise ``None``.""" + return value if isinstance(value, dict) else None + + +def _extract_json_payload(text: str) -> dict[str, Any]: + """Parse ``text`` into a JSON object. + + The Code Interpreter runs our own script that does ``print(json.dumps(payload))``, + so the output is always well-formed JSON. We parse it directly and reject + non-dict top-level values so errors surface here with a clear message. 
+ """ + stripped = text.strip() + if not stripped: + raise ValueError("Empty payload text") + try: + result = json.loads(stripped) + except json.JSONDecodeError as exc: + raise ValueError( + f"Could not parse JSON payload from Code Interpreter output: {text[:500]}" + ) from exc + if not isinstance(result, dict): + raise ValueError( + f"Expected a JSON object from Code Interpreter, got {type(result).__name__}: {text[:200]}" + ) + return result + + +def export_code_interpreter_file( + code_interpreter, + remote_path: str, + session_name: str, + local_filename: str | None = None, +) -> dict[str, Any]: + """Read a remote file as base64 within the interpreter and persist it locally.""" + export_code = f""" +import base64 +import json +import mimetypes +from pathlib import Path + +p = Path({remote_path!r}) +if not p.exists(): + raise FileNotFoundError(str(p)) + +payload = {{ + "path": str(p), + "name": p.name, + "mime_type": mimetypes.guess_type(str(p))[0], + "base64": base64.b64encode(p.read_bytes()).decode(), +}} +print(json.dumps(payload)) +""" + result = code_interpreter.code_interpreter( + { + "action": { + "type": "executeCode", + "session_name": session_name, + "language": "python", + "code": export_code, + } + } + ) + payload_text = _extract_text_payload(result) + payload = _extract_json_payload(payload_text) + + ARTIFACTS_DIR.mkdir(parents=True, exist_ok=True) + output_name = local_filename or payload.get("name") + if not isinstance(output_name, str) or not output_name: + raise ValueError("Exported payload did not include a usable filename") + + local_path = safe_artifact_path(output_name) + + encoded = payload.get("base64", "") + if not isinstance(encoded, str): + raise ValueError("Exported payload 'base64' field must be a string") + if len(encoded) > MAX_EXPORT_PAYLOAD_BYTES: + raise ValueError( + f"Exported payload is {len(encoded)} bytes which exceeds the " + f"{MAX_EXPORT_PAYLOAD_BYTES} byte limit." 
+ ) + local_path.write_bytes(base64.b64decode(encoded)) + + return { + "status": "success", + "remote_path": payload.get("path"), + "local_path": str(local_path), + "mime_type": payload.get("mime_type") + or mimetypes.guess_type(local_path.name)[0], + "size_bytes": local_path.stat().st_size, + } diff --git a/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/artifacts/.gitkeep b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/artifacts/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/catalog.py b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/catalog.py new file mode 100644 index 000000000..7ab3bab88 --- /dev/null +++ b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/catalog.py @@ -0,0 +1,260 @@ +#!/usr/bin/env python3 +"""Load the Heurist tool catalog and format it for the agent system prompt.""" + +from __future__ import annotations + +import json +import math +import os +import re +import tempfile +from pathlib import Path +from typing import Any + +import requests + +from heurist_finance_agent.config import LIVE_CATALOG_CACHE_PATH, get_config + +# --- Safety limits --------------------------------------------------------- +# These caps are intentionally generous for a sample but prevent accidental +# memory blow-ups from a misconfigured endpoint or disk corruption. +MAX_CATALOG_BYTES = 5 * 1024 * 1024 # 5 MiB on-disk cache +MAX_CATALOG_RESPONSE_BYTES = 10 * 1024 * 1024 # 10 MiB network payload +MAX_PROMPT_FIELD_LEN = 500 # per-field cap when rendered into the system prompt + +# Sentinel used when an external field is dropped because it cannot be safely +# rendered inside a markdown table. 
+_UNSAFE_FIELD_PLACEHOLDER = "(unavailable)" + +# Characters we refuse to echo back into the system prompt because they can +# be used to break the markdown table or inject links / code fences. +_UNSAFE_PROMPT_CHARS = re.compile(r"[\x00-\x1f\x7f`|\[\]]") + + +def _sanitize_prompt_text(value: Any, max_len: int = MAX_PROMPT_FIELD_LEN) -> str: + """Return a markdown-safe single-line string derived from ``value``. + + External catalog data is interpolated into the agent's system prompt. + Without sanitization a malicious registry entry could inject links, + code fences, or table pipes that alter the prompt structure. + """ + if value is None: + return "" + text = str(value) + # Strip control characters and markdown metacharacters that would break + # the rendered table or allow link/code-fence injection. + text = _UNSAFE_PROMPT_CHARS.sub(" ", text) + # Collapse whitespace so the line stays on a single row. + text = re.sub(r"\s+", " ", text).strip() + if len(text) > max_len: + text = text[:max_len].rstrip() + "…" + return text + + +def _sanitize_url(value: Any) -> str: + """Only accept http(s) URLs; otherwise return a placeholder.""" + text = _sanitize_prompt_text(value, max_len=MAX_PROMPT_FIELD_LEN) + if not text: + return _UNSAFE_FIELD_PLACEHOLDER + if not re.match(r"^https?://[^\s]+$", text, re.IGNORECASE): + return _UNSAFE_FIELD_PLACEHOLDER + return text + + +def _coerce_price(raw: Any) -> float: + """Convert a raw price value into a finite, non-negative float. + + Rejects ``NaN``, infinities and negative values since they break + comparisons and downstream arithmetic. 
+ """ + try: + price = float(raw) + except (TypeError, ValueError) as exc: + raise ValueError(f"Invalid price value {raw!r}") from exc + if not math.isfinite(price) or price < 0: + raise ValueError( + f"Invalid price value {raw!r}: must be a finite, non-negative number" + ) + return price + + +def _atomic_write_text(path: Path, content: str) -> None: + """Write ``content`` to ``path`` atomically via a same-directory temp file. + + Using ``tempfile.mkstemp`` + ``os.replace`` prevents concurrent readers and + writers from seeing a half-written file (the TOCTOU bug flagged by the + validator). + """ + path.parent.mkdir(parents=True, exist_ok=True) + fd, tmp_name = tempfile.mkstemp( + prefix=f".{path.name}.", suffix=".tmp", dir=str(path.parent) + ) + try: + with os.fdopen(fd, "w", encoding="utf-8") as fh: + fh.write(content) + fh.flush() + os.fsync(fh.fileno()) + os.replace(tmp_name, path) + except Exception: + # Best-effort cleanup of the temp file on failure. + try: + os.unlink(tmp_name) + except OSError: + pass + raise + + +def fetch_live_catalog(session: requests.Session | None = None) -> dict[str, Any]: + """Fetch the live Heurist mesh registry and cache it locally.""" + cfg = get_config() + http = session or requests.Session() + response = http.get(cfg.heurist_catalog_url, timeout=30, stream=True) + response.raise_for_status() + + # Enforce a size cap on the streamed response before JSON-decoding it. 
+ chunks: list[bytes] = [] + total = 0 + for chunk in response.iter_content(chunk_size=64 * 1024): + if not chunk: + continue + total += len(chunk) + if total > MAX_CATALOG_RESPONSE_BYTES: + raise ValueError( + f"Heurist catalog response exceeded {MAX_CATALOG_RESPONSE_BYTES} bytes" + ) + chunks.append(chunk) + body = b"".join(chunks).decode(response.encoding or "utf-8") + payload = json.loads(body) + + _atomic_write_text(LIVE_CATALOG_CACHE_PATH, json.dumps(payload, indent=2)) + return payload + + +def load_live_catalog(path: Path | None = None) -> dict[str, Any]: + input_path = path or LIVE_CATALOG_CACHE_PATH + if not input_path.exists(): + raise FileNotFoundError(f"Live catalog cache not found: {input_path}") + size = input_path.stat().st_size + if size > MAX_CATALOG_BYTES: + raise ValueError( + f"Catalog cache at {input_path} is {size} bytes which exceeds the " + f"{MAX_CATALOG_BYTES} byte limit. Delete or regenerate the file." + ) + return json.loads(input_path.read_text(encoding="utf-8")) + + +def get_live_catalog( + refresh: bool = False, session: requests.Session | None = None +) -> dict[str, Any]: + if refresh or not LIVE_CATALOG_CACHE_PATH.exists(): + return fetch_live_catalog(session=session) + return load_live_catalog() + + +def get_tools_for_agents( + agent_ids: tuple[str, ...] 
| list[str], + refresh: bool = False, + session: requests.Session | None = None, +) -> list[dict[str, Any]]: + """Return normalized tool definitions for the selected Heurist agents.""" + import logging + + logger = logging.getLogger(__name__) + + selected = set(agent_ids) + live_catalog = get_live_catalog(refresh=refresh, session=session) + tools: list[dict[str, Any]] = [] + found_ids: set[str] = set() + + for agent in live_catalog.get("agents", []): + agent_id = agent.get("agentId") + if not agent_id or agent_id not in selected: + continue + found_ids.add(agent_id) + for tool in agent.get("tools", []): + try: + price_usd = _coerce_price(tool["priceUsd"]) + except (KeyError, ValueError): + # Skip tools with an invalid or missing price rather than + # letting NaN/Infinity/negative values leak into the prompt. + continue + tools.append( + { + "agent_id": agent_id, + "tool_name": tool.get("name", ""), + "resource_url": tool.get("resourceUrl", ""), + "price_usd": price_usd, + "method": tool.get("method", "POST"), + "description": tool.get("description", ""), + "parameters": tool.get("parameters", {}) or {}, + } + ) + + missing = selected - found_ids + if missing: + logger.warning( + "The following agent IDs were not found in the Heurist catalog and will be " + "skipped. They may have been renamed or removed: %s. " + "Run sync_registry to refresh the catalog, or update HEURIST_AGENT_IDS in .env.", + ", ".join(sorted(missing)), + ) + + return tools + + +def format_catalog_for_prompt(tools: list[dict[str, Any]]) -> str: + """Format the tool catalog as a reference table for the agent system prompt. + + The agent uses this to know which URLs to call via http_request. All + externally-sourced fields are sanitized so that untrusted registry data + cannot inject markdown links, code fences, or extra table columns into + the agent's system prompt. 
+ """ + lines = ["## Available Paid Endpoints (Heurist x402)", ""] + lines.append("| Agent | Tool | URL | Method | Price | Description |") + lines.append("|-------|------|-----|--------|-------|-------------|") + + for t in tools: + agent_id = _sanitize_prompt_text(t.get("agent_id"), max_len=80) + tool_name = _sanitize_prompt_text(t.get("tool_name"), max_len=80) + url = _sanitize_url(t.get("resource_url")) + method = _sanitize_prompt_text(t.get("method"), max_len=10) or "POST" + desc = _sanitize_prompt_text(t.get("description"), max_len=80) + price = t.get("price_usd") + price_str = ( + f"${price:.3f}" + if isinstance(price, (int, float)) and math.isfinite(price) + else "n/a" + ) + lines.append( + f"| {agent_id} | {tool_name} | {url} | {method} | {price_str} | {desc} |" + ) + + lines.append("") + lines.append("### Parameter Schemas") + lines.append("") + for t in tools: + params = t.get("parameters", {}) or {} + props = params.get("properties", {}) or {} + if not props: + continue + agent_id = _sanitize_prompt_text(t.get("agent_id"), max_len=80) + tool_name = _sanitize_prompt_text(t.get("tool_name"), max_len=80) + method = _sanitize_prompt_text(t.get("method"), max_len=10) or "POST" + url = _sanitize_url(t.get("resource_url")) + lines.append(f"**{agent_id}/{tool_name}** (`{method} {url}`)") + required_fields = params.get("required", []) or [] + for name, schema in props.items(): + if not isinstance(schema, dict): + schema = {} + safe_name = _sanitize_prompt_text(name, max_len=80) + required = safe_name in { + _sanitize_prompt_text(r, max_len=80) for r in required_fields + } + req_marker = " (required)" if required else "" + type_name = _sanitize_prompt_text(schema.get("type", "any"), max_len=40) + desc = _sanitize_prompt_text(schema.get("description", ""), max_len=120) + lines.append(f" - `{safe_name}`: {type_name}{req_marker} — {desc}") + lines.append("") + + return "\n".join(lines) diff --git 
a/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/config.py b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/config.py new file mode 100644 index 000000000..e558cabd9 --- /dev/null +++ b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/config.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +"""Shared configuration for the Heurist finance agent.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + +from dotenv import load_dotenv + +PROJECT_DIR = Path(__file__).resolve().parent +LIVE_CATALOG_CACHE_PATH = PROJECT_DIR / "catalog_live_cache.json" + +# The README instructs users to place `.env` in the use-case root +# (`pay-for-data/.env`). We also accept a file placed directly inside the +# package directory for developers who prefer that layout. +USE_CASE_DIR = PROJECT_DIR.parent +ENV_CANDIDATE_PATHS: tuple[Path, ...] = ( + USE_CASE_DIR / ".env", + PROJECT_DIR / ".env", +) + +DEFAULT_HEURIST_AGENT_IDS = ( + "ExaSearchDigestAgent", + "YahooFinanceAgent", + "FredMacroAgent", + "SecEdgarAgent", +) + +# Required environment variables for the agent to run. +_REQUIRED_ENV_VARS: tuple[str, ...] = ( + "PAYMENT_MANAGER_ARN", + "PAYMENT_SESSION_ID", + "PAYMENT_INSTRUMENT_ID", +) + + +def load_environment() -> None: + """Load the local .env file from any of the supported locations. + + Missing .env files are tolerated so that values supplied via the real + environment (shell, CI secrets, etc.) still work. 
+ """ + for candidate in ENV_CANDIDATE_PATHS: + if candidate.is_file(): + load_dotenv(candidate, override=False) + + +@dataclass(frozen=True) +class AppConfig: + aws_region: str + aws_profile: str | None + bedrock_profile: str | None + bedrock_model_id: str + payment_manager_arn: str + payment_session_id: str + payment_instrument_id: str + user_id: str + heurist_catalog_url: str + heurist_tool_agent_ids: tuple[str, ...] + code_interpreter_session_name: str + agent_timeout_seconds: int + agent_max_tokens: int + + +def _parse_csv_tuple( + raw_value: str | None, default: tuple[str, ...] +) -> tuple[str, ...]: + if not raw_value: + return default + values = tuple(item.strip() for item in raw_value.split(",") if item.strip()) + return values or default + + +def _require_env(name: str) -> str: + value = os.environ.get(name) + if value: + return value + searched = ", ".join(str(p) for p in ENV_CANDIDATE_PATHS) + raise RuntimeError( + f"Missing required environment variable {name!r}. " + f"Set it in your shell or in a .env file at one of: {searched}. " + f"See .env.example for the full list of required values." + ) + + +def get_config() -> AppConfig: + load_environment() + missing = [name for name in _REQUIRED_ENV_VARS if not os.environ.get(name)] + if missing: + searched = ", ".join(str(p) for p in ENV_CANDIDATE_PATHS) + raise RuntimeError( + "Missing required environment variables: " + + ", ".join(missing) + + f". Set them in your shell or in a .env file at one of: {searched}. " + + "See .env.example for the full list of required values." 
+ ) + return AppConfig( + aws_region=os.environ.get("AWS_REGION", "us-west-2"), + aws_profile=os.environ.get("AWS_PROFILE"), + bedrock_profile=os.environ.get("BEDROCK_PROFILE") + or os.environ.get("AWS_PROFILE"), + bedrock_model_id=os.environ.get( + "BEDROCK_MODEL_ID", "us.anthropic.claude-sonnet-4-20250514-v1:0" + ), + payment_manager_arn=_require_env("PAYMENT_MANAGER_ARN"), + payment_session_id=_require_env("PAYMENT_SESSION_ID"), + payment_instrument_id=_require_env("PAYMENT_INSTRUMENT_ID"), + user_id=os.environ.get("USER_ID", "demo-user"), + heurist_catalog_url=os.environ.get( + "HEURIST_CATALOG_URL", "https://mesh.heurist.xyz/x402/agents?details=true" + ), + heurist_tool_agent_ids=_parse_csv_tuple( + os.environ.get("HEURIST_AGENT_IDS"), DEFAULT_HEURIST_AGENT_IDS + ), + code_interpreter_session_name=os.environ.get( + "CODE_INTERPRETER_SESSION_NAME", "heurist-finance" + ), + agent_timeout_seconds=int(os.environ.get("AGENT_TIMEOUT_SECONDS", "300")), + agent_max_tokens=int(os.environ.get("AGENT_MAX_TOKENS", "64000")), + ) diff --git a/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/local_agent.py b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/local_agent.py new file mode 100644 index 000000000..a42b9c7d1 --- /dev/null +++ b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/local_agent.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 +"""Heurist finance agent using AgentCorePaymentsPlugin for automatic x402 payments. + +The agent uses the built-in http_request tool from strands-agents-tools to call +paid Heurist endpoints. The AgentCorePaymentsPlugin intercepts HTTP 402 responses, +generates payment proofs via AgentCore payments, and retries automatically. 
+""" + +from __future__ import annotations + +import sys +from datetime import datetime +from pathlib import Path +from typing import Any + +import boto3 +from strands import Agent, tool +from strands.models import BedrockModel +from strands_tools import http_request +from strands_tools.code_interpreter import AgentCoreCodeInterpreter + +from bedrock_agentcore.payments.integrations.strands import ( + AgentCorePaymentsPlugin, + AgentCorePaymentsPluginConfig, +) + +if __package__ in (None, ""): + sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from heurist_finance_agent.artifact_export import ( + ARTIFACTS_DIR, + export_code_interpreter_file, + safe_artifact_path, +) +from heurist_finance_agent.catalog import ( + format_catalog_for_prompt, + get_tools_for_agents, +) +from heurist_finance_agent.config import get_config, load_environment + +load_environment() +CFG = get_config() +CODE_INTERPRETER = AgentCoreCodeInterpreter( + region=CFG.aws_region, + session_name=CFG.code_interpreter_session_name, +) +ARTIFACTS_DIR.mkdir(parents=True, exist_ok=True) + +# --- Load Heurist catalog for the system prompt --- +HEURIST_TOOLS = get_tools_for_agents(CFG.heurist_tool_agent_ids) +CATALOG_REFERENCE = format_catalog_for_prompt(HEURIST_TOOLS) + + +# --- Artifact helper tools --- + + +@tool +def export_code_interpreter_artifact( + remote_path: str, local_filename: str | None = None, session_name: str | None = None +) -> dict[str, Any]: + """Export a file from AgentCore Code Interpreter to the local workspace.""" + active_session = session_name or CFG.code_interpreter_session_name + return export_code_interpreter_file( + CODE_INTERPRETER, + remote_path=remote_path, + session_name=active_session, + local_filename=local_filename, + ) + + +@tool +def list_exported_artifacts(limit: int = 50) -> dict[str, Any]: + """List files already exported into the local artifacts directory.""" + items = [] + for path in sorted(ARTIFACTS_DIR.glob("*")): + if path.is_file(): + 
items.append( + { + "name": path.name, + "path": str(path), + "size_bytes": path.stat().st_size, + } + ) + return {"total": len(items), "items": items[:limit]} + + +@tool +def save_text_artifact(filename: str, content: str) -> dict[str, Any]: + """Save text content directly into the local artifacts directory.""" + output_path = safe_artifact_path(filename) + output_path.write_text(content, encoding="utf-8") + return { + "status": "success", + "path": str(output_path), + "size_bytes": output_path.stat().st_size, + } + + +# --- System prompt --- + +SYSTEM_PROMPT = f"""You are a finance research and data visualization agent. + +You have access to paid financial data endpoints via the Heurist network. To fetch data, +use the `http_request` tool to call the endpoint URLs listed below. All endpoints accept +POST requests with JSON bodies. + +**Payment is handled automatically.** When an endpoint returns HTTP 402 (Payment Required), +the system processes the payment and retries your request. You do not need to handle +payments yourself — just make the http_request call and you will receive the data. + +{CATALOG_REFERENCE} + +## Working Rules + +- Use http_request to call the Heurist endpoints above. Always use method="POST" and + pass the parameters as a JSON body string. +- Use AgentCore Code Interpreter to analyze data with pandas/matplotlib. +- If you create a chart or report in code_interpreter, export it with export_code_interpreter_artifact. +- For text reports, use save_text_artifact directly. +- Parallelize data fetches when possible (call multiple http_requests in the same round). +- Never fabricate or simulate market data. Only use data returned by the tools. +- If a tool call fails, report the error and stop. Do not invent fallback data. + +## Code Interpreter Usage + +- Start with initSession when the session is not ready. +- Use writeFiles for datasets so your code stays readable. +- Use pandas/matplotlib for analysis and plotting.
+- Export every user-facing artifact to the local workspace. + +Code interpreter action examples: +- init session: + {{"action": {{"type": "initSession", "session_name": "{CFG.code_interpreter_session_name}", "description": "analysis session"}}}} +- write file: + {{"action": {{"type": "writeFiles", "session_name": "{CFG.code_interpreter_session_name}", "content": [{{"path": "data.json", "text": "{{...}}"}}]}}}} +- execute python: + {{"action": {{"type": "executeCode", "session_name": "{CFG.code_interpreter_session_name}", "language": "python", "code": "print(1)"}}}} + +## Context + +- Model: {CFG.bedrock_model_id} +- Code interpreter session: {CFG.code_interpreter_session_name} +- Export directory: {str(ARTIFACTS_DIR)!r} +- Today's date: {datetime.now().strftime("%Y-%m-%d")} +""" + + +def create_agent() -> Agent: + """Build and return the Strands agent with the payments plugin.""" + bedrock_session = boto3.Session( + profile_name=CFG.bedrock_profile, region_name=CFG.aws_region + ) + model = BedrockModel( + boto_session=bedrock_session, + model_id=CFG.bedrock_model_id, + temperature=0, + max_tokens=CFG.agent_max_tokens, + ) + + # Configure the AgentCore payments plugin — handles x402 automatically + payment_plugin = AgentCorePaymentsPlugin( + config=AgentCorePaymentsPluginConfig( + payment_manager_arn=CFG.payment_manager_arn, + user_id=CFG.user_id, + payment_instrument_id=CFG.payment_instrument_id, + payment_session_id=CFG.payment_session_id, + region=CFG.aws_region, + ) + ) + + agent = Agent( + system_prompt=SYSTEM_PROMPT, + model=model, + tools=[ + http_request, + list_exported_artifacts, + save_text_artifact, + export_code_interpreter_artifact, + CODE_INTERPRETER.code_interpreter, + ], + plugins=[payment_plugin], + ) + + return agent + + +def invoke_agent(prompt: str): + """Create an agent and run a single prompt to completion.""" + agent = create_agent() + return agent(prompt) + + +def main() -> None: + if len(sys.argv) > 1: + prompt = " ".join(sys.argv[1:]) + 
print(invoke_agent(prompt)) + return + + agent = create_agent() + while True: + try: + prompt = input("You: ").strip() + except (EOFError, KeyboardInterrupt): + print() + return + if not prompt or prompt.lower() in {"quit", "exit", "q"}: + return + print(agent(prompt)) + + +if __name__ == "__main__": + main() diff --git a/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/runtime_agent.py b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/runtime_agent.py new file mode 100644 index 000000000..40e65fad5 --- /dev/null +++ b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/runtime_agent.py @@ -0,0 +1,612 @@ +#!/usr/bin/env python3 +""" +Heurist Finance Agent — AgentCore Runtime entry point. + +This is the **recommended production deployment** of the pay-for-data agent. +For local development and testing, see local_agent.py. + +Production-grade, full-featured version: + - AgentCore Code Interpreter for sandboxed pandas/matplotlib analysis + (Code Interpreter is a remote AWS API — it works identically from a Runtime + container as from a local machine) + - S3 artifact storage: charts, CSVs, and reports are uploaded to S3 and + returned as presigned URLs in the invocation response + - Stateless: ALL payment config comes from the invocation payload — the + container holds no credentials + +Compared to local_agent.py: + - artifact_export.py is NOT used (it writes to local disk, which is ephemeral in the container) + - Instead, export_artifact_to_s3 / save_report_to_s3 upload to S3 and + return presigned URLs that the caller can download or display + - If CI_ARTIFACTS_BUCKET is not set, the agent degrades gracefully: + charts are described as markdown tables, text reports are returned inline + +Required IAM permissions for the execution role (added by pay-for-data.ipynb Step 5f): + Payments: + bedrock-agentcore:ProcessPayment + bedrock-agentcore:GetPaymentInstrument,
GetPaymentInstrumentBalance + bedrock-agentcore:GetPaymentSession, GetResourcePaymentToken + Resource: arn:aws:bedrock-agentcore:::payment-manager/* + + Code Interpreter: + bedrock-agentcore:StartCodeInterpreterSession + bedrock-agentcore:StopCodeInterpreterSession + bedrock-agentcore:InvokeCodeInterpreter + Resource: arn:aws:bedrock-agentcore:::code-interpreter/* + + S3 (required only if CI_ARTIFACTS_BUCKET is set): + s3:PutObject on arn:aws:s3:::/heurist-finance-artifacts/* + s3:GetObject on arn:aws:s3:::/heurist-finance-artifacts/* + + Bedrock + CloudWatch — added automatically by `agentcore deploy` + +Environment variables (set via .env bundled in the container image): + CI_ARTIFACTS_BUCKET S3 bucket for artifact storage (optional but recommended) + CI_ARTIFACTS_PREFIX S3 key prefix (default: "heurist-finance-artifacts/") + CI_ARTIFACTS_TTL Presigned URL TTL seconds (default: 3600) + HEURIST_AGENT_IDS Comma-separated Heurist agent IDs to load + BEDROCK_MODEL_ID Override the default Bedrock model ID + AWS_REGION Set automatically by AgentCore Runtime + +Invocation payload: + prompt (str, required) — the research request + payment_manager_arn (str, required) + user_id (str, required) + payment_session_id (str, required) — created by app backend with budget + payment_instrument_id (str, required) + bedrock_model_id (str, optional) — per-invocation model override + +Response: + { + "response": "", + "artifacts": [ # empty list if no artifacts produced + {"name": "chart.png", "url": "https://...", "expires_in": 3600}, + {"name": "report.md", "url": "https://...", "expires_in": 3600} + ] + } + +Deployment (see pay-for-data.ipynb Step 5 for the full walkthrough): + 1. python -m heurist_finance_agent.scripts.sync_registry # refresh catalog cache + 2. agentcore create --name HeuristFinanceAgent ... + 3. cp heurist_finance_agent/runtime_agent.py HeuristFinanceAgent/.../main.py + 4. cp -r heurist_finance_agent HeuristFinanceAgent/.../ + 5. 
echo "CI_ARTIFACTS_BUCKET=" >> HeuristFinanceAgent/.../.env + 6. agentcore deploy -y + 7. Attach payment + CI + S3 IAM policies to the auto-created execution role +""" + +from __future__ import annotations + +import ast +import base64 +import json +import logging +import os +import re +import sys +import threading +import uuid +from datetime import datetime +from pathlib import Path +from typing import Any + +# Load .env if present — supports local testing and container config injection. +# python-dotenv is included in requirements.txt; safe to import unconditionally. +from dotenv import load_dotenv + +load_dotenv() + +# --------------------------------------------------------------------------- +# sys.path — ensure heurist_finance_agent is importable whether this file +# lives in the use-case root or inside a scaffolded AgentCore project. +# --------------------------------------------------------------------------- +_THIS_DIR = Path(__file__).resolve().parent +_AGENT_PKG_PARENT = _THIS_DIR.parent +for _candidate in [_THIS_DIR, _AGENT_PKG_PARENT]: + if str(_candidate) not in sys.path: + sys.path.insert(0, str(_candidate)) + +import boto3 # noqa: E402 +from bedrock_agentcore.payments.integrations.strands import ( # noqa: E402 + AgentCorePaymentsPlugin, + AgentCorePaymentsPluginConfig, +) +from bedrock_agentcore.runtime import BedrockAgentCoreApp # noqa: E402 +from strands import Agent, tool # noqa: E402 +from strands.models import BedrockModel # noqa: E402 +from strands_tools import http_request # noqa: E402 +from strands_tools.code_interpreter import AgentCoreCodeInterpreter # noqa: E402 + +from heurist_finance_agent.catalog import ( # noqa: E402 + format_catalog_for_prompt, + get_tools_for_agents, +) +from heurist_finance_agent.config import DEFAULT_HEURIST_AGENT_IDS # noqa: E402 + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# App +# 
--------------------------------------------------------------------------- +app = BedrockAgentCoreApp() + +# --------------------------------------------------------------------------- +# Configuration — read from environment at module startup +# --------------------------------------------------------------------------- +REGION = os.environ.get("AWS_REGION", "us-west-2") +MODEL_ID = os.environ.get( + "BEDROCK_MODEL_ID", "us.anthropic.claude-sonnet-4-20250514-v1:0" +) + +CI_ARTIFACTS_BUCKET = os.environ.get("CI_ARTIFACTS_BUCKET", "") +CI_ARTIFACTS_PREFIX = os.environ.get( + "CI_ARTIFACTS_PREFIX", "heurist-finance-artifacts" +).rstrip("/") +CI_ARTIFACTS_TTL = int(os.environ.get("CI_ARTIFACTS_TTL", "3600")) + +_raw_agent_ids = os.environ.get("HEURIST_AGENT_IDS", "") +HEURIST_AGENT_IDS: tuple[str, ...] = ( + tuple(a.strip() for a in _raw_agent_ids.split(",") if a.strip()) + if _raw_agent_ids + else DEFAULT_HEURIST_AGENT_IDS +) + +# --------------------------------------------------------------------------- +# Module-level service clients +# (boto3 clients are thread-safe and reuse the container's IAM execution role) +# --------------------------------------------------------------------------- +_CI_CLIENT = AgentCoreCodeInterpreter(region=REGION, session_name="runtime-init") +_S3_CLIENT = boto3.client("s3", region_name=REGION) + +# --------------------------------------------------------------------------- +# Heurist catalog — loaded from the pre-bundled cache at startup. +# refresh=False avoids calling get_config() which requires payment env vars +# that are intentionally absent from the container. 
+# --------------------------------------------------------------------------- +try: + _heurist_tools = get_tools_for_agents(HEURIST_AGENT_IDS, refresh=False) + _catalog_ref = format_catalog_for_prompt(_heurist_tools) + logger.info("Loaded %d Heurist tools from catalog cache.", len(_heurist_tools)) +except Exception as _e: + logger.warning("Could not load Heurist catalog at startup: %s", _e) + _catalog_ref = ( + "(catalog unavailable — sync_registry was not run before image build)" + ) + +# --------------------------------------------------------------------------- +# Per-invocation state (thread-local for concurrent request isolation) +# --------------------------------------------------------------------------- +_invocation = threading.local() + + +def _artifacts() -> list[dict]: + """Return the artifact list for the current invocation thread.""" + if not hasattr(_invocation, "artifacts"): + _invocation.artifacts = [] + return _invocation.artifacts + + +def _session_name() -> str: + """Return (or lazily create) the Code Interpreter session name for this invocation.""" + if not hasattr(_invocation, "session_name"): + _invocation.session_name = f"heurist-{uuid.uuid4().hex[:12]}" + return _invocation.session_name + + +def _reset_invocation_state() -> None: + """Reset thread-local state at the start of each invocation.""" + _invocation.artifacts = [] + _invocation.session_name = f"heurist-{uuid.uuid4().hex[:12]}" + + +# --------------------------------------------------------------------------- +# CI result extraction helpers +# (mirrors the logic in artifact_export.py without the local-disk dependency) +# --------------------------------------------------------------------------- + + +def _extract_ci_text(tool_result: dict) -> str: + """Extract the printed text output from a Code Interpreter tool result. 
+ + AgentCore Code Interpreter wraps output in: + {"content": [{"text": "", ...}]} + where the inner text is a Python list-of-dicts literal, and the first + element's "text" key holds the actual printed output from the code. + """ + content = tool_result.get("content", []) + if not content: + raise ValueError("Code Interpreter returned empty content") + text_blob = content[0].get("text", "") + if not text_blob: + raise ValueError("Code Interpreter returned no text") + try: + parsed = ast.literal_eval(text_blob) + return parsed[0]["text"] + except Exception: + # Some CI versions return plain text directly + return text_blob + + +# --------------------------------------------------------------------------- +# Artifact tools +# --------------------------------------------------------------------------- + + +def _safe_s3_key_name(raw: str) -> str: + """Return a safe S3 key filename component.""" + name = Path(raw).name + name = re.sub(r"[^A-Za-z0-9._-]", "_", name).strip("._") + return name or "artifact" + + +@tool +def export_artifact_to_s3( + remote_path: str, artifact_name: str | None = None +) -> dict[str, Any]: + """Export a file from the AgentCore Code Interpreter sandbox to S3. + + Use this after creating a chart (PNG), CSV, or any file in the Code + Interpreter session. Returns a presigned URL the caller can download. + + If S3 is not configured (CI_ARTIFACTS_BUCKET not set), returns an error + with a suggestion to represent the data as a markdown table instead. + + Args: + remote_path: Path to the file inside the CI sandbox (e.g. "/tmp/chart.png") + artifact_name: Optional override for the output filename + """ + if not CI_ARTIFACTS_BUCKET: + return { + "error": "S3 artifact storage is not configured (CI_ARTIFACTS_BUCKET not set).", + "suggestion": ( + "Represent charts as markdown tables using the underlying data. " + "Use save_report_to_s3 for text/CSV content, which returns it inline." 
+ ), + } + + sn = _session_name() + export_code = f""" +import base64, json, mimetypes +from pathlib import Path +p = Path({remote_path!r}) +if not p.exists(): + raise FileNotFoundError(f"File not found in CI sandbox: {{str(p)}}") +print(json.dumps({{ + "name": p.name, + "mime_type": mimetypes.guess_type(str(p))[0] or "application/octet-stream", + "b64": base64.b64encode(p.read_bytes()).decode(), + "size": p.stat().st_size, +}})) +""" + ci_result = _CI_CLIENT.code_interpreter( + { + "action": { + "type": "executeCode", + "session_name": sn, + "language": "python", + "code": export_code, + } + } + ) + + try: + payload = json.loads(_extract_ci_text(ci_result)) + except Exception as exc: + return {"error": f"Could not parse CI export output: {exc}"} + + if "b64" not in payload: + return {"error": f"Unexpected CI payload — missing b64 field: {payload}"} + + file_bytes = base64.b64decode(payload["b64"]) + safe_name = _safe_s3_key_name(artifact_name or payload.get("name", "artifact")) + s3_key = f"{CI_ARTIFACTS_PREFIX}/{sn}/{safe_name}" + + _S3_CLIENT.put_object( + Bucket=CI_ARTIFACTS_BUCKET, + Key=s3_key, + Body=file_bytes, + ContentType=payload.get("mime_type", "application/octet-stream"), + ) + + url = _S3_CLIENT.generate_presigned_url( + "get_object", + Params={"Bucket": CI_ARTIFACTS_BUCKET, "Key": s3_key}, + ExpiresIn=CI_ARTIFACTS_TTL, + ) + + artifact = { + "name": safe_name, + "url": url, + "s3_key": s3_key, + "size_bytes": len(file_bytes), + "mime_type": payload.get("mime_type", "application/octet-stream"), + "expires_in": CI_ARTIFACTS_TTL, + } + _artifacts().append(artifact) + + logger.info( + "Exported artifact %s → s3://%s/%s", safe_name, CI_ARTIFACTS_BUCKET, s3_key + ) + return { + "status": "success", + "name": safe_name, + "url": url, + "expires_in": CI_ARTIFACTS_TTL, + } + + +@tool +def save_report_to_s3(content: str, filename: str) -> dict[str, Any]: + """Save a text report (markdown, CSV, JSON) to S3 and return a presigned URL. 
+ + Use this for structured text output — financial summaries, data tables, + model outputs. For binary files produced in the Code Interpreter sandbox, + use export_artifact_to_s3 instead. + + If S3 is not configured, the content is returned inline. + + Args: + content: The text content to save + filename: Desired filename (e.g. "macro_summary.md", "prices.csv") + """ + if not CI_ARTIFACTS_BUCKET: + # Graceful degradation: return inline + return { + "status": "inline", + "note": "S3 not configured — content returned inline.", + "filename": filename, + "content": content, + } + + safe_name = _safe_s3_key_name(filename) + s3_key = f"{CI_ARTIFACTS_PREFIX}/{_session_name()}/{safe_name}" + + content_type = "text/plain" + if safe_name.endswith(".md"): + content_type = "text/markdown" + elif safe_name.endswith(".csv"): + content_type = "text/csv" + elif safe_name.endswith(".json"): + content_type = "application/json" + elif safe_name.endswith(".html"): + content_type = "text/html" + + encoded = content.encode("utf-8") + _S3_CLIENT.put_object( + Bucket=CI_ARTIFACTS_BUCKET, + Key=s3_key, + Body=encoded, + ContentType=content_type, + ) + + url = _S3_CLIENT.generate_presigned_url( + "get_object", + Params={"Bucket": CI_ARTIFACTS_BUCKET, "Key": s3_key}, + ExpiresIn=CI_ARTIFACTS_TTL, + ) + + artifact = { + "name": safe_name, + "url": url, + "s3_key": s3_key, + "size_bytes": len(encoded), + "mime_type": content_type, + "expires_in": CI_ARTIFACTS_TTL, + } + _artifacts().append(artifact) + + logger.info("Saved report %s → s3://%s/%s", safe_name, CI_ARTIFACTS_BUCKET, s3_key) + return { + "status": "success", + "name": safe_name, + "url": url, + "expires_in": CI_ARTIFACTS_TTL, + } + + +@tool +def list_invocation_artifacts() -> dict[str, Any]: + """List all artifacts exported to S3 during this invocation. + + Call this to verify what has been exported before composing the final response. 
+ """ + arts = _artifacts() + return { + "count": len(arts), + "artifacts": [ + {"name": a["name"], "url": a["url"], "expires_in": a["expires_in"]} + for a in arts + ], + } + + +# --------------------------------------------------------------------------- +# System prompt builder (invocation-specific: includes CI session name) +# --------------------------------------------------------------------------- + + +def _build_system_prompt(ci_session: str) -> str: + s3_instructions = ( + ( + f"- Charts/images: save to `/tmp/` inside the CI session, then call " + f"`export_artifact_to_s3` with that path to upload to S3 and get a download URL.\n" + f"- Text reports/CSVs: call `save_report_to_s3` directly — no need to write to CI first.\n" + f"- Presigned URLs are valid for {CI_ARTIFACTS_TTL} seconds.\n" + f"- After exporting, include the URL in your response so the caller can access the file." + ) + if CI_ARTIFACTS_BUCKET + else ( + "- S3 artifact storage is not configured in this deployment.\n" + "- Represent all chart data as markdown tables using the underlying numbers.\n" + "- Use `save_report_to_s3` for text content — it will return the content inline." + ) + ) + + return f"""You are a finance research and data visualization agent. + +You have access to paid financial data endpoints via the Heurist network. Use the +`http_request` tool to call the endpoint URLs listed below. All endpoints accept POST +requests with JSON bodies. + +**Payment is handled automatically.** When an endpoint returns HTTP 402, the system +settles USDC on-chain and retries the request. You do not need to handle payments. + +{_catalog_ref} + +## Working Rules + +- Use http_request for all Heurist endpoint calls. Always method="POST", params as JSON body. +- Parallelize data fetches when possible — multiple http_request calls in the same round. +- Use AgentCore Code Interpreter for pandas/matplotlib analysis. +- Never fabricate data. Only use values returned by tools. 
+- If a tool call fails, report the error and stop. + +## Code Interpreter — session: `{ci_session}` + +**Session lifecycle** +- Start with `initSession` if the session is not initialized. +- Use `writeFiles` to pass datasets into the sandbox as JSON/CSV files. +- Use `executeCode` for analysis and charting. +- The session is private to this invocation and auto-expires. + +**Artifact export** +{s3_instructions} + +**CI action examples:** +- Init: `{{"action": {{"type": "initSession", "session_name": "{ci_session}", "description": "analysis"}}}}` +- Write: `{{"action": {{"type": "writeFiles", "session_name": "{ci_session}", "content": [{{"path": "data.json", "text": "{{...}}"}}]}}}}` +- Execute: `{{"action": {{"type": "executeCode", "session_name": "{ci_session}", "language": "python", "code": "import pandas as pd; ..." }}}}` + +## Context +- Today: {datetime.now().strftime("%Y-%m-%d")} +- Region: {REGION} +- S3 artifacts: {"enabled (bucket: " + CI_ARTIFACTS_BUCKET + ")" if CI_ARTIFACTS_BUCKET else "not configured — text/table output only"} +""" + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + + +@app.entrypoint +def handle_request(payload: dict, context=None) -> dict: + """Handle an invocation from the app backend. + + The app backend creates a payment session with an appropriate budget before + invoking. Session ID and instrument ID are passed in the payload — the agent + cannot create or modify sessions (enforced at the IAM level). 
+ + Required payload fields: + prompt (str) — the research request + payment_manager_arn (str) — ARN of the Payment Manager + user_id (str) — user identity for payment isolation + payment_session_id (str) — active session with a spending limit + payment_instrument_id (str) — funded embedded wallet + + Optional payload fields: + bedrock_model_id (str) — per-invocation model override + + Returns: + { + "response": "", + "artifacts": [{"name": "...", "url": "...", "expires_in": }, ...] + } + or {"error": ""} on bad payload. + """ + # Reset thread-local state for this invocation + _reset_invocation_state() + ci_session = _session_name() + + # ----------------------------------------------------------------------- + # Unwrap the agentcore invoke double-wrapping: + # `agentcore invoke '{"key": "val"}'` → payload = {"prompt": '{"key":"val"}'} + # ----------------------------------------------------------------------- + raw_prompt = payload.get("prompt", "") + if isinstance(raw_prompt, str) and raw_prompt.strip().startswith("{"): + try: + inner = json.loads(raw_prompt) + if isinstance(inner, dict) and "payment_manager_arn" in inner: + payload = inner + except json.JSONDecodeError: + pass + + prompt = str(payload.get("prompt") or "").strip() + payment_manager_arn = str(payload.get("payment_manager_arn") or "").strip() + user_id = str(payload.get("user_id") or "").strip() + session_id = str(payload.get("payment_session_id") or "").strip() + instrument_id = str(payload.get("payment_instrument_id") or "").strip() + + missing = [ + name + for name, val in [ + ("prompt", prompt), + ("payment_manager_arn", payment_manager_arn), + ("user_id", user_id), + ("payment_session_id", session_id), + ("payment_instrument_id", instrument_id), + ] + if not val + ] + if missing: + return {"error": f"Missing required payload fields: {', '.join(missing)}"} + + model_id = payload.get("bedrock_model_id") or MODEL_ID + + # ----------------------------------------------------------------------- + # Build agent for this invocation + #
----------------------------------------------------------------------- + payment_plugin = AgentCorePaymentsPlugin( + config=AgentCorePaymentsPluginConfig( + payment_manager_arn=payment_manager_arn, + user_id=user_id, + payment_instrument_id=instrument_id, + payment_session_id=session_id, + region=REGION, + ) + ) + + model = BedrockModel( + boto_session=boto3.Session(region_name=REGION), + model_id=model_id, + streaming=True, + temperature=0, + ) + + agent = Agent( + system_prompt=_build_system_prompt(ci_session), + model=model, + tools=[ + http_request, + _CI_CLIENT.code_interpreter, + export_artifact_to_s3, + save_report_to_s3, + list_invocation_artifacts, + ], + plugins=[payment_plugin], + ) + + result = agent(prompt) + + # Extract response text + content = result.message.get("content", []) + text = next( + ( + block.get("text", "") + for block in content + if isinstance(block, dict) and "text" in block + ), + str(result), + ) + + return { + "response": text, + "artifacts": [ + {"name": a["name"], "url": a["url"], "expires_in": a["expires_in"]} + for a in _artifacts() + ], + } + + +if __name__ == "__main__": + app.run() diff --git a/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/scripts/__init__.py b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/scripts/__init__.py new file mode 100644 index 000000000..eaee13330 --- /dev/null +++ b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/scripts/__init__.py @@ -0,0 +1 @@ +"""Support scripts for registry sync and autonomous agent runs.""" diff --git a/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/scripts/sync_registry.py b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/scripts/sync_registry.py new file mode 100644 index 000000000..a271446db --- /dev/null +++ 
b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/heurist_finance_agent/scripts/sync_registry.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python3 +"""Fetch the live Heurist catalog and refresh the local cache. + +Usage: + python -m heurist_finance_agent.scripts.sync_registry +""" + +from __future__ import annotations + +import sys +from pathlib import Path + +if __package__ in (None, ""): + sys.path.insert(0, str(Path(__file__).resolve().parents[2])) + +from heurist_finance_agent.catalog import fetch_live_catalog, get_tools_for_agents +from heurist_finance_agent.config import LIVE_CATALOG_CACHE_PATH, get_config + + +def main() -> None: + cfg = get_config() + catalog = fetch_live_catalog() + selected_tools = get_tools_for_agents(cfg.heurist_tool_agent_ids, refresh=False) + print(f"Saved live catalog cache to {LIVE_CATALOG_CACHE_PATH}") + print(f"Catalog agents: {catalog['count']}") + print(f"Selected agents: {', '.join(cfg.heurist_tool_agent_ids)}") + print(f"Selected tools: {len(selected_tools)}") + + +if __name__ == "__main__": + main() diff --git a/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/pay-for-data.ipynb b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/pay-for-data.ipynb new file mode 100644 index 000000000..2670356f4 --- /dev/null +++ b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/pay-for-data.ipynb @@ -0,0 +1,675 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Pay for Data — Heurist Finance Agent\n", + "\n", + "## Overview\n", + "\n", + "A finance research agent that pays for real-time market data using **Amazon Bedrock AgentCore payments**. 
The agent calls paid [Heurist](https://heurist.xyz) endpoints for live prices, SEC filings, and macro indicators, analyzes the data with AgentCore Code Interpreter, and returns charts and reports as S3 presigned URLs — all without any manual payment code in the tools.\n", + "\n", + "The agent is deployed to **AgentCore Runtime**: a managed container endpoint with HTTPS invocation, SigV4 auth, and automatic observability via CloudWatch.\n", + "\n", + "### Use Case Details\n", + "\n", + "| Information | Details |\n", + "|:---|:---|\n", + "| Use case type | Agentic data retrieval with autonomous micropayments |\n", + "| Agent type | Single |\n", + "| Payment protocol | x402 (HTTP 402 Payment Required) |\n", + "| Agentic framework | [Strands Agents](https://strandsagents.com/) |\n", + "| LLM model | Claude Sonnet 4 on Amazon Bedrock (configurable) |\n", + "| SDK used | `bedrock-agentcore[strands-agents]` (public PyPI) |\n", + "| Wallet type | Embedded crypto wallet (Coinbase CDP) |\n", + "| Payment network | Base mainnet (USDC) |\n", + "\n", + "### Architecture\n", + "\n", + "```\n", + "App Backend (ManagementRole) AgentCore Runtime\n", + " | +------------------------------+\n", + " | create_session(budget=$X) | runtime_agent.py |\n", + " | | BedrockAgentCoreApp |\n", + " |-- invoke(manager_arn, session_id, --> | + AgentCorePaymentsPlugin |\n", + " | instrument_id, prompt) | |\n", + " | | http_request -> 402 |\n", + " |<-- {response, artifacts: [{url}]} --- | -> ProcessPayment -> retry |\n", + " | | -> Code Interpreter |\n", + " | get_session(check spend) | -> export to S3 |\n", + " +------------------------------+\n", + " |\n", + " v\n", + " CloudWatch GenAI Observability\n", + " (automatic via OpenTelemetry)\n", + "```\n", + "\n", + "### AgentCore Capabilities Demonstrated\n", + "\n", + "| Capability | How it is used here |\n", + "|:---|:---|\n", + "| **Payment manager** | Central resource that authorizes and tracks all payment activity. 
|\n", + "| **Payment instrument** | An embedded crypto wallet (Coinbase CDP, USDC on Base). |\n", + "| **Payment session** | A time-bounded, budget-capped authorization (`maxSpendAmount`). |\n", + "| **Payment processing** | End-to-end x402 negotiation, proof generation, retry, and on-chain settlement. |\n", + "| **AgentCore Runtime** | Managed container hosting with HTTPS endpoint and SigV4 auth. |\n", + "| **AgentCore Code Interpreter** | Remote sandboxed Python environment for pandas/matplotlib analysis. |\n", + "| **Observability** | Automatic OTel traces + logs in CloudWatch GenAI dashboard. |\n", + "\n", + "### Notebook Flow\n", + "\n", + "| Step | What happens |\n", + "|------|-------------|\n", + "| 1 | Configure credentials and confirm AWS identity |\n", + "| 2 | Sync the Heurist tool catalog (bundled in the container image) |\n", + "| 3 | Create the S3 artifacts bucket |\n", + "| 4 | Install the AgentCore CLI, scaffold and deploy |\n", + "| 5 | Add IAM permissions to the execution role |\n", + "| 6 | Invoke the deployed agent and inspect results |\n", + "| 7 | View observability traces in CloudWatch |\n", + "| 8 | Cleanup |\n", + "\n", + "**Before running:**\n", + "1. `pip install -r requirements.txt`\n", + "2. `cp .env.example .env` and fill in your credentials (payment manager ARN, instrument ID, session ID)\n", + "3. Ensure Node.js 20+, Docker, and AWS CDK are installed\n", + "\n", + "See [`README.md`](README.md) for full setup details." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Install dependencies" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%pip install -r requirements.txt --quiet" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 1 — Configure credentials\n", + "\n", + "Load credentials from `.env` and confirm AWS identity. 
Copy `.env.example` to `.env`\n", + "and fill in the values from the setup tutorial (`00-getting-started/`)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import boto3\n", + "import json\n", + "from heurist_finance_agent.config import get_config\n", + "\n", + "cfg = get_config()\n", + "print(f\"Region: {cfg.aws_region}\")\n", + "print(f\"Payment manager: {cfg.payment_manager_arn}\")\n", + "print(f\"Payment session: {cfg.payment_session_id}\")\n", + "print(f\"Payment instrument: {cfg.payment_instrument_id}\")\n", + "print(f\"Model: {cfg.bedrock_model_id}\")\n", + "\n", + "session = boto3.Session()\n", + "identity = session.client('sts').get_caller_identity()\n", + "account_id = identity['Account']\n", + "region = cfg.aws_region\n", + "print(f\"\\nAuthenticated as: {identity['Arn']}\")\n", + "print(f\"Account: {account_id}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 2 — Sync the Heurist tool catalog\n", + "\n", + "Fetches the current registry of x402-enabled endpoints from the Heurist mesh and caches\n", + "it locally. The Runtime container image bundles this cache at build time — the deployed\n", + "agent reads the catalog without calling the Heurist registry at startup.\n", + "\n", + "Re-run this cell before deploying to ensure the container has a current catalog." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from heurist_finance_agent.catalog import fetch_live_catalog, get_tools_for_agents\n", + "\n", + "catalog = fetch_live_catalog()\n", + "selected = get_tools_for_agents(cfg.heurist_tool_agent_ids)\n", + "\n", + "print(f\"Agents in registry: {catalog['count']}\")\n", + "print(f\"Selected agents: {', '.join(cfg.heurist_tool_agent_ids)}\")\n", + "print(f\"Loaded paid tools: {len(selected)}\")\n", + "print()\n", + "for t in selected:\n", + " print(f\" {t['agent_id']:30s} {t['tool_name']:35s} ${t['price_usd']:.3f}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 3 — Create the S3 artifacts bucket\n", + "\n", + "Charts, reports, and CSVs produced by the agent in Code Interpreter are uploaded to S3.\n", + "The agent returns presigned download URLs in the invocation response — valid for\n", + "`CI_ARTIFACTS_TTL` seconds (default: 1 hour).\n", + "\n", + "The bucket is private; no public access is granted. Skip this cell if you already have\n", + "a bucket — just set `ARTIFACTS_BUCKET` to its name." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ARTIFACTS_BUCKET = f\"heurist-finance-artifacts-{account_id}-{region}\"\n", + "\n", + "s3 = boto3.client('s3', region_name=region)\n", + "try:\n", + " if region == 'us-east-1':\n", + " s3.create_bucket(Bucket=ARTIFACTS_BUCKET)\n", + " else:\n", + " s3.create_bucket(\n", + " Bucket=ARTIFACTS_BUCKET,\n", + " CreateBucketConfiguration={'LocationConstraint': region},\n", + " )\n", + " s3.put_public_access_block(\n", + " Bucket=ARTIFACTS_BUCKET,\n", + " PublicAccessBlockConfiguration={\n", + " 'BlockPublicAcls': True, 'IgnorePublicAcls': True,\n", + " 'BlockPublicPolicy': True, 'RestrictPublicBuckets': True,\n", + " },\n", + " )\n", + " print(f'Created bucket: {ARTIFACTS_BUCKET}')\n", + "except s3.exceptions.BucketAlreadyOwnedByYou:\n", + " print(f'Bucket already exists: {ARTIFACTS_BUCKET}')\n", + "\n", + "print(f'Artifacts will be stored at: s3://{ARTIFACTS_BUCKET}/heurist-finance-artifacts/')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 4 — Deploy to AgentCore Runtime\n", + "\n", + "The `@aws/agentcore` CLI scaffolds a project, builds a Docker image, pushes it to ECR,\n", + "and deploys via CDK. First deploy takes ~3–5 minutes.\n", + "\n", + "> **Prerequisites:** Node.js 20+, Docker running, AWS CDK installed\n", + ">\n", + "> **Cost notice:** This creates billable AWS resources (ECR, Runtime endpoint, CloudWatch logs).\n", + "> Run the cleanup section when finished." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!npm install -g @aws/agentcore\n", + "!agentcore --version" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import shutil\n", + "\n", + "PROJECT_NAME = 'HeuristFinanceAgent'\n", + "\n", + "# Scaffold\n", + "if not os.path.exists(PROJECT_NAME):\n", + " !agentcore create --name {PROJECT_NAME} --framework Strands --protocol HTTP --model-provider Bedrock --memory none\n", + "else:\n", + " print(f'{PROJECT_NAME}/ already exists — skipping create')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dest_app = f'{PROJECT_NAME}/app/{PROJECT_NAME}'\n", + "\n", + "# Copy runtime_agent.py as the entry point\n", + "shutil.copy('heurist_finance_agent/runtime_agent.py', f'{dest_app}/main.py')\n", + "print(f'Copied runtime_agent.py -> {dest_app}/main.py')\n", + "\n", + "# Copy the heurist_finance_agent package (includes catalog cache)\n", + "pkg_dest = f'{dest_app}/heurist_finance_agent'\n", + "if os.path.exists(pkg_dest):\n", + " shutil.rmtree(pkg_dest)\n", + "shutil.copytree('heurist_finance_agent', pkg_dest)\n", + "print(f'Copied heurist_finance_agent/ -> {pkg_dest}/')\n", + "\n", + "cache_in_image = os.path.join(pkg_dest, 'catalog_live_cache.json')\n", + "if os.path.exists(cache_in_image):\n", + " print('catalog_live_cache.json bundled in image')\n", + "else:\n", + " print('WARNING: catalog_live_cache.json missing — re-run Step 2 first')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Write pyproject.toml — includes Code Interpreter\n", + "# (Code Interpreter is a remote AWS API; it works identically from Runtime containers)\n", + "# aws-opentelemetry-distro enables botocore auto-instrumentation so Code Interpreter\n", + "# calls are stitched into 
the Runtime trace via W3C traceparent header propagation.\n", + "# See: https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/observability-configure.html\n", + "pyproject_content = f'''[project]\n", + "name = \"heurist-finance-agent\"\n", + "version = \"0.1.0\"\n", + "requires-python = \">=3.11\"\n", + "dependencies = [\n", + " \"bedrock-agentcore[strands-agents]>=1.9.0\",\n", + " \"boto3>=1.43.1\",\n", + " \"strands-agents>=1.36.0\",\n", + " \"strands-agents-tools[http_request,agent_core_code_interpreter]>=0.5.0\",\n", + " \"aws-opentelemetry-distro>=0.10.0\",\n", + " \"requests>=2.32.0\",\n", + " \"python-dotenv>=1.0.0\",\n", + "]\n", + "'''\n", + "\n", + "# Write .env — service config only, no payment credentials\n", + "runtime_env = f'''# Runtime config bundled in container image\n", + "# Payment credentials are NOT here — they come from the invocation payload\n", + "CI_ARTIFACTS_BUCKET={ARTIFACTS_BUCKET}\n", + "CI_ARTIFACTS_PREFIX=heurist-finance-artifacts\n", + "CI_ARTIFACTS_TTL=3600\n", + "AWS_REGION={region}\n", + "BEDROCK_MODEL_ID={cfg.bedrock_model_id}\n", + "'''\n", + "\n", + "with open(f'{dest_app}/pyproject.toml', 'w') as f:\n", + " f.write(pyproject_content)\n", + "with open(f'{dest_app}/.env', 'w') as f:\n", + " f.write(runtime_env)\n", + "\n", + "lock_file = f'{dest_app}/uv.lock'\n", + "if os.path.exists(lock_file):\n", + " os.remove(lock_file)\n", + "\n", + "print('pyproject.toml and .env written')\n", + "print(f' CI_ARTIFACTS_BUCKET={ARTIFACTS_BUCKET}')\n", + "print(f' BEDROCK_MODEL_ID={cfg.bedrock_model_id}')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Deploy (~3-5 min first time)\n", + "!cd {PROJECT_NAME} && agentcore deploy -y" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!cd {PROJECT_NAME} && agentcore status" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 5 
— Add permissions to the execution role\n", + "\n", + "The CLI auto-creates an execution role with Bedrock + CloudWatch permissions.\n", + "Add three more sets:\n", + "\n", + "1. **Payment data-plane** — `ProcessPayment` and read operations\n", + "2. **Code Interpreter** — `StartCodeInterpreterSession`, `InvokeCodeInterpreter`, `StopCodeInterpreterSession`\n", + "3. **S3 artifacts** — `PutObject` + `GetObject` scoped to the artifacts bucket\n", + "\n", + "The execution role cannot create payment sessions or instruments — that stays with the app backend (ManagementRole)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "iam = boto3.client('iam')\n", + "\n", + "# Find the execution role created by agentcore deploy\n", + "paginator = iam.get_paginator('list_roles')\n", + "runtime_roles = []\n", + "for page in paginator.paginate():\n", + " for role in page['Roles']:\n", + " name = role['RoleName']\n", + " if PROJECT_NAME.lower().replace('-', '') in name.lower().replace('-', '') and \\\n", + " ('execution' in name.lower() or 'exec' in name.lower()):\n", + " runtime_roles.append(name)\n", + "\n", + "if not runtime_roles:\n", + " for page in paginator.paginate():\n", + " for role in page['Roles']:\n", + " if PROJECT_NAME.lower() in role['RoleName'].lower():\n", + " runtime_roles.append(role['RoleName'])\n", + "\n", + "assert runtime_roles, f'No {PROJECT_NAME} execution role found. Check agentcore deploy output.'\n", + "RUNTIME_ROLE_NAME = runtime_roles[0]\n", + "print(f'Execution role: {RUNTIME_ROLE_NAME}')\n", + "\n", + "# 1. 
Payment data-plane\n", + "iam.put_role_policy(\n", + " RoleName=RUNTIME_ROLE_NAME,\n", + " PolicyName='PaymentDataPlaneAccess',\n", + " PolicyDocument=json.dumps({\n", + " 'Version': '2012-10-17',\n", + " 'Statement': [{\n", + " 'Sid': 'PaymentDataPlaneAccess',\n", + " 'Effect': 'Allow',\n", + " 'Action': [\n", + " 'bedrock-agentcore:ProcessPayment',\n", + " 'bedrock-agentcore:GetPaymentInstrument',\n", + " 'bedrock-agentcore:GetPaymentInstrumentBalance',\n", + " 'bedrock-agentcore:GetPaymentSession',\n", + " 'bedrock-agentcore:GetResourcePaymentToken',\n", + " ],\n", + " 'Resource': f'arn:aws:bedrock-agentcore:{region}:{account_id}:payment-manager/*',\n", + " }],\n", + " }),\n", + ")\n", + "print('Payment data-plane permissions added')\n", + "\n", + "# 2. Code Interpreter\n", + "iam.put_role_policy(\n", + " RoleName=RUNTIME_ROLE_NAME,\n", + " PolicyName='CodeInterpreterAccess',\n", + " PolicyDocument=json.dumps({\n", + " 'Version': '2012-10-17',\n", + " 'Statement': [{\n", + " 'Sid': 'CodeInterpreterAccess',\n", + " 'Effect': 'Allow',\n", + " 'Action': [\n", + " 'bedrock-agentcore:StartCodeInterpreterSession',\n", + " 'bedrock-agentcore:StopCodeInterpreterSession',\n", + " 'bedrock-agentcore:InvokeCodeInterpreter',\n", + " ],\n", + " 'Resource': f'arn:aws:bedrock-agentcore:{region}:{account_id}:code-interpreter/*',\n", + " }],\n", + " }),\n", + ")\n", + "print('Code Interpreter permissions added')\n", + "\n", + "# 3. 
S3 artifacts\n", + "iam.put_role_policy(\n", + " RoleName=RUNTIME_ROLE_NAME,\n", + " PolicyName='S3ArtifactsAccess',\n", + " PolicyDocument=json.dumps({\n", + " 'Version': '2012-10-17',\n", + " 'Statement': [{\n", + " 'Sid': 'S3ArtifactsReadWrite',\n", + " 'Effect': 'Allow',\n", + " 'Action': ['s3:PutObject', 's3:GetObject'],\n", + " 'Resource': f'arn:aws:s3:::{ARTIFACTS_BUCKET}/heurist-finance-artifacts/*',\n", + " }],\n", + " }),\n", + ")\n", + "print(f'S3 artifact permissions added (bucket: {ARTIFACTS_BUCKET})')\n", + "\n", + "print('\\nPermissions summary:')\n", + "print(' Payment: ProcessPayment, GetPaymentInstrument, GetPaymentInstrumentBalance, GetPaymentSession, GetResourcePaymentToken')\n", + "print(' Code Interpreter: StartSession, StopSession, InvokeCodeInterpreter')\n", + "print(' S3: PutObject + GetObject (artifacts bucket prefix only)')\n", + "print(' Not granted: CreateSession, CreateInstrument (stays with ManagementRole)')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 6 — Invoke the deployed agent\n", + "\n", + "Create a fresh payment session (the app backend controls the budget), then invoke\n", + "the deployed agent. 
The response includes the research summary and presigned S3 URLs\n", + "for any charts or reports the agent produced.\n", + "\n", + "```json\n", + "{\n", + " \"response\": \"\",\n", + " \"artifacts\": [\n", + " {\"name\": \"chart.png\", \"url\": \"https://...\", \"expires_in\": 3600}\n", + " ]\n", + "}\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from bedrock_agentcore.payments import PaymentManager\n", + "\n", + "manager = PaymentManager(payment_manager_arn=cfg.payment_manager_arn, region_name=region)\n", + "\n", + "invoke_session = manager.create_payment_session(\n", + " user_id=cfg.user_id,\n", + " limits={'maxSpendAmount': {'value': '0.25', 'currency': 'USD'}},\n", + " expiry_time_in_minutes=60,\n", + ")\n", + "session_id = invoke_session['paymentSessionId']\n", + "print(f'Payment session: {session_id} (budget: $0.25, expiry: 60 min)')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "invoke_payload = json.dumps({\n", + " 'prompt': (\n", + " 'Use FredMacroAgent to fetch the latest US GDP growth rate and unemployment rate. '\n", + " 'Use Code Interpreter to create a bar chart comparing them and a markdown summary. 
'\n", + " 'Save both as artifacts.'\n", + " ),\n", + " 'payment_manager_arn': cfg.payment_manager_arn,\n", + " 'user_id': cfg.user_id,\n", + " 'payment_session_id': session_id,\n", + " 'payment_instrument_id': cfg.payment_instrument_id,\n", + "})\n", + "\n", + "print(f'Invoking {PROJECT_NAME}...')\n", + "!cd {PROJECT_NAME} && agentcore invoke '{invoke_payload}'" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Check session spend\n", + "session_info = manager.get_payment_session(\n", + " user_id=cfg.user_id,\n", + " payment_session_id=session_id,\n", + ")\n", + "\n", + "available = session_info.get('availableLimits', {}).get('availableSpendAmount', {})\n", + "budget = session_info.get('limits', {}).get('maxSpendAmount', {})\n", + "budget_val = float(budget.get('value', 0))\n", + "avail_val = float(available.get('value', 0)) if available.get('value') else budget_val\n", + "spent = budget_val - avail_val\n", + "\n", + "print(f'Session spend summary:')\n", + "print(f' Budget: ${budget_val:.4f} {budget.get(\"currency\", \"USD\")}')\n", + "print(f' Remaining: ${avail_val:.4f} {available.get(\"currency\", \"USD\")}')\n", + "print(f' Spent: ${spent:.4f} USD')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 7 — Observability\n", + "\n", + "AgentCore Runtime automatically instruments the container with OpenTelemetry. 
Traces\n", + "and logs appear in **CloudWatch GenAI Observability** immediately after the first\n", + "invocation — no additional setup required.\n", + "\n", + "Each invocation produces a unified trace with spans for:\n", + "- **LLM calls** — model ID, token counts, latency\n", + "- **Tool calls** — `http_request` invocations including x402 retry attempts\n", + "- **Agent turns** — full prompt → tool use → response cycle\n", + "- **Code Interpreter calls** — `StartCodeInterpreterSession`, `InvokeCodeInterpreter`,\n", + " `StopCodeInterpreterSession` stitched as child spans via W3C `traceparent` header\n", + " propagation (enabled by `aws-opentelemetry-distro` botocore auto-instrumentation)\n", + "\n", + "The `AgentCorePaymentsPlugin` calls (`ProcessPayment`, `GetPaymentInstrument`) also\n", + "appear as boto3 child spans in the trace tree via botocore instrumentation.\n", + "\n", + "> **Note:** Payment manager vended logs (`ProcessPayment`, `CreateSession` events)\n", + "> are configured separately via `enable_observability()` in `utils.py`. See Tutorial 00\n", + "> Step 8a for that setup." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print('CloudWatch GenAI Observability dashboard:')\n", + "print(f' https://{region}.console.aws.amazon.com/cloudwatch/home?region={region}#gen-ai-observability/agent-core')\n", + "print()\n", + "print('Stream live logs:')\n", + "print(f' cd {PROJECT_NAME} && agentcore logs')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 8 — Cleanup\n", + "\n", + "> ⚠️ The following cells permanently delete the Runtime deployment and the S3 artifacts\n", + "> bucket. Download any artifacts you want to keep before proceeding." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Remove the AgentCore Runtime stack (ECR image, CloudWatch logs, CDK stack)\n", + "!cd {PROJECT_NAME} && agentcore remove all -y" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Remove the scaffolded project directory\n", + "import shutil\n", + "if os.path.exists(PROJECT_NAME):\n", + " shutil.rmtree(PROJECT_NAME)\n", + " print(f'Removed {PROJECT_NAME}/')\n", + "else:\n", + " print(f'{PROJECT_NAME}/ already removed')\n", + "\n", + "# Empty and delete the S3 artifacts bucket\n", + "import boto3\n", + "s3_resource = boto3.resource('s3')\n", + "bucket = s3_resource.Bucket(ARTIFACTS_BUCKET)\n", + "try:\n", + " bucket.objects.all().delete()\n", + " bucket.delete()\n", + " print(f'Deleted S3 bucket: {ARTIFACTS_BUCKET}')\n", + "except Exception as e:\n", + " print(f'Could not delete bucket: {e}')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### AWS resource cleanup\n", + "\n", + "Payment sessions expire automatically when `expiryTimeInMinutes` elapses — no manual\n", + "deletion required. 
To clean up the payment manager and payment instrument, refer to\n", + "the cleanup section of the setup tutorial (`00-getting-started/00-setup-agentcore-payments/`).\n", + "\n", + "---\n", + "\n", + "## Shared Responsibility\n", + "\n", + "| Responsibility | AWS | You |\n", + "|:---|:---:|:---:|\n", + "| Securing the AgentCore payments service infrastructure | ✅ | |\n", + "| Encrypting payment credentials at rest | ✅ | |\n", + "| Enforcing payment session limits at the service level | ✅ | |\n", + "| Settling on-chain transactions (Coinbase CDP) | ✅ | |\n", + "| Managing Runtime container compute and networking | ✅ | |\n", + "| Configuring IAM roles with least-privilege permissions | | ✅ |\n", + "| Setting appropriate `maxSpendAmount` payment limits | | ✅ |\n", + "| Protecting AWS credentials from exposure | | ✅ |\n", + "| Funding the payment instrument with sufficient USDC | | ✅ |\n", + "| Monitoring agent spend and session usage | | ✅ |\n", + "| Validating prompts to prevent prompt injection | | ✅ |\n", + "| Reviewing Heurist endpoint terms of service | | ✅ |\n", + "\n", + "> **Security note:** Never commit `.env` files or payment credentials to source control.\n", + "> Use AWS Secrets Manager for production credentials.\n", + "> Payment sessions are time-bounded and budget-capped — set conservative `maxSpendAmount`\n", + "> limits when running agents in automated or unattended contexts." 
+ ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.11" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/requirements.txt b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/requirements.txt new file mode 100644 index 000000000..881d656ae --- /dev/null +++ b/01-tutorials/13-AgentCore-payments/02-use-cases/pay-for-data/requirements.txt @@ -0,0 +1,7 @@ +bedrock-agentcore[strands-agents]==1.9.0 +strands-agents==1.36.0 +strands-agents-tools[http_request,agent_core_code_interpreter]==0.5.0 +boto3==1.43.1 +botocore==1.43.3 +requests>=2.32.0 +python-dotenv>=1.0.0 diff --git a/01-tutorials/13-AgentCore-payments/README.md b/01-tutorials/13-AgentCore-payments/README.md index a33725a5e..aa93b8046 100644 --- a/01-tutorials/13-AgentCore-payments/README.md +++ b/01-tutorials/13-AgentCore-payments/README.md @@ -27,6 +27,7 @@ Real-world use cases that demonstrate AgentCore payments end-to-end. See [02-use | Use case | What it demonstrates | |---|---| | [Pay for Content (Browser Use)](02-use-cases/pay-for-content-browser-use/) | A Strands agent uses AgentCore Browser Tool to navigate a paywalled website, reads the embedded x402 requirement from the page DOM, calls `ProcessPayment` to generate a USDC proof, and returns the unlocked content — all without human involvement in the payment step. Includes a deployable CDK content-provider stack. | +| [Pay for Data (Heurist)](02-use-cases/pay-for-data/) | A Strands finance-research agent calls paid [Heurist](https://heurist.xyz) x402 endpoints for live prices, SEC filings, and macro indicators. The `AgentCorePaymentsPlugin` handles HTTP 402 payment processing end-to-end, and AgentCore Code Interpreter analyzes the data and exports charts and reports. Uses the Coinbase CDP embedded wallet; targets x402 on Base mainnet. 
| ## Prerequisites