Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/media-gen/.gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.mp4 filter=lfs diff=lfs merge=lfs -text
3 changes: 3 additions & 0 deletions examples/media-gen/integration_tests/test_video.mp4
Git LFS file not shown
94 changes: 94 additions & 0 deletions examples/media-gen/integration_tests/test_video_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#!/usr/bin/env python3
"""
Integration test for video screenshot extraction utilities.

This script demonstrates video screenshot extraction functionality using the test video.
It extracts screenshots every 10 seconds and saves them to ~/Downloads.

Usage:
python integration_tests/test_video_utils.py

Requirements:
- OpenCV (opencv-python)
- Test video file: integration_tests/test_video.mp4
"""

import os
import sys

from pathlib import Path

# Add parent directory to path for imports
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

from utils.video_utils import VideoScreenshotExtractor, extract_screenshots


def main():
"""Extract screenshots from the test video every 10 seconds."""
print("🎬 Video Screenshot Extraction Test")
print("=" * 50)

# Path to test video
test_video_path = Path(__file__).parent / "test_video.mp4"

if not test_video_path.exists():
print(f"❌ Test video not found at: {test_video_path}")
print("Please ensure test_video.mp4 exists in the integration_tests directory")
return

print(f"✅ Test video found: {test_video_path}")
print(f"📏 File size: {test_video_path.stat().st_size:,} bytes")

# Get video properties
with VideoScreenshotExtractor(str(test_video_path)) as extractor:
print(f"🎥 Video properties:")
print(f" - FPS: {extractor.fps:.2f}")
print(f" - Duration: {extractor.duration:.2f} seconds")
print(f" - Frame count: {extractor.frame_count:,}")
print()

# Extract screenshots every 5 seconds, starting from 1 second
print("🔄 Extracting screenshots every 5 seconds (starting from 1s)...")
print("-" * 50)

try:
screenshots = extract_screenshots(
video_path=str(test_video_path),
interval_seconds=5.0,
start_time=1.0, # Start from 1 second instead of 0
output_dir="~/Downloads/polymind_video_screenshots",
filename_prefix="test_video_5s"
)

print(f"✅ Extracted {len(screenshots)} screenshots")
for i, ss in enumerate(screenshots):
print(f" {i+1}. Frame {ss.frame_number} at {ss.timestamp_str}")
print()

except Exception as e:
print(f"❌ Screenshot extraction failed: {e}")
return

# Summary
print("📊 Summary")
print("=" * 50)
downloads_dir = Path.home() / "Downloads" / "polymind_video_screenshots"

if downloads_dir.exists():
screenshot_files = list(downloads_dir.glob("test_video_10s_*.jpg"))
print(f"✅ Screenshots saved to: {downloads_dir}")
print(f"📁 Files created: {len(screenshot_files)}")

if screenshot_files:
print(f"📄 File pattern: test_video_10s_*.jpg")
print(f"📏 Sample file size: {screenshot_files[0].stat().st_size:,} bytes")
else:
print("❌ No screenshots were created")

print("\n🎯 Test completed!")
print("💡 You can now view the extracted screenshots in your Downloads folder")


if __name__ == "__main__":
main()
6 changes: 5 additions & 1 deletion examples/media-gen/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ python-dotenv # For environment variable management
anthropic # For Claude integration
openai # For OpenAI integration
replicate # For Replicate integration
requests # For downloading images from URLs
requests # For downloading images from URLs

# Video processing dependencies
opencv-python==4.8.1.78 # For video screenshot extraction (compatible with numpy 1.26.0)
numpy==1.26.0 # Required by polymind and compatible with opencv-python 4.8.1.78
10 changes: 10 additions & 0 deletions examples/media-gen/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
Utility functions for media generation and processing.

This package contains helper functions for video processing, image manipulation,
and other media-related utilities.
"""

from .video_utils import extract_screenshots, VideoScreenshotExtractor

__all__ = ['extract_screenshots', 'VideoScreenshotExtractor']
188 changes: 188 additions & 0 deletions examples/media-gen/utils/video_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
"""
Simple video screenshot utility.

Extract screenshots from video files at regular time intervals.
"""

import logging
import os
from dataclasses import dataclass
from datetime import timedelta
from typing import List, Optional

import cv2
from pathlib import Path

logger = logging.getLogger(__name__)


def _expand_path(path: str) -> Path:
"""
Expand a path, handling tilde (~) for home directory.

Args:
path: Path string that may contain tilde

Returns:
Expanded Path object
"""
return Path(os.path.expanduser(path))


@dataclass
class ScreenshotInfo:
"""Information about an extracted screenshot."""
frame_number: int
timestamp: float # seconds
timestamp_str: str # formatted as HH:MM:SS
file_path: str


class VideoScreenshotExtractor:
"""
Extract screenshots from video files at regular intervals.

Simple utility to take screenshots every X seconds from a video file.
"""

def __init__(self, video_path: str):
"""
Initialize the screenshot extractor.

Args:
video_path: Path to the video file
"""
self.video_path = Path(video_path)
if not self.video_path.exists():
raise FileNotFoundError(f"Video file not found: {video_path}")

self.cap = cv2.VideoCapture(str(self.video_path))
if not self.cap.isOpened():
raise ValueError(f"Could not open video file: {video_path}")

# Get video properties
self.fps = self.cap.get(cv2.CAP_PROP_FPS)
self.frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
self.duration = self.frame_count / self.fps if self.fps > 0 else 0

logger.info(
f"Video loaded: {self.fps:.2f} FPS, "
f"{self.frame_count} frames, "
f"{self.duration:.2f}s duration"
)

def __enter__(self):
"""Context manager entry."""
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - release video capture."""
self.release()

def release(self) -> None:
"""Release the video capture object."""
if self.cap:
self.cap.release()

def _format_timestamp(self, seconds: float) -> str:
"""Format seconds into HH:MM:SS string."""
return str(timedelta(seconds=int(seconds)))

def extract_screenshots(
self,
interval_seconds: float = 2.0,
start_time: float = 0.0,
output_dir: Optional[str] = "~/Downloads",
filename_prefix: str = "screenshot"
) -> List[ScreenshotInfo]:
"""
Extract screenshots at regular time intervals.

Args:
interval_seconds: Time interval between screenshots (default: 2.0)
start_time: Start time in seconds (default: 0.0)
output_dir: Directory to save screenshots (default: ~/Downloads)
filename_prefix: Prefix for saved files

Returns:
List of ScreenshotInfo objects
"""
if interval_seconds <= 0:
raise ValueError("Interval must be positive")
if start_time < 0:
raise ValueError("Start time must be non-negative")

screenshots = []
frame_interval = int(self.fps * interval_seconds)
start_frame = int(start_time * self.fps)

# Handle output directory with tilde expansion
output_path = _expand_path(output_dir) if output_dir else _expand_path("~/Downloads")
output_path.mkdir(parents=True, exist_ok=True)

for frame_num in range(start_frame, self.frame_count, frame_interval):
self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
ret, frame = self.cap.read()

if not ret:
break

timestamp = frame_num / self.fps
timestamp_str = self._format_timestamp(timestamp)

screenshot_info = ScreenshotInfo(
frame_number=frame_num,
timestamp=timestamp,
timestamp_str=timestamp_str,
file_path=""
)

# Always save screenshots since we have a default output directory
# Use timestamp-based filename for chronological sorting
timestamp_seconds = int(timestamp)
filename = (f"{filename_prefix}_{timestamp_seconds:06d}s_"
f"{timestamp_str.replace(':', '-')}.jpg")
file_path = output_path / filename
cv2.imwrite(str(file_path), frame)
screenshot_info.file_path = str(file_path)

screenshots.append(screenshot_info)
logger.debug(
f"Extracted screenshot at {timestamp_str} "
f"(frame {frame_num})"
)

logger.info(
f"Extracted {len(screenshots)} screenshots "
f"at {interval_seconds}s intervals"
)
return screenshots


def extract_screenshots(
video_path: str,
interval_seconds: float = 2.0,
start_time: float = 0.0,
output_dir: Optional[str] = "~/Downloads",
filename_prefix: str = "screenshot"
) -> List[ScreenshotInfo]:
"""
Convenience function to extract screenshots from a video file.

Args:
video_path: Path to the video file
interval_seconds: Time interval between screenshots (default: 2.0)
start_time: Start time in seconds (default: 0.0)
output_dir: Directory to save screenshots (default: ~/Downloads)
filename_prefix: Prefix for saved files

Returns:
List of ScreenshotInfo objects
"""
with VideoScreenshotExtractor(video_path) as extractor:
return extractor.extract_screenshots(
interval_seconds=interval_seconds,
start_time=start_time,
output_dir=output_dir,
filename_prefix=filename_prefix
)
Loading