diff --git a/README.md b/README.md index 755814e..fd80ee3 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,7 @@ Casbin is a powerful and efficient open-source access control library for Python - [Benchmarks](#benchmarks) - [Logging](#logging) - [Examples](#examples) +- [Integrations](#integrations) - [Middlewares](#middlewares) - [Our adopters](#our-adopters) @@ -312,6 +313,42 @@ pycasbin leverages the default Python logging mechanism. The pycasbin package ma | Deny-override | [rbac_model_with_deny.conf](https://github.com/casbin/casbin/blob/master/examples/rbac_with_deny_model.conf) | [rbac_policy_with_deny.csv](https://github.com/casbin/casbin/blob/master/examples/rbac_with_deny_policy.csv) | | Priority | [priority_model.conf](https://github.com/casbin/casbin/blob/master/examples/priority_model.conf) | [priority_policy.csv](https://github.com/casbin/casbin/blob/master/examples/priority_policy.csv) | +## Integrations + +### Langchain Integration + +PyCasbin now supports authorization for Langchain applications, including RAG (Retrieval-Augmented Generation) systems. This integration provides: + +- **Document Access Control**: Control which documents users can access in RAG systems +- **Tool Usage Authorization**: Restrict which Langchain tools users can execute +- **Agent Execution Control**: Manage permissions for running different Langchain agents +- **Multi-tenant Support**: Implement authorization across different tenants/domains + +**Quick Example:** + +```python +from casbin import Enforcer +from casbin.integrations import LangchainEnforcer + +# Initialize with Langchain RAG model +enforcer = Enforcer("langchain_rag_model.conf", "langchain_rag_policy.csv") +lc_enforcer = LangchainEnforcer(enforcer) + +# Check document access +can_read = lc_enforcer.can_access_document("alice", "document:public:readme", "read") + +# Check tool usage +can_use = lc_enforcer.can_use_tool("bob", "search") + +# Filter documents for RAG (remove unauthorized documents before sending to LLM) +filtered_docs = lc_enforcer.filter_documents_by_permission("user", retrieved_docs) +``` + +**Learn More:** +- [Langchain Integration Documentation](examples/langchain/README.md) +- [Basic RAG Example](examples/langchain/basic_rag_example.py) +- [Multi-tenant Example](examples/langchain/multi_tenant_example.py) + ## Middlewares Authz middlewares for web frameworks: https://casbin.org/docs/middlewares diff --git a/casbin/__init__.py b/casbin/__init__.py index fa81258..23c2f0d 100644 --- a/casbin/__init__.py +++ b/casbin/__init__.py @@ -22,3 +22,4 @@ from .effect import * from .model import * from .frontend import * +from . import integrations diff --git a/casbin/integrations/__init__.py b/casbin/integrations/__init__.py new file mode 100644 index 0000000..95e4a54 --- /dev/null +++ b/casbin/integrations/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2021 The casbin Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .langchain import * + +__all__ = ["LangchainEnforcer"] diff --git a/casbin/integrations/langchain.py b/casbin/integrations/langchain.py new file mode 100644 index 0000000..51f6b67 --- /dev/null +++ b/casbin/integrations/langchain.py @@ -0,0 +1,202 @@ +# Copyright 2021 The casbin Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Langchain integration for Casbin authorization. + +This module provides authorization support for Langchain applications including: +- Document access control in RAG systems +- Tool usage authorization +- Agent action permissions +- Multi-tenant support for Langchain applications +""" + +from typing import Optional, List, Dict, Any + + +class LangchainEnforcer: + """ + LangchainEnforcer provides authorization helpers for Langchain applications. + + This class wraps a Casbin enforcer with Langchain-specific convenience methods + for common authorization patterns in LLM applications. + """ + + def __init__(self, enforcer): + """ + Initialize the LangchainEnforcer with a Casbin enforcer. + + Args: + enforcer: A Casbin enforcer instance (Enforcer, AsyncEnforcer, etc.) + """ + self.enforcer = enforcer + + def can_access_document(self, user: str, document_id: str, action: str = "read", domain: Optional[str] = None) -> bool: + """ + Check if a user can access a document in a RAG system. + + Args: + user: User identifier + document_id: Document identifier (e.g., "document:public:readme", "document:private:secret") + action: Action to perform (read, write, delete) + domain: Optional domain/tenant identifier for multi-tenant applications + + Returns: + bool: True if access is allowed, False otherwise + """ + if domain: + return self.enforcer.enforce(user, domain, document_id, action) + return self.enforcer.enforce(user, document_id, action) + + def can_use_tool(self, user: str, tool_name: str, domain: Optional[str] = None) -> bool: + """ + Check if a user can use a specific Langchain tool. + + Args: + user: User identifier + tool_name: Tool identifier (e.g., "tool:search", "tool:calculator") + domain: Optional domain/tenant identifier for multi-tenant applications + + Returns: + bool: True if tool usage is allowed, False otherwise + """ + tool_resource = f"tool:{tool_name}" if not tool_name.startswith("tool:") else tool_name + if domain: + return self.enforcer.enforce(user, domain, tool_resource, "use") + return self.enforcer.enforce(user, tool_resource, "use") + + def can_execute_agent(self, user: str, agent_name: str, domain: Optional[str] = None) -> bool: + """ + Check if a user can execute a specific Langchain agent. + + Args: + user: User identifier + agent_name: Agent identifier (e.g., "agent:chatbot", "agent:analyst") + domain: Optional domain/tenant identifier for multi-tenant applications + + Returns: + bool: True if agent execution is allowed, False otherwise + """ + agent_resource = f"agent:{agent_name}" if not agent_name.startswith("agent:") else agent_name + if domain: + return self.enforcer.enforce(user, domain, agent_resource, "execute") + return self.enforcer.enforce(user, agent_resource, "execute") + + def get_accessible_documents(self, user: str, action: str = "read", domain: Optional[str] = None) -> List[str]: + """ + Get all documents accessible to a user. + + Args: + user: User identifier + action: Action to check (read, write, delete) + domain: Optional domain/tenant identifier for multi-tenant applications + + Returns: + List[str]: List of accessible document identifiers + """ + accessible_docs = [] + + # Get all policies + policies = self.enforcer.get_policy() + + for policy in policies: + if domain: + # Format: [sub, dom, obj, act] + if len(policy) >= 4: + pol_sub, pol_dom, pol_obj, pol_act = policy[0], policy[1], policy[2], policy[3] + if pol_obj.startswith("document:") and pol_act == action and pol_dom == domain: + # Check if user has this permission + if self.enforcer.enforce(user, domain, pol_obj, action): + accessible_docs.append(pol_obj) + else: + # Format: [sub, obj, act] + if len(policy) >= 3: + pol_sub, pol_obj, pol_act = policy[0], policy[1], policy[2] + if pol_obj.startswith("document:") and pol_act == action: + # Check if user has this permission + if self.enforcer.enforce(user, pol_obj, action): + accessible_docs.append(pol_obj) + + # Remove duplicates + return list(set(accessible_docs)) + + def get_available_tools(self, user: str, domain: Optional[str] = None) -> List[str]: + """ + Get all tools available to a user. + + Args: + user: User identifier + domain: Optional domain/tenant identifier for multi-tenant applications + + Returns: + List[str]: List of available tool identifiers + """ + available_tools = [] + + # Get all policies + policies = self.enforcer.get_policy() + + for policy in policies: + if domain: + # Format: [sub, dom, obj, act] + if len(policy) >= 4: + pol_sub, pol_dom, pol_obj, pol_act = policy[0], policy[1], policy[2], policy[3] + if pol_obj.startswith("tool:") and pol_act == "use" and pol_dom == domain: + # Check if user has this permission + if self.enforcer.enforce(user, domain, pol_obj, "use"): + available_tools.append(pol_obj) + else: + # Format: [sub, obj, act] + if len(policy) >= 3: + pol_sub, pol_obj, pol_act = policy[0], policy[1], policy[2] + if pol_obj.startswith("tool:") and pol_act == "use": + # Check if user has this permission + if self.enforcer.enforce(user, pol_obj, "use"): + available_tools.append(pol_obj) + + # Remove duplicates + return list(set(available_tools)) + + def filter_documents_by_permission(self, user: str, documents: List[Dict[str, Any]], + document_id_field: str = "id", + action: str = "read", + domain: Optional[str] = None) -> List[Dict[str, Any]]: + """ + Filter a list of documents based on user permissions. + + This is useful for RAG systems where you need to filter retrieved documents + before passing them to the LLM. + + Args: + user: User identifier + documents: List of document dictionaries + document_id_field: Field name containing the document ID + action: Action to check (read, write, delete) + domain: Optional domain/tenant identifier for multi-tenant applications + + Returns: + List[Dict[str, Any]]: Filtered list of documents user can access + """ + filtered_docs = [] + + for doc in documents: + doc_id = doc.get(document_id_field) + if doc_id: + if self.can_access_document(user, doc_id, action, domain): + filtered_docs.append(doc) + + return filtered_docs + + +__all__ = ["LangchainEnforcer"] diff --git a/examples/langchain/README.md b/examples/langchain/README.md new file mode 100644 index 0000000..7b3683b --- /dev/null +++ b/examples/langchain/README.md @@ -0,0 +1,230 @@ +# Langchain Authorization with Casbin + +This directory contains examples demonstrating how to use Casbin for authorization in Langchain applications. + +## Overview + +Casbin provides powerful authorization capabilities for Langchain applications, including: + +- **Document Access Control**: Control which documents users can access in RAG (Retrieval-Augmented Generation) systems +- **Tool Usage Authorization**: Restrict which Langchain tools users can execute +- **Agent Execution Control**: Manage permissions for running different Langchain agents +- **Multi-tenant Support**: Implement authorization across different tenants/domains + +## Examples + +### Basic RAG Authorization + +[`basic_rag_example.py`](basic_rag_example.py) demonstrates: +- Document access control with different user roles (admin, data_analyst, basic_user) +- Tool usage authorization (search, calculator, database) +- Agent execution permissions +- Filtering retrieved documents based on user permissions + +Run the example: +```bash +python examples/langchain/basic_rag_example.py +``` + +### Multi-Tenant Authorization + +[`multi_tenant_example.py`](multi_tenant_example.py) demonstrates: +- Authorization across multiple tenants/domains +- Users having different permissions in different tenants +- Tenant-specific resource access + +Run the example: +```bash +python examples/langchain/multi_tenant_example.py +``` + +## Usage + +### 1. Basic Setup + +```python +from casbin import Enforcer +from casbin.integrations import LangchainEnforcer + +# Initialize Casbin enforcer with Langchain RAG model +enforcer = Enforcer( + "langchain_rag_model.conf", + "langchain_rag_policy.csv" +) + +# Wrap with LangchainEnforcer for convenience methods +lc_enforcer = LangchainEnforcer(enforcer) +``` + +### 2. Document Access Control + +```python +# Check if user can access a document +can_read = lc_enforcer.can_access_document("alice", "document:public:readme", "read") + +# Get all accessible documents for a user +accessible_docs = lc_enforcer.get_accessible_documents("bob", "read") + +# Filter documents based on permissions (useful for RAG) +retrieved_docs = [ + {"id": "document:public:readme", "content": "..."}, + {"id": "document:private:secret", "content": "..."}, +] +filtered = lc_enforcer.filter_documents_by_permission("user", retrieved_docs) +``` + +### 3. Tool Usage Authorization + +```python +# Check if user can use a tool +can_use = lc_enforcer.can_use_tool("bob", "search") + +# Get all available tools for a user +tools = lc_enforcer.get_available_tools("bob") +``` + +### 4. Agent Execution Control + +```python +# Check if user can execute an agent +can_execute = lc_enforcer.can_execute_agent("alice", "chatbot") +``` + +### 5. Multi-Tenant Authorization + +```python +# Check access with tenant/domain +can_read = lc_enforcer.can_access_document( + "alice", "document:private:data", "read", domain="tenant1" +) + +# Get tenant-specific resources +docs = lc_enforcer.get_accessible_documents("bob", "read", domain="tenant1") +``` + +## Model Configuration + +### Basic RAG Model + +The basic model (`langchain_rag_model.conf`) uses RBAC with pattern matching: + +```ini +[request_definition] +r = sub, obj, act + +[policy_definition] +p = sub, obj, act + +[role_definition] +g = _, _ + +[policy_effect] +e = some(where (p.eft == allow)) + +[matchers] +m = g(r.sub, p.sub) && keyMatch(r.obj, p.obj) && r.act == p.act +``` + +### Multi-Tenant Model + +The multi-tenant model (`langchain_with_domains_model.conf`) adds domain support: + +```ini +[request_definition] +r = sub, dom, obj, act + +[policy_definition] +p = sub, dom, obj, act + +[role_definition] +g = _, _, _ + +[policy_effect] +e = some(where (p.eft == allow)) + +[matchers] +m = g(r.sub, p.sub, r.dom) && r.dom == p.dom && keyMatch(r.obj, p.obj) && r.act == p.act +``` + +## Policy Configuration + +Policies define the actual permissions using pattern matching: + +```csv +# Role permissions +p, admin, document:*, read +p, admin, tool:*, use +p, data_analyst, document:public:*, read +p, data_analyst, document:analytics:*, read +p, data_analyst, tool:search, use + +# Role assignments +g, alice, admin +g, bob, data_analyst +``` + +## Resource Naming Conventions + +- **Documents**: `document::` (e.g., `document:public:readme`, `document:analytics:report`) +- **Tools**: `tool:` (e.g., `tool:search`, `tool:calculator`) +- **Agents**: `agent:` (e.g., `agent:chatbot`, `agent:analyst`) + +## Integration with Langchain + +### RAG Document Filtering + +```python +from langchain.vectorstores import FAISS +from langchain.embeddings import OpenAIEmbeddings + +# After retrieving documents from vector store +retrieved_docs = vectorstore.similarity_search(query) + +# Convert to format suitable for filtering +docs_with_ids = [ + {"id": f"document:{doc.metadata['category']}:{doc.metadata['name']}", + "content": doc.page_content} + for doc in retrieved_docs +] + +# Filter based on user permissions +authorized_docs = lc_enforcer.filter_documents_by_permission( + user_id, docs_with_ids, document_id_field="id" +) + +# Pass only authorized documents to LLM +authorized_content = [doc["content"] for doc in authorized_docs] +``` + +### Tool Authorization + +```python +from langchain.agents import initialize_agent, Tool + +# Get available tools for user +available_tool_names = lc_enforcer.get_available_tools(user_id) + +# Filter tools based on authorization +authorized_tools = [ + tool for tool in all_tools + if f"tool:{tool.name}" in available_tool_names or + lc_enforcer.can_use_tool(user_id, tool.name) +] + +# Initialize agent with authorized tools only +agent = initialize_agent(authorized_tools, llm, agent="zero-shot-react-description") +``` + +## Benefits + +1. **Centralized Authorization**: Manage all access control in one place +2. **Fine-grained Control**: Control access at document, tool, and agent level +3. **Multi-tenancy**: Support multiple organizations/tenants with different policies +4. **Flexible Policies**: Use patterns and wildcards for easy policy management +5. **Production-Ready**: Built on battle-tested Casbin authorization library + +## Learn More + +- [Casbin Documentation](https://casbin.org/docs/) +- [Langchain Documentation](https://python.langchain.com/) +- [PyCasbin GitHub](https://github.com/casbin/pycasbin) diff --git a/examples/langchain/basic_rag_example.py b/examples/langchain/basic_rag_example.py new file mode 100644 index 0000000..8944cf9 --- /dev/null +++ b/examples/langchain/basic_rag_example.py @@ -0,0 +1,119 @@ +""" +Basic Langchain RAG Authorization Example + +This example demonstrates how to use Casbin with Langchain for document access control +in a Retrieval-Augmented Generation (RAG) system. +""" + +import os +import sys + +# Add parent directory to path for imports +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))) + +from casbin import Enforcer +from casbin.integrations import LangchainEnforcer + + +def get_examples(filename): + """Get the path to an example file.""" + examples_path = os.path.join(os.path.dirname(__file__), "..") + return os.path.join(examples_path, filename) + + +def main(): + """Demonstrate basic Langchain RAG authorization.""" + print("=" * 60) + print("Casbin + Langchain: Basic RAG Authorization Example") + print("=" * 60) + print() + + # Initialize Casbin enforcer with Langchain RAG model + enforcer = Enforcer( + get_examples("langchain_rag_model.conf"), + get_examples("langchain_rag_policy.csv") + ) + + # Wrap with LangchainEnforcer for convenience methods + lc_enforcer = LangchainEnforcer(enforcer) + + # Test document access for different users + print("Document Access Control:") + print("-" * 40) + + users = ["alice", "bob", "charlie"] + documents = [ + "document:public:readme", + "document:analytics:report", + "document:private:secret" + ] + + for user in users: + print(f"\nUser: {user}") + for doc in documents: + can_read = lc_enforcer.can_access_document(user, doc, "read") + print(f" {doc}: {'✓ Allowed' if can_read else '✗ Denied'}") + + # Test tool usage authorization + print("\n\nTool Usage Authorization:") + print("-" * 40) + + tools = ["search", "calculator", "database"] + + for user in users: + print(f"\nUser: {user}") + for tool in tools: + can_use = lc_enforcer.can_use_tool(user, tool) + print(f" tool:{tool}: {'✓ Allowed' if can_use else '✗ Denied'}") + + # Test agent execution + print("\n\nAgent Execution Authorization:") + print("-" * 40) + + agents = ["chatbot", "analyst", "admin_agent"] + + for user in ["alice", "dave", "charlie"]: + print(f"\nUser: {user}") + for agent in agents: + can_execute = lc_enforcer.can_execute_agent(user, agent) + print(f" agent:{agent}: {'✓ Allowed' if can_execute else '✗ Denied'}") + + # Get accessible resources for a user + print("\n\nAccessible Resources for Bob (data_analyst):") + print("-" * 40) + + accessible_docs = lc_enforcer.get_accessible_documents("bob", "read") + print(f"Accessible documents: {accessible_docs}") + + available_tools = lc_enforcer.get_available_tools("bob") + print(f"Available tools: {available_tools}") + + # Demonstrate document filtering for RAG + print("\n\nDocument Filtering for RAG:") + print("-" * 40) + + # Simulated retrieved documents from a vector database + retrieved_docs = [ + {"id": "document:public:readme", "content": "Public readme content"}, + {"id": "document:analytics:report", "content": "Analytics report"}, + {"id": "document:private:secret", "content": "Secret information"}, + ] + + # Filter documents based on user permissions + filtered_docs = lc_enforcer.filter_documents_by_permission( + "charlie", retrieved_docs, document_id_field="id" + ) + + print(f"Charlie retrieved {len(retrieved_docs)} documents from vector DB") + print(f"After authorization filtering: {len(filtered_docs)} documents allowed") + print("Allowed documents:") + for doc in filtered_docs: + print(f" - {doc['id']}") + + print("\n" + "=" * 60) + print("Example completed successfully!") + print("=" * 60) + + +if __name__ == "__main__": + main() diff --git a/examples/langchain/multi_tenant_example.py b/examples/langchain/multi_tenant_example.py new file mode 100644 index 0000000..54206ec --- /dev/null +++ b/examples/langchain/multi_tenant_example.py @@ -0,0 +1,91 @@ +""" +Multi-Tenant Langchain Authorization Example + +This example demonstrates how to use Casbin with Langchain for multi-tenant applications +where users have different permissions in different domains/tenants. +""" + +import os +import sys + +# Add parent directory to path for imports +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))) + +from casbin import Enforcer +from casbin.integrations import LangchainEnforcer + + +def get_examples(filename): + """Get the path to an example file.""" + examples_path = os.path.join(os.path.dirname(__file__), "..") + return os.path.join(examples_path, filename) + + +def main(): + """Demonstrate multi-tenant Langchain authorization.""" + print("=" * 60) + print("Casbin + Langchain: Multi-Tenant Authorization Example") + print("=" * 60) + print() + + # Initialize Casbin enforcer with multi-tenant model + enforcer = Enforcer( + get_examples("langchain_with_domains_model.conf"), + get_examples("langchain_with_domains_policy.csv") + ) + + # Wrap with LangchainEnforcer + lc_enforcer = LangchainEnforcer(enforcer) + + # Test access across different tenants + print("Multi-Tenant Document Access:") + print("-" * 40) + + test_cases = [ + ("alice", "tenant1", "document:public:readme"), + ("alice", "tenant1", "document:private:data"), + ("bob", "tenant1", "document:public:readme"), + ("bob", "tenant1", "document:private:data"), + ("carol", "tenant2", "document:shared:doc"), + ("dave", "tenant2", "document:shared:doc"), + ("dave", "tenant2", "document:private:admin"), + ] + + for user, tenant, doc in test_cases: + can_access = lc_enforcer.can_access_document(user, doc, "read", domain=tenant) + print(f"{user}@{tenant} → {doc}: {'✓ Allowed' if can_access else '✗ Denied'}") + + # Test tool access across tenants + print("\n\nMulti-Tenant Tool Access:") + print("-" * 40) + + tool_tests = [ + ("alice", "tenant1", "search"), + ("bob", "tenant1", "search"), + ("bob", "tenant1", "admin_tool"), + ("carol", "tenant2", "search"), + ("dave", "tenant2", "search"), + ] + + for user, tenant, tool in tool_tests: + can_use = lc_enforcer.can_use_tool(user, tool, domain=tenant) + print(f"{user}@{tenant} → tool:{tool}: {'✓ Allowed' if can_use else '✗ Denied'}") + + # Get tenant-specific accessible resources + print("\n\nTenant-Specific Accessible Resources:") + print("-" * 40) + + for user, tenant in [("bob", "tenant1"), ("dave", "tenant2")]: + print(f"\n{user}@{tenant}:") + docs = lc_enforcer.get_accessible_documents(user, "read", domain=tenant) + tools = lc_enforcer.get_available_tools(user, domain=tenant) + print(f" Documents: {docs}") + print(f" Tools: {tools}") + + print("\n" + "=" * 60) + print("Multi-tenant example completed successfully!") + print("=" * 60) + + +if __name__ == "__main__": + main() diff --git a/examples/langchain_rag_model.conf b/examples/langchain_rag_model.conf new file mode 100644 index 0000000..b9f3a43 --- /dev/null +++ b/examples/langchain_rag_model.conf @@ -0,0 +1,17 @@ +# Langchain RAG Authorization Model +# This model controls access to documents, tools, and agent actions in Langchain applications + +[request_definition] +r = sub, obj, act + +[policy_definition] +p = sub, obj, act + +[role_definition] +g = _, _ + +[policy_effect] +e = some(where (p.eft == allow)) + +[matchers] +m = g(r.sub, p.sub) && keyMatch(r.obj, p.obj) && r.act == p.act diff --git a/examples/langchain_rag_policy.csv b/examples/langchain_rag_policy.csv new file mode 100644 index 0000000..d3013d4 --- /dev/null +++ b/examples/langchain_rag_policy.csv @@ -0,0 +1,20 @@ +p, admin, document:*, read +p, admin, document:*, write +p, admin, tool:*, use +p, admin, agent:*, execute + +p, data_analyst, document:public:*, read +p, data_analyst, document:analytics:*, read +p, data_analyst, tool:search, use +p, data_analyst, tool:calculator, use + +p, basic_user, document:public:*, read +p, basic_user, tool:search, use + +p, agent_user, agent:chatbot, execute +p, agent_user, document:public:*, read + +g, alice, admin +g, bob, data_analyst +g, charlie, basic_user +g, dave, agent_user diff --git a/examples/langchain_with_domains_model.conf b/examples/langchain_with_domains_model.conf new file mode 100644 index 0000000..b11aa60 --- /dev/null +++ b/examples/langchain_with_domains_model.conf @@ -0,0 +1,17 @@ +# Langchain Authorization Model with Domains/Tenants +# Supports multi-tenant Langchain applications where users have different permissions in different domains + +[request_definition] +r = sub, dom, obj, act + +[policy_definition] +p = sub, dom, obj, act + +[role_definition] +g = _, _, _ + +[policy_effect] +e = some(where (p.eft == allow)) + +[matchers] +m = g(r.sub, p.sub, r.dom) && r.dom == p.dom && keyMatch(r.obj, p.obj) && r.act == p.act diff --git a/examples/langchain_with_domains_policy.csv b/examples/langchain_with_domains_policy.csv new file mode 100644 index 0000000..060dac5 --- /dev/null +++ b/examples/langchain_with_domains_policy.csv @@ -0,0 +1,18 @@ +p, admin, tenant1, document:*, read +p, admin, tenant1, document:*, write +p, admin, tenant1, tool:*, use +p, admin, tenant1, agent:*, execute + +p, user, tenant1, document:public:*, read +p, user, tenant1, tool:search, use + +p, admin, tenant2, document:*, read +p, admin, tenant2, document:*, write +p, admin, tenant2, tool:*, use + +p, user, tenant2, document:shared:*, read + +g, alice, admin, tenant1 +g, bob, user, tenant1 +g, carol, admin, tenant2 +g, dave, user, tenant2 diff --git a/tests/test_langchain_integration.py b/tests/test_langchain_integration.py new file mode 100644 index 0000000..0cd52fb --- /dev/null +++ b/tests/test_langchain_integration.py @@ -0,0 +1,242 @@ +# Copyright 2021 The casbin Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import casbin +from casbin.integrations import LangchainEnforcer +from unittest import TestCase + + +def get_examples(filename): + """Get the path to an example file.""" + examples_path = os.path.split(os.path.realpath(__file__))[0] + examples_path = os.path.join(examples_path, "..", "examples") + return os.path.join(examples_path, filename) + + +class TestLangchainIntegration(TestCase): + """Test cases for Langchain integration with Casbin.""" + + def test_document_access_basic(self): + """Test basic document access control.""" + e = casbin.Enforcer( + get_examples("langchain_rag_model.conf"), + get_examples("langchain_rag_policy.csv") + ) + lc_enforcer = LangchainEnforcer(e) + + # Admin (alice) should have access to all documents + self.assertTrue(lc_enforcer.can_access_document("alice", "document:public:readme", "read")) + self.assertTrue(lc_enforcer.can_access_document("alice", "document:analytics:report", "read")) + self.assertTrue(lc_enforcer.can_access_document("alice", "document:private:secret", "write")) + + # Data analyst (bob) should have access to public and analytics documents + self.assertTrue(lc_enforcer.can_access_document("bob", "document:public:readme", "read")) + self.assertTrue(lc_enforcer.can_access_document("bob", "document:analytics:report", "read")) + self.assertFalse(lc_enforcer.can_access_document("bob", "document:private:secret", "read")) + + # Basic user (charlie) should only have access to public documents + self.assertTrue(lc_enforcer.can_access_document("charlie", "document:public:readme", "read")) + self.assertFalse(lc_enforcer.can_access_document("charlie", "document:analytics:report", "read")) + self.assertFalse(lc_enforcer.can_access_document("charlie", "document:private:secret", "read")) + + def test_tool_usage(self): + """Test tool usage authorization.""" + e = casbin.Enforcer( + get_examples("langchain_rag_model.conf"), + get_examples("langchain_rag_policy.csv") + ) + lc_enforcer = LangchainEnforcer(e) + + # Admin should be able to use all tools + self.assertTrue(lc_enforcer.can_use_tool("alice", "search")) + self.assertTrue(lc_enforcer.can_use_tool("alice", "calculator")) + self.assertTrue(lc_enforcer.can_use_tool("alice", "database")) + + # Data analyst should be able to use search and calculator + self.assertTrue(lc_enforcer.can_use_tool("bob", "search")) + self.assertTrue(lc_enforcer.can_use_tool("bob", "calculator")) + self.assertFalse(lc_enforcer.can_use_tool("bob", "database")) + + # Basic user should only be able to use search + self.assertTrue(lc_enforcer.can_use_tool("charlie", "search")) + self.assertFalse(lc_enforcer.can_use_tool("charlie", "calculator")) + self.assertFalse(lc_enforcer.can_use_tool("charlie", "database")) + + def test_agent_execution(self): + """Test agent execution authorization.""" + e = casbin.Enforcer( + get_examples("langchain_rag_model.conf"), + get_examples("langchain_rag_policy.csv") + ) + lc_enforcer = LangchainEnforcer(e) + + # Admin should be able to execute all agents + self.assertTrue(lc_enforcer.can_execute_agent("alice", "chatbot")) + self.assertTrue(lc_enforcer.can_execute_agent("alice", "analyst")) + + # Agent user should be able to execute chatbot + self.assertTrue(lc_enforcer.can_execute_agent("dave", "chatbot")) + self.assertFalse(lc_enforcer.can_execute_agent("dave", "analyst")) + + # Basic user should not be able to execute agents + self.assertFalse(lc_enforcer.can_execute_agent("charlie", "chatbot")) + self.assertFalse(lc_enforcer.can_execute_agent("charlie", "analyst")) + + def test_get_accessible_documents(self): + """Test getting list of accessible documents.""" + e = casbin.Enforcer( + get_examples("langchain_rag_model.conf"), + get_examples("langchain_rag_policy.csv") + ) + lc_enforcer = LangchainEnforcer(e) + + # Get accessible documents for data analyst + accessible = lc_enforcer.get_accessible_documents("bob", "read") + + # Bob should have access to public and analytics documents + self.assertTrue(any("public" in doc for doc in accessible)) + self.assertTrue(any("analytics" in doc for doc in accessible)) + + # Get accessible documents for basic user + accessible = lc_enforcer.get_accessible_documents("charlie", "read") + + # Charlie should only have access to public documents + self.assertTrue(any("public" in doc for doc in accessible)) + self.assertFalse(any("analytics" in doc for doc in accessible)) + + def test_get_available_tools(self): + """Test getting list of available tools.""" + e = casbin.Enforcer( + get_examples("langchain_rag_model.conf"), + get_examples("langchain_rag_policy.csv") + ) + lc_enforcer = LangchainEnforcer(e) + + # Get available tools for data analyst + tools = lc_enforcer.get_available_tools("bob") + + # Bob should have access to search and calculator + self.assertTrue(any("search" in tool for tool in tools)) + self.assertTrue(any("calculator" in tool for tool in tools)) + + # Get available tools for basic user + tools = lc_enforcer.get_available_tools("charlie") + + # Charlie should only have access to search + self.assertTrue(any("search" in tool for tool in tools)) + self.assertEqual(len(tools), 1) + + def test_filter_documents_by_permission(self): + """Test filtering documents based on permissions.""" + e = casbin.Enforcer( + get_examples("langchain_rag_model.conf"), + get_examples("langchain_rag_policy.csv") + ) + lc_enforcer = LangchainEnforcer(e) + + # Simulated documents from a RAG system + documents = [ + {"id": "document:public:readme", "content": "Public content"}, + {"id": "document:analytics:report", "content": "Analytics content"}, + {"id": "document:private:secret", "content": "Secret content"}, + ] + + # Filter for basic user (charlie) + filtered = lc_enforcer.filter_documents_by_permission("charlie", documents) + + # Charlie should only see public documents + self.assertEqual(len(filtered), 1) + self.assertTrue("public" in filtered[0]["id"]) + + # Filter for data analyst (bob) + filtered = lc_enforcer.filter_documents_by_permission("bob", documents) + + # Bob should see public and analytics documents + self.assertEqual(len(filtered), 2) + ids = [doc["id"] for doc in filtered] + self.assertTrue(any("public" in id for id in ids)) + self.assertTrue(any("analytics" in id for id in ids)) + + # Filter for admin (alice) + filtered = lc_enforcer.filter_documents_by_permission("alice", documents) + + # Alice should see all documents + self.assertEqual(len(filtered), 3) + + def test_multi_tenant_document_access(self): + """Test document access with multi-tenant support.""" + e = casbin.Enforcer( + get_examples("langchain_with_domains_model.conf"), + get_examples("langchain_with_domains_policy.csv") + ) + lc_enforcer = LangchainEnforcer(e) + + # Test access in tenant1 + self.assertTrue(lc_enforcer.can_access_document("alice", "document:public:readme", "read", "tenant1")) + self.assertTrue(lc_enforcer.can_access_document("bob", "document:public:readme", "read", "tenant1")) + + # Test access in tenant2 + self.assertTrue(lc_enforcer.can_access_document("carol", "document:shared:doc", "read", "tenant2")) + self.assertTrue(lc_enforcer.can_access_document("dave", "document:shared:doc", "read", "tenant2")) + + # Test cross-tenant access (should fail) + self.assertFalse(lc_enforcer.can_access_document("alice", "document:shared:doc", "read", "tenant2")) + self.assertFalse(lc_enforcer.can_access_document("carol", "document:public:readme", "read", "tenant1")) + + def test_multi_tenant_tool_access(self): + """Test tool access with multi-tenant support.""" + e = casbin.Enforcer( + get_examples("langchain_with_domains_model.conf"), + get_examples("langchain_with_domains_policy.csv") + ) + lc_enforcer = LangchainEnforcer(e) + + # Test tool access in tenant1 + self.assertTrue(lc_enforcer.can_use_tool("alice", "search", "tenant1")) + self.assertTrue(lc_enforcer.can_use_tool("bob", "search", "tenant1")) + + # Test tool access in tenant2 + self.assertTrue(lc_enforcer.can_use_tool("carol", "search", "tenant2")) + + # Alice is admin in tenant1 but not in tenant2 + self.assertFalse(lc_enforcer.can_use_tool("alice", "search", "tenant2")) + + def test_tool_name_with_prefix(self): + """Test that tool names work with or without 'tool:' prefix.""" + e = casbin.Enforcer( + get_examples("langchain_rag_model.conf"), + get_examples("langchain_rag_policy.csv") + ) + lc_enforcer = LangchainEnforcer(e) + + # Test with and without prefix + self.assertEqual( + lc_enforcer.can_use_tool("bob", "search"), + lc_enforcer.can_use_tool("bob", "tool:search") + ) + + def test_agent_name_with_prefix(self): + """Test that agent names work with or without 'agent:' prefix.""" + e = casbin.Enforcer( + get_examples("langchain_rag_model.conf"), + get_examples("langchain_rag_policy.csv") + ) + lc_enforcer = LangchainEnforcer(e) + + # Test with and without prefix + self.assertEqual( + lc_enforcer.can_execute_agent("dave", "chatbot"), + lc_enforcer.can_execute_agent("dave", "agent:chatbot") + )