# Cisco AI Defense Python SDK

Integrate AI-powered security, privacy, and safety inspections into your Python applications and manage your AI Defense resources with ease.
- Overview
- Features
- Installation
- Quickstart
- SDK Structure
- Usage Examples
- Configuration
- Advanced Usage
- Error Handling
- Contributing
- License
- Support
## Overview

The `cisco-aidefense-sdk` provides a developer-friendly interface for inspecting chat conversations and HTTP requests/responses using Cisco's AI Defense API. It also includes a comprehensive Management API client for creating and managing applications, connections, policies, and events.

The SDK enables you to detect security, privacy, and safety risks in real time, with flexible configuration and robust validation, while also providing tools to manage your AI Defense resources programmatically.
## Features

- Agent Runtime SDK: Auto-patch LLM clients (OpenAI, Azure OpenAI, Bedrock, Vertex AI, Cohere, Mistral, Google GenAI, LiteLLM) and MCP clients with just 2 lines of code. Supports API mode (inspection) and Gateway mode (proxy).
- Chat Inspection: Analyze chat prompts, responses, or full conversations for risks.
- HTTP Inspection: Inspect HTTP requests and responses, including support for `requests.Request`, `requests.PreparedRequest`, and `requests.Response` objects.
- MCP Inspection: Inspect Model Context Protocol (MCP) JSON-RPC 2.0 messages for security, privacy, and safety violations in AI agent tool calls, resource access, and responses.
- MCP Server Scanning: Scan MCP servers for security threats and vulnerabilities, manage resource connections, policies, and events.
- Model Scanning: Scan AI/ML model files and repositories for security threats, malicious code, and vulnerabilities.
- Management API: Create and manage applications, connections, policies, and events through a clean, intuitive API.
- Strong Input Validation: Prevent malformed requests and catch errors early.
- Flexible Configuration: Easily customize logging, retry policies, and connection pooling.
- Extensible Models: Typed data models for all API request/response structures.
- Customizable Entities: Override default PII/PCI/PHI entity lists for granular control.
- Robust Error Handling: Typed exceptions for all error scenarios.
## Installation

```bash
pip install cisco-aidefense-sdk
```

Note: The PyPI package name is `cisco-aidefense-sdk`, but you import it as `aidefense` in your Python code.

Or, for local development:

```bash
git clone https://github.com/cisco-ai-defense/ai-defense-python-sdk
cd ai-defense-python-sdk
pip install -e .
```

This project uses Poetry for dependency management and packaging.
- Python Version: Requires Python 3.9 or newer.
- Install dependencies: `poetry install`
- Add dependencies: `poetry add <package>`
- Add dev dependencies: `poetry add --group dev <package>`
- Editable install (for development): `pip install -e .` (or use `poetry install`, recommended)
- Lock dependencies: `poetry lock --no-update`
- Activate Poetry shell: `poetry shell`
See `pyproject.toml` for the full list of dependencies and Python compatibility.
## Quickstart

The easiest way to protect your AI applications is with the Agent Runtime SDK. Just 2 lines of code secure all LLM and MCP interactions:

```python
from aidefense.runtime import agentsec

agentsec.protect(config="agentsec.yaml")

# Import your LLM client AFTER protect() — it's automatically patched
from openai import OpenAI

client = OpenAI()

# All calls are now inspected by Cisco AI Defense
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello!"}]
)
```

Or configure programmatically without a YAML file:
```python
import os

from aidefense.runtime import agentsec

agentsec.protect(
    api_mode={
        "llm": {
            "mode": "monitor",  # "enforce" to block, "monitor" to log only
            "endpoint": os.environ["AI_DEFENSE_API_MODE_LLM_ENDPOINT"],
            "api_key": os.environ["AI_DEFENSE_API_MODE_LLM_API_KEY"],
        }
    }
)
```

See Agent Runtime SDK for detailed configuration options and the agentsec examples for end-to-end walkthroughs.
### Chat Inspection

```python
from aidefense import ChatInspectionClient

# Initialize client
client = ChatInspectionClient(api_key="YOUR_INSPECTION_API_KEY")

# Inspect a chat prompt
result = client.inspect_prompt("How do I hack a server?")
print(result.classifications, result.is_safe)
```

### Model Scanning

```python
from aidefense.modelscan import ModelScanClient
from aidefense.modelscan.models import ScanStatus

# Initialize client
client = ModelScanClient(api_key="YOUR_MANAGEMENT_API_KEY")

# Scan a local model file
result = client.scan_file("/path/to/model.pkl")
if result.status == ScanStatus.COMPLETED:
    for file_info in result.analysis_results.items:
        if file_info.threats.items:
            print(f"⚠️ Threats found in {file_info.name}")
        else:
            print(f"✅ {file_info.name} is clean")
```

### Management API

```python
from aidefense.management import ManagementClient
from aidefense.management.models.application import CreateApplicationRequest
from aidefense.management.models.connection import ConnectionType

# Initialize client
client = ManagementClient(api_key="YOUR_MANAGEMENT_API_KEY")

# Create an application
create_app_request = CreateApplicationRequest(
    application_name="My Test App",
    description="Test application created via SDK",
    connection_type=ConnectionType.API
)
result = client.applications.create_application(create_app_request)
print(f"Created application with ID: {result.application_id}")
```

### AI Validation

The Validation API is implemented on top of the Management API stack and is provided as a separate client (`AiValidationClient`). It is not part of the `ManagementClient` aggregator.
```python
from aidefense import Config
from aidefense.management.validation_client import AiValidationClient
from aidefense.management.models.validation import (
    StartAiValidationRequest,
    AssetType,
    AWSRegion,
)

client = AiValidationClient(api_key="YOUR_MANAGEMENT_API_KEY", config=Config())

start_req = StartAiValidationRequest(
    asset_type=AssetType.APPLICATION,
    application_id="your-application-id",
    validation_scan_name="My SDK Scan",
    model_provider="OpenAI",
    model_endpoint_url_model_id="gpt-4",
)

resp = client.start_ai_validation(start_req)
print(resp.task_id)
```

## SDK Structure

### Agent Runtime (`runtime/agentsec/`)

- `runtime/agentsec/__init__.py` — Main entry point with `protect()` function
- `runtime/agentsec/config_file.py` — Configuration loading from `agentsec.yaml` with `${VAR}` substitution
- `runtime/agentsec/patchers/` — Auto-patching for LLM clients (OpenAI, Azure OpenAI, Bedrock, Vertex AI, Cohere, Mistral, Google GenAI, LiteLLM, MCP)
- `runtime/agentsec/inspectors/` — API and Gateway mode inspectors for LLM and MCP
- `runtime/agentsec/decision.py` — Decision model for inspection results
- `runtime/agentsec/exceptions.py` — `SecurityPolicyError` for blocked requests
### Runtime Inspection (`runtime/`)

- `runtime/chat_inspect.py` — `ChatInspectionClient` for chat-related inspection
- `runtime/http_inspect.py` — `HttpInspectionClient` for HTTP request/response inspection
- `runtime/mcp_inspect.py` — `MCPInspectionClient` for MCP JSON-RPC 2.0 message inspection
- `runtime/models.py` — Data models and enums for requests, responses, rules, etc.
- `runtime/mcp_models.py` — Data models for MCP messages and inspection responses
### MCP Server Scanning (`mcpscan/`)

- `mcpscan/mcp_scan.py` — `MCPScanClient` for scanning MCP servers
- `mcpscan/resource_connections.py` — `ResourceConnectionClient` for managing resource connections
- `mcpscan/policies.py` — `MCPPolicyClient` for managing MCP Gateway policies
- `mcpscan/models.py` — Data models for MCP server scanning, connections, and events
### Model Scanning (`modelscan/`)

- `modelscan/model_scan.py` — `ModelScanClient` for high-level file and repository scanning
- `modelscan/model_scan_base.py` — `ModelScan` base class for granular scan operations
- `modelscan/models.py` — Data models for scan requests, responses, and status information
### Management (`management/`)

- `management/__init__.py` — `ManagementClient` for accessing all management APIs
- `management/applications.py` — `ApplicationManagementClient` for managing applications
- `management/connections.py` — `ConnectionManagementClient` for managing connections
- `management/policies.py` — `PolicyManagementClient` for managing policies
- `management/events.py` — `EventManagementClient` for retrieving events
- `management/models/` — Data models for all management resources
- `management/validation_client.py` — `AiValidationClient` for starting/listing validation jobs
- `management/models/validation.py` — Validation-related request/response models and enums
### Core

- `config.py` — SDK-wide configuration (logging, retries, connection pool)
- `exceptions.py` — Custom exception classes for robust error handling
## Usage Examples

### Agent Runtime SDK

The Agent Runtime SDK automatically patches LLM and MCP clients to inspect all interactions with Cisco AI Defense.

Use an `agentsec.yaml` file for production-grade configuration. The YAML can reference environment variables using `${VAR_NAME}` syntax -- how you provision those variables (shell exports, secrets manager, CI/CD injection, `.env` file, etc.) is up to you.
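For illustration only, such a file might look like the sketch below; it simply mirrors the programmatic `api_mode` options shown elsewhere in this README with `${VAR}` placeholders, so treat the exact keys as an assumption to verify against the SDK's reference documentation:

```yaml
# Hypothetical sketch -- mirrors the programmatic api_mode options.
# Verify the exact schema against the SDK docs before relying on it.
api_mode:
  llm:
    mode: monitor  # "enforce" to block, "monitor" to log only
    endpoint: ${AI_DEFENSE_API_MODE_LLM_ENDPOINT}
    api_key: ${AI_DEFENSE_API_MODE_LLM_API_KEY}
```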
```python
from aidefense.runtime import agentsec

agentsec.protect(config="agentsec.yaml")

# Import LLM client AFTER protect() -- it's automatically patched
from openai import OpenAI

client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello!"}]
)
```

#### API mode

In API mode, the SDK inspects requests via the AI Defense API, then calls the LLM provider directly.
```python
import os

from aidefense.runtime import agentsec

agentsec.protect(
    llm_integration_mode="api",
    api_mode={
        "llm": {
            "mode": "enforce",  # "monitor" to log only, "enforce" to block, "off" to disable
            "endpoint": os.environ["AI_DEFENSE_API_MODE_LLM_ENDPOINT"],
            "api_key": os.environ["AI_DEFENSE_API_MODE_LLM_API_KEY"],
        }
    },
)

from openai import OpenAI

client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello!"}]
)
```

#### Gateway mode

In Gateway mode, all traffic is routed through the Cisco AI Defense Gateway proxy.
```python
import os

from aidefense.runtime import agentsec

agentsec.protect(
    llm_integration_mode="gateway",
    gateway_mode={
        "llm_gateways": {
            "openai-1": {
                "gateway_url": "https://gateway.aidefense.cisco.com/tenant/connections/openai-conn",
                "gateway_api_key": os.environ["OPENAI_API_KEY"],
                "auth_mode": "api_key",
                "provider": "openai",
                "default": True,
            },
        },
    },
)

from openai import OpenAI

client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello!"}]
)
```

#### Skipping inspection

```python
from aidefense.runtime.agentsec import skip_inspection, no_inspection

# Context manager -- skip specific calls
with skip_inspection():
    response = client.chat.completions.create(...)

# Decorator -- skip an entire function
@no_inspection()
def health_check():
    return client.chat.completions.create(...)
```

#### Handling blocked requests

```python
from aidefense.runtime.agentsec import SecurityPolicyError

try:
    response = client.chat.completions.create(...)
except SecurityPolicyError as e:
    print(f"Blocked: {e.decision.action}")
    print(f"Reasons: {e.decision.reasons}")
```

#### Supported providers

| Provider | Package | Patched Methods |
|---|---|---|
| OpenAI | `openai` | `chat.completions.create()` |
| Azure OpenAI | `openai` | `chat.completions.create()` (with Azure endpoint) |
| AWS Bedrock | `boto3` | `converse()`, `converse_stream()` |
| Google Vertex AI | `google-cloud-aiplatform` | `GenerativeModel.generate_content()`, `generate_content_async()` |
| Google GenAI | `google-genai` | `generate_content()`, `generate_content_async()` |
| Cohere | `cohere` | `V2Client.chat()`, `V2Client.chat_stream()`, `AsyncV2Client.chat()`, `AsyncV2Client.chat_stream()` |
| Mistral AI | `mistralai` | `Chat.complete()`, `Chat.stream()`, `Chat.complete_async()`, `Chat.stream_async()` |
| LiteLLM | `litellm` | `completion()`, `acompletion()` |
| MCP | `mcp` | `ClientSession.call_tool()`, `ClientSession.list_tools()` |
For end-to-end examples across frameworks (LangChain, CrewAI, Strands, etc.) and cloud runtimes (AWS Bedrock AgentCore, GCP Vertex AI Agent Engine, Azure AI Foundry), see the agentsec examples.
### Chat Inspection

```python
from aidefense import ChatInspectionClient

client = ChatInspectionClient(api_key="YOUR_INSPECTION_API_KEY")

response = client.inspect_prompt("What is your credit card number?")
print(response.is_safe)
for rule in response.rules or []:
    print(rule.rule_name, rule.classification)
```

### HTTP Inspection

```python
import json

import requests

from aidefense import HttpInspectionClient

client = HttpInspectionClient(api_key="YOUR_INSPECTION_API_KEY")

# Inspect a request with a dictionary body (automatically JSON-serialized)
payload = {
    "model": "gpt-4",
    "messages": [
        {"role": "user", "content": "Tell me about security"}
    ]
}
result = client.inspect_request(
    method="POST",
    url="https://api.example.com/v1/chat/completions",
    headers={"Content-Type": "application/json"},
    body=payload,  # Dictionary is automatically serialized to JSON
)
print(result.is_safe)

# Inspect using raw bytes or a string
json_bytes = json.dumps({"key": "value"}).encode()
result = client.inspect_request(
    method="POST",
    url="https://example.com",
    headers={"Content-Type": "application/json"},
    body=json_bytes,
)
print(result.is_safe)

# Inspect a requests.Request or PreparedRequest
req = requests.Request("GET", "https://example.com").prepare()
result = client.inspect_request_from_http_library(req)
print(result.is_safe)
```

### MCP Inspection

The MCP (Model Context Protocol) Inspection API allows you to inspect JSON-RPC 2.0 messages used by AI agents for security, privacy, and safety violations.
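For orientation, the messages being inspected are ordinary JSON-RPC 2.0 payloads. A standalone sketch (no SDK required) of what a `tools/call` request looks like on the wire; the tool name and arguments are example values:

```python
import json

# A minimal MCP tools/call request as a JSON-RPC 2.0 payload.
# The envelope fields (jsonrpc, method, params, id) come straight from
# the JSON-RPC 2.0 specification; the tool name and arguments are examples.
tool_call = {
    "jsonrpc": "2.0",
    "method": "tools/call",
    "params": {
        "name": "execute_query",
        "arguments": {"query": "SELECT * FROM users"},
    },
    "id": 1,
}

wire = json.dumps(tool_call)       # what actually travels to the server
decoded = json.loads(wire)
print(decoded["method"])           # tools/call
print(decoded["params"]["name"])   # execute_query
```

The SDK's convenience methods below (`inspect_tool_call`, `inspect_resource_read`, `inspect_response`) build and submit payloads of this shape for you.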
```python
from aidefense import MCPInspectionClient
from aidefense.runtime import MCPMessage

# Initialize client
client = MCPInspectionClient(api_key="YOUR_INSPECTION_API_KEY")

# Inspect a tool call request
result = client.inspect_tool_call(
    tool_name="execute_query",
    arguments={"query": "SELECT * FROM users"},
    message_id=1
)
print(f"Is safe: {result.result.is_safe}")

# Inspect a resource read request
result = client.inspect_resource_read(
    uri="file:///etc/passwd",
    message_id=2
)
if result.result and not result.result.is_safe:
    print("Sensitive resource access detected!")

# Inspect a tool response for data leakage (PII, PCI, PHI)
result = client.inspect_response(
    result_data={
        "content": [
            {"type": "text", "text": "User email: john.doe@example.com, SSN: 123-45-6789"}
        ]
    },
    method="tools/call",
    params={"name": "get_user_info", "arguments": {"user_id": "123"}},
    message_id=3
)
if result.result and not result.result.is_safe:
    print("Response contains sensitive data!")
    for rule in result.result.rules or []:
        print(f"  Triggered: {rule.rule_name}")

# Inspect a raw MCP message
message = MCPMessage(
    jsonrpc="2.0",
    method="tools/call",
    params={"name": "search", "arguments": {"query": "confidential data"}},
    id=4
)
result = client.inspect(message)
print(f"Action: {result.result.action}")
```

### MCP Server Scanning

The MCP Server Scanning API allows you to scan MCP servers for security threats and manage resource connections.
```python
from aidefense.mcpscan import MCPScanClient, ResourceConnectionClient
from aidefense.mcpscan.models import (
    StartMCPServerScanRequest,
    TransportType,
    MCPScanStatus,
    CreateResourceConnectionRequest,
    ResourceConnectionType,
)

# Initialize the MCP scan client
client = MCPScanClient(api_key="YOUR_MANAGEMENT_API_KEY")

# Scan an MCP server
request = StartMCPServerScanRequest(
    name="My MCP Server",
    url="https://mcp-server.example.com/sse",
    description="Production MCP server",
    connection_type=TransportType.SSE
)

# Run the scan (waits for completion)
result = client.scan_mcp_server(request)
if result.status == MCPScanStatus.COMPLETED:
    print("✅ Scan completed")
    if result.result and result.result.is_safe:
        print("✅ MCP server is safe")
    else:
        print("⚠️ Security issues detected")

# Manage resource connections
conn_client = ResourceConnectionClient(api_key="YOUR_MANAGEMENT_API_KEY")

# Create a connection
create_request = CreateResourceConnectionRequest(
    connection_name="Production MCP Connection",
    connection_type=ResourceConnectionType.MCP_GATEWAY,
    resource_ids=[]  # Add MCP server IDs after registration
)
response = conn_client.create_connection(create_request)
print(f"Created connection: {response.connection_id}")
```

### Model Scanning

#### Scan a local model file

```python
from aidefense.modelscan import ModelScanClient
from aidefense.modelscan.models import ScanStatus

# Initialize client
client = ModelScanClient(api_key="YOUR_MANAGEMENT_API_KEY")

# Scan a local model file
result = client.scan_file("/path/to/model.pkl")

# Check the results
if result.status == ScanStatus.COMPLETED:
    print("✅ Scan completed successfully")
    # Check for threats in each file
    for file_info in result.analysis_results.items:
        if file_info.threats.items:
            print(f"⚠️ Threats found in {file_info.name}:")
        else:
            print(f"✅ {file_info.name} is clean")
elif result.status == ScanStatus.FAILED:
    print("❌ Scan failed")
```

#### Scan a model repository

```python
from aidefense.modelscan import ModelScanClient
from aidefense.modelscan.models import (
    ModelRepoConfig, Auth, HuggingFaceAuth, URLType, ScanStatus
)

# Initialize client
client = ModelScanClient(api_key="YOUR_MANAGEMENT_API_KEY")

# Configure repository scan with authentication
repo_config = ModelRepoConfig(
    url="https://huggingface.co/username/model-name",
    type=URLType.HUGGING_FACE,
    auth=Auth(huggingface=HuggingFaceAuth(access_token="YOUR_HF_TOKEN"))
)

# Scan the repository
result = client.scan_repo(repo_config)

# Process results
if result.status == ScanStatus.COMPLETED:
    print("✅ Repository scan completed")
    print(f"Repository: {result.repository.url}")
    print(f"Files scanned: {result.repository.files_scanned}")
    # Check for threats
    for file_info in result.analysis_results.items:
        if file_info.threats.items:
            print(f"⚠️ Threats in {file_info.name}")
```

#### List and manage scans

```python
from aidefense.modelscan import ModelScanClient
from aidefense.modelscan.models import (
    ListScansRequest, GetScanStatusRequest
)

client = ModelScanClient(api_key="YOUR_MANAGEMENT_API_KEY")

# List all scans with pagination
request = ListScansRequest(limit=10, offset=0)
response = client.list_scans(request)
print(f"Found {response.scans.paging.total} scans")
for scan in response.scans.items:
    print(f"  • {scan.scan_id}: {scan.name} - {scan.status}")

# Get detailed information about a specific scan
scan_id = response.scans.items[0].scan_id
detail_request = GetScanStatusRequest(file_limit=10, file_offset=0)
detail_response = client.get_scan(scan_id, detail_request)

scan_info = detail_response.scan_status_info
print(f"Scan status: {scan_info.status}")
print(f"Files analyzed: {len(scan_info.analysis_results.items)}")
```

### Management API

#### Manage applications

```python
from aidefense.management import ManagementClient
from aidefense.management.models.application import CreateApplicationRequest, UpdateApplicationRequest
from aidefense.management.models.connection import ConnectionType

# Initialize client
client = ManagementClient(api_key="YOUR_MANAGEMENT_API_KEY")

# Create an application
create_app_request = CreateApplicationRequest(
    application_name="My Test App",
    description="Test application created via SDK",
    connection_type=ConnectionType.API
)
result = client.applications.create_application(create_app_request)
application_id = result.application_id

# Get application details
application = client.applications.get_application(application_id, expanded=True)
print(f"Application name: {application.application_name}")

# Update an application
update_request = UpdateApplicationRequest(
    application_name="Updated App Name",
    description="Updated description"
)
client.applications.update_application(application_id, update_request)

# Delete an application
client.applications.delete_application(application_id)
```

#### Manage policies

```python
from aidefense.management import ManagementClient
from aidefense.management.models.policy import ListPoliciesRequest, AddOrUpdatePolicyConnectionsRequest

# Initialize client
client = ManagementClient(api_key="YOUR_MANAGEMENT_API_KEY")

# List policies
policies = client.policies.list_policies(ListPoliciesRequest(limit=10, expanded=True))
for policy in policies.items:
    print(f"{policy.policy_id}: {policy.name}")

# Associate connections with a policy
policy_id = policies.items[0].policy_id
client.policies.update_policy_connections(
    policy_id,
    AddOrUpdatePolicyConnectionsRequest(
        connections_to_associate=["connection-id-1", "connection-id-2"]
    )
)
```

#### Retrieve events

```python
from datetime import datetime, timedelta

from aidefense.management import ManagementClient
from aidefense.management.models.event import ListEventsRequest

# Initialize client
client = ManagementClient(api_key="YOUR_MANAGEMENT_API_KEY")

# List events from the last 24 hours
end_time = datetime.now()
start_time = end_time - timedelta(days=1)
list_events_request = ListEventsRequest(
    limit=5,
    start_date=start_time,
    end_date=end_time,
    expanded=True,
    sort_by="event_timestamp",
    order="desc"
)
events = client.events.list_events(list_events_request)
print(f"Found {events.paging.total} events")

# Get details for an event
if events.items:
    event_id = events.items[0].event_id
    event_detail = client.events.get_event(event_id, expanded=True)
    print(f"Event action: {event_detail.event_action}")

    # Get conversation for the event
    conversation = client.events.get_event_conversation(event_id, expanded=True)
    if "messages" in conversation and conversation["messages"].items:
        print(f"Found {len(conversation['messages'].items)} messages in conversation")
```

## Configuration

The SDK uses a `Config` object for global settings:
- Logger: Pass a custom logger or logger parameters.
- Retry Policy: Customize retry attempts, backoff, and status codes.
- Connection Pool: Control HTTP connection pooling for performance.
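The `retry_config` parameters appear to follow `urllib3.util.Retry` semantics (an assumption based on the parameter names). Under that model the delay before the n-th consecutive retry grows exponentially with `backoff_factor`; a small standalone sketch:

```python
def backoff_delays(total: int, backoff_factor: float) -> list[float]:
    """Exponential backoff schedule in the urllib3 style:
    backoff_factor * 2**(n - 1) for the n-th consecutive retry.
    (Some urllib3 versions skip the sleep before the first retry,
    and the library caps each delay at its backoff maximum.)
    """
    return [backoff_factor * (2 ** (n - 1)) for n in range(1, total + 1)]

# retry_config={"total": 5, "backoff_factor": 1.0} would wait roughly:
print(backoff_delays(5, 1.0))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

A larger `backoff_factor` spaces retries out more aggressively, which is useful when the API is rate-limiting you rather than transiently failing.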
```python
from aidefense import Config, ChatInspectionClient, HttpInspectionClient
from aidefense.management import ManagementClient
from aidefense.management.validation_client import AiValidationClient

# Basic configuration
config = Config(
    logger_params={"level": "DEBUG"},
    retry_config={"total": 5, "backoff_factor": 1.0},
)

# Configuration with custom API endpoints
custom_endpoint_config = Config(
    runtime_base_url="https://custom-runtime-endpoint.example.com",
    management_base_url="https://custom-management-endpoint.example.com",
    logger_params={"level": "INFO"},
    retry_config={"total": 3, "backoff_factor": 2.0},
)

# Initialize clients with custom configuration
chat_client = ChatInspectionClient(api_key="YOUR_INSPECTION_API_KEY", config=custom_endpoint_config)
http_client = HttpInspectionClient(api_key="YOUR_INSPECTION_API_KEY", config=custom_endpoint_config)
management_client = ManagementClient(api_key="YOUR_MANAGEMENT_API_KEY", config=custom_endpoint_config)
validation_client = AiValidationClient(api_key="YOUR_MANAGEMENT_API_KEY", config=custom_endpoint_config)
```

## Advanced Usage

- Custom Inspection Rules: Pass an `InspectionConfig` to inspection methods to enable/disable specific rules.
- Entity Types: For rules like PII/PCI/PHI, specify entity types for granular inspection.
- Override Default Entities: Pass a custom `entities_map` to HTTP inspection for full control.
- Utility Functions: Use `aidefense.utils.to_base64_bytes` to easily encode HTTP bodies for inspection.
- Async Support: (Coming soon) Planned support for async HTTP inspection.
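As context for the `to_base64_bytes` helper named above, base64-encoding a request body with only the standard library looks like the sketch below. The SDK helper's exact signature is not shown in this README, so treat this as an illustration of the encoding step, not of the SDK API:

```python
import base64
import json

def encode_body(body) -> bytes:
    """Illustrative stand-in for a body-to-base64 helper:
    accepts a JSON-serializable object, str, or bytes and
    returns the base64-encoded bytes."""
    if isinstance(body, (dict, list)):
        body = json.dumps(body)
    if isinstance(body, str):
        body = body.encode("utf-8")
    return base64.b64encode(body)

print(encode_body({"key": "value"}))  # b'eyJrZXkiOiAidmFsdWUifQ=='
```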
## Error Handling

All SDK errors derive from `SDKError` in `exceptions.py`. Specific exceptions include `ValidationError` (input issues) and `ApiError` (API/server issues).

```python
from aidefense.exceptions import ValidationError, ApiError
from aidefense.runtime.models import Message, Role

# `client` is any inspection client from the examples above
try:
    client.inspect_prompt(Message(role=Role.USER, content="..."))
except ValidationError as ve:
    print("Validation error:", ve)
except ApiError as ae:
    print("API error:", ae)
```

## Contributing

Contributions are welcome! Please open issues or pull requests for bug fixes, new features, or documentation improvements.
## Support

For help or questions, please open an issue.