diff --git a/memory_agents/youtube_trend_agent/.streamlit/config.toml b/memory_agents/youtube_trend_agent/.streamlit/config.toml new file mode 100644 index 00000000..6ae713d1 --- /dev/null +++ b/memory_agents/youtube_trend_agent/.streamlit/config.toml @@ -0,0 +1,8 @@ +[theme] +base = "light" + +# Optional: +# primaryColor = "#0b57d0" +# backgroundColor = "#ffffff" +# secondaryBackgroundColor = "#f5f7fb" +# textColor = "#000000" diff --git a/memory_agents/youtube_trend_agent/README.md b/memory_agents/youtube_trend_agent/README.md new file mode 100644 index 00000000..45df21ba --- /dev/null +++ b/memory_agents/youtube_trend_agent/README.md @@ -0,0 +1,58 @@ +## YouTube Trend Analysis Agent + +YouTube channel analysis agent powered by **Memori**, **Agno (Nebius)**, **Exa**, and **yt-dlp**. +Paste your YouTube channel URL, ingest recent videos into Memori, then chat with an agent that surfaces trends and concrete new video ideas grounded in your own content. + +### Features + +- **Direct YouTube scraping**: Uses `yt-dlp` to scrape your channel or playlist from YouTube and collect titles, tags, dates, views, and descriptions. +- **Memori memory store**: Stores each video as a Memori memory (via OpenAI) for fast semantic search and reuse across chats. +- **Web trend context with Exa**: Calls Exa to pull recent articles and topics for your niche and blends them with your own channel history. +- **Streamlit UI**: Sidebar for API keys + channel URL and a chat area for asking about trends and ideas. + +--- + +### Setup (with `uv`) + +1. **Install `uv`** (if you don’t have it yet): + +```bash +curl -LsSf https://astral.sh/uv/install.sh | sh +``` + +2. **Create the environment and install dependencies from `pyproject.toml`:** + +```bash +cd memory_agents/youtube_trend_agent +uv sync +``` + +This will create a virtual environment (if needed) and install all dependencies declared in `pyproject.toml`. + +3. **Environment variables** (set in your shell or a local `.env` file in this folder): + +- `NEBIUS_API_KEY` – required (used both for Memori ingestion and the Agno-powered advisor). +- `EXA_API_KEY` – optional but recommended (for external trend context via Exa). +- `MEMORI_API_KEY` – optional, for Memori Advanced Augmentation / higher quotas. +- `SQLITE_DB_PATH` – optional, defaults to `./memori.sqlite` if unset. + +--- + +### Run + +From the `youtube_trend_agent` directory: + +```bash +uv run streamlit run app.py +``` + +In the **sidebar**: + +1. Enter your **Nebius**, optional **Exa**, and optional **Memori** API keys. +2. Paste your **YouTube channel (or playlist) URL**. +3. Click **“Ingest channel into Memori”** to scrape and store recent videos. + +Then use the main chat box to ask things like: + +- “Suggest 5 new video ideas that build on my existing content and current trends.” +- “What trends am I missing in my current uploads?” diff --git a/memory_agents/youtube_trend_agent/app.py b/memory_agents/youtube_trend_agent/app.py new file mode 100644 index 00000000..70675465 --- /dev/null +++ b/memory_agents/youtube_trend_agent/app.py @@ -0,0 +1,281 @@ +""" +YouTube Trend Analysis Agent with Memori, Agno (Nebius), and YouTube scraping. + +Streamlit app: +- Sidebar: API keys + YouTube channel URL + "Ingest channel into Memori" button. +- Main: Chat interface to ask about trends and get new video ideas. + +This app uses: +- Nebius (via both the OpenAI SDK and Agno's Nebius model) for LLM reasoning. +- yt-dlp to scrape YouTube channel/playlist videos. +- Memori to store and search your channel's video history. +""" + +import base64 +import os + +import streamlit as st +from agno.agent import Agent +from agno.models.nebius import Nebius +from dotenv import load_dotenv + +from core import fetch_exa_trends, ingest_channel_into_memori + + +def _load_inline_image(path: str, height_px: int) -> str: + """Return an inline tag for a local PNG, or empty string on failure.""" + try: + with open(path, "rb") as f: + encoded = base64.b64encode(f.read()).decode() + return ( + f"Logo" + ) + except Exception: + return "" + + +def main(): + load_dotenv() + + # Page config + st.set_page_config( + page_title="YouTube Trend Analysis Agent", + layout="wide", + ) + + # Branded title with Memori logo (reusing the pattern from AI Consultant Agent) + memori_img_inline = _load_inline_image( + "assets/Memori_Logo.png", + height_px=85, + ) + title_html = f""" +
+

+ YouTube Trend Analysis Agent with + {memori_img_inline} +

+
+""" + st.markdown(title_html, unsafe_allow_html=True) + + # Initialize session state + if "messages" not in st.session_state: + st.session_state.messages = [] + # Memori/OpenAI client will be initialized lazily when needed. + + # Sidebar + with st.sidebar: + st.subheader("🔑 API Keys & Channel") + + # Nebius logo above the Nebius API key field + try: + st.image("assets/Nebius_Logo.png", width=120) + except Exception: + # Non-fatal if the logo is missing + pass + + nebius_api_key_input = st.text_input( + "Nebius API Key", + value=os.getenv("NEBIUS_API_KEY", ""), + type="password", + help="Your Nebius API key (used for both Memori and Agno).", + ) + + exa_api_key_input = st.text_input( + "Exa API Key (optional)", + value=os.getenv("EXA_API_KEY", ""), + type="password", + help="Used to fetch external web trends via Exa AI when suggesting new ideas.", + ) + + memori_api_key_input = st.text_input( + "Memori API Key (optional)", + value=os.getenv("MEMORI_API_KEY", ""), + type="password", + help="Used for Memori Advanced Augmentation and higher quotas.", + ) + + channel_url_input = st.text_input( + "YouTube channel / playlist URL", + placeholder="https://www.youtube.com/@YourChannel", + ) + + if st.button("Save Settings"): + if nebius_api_key_input: + os.environ["NEBIUS_API_KEY"] = nebius_api_key_input + if exa_api_key_input: + os.environ["EXA_API_KEY"] = exa_api_key_input + if memori_api_key_input: + os.environ["MEMORI_API_KEY"] = memori_api_key_input + + st.success("✅ API keys saved for this session.") + + st.markdown("---") + + if st.button("Ingest channel into Memori"): + if not os.getenv("NEBIUS_API_KEY"): + st.warning("NEBIUS_API_KEY is required before ingestion.") + elif not channel_url_input.strip(): + st.warning("Please enter a YouTube channel or playlist URL.") + else: + with st.spinner( + "📥 Scraping channel and ingesting videos into Memori…" + ): + count = ingest_channel_into_memori(channel_url_input.strip()) + st.success(f"✅ Ingested {count} video(s) into Memori.") + + st.markdown("---") + st.markdown("### 💡 About") + st.markdown( + """ + This agent: + + - Scrapes your **YouTube channel** directly from YouTube using yt-dlp. + - Stores video metadata & summaries in **Memori**. + - Uses **Exa** and your channel info stored in **Memori** to surface trends and new video ideas. + """ + ) + + # Get keys for main app logic + nebius_key = os.getenv("NEBIUS_API_KEY", "") + if not nebius_key: + st.warning( + "⚠️ Please enter your Nebius API key in the sidebar to start chatting!" + ) + st.stop() + + # Initialize Nebius model for the advisor (once) + if "nebius_model" not in st.session_state: + try: + st.session_state.nebius_model = Nebius( + id=os.getenv( + "YOUTUBE_TREND_MODEL", + "moonshotai/Kimi-K2-Instruct", + ), + api_key=nebius_key, + ) + except Exception as e: + st.error(f"Failed to initialize Nebius model: {e}") + st.stop() + + # Display chat history + st.markdown( + "

YouTube Trend Chat

", + unsafe_allow_html=True, + ) + for message in st.session_state.messages: + with st.chat_message(message["role"]): + st.markdown(message["content"]) + + # Chat input + prompt = st.chat_input("Ask about your channel trends or new video ideas…") + if prompt: + st.session_state.messages.append({"role": "user", "content": prompt}) + with st.chat_message("user"): + st.markdown(prompt) + + with st.chat_message("assistant"): + with st.spinner("🤔 Analyzing your channel memories…"): + try: + # Build context from Memori (if available) and from cached channel videos + memori_context = "" + mem = st.session_state.get("memori") + if mem is not None and hasattr(mem, "search"): + try: + results = mem.search(prompt, limit=5) + if results: + memori_context = ( + "\n\nRelevant snippets from your channel history:\n" + + "\n".join(f"- {r}" for r in results) + ) + except Exception as e: + st.warning(f"Memori search issue: {e}") + + videos = st.session_state.get("channel_videos") or [] + video_summaries = "" + if videos: + video_summaries_lines = [] + for v in videos[:10]: + title = v.get("title") or "Untitled video" + topics = v.get("topics") or [] + topics_str = ", ".join(topics) if topics else "N/A" + views = v.get("views") or "Unknown" + desc = v.get("description") or "" + if len(desc) > 120: + desc_snip = desc[:120].rstrip() + "…" + else: + desc_snip = desc + video_summaries_lines.append( + f"- {title} | topics: {topics_str} | views: {views} | desc: {desc_snip}" + ) + video_summaries = ( + "\n\nRecent videos on this channel:\n" + + "\n".join(video_summaries_lines) + ) + + channel_name = ( + st.session_state.get("channel_title") or "this YouTube channel" + ) + + exa_trends = "" + # Fetch or reuse Exa-based trend context, if Exa is configured + if os.getenv("EXA_API_KEY") and videos: + if "exa_trends" in st.session_state: + exa_trends = st.session_state["exa_trends"] + else: + exa_trends = fetch_exa_trends(channel_name, videos) + st.session_state["exa_trends"] = exa_trends + + full_prompt = f"""You are a YouTube strategy assistant analyzing the channel '{channel_name}'. + +You have access to a memory store of the user's past videos (titles, topics, views). +Use that memory to: +- Identify topics and formats that perform well on the channel. +- Suggest concrete, fresh video ideas aligned with those trends. +- Optionally point out gaps or under-explored themes. + +Always be specific and actionable (titles, angles, hooks, examples), but ONLY answer what the user actually asks. +Do NOT provide long, generic strategy plans unless the user explicitly asks for them. + +User question: +{prompt} + +Memory context (may be partial): +{memori_context} + +Channel metadata from recent scraped videos (titles, topics, views): +{video_summaries} + +External web trends for this niche (may be partial): +{exa_trends} +""" + + advisor = Agent( + name="YouTube Trend Advisor", + model=st.session_state.nebius_model, + markdown=True, + ) + + result = advisor.run(full_prompt) + response_text = ( + str(result.content) + if hasattr(result, "content") + else str(result) + ) + + st.session_state.messages.append( + {"role": "assistant", "content": response_text} + ) + st.markdown(response_text) + except Exception as e: + err = f"❌ Error generating answer: {e}" + st.session_state.messages.append( + {"role": "assistant", "content": err} + ) + st.error(err) + + +if __name__ == "__main__": + main() diff --git a/memory_agents/youtube_trend_agent/assets/Memori_Logo.png b/memory_agents/youtube_trend_agent/assets/Memori_Logo.png new file mode 100644 index 00000000..4b4416b8 Binary files /dev/null and b/memory_agents/youtube_trend_agent/assets/Memori_Logo.png differ diff --git a/memory_agents/youtube_trend_agent/assets/Nebius_Logo.png b/memory_agents/youtube_trend_agent/assets/Nebius_Logo.png new file mode 100644 index 00000000..0d7851e8 Binary files /dev/null and b/memory_agents/youtube_trend_agent/assets/Nebius_Logo.png differ diff --git a/memory_agents/youtube_trend_agent/core.py b/memory_agents/youtube_trend_agent/core.py new file mode 100644 index 00000000..42f4f47d --- /dev/null +++ b/memory_agents/youtube_trend_agent/core.py @@ -0,0 +1,327 @@ +""" +Core logic for the YouTube Trend Analysis Agent. + +This module contains: +- Memori + Nebius initialization helpers. +- YouTube scraping utilities. +- Exa-based trend fetching. +- Channel ingestion into Memori. + +It is imported by `app.py`, which focuses on the Streamlit UI. +""" + +import json +import os + +import streamlit as st +import yt_dlp +from dotenv import load_dotenv +from exa_py import Exa +from memori import Memori +from openai import OpenAI +from sqlalchemy import create_engine, text +from sqlalchemy.orm import sessionmaker + +load_dotenv() + + +class _SilentLogger: + """Minimal logger for yt-dlp that suppresses debug/warning output.""" + + def debug(self, msg): + pass + + def warning(self, msg): + pass + + def error(self, msg): + pass + + +def init_memori_with_nebius() -> Memori | None: + """ + Initialize Memori v3 + Nebius client (via the OpenAI SDK). + + This is used so Memori can automatically persist "memories" when we send + documents through the registered Nebius-backed client. Agno + Nebius power all + YouTube analysis and idea generation. + """ + nebius_key = os.getenv("NEBIUS_API_KEY", "") + if not nebius_key: + st.warning( + "NEBIUS_API_KEY is not set – Memori v3 ingestion will not be active." + ) + return None + + try: + db_path = os.getenv("SQLITE_DB_PATH", "./memori.sqlite") + database_url = f"sqlite:///{db_path}" + engine = create_engine( + database_url, + pool_pre_ping=True, + connect_args={"check_same_thread": False}, + ) + + # Optional DB connectivity check + with engine.connect() as conn: + conn.execute(text("SELECT 1")) + + SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + client = OpenAI( + base_url="https://api.studio.nebius.com/v1/", + api_key=nebius_key, + ) + # Use the OpenAI-compatible registration API; the client itself points to Nebius. + mem = Memori(conn=SessionLocal).openai.register(client) + # Attribution so Memori can attach memories to this process/entity. + mem.attribution(entity_id="youtube-channel", process_id="youtube-trend-agent") + mem.config.storage.build() + + st.session_state.memori = mem + st.session_state.nebius_client = client + return mem + except Exception as e: + st.warning(f"Memori v3 initialization note: {e}") + return None + + +def fetch_channel_videos(channel_url: str) -> list[dict]: + """ + Use yt-dlp to fetch recent YouTube videos for a given channel or playlist URL. + + Returns: + A list of dicts: + [ + { + "title": "...", + "url": "...", + "published_at": "...", + "views": "...", + "topics": ["...", ...] + }, + ... + ] + """ + ydl_opts = { + # Don't download video files, we only want metadata + "quiet": True, + "no_warnings": True, + "skip_download": True, + # Limit to most recent 20 videos + "playlistend": 20, + # Be forgiving if some videos fail + "ignoreerrors": True, + # Silence yt-dlp's own logging + "logger": _SilentLogger(), + } + + try: + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + info = ydl.extract_info(channel_url, download=False) + except Exception as e: + st.error(f"Error fetching YouTube channel info: {e}") + return [] + + entries = info.get("entries") or [] + # Cache channel title for use in prompts + if isinstance(info, dict): + st.session_state["channel_title"] = info.get("title") or "" + + videos: list[dict] = [] + + for entry in entries: + if not isinstance(entry, dict): + continue + video_id = entry.get("id") + url = entry.get("url") + # Build full watch URL when possible + full_url = url + if video_id and (not url or "watch?" not in url): + full_url = f"https://www.youtube.com/watch?v={video_id}" + + upload_date = entry.get("upload_date") or entry.get("release_date") or "" + # Convert YYYYMMDD -> YYYY-MM-DD if present + if ( + isinstance(upload_date, str) + and len(upload_date) == 8 + and upload_date.isdigit() + ): + upload_date = f"{upload_date[0:4]}-{upload_date[4:6]}-{upload_date[6:8]}" + + description = entry.get("description") or "" + duration = entry.get("duration") # in seconds, if available + + videos.append( + { + "title": entry.get("title") or "Untitled video", + "url": full_url or channel_url, + "published_at": upload_date or "Unknown", + "views": entry.get("view_count") or "Unknown", + "topics": entry.get("tags") or [], + "description": description, + "duration_seconds": duration, + } + ) + + return videos + + +def fetch_exa_trends(channel_name: str, videos: list[dict]) -> str: + """ + Use Exa AI to fetch external web trends for the channel's niche. + + Returns: + A formatted string of bullet points describing trending topics/articles. + """ + api_key = os.getenv("EXA_API_KEY", "") + if not api_key: + return "" + + # Build a niche description from tags and titles + tags: set[str] = set() + for v in videos: + for t in v.get("topics") or []: + if isinstance(t, str): + tags.add(t) + + base_niche = ", ".join(list(tags)[:10]) + if not base_niche: + titles = [v.get("title") or "" for v in videos[:5]] + base_niche = ", ".join(titles) + + if not base_niche: + return "" + + query = ( + f"Current trending topics and YouTube-style video ideas for the niche: {base_niche}. " + f"Focus on developer, programming, AI, and technology content if relevant." + ) + + try: + client = Exa(api_key=api_key) + # Keep the API call simple to avoid deprecated options like 'highlights' + res = client.search_and_contents( + query=query, + num_results=5, + type="auto", + ) + except Exception as e: + st.warning(f"Exa web search issue: {e}") + return "" + + results = getattr(res, "results", []) or [] + if not results: + return "" + + trend_lines: list[str] = [] + for doc in results[:5]: + title = getattr(doc, "title", "") or "Untitled" + url = getattr(doc, "url", "") or "" + text = getattr(doc, "text", "") or "" + snippet = " ".join(text.split())[:220] + line = f"- {title} ({url}) — {snippet}" + trend_lines.append(line) + + return "\n".join(trend_lines) + + +def ingest_channel_into_memori(channel_url: str) -> int: + """ + Scrape a YouTube channel and ingest the results into Memori. + + Returns: + Number of video documents ingested. + """ + # Ensure Memori + Nebius client are initialized + memori: Memori | None = st.session_state.get("memori") + client: OpenAI | None = st.session_state.get("nebius_client") + if memori is None or client is None: + memori = init_memori_with_nebius() + client = st.session_state.get("nebius_client") + + if memori is None or client is None: + st.error("Memori/Nebius failed to initialize; cannot ingest channel.") + return 0 + + videos = fetch_channel_videos(channel_url) + if not videos: + st.warning("No videos were parsed from the YouTube channel response.") + raw = st.session_state.get("yt_raw_response") + if raw: + st.caption("Raw YouTube agent output (truncated, for debugging):") + st.code(str(raw)[:4000]) + return 0 + else: + # Debug info to help understand what was parsed + st.info(f"Parsed {len(videos)} video item(s) from the channel response.") + st.caption("First parsed item (truncated):") + try: + st.code(json.dumps(videos[0], indent=2)[:2000]) + except Exception: + # Fallback if item isn't JSON-serializable + st.code(str(videos[0])[:2000]) + + # Cache videos in session state so the chat agent can use them directly + st.session_state["channel_videos"] = videos + + ingested = 0 + for video in videos: + title = video.get("title") or "Untitled video" + url = video.get("url") or channel_url + published_at = video.get("published_at") or "Unknown" + views = video.get("views") or "Unknown" + topics = video.get("topics") or [] + description = video.get("description") or "" + duration = video.get("duration_seconds") or "Unknown" + + topics_str = ", ".join(str(t) for t in topics) if topics else "N/A" + # Truncate very long descriptions for ingestion + desc_snippet = description[:1000] + + doc_text = f"""YouTube Video +Channel URL: {channel_url} +Title: {title} +Video URL: {url} +Published at: {published_at} +Views: {views} +Duration (seconds): {duration} +Topics: {topics_str} +Description: +{desc_snippet} +""" + + try: + # Send this document through the registered Nebius client so that + # Memori v3 can automatically capture it as a "memory". + _ = client.chat.completions.create( + model=os.getenv( + "YOUTUBE_TREND_INGEST_MODEL", + "moonshotai/Kimi-K2-Instruct", + ), + messages=[ + { + "role": "user", + "content": ( + "Store the following YouTube video metadata in memory " + "for future channel-trend analysis. Respond with a short " + "acknowledgement only.\n\n" + f"{doc_text}" + ), + } + ], + ) + ingested += 1 + except Exception as e: + st.warning(f"Memori/Nebius issue ingesting video '{title}': {e}") + + # Flush writes if needed + try: + adapter = getattr(memori.config.storage, "adapter", None) + if adapter is not None: + adapter.commit() + except Exception: + # Non-fatal; Memori will still persist most data + pass + + return ingested diff --git a/memory_agents/youtube_trend_agent/pyproject.toml b/memory_agents/youtube_trend_agent/pyproject.toml new file mode 100644 index 00000000..302fda93 --- /dev/null +++ b/memory_agents/youtube_trend_agent/pyproject.toml @@ -0,0 +1,16 @@ +[project] +name = "youtube-trend-agent" +version = "0.1.0" +description = "YouTube Trend Analysis Agent with Memori, Agno (Nebius), Exa, and yt-dlp" +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "agno>=2.2.1", + "memori>=3.0.0", + "streamlit>=1.50.0", + "python-dotenv>=1.1.0", + "openai>=2.6.1", + "sqlalchemy>=2.0.0", + "exa-py>=1.6.0", + "yt-dlp>=2025.1.1", +]