# SynthoraAI Agentic AI Pipeline

Sophisticated multi-agent system for advanced content processing using LangGraph and LangChain
## Table of Contents

- Overview
- Architecture
- Features
- Installation
- Quick Start
- Usage
- Agents
- Configuration
- Monitoring & Observability
- Testing
- Performance
- Troubleshooting
- Contributing
## Overview

The SynthoraAI Agentic AI Pipeline is a production-ready, multi-agent system that processes articles through a series of specialized AI agents. Built on LangGraph's state machine framework, it provides:
- ✅ Sophisticated Content Analysis - Deep understanding of article structure and content
- ✅ Intelligent Summarization - Concise, accurate summaries with key points
- ✅ Advanced Classification - Multi-category topic classification
- ✅ Sentiment Analysis - Emotional tone and objectivity assessment
- ✅ Quality Assurance - Automatic validation with retry logic
- ✅ Production-Ready - Cloud deployment, monitoring, and scaling
## Architecture

The pipeline implements an assembly-line pattern in which articles flow through specialized agents:
```
┌────────┐   ┌──────────┐   ┌────────────┐   ┌────────────┐   ┌───────────┐   ┌─────────┐   ┌────────┐
│ Intake │──▶│ Content  │──▶│ Summarizer │──▶│ Classifier │──▶│ Sentiment │──▶│ Quality │──▶│ Output │
│  Node  │   │ Analyzer │   │            │   │            │   │ Analyzer  │   │ Checker │   │  Node  │
└────────┘   └──────────┘   └────────────┘   └────────────┘   └───────────┘   └─────────┘   └────────┘
                                                                                   │
                                                                                   │ (retry)
                                                                                   ▼
                                                                             ┌───────────┐
                                                                             │   Retry   │
                                                                             │ (max 3x)  │
                                                                             └───────────┘
```
### LangGraph Orchestration

The pipeline uses LangGraph to orchestrate agent interactions:

- **State Management**: A shared `ArticleState` flows through all agents
- **Conditional Routing**: Quality checks determine retry logic
- **Error Handling**: Graceful degradation and dedicated error nodes
- **Observability**: Built-in tracing and metrics
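
The retry edge in the diagram maps onto LangGraph's conditional routing. Below is a minimal, illustrative sketch of that wiring; the state fields, the 0.7 threshold, and the node bodies are simplified assumptions for demonstration, not the pipeline's actual implementation:

```python
from typing import TypedDict

from langgraph.graph import StateGraph, END


class ArticleState(TypedDict, total=False):
    content: str
    summary: str
    quality_score: float
    retry_count: int


def quality_checker(state: ArticleState) -> ArticleState:
    # Placeholder: the real agent validates all upstream outputs.
    return state


def route_after_quality(state: ArticleState) -> str:
    # Conditional routing: retry (up to 3 times) when quality is too low.
    if state.get("quality_score", 0.0) < 0.7 and state.get("retry_count", 0) < 3:
        return "retry"
    return "output"


graph = StateGraph(ArticleState)
graph.add_node("quality_checker", quality_checker)
graph.add_node("retry", lambda s: {"retry_count": s.get("retry_count", 0) + 1})
graph.add_node("output", lambda s: s)
graph.set_entry_point("quality_checker")
graph.add_conditional_edges(
    "quality_checker",
    route_after_quality,
    {"retry": "retry", "output": "output"},
)
graph.add_edge("retry", "quality_checker")
graph.add_edge("output", END)
pipeline_graph = graph.compile()
```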
## Features

### Content Analyzer

Extracts structured information from articles:
- Document structure (paragraphs, sections)
- Named entities (people, organizations, locations)
- Key dates and events
- Important facts and claims
- Writing style analysis
### Summarizer

Generates high-quality summaries:
- 150-200 word concise summaries
- 3-5 key bullet points
- Factual accuracy preservation
- Context-aware summarization
### Classifier

Categorizes articles into topics:
- 15+ government-relevant categories
- Multi-label classification
- Confidence scores
- Automatic tag generation
Categories:
- Politics & Government
- Economy & Finance
- Healthcare
- Education
- Environment & Climate
- Technology & Innovation
- Security & Defense
- International Relations
- Law & Justice
- Social Issues
- Infrastructure
- Energy
- Agriculture
- Science & Research
- Public Safety
### Sentiment Analyzer

Analyzes emotional characteristics:
- Sentiment score (-1 to 1)
- Sentiment label (positive/neutral/negative)
- Objectivity score (0 to 1)
- Urgency level (low/medium/high)
- Controversy score (0 to 1)
### Quality Checker

Validates all outputs:
- Summary quality assessment
- Classification consistency
- Sentiment reasonableness
- Completeness verification
- Automatic retry logic
## Installation

### Prerequisites

- Python 3.11 or later
- MongoDB (local or cloud)
- Redis (for caching)
- Google AI API key
### Setup

```bash
git clone https://github.com/your-org/AI-Gov-Content-Curator.git
cd AI-Gov-Content-Curator/agentic_ai
pip install -r requirements.txt
cp .env.example .env
# Edit .env with your API keys and configuration
```

Required environment variables:
```bash
GOOGLE_AI_API_KEY=your_google_ai_api_key_here
MONGODB_URI=mongodb://localhost:27017/synthoraai
REDIS_URL=redis://localhost:6379/0
```

## Quick Start

### Process a Single Article

```python
from agentic_ai.core.pipeline import AgenticPipeline
import asyncio
# Initialize pipeline
pipeline = AgenticPipeline()
# Process an article
result = asyncio.run(pipeline.process_article({
    "id": "article-123",
    "content": "Full article content here...",
    "url": "https://example.com/article",
    "source": "government",
    "title": "Article Title"
}))
# Access results
print(f"Summary: {result['summary']}")
print(f"Topics: {result['topics']}")
print(f"Sentiment: {result['sentiment_label']}")
print(f"Quality Score: {result['quality_score']}")# Process multiple articles
articles = [
{"id": "1", "content": "...", "url": "...", "source": "gov"},
{"id": "2", "content": "...", "url": "...", "source": "gov"},
# ... more articles
]
results = asyncio.run(pipeline.process_batch(articles, max_concurrent=5))The pipeline can be imported and used directly in your Python code:
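
The `max_concurrent` argument caps how many articles are in flight at once. For reference, this kind of bound is commonly enforced with an `asyncio.Semaphore`; the sketch below shows that general pattern and is an assumption, not the pipeline's internal `process_batch()` code:

```python
import asyncio


async def bounded_gather(coros, max_concurrent: int = 5):
    """Run coroutines concurrently, at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run(coro):
        async with semaphore:  # blocks while max_concurrent tasks are active
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))
```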
## Usage

### Python API

The pipeline can be imported and used directly in your Python code:

```python
from agentic_ai import AgenticPipeline
import asyncio

async def main():
    pipeline = AgenticPipeline()
    result = await pipeline.process_article({
        "id": "article-123",
        "content": "Your article content...",
        "url": "https://example.com/article",
        "source": "government"
    })

    # Access specific results
    print(f"Summary: {result['summary']}")
    print(f"Key Points: {result['key_points']}")
    print(f"Topics: {result['topics']}")
    print(f"Primary Category: {result['primary_category']}")
    print(f"Sentiment: {result['sentiment_label']} ({result['sentiment_score']})")
    print(f"Objectivity: {result['objectivity_score']}")
    print(f"Quality Score: {result['quality_score']}")

if __name__ == "__main__":
    asyncio.run(main())
```

### MCP Server

The pipeline includes a Model Context Protocol (MCP) server for standardized API access.

#### Starting the Server

```bash
# Development
python -m agentic_ai.mcp_server.server
# Production with Uvicorn
uvicorn agentic_ai.mcp_server.server:app --host 0.0.0.0 --port 8000 --workers 4
```

#### API Endpoints

**POST /process** - Process a single article

```bash
curl -X POST http://localhost:8000/process \
  -H "Content-Type: application/json" \
  -d '{
    "id": "article-123",
    "url": "https://example.com/article",
    "content": "Full article content...",
    "source": "government",
    "title": "Article Title"
  }'
```

**POST /process_batch** - Process multiple articles

```bash
curl -X POST http://localhost:8000/process_batch \
  -H "Content-Type: application/json" \
  -d '{
    "articles": [
      {"id": "1", "content": "...", "url": "...", "source": "gov"},
      {"id": "2", "content": "...", "url": "...", "source": "gov"}
    ],
    "max_concurrent": 5
  }'
```

**GET /health** - Health check

```bash
curl http://localhost:8000/health
```

**GET /metrics** - Get pipeline metrics

```bash
curl http://localhost:8000/metrics
```

### Cloud Deployment

The pipeline supports deployment to AWS Lambda and Azure Functions.

#### AWS Lambda

```bash
# Deploy to staging
cd agentic_ai/aws
./deploy.sh staging
# Deploy to production
./deploy.sh production
```

Or use CloudFormation:

```bash
aws cloudformation create-stack \
  --stack-name synthoraai-pipeline-staging \
  --template-body file://cloudformation.yml \
  --parameters \
    ParameterKey=Environment,ParameterValue=staging \
    ParameterKey=GoogleAIAPIKey,ParameterValue=your_key_here \
    ParameterKey=MongoDBURI,ParameterValue=your_mongodb_uri
```

#### Azure Functions

```bash
# Deploy to staging
cd agentic_ai/azure
./deploy.sh staging
# Deploy to production
./deploy.sh production
```

## Agents

### 1. Content Analyzer

**Purpose**: Extract structure and key information

**Outputs**:

- `structure`: Document structure metadata
- `entities`: Named entities (people, orgs, locations)
- `key_dates`: Important dates
- `key_facts`: Key facts and claims
- `writing_style`: Style classification
### 2. Summarizer

**Purpose**: Generate concise summaries

**Outputs**:

- `summary`: 150-200 word summary
- `summary_length`: Word count
- `key_points`: 3-5 bullet points
### 3. Classifier

**Purpose**: Categorize articles

**Outputs**:

- `topics`: List of relevant topics
- `primary_category`: Main category
- `confidence_scores`: Confidence per topic
- `tags`: Specific keywords
### 4. Sentiment Analyzer

**Purpose**: Analyze emotional characteristics

**Outputs**:

- `sentiment_score`: -1 (negative) to 1 (positive)
- `sentiment_label`: positive/neutral/negative
- `objectivity_score`: 0 (subjective) to 1 (objective)
- `urgency_level`: low/medium/high
- `controversy_score`: 0 (none) to 1 (high)
### 5. Quality Checker

**Purpose**: Validate outputs

**Outputs**:

- `quality_score`: Overall quality (0 to 1)
- `quality_issues`: List of issues found
- `passes_quality`: Boolean pass/fail
- `needs_retry`: Whether retry is needed
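
Taken together, the agents populate a single result record per article. The shape looks roughly like the following; every value here is fabricated purely for illustration:

```python
# Illustrative result shape only: field names follow the output lists
# above, values are made up for demonstration.
result = {
    "summary": "The ministry announced a new infrastructure fund ...",
    "key_points": ["New fund announced", "Focus on rural roads", "Rollout in 2025"],
    "topics": ["Infrastructure", "Economy & Finance"],
    "primary_category": "Infrastructure",
    "confidence_scores": {"Infrastructure": 0.92, "Economy & Finance": 0.64},
    "tags": ["infrastructure-fund", "rural-roads"],
    "sentiment_score": 0.35,
    "sentiment_label": "positive",
    "objectivity_score": 0.81,
    "urgency_level": "medium",
    "controversy_score": 0.10,
    "quality_score": 0.88,
    "passes_quality": True,
    "needs_retry": False,
}
```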
## Configuration

### Environment Variables

All configuration is managed through environment variables. See `.env.example` for a complete list.
Key settings:

```bash
# AI Provider
GOOGLE_AI_API_KEY=your_key_here
GOOGLE_AI_API_KEY1=backup_key_1 # Optional
GOOGLE_AI_API_KEY2=backup_key_2 # Optional
# Pipeline Behavior
PIPELINE_MAX_RETRIES=3
PIPELINE_TIMEOUT=300
QUALITY_THRESHOLD=0.7
# Performance
MAX_CONCURRENT_REQUESTS=10
BATCH_SIZE=5
CACHE_TTL=3600
# Monitoring
LOG_LEVEL=INFO
ENABLE_METRICS=true
PROMETHEUS_PORT=9090
```

### Programmatic Configuration

```python
from agentic_ai.utils.config import Config

config = Config(
    google_ai_api_keys=["key1", "key2"],
    mongodb_uri="mongodb://localhost:27017/synthoraai",
    quality_threshold=0.8,
    pipeline_max_retries=5
)
pipeline = AgenticPipeline(config)
```

## Monitoring & Observability

### Metrics

The pipeline collects comprehensive metrics:

```python
# Get current metrics
metrics = pipeline.get_metrics()
print(f"Articles processed: {metrics['counters']['articles_processing_completed']}")
print(f"Average processing time: {metrics['histograms']['article_processing_time_seconds']['avg']}s")
print(f"Error rate: {metrics['counters']['articles_processing_errors']}")Available Metrics:
- `articles_processing_started` - Counter
- `articles_processing_completed` - Counter
- `articles_processing_failed` - Counter
- `articles_processing_errors` - Counter
- `article_processing_time_seconds` - Histogram
### Logging

The pipeline uses structlog for structured logging:

```python
import structlog
logger = structlog.get_logger()
# Logs include context
logger.info("processing_started", article_id="123", source="government")Enable LangSmith for detailed tracing:
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=your_langsmith_key
LANGCHAIN_PROJECT=synthoraai-agentic-pipeline
```

## Testing

### Running Tests

```bash
# Install test dependencies
pip install pytest pytest-asyncio pytest-cov pytest-mock
# Run all tests
pytest tests/
# Run with coverage
pytest --cov=agentic_ai tests/
# Run specific test file
pytest tests/test_pipeline.py
```

### Writing Tests

```python
import pytest
from agentic_ai import AgenticPipeline
@pytest.mark.asyncio
async def test_article_processing():
    pipeline = AgenticPipeline()
    result = await pipeline.process_article({
        "id": "test-123",
        "content": "Test article content...",
        "url": "https://example.com/test",
        "source": "test"
    })

    assert result["summary"] is not None
    assert len(result["topics"]) > 0
    assert result["quality_score"] >= 0.7
```

## Performance

### Benchmarks

Based on real-world usage:
- Average processing time: 8-12 seconds per article
- Batch throughput: 100-150 articles/minute (with max_concurrent=10)
- Quality pass rate: 85% of articles pass on the first attempt
- Retry rate: ~15% require retry
- Error rate: <1% failures after retries
### Optimization Tips

- **Batch Processing**: Use `process_batch()` for multiple articles
- **Concurrent Requests**: Adjust `max_concurrent` based on your resources
- **Caching**: Enable Redis caching for repeated content
- **API Key Rotation**: Use multiple API keys to avoid rate limits (see the sketch after this list)
- **Quality Threshold**: Lower the threshold to reduce retries (0.6-0.7 recommended)
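
One way the key rotation referenced above can be wired up is round-robin cycling. This is a sketch under the assumption that keys are read from the numbered `GOOGLE_AI_API_KEY*` variables shown in the configuration section; the `next_api_key` helper is hypothetical, not part of the pipeline's API:

```python
# Hypothetical helper for round-robin key rotation; not part of the
# pipeline's public API. Assumes keys live in GOOGLE_AI_API_KEY,
# GOOGLE_AI_API_KEY1, GOOGLE_AI_API_KEY2, ...
import itertools
import os


def load_api_keys() -> list[str]:
    names = ["GOOGLE_AI_API_KEY"] + [f"GOOGLE_AI_API_KEY{i}" for i in range(1, 4)]
    keys = [os.environ[name] for name in names if name in os.environ]
    if not keys:
        raise RuntimeError("No GOOGLE_AI_API_KEY* variables are set")
    return keys


_key_cycle = itertools.cycle(load_api_keys())


def next_api_key() -> str:
    # Each call hands out the next key, spreading requests across quotas.
    return next(_key_cycle)
```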
## Troubleshooting

### Common Issues

**Issue**: Rate limit errors from Google AI

**Solution**: Add multiple API keys in the environment:

```bash
GOOGLE_AI_API_KEY=key1
GOOGLE_AI_API_KEY1=key2
GOOGLE_AI_API_KEY2=key3
```

**Issue**: MongoDB connection errors
**Solution**: Verify MongoDB is running and the URI is correct:

```bash
mongo --eval "db.adminCommand('ping')"
```

**Issue**: Quality check failures
**Solution**: Lower the quality threshold or check the input article quality:

```bash
QUALITY_THRESHOLD=0.6
```

**Issue**: Slow processing times
**Solution**: Increase concurrent requests:

```bash
MAX_CONCURRENT_REQUESTS=20
```

### Debug Mode

Enable debug logging:

```bash
LOG_LEVEL=DEBUG
```

Or programmatically:

```python
import structlog
import logging

structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.DEBUG))
```

## Contributing

We welcome contributions! Please see the main CONTRIBUTING.md for guidelines.

### Development Setup

```bash
# Create virtual environment
python -m venv venv
source venv/bin/activate # or `venv\Scripts\activate` on Windows
# Install dev dependencies
pip install -r requirements.txt
pip install -r requirements-dev.txt
# Install pre-commit hooks
pre-commit install
# Run tests
pytest
# Run linters
black agentic_ai/
ruff check agentic_ai/
mypy agentic_ai/
```

## License

This project is licensed under the MIT License - see the LICENSE file for details.

## Acknowledgments
- Built with LangChain and LangGraph
- Powered by Google Generative AI (Gemini)
- Part of the SynthoraAI project
## Support

For questions or support:
- Email: [email protected]
- GitHub Issues: Create an issue
- Documentation: Full project docs
Made with ❤️ by the SynthoraAI Team