diff --git a/examples/media-gen/README.md b/examples/media-gen/README.md index f2cde22..02b96aa 100644 --- a/examples/media-gen/README.md +++ b/examples/media-gen/README.md @@ -166,10 +166,48 @@ The pipeline uses a modular design with two main components: - Simple two-parameter API - Path expansion support +## Video Generation + +The media generation framework now includes video generation capabilities using the Replicate WAN 2.2 i2v fast model. This allows you to generate videos from images and text prompts. + +### Video Generation Example + +```python +from tools.replicate_video_gen import ReplicateVideoGen + +# Initialize the video generation tool +video_gen = ReplicateVideoGen() + +# Generate video from image and text prompt +result = video_gen.run({ + "image": "path/to/your/image.jpg", + "prompt": "A serene landscape with gentle movement and natural lighting", + "output_folder": "~/Downloads/polymind_videos", + "output_format": "mp4" +}) + +print(f"Video saved to: {result['video_path']}") +``` + +### Video Generation Parameters + +- **image**: Image path, URL, or data URI (required) +- **prompt**: Text description of the desired video (required) +- **output_folder**: Folder path where to save the video (optional, default: "~/Downloads") +- **output_format**: Output format (optional, default: "mp4") +- **model**: Replicate model to use (optional, overrides default) + +### Testing Video Generation + +Run the video generation integration test: + +```bash +python integration_tests/test_replicate_video_gen.py +``` + ## Future Extensions The modular design allows easy extension to other media types: -- **Image to Video**: Add video generation step - **Video Understanding**: Add video analysis capabilities - **Multi-modal**: Support for text, audio, and other media @@ -184,9 +222,15 @@ media-gen/ ├── tools/ # Media generation tools │ ├── image_understanding_tool.py │ ├── openai_image_gen.py +│ ├── replicate_image_gen.py +│ ├── replicate_video_gen.py │ ├── 
#!/usr/bin/env python3
"""
Simple script to generate videos using Replicate's WAN 2.2 i2v fast model.

Usage:
    python integration_tests/test_replicate_video_gen.py [image_path] [prompt] \
        [--timeout SECONDS] [--progress-interval SECONDS]

Examples:
    python integration_tests/test_replicate_video_gen.py
    python integration_tests/test_replicate_video_gen.py test_image.png \
        "animals playing football"
    python integration_tests/test_replicate_video_gen.py test_image.png \
        "magical scene" --timeout 300 --progress-interval 10

Requirements:
- REPLICATE_API_TOKEN environment variable set
- Default test image: integration_tests/test_image.png
"""

import argparse
import os
import sys

from dotenv import load_dotenv
from pathlib import Path

# Add parent directory to path for imports
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

# Load environment variables from .env file
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '.env'))

from tools.replicate_video_gen import ReplicateVideoGen


def _parse_args() -> argparse.Namespace:
    """Parse CLI args.

    Replaces the previous hand-rolled sys.argv scan (which re-iterated a
    flag's value as if it were another flag) while keeping the exact same
    command-line surface: two optional positionals plus two int flags.
    """
    parser = argparse.ArgumentParser(
        description="Generate a video with Replicate's WAN 2.2 i2v fast model."
    )
    parser.add_argument(
        "image", nargs="?", default=None,
        help="Input image path (default: test_image.png next to this script)"
    )
    parser.add_argument(
        "prompt", nargs="?",
        default="the animals standup and start playing football",
        help="Text description of the desired video"
    )
    parser.add_argument(
        "--timeout", type=int, default=600,
        help="Generation timeout in seconds (default: 600)"
    )
    parser.add_argument(
        "--progress-interval", type=int, default=5,
        help="Progress update interval in seconds (default: 5)"
    )
    return parser.parse_args()


def main():
    """Generate a video from an image and prompt using Replicate."""
    # Check for API token before doing any work.
    if not os.getenv("REPLICATE_API_TOKEN"):
        print("❌ REPLICATE_API_TOKEN not found in environment variables")
        print("Please set: export REPLICATE_API_TOKEN='your_token_here'")
        sys.exit(1)

    args = _parse_args()

    # Resolve the image path: relative paths are taken relative to this
    # script's directory so the test image works from any cwd.
    if args.image is None:
        image_path = Path(__file__).parent / "test_image.png"
    else:
        image_path = Path(args.image)
        if not image_path.is_absolute():
            image_path = Path(__file__).parent / image_path

    # Validate the image exactly once (the original checked twice).
    if not image_path.exists():
        print(f"❌ Image not found: {image_path}")
        sys.exit(1)

    print(f"🎬 Generating video from: {image_path}")
    print(f"📝 Prompt: {args.prompt}")
    print("📁 Output: ~/Downloads/polymind_video_generation/")
    print("-" * 60)

    size_mb = image_path.stat().st_size / (1024 * 1024)
    print(f"📏 Input image size: {size_mb:.2f} MB")

    video_gen = ReplicateVideoGen()

    # Expand the output folder path before handing it to the tool.
    output_folder = os.path.expanduser("~/Downloads/polymind_video_generation")

    try:
        result = video_gen.run({
            "image": str(image_path),
            "prompt": args.prompt,
            "output_folder": output_folder,
            "output_format": "mp4",
            "timeout": args.timeout,
            "progress_interval": args.progress_interval,
        })

        if result["video_path"]:
            print("✅ Video generated successfully!")
            print(f"📁 Saved to: {result['video_path']}")

            # Show file size if the file actually landed on disk.
            video_path = Path(result["video_path"])
            if video_path.exists():
                size_mb = video_path.stat().st_size / (1024 * 1024)
                print(f"📏 File size: {size_mb:.1f} MB")
        else:
            print(f"❌ Generation failed: {result['generation_info']}")
            sys.exit(1)

    except Exception as e:
        print(f"❌ Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
def test_with_data_uri():
    """Test video generation using data URI for image input."""
    print("\n🔄 Testing with data URI image input...")
    print("-" * 50)

    # The bundled test image lives next to this script.
    test_image_path = Path(__file__).parent / "test_image.png"
    if not test_image_path.exists():
        print("❌ Test image not found for data URI test")
        return False

    # Encode the raw image bytes as a base64 data URI.
    raw_bytes = test_image_path.read_bytes()
    encoded = base64.b64encode(raw_bytes).decode('utf-8')
    data_uri = f"data:application/octet-stream;base64,{encoded}"

    print(f"✅ Converted image to data URI ({len(data_uri)} chars)")

    # Initialize the video generation tool.
    tool = ReplicateVideoGen()

    try:
        result = tool.run({
            "image": data_uri,
            "prompt": (
                "A serene landscape with gentle movement and natural lighting"
            ),
            "output_folder": "~/Downloads/polymind_video_generation",
            "output_format": "mp4",
        })

        if not result["video_path"]:
            print(f"❌ Video generation failed: {result['generation_info']}")
            return False

        print(f"✅ Video generated successfully with data URI!")
        print(f"📁 Video saved to: {result['video_path']}")
        return True

    except Exception as e:
        print(f"❌ Video generation failed with exception: {e}")
        return False
"""
Replicate video generation tool using WAN 2.2 i2v fast model.

This module provides a real implementation of a video generation tool using
Replicate's API with the WAN 2.2 i2v fast model. It integrates seamlessly
with the Polymind framework and supports image-to-video generation with
text prompts.
"""

import base64
import datetime
import mimetypes
import os
import time
from typing import Optional, Union

import replicate
import requests
from pathlib import Path

from polymind.core.message import Message

from .media_gen_tool_base import VideoGenerationTool


class ReplicateVideoGen(VideoGenerationTool):
    """
    Replicate video generation tool using WAN 2.2 i2v fast model.

    This tool uses Replicate's API to generate videos from images and text
    prompts using the WAN 2.2 i2v fast model. It supports various parameters
    including image input (file path, URL, or data URI) and text prompts.

    Requires Replicate API token to be set in environment variables
    (REPLICATE_API_TOKEN, read by the replicate client library).
    """

    def __init__(self, model: str = "wan-video/wan-2.2-i2v-fast", **kwargs):
        """
        Initialize the Replicate video generation tool.

        Args:
            model (str): Replicate model identifier
                (default: "wan-video/wan-2.2-i2v-fast")
            **kwargs: Additional arguments passed to parent class
        """
        super().__init__(
            tool_name="replicate_video_generator",
            descriptions=[
                f"Replicate video generation using {model}",
                "Generate videos from images and text prompts",
                "Supports image-to-video generation with WAN 2.2 i2v fast "
                "model"
            ],
            **kwargs
        )
        self._model = model

    def _prepare_image_input(self, image_input: Union[str, Path]) -> str:
        """
        Prepare image input for Replicate API.

        Args:
            image_input: Image path, URL, or data URI

        Returns:
            str: Prepared image input for Replicate API (URLs and data URIs
                pass through unchanged; file paths are inlined as data URIs)

        Raises:
            FileNotFoundError: If a file path is given but does not exist.
        """
        image_str = str(image_input)

        # Already a data URI or URL: pass through untouched.
        if image_str.startswith(('data:', 'http://', 'https://')):
            return image_str

        image_path = Path(image_str)
        if not image_path.exists():
            raise FileNotFoundError(f"Image file not found: {image_path}")

        # Label the data URI with the real content type when it can be
        # guessed from the extension (e.g. image/png) instead of always
        # using application/octet-stream; fall back to the old value.
        mime = mimetypes.guess_type(image_path.name)[0] or \
            "application/octet-stream"
        data = base64.b64encode(image_path.read_bytes()).decode('utf-8')
        return f"data:{mime};base64,{data}"

    def _unique_output_path(self, output_folder: str,
                            output_format: str) -> str:
        """Build a timestamped, collision-free output file path.

        The timestamp avoids duplication between runs; the counter loop
        disambiguates multiple generations within the same second.
        """
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        base_name = f"replicate_generated_video_{timestamp}"
        folder = output_folder.rstrip('/')
        candidate = f"{folder}/{base_name}.{output_format}"
        counter = 1
        while os.path.exists(candidate):
            candidate = f"{folder}/{base_name}_{counter}.{output_format}"
            counter += 1
        return candidate

    def _wait_for_completion(self, prediction, timeout: int,
                             progress_interval: int,
                             start_time: float) -> None:
        """Poll the prediction until it succeeds.

        Prints a status line every `progress_interval` seconds. Raises
        TimeoutError after `timeout` seconds (canceling the prediction),
        and Exception if the prediction fails or is canceled remotely.
        """
        last_progress_time = start_time
        while True:
            # Enforce the overall timeout before each poll.
            if time.time() - start_time > timeout:
                prediction.cancel()
                raise TimeoutError(
                    f"Video generation timed out after {timeout} seconds"
                )

            # Reload prediction to get latest status.
            prediction.reload()

            if time.time() - last_progress_time >= progress_interval:
                elapsed = int(time.time() - start_time)
                print(f"⏱️ Status: {prediction.status} (elapsed: {elapsed}s)")
                if prediction.logs:
                    # Last 200 chars keep the output readable.
                    print(f"📝 Logs: {prediction.logs[-200:]}...")
                last_progress_time = time.time()

            if prediction.status == "succeeded":
                print("✅ Video generation completed!")
                return
            if prediction.status == "failed":
                raise Exception(
                    f"Video generation failed: {prediction.error}"
                )
            if prediction.status == "canceled":
                raise Exception("Video generation was canceled")

            # Wait before next check.
            time.sleep(2)

    def _save_output(self, output, video_path: str) -> Optional[str]:
        """Write the prediction output to `video_path`.

        Handles the three output shapes Replicate may return: a FileOutput
        object (has .read()), a list of URLs, or a single URL string.

        Returns:
            Optional[str]: The source URL the video was downloaded from, or
                None when the output was streamed directly (FileOutput).

        Raises:
            ValueError: If the output shape is not recognized.
        """
        if hasattr(output, 'read'):
            # FileOutput object: stream bytes straight to disk.
            with open(video_path, "wb") as file:
                file.write(output.read())
            return None

        if isinstance(output, list) and len(output) > 0:
            video_url = output[0]
        elif isinstance(output, str):
            video_url = output
        else:
            raise ValueError(
                f"Unexpected output format from Replicate: {type(output)}"
            )

        response = requests.get(video_url)
        response.raise_for_status()
        with open(video_path, "wb") as file:
            file.write(response.content)
        return video_url

    def run(self, input: dict) -> dict:
        """
        Generate a video using Replicate WAN 2.2 i2v fast API with progress
        monitoring.

        Args:
            input (dict): Input parameters containing:
                - image: Image path, URL, or data URI (required)
                - prompt: Text description of the desired video (required)
                - output_folder: Folder path where to save the video
                    (optional, default: "~/Downloads"; "~" is expanded)
                - output_format: Output format (optional, default: "mp4")
                - model: Replicate model to use (optional, overrides default)
                - timeout: Timeout in seconds (optional, default: 300)
                - progress_interval: Progress update interval in seconds
                    (optional, default: 5)

        Returns:
            dict: Dictionary containing:
                - video_path: Path to the generated video file ("" on error)
                - generation_info: Generation metadata (includes "error"
                    and status "generation failed" on error)

        Raises:
            ValueError: If image or prompt is missing.
        """
        image_input = input.get("image", "")
        prompt = input.get("prompt", "")
        # expanduser fixes callers passing "~/Downloads/..." literally —
        # previously that created a directory named "~" in the cwd.
        output_folder = os.path.expanduser(
            input.get("output_folder", str(Path.home() / "Downloads"))
        )
        output_format = input.get("output_format", "mp4")
        model = input.get("model", self._model)
        timeout = input.get("timeout", 300)  # 5 minutes default
        progress_interval = input.get("progress_interval", 5)

        if not image_input:
            raise ValueError("Image input is required")
        if not prompt:
            raise ValueError("Text prompt is required")

        video_path = self._unique_output_path(output_folder, output_format)

        try:
            prepared_image = self._prepare_image_input(image_input)

            # Ensure the destination directory exists before downloading.
            Path(video_path).parent.mkdir(parents=True, exist_ok=True)

            start_time = time.time()

            # Create prediction using the model string directly.
            prediction = replicate.predictions.create(
                model=model,
                input={"image": prepared_image, "prompt": prompt}
            )
            print(f"🔄 Started video generation (ID: {prediction.id})")

            self._wait_for_completion(
                prediction, timeout, progress_interval, start_time
            )

            replicate_url = self._save_output(prediction.output, video_path)

            return {
                "video_path": video_path,
                "generation_info": {
                    "model": model,
                    "prompt": prompt,
                    "image_input": str(image_input),
                    "format": output_format,
                    "status": "generated successfully",
                    "prediction_id": prediction.id,
                    "replicate_url": replicate_url,
                    "elapsed_time": int(time.time() - start_time)
                }
            }

        except Exception as e:
            # Deliberate best-effort contract: callers check video_path,
            # so failures are reported in the result rather than raised.
            return {
                "video_path": "",
                "generation_info": {
                    "model": model,
                    "prompt": prompt,
                    "image_input": str(image_input),
                    "error": str(e),
                    "status": "generation failed"
                }
            }

    async def _execute(self, input: Message) -> Message:
        """
        Execute the Replicate video generation using the Polymind
        framework's Message system.

        Args:
            input (Message): Input message containing generation parameters

        Returns:
            Message: Output message with generated video information
        """
        # Convert Message to dict for the synchronous run method.
        result = self.run(input.content)
        # Wrap the result back into a Message for the framework.
        return Message(content=result)