Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion media_gen/test_scripts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,51 @@ The test will show:
- This test is not run automatically with unit tests
- It requires a valid OpenAI API key
- It makes real API calls and may incur costs
- The test image should be placed in this folder
- The test image should be placed in this folder

## Video Concatenation Test

### Prerequisites
- OpenCV (cv2) installed
- Video files for testing
- Sufficient disk space for output videos

### Running the Test

```bash
# From the media-regen directory
uv run python media_gen/test_scripts/test_video_concatenation.py
```

### What it does
- Tests the new video concatenation functionality in `video_utils.py`
- Demonstrates both folder-based and list-based concatenation
- Shows how to concatenate videos in alphabetical order
- Handles different video formats and resolutions

### Features Tested
- **Folder-based concatenation**: Automatically finds and concatenates all videos in a folder
- **Alphabetical sorting**: Concatenates videos in alphabetical order by filename
- **Format support**: Handles .mp4, .avi, .mov, .mkv, .webm files
- **Resolution conversion**: Can resize videos to target resolution
- **FPS conversion**: Can adjust frame rate of output video
- **Error handling**: Graceful handling of missing files and processing errors

### Configuration
Update the test script to point to your video files:
- `folder_path`: Directory containing video files to concatenate
- `video_paths`: List of specific video file paths
- `output_path`: Where to save the concatenated video

### Expected Output
The test will show:
- 📁 List of video files found
- 🔄 Processing progress for each video
- ✅ Success/failure status
- 📊 Output video properties (duration, frames, resolution, FPS)

### Notes
- This test requires actual video files to be present
- Update the file paths in the script before running
- Output videos are saved in MP4 format using H.264 codec
- The test script exercises automatic folder scanning (`concatenate_videos_from_folder`); to try the manual file-list approach, adapt the script to pass an explicit list of video paths
137 changes: 137 additions & 0 deletions media_gen/test_scripts/test_video_concatenation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""
Video concatenation integration test.

This script demonstrates the video concatenation functionality by processing
videos from a specific folder and concatenating them in alphabetical order.

Requirements:
- OpenCV (cv2) installed
- Video files in the target folder
- Sufficient disk space for output videos
"""

import sys
from pathlib import Path

# Add the parent directory to the path to import media_gen modules
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

try:
import cv2

from media_gen.utils.video_utils import concatenate_videos_from_folder
except ImportError as e:
if "cv2" in str(e):
print("❌ OpenCV package not installed. Please install it with:")
print(" pip install opencv-python")
sys.exit(1)
else:
raise


def main(
    folder_path: str = "~/Downloads/video_regen_1754563125/generated_videos",
    output_path: str = "~/Downloads/concatenated_videos.mp4",
) -> int:
    """Concatenate the videos found in a folder and report the outcome.

    The first video (alphabetically) supplies the target resolution and FPS
    that are handed to ``concatenate_videos_from_folder``.

    Args:
        folder_path: Directory holding the source videos; ``~`` is expanded.
        output_path: Destination file for the concatenated video; ``~`` is
            expanded and missing parent directories are created.

    Returns:
        ``0`` when the concatenation reports success, ``1`` on any failure
        (missing folder, no videos, unreadable first video, or an error
        during concatenation), so the value can be used as an exit status.
    """
    print("🎬 Video Concatenation Integration Test")
    print("=" * 60)

    # Expand user path
    folder = Path(folder_path).expanduser()
    output_file = Path(output_path).expanduser()

    print(f"📁 Source folder: {folder.absolute()}")
    print(f"📁 Output file: {output_file.absolute()}")
    print()

    # is_dir() rather than exists(): a regular file at this path is just as
    # unusable as a missing folder.
    if not folder.is_dir():
        print(f"❌ Error: Folder not found: {folder.absolute()}")
        print("Please ensure the folder exists and contains video files.")
        return 1

    video_files = _find_video_files(folder)
    if not video_files:
        print(f"❌ Error: No video files found in {folder.absolute()}")
        print("Supported formats: .mp4, .avi, .mov, .mkv, .webm")
        return 1

    print(f"✅ Found {len(video_files)} video files:")
    for i, video_file in enumerate(video_files, 1):
        file_size = video_file.stat().st_size
        print(f" {i:2d}. {video_file.name} ({file_size:,} bytes)")
    print()

    # Create output directory if needed
    output_file.parent.mkdir(parents=True, exist_ok=True)

    # Use the first video's geometry and frame rate so the output keeps the
    # original size instead of being rescaled to an arbitrary default.
    first_video = video_files[0]
    properties = _read_video_properties(first_video)
    if properties is None:
        print(f"❌ Error: Could not read video properties from {first_video.name}")
        return 1
    original_width, original_height, original_fps = properties

    print("🔄 Starting video concatenation...")
    print(" - Sorting: Alphabetical by filename")
    print(f" - Target FPS: {original_fps:.1f} (from first video)")
    print(f" - Target resolution: {original_width}x{original_height} (from first video)")
    print()

    try:
        result = concatenate_videos_from_folder(
            folder_path=str(folder),
            output_path=str(output_file),
            sort_by_name=True,  # Sort alphabetically
            target_fps=original_fps,  # Use first video's FPS
            target_resolution=(original_width, original_height),  # Use first video's resolution
        )
    except Exception as e:
        # Top-level boundary of the script: report the problem and fail the run.
        print(f"❌ Error during concatenation: {e}")
        return 1

    if not result.success:
        print("❌ Concatenation failed!")
        print(f"Error: {result.error_message}")
        return 1

    print("✅ Concatenation completed successfully!")
    print()
    print("📊 Output Video Properties:")
    print(f" 📁 File: {result.output_path}")
    print(f" ⏱️ Duration: {result.total_duration:.2f} seconds")
    print(f" 🎞️ Frames: {result.frame_count:,}")
    print(f" 📐 Resolution: {result.width}x{result.height}")
    print(f" 🎬 FPS: {result.fps}")
    print()

    # Check if output file exists and show file size
    written = Path(result.output_path)
    if written.exists():
        file_size = written.stat().st_size
        print(f"📏 Output file size: {file_size:,} bytes ({file_size / 1024 / 1024:.1f} MB)")
    else:
        print("⚠️ Warning: Output file not found after concatenation")

    print("\n✅ Integration test completed successfully!")
    print(f"🎬 Your concatenated video is ready: {output_file.absolute()}")
    return 0


def _find_video_files(folder):
    """Return the video files directly inside *folder*, sorted by name.

    Suffixes are compared case-insensitively in a single directory pass, so
    each file is listed once regardless of filesystem case sensitivity and
    mixed-case suffixes such as ``.Mp4`` are recognised.
    """
    extensions = {".mp4", ".avi", ".mov", ".mkv", ".webm"}
    matches = [
        entry
        for entry in folder.iterdir()
        if entry.suffix.lower() in extensions and entry.is_file()
    ]
    # Secondary key keeps the order deterministic when names differ only by case.
    return sorted(matches, key=lambda path: (path.name.lower(), path.name))


def _read_video_properties(video_path):
    """Return ``(width, height, fps)`` for *video_path*, or ``None`` if unreadable."""
    capture = cv2.VideoCapture(str(video_path))
    try:
        if not capture.isOpened():
            return None
        width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = capture.get(cv2.CAP_PROP_FPS)
    finally:
        # Always free the decoder handle, even if a property read fails.
        capture.release()

    # "not x > 0" also rejects NaN, which some containers report for FPS.
    if not (width > 0 and height > 0 and fps > 0):
        return None
    return width, height, fps


if __name__ == "__main__":
    # Propagate the outcome as the process exit status.
    sys.exit(main())
11 changes: 4 additions & 7 deletions media_gen/tools/replicate_image_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,13 @@ def run(self, input: dict) -> dict:

# Return format matching the base class interface
if len(generated_images) == 1:
# Single image - return in the expected format
return {"image_path": generated_images[0], "generation_info": generation_info[0] if generation_info else {}}
else:
# For multiple images, return the first one as primary and include all info
# Multiple images - return in pipeline format for batch processing
return {
"image_path": generated_images[0] if generated_images else "",
"generation_info": {
"all_images": generated_images,
"all_info": generation_info,
"count": len(generated_images),
},
"generated_image_paths": generated_images,
"image_generation_info": generation_info,
}

async def _execute(self, input: Message) -> Message:
Expand Down
62 changes: 44 additions & 18 deletions media_gen/tools/video_understanding_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,20 @@
import os
from typing import Any, ClassVar, Dict, List, Optional

# Load environment variables from .env file if it exists
try:
from dotenv import load_dotenv

load_dotenv()
except ImportError:
# dotenv not installed, continue without it
pass

from openai import OpenAI
from polymind.core.tool import BaseTool, Param
from polymind.core.utils import encode_image_to_base64

from ..utils.video_utils import ScreenshotInfo, extract_key_frames, extract_screenshots
from media_gen.utils.video_utils import ScreenshotInfo, extract_key_frames, extract_screenshots


class VideoUnderstandingTool(BaseTool):
Expand Down Expand Up @@ -82,6 +91,10 @@ def __init__(self, api_key: Optional[str] = None, model: str = "gpt-4o-mini", **
# Set the API key
api_key = api_key or os.getenv("OPENAI_API_KEY")
if not api_key:
print("❌ OpenAI API key not found!")
print(" Please check your .env file contains:")
print(" OPENAI_API_KEY=your_api_key_here")
print(" Or set the environment variable directly")
raise ValueError(
"OpenAI API key is required. Set OPENAI_API_KEY environment " "variable or pass api_key parameter."
)
Expand Down Expand Up @@ -272,26 +285,39 @@ def _analyze_screenshots(self, screenshots: List[ScreenshotInfo], user_preferenc
print(f"Warning: Failed to process screenshot {screenshot.file_path}: {e}")
continue

try:
# Call OpenAI API
response = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": content}],
max_tokens=2000,
response_format={"type": "json_object"},
)

analysis = response.choices[0].message.content
# Add retry logic for connection issues
max_retries = 3
retry_delay = 2 # seconds

# Parse JSON response
for attempt in range(max_retries):
try:
analysis_dict = json.loads(analysis)
return analysis_dict.get("scenes", [])
except json.JSONDecodeError:
raise RuntimeError("Failed to parse OpenAI response as JSON")
# Call OpenAI API
response = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": content}],
max_tokens=2000,
response_format={"type": "json_object"},
)

except Exception as e:
raise RuntimeError(f"Failed to analyze screenshots: {e}")
analysis = response.choices[0].message.content

# Parse JSON response
try:
analysis_dict = json.loads(analysis)
return analysis_dict.get("scenes", [])
except json.JSONDecodeError:
raise RuntimeError("Failed to parse OpenAI response as JSON")

except Exception as e:
if attempt < max_retries - 1:
print(f"⚠️ Attempt {attempt + 1} failed: {e}")
print(f" Retrying in {retry_delay} seconds...")
import time

time.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
raise RuntimeError(f"Failed to analyze screenshots after {max_retries} attempts: {e}")

def run(self, input: dict) -> dict:
"""
Expand Down
Loading