Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class Settings:
AWS_SQS_FUTURE_POSTS_QUEUE_URL = os.getenv("AWS_SQS_FUTURE_POSTS_QUEUE_URL")
AWS_SQS_IMMEDIATE_POSTS_QUEUE_URL = os.getenv("AWS_SQS_IMMEDIATE_POSTS_QUEUE_URL")
AWS_S3_XPOST_BUCKET_NAME = os.getenv("AWS_S3_XPOST_BUCKET_NAME")
AWS_S3_MUSIC_BUCKET_NAME = os.getenv("AWS_S3_MUSIC_BUCKET_NAME")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_REGION = os.getenv("AWS_REGION")
Expand Down
12 changes: 12 additions & 0 deletions backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from routes.posterRoutes import auth_router, protected_router
from routes.casterRoutes import caster_router
from routes.clipsterRoutes import clipster_router
from routes.musicRoutes import music_router
from workers.PostWorker import PostWorker
from workers.TaskPoller import TaskPoller
from workers.MusicPoller import MusicPoller
import asyncio
import threading
from contextlib import asynccontextmanager
Expand Down Expand Up @@ -35,6 +37,7 @@ async def lifespan(app: FastAPI):
app.include_router(protected_router)
app.include_router(caster_router)
app.include_router(clipster_router)
app.include_router(music_router)

# Worker thread function - each worker gets its own event loop
def run_worker(worker_class, worker_name):
Expand Down Expand Up @@ -77,5 +80,14 @@ def run_worker(worker_class, worker_name):
task_poller_thread.start()
worker_threads.append(task_poller_thread)

# MusicPoller thread
music_poller_thread = threading.Thread(
target=run_worker,
args=(MusicPoller, "MusicPoller"),
daemon=True
)
music_poller_thread.start()
worker_threads.append(music_poller_thread)

# Keep track of all worker threads
app.state.worker_threads = worker_threads
2 changes: 1 addition & 1 deletion backend/routes/clipsterRoutes.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ async def generate_narration(request: Request, narration_request: NarrationReque
user_id = request.state.user_id

narration_generator = NarrationGenerator.get_instance()
voice = await narration_generator.generate_narration_elevenlabs(narration_request.script_segments)
voice = await narration_generator.generate_narration(narration_request.script_segments)

# decode audio to wav and store in s3
audio_bytes = base64.b64decode(voice.audio_base_64)
Expand Down
147 changes: 147 additions & 0 deletions backend/routes/musicRoutes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
from urllib.parse import unquote
from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, Form, File
import boto3
from botocore.exceptions import ClientError
from typing import List, Optional
from config import settings
from clients.supabaseClient import supabase
from middleware import auth_middleware

music_router = APIRouter(
dependencies=[Depends(auth_middleware)]
)

# Initialize S3 client
s3_client = boto3.client(
's3',
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
)

# Use bucket name from config
MUSIC_BUCKET_NAME = settings.AWS_S3_MUSIC_BUCKET_NAME

@music_router.get("/api/music/get-tracks")
async def get_music_tracks(request: Request):
    """List all music tracks with presigned S3 playback URLs.

    Returns:
        dict: ``{"tracks": [...]}`` where each database row is augmented
        with an ``audio_url`` presigned GET URL valid for one hour.

    Raises:
        HTTPException: 500 if presigned-URL generation fails.
    """
    expiration = 3600  # Presigned URL expiration time in seconds

    # Get the list of music tracks from the database.
    # NOTE(review): each row is assumed to carry an 's3_key' column — a
    # KeyError here would escape as a 500 from FastAPI.
    print("Getting music tracks from database")
    response = supabase.table('music_tracks').select('*').execute()
    music_data = response.data

    tracks = []

    try:
        for track in music_data:
            track['audio_url'] = s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': MUSIC_BUCKET_NAME, 'Key': track['s3_key']},
                ExpiresIn=expiration
            )
            tracks.append(track)

    except ClientError as e:
        # Raise a real HTTP error instead of returning {"error": ...} with a
        # 200 status — consistent with the other music endpoints below.
        raise HTTPException(status_code=500, detail=f"Error generating URL: {str(e)}")

    return {"tracks": tracks}

@music_router.get("/api/music/get-track/{track_id}")
async def get_music_track(track_id: str, request: Request):
    """Fetch a single music track row by its database ID.

    The returned row is augmented with an ``audio_url`` presigned S3 GET
    URL valid for one hour.

    Raises:
        HTTPException: 404 if no row matches, 500 on presigned-URL failure.
    """
    print(f"Getting music track with ID: {track_id}")
    result = supabase.table('music_tracks').select('*').eq('id', track_id).execute()
    rows = result.data

    if not rows:
        raise HTTPException(status_code=404, detail="Music track not found")

    record = rows[0]

    try:
        # Attach a one-hour presigned playback URL.
        record['audio_url'] = s3_client.generate_presigned_url(
            'get_object',
            Params={'Bucket': MUSIC_BUCKET_NAME, 'Key': record['s3_key']},
            ExpiresIn=3600
        )
    except ClientError as e:
        raise HTTPException(status_code=500, detail=f"Error generating URL: {str(e)}")

    return record

@music_router.get("/api/music/get_stream_url_by_s3_key/{s3_key:path}")
async def get_music_stream_url_by_s3_key(s3_key: str, request: Request):
    """Return a one-hour presigned streaming URL for a raw S3 object key.

    Raises:
        HTTPException: 500 if URL generation fails for any reason.
    """
    # The path segment may arrive percent-encoded; normalize it first.
    s3_key = unquote(s3_key)

    try:
        url = s3_client.generate_presigned_url(
            'get_object',
            Params={
                'Bucket': MUSIC_BUCKET_NAME,
                'Key': s3_key,
            },
            ExpiresIn=3600  # valid for 1 hour
        )
    except Exception as e:
        print(f"Exception: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error generating stream URL: {str(e)}")

    return {
        "stream_url": url
    }

@music_router.post("/api/music/upload")
async def upload_music_track(
    request: Request,
    file: UploadFile = File(...),
    title: str = Form(None),
    artist: str = Form(None),
    description: str = Form(""),
    category: str = Form("Unknown")
):
    """Upload a music file to S3 and register it in the database.

    The object is stored under ``demo/<user_id>/<filename>`` and a matching
    row is inserted into the ``music_tracks`` table.

    Returns:
        dict: the inserted ``music_tracks`` row.

    Raises:
        HTTPException: 500 if the upload or database insert fails.
    """
    user_id = request.state.user_id

    try:
        # Generate a unique S3 key based on the uploaded filename.
        # Previously this was a hard-coded placeholder, so every upload for a
        # user overwrote the same S3 object; keying by filename fixes that.
        filename = file.filename
        s3_key = f"demo/{user_id}/{filename}"

        # S3 object metadata; falls back to the filename/"Unknown" when the
        # optional form fields were not supplied.
        metadata = {
            "title": title or filename,
            "artist": artist or "Unknown",
            "description": description,
            "genre": category  # Store category as genre in metadata
        }

        # Stream the file body straight to S3 without buffering it in memory.
        s3_client.upload_fileobj(
            file.file,
            MUSIC_BUCKET_NAME,
            s3_key,
            ExtraArgs={
                'ContentType': file.content_type,
                'Metadata': metadata
            }
        )

        # Add to Supabase with correct schema fields
        response = supabase.table("music_tracks").insert({
            "s3_key": s3_key,
            "title": metadata["title"],
            "artist": metadata["artist"],
            "description": description,
            "category": category,
            "duration": 0  # Default duration until we can determine it
        }).execute()

        return response.data[0]

    except HTTPException:
        # Don't re-wrap HTTP errors we raised ourselves.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error uploading music track: {str(e)}")
111 changes: 111 additions & 0 deletions backend/workers/MusicPoller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
import boto3
from clients.supabaseClient import supabase
from config import settings

logger = logging.getLogger(__name__)

class MusicPoller:
    """Background worker that mirrors the S3 music bucket into Supabase.

    Periodically lists the music bucket and inserts a ``music_tracks`` row
    for every object not yet present in the database. Blocking boto3 and
    supabase calls are dispatched to a thread pool so the event loop stays
    responsive.
    """

    def __init__(self, poll_interval: int = 60, max_concurrent_tasks: int = 5):
        # Seconds to wait between sync passes.
        self.poll_interval = poll_interval
        # Upper bound on tracks processed concurrently per chunk.
        self.max_concurrent_tasks = max_concurrent_tasks
        self.is_running = False
        self.executor = ThreadPoolExecutor(max_workers=max_concurrent_tasks)
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
        )
        self.MUSIC_BUCKET_NAME = settings.AWS_S3_MUSIC_BUCKET_NAME

    async def start(self):
        """Run the sync loop until :meth:`stop` clears ``is_running``."""
        self.is_running = True
        while self.is_running:
            try:
                await self.sync_music_tracks()
            except Exception as e:
                logger.error(f"Error in music poller: {str(e)}")
            # Sleep on both the success and the error path.
            await asyncio.sleep(self.poll_interval)

    async def stop(self):
        """Signal the loop to exit and wait for in-flight executor work."""
        self.is_running = False
        self.executor.shutdown(wait=True)

    async def sync_music_tracks(self):
        """Synchronize S3 music bucket contents with Supabase music_tracks table."""
        try:
            loop = asyncio.get_running_loop()

            # Page through the bucket: a single list_objects_v2 call returns
            # at most 1000 keys and would silently miss larger libraries.
            def _list_all_objects():
                paginator = self.s3_client.get_paginator('list_objects_v2')
                objects = []
                for page in paginator.paginate(Bucket=self.MUSIC_BUCKET_NAME):
                    objects.extend(page.get('Contents', []))
                return objects

            s3_tracks = await loop.run_in_executor(self.executor, _list_all_objects)

            if not s3_tracks:
                logger.info("No music tracks found in S3 bucket")
                return

            # Get existing tracks from Supabase
            response = await loop.run_in_executor(
                self.executor,
                lambda: supabase.table("music_tracks").select("s3_key").execute()
            )

            # Create a set of existing S3 keys for fast lookup
            existing_keys = {track['s3_key'] for track in response.data}

            # Process tracks in chunks to bound concurrency; gather with
            # return_exceptions so one bad track doesn't abort the pass.
            chunk_size = self.max_concurrent_tasks
            for i in range(0, len(s3_tracks), chunk_size):
                chunk = s3_tracks[i:i + chunk_size]
                tasks = [self.process_track(track, existing_keys) for track in chunk]
                await asyncio.gather(*tasks, return_exceptions=True)

            print(f"Synced {len(s3_tracks)} music tracks from S3 to Supabase")
        except Exception as e:
            logger.error(f"Error in sync_music_tracks: {str(e)}")

    async def process_track(self, track, existing_keys):
        """Insert a DB row for one S3 object unless its key is already known.

        Args:
            track: one entry from the S3 listing (must carry ``'Key'``).
            existing_keys: set of s3_key values already present in the table.
        """
        try:
            s3_key = track['Key']

            # Skip if already in database
            if s3_key in existing_keys:
                return

            loop = asyncio.get_running_loop()

            # Fetch object metadata (title/artist/etc. set at upload time).
            head_object = await loop.run_in_executor(
                self.executor,
                lambda: self.s3_client.head_object(Bucket=self.MUSIC_BUCKET_NAME, Key=s3_key)
            )

            # Fall back to the filename (without extension) as the title.
            filename = s3_key.split('/')[-1]
            title = '.'.join(filename.split('.')[:-1]) if '.' in filename else filename

            metadata = head_object.get('Metadata', {})

            # Insert track into Supabase with correct schema fields
            await loop.run_in_executor(
                self.executor,
                lambda: supabase.table("music_tracks").insert({
                    "s3_key": s3_key,
                    "title": metadata.get('title', title),
                    "artist": metadata.get('artist', 'Unknown'),
                    "description": metadata.get('description', ''),
                    "category": metadata.get('genre', 'Unknown'),
                    "duration": int(float(metadata.get('duration', 0)))
                }).execute()
            )

            logger.info(f"Added new track to database: {s3_key}")
            print(f"Added new track to database: {s3_key}")

        except Exception as e:
            logger.error(f"Error processing track {track.get('Key')}: {str(e)}")
Loading
Loading