diff --git a/media_gen/test_scripts/README.md b/media_gen/test_scripts/README.md index b1b5ccc..9821590 100644 --- a/media_gen/test_scripts/README.md +++ b/media_gen/test_scripts/README.md @@ -72,4 +72,51 @@ The test will show: - This test is not run automatically with unit tests - It requires a valid OpenAI API key - It makes real API calls and may incur costs -- The test image should be placed in this folder \ No newline at end of file +- The test image should be placed in this folder + +## Video Concatenation Test + +### Prerequisites +- OpenCV (cv2) installed +- Video files for testing +- Sufficient disk space for output videos + +### Running the Test + +```bash +# From the media-regen directory +uv run python media_gen/test_scripts/test_video_concatenation.py +``` + +### What it does +- Tests the new video concatenation functionality in `video_utils.py` +- Demonstrates both folder-based and list-based concatenation +- Shows how to concatenate videos in alphabetical order +- Handles different video formats and resolutions + +### Features Tested +- **Folder-based concatenation**: Automatically finds and concatenates all videos in a folder +- **Alphabetical sorting**: Concatenates videos in alphabetical order by filename +- **Format support**: Handles .mp4, .avi, .mov, .mkv, .webm files +- **Resolution conversion**: Can resize videos to target resolution +- **FPS conversion**: Can adjust frame rate of output video +- **Error handling**: Graceful handling of missing files and processing errors + +### Configuration +Update the test script to point to your video files: +- `folder_path`: Directory containing video files to concatenate +- `video_paths`: List of specific video file paths +- `output_path`: Where to save the concatenated video + +### Expected Output +The test will show: +- šŸ“ List of video files found +- šŸ”„ Processing progress for each video +- āœ… Success/failure status +- šŸ“Š Output video properties (duration, frames, resolution, FPS) + +### Notes +- This test requires actual video files to be present +- Update the file paths in the script before running +- Output videos are saved in MP4 format using H.264 codec +- The test demonstrates both automatic folder scanning and manual file list approaches \ No newline at end of file diff --git a/media_gen/test_scripts/test_video_concatenation.py b/media_gen/test_scripts/test_video_concatenation.py new file mode 100644 index 0000000..cf293b7 --- /dev/null +++ b/media_gen/test_scripts/test_video_concatenation.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +""" +Video concatenation integration test. + +This script demonstrates the video concatenation functionality by processing +videos from a specific folder and concatenating them in alphabetical order. + +Requirements: +- OpenCV (cv2) installed +- Video files in the target folder +- Sufficient disk space for output videos +""" + +import sys +from pathlib import Path + +# Add the parent directory to the path to import media_gen modules +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +try: + import cv2 + + from media_gen.utils.video_utils import concatenate_videos_from_folder +except ImportError as e: + if "cv2" in str(e): + print("āŒ OpenCV package not installed. Please install it with:") + print(" pip install opencv-python") + sys.exit(1) + else: + raise + + +def main(): + """Concatenate videos from the specified folder.""" + print("šŸŽ¬ Video Concatenation Integration Test") + print("=" * 60) + + # Target folder path + folder_path = "~/Downloads/video_regen_1754563125/generated_videos" + output_path = "~/Downloads/concatenated_videos.mp4" + + # Expand user path + folder = Path(folder_path).expanduser() + output_file = Path(output_path).expanduser() + + print(f"šŸ“ Source folder: {folder.absolute()}") + print(f"šŸ“ Output file: {output_file.absolute()}") + print() + + # Check if folder exists + if not folder.exists(): + print(f"āŒ Error: Folder not found: {folder.absolute()}") + print("Please ensure the folder exists and contains video files.") + return + + # List video files in folder + video_extensions = [".mp4", ".avi", ".mov", ".mkv", ".webm"] + video_files = [] + for ext in video_extensions: + video_files.extend(folder.glob(f"*{ext}")) + video_files.extend(folder.glob(f"*{ext.upper()}")) + + if not video_files: + print(f"āŒ Error: No video files found in {folder.absolute()}") + print("Supported formats: .mp4, .avi, .mov, .mkv, .webm") + return + + # Sort files alphabetically + video_files = sorted(video_files, key=lambda x: x.name.lower()) + + print(f"āœ… Found {len(video_files)} video files:") + for i, video_file in enumerate(video_files, 1): + file_size = video_file.stat().st_size + print(f" {i:2d}. {video_file.name} ({file_size:,} bytes)") + print() + + # Create output directory if needed + output_file.parent.mkdir(parents=True, exist_ok=True) + + # Get the resolution of the first video to maintain original size + first_video = video_files[0] + first_cap = cv2.VideoCapture(str(first_video)) + original_width = int(first_cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + original_height = int(first_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + original_fps = first_cap.get(cv2.CAP_PROP_FPS) + first_cap.release() + + print("šŸ”„ Starting video concatenation...") + print(" - Sorting: Alphabetical by filename") + print(f" - Target FPS: {original_fps:.1f} (from first video)") + print(f" - Target resolution: {original_width}x{original_height} (from first video)") + print() + + try: + # Perform concatenation + result = concatenate_videos_from_folder( + folder_path=str(folder), + output_path=str(output_file), + sort_by_name=True, # Sort alphabetically + target_fps=original_fps, # Use first video's FPS + target_resolution=(original_width, original_height), # Use first video's resolution + ) + + # Display results + if result.success: + print("āœ… Concatenation completed successfully!") + print() + print("šŸ“Š Output Video Properties:") + print(f" šŸ“ File: {result.output_path}") + print(f" ā±ļø Duration: {result.total_duration:.2f} seconds") + print(f" šŸŽžļø Frames: {result.frame_count:,}") + print(f" šŸ“ Resolution: {result.width}x{result.height}") + print(f" šŸŽ¬ FPS: {result.fps}") + print() + + # Check if output file exists and show file size + if Path(result.output_path).exists(): + file_size = Path(result.output_path).stat().st_size + print(f"šŸ“ Output file size: {file_size:,} bytes ({file_size / 1024 / 1024:.1f} MB)") + else: + print("āš ļø Warning: Output file not found after concatenation") + + else: + print("āŒ Concatenation failed!") + print(f"Error: {result.error_message}") + return + + except Exception as e: + print(f"āŒ Error during concatenation: {e}") + return + + print("\nāœ… Integration test completed successfully!") + print(f"šŸŽ¬ Your concatenated video is ready: {output_file.absolute()}") + + +if __name__ == "__main__": + main() diff --git a/media_gen/tools/replicate_image_gen.py b/media_gen/tools/replicate_image_gen.py index 8d8fd08..4698a71 100644 --- a/media_gen/tools/replicate_image_gen.py +++ b/media_gen/tools/replicate_image_gen.py @@ -181,16 +181,13 @@ def run(self, input: dict) -> dict: # Return format matching the base class interface if len(generated_images) == 1: + # Single image - return in the expected format return {"image_path": generated_images[0], "generation_info": generation_info[0] if generation_info else {}} else: - # For multiple images, return the first one as primary and include all info + # Multiple images - return in pipeline format for batch processing return { - "image_path": generated_images[0] if generated_images else "", - "generation_info": { - "all_images": generated_images, - "all_info": generation_info, - "count": len(generated_images), - }, + "generated_image_paths": generated_images, + "image_generation_info": generation_info, } async def _execute(self, input: Message) -> Message: diff --git a/media_gen/tools/video_understanding_tool.py b/media_gen/tools/video_understanding_tool.py index 03fa67a..a30e9ab 100644 --- a/media_gen/tools/video_understanding_tool.py +++ b/media_gen/tools/video_understanding_tool.py @@ -11,11 +11,20 @@ import os from typing import Any, ClassVar, Dict, List, Optional +# Load environment variables from .env file if it exists +try: + from dotenv import load_dotenv + + load_dotenv() +except ImportError: + # dotenv not installed, continue without it + pass + from openai import OpenAI from polymind.core.tool import BaseTool, Param from polymind.core.utils import encode_image_to_base64 -from ..utils.video_utils import ScreenshotInfo, extract_key_frames, extract_screenshots +from media_gen.utils.video_utils import ScreenshotInfo, extract_key_frames, extract_screenshots class VideoUnderstandingTool(BaseTool): @@ -82,6 +91,10 @@ def __init__(self, api_key: Optional[str] = None, model: str = "gpt-4o-mini", ** # Set the API key api_key = api_key or os.getenv("OPENAI_API_KEY") if not api_key: + print("āŒ OpenAI API key not found!") + print(" Please check your .env file contains:") + print(" OPENAI_API_KEY=your_api_key_here") + print(" Or set the environment variable directly") raise ValueError( "OpenAI API key is required. Set OPENAI_API_KEY environment " "variable or pass api_key parameter." ) @@ -272,26 +285,39 @@ def _analyze_screenshots(self, screenshots: List[ScreenshotInfo], user_preferenc print(f"Warning: Failed to process screenshot {screenshot.file_path}: {e}") continue - try: - # Call OpenAI API - response = self.client.chat.completions.create( - model=self.model, - messages=[{"role": "user", "content": content}], - max_tokens=2000, - response_format={"type": "json_object"}, - ) - - analysis = response.choices[0].message.content + # Add retry logic for connection issues + max_retries = 3 + retry_delay = 2 # seconds - # Parse JSON response + for attempt in range(max_retries): try: - analysis_dict = json.loads(analysis) - return analysis_dict.get("scenes", []) - except json.JSONDecodeError: - raise RuntimeError("Failed to parse OpenAI response as JSON") + # Call OpenAI API + response = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": content}], + max_tokens=2000, + response_format={"type": "json_object"}, + ) - except Exception as e: - raise RuntimeError(f"Failed to analyze screenshots: {e}") + analysis = response.choices[0].message.content + + # Parse JSON response + try: + analysis_dict = json.loads(analysis) + return analysis_dict.get("scenes", []) + except json.JSONDecodeError: + raise RuntimeError("Failed to parse OpenAI response as JSON") + + except Exception as e: + if attempt < max_retries - 1: + print(f"āš ļø Attempt {attempt + 1} failed: {e}") + print(f" Retrying in {retry_delay} seconds...") + import time + + time.sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + else: + raise RuntimeError(f"Failed to analyze screenshots after {max_retries} attempts: {e}") def run(self, input: dict) -> dict: """ diff --git a/media_gen/utils/video_utils.py b/media_gen/utils/video_utils.py index 0058f05..89d86d1 100644 --- a/media_gen/utils/video_utils.py +++ b/media_gen/utils/video_utils.py @@ -612,3 +612,273 @@ def extract_frames( """ with VideoExtractor(video_path, mode, **kwargs) as extractor: return extractor.extract(output_dir, filename_prefix) + + +@dataclass +class ConcatenationInfo: + """Information about video concatenation process.""" + + input_videos: List[str] + output_path: str + total_duration: float + frame_count: int + width: int + height: int + fps: float + success: bool + error_message: Optional[str] = None + + +def concatenate_videos_from_folder( + folder_path: str, + output_path: str, + video_extensions: Optional[List[str]] = None, + sort_by_name: bool = True, + target_fps: Optional[float] = None, + target_resolution: Optional[tuple[int, int]] = None, +) -> ConcatenationInfo: + """ + Concatenate videos from a folder in alphabetical order. + + Args: + folder_path: Path to folder containing video files + output_path: Path for the concatenated output video + video_extensions: List of video file extensions to include (default: ['.mp4', '.avi', '.mov', '.mkv']) + sort_by_name: Whether to sort files alphabetically by name (default: True) + target_fps: Target FPS for output video (uses first video's FPS if None) + target_resolution: Target resolution (width, height) for output video (uses first video's resolution if None) + + Returns: + ConcatenationInfo object with process details + """ + if video_extensions is None: + video_extensions = [".mp4", ".avi", ".mov", ".mkv", ".webm"] + + folder = Path(folder_path) + if not folder.exists(): + return ConcatenationInfo( + input_videos=[], + output_path=output_path, + total_duration=0.0, + frame_count=0, + width=0, + height=0, + fps=0.0, + success=False, + error_message=f"Folder not found: {folder_path}", + ) + + # Find all video files + video_files = [] + for ext in video_extensions: + video_files.extend(folder.glob(f"*{ext}")) + video_files.extend(folder.glob(f"*{ext.upper()}")) + + if not video_files: + return ConcatenationInfo( + input_videos=[], + output_path=output_path, + total_duration=0.0, + frame_count=0, + width=0, + height=0, + fps=0.0, + success=False, + error_message=f"No video files found in {folder_path}", + ) + + # Sort files alphabetically if requested + if sort_by_name: + video_files = sorted(video_files, key=lambda x: x.name.lower()) + + video_paths = [str(f) for f in video_files] + logger.info(f"Found {len(video_paths)} video files to concatenate") + + try: + # Get properties from first video + first_cap = cv2.VideoCapture(video_paths[0]) + if not first_cap.isOpened(): + raise ValueError(f"Could not open first video: {video_paths[0]}") + + # Use first video's properties as defaults + default_fps = first_cap.get(cv2.CAP_PROP_FPS) + default_width = int(first_cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + default_height = int(first_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + first_cap.release() + + # Use provided targets or defaults + fps = target_fps if target_fps is not None else default_fps + width = target_resolution[0] if target_resolution else default_width + height = target_resolution[1] if target_resolution else default_height + + # Create output directory if needed + output_file = Path(output_path) + output_file.parent.mkdir(parents=True, exist_ok=True) + + # Initialize video writer + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) + + if not out.isOpened(): + raise ValueError(f"Could not create output video writer: {output_path}") + + total_frames = 0 + total_duration = 0.0 + + # Process each video + for video_path in video_paths: + logger.info(f"Processing video: {Path(video_path).name}") + + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + logger.warning(f"Could not open video: {video_path}") + continue + + video_fps = cap.get(cv2.CAP_PROP_FPS) + video_frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + video_duration = video_frame_count / video_fps if video_fps > 0 else 0 + + logger.info(f" Duration: {video_duration:.2f}s, FPS: {video_fps:.2f}, Frames: {video_frame_count}") + + frame_idx = 0 + while True: + ret, frame = cap.read() + if not ret: + break + + # Resize frame if target resolution is different + if frame.shape[1] != width or frame.shape[0] != height: + frame = cv2.resize(frame, (width, height)) + + # Handle frame rate differences + if video_fps != fps: + # Simple frame dropping/duplication for FPS conversion + if video_fps > fps: + # Drop frames + if frame_idx % int(video_fps / fps) == 0: + out.write(frame) + else: + # Duplicate frames + repeat_count = int(fps / video_fps) + for _ in range(repeat_count): + out.write(frame) + else: + out.write(frame) + + frame_idx += 1 + total_frames += 1 + + cap.release() + total_duration += video_duration + + out.release() + + # Calculate final properties + final_duration = total_frames / fps if fps > 0 else 0 + + logger.info( + f"Concatenation completed: {len(video_paths)} videos, " + f"{total_frames} frames, {final_duration:.2f}s duration" + ) + + return ConcatenationInfo( + input_videos=video_paths, + output_path=output_path, + total_duration=final_duration, + frame_count=total_frames, + width=width, + height=height, + fps=fps, + success=True, + ) + + except Exception as e: + logger.error(f"Error during video concatenation: {str(e)}") + return ConcatenationInfo( + input_videos=video_paths, + output_path=output_path, + total_duration=0.0, + frame_count=0, + width=0, + height=0, + fps=0.0, + success=False, + error_message=str(e), + ) + + +def concatenate_videos( + video_paths: List[str], + output_path: str, + target_fps: Optional[float] = None, + target_resolution: Optional[tuple[int, int]] = None, +) -> ConcatenationInfo: + """ + Concatenate a list of video files. + + Args: + video_paths: List of video file paths to concatenate + output_path: Path for the concatenated output video + target_fps: Target FPS for output video (uses first video's FPS if None) + target_resolution: Target resolution (width, height) for output video (uses first video's resolution if None) + + Returns: + ConcatenationInfo object with process details + """ + if not video_paths: + return ConcatenationInfo( + input_videos=[], + output_path=output_path, + total_duration=0.0, + frame_count=0, + width=0, + height=0, + fps=0.0, + success=False, + error_message="No video paths provided", + ) + + # Create a temporary folder structure to use the folder-based function + import shutil + import tempfile + + with tempfile.TemporaryDirectory() as temp_dir: + # Copy videos to temp directory with numbered names to preserve order + temp_video_paths = [] + for i, video_path in enumerate(video_paths): + video_file = Path(video_path) + if not video_file.exists(): + logger.warning(f"Video file not found: {video_path}") + continue + + # Create numbered filename to preserve order + temp_name = f"{i:04d}_{video_file.name}" + temp_path = Path(temp_dir) / temp_name + shutil.copy2(video_path, temp_path) + temp_video_paths.append(str(temp_path)) + + if not temp_video_paths: + return ConcatenationInfo( + input_videos=video_paths, + output_path=output_path, + total_duration=0.0, + frame_count=0, + width=0, + height=0, + fps=0.0, + success=False, + error_message="No valid video files found", + ) + + # Use the folder-based concatenation + result = concatenate_videos_from_folder( + folder_path=temp_dir, + output_path=output_path, + sort_by_name=True, # This will sort by the numbered filenames + target_fps=target_fps, + target_resolution=target_resolution, + ) + + # Update the input_videos to show original paths + result.input_videos = video_paths + return result diff --git a/media_gen/video_regen_pipeline.py b/media_gen/video_regen_pipeline.py index 63f2215..c8c3daf 100644 --- a/media_gen/video_regen_pipeline.py +++ b/media_gen/video_regen_pipeline.py @@ -2,9 +2,10 @@ Video regeneration pipeline. Command-line tool for regenerating videos: -1. Analyze original video using video understanding -2. Generate images for each scene using image generation +1. Analyze original video using video understanding (keyframe-based extraction) +2. Generate images for each scene using image generation (9:16 aspect ratio) 3. Generate videos from each image using video generation +4. Concatenate all videos into a final output Usage: python video_regen_pipeline.py --video-path @@ -33,10 +34,11 @@ # dotenv not installed, continue without it pass -from pipeline import MediaGenerationPipeline, PipelineStep, PipelineStepExecutor -from tools.replicate_image_gen import ReplicateImageGen -from tools.replicate_video_gen import ReplicateVideoGen -from tools.video_understanding_tool import VideoUnderstandingTool +from media_gen.pipeline import MediaGenerationPipeline, PipelineStep, PipelineStepExecutor +from media_gen.tools.replicate_image_gen import ReplicateImageGen +from media_gen.tools.replicate_video_gen import ReplicateVideoGen +from media_gen.tools.video_understanding_tool import VideoUnderstandingTool +from media_gen.utils.video_utils import concatenate_videos_from_folder def expand_path(path: str) -> str: @@ -131,7 +133,7 @@ def __init__( "output_format": "output_format", }, output_mapping={ - "generated_videos": "generated_video_paths", + "generated_video_paths": "generated_video_paths", "video_generation_info": "video_generation_info", }, transform_input=self._prepare_video_generation, @@ -139,12 +141,31 @@ def __init__( ) ) + # Add video concatenation step (final step) + self.add_step( + PipelineStep( + name="video_concatenation", + tool=None, # We'll handle this manually in transform_input + input_mapping={ + "generated_video_paths": "video_paths", + "output_folder": "output_folder", + "output_format": "output_format", + }, + output_mapping={ + "concatenated_video_path": "concatenated_video_path", + "concatenation_info": "concatenation_info", + }, + transform_input=self._prepare_video_concatenation, + transform_output=self._extract_concatenation_result, + ) + ) + def regenerate( self, video_path: str, user_interests: str, output_folder: str = "~/Downloads", - extraction_mode: str = "interval", + extraction_mode: str = "keyframe", screenshot_interval: float = 10.0, keyframe_threshold: float = 30.0, min_interval_frames: int = 30, @@ -205,7 +226,7 @@ def _prepare_image_generation(self, tool_input: Dict[str, Any]) -> Dict[str, Any """ image_prompts = tool_input.get("prompt", []) output_folder = tool_input.get("output_folder", "~/Downloads") - aspect_ratio = tool_input.get("aspect_ratio", "1:1") + aspect_ratio = tool_input.get("aspect_ratio", "9:16") # Always use png for images, regardless of the global output_format output_format = "png" @@ -421,6 +442,153 @@ def _extract_video_paths(self, tool_output: Dict[str, Any]) -> Dict[str, Any]: "video_generation_errors": errors, } + def _prepare_video_concatenation(self, tool_input: Dict[str, Any]) -> Dict[str, Any]: + """ + Prepare input for video concatenation step. + + Takes the list of generated videos and concatenates them into a single video. + """ + # The input mapping should have already mapped generated_video_paths to video_paths + # But let's check both to be safe + generated_videos = tool_input.get("video_paths", []) + if not generated_videos: + # Fallback: try to get from the original key + generated_videos = tool_input.get("generated_video_paths", []) + + output_folder = tool_input.get("output_folder", "~/Downloads") + output_format = tool_input.get("output_format", "mp4") + + # Save concatenated video directly in the main output folder, not a subfolder + concatenated_video_path = f"{output_folder}/final_concatenated_video.{output_format}" + + # Display concatenation info + print("\nšŸŽ¬ VIDEO CONCATENATION STEP") + print(f"šŸ“ Output folder: {output_folder}") + print(f"šŸŽžļø Concatenating {len(generated_videos)} videos:") + + for i, video_path in enumerate(generated_videos): + video_name = os.path.basename(video_path) if video_path else "No video" + print(f" Video {i + 1}: {video_name}") + + # Debug: Print the concatenation input + if self.debug: + print("\nšŸ” DEBUG - Video Concatenation Input:") + print(f" Number of videos: {len(generated_videos)}") + for i, video_path in enumerate(generated_videos): + print(f" Video {i + 1}: {video_path}") + print(f" Output path: {concatenated_video_path}") + + return { + "video_paths": generated_videos, + "output_folder": output_folder, + "output_path": concatenated_video_path, + "output_format": output_format, + } + + def _extract_concatenation_result(self, tool_output: Dict[str, Any]) -> Dict[str, Any]: + """ + Extract concatenation result from video concatenation output. + """ + concatenated_video_path = tool_output.get("concatenated_video_path", "") + concatenation_info = tool_output.get("concatenation_info", {}) + + if concatenated_video_path and os.path.exists(concatenated_video_path): + file_size = os.path.getsize(concatenated_video_path) + print(f"āœ… Concatenated video created: {os.path.basename(concatenated_video_path)}") + print(f"šŸ“ File size: {file_size:,} bytes ({file_size / 1024 / 1024:.1f} MB)") + + if concatenation_info: + duration = concatenation_info.get("total_duration", 0) + frames = concatenation_info.get("frame_count", 0) + resolution = f"{concatenation_info.get('width', 0)}x{concatenation_info.get('height', 0)}" + fps = concatenation_info.get("fps", 0) + print(f"ā±ļø Duration: {duration:.2f} seconds") + print(f"šŸŽžļø Frames: {frames:,}") + print(f"šŸ“ Resolution: {resolution}") + print(f"šŸŽ¬ FPS: {fps}") + else: + print("āŒ Video concatenation failed") + if concatenation_info and concatenation_info.get("error_message"): + print(f"Error: {concatenation_info['error_message']}") + + # Debug: Print the concatenation output + if self.debug: + print("\nšŸ” DEBUG - Video Concatenation Output:") + print(f" Concatenated video path: {concatenated_video_path}") + print(f" Concatenation info: {concatenation_info}") + + return { + "concatenated_video_path": concatenated_video_path, + "concatenation_info": concatenation_info, + } + + def _execute_video_concatenation(self, step_input: Dict[str, Any]) -> Dict[str, Any]: + """ + Execute video concatenation step. + + Args: + step_input: Input data for the concatenation step + + Returns: + Dictionary containing concatenation results + """ + video_paths = step_input.get("video_paths", []) + output_folder = step_input.get("output_folder", "~/Downloads") + output_path = step_input.get("output_path", "") + + if not video_paths: + return { + "concatenated_video_path": "", + "concatenation_info": {"success": False, "error_message": "No videos to concatenate"}, + } + + # Create output directory + os.makedirs(output_folder, exist_ok=True) + + # Create a temporary folder with the videos for concatenation + import shutil + import tempfile + + with tempfile.TemporaryDirectory() as temp_dir: + # Copy videos to temp directory with numbered names to preserve order + temp_video_paths = [] + for i, video_path in enumerate(video_paths): + if os.path.exists(video_path): + # Create numbered filename to preserve order + video_file = Path(video_path) + temp_name = f"{i:04d}_{video_file.name}" + temp_path = Path(temp_dir) / temp_name + shutil.copy2(video_path, temp_path) + temp_video_paths.append(str(temp_path)) + + if not temp_video_paths: + return { + "concatenated_video_path": "", + "concatenation_info": {"success": False, "error_message": "No valid video files found"}, + } + + # Perform concatenation + result = concatenate_videos_from_folder( + folder_path=temp_dir, + output_path=output_path, + sort_by_name=True, # This will sort by the numbered filenames + target_fps=None, # Use first video's FPS + target_resolution=None, # Use first video's resolution + ) + + return { + "concatenated_video_path": result.output_path if result.success else "", + "concatenation_info": { + "success": result.success, + "total_duration": result.total_duration, + "frame_count": result.frame_count, + "width": result.width, + "height": result.height, + "fps": result.fps, + "error_message": result.error_message, + }, + } + def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: """ Execute the pipeline with the given input. @@ -477,8 +645,15 @@ def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: print(f" {key}: {value}") # Execute the step - executor = PipelineStepExecutor(step) - step_output = executor.execute(current_input) + if step.name == "video_concatenation": + # Handle video concatenation manually + step_input = step.transform_input(current_input) if step.transform_input else current_input + step_output = self._execute_video_concatenation(step_input) + if step.transform_output: + step_output = step.transform_output(step_output) + else: + executor = PipelineStepExecutor(step) + step_output = executor.execute(current_input) # Debug: Show output from this step if self.debug: @@ -567,9 +742,9 @@ def main(): parser.add_argument( "--extraction-mode", choices=["interval", "keyframe"], - default="interval", + default="keyframe", help=( - "Extraction mode: 'interval' for regular intervals or " "'keyframe' for scene changes (default: interval)" + "Extraction mode: 'interval' for regular intervals or " "'keyframe' for scene changes (default: keyframe)" ), ) parser.add_argument( @@ -590,7 +765,7 @@ def main(): default=30, help=("Minimum frames between keyframes " "(for keyframe mode, default: 30)"), ) - parser.add_argument("--aspect-ratio", default="1:1", help="Aspect ratio for generated images (default: 1:1)") + parser.add_argument("--aspect-ratio", default="9:16", help="Aspect ratio for generated images (default: 9:16)") parser.add_argument("--output-format", default="mp4", help="Output format for generated videos (default: mp4)") parser.add_argument( "--image-generator", @@ -678,7 +853,7 @@ def main(): # Get video paths from the result generated_videos = result.get("generated_video_paths", []) if generated_videos: - print("šŸ“ Videos stored at:") + print("šŸ“ Individual videos stored at:") for i, video_path in enumerate(generated_videos): print(f" Video {i + 1}: {video_path}") # Show relative path if it's in Downloads @@ -689,6 +864,39 @@ def main(): else: print("āŒ No videos generated") + # Get concatenated video path from the result + concatenated_video_path = result.get("concatenated_video_path", "") + concatenation_info = result.get("concatenation_info", {}) + + if concatenated_video_path and os.path.exists(concatenated_video_path): + print("\nšŸŽ¬ FINAL CONCATENATED VIDEO:") + print(f"šŸ“ File: {concatenated_video_path}") + + # Show relative path if it's in Downloads + downloads_path = os.path.expanduser("~/Downloads") + if concatenated_video_path.startswith(downloads_path): + relative_path = os.path.relpath(concatenated_video_path, downloads_path) + print(f"šŸ“‚ Relative to Downloads: {relative_path}") + + # Show video properties + if concatenation_info: + duration = concatenation_info.get("total_duration", 0) + frames = concatenation_info.get("frame_count", 0) + resolution = f"{concatenation_info.get('width', 0)}x{concatenation_info.get('height', 0)}" + fps = concatenation_info.get("fps", 0) + print(f"ā±ļø Duration: {duration:.2f} seconds") + print(f"šŸŽžļø Frames: {frames:,}") + print(f"šŸ“ Resolution: {resolution}") + print(f"šŸŽ¬ FPS: {fps}") + + # Show file size + file_size = os.path.getsize(concatenated_video_path) + print(f"šŸ“ File size: {file_size:,} bytes ({file_size / 1024 / 1024:.1f} MB)") + elif concatenation_info and not concatenation_info.get("success", True): + print(f"\nāŒ Video concatenation failed: {concatenation_info.get('error_message', 'Unknown error')}") + else: + print("\nāŒ No concatenated video created") + # Get image paths from the result generated_images = result.get("generated_image_paths", []) if generated_images: