diff --git a/.github/dependabot.yml.sample b/.github/dependabot.yml.sample new file mode 100644 index 0000000..f500d0a --- /dev/null +++ b/.github/dependabot.yml.sample @@ -0,0 +1,55 @@ +version: 2 +updates: + - package-ecosystem: "pip" + directory: "/" + schedule: + interval: "daily" + open-pull-requests-limit: 10 + reviewers: + - "your-github-username" + assignees: + - "your-github-username" + commit-message: + prefix: "chore" + include: "scope" + target-branch: "main" + labels: + - "dependencies" + ignore: + - dependency-name: "*" + update-types: ["version-update:semver-patch"] + extend-update-types: + - name: "security" + security-severities: + - "CRITICAL" + - "HIGH" + remote-url: "http://your-fastapi-service-url/check-dependency" + api-key: "your-api-key" + insecure-external-code-execution: deny + + - package-ecosystem: "docker" + directory: "/" + schedule: + interval: "weekly" + open-pull-requests-limit: 5 + reviewers: + - "your-github-username" + assignees: + - "your-github-username" + commit-message: + prefix: "chore" + include: "scope" + target-branch: "main" + labels: + - "docker" + ignore: + - dependency-name: "*" + update-types: ["version-update:semver-patch"] + extend-update-types: + - name: "security" + security-severities: + - "CRITICAL" + - "HIGH" + remote-url: "http://your-fastapi-service-url/check-docker-image" + api-key: "your-api-key" + insecure-external-code-execution: deny \ No newline at end of file diff --git a/app/README.md b/app/README.md index 9e702eb..a75153c 100644 --- a/app/README.md +++ b/app/README.md @@ -1,3 +1,7 @@ +It's a FastAPI-driven API endpoint that serves several functions for AI and data processing workflows. + +It includes logic for agent and LLM tool-driven reasoning to determine actions, features execution through Docker for AI tasks, MinIO client interactions for S3 storage processes, and data ingestion that structures and stores web data into MinIO and Weaviate. 
The application is also set up to provide a basic web interface that can be expanded for various front-end use cases. It’s a multifaceted backend designed to support advanced AI data interactions and management. + Dockerizing Instructions `docker build -t cdaprod/cda-minio-control .` diff --git a/app/TODO.md b/app/TODO.md new file mode 100644 index 0000000..9bc6771 --- /dev/null +++ b/app/TODO.md @@ -0,0 +1,239 @@ +gPt4 + +Improving your `App.py` application involves incorporating the enhancements discussed earlier. Below is an updated code snippet that integrates these changes: + +```python +from fastapi import FastAPI, HTTPException, Depends +from pydantic import BaseModel, HttpUrl +from minio import Minio +import requests +from typing import List +from langchain_openai import ChatOpenAI +from langchain.agents import AgentExecutor +from langchain.agents.format_scratchpad.openai_tools import format_to_openai_tool_messages +from langchain.agents.output_parsers.openai_tools import OpenAIToolsAgentOutputParser + +# Additional imports for enhanced functionality +from weaviate_integration import WeaviateClient +from docker_integration import DockerManager +from reasoning_agent import ReasoningAgent + +# Existing BaseModel classes here... + +class AgentAction(BaseModel): + action_type: str + data: dict + +# Enhanced MinIO Operations with metadata handling +class MinIOEnhancedOperations(MinIOOperations): + # Override methods as needed and add metadata operations + pass + +# DockerManager to execute scripts within Docker and bind to tools +class DockerManagerWithToolBinding(DockerManager): + # Override methods as needed and add @tool binding + pass + +# Weaviate client to perform vector searches and sync with MinIO +weaviate_client = WeaviateClient(...) + +# Implement the reasoning logic for agents +reasoning_agent = ReasoningAgent(...) + +app = FastAPI() + +# Replace existing endpoints with enhanced versions... 
+ +@app.post("/execute-script/") +async def execute_script(bucket_name: str, script_name: str): + # Enhanced logic for executing scripts using DockerManagerWithToolBinding + pass + +@app.post("/process-agent-action/") +async def process_agent_action(agent_action: AgentAction): + # Use reasoning_agent to determine and execute the action + pass + +@app.post("/vector-search/") +async def perform_vector_search(query: str): + # Use weaviate_client to perform vector search and return results + pass + +# Additional endpoints as needed for new features... + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) +``` + +Remember to replace the placeholders like `WeaviateClient(...)`, `DockerManagerWithToolBinding`, and `ReasoningAgent(...)` with actual implementation details tailored to your specific requirements. + +Ensure to thoroughly test the enhanced functionalities, especially the interactions between MinIO, Docker, and Weaviate, to validate their correct integration into the FastAPI service. + +--- + +# Claude Opus +Certainly! 
Here's an updated version of `app.py` that incorporates the MinIO Docker features into buckets as an agent tool feature store: + +```python +import asyncio +import os +import re +import io +import tempfile +import uuid +import openai +from typing import List, Optional, Dict +from concurrent.futures import ThreadPoolExecutor +from fastapi import FastAPI, HTTPException, Depends +from pydantic import BaseModel, HttpUrl +from minio import Minio +import requests +from unstructured.partition.auto import partition +from docker import from_env as docker_from_env +from langchain.chat_models import ChatOpenAI +from langchain.agents import tool, AgentExecutor +from langchain.agents import AgentType +from langchain.memory import ConversationBufferMemory +from langchain.callbacks.base import BaseCallbackManager +from langchain.agents import initialize_agent +from langchain.prompts import ChatPromptTemplate +import uvicorn +from minio.error import S3Error +import weaviate +import json + +# Additional imports for enhanced functionality +from weaviate_integration import WeaviateClient +from docker_integration import DockerManager +from reasoning_agent import ReasoningAgent + +openai_api_key = os.getenv("OPENAI_API_KEY") +if openai_api_key is None: + raise ValueError("OPENAI_API_KEY environment variable is not set.") + +llm = ChatOpenAI(api_key=openai_api_key, model_name="gpt-3.5-turbo") + +# Existing BaseModel classes here... 
+ +class AgentAction(BaseModel): + action_type: str + data: dict + +# Enhanced MinIO Operations with metadata handling +class MinIOEnhancedOperations(MinIOOperations): + def __init__(self, config: MinIOConfig): + super().__init__(config) + + async def store_object_with_metadata(self, bucket_name: str, object_name: str, file_path: str, metadata: dict): + self.client.fput_object(bucket_name, object_name, file_path, metadata=metadata) + + async def retrieve_object_metadata(self, bucket_name: str, object_name: str) -> dict: + return self.client.stat_object(bucket_name, object_name).metadata + +# DockerManager to execute scripts within Docker and bind to tools +class DockerManagerWithToolBinding(DockerManager): + def __init__(self, config: DockerConfig): + super().__init__(config) + + @tool + async def execute_script_in_docker(self, execution_request: ScriptExecutionRequest): + response = self.minio_ops.client.get_object(execution_request.bucket_name, execution_request.script_name) + script_content = response.read().decode('utf-8') + container_name = await self.docker_ops.execute_script(script_content) + return {"message": "Execution started", "container_name": container_name} + +# MinIO Feature Store for Agent Tools +class MinIOFeatureStore: + def __init__(self, minio_ops: MinIOEnhancedOperations): + self.minio_ops = minio_ops + + async def store_feature(self, tool_name: str, feature_name: str, feature_data: dict): + bucket_name = f"agent-tools-{tool_name}" + object_name = f"{feature_name}.json" + file_path = f"/tmp/{object_name}" + with open(file_path, 'w') as file: + json.dump(feature_data, file) + await self.minio_ops.store_object_with_metadata(bucket_name, object_name, file_path, metadata={}) + os.remove(file_path) + + async def retrieve_feature(self, tool_name: str, feature_name: str) -> dict: + bucket_name = f"agent-tools-{tool_name}" + object_name = f"{feature_name}.json" + file_path = f"/tmp/{object_name}" + await self.minio_ops.retrieve_object(bucket_name, 
object_name, file_path) + with open(file_path, 'r') as file: + feature_data = json.load(file) + os.remove(file_path) + return feature_data + +# Weaviate client to perform vector searches and sync with MinIO +weaviate_client = WeaviateClient(url="http://localhost:8080", timeout_config=(5, 15)) + +# Implement the reasoning logic for agents +reasoning_agent = ReasoningAgent( + llm=llm, + tools=[create_bucket, upload_file, list_buckets, delete_file], + memory=ConversationBufferMemory(memory_key="chat_history", return_messages=True), + output_parser=OpenAIToolsAgentOutputParser() +) + +app = FastAPI() + +# Replace existing endpoints with enhanced versions... + +@app.post("/execute-script/") +async def execute_script(bucket_name: str, script_name: str): + minio_ops = MinIOEnhancedOperations(MinIOConfig(endpoint="play.min.io:443", access_key="minioadmin", secret_key="minioadmin")) + docker_ops = DockerManagerWithToolBinding(DockerConfig(image="python:3.9-slim", command="", volumes={}, detach=True, remove=True)) + orchestrator = MinIOSystemOrchestrator(minio_ops, docker_ops) + container_name = await orchestrator.execute_script_in_docker(ScriptExecutionRequest(bucket_name=bucket_name, script_name=script_name)) + return {"message": "Execution started", "container_name": container_name} + +@app.post("/process-agent-action/") +async def process_agent_action(agent_action: AgentAction): + result = await reasoning_agent.execute_action(agent_action.action_type, agent_action.data) + return {"message": "Agent action processed", "result": result} + +@app.post("/vector-search/") +async def perform_vector_search(query: str): + result = await weaviate_client.query(query) + return {"message": "Vector search performed", "result": result} + +@app.post("/store-feature/") +async def store_feature(tool_name: str, feature_name: str, feature_data: dict): + minio_ops = MinIOEnhancedOperations(MinIOConfig(endpoint="play.min.io:443", access_key="minioadmin", secret_key="minioadmin")) + 
feature_store = MinIOFeatureStore(minio_ops) + await feature_store.store_feature(tool_name, feature_name, feature_data) + return {"message": "Feature stored successfully"} + +@app.post("/retrieve-feature/") +async def retrieve_feature(tool_name: str, feature_name: str): + minio_ops = MinIOEnhancedOperations(MinIOConfig(endpoint="play.min.io:443", access_key="minioadmin", secret_key="minioadmin")) + feature_store = MinIOFeatureStore(minio_ops) + feature_data = await feature_store.retrieve_feature(tool_name, feature_name) + return {"message": "Feature retrieved successfully", "feature_data": feature_data} + +# Additional endpoints as needed for new features... + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=8000) +``` + +In this updated version: + +1. The `MinIOFeatureStore` class is introduced to handle the storage and retrieval of agent tool features using MinIO buckets. It uses the `MinIOEnhancedOperations` class to interact with MinIO and store features as JSON objects in buckets named after the agent tools. + +2. Two new endpoints, `/store-feature/` and `/retrieve-feature/`, are added to allow storing and retrieving agent tool features. The `/store-feature/` endpoint takes the tool name, feature name, and feature data as input and stores the feature in the corresponding MinIO bucket. The `/retrieve-feature/` endpoint takes the tool name and feature name as input and retrieves the corresponding feature data from the MinIO bucket. + +3. The `MinIOFeatureStore` class uses the `store_feature` method to store a feature in the appropriate MinIO bucket. It creates a temporary JSON file with the feature data and uses the `store_object_with_metadata` method of `MinIOEnhancedOperations` to store the file in the bucket. + +4. The `MinIOFeatureStore` class uses the `retrieve_feature` method to retrieve a feature from the appropriate MinIO bucket. 
It uses the `retrieve_object` method of `MinIOEnhancedOperations` to download the JSON file from the bucket and load the feature data. + +This implementation allows agent tools to store and retrieve features using MinIO buckets as a feature store. The features are stored as JSON objects in buckets named after the agent tools, providing a convenient way to persist and access tool-specific data. + +Please note that this is a simplified version, and you may need to adapt it further based on your specific requirements and integration with other components of your system. + +Remember to replace the placeholders and connection details (e.g., MinIO credentials) with the appropriate values for your environment. + +Thoroughly test the implemented functionality to ensure it works as expected and integrates well with the rest of your FastAPI service. \ No newline at end of file diff --git a/app/app.py b/app/app.py new file mode 100644 index 0000000..a8fb5df --- /dev/null +++ b/app/app.py @@ -0,0 +1,425 @@ +import asyncio +import os +import re +import io +import tempfile +import uuid +import openai +from typing import List, Optional, Dict +from concurrent.futures import ThreadPoolExecutor +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel, HttpUrl +from minio import Minio +import requests +from unstructured.partition.auto import partition +from docker import from_env as docker_from_env +from langchain.chat_models import ChatOpenAI +from langchain.agents import tool, AgentExecutor +from langchain.agents import AgentType +from langchain.memory import ConversationBufferMemory +from langchain.callbacks.base import BaseCallbackManager +from langchain.agents import initialize_agent +from langchain.prompts import ChatPromptTemplate +import uvicorn +from minio.error import S3Error +import weaviate +import json + + +openai_api_key = os.getenv("OPENAI_API_KEY") +if openai_api_key is None: + raise ValueError("OPENAI_API_KEY environment variable is not set.") + 
+#llm = ChatOpenAI(api_key=openai_api_key) +llm = ChatOpenAI(api_key=openai_api_key, model_name="gpt-3.5-turbo") + +class MinIOConfig(BaseModel): + endpoint: str + access_key: str + secret_key: str + secure: bool = True + +class DockerConfig(BaseModel): + image: str + command: str + volumes: dict + detach: bool = True + remove: bool = True + +class URLItem(BaseModel): + url: HttpUrl + +class Document(BaseModel): + source: str + content: str + +class ScriptExecutionRequest(BaseModel): + bucket_name: str + script_name: str + +class LangChainInput(BaseModel): + input_text: str + +class ClientConfig(BaseModel): + minio_endpoint: str = "play.min.io:9000" + minio_access_key: str = "minioadmin" + minio_secret_key: str = "minioadmin" + weaviate_endpoint: str = "http://localhost:8080" + +class MinioClientModel(BaseModel): + config: ClientConfig + + def get_client(self) -> Minio: + return Minio( + self.config.minio_endpoint, + access_key=self.config.minio_access_key, + secret_key=self.config.minio_secret_key, + secure=True # Set to False if you are not using https + ) + +class WeaviateClientModel(BaseModel): + config: ClientConfig + + def get_client(self) -> weaviate.Client: + return weaviate.Client( + url=self.config.weaviate_endpoint, + timeout_config=(5, 15) + ) + +class LangChainQueryModel(BaseModel): + query: str + context: Optional[Dict] = None + +class AgentActionModel(BaseModel): + action: str + parameters: Optional[Dict] = None + +def sanitize_url_to_object_name(url: str) -> str: + clean_url = re.sub(r'^https?://', '', url) + clean_url = re.sub(r'[^\w\-_\.]', '_', clean_url) + return clean_url[:250] + '.txt' + +def prepare_text_for_tokenization(text: str) -> str: + clean_text = re.sub(r'\s+', ' ', text).strip() + return clean_text + +class MinIOOperations: + def __init__(self, config: MinIOConfig): + self.client = Minio(config.endpoint, access_key=config.access_key, secret_key=config.secret_key, secure=config.secure) + + async def store_object(self, bucket_name: 
str, object_name: str, file_path: str): + self.client.fput_object(bucket_name, object_name, file_path) + + async def retrieve_object(self, bucket_name: str, object_name: str, file_path: str): + self.client.fget_object(bucket_name, object_name, file_path) + +class DockerOperations: + def __init__(self, config: DockerConfig): + self.client = docker_from_env() + self.config = config + + async def execute_script(self, script_content: str): + with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as script_file: + script_file.write(script_content.encode()) + script_path = script_file.name + container_name = f"script_execution_{uuid.uuid4()}" + container = self.client.containers.run( + image=self.config.image, + command=f"python {script_path}", + volumes=self.config.volumes, + detach=self.config.detach, + remove=self.config.remove, + name=container_name + ) + return container_name + +class MinIOSystemOrchestrator: + def __init__(self, minio_config: MinIOConfig, docker_config: DockerConfig): + self.minio_ops = MinIOOperations(minio_config) + self.docker_ops = DockerOperations(docker_config) + + async def execute_url_etl_pipeline(self, urls: List[URLItem]): + async with ThreadPoolExecutor() as executor: + loop = asyncio.get_running_loop() + tasks = [loop.run_in_executor(executor, self.process_url, url) for url in urls] + await asyncio.gather(*tasks) + + def process_url(self, url_item: URLItem): + response = requests.get(url_item.url) + response.raise_for_status() + html_content = io.BytesIO(response.content) + elements = partition(file=html_content, content_type="text/html") + combined_text = "\n".join([e.text for e in elements if hasattr(e, 'text')]) + combined_text = prepare_text_for_tokenization(combined_text) + object_name = sanitize_url_to_object_name(url_item.url) + with tempfile.NamedTemporaryFile(delete=False, mode="w", encoding="utf-8", suffix=".txt") as tmp_file: + tmp_file.write(combined_text) + tmp_file_path = tmp_file.name + 
asyncio.run(self.minio_ops.store_object("your_bucket_name", object_name, tmp_file_path)) + os.remove(tmp_file_path) + + async def execute_script_in_docker(self, execution_request: ScriptExecutionRequest): + response = self.minio_ops.client.get_object(execution_request.bucket_name, execution_request.script_name) + script_content = response.read().decode('utf-8') + container_name = await self.docker_ops.execute_script(script_content) + return {"message": "Execution started", "container_name": container_name} + +app = FastAPI() + +# Initialize Minio client +minio_client = Minio( + os.getenv("MINIO_ENDPOINT", "play.min.io"), + access_key=os.getenv("MINIO_ACCESS_KEY", "minioadmin"), + secret_key=os.getenv("MINIO_SECRET_KEY", "minioadmin"), + secure=True +) + +@tool +def create_bucket(bucket_name: str) -> str: + """ + Creates a new bucket in Minio. + + Args: + bucket_name (str): The name of the bucket to create. + + Returns: + str: A message indicating the result of the bucket creation. + """ + try: + minio_client.make_bucket(bucket_name) + return f"Bucket {bucket_name} created successfully." + except S3Error as e: + return f"Failed to create bucket {bucket_name}: {str(e)}" + +@tool +def upload_file(bucket_name: str, file_path: str, object_name: str) -> str: + """ + Uploads a file to a specified bucket in Minio. + + Args: + bucket_name (str): The name of the bucket to upload the file to. + file_path (str): The path to the file to upload. + object_name (str): The name to give to the uploaded object. + + Returns: + str: A message indicating the result of the file upload. + """ + try: + minio_client.fput_object(bucket_name, object_name, file_path) + return f"File {file_path} uploaded as {object_name} in bucket {bucket_name}." + except S3Error as e: + return f"Failed to upload file {file_path} to bucket {bucket_name}: {str(e)}" + +@tool +def list_buckets() -> str: + """ + Lists all buckets in Minio. + + Returns: + str: A message containing the list of buckets. 
+ """ + try: + buckets = minio_client.list_buckets() + bucket_list = [bucket.name for bucket in buckets] + return f"Buckets: {', '.join(bucket_list)}" + except S3Error as e: + return f"Failed to list buckets: {str(e)}" + +@tool +def delete_file(bucket_name: str, object_name: str) -> str: + """ + Deletes a specified file from a bucket in Minio. + + Args: + bucket_name (str): The name of the bucket containing the file. + object_name (str): The name of the file to delete. + + Returns: + str: A message indicating the result of the file deletion. + """ + try: + minio_client.remove_object(bucket_name, object_name) + return f"File {object_name} deleted from bucket {bucket_name}." + except S3Error as e: + return f"Failed to delete file {object_name} from bucket {bucket_name}: {str(e)}" + +# Define LangChain prompts for MinIO operations +minio_prompts = { + 'create_bucket': ChatPromptTemplate.from_messages([ + ("system", "You are a helpful MinIO assistant."), + ("human", "Create a new bucket named '{bucket_name}'.") + ]), + 'upload_file': ChatPromptTemplate.from_messages([ + ("system", "You are a helpful MinIO assistant."), + ("human", "Upload file '{file_path}' to bucket '{bucket_name}'.") + ]), + 'list_buckets': ChatPromptTemplate.from_messages([ + ("system", "You are a helpful MinIO assistant."), + ("human", "List all buckets.") + ]), + 'delete_file': ChatPromptTemplate.from_messages([ + ("system", "You are a helpful MinIO assistant."), + ("human", "Delete file '{object_name}' from bucket '{bucket_name}'.") + ]) +} + +# Define a prompt for Docker operations +docker_prompts = { + 'execute_script': ChatPromptTemplate.from_messages([ + ("system", "You are a Docker operations assistant."), + ("human", "Execute script '{script_name}' using Docker image '{image}'.") + ]) +} + +# Rest of the code remains unchanged + +# ... + + + + +def execute_langchain_query(query: str, context: Optional[Dict] = None) -> str: + """ + Executes a query using LangChain and returns the response. 
+ + Args: + query (str): The query to be executed. + context (Optional[Dict], optional): Additional context for the query execution. Defaults to None. + + Returns: + str: The result of the query execution. + """ + # Placeholder for LangChain API endpoint + langchain_endpoint = "https://api.langchain.com/query" + + # Prepare the payload + payload = LangChainQueryModel(query=query, context=context or {}).dict() + + try: + # Execute the query + response = requests.post(langchain_endpoint, json=payload) + + # Check for successful request + if response.status_code == 200: + # Assuming the response contains a JSON with a key 'result' + return response.json().get('result', 'No result found.') + else: + return f"LangChain query failed with status code {response.status_code}" + except Exception as e: + return f"Failed to execute LangChain query: {str(e)}" + +def execute_agent_action(action: str, parameters: Optional[Dict] = None) -> str: + """ + Simulates an agent action execution based on the specified action and parameters. + + Args: + action (str): The action to be executed by the agent. + parameters (Optional[Dict], optional): Parameters for the action execution. Defaults to None. + + Returns: + str: A message indicating the result of the action execution. + """ + # Simulate action processing + # In a real scenario, this function would interact with an agent control system + action_details = AgentActionModel(action=action, parameters=parameters or {}) + print(f"Executing action: {action_details.action} with parameters: {json.dumps(action_details.parameters)}") + + # Placeholder for action execution result + return f"Action '{action}' executed successfully." 
+ +def hydrate_data(url: str, bucket_name: str, config: ClientConfig): + minio_client = MinioClientModel(config=config).get_client() + weaviate_client = WeaviateClientModel(config=config).get_client() + + # Fetch the data + response = requests.get(url) + if response.status_code != 200: + print(f"Failed to fetch {url}") + return + + # Store in Minio + file_name = os.path.basename(url) + file_content = response.content + temp_dir = tempfile.gettempdir() + file_path = os.path.join(temp_dir, file_name) + + with open(file_path, 'wb') as file: + file.write(file_content) + + try: + minio_client.fput_object(bucket_name, file_name, file_path) + print(f"Stored {file_name} in bucket {bucket_name}.") + except Exception as e: + print(f"Failed to store {file_name} in Minio: {str(e)}") + return + + # Index in Weaviate + data_object = { + "content": file_content.decode("utf-8"), # Assuming the content is text + "sourceUrl": url + } + try: + weaviate_client.data_object.create(data_object, "Articles") + print(f"Indexed content from {url}.") + except Exception as e: + print(f"Failed to index content from {url} in Weaviate: {str(e)}") + +@app.get("/") +def read_root(): + return {"Hello": "World"} + +@app.post("/execute/{bucket_name}/{script_name}") +async def execute_script(bucket_name: str, script_name: str): + orchestrator = MinIOSystemOrchestrator( + MinIOConfig(endpoint="play.min.io:443", access_key="minioadmin", secret_key="minioadmin"), + DockerConfig(image="python:3.9-slim", command="", volumes={}, detach=True, remove=True) + ) + container_name = await orchestrator.execute_script_in_docker(ScriptExecutionRequest(bucket_name=bucket_name, script_name=script_name)) + return {"message": "Execution started", "container_name": container_name} + +# openai_api_key = os.getenv("OPENAI_API_KEY") +# if openai_api_key is None: +# raise ValueError("OPENAI_API_KEY environment variable is not set.") + +# llm = ChatOpenAI(api_key=openai_api_key) +#minio_tools = [create_bucket, upload_file, 
list_buckets, delete_file] +tools = [create_bucket, upload_file, list_buckets, delete_file] +memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True) +agent = initialize_agent(tools, llm, agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION, verbose=True, memory=memory, + callbacks=[BaseCallbackManager()]) + +from langchain.agents import create_structured_chat_agent, create_json_chat_agent +# Initialization for tools, llm, and memory remains the same + +# Structured Chat Agent +structured_chat_agent = create_structured_chat_agent(llm, tools, memory=memory, prompt=prompt, verbose=True) + +# JSON Chat Agent +json_chat_agent = create_json_chat_agent(llm, tools, memory=memory, verbose=True) + +@app.post("/langchain-execute/") +async def execute_langchain_integration(input: LangChainInput): + result = agent.run(input=input.input_text) + return {"message": "LangChain processing completed", "result": result} + +@app.post("/hydrate-data/") +async def hydrate_data_endpoint(input: dict): + hydrate_data(input) + return {"message": "Data hydration completed"} + +@app.post("/agent-action/") +async def execute_agent_action_endpoint(input: dict): + result = execute_agent_action(input) + return {"message": "Agent action executed", "result": result} + +@app.post("/minio-webhook/") +async def handle_minio_webhook(event: dict): + event_name = event.get("EventName") + if event_name == "s3:ObjectCreated:Put": + pass + elif event_name == "s3:ObjectRemoved:Delete": + pass + return {"message": "Webhook event processed successfully"} + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file diff --git a/app/app.py.old b/app/app.py.old new file mode 100644 index 0000000..ab01df9 --- /dev/null +++ b/app/app.py.old @@ -0,0 +1,396 @@ +import asyncio +import os +import re +import io +import tempfile +import uuid +import openai +from typing import List, Optional, Dict +from concurrent.futures import ThreadPoolExecutor +from 
fastapi import FastAPI, HTTPException +from pydantic import BaseModel, HttpUrl +from minio import Minio +import requests +from unstructured.partition.auto import partition +from docker import from_env as docker_from_env +#from langchain.chat_models import ChatOpenAI +#from langchain_openai import ChatOpenAI +from langchain.agents import tool, AgentExecutor +from langchain.agents import AgentType +from langchain.memory import ConversationBufferMemory +from langchain_core.callbacks.base import BaseCallbackManager +from langchain.agents import initialize_agent +import uvicorn +#from llama_index.tools import FunctionTool +from minio.error import S3Error +import weaviate +import json +#from langchain_community.chat_models import ChatOpenAI +from langchain_community.chat_models import ChatOpenAI + + +openai_api_key = os.getenv("OPENAI_API_KEY") +if openai_api_key is None: + raise ValueError("OPENAI_API_KEY environment variable is not set.") + +#llm = ChatOpenAI(api_key=openai_api_key) +llm = ChatOpenAI(api_key=openai_api_key, model_name="gpt-3.5-turbo") + +class MinIOConfig(BaseModel): + endpoint: str + access_key: str + secret_key: str + secure: bool = True + +class DockerConfig(BaseModel): + image: str + command: str + volumes: dict + detach: bool = True + remove: bool = True + +class URLItem(BaseModel): + url: HttpUrl + +class Document(BaseModel): + source: str + content: str + +class ScriptExecutionRequest(BaseModel): + bucket_name: str + script_name: str + +class LangChainInput(BaseModel): + input_text: str + +class ClientConfig(BaseModel): + minio_endpoint: str = "play.min.io:9000" + minio_access_key: str = "minioadmin" + minio_secret_key: str = "minioadmin" + weaviate_endpoint: str = "http://localhost:8080" + +class MinioClientModel(BaseModel): + config: ClientConfig + + def get_client(self) -> Minio: + return Minio( + self.config.minio_endpoint, + access_key=self.config.minio_access_key, + secret_key=self.config.minio_secret_key, + secure=True # Set to False if 
you are not using https + ) + +class WeaviateClientModel(BaseModel): + config: ClientConfig + + def get_client(self) -> weaviate.Client: + return weaviate.Client( + url=self.config.weaviate_endpoint, + timeout_config=(5, 15) + ) + +class LangChainQueryModel(BaseModel): + query: str + context: Optional[Dict] = None + +class AgentActionModel(BaseModel): + action: str + parameters: Optional[Dict] = None + +def sanitize_url_to_object_name(url: str) -> str: + clean_url = re.sub(r'^https?://', '', url) + clean_url = re.sub(r'[^\w\-_\.]', '_', clean_url) + return clean_url[:250] + '.txt' + +def prepare_text_for_tokenization(text: str) -> str: + clean_text = re.sub(r'\s+', ' ', text).strip() + return clean_text + +class MinIOOperations: + def __init__(self, config: MinIOConfig): + self.client = Minio(config.endpoint, access_key=config.access_key, secret_key=config.secret_key, secure=config.secure) + + async def store_object(self, bucket_name: str, object_name: str, file_path: str): + self.client.fput_object(bucket_name, object_name, file_path) + + async def retrieve_object(self, bucket_name: str, object_name: str, file_path: str): + self.client.fget_object(bucket_name, object_name, file_path) + +class DockerOperations: + def __init__(self, config: DockerConfig): + self.client = docker_from_env() + self.config = config + + async def execute_script(self, script_content: str): + with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as script_file: + script_file.write(script_content.encode()) + script_path = script_file.name + container_name = f"script_execution_{uuid.uuid4()}" + container = self.client.containers.run( + image=self.config.image, + command=f"python {script_path}", + volumes=self.config.volumes, + detach=self.config.detach, + remove=self.config.remove, + name=container_name + ) + return container_name + +class MinIOSystemOrchestrator: + def __init__(self, minio_config: MinIOConfig, docker_config: DockerConfig): + self.minio_ops = 
MinIOOperations(minio_config) + self.docker_ops = DockerOperations(docker_config) + + async def execute_url_etl_pipeline(self, urls: List[URLItem]): + async with ThreadPoolExecutor() as executor: + loop = asyncio.get_running_loop() + tasks = [loop.run_in_executor(executor, self.process_url, url) for url in urls] + await asyncio.gather(*tasks) + + def process_url(self, url_item: URLItem): + response = requests.get(url_item.url) + response.raise_for_status() + html_content = io.BytesIO(response.content) + elements = partition(file=html_content, content_type="text/html") + combined_text = "\n".join([e.text for e in elements if hasattr(e, 'text')]) + combined_text = prepare_text_for_tokenization(combined_text) + object_name = sanitize_url_to_object_name(url_item.url) + with tempfile.NamedTemporaryFile(delete=False, mode="w", encoding="utf-8", suffix=".txt") as tmp_file: + tmp_file.write(combined_text) + tmp_file_path = tmp_file.name + asyncio.run(self.minio_ops.store_object("your_bucket_name", object_name, tmp_file_path)) + os.remove(tmp_file_path) + + async def execute_script_in_docker(self, execution_request: ScriptExecutionRequest): + response = self.minio_ops.client.get_object(execution_request.bucket_name, execution_request.script_name) + script_content = response.read().decode('utf-8') + container_name = await self.docker_ops.execute_script(script_content) + return {"message": "Execution started", "container_name": container_name} + +app = FastAPI() + +minio_client = Minio( + os.getenv("MINIO_ENDPOINT", "play.min.io"), + access_key=os.getenv("MINIO_ACCESS_KEY", "minioadmin"), + secret_key=os.getenv("MINIO_SECRET_KEY", "minioadmin"), + secure=True +) + +@tool +def create_bucket(bucket_name: str) -> str: + """ + Creates a new bucket in Minio. + + Args: + bucket_name (str): The name of the bucket to create. + + Returns: + str: A message indicating the result of the bucket creation. 
    """
    try:
        minio_client.make_bucket(bucket_name)
        return f"Bucket {bucket_name} created successfully."
    except S3Error as e:
        # Errors are returned as readable strings (not raised) so an LLM agent
        # consuming these tools can see and react to the failure.
        return f"Failed to create bucket {bucket_name}: {str(e)}"

@tool
def upload_file(bucket_name: str, file_path: str, object_name: str) -> str:
    """
    Uploads a file to a specified bucket in Minio.

    Args:
        bucket_name (str): The name of the bucket to upload the file to.
        file_path (str): The path to the file to upload.
        object_name (str): The name to give to the uploaded object.

    Returns:
        str: A message indicating the result of the file upload.
    """
    try:
        minio_client.fput_object(bucket_name, object_name, file_path)
        return f"File {file_path} uploaded as {object_name} in bucket {bucket_name}."
    except S3Error as e:
        return f"Failed to upload file {file_path} to bucket {bucket_name}: {str(e)}"

@tool
def list_buckets() -> str:
    """
    Lists all buckets in Minio.

    Returns:
        str: A message containing the list of buckets.
    """
    try:
        buckets = minio_client.list_buckets()
        bucket_list = [bucket.name for bucket in buckets]
        return f"Buckets: {', '.join(bucket_list)}"
    except S3Error as e:
        return f"Failed to list buckets: {str(e)}"

@tool
def delete_file(bucket_name: str, object_name: str) -> str:
    """
    Deletes a specified file from a bucket in Minio.

    Args:
        bucket_name (str): The name of the bucket containing the file.
        object_name (str): The name of the file to delete.

    Returns:
        str: A message indicating the result of the file deletion.
    """
    try:
        minio_client.remove_object(bucket_name, object_name)
        return f"File {object_name} deleted from bucket {bucket_name}."
+ except S3Error as e: + return f"Failed to delete file {object_name} from bucket {bucket_name}: {str(e)}" + + +# Saving the tools in a list for access +minio_tools = [create_bucket, upload_file, list_buckets, delete_file] + +def execute_langchain_query(query: str, context: Optional[Dict] = None) -> str: + """ + Executes a query using LangChain and returns the response. + + Args: + query (str): The query to be executed. + context (Optional[Dict], optional): Additional context for the query execution. Defaults to None. + + Returns: + str: The result of the query execution. + """ + # Placeholder for LangChain API endpoint + langchain_endpoint = "https://api.langchain.com/query" + + # Prepare the payload + payload = LangChainQueryModel(query=query, context=context or {}).dict() + + try: + # Execute the query + response = requests.post(langchain_endpoint, json=payload) + + # Check for successful request + if response.status_code == 200: + # Assuming the response contains a JSON with a key 'result' + return response.json().get('result', 'No result found.') + else: + return f"LangChain query failed with status code {response.status_code}" + except Exception as e: + return f"Failed to execute LangChain query: {str(e)}" + +def execute_agent_action(action: str, parameters: Optional[Dict] = None) -> str: + """ + Simulates an agent action execution based on the specified action and parameters. + + Args: + action (str): The action to be executed by the agent. + parameters (Optional[Dict], optional): Parameters for the action execution. Defaults to None. + + Returns: + str: A message indicating the result of the action execution. 
+ """ + # Simulate action processing + # In a real scenario, this function would interact with an agent control system + action_details = AgentActionModel(action=action, parameters=parameters or {}) + print(f"Executing action: {action_details.action} with parameters: {json.dumps(action_details.parameters)}") + + # Placeholder for action execution result + return f"Action '{action}' executed successfully." + +def hydrate_data(url: str, bucket_name: str, config: ClientConfig): + minio_client = MinioClientModel(config=config).get_client() + weaviate_client = WeaviateClientModel(config=config).get_client() + + # Fetch the data + response = requests.get(url) + if response.status_code != 200: + print(f"Failed to fetch {url}") + return + + # Store in Minio + file_name = os.path.basename(url) + file_content = response.content + temp_dir = tempfile.gettempdir() + file_path = os.path.join(temp_dir, file_name) + + with open(file_path, 'wb') as file: + file.write(file_content) + + try: + minio_client.fput_object(bucket_name, file_name, file_path) + print(f"Stored {file_name} in bucket {bucket_name}.") + except Exception as e: + print(f"Failed to store {file_name} in Minio: {str(e)}") + return + + # Index in Weaviate + data_object = { + "content": file_content.decode("utf-8"), # Assuming the content is text + "sourceUrl": url + } + try: + weaviate_client.data_object.create(data_object, "Articles") + print(f"Indexed content from {url}.") + except Exception as e: + print(f"Failed to index content from {url} in Weaviate: {str(e)}") + +@app.get("/") +def read_root(): + return {"Hello": "World"} + +@app.post("/execute/{bucket_name}/{script_name}") +async def execute_script(bucket_name: str, script_name: str): + orchestrator = MinIOSystemOrchestrator( + MinIOConfig(endpoint="play.min.io:443", access_key="minioadmin", secret_key="minioadmin"), + DockerConfig(image="python:3.9-slim", command="", volumes={}, detach=True, remove=True) + ) + container_name = await 
orchestrator.execute_script_in_docker(ScriptExecutionRequest(bucket_name=bucket_name, script_name=script_name)) + return {"message": "Execution started", "container_name": container_name} + +# openai_api_key = os.getenv("OPENAI_API_KEY") +# if openai_api_key is None: +# raise ValueError("OPENAI_API_KEY environment variable is not set.") + +# llm = ChatOpenAI(api_key=openai_api_key) +#minio_tools = [create_bucket, upload_file, list_buckets, delete_file] +tools = minio_tools +memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True) +agent = initialize_agent(tools, llm, agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION, verbose=True, memory=memory, + callback_manager=BaseCallbackManager(handlers=[])) + +from langchain.agents import create_structured_chat_agent, create_json_chat_agent +# Initialization for tools, llm, and memory remains the same + +# Structured Chat Agent +structured_chat_agent = create_structured_chat_agent(llm, tools, memory=memory, prompt=prompt, verbose=True) + +# JSON Chat Agent +json_chat_agent = create_json_chat_agent(llm, tools, memory=memory, verbose=True) + +@app.post("/langchain-execute/") +async def execute_langchain_integration(input: LangChainInput): + result = agent.run(input=input.input_text) + return {"message": "LangChain processing completed", "result": result} + +@app.post("/hydrate-data/") +async def hydrate_data_endpoint(input: dict): + hydrate_data(input) + return {"message": "Data hydration completed"} + +@app.post("/agent-action/") +async def execute_agent_action_endpoint(input: dict): + result = execute_agent_action(input) + return {"message": "Agent action executed", "result": result} + +@app.post("/minio-webhook/") +async def handle_minio_webhook(event: dict): + event_name = event.get("EventName") + if event_name == "s3:ObjectCreated:Put": + pass + elif event_name == "s3:ObjectRemoved:Delete": + pass + return {"message": "Webhook event processed successfully"} + +if __name__ == "__main__": + 
uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file diff --git a/app/compiled.py.old b/app/compiled.py.old new file mode 100644 index 0000000..ef88108 --- /dev/null +++ b/app/compiled.py.old @@ -0,0 +1,412 @@ +main.py=""" +import asyncio +import os +import re +import io +import tempfile +import uuid +from typing import List +from concurrent.futures import ThreadPoolExecutor +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel, HttpUrl +from minio import Minio +import requests +from unstructured.partition.auto import partition +from docker import from_env as docker_from_env +from langchain_openai import ChatOpenAI +from langchain.agents import tool, AgentExecutor +from langchain.agents.format_scratchpad.openai_tools import format_to_openai_tool_messages +from langchain.agents.output_parsers.openai_tools import OpenAIToolsAgentOutputParser +from langchain_core.messages import AIMessage, HumanMessage +from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder +from langchain_core.runnables import RunnableLambda +import uvicorn + +from minio_langchain_tools import upload_file_to_minio, download_file_from_minio, list_objects_in_minio_bucket +from hydrate_integration import hydrate_data +from langchain_integration import execute_langchain_query +from agent_control_integration import execute_agent_action + +class MinIOConfig(BaseModel): + endpoint: str + access_key: str + secret_key: str + secure: bool = True + +class DockerConfig(BaseModel): + image: str + command: str + volumes: dict + detach: bool = True + remove: bool = True + +class URLItem(BaseModel): + url: HttpUrl + +class Document(BaseModel): + source: str + content: str + +class ScriptExecutionRequest(BaseModel): + bucket_name: str + script_name: str + +class LangChainInput(BaseModel): + input_text: str + +def sanitize_url_to_object_name(url: str) -> str: + clean_url = re.sub(r'^https?://', '', url) + clean_url = re.sub(r'[^\w\-_\.]', '_', clean_url) + 
return clean_url[:250] + '.txt' + +def prepare_text_for_tokenization(text: str) -> str: + clean_text = re.sub(r'\s+', ' ', text).strip() + return clean_text + +class MinIOOperations: + def __init__(self, config: MinIOConfig): + self.client = Minio(config.endpoint, access_key=config.access_key, secret_key=config.secret_key, secure=config.secure) + + async def store_object(self, bucket_name: str, object_name: str, file_path: str): + self.client.fput_object(bucket_name, object_name, file_path) + + async def retrieve_object(self, bucket_name: str, object_name: str, file_path: str): + self.client.fget_object(bucket_name, object_name, file_path) + +class DockerOperations: + def __init__(self, config: DockerConfig): + self.client = docker_from_env() + self.config = config + + async def execute_script(self, script_content: str): + with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as script_file: + script_file.write(script_content.encode()) + script_path = script_file.name + container_name = f"script_execution_{uuid.uuid4()}" + container = self.client.containers.run( + image=self.config.image, + command=f"python {script_path}", + volumes=self.config.volumes, + detach=self.config.detach, + remove=self.config.remove, + name=container_name + ) + return container_name + +class MinIOSystemOrchestrator: + def __init__(self, minio_config: MinIOConfig, docker_config: DockerConfig): + self.minio_ops = MinIOOperations(minio_config) + self.docker_ops = DockerOperations(docker_config) + + async def execute_url_etl_pipeline(self, urls: List[URLItem]): + async with ThreadPoolExecutor() as executor: + loop = asyncio.get_running_loop() + tasks = [loop.run_in_executor(executor, self.process_url, url) for url in urls] + await asyncio.gather(*tasks) + + def process_url(self, url_item: URLItem): + response = requests.get(url_item.url) + response.raise_for_status() + html_content = io.BytesIO(response.content) + elements = partition(file=html_content, content_type="text/html") + 
combined_text = "\n".join([e.text for e in elements if hasattr(e, 'text')]) + combined_text = prepare_text_for_tokenization(combined_text) + object_name = sanitize_url_to_object_name(url_item.url) + with tempfile.NamedTemporaryFile(delete=False, mode="w", encoding="utf-8", suffix=".txt") as tmp_file: + tmp_file.write(combined_text) + tmp_file_path = tmp_file.name + asyncio.run(self.minio_ops.store_object("your_bucket_name", object_name, tmp_file_path)) + os.remove(tmp_file_path) + + async def execute_script_in_docker(self, execution_request: ScriptExecutionRequest): + response = self.minio_ops.client.get_object(execution_request.bucket_name, execution_request.script_name) + script_content = response.read().decode('utf-8') + container_name = await self.docker_ops.execute_script(script_content) + return {"message": "Execution started", "container_name": container_name} + +app = FastAPI() + +@app.get("/") +def read_root(settings: Settings = Depends(get_settings)): + return {"Hello": "World", "MinIO Endpoint": settings.minio_endpoint} + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=8000) + +@app.post("/execute/{bucket_name}/{script_name}") +async def execute_script(bucket_name: str, script_name: str): + orchestrator = MinIOSystemOrchestrator( + MinIOConfig(endpoint="play.min.io:443", access_key="minioadmin", secret_key="minioadmin"), + DockerConfig(image="python:3.8-slim", command="", volumes={}, detach=True, remove=True) + ) + container_name = await orchestrator.execute_script_in_docker(ScriptExecutionRequest(bucket_name=bucket_name, script_name=script_name)) + return {"message": "Execution started", "container_name": container_name} + +llm = ChatOpenAI(api_key=os.getenv("OPENAI_API_KEY")) + +@app.post("/langchain-execute/") +async def execute_langchain_integration(input: LangChainInput): + result = await execute_langchain_query(input.input_text) + return {"message": "LangChain processing completed", "result": result} + 
+@app.post("/hydrate-data/") +async def hydrate_data_endpoint(input: dict): + result = await hydrate_data(input) + return {"message": "Data hydration completed", "result": result} + +@app.post("/agent-action/") +async def execute_agent_action_endpoint(input: dict): + result = await execute_agent_action(input) + return {"message": "Agent action executed", "result": result} + +@app.post("/minio-webhook/") +async def handle_minio_webhook(event: dict): + event_name = event.get("EventName") + if event_name == "s3:ObjectCreated:Put": + pass + elif event_name == "s3:ObjectRemoved:Delete": + pass + return {"message": "Webhook event processed successfully"} + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=8000) +""" + +hydrate_integration.py=""" +from pydantic import BaseModel +import requests +from minio import Minio +import weaviate +import os +import tempfile +import io + +class ClientConfig(BaseModel): + minio_endpoint: str = "play.min.io:9000" + minio_access_key: str = "minioadmin" + minio_secret_key: str = "minioadmin" + weaviate_endpoint: str = "http://localhost:8080" + +class MinioClientModel(BaseModel): + config: ClientConfig + + def get_client(self) -> Minio: + return Minio( + self.config.minio_endpoint, + access_key=self.config.minio_access_key, + secret_key=self.config.minio_secret_key, + secure=True # Set to False if you are not using https + ) + +class WeaviateClientModel(BaseModel): + config: ClientConfig + + def get_client(self) -> weaviate.Client: + return weaviate.Client( + url=self.config.weaviate_endpoint, + timeout_config=(5, 15) + ) + +def hydrate_data(url: str, bucket_name: str, config: ClientConfig): + minio_client = MinioClientModel(config=config).get_client() + weaviate_client = WeaviateClientModel(config=config).get_client() + + # Fetch the data + response = requests.get(url) + if response.status_code != 200: + print(f"Failed to fetch {url}") + return + + # Store in Minio + file_name = os.path.basename(url) + file_content = 
response.content + temp_dir = tempfile.gettempdir() + file_path = os.path.join(temp_dir, file_name) + + with open(file_path, 'wb') as file: + file.write(file_content) + + try: + minio_client.fput_object(bucket_name, file_name, file_path) + print(f"Stored {file_name} in bucket {bucket_name}.") + except Exception as e: + print(f"Failed to store {file_name} in Minio: {str(e)}") + return + + # Index in Weaviate + data_object = { + "content": file_content.decode("utf-8"), # Assuming the content is text + "sourceUrl": url + } + try: + weaviate_client.data_object.create(data_object, "Articles") + print(f"Indexed content from {url}.") + except Exception as e: + print(f"Failed to index content from {url} in Weaviate: {str(e)}") + +# Example usage +if __name__ == "__main__": + config = ClientConfig() + url = "http://example.com/data" + bucket_name = "example-bucket" + hydrate_data(url, bucket_name, config) + +""" + +minio_langchain_tools.py=""" +from llama_index.tools import FunctionTool +from minio import Minio +from minio.error import S3Error + +# Initialize Minio client +minio_client = Minio( + os.getenv("MINIO_ENDPOINT", "play.min.io"), + access_key=os.getenv("MINIO_ACCESS_KEY", "minioadmin"), + secret_key=os.getenv("MINIO_SECRET_KEY", "minioadmin"), + secure=True +) + +# Function to create a bucket +def create_bucket(bucket_name): + try: + minio_client.make_bucket(bucket_name) + return f"Bucket {bucket_name} created successfully." + except S3Error as e: + return f"Failed to create bucket {bucket_name}: {str(e)}" + +# Function to upload a file +def upload_file(bucket_name, file_path, object_name): + try: + minio_client.fput_object(bucket_name, object_name, file_path) + return f"File {file_path} uploaded as {object_name} in bucket {bucket_name}." 
+ except S3Error as e: + return f"Failed to upload file {file_path} to bucket {bucket_name}: {str(e)}" + +# Function to list buckets +def list_buckets(): + try: + buckets = minio_client.list_buckets() + bucket_list = [bucket.name for bucket in buckets] + return f"Buckets: {', '.join(bucket_list)}" + except S3Error as e: + return f"Failed to list buckets: {str(e)}" + +# Function to delete a file +def delete_file(bucket_name, object_name): + try: + minio_client.remove_object(bucket_name, object_name) + return f"File {object_name} deleted from bucket {bucket_name}." + except S3Error as e: + return f"Failed to delete file {object_name} from bucket {bucket_name}: {str(e)}" + +# Creating FunctionTool objects for each operation +create_bucket_tool = FunctionTool.from_defaults( + fn=create_bucket, + name="create_bucket", + description="Creates a new bucket in Minio" +) + +upload_file_tool = FunctionTool.from_defaults( + fn=upload_file, + name="upload_file", + description="Uploads a file to a specified bucket in Minio" +) + +list_buckets_tool = FunctionTool.from_defaults( + fn=list_buckets, + name="list_buckets", + description="Lists all buckets in Minio" +) + +delete_file_tool = FunctionTool.from_defaults( + fn=delete_file, + name="delete_file", + description="Deletes a specified file from a bucket in Minio" +) + +# Example: Saving the tools in a list or dict for access +minio_tools = [create_bucket_tool, upload_file_tool, list_buckets_tool, delete_file_tool] + +# You can access and call any tool from minio_tools based on your application's logic. +""" + +langchain_integration.py=""" +from pydantic import BaseModel +import requests + +class LangChainQueryModel(BaseModel): + query: str + context: dict + +def execute_langchain_query(query: str, context: dict = None) -> str: + """ + Executes a query using LangChain and returns the response. + + Args: + query (str): The query to be executed. + context (dict, optional): Additional context for the query execution. 
+ + Returns: + str: The result of the query execution. + """ + # Placeholder for LangChain API endpoint + langchain_endpoint = "https://api.langchain.com/query" + + # Prepare the payload + payload = LangChainQueryModel(query=query, context=context or {}).dict() + + try: + # Execute the query + response = requests.post(langchain_endpoint, json=payload) + + # Check for successful request + if response.status_code == 200: + # Assuming the response contains a JSON with a key 'result' + return response.json().get('result', 'No result found.') + else: + return f"LangChain query failed with status code {response.status_code}" + except Exception as e: + return f"Failed to execute LangChain query: {str(e)}" + +# Example usage +if __name__ == "__main__": + query = "What is the capital of France?" + result = execute_langchain_query(query) + print(result) +""" + +agent_control_integration.py=""" +from pydantic import BaseModel +import json + +class AgentActionModel(BaseModel): + action: str + parameters: dict + +def execute_agent_action(action: str, parameters: dict = None) -> str: + """ + Simulates an agent action execution based on the specified action and parameters. + + Args: + action (str): The action to be executed by the agent. + parameters (dict, optional): Parameters for the action execution. + + Returns: + str: A message indicating the result of the action execution. + """ + # Simulate action processing + # In a real scenario, this function would interact with an agent control system + action_details = AgentActionModel(action=action, parameters=parameters or {}) + print(f"Executing action: {action_details.action} with parameters: {json.dumps(action_details.parameters)}") + + # Placeholder for action execution result + return f"Action '{action}' executed successfully." 
+ +# Example usage +if __name__ == "__main__": + action = "retrieve" + parameters = {"key": "value"} + result = execute_agent_action(action, parameters) + print(result) +""" + diff --git a/app/main.py b/app/m-2.py.old similarity index 100% rename from app/main.py rename to app/m-2.py.old diff --git a/app/main.py-3.old b/app/main.py-3.old new file mode 100644 index 0000000..3fb9b2a --- /dev/null +++ b/app/main.py-3.old @@ -0,0 +1,171 @@ +import asyncio +import os +import re +import io +import tempfile +import uuid +from typing import List +from concurrent.futures import ThreadPoolExecutor +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel, HttpUrl +from minio import Minio +import requests +from unstructured.partition.auto import partition +from docker import from_env as docker_from_env +from langchain_openai import ChatOpenAI +from langchain.agents import tool, AgentExecutor +from langchain.agents.format_scratchpad.openai_tools import format_to_openai_tool_messages +from langchain.agents.output_parsers.openai_tools import OpenAIToolsAgentOutputParser +from langchain_core.messages import AIMessage, HumanMessage +from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder +from langchain_core.runnables import RunnableLambda +import uvicorn + +from minio_langchain_tools import upload_file_to_minio, download_file_from_minio, list_objects_in_minio_bucket +from hydrate_integration import hydrate_data +from langchain_integration import execute_langchain_query +from agent_control_integration import execute_agent_action + +class MinIOConfig(BaseModel): + endpoint: str + access_key: str + secret_key: str + secure: bool = True + +class DockerConfig(BaseModel): + image: str + command: str + volumes: dict + detach: bool = True + remove: bool = True + +class URLItem(BaseModel): + url: HttpUrl + +class Document(BaseModel): + source: str + content: str + +class ScriptExecutionRequest(BaseModel): + bucket_name: str + script_name: str + +class 
LangChainInput(BaseModel): + input_text: str + +def sanitize_url_to_object_name(url: str) -> str: + clean_url = re.sub(r'^https?://', '', url) + clean_url = re.sub(r'[^\w\-_\.]', '_', clean_url) + return clean_url[:250] + '.txt' + +def prepare_text_for_tokenization(text: str) -> str: + clean_text = re.sub(r'\s+', ' ', text).strip() + return clean_text + +class MinIOOperations: + def __init__(self, config: MinIOConfig): + self.client = Minio(config.endpoint, access_key=config.access_key, secret_key=config.secret_key, secure=config.secure) + + async def store_object(self, bucket_name: str, object_name: str, file_path: str): + self.client.fput_object(bucket_name, object_name, file_path) + + async def retrieve_object(self, bucket_name: str, object_name: str, file_path: str): + self.client.fget_object(bucket_name, object_name, file_path) + +class DockerOperations: + def __init__(self, config: DockerConfig): + self.client = docker_from_env() + self.config = config + + async def execute_script(self, script_content: str): + with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as script_file: + script_file.write(script_content.encode()) + script_path = script_file.name + container_name = f"script_execution_{uuid.uuid4()}" + container = self.client.containers.run( + image=self.config.image, + command=f"python {script_path}", + volumes=self.config.volumes, + detach=self.config.detach, + remove=self.config.remove, + name=container_name + ) + return container_name + +class MinIOSystemOrchestrator: + def __init__(self, minio_config: MinIOConfig, docker_config: DockerConfig): + self.minio_ops = MinIOOperations(minio_config) + self.docker_ops = DockerOperations(docker_config) + + async def execute_url_etl_pipeline(self, urls: List[URLItem]): + async with ThreadPoolExecutor() as executor: + loop = asyncio.get_running_loop() + tasks = [loop.run_in_executor(executor, self.process_url, url) for url in urls] + await asyncio.gather(*tasks) + + def process_url(self, url_item: 
URLItem): + response = requests.get(url_item.url) + response.raise_for_status() + html_content = io.BytesIO(response.content) + elements = partition(file=html_content, content_type="text/html") + combined_text = "\n".join([e.text for e in elements if hasattr(e, 'text')]) + combined_text = prepare_text_for_tokenization(combined_text) + object_name = sanitize_url_to_object_name(url_item.url) + with tempfile.NamedTemporaryFile(delete=False, mode="w", encoding="utf-8", suffix=".txt") as tmp_file: + tmp_file.write(combined_text) + tmp_file_path = tmp_file.name + asyncio.run(self.minio_ops.store_object("your_bucket_name", object_name, tmp_file_path)) + os.remove(tmp_file_path) + + async def execute_script_in_docker(self, execution_request: ScriptExecutionRequest): + response = self.minio_ops.client.get_object(execution_request.bucket_name, execution_request.script_name) + script_content = response.read().decode('utf-8') + container_name = await self.docker_ops.execute_script(script_content) + return {"message": "Execution started", "container_name": container_name} + +app = FastAPI() + +@app.get("/") +def read_root(settings: Settings = Depends(get_settings)): + return {"Hello": "World", "MinIO Endpoint": settings.minio_endpoint} + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=8000) + +@app.post("/execute/{bucket_name}/{script_name}") +async def execute_script(bucket_name: str, script_name: str): + orchestrator = MinIOSystemOrchestrator( + MinIOConfig(endpoint="play.min.io:443", access_key="minioadmin", secret_key="minioadmin"), + DockerConfig(image="python:3.8-slim", command="", volumes={}, detach=True, remove=True) + ) + container_name = await orchestrator.execute_script_in_docker(ScriptExecutionRequest(bucket_name=bucket_name, script_name=script_name)) + return {"message": "Execution started", "container_name": container_name} + +llm = ChatOpenAI(api_key=os.getenv("OPENAI_API_KEY")) + +@app.post("/langchain-execute/") +async def 
execute_langchain_integration(input: LangChainInput): + result = await execute_langchain_query(input.input_text) + return {"message": "LangChain processing completed", "result": result} + +@app.post("/hydrate-data/") +async def hydrate_data_endpoint(input: dict): + result = await hydrate_data(input) + return {"message": "Data hydration completed", "result": result} + +@app.post("/agent-action/") +async def execute_agent_action_endpoint(input: dict): + result = await execute_agent_action(input) + return {"message": "Agent action executed", "result": result} + +@app.post("/minio-webhook/") +async def handle_minio_webhook(event: dict): + event_name = event.get("EventName") + if event_name == "s3:ObjectCreated:Put": + pass + elif event_name == "s3:ObjectRemoved:Delete": + pass + return {"message": "Webhook event processed successfully"} + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file