Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ name: Build/run tgov

on:
push:
branches: [ "main", "deploy-lambda", "test-flows" ]
branches: [ "main", "diarize-flow" ]
pull_request:
branches: [ "main" ]

permissions:
contents: read
id-token: write
contents: read

jobs:
build:
Expand All @@ -16,13 +17,15 @@ jobs:
env:
POETRY_VERSION: "1.3.2"
POETRY_VENV: "/opt/poetry-venv"

S3_BUCKET: ${{ secrets.S3_BUCKET }}
AWS_DEFAULT_REGION: us-east-2
steps:
- uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v3
with:
python-version: "3.11"

- name: Install dependencies
run: |
set -ex
Expand All @@ -36,14 +39,14 @@ jobs:
/opt/poetry-venv/bin/poetry config virtualenvs.create false
/opt/poetry-venv/bin/poetry install
/opt/poetry-venv/bin/poetry env info

- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-session-token: ${{ secrets.AWS_SESSION_TOKEN }}
role-to-assume: arn:aws:iam::480103772849:role/GithubCFTRole
aws-region: us-east-2
audience: sts.amazonaws.com

- name: Run Diarization
run: |
/opt/poetry-venv/bin/python -m flows.translate_meetings
/opt/poetry-venv/bin/python -m flows.transcribe_meetings
31 changes: 31 additions & 0 deletions db/migrate_s3_path.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
Migration script to add s3_path field to existing meeting records.
"""

from dyntastic import A
from src.models.meeting import Meeting


def migrate_s3_path():
"""
Add s3_path field to all existing meeting records that don't have it.
"""
print("Starting migration to add s3_path field to existing meetings...")

# Get all meetings
meetings = Meeting.scan()
updated_count = 0

for meeting in meetings:
# Check if s3_path field exists and is None
if not hasattr(meeting, "s3_path") or meeting.s3_path is None:
print(f"Updating meeting: {meeting.meeting} ({meeting.date})")
meeting.s3_path = None
meeting.save()
updated_count += 1

print(f"Migration complete. Updated {updated_count} meetings.")


if __name__ == "__main__":
migrate_s3_path()
11 changes: 9 additions & 2 deletions db/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
from src.models.meeting import Meeting


def get_meetings(days: int = 7, video: Optional[bool] = None) -> List[Meeting]:
def get_meetings(
days: int = 7, video: Optional[bool] = None, s3_path: Optional[bool] = None
) -> List[Meeting]:
"""
Get meetings that occurred in the past number of days from now.
"""
Expand All @@ -13,6 +15,11 @@ def get_meetings(days: int = 7, video: Optional[bool] = None) -> List[Meeting]:
meetings = Meeting.scan(
A.date >= target_date,
)
meetings_list = [m for m in meetings if (video is None or bool(m.video) == video)]
meetings_list = [
m
for m in meetings
if (video is None or bool(m.video) == video)
and (s3_path is None or bool(m.s3_path) == s3_path)
]

return list(meetings_list)
16 changes: 8 additions & 8 deletions flows/translate_meetings.py → flows/download_meetings.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
from prefect import flow

from db.queries import get_meetings
from tasks.diarize import diarize_meeting
from tasks.diarize import download_video_and_put_in_s3
from tasks.meetings import register_meetings


@flow(log_prints=True)
def translate_meetings():
# @flow(log_prints=True)
def download_meetings():
new_meetings = register_meetings()
print(f"Registered {len(new_meetings)} new meetings")
meetings_to_diarize = get_meetings(video=True)
print(f"Found {len(meetings_to_diarize)} meetings to diarize")
for meeting in meetings_to_diarize:
diarize_meeting(meeting)
meetings_to_download = get_meetings(days=7, video=True, s3_path=False)
print(f"Found {len(meetings_to_download)} meetings to download")
for meeting in meetings_to_download:
download_video_and_put_in_s3(meeting)
# new_subtitled_video_pages = await create_subtitled_video_pages(new_transcribed_meetings)
# new_translated_meetings = await translate_transcriptions(new_transcribed_meetings)


if __name__ == "__main__":
translate_meetings()
download_meetings()
21 changes: 21 additions & 0 deletions flows/transcribe_meetings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from prefect import flow
import os

from db.queries import get_meetings
from tasks.diarize import diarize_meeting, BUCKET_NAME


@flow(log_prints=True)
def transcribe_meetings():
print(f"S3_BUCKET environment variable: {os.getenv('S3_BUCKET')}")
print(f"BUCKET_NAME from tasks.diarize: {BUCKET_NAME}")

meetings_to_diarize = get_meetings(video=True, s3_path=True)
print(f"Found {len(meetings_to_diarize)} meetings to diarize")
for meeting in meetings_to_diarize:
print(f"Processing meeting: {meeting.meeting} with s3_path: {meeting.s3_path}")
diarize_meeting(meeting)


if __name__ == "__main__":
transcribe_meetings()
19 changes: 19 additions & 0 deletions src/aws.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from pathlib import Path

import boto3
from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError
Expand Down Expand Up @@ -52,3 +53,21 @@ def save_content_to_s3(content, bucket_name, s3_key, content_type):
region = s3_client.meta.region_name
url = f"https://{bucket_name}.s3.{region}.amazonaws.com/{s3_key}"
return HttpUrl(url)


def get_video_from_s3(bucket_name, s3_path):
try:
# Create output directory if it doesn't exist
output_dir = Path("data/video")
output_dir.mkdir(parents=True, exist_ok=True)

# Define output path
output_path = output_dir / Path(s3_path).name

# Download file from S3
s3_client.download_file(bucket_name, s3_path, str(output_path))
print(f"Downloaded {s3_path} from S3 to {output_path}")
return output_path
except ClientError as e:
print(f"Failed to get video from S3: {str(e)}")
return None
9 changes: 7 additions & 2 deletions src/models/meeting.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Optional

from dyntastic import Dyntastic
from pydantic import BaseModel, Field, HttpUrl
from pydantic import BaseModel, Field, HttpUrl, ConfigDict
from datetime import datetime
from typing import List

Expand All @@ -18,18 +18,23 @@ class Meeting(Dyntastic):
__table_name__ = "tgov-meeting"
__hash_key__ = "clip_id"

model_config = ConfigDict(extra="ignore")

clip_id: Optional[str] = Field(None, description="Granicus clip ID")
meeting: str = Field(description="Name of the meeting")
date: datetime = Field(description="Date and time of the meeting")
duration: str = Field(description="Duration of the meeting")
agenda: Optional[HttpUrl] = Field(None, description="URL to the meeting agenda")
video: Optional[HttpUrl] = Field(None, description="URL to the meeting video")
transcripts: Optional[List[HttpUrl]] = Field(
transcripts: Optional[List[str]] = Field(
None, description="URLs to the meeting transcripts"
)
subtitles: Optional[List[HttpUrl]] = Field(
None, description="URLs to the meeting subtitle tracks"
)
s3_path: Optional[str] = Field(
default=None, description="S3 path to the meeting video"
)

def __str__(self) -> str:
"""String representation of the meeting"""
Expand Down
12 changes: 9 additions & 3 deletions src/run_diarization.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import asyncio
import os
import json

from pathlib import Path

from src.aws import save_content_to_s3
from src.models.meeting import GranicusPlayerPage
from src.models.meeting import GranicusPlayerPage, Meeting
from src.granicus import get_video_player
from src.videos import download_file, transcribe_video_with_diarization

Expand Down Expand Up @@ -38,19 +39,24 @@ def download_video(file_name: str, video_url: str):
return video_file


def run_diarization(video_file: Path):
def run_diarization(video_file: Path, meeting: Meeting):
transcription_dir = Path("data/transcripts")

transcription = asyncio.run(
transcribe_video_with_diarization(video_file, transcription_dir)
)
# Add transcript to S3
# Convert dictionary to JSON string before saving
transcription_json = json.dumps(transcription, indent=2, ensure_ascii=False)
save_content_to_s3(
transcription,
transcription_json,
BUCKET_NAME,
f"{FOLDER_NAME}/{video_file.name}.json",
"application/json",
)
meeting.transcripts = [f"{FOLDER_NAME}/{video_file.name}.json"]
meeting.save()

print(transcription)


Expand Down
2 changes: 0 additions & 2 deletions src/videos.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ def download_file(url: str, output_path: Path):
)

print(f"Download complete: {url}")
# Add to S3
upload_to_s3(output_path, BUCKET_NAME, f"{FOLDER_NAME}/{output_path.name}")
return output_path


Expand Down
32 changes: 30 additions & 2 deletions tasks/diarize.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,42 @@
import os
from src.aws import get_video_from_s3, upload_to_s3
from src.run_diarization import download_video, run_diarization
from prefect import task

from src.models.meeting import Meeting


BUCKET_NAME = os.getenv("S3_BUCKET")
FOLDER_NAME = "videos"


# @task
def download_video_and_put_in_s3(meeting: Meeting):
video_file = download_video(f"{meeting.meeting}_{meeting.date}", str(meeting.video))
if video_file:
print(f"Uploading video to S3: {video_file}")
s3_path = f"{FOLDER_NAME}/{video_file.name}"
upload_to_s3(video_file, BUCKET_NAME, f"{FOLDER_NAME}/{video_file.name}")
print(f"Uploaded video to S3: {s3_path}")
print("Saving meeting.")
meeting.s3_path = s3_path
meeting.save()
else:
print("Video file not found")


@task
def diarize_meeting(meeting: Meeting):
video_file = download_video(f"{meeting.meeting}_{meeting.date}", str(meeting.video))
if BUCKET_NAME is None:
raise ValueError("S3_BUCKET environment variable is not set")

if meeting.s3_path is None:
print(f"Meeting {meeting.meeting} has no s3_path, skipping")
return

video_file = get_video_from_s3(BUCKET_NAME, meeting.s3_path)
if video_file:
run_diarization(video_file)
run_diarization(video_file, meeting)
else:
print("Video file not found")
# TODO: Update meeting with transcript location
2 changes: 1 addition & 1 deletion tasks/meetings.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from src.models.meeting import Meeting


@task
# @task
def register_meetings() -> List[Meeting]:
# TODO: accept max_limit parameter
tgov_meetings = asyncio.run(get_tgov_meetings())
Expand Down
Loading