Real-time content ingestion and processing for breaking news and live data streams
The Realtime Stream Processor is a high-performance, event-driven microservice that complements SynthoraAI's existing batch-oriented article curation system with real-time capabilities. Built with Go for optimal performance and low latency, it enables:
- Event-Driven Content Updates: Instant processing of breaking news and live data streams
- Low-Latency Pipelines: Sub-second processing for urgent government announcements
- Live Feed Endpoints: WebSocket and GraphQL APIs for real-time content delivery
- Scalable Architecture: Horizontal scaling with Kafka/Redis Streams
- Architecture
- Features
- Technology Stack
- Quick Start
- Installation
- Configuration
- API Documentation
- Deployment
- Monitoring
- Integration with SynthoraAI
- Performance
- Contributing
- License
┌────────────────────────────────────────────────────────────────┐
│                     Real-Time Data Sources                     │
│  RSS Feeds │ News APIs │ WebSockets │ Government Alerts │ etc. │
└───────────────────────────┬────────────────────────────────────┘
                            │
                            ▼
┌────────────────────────────────────────────────────────────────┐
│                     Stream Ingestion Layer                     │
│   ┌──────────────┐   ┌──────────────┐   ┌──────────────┐       │
│   │  RSS Poller  │   │ API Watcher  │   │ WS Listener  │       │
│   └──────┬───────┘   └──────┬───────┘   └──────┬───────┘       │
│          └──────────────────┴──────────────────┘               │
└───────────────────────────┬────────────────────────────────────┘
                            │
                            ▼
┌────────────────────────────────────────────────────────────────┐
│                  Message Queue (Kafka/Redis)                   │
│    Topics: raw-articles, processed-articles, urgent-alerts     │
└───────────────────────────┬────────────────────────────────────┘
                            │
                            ▼
┌────────────────────────────────────────────────────────────────┐
│                   Stream Processing Pipeline                   │
│ ┌───────────┐   ┌──────────┐   ┌────────────┐   ┌───────────┐  │
│ │ Validator │ → │ Enricher │ → │ AI Summary │ → │ Publisher │  │
│ └───────────┘   └──────────┘   └────────────┘   └───────────┘  │
└───────────────────────────┬────────────────────────────────────┘
                            │
                            ▼
┌────────────────────────────────────────────────────────────────┐
│                          Output Layer                          │
│   ┌──────────────┐   ┌──────────────┐   ┌──────────────┐       │
│   │  WebSocket   │   │   GraphQL    │   │   MongoDB    │       │
│   │    Server    │   │   Live API   │   │   Storage    │       │
│   └──────────────┘   └──────────────┘   └──────────────┘       │
└───────────────────────────┬────────────────────────────────────┘
                            │
                            ▼
┌────────────────────────────────────────────────────────────────┐
│                   SynthoraAI Frontend/Clients                  │
└────────────────────────────────────────────────────────────────┘
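The stage-by-stage pipeline in the diagram maps naturally onto Go channels, with each stage consuming from the previous one in its own goroutine. The sketch below is illustrative only: `Article`, `validate`, `summarize`, and `runPipeline` are hypothetical stand-ins for the service's real types, and the real enricher/summarizer stages call external APIs rather than string concatenation.

```go
package main

import (
	"fmt"
	"strings"
)

// Article is a simplified stand-in for the service's real message type.
type Article struct {
	Title   string
	Summary string
}

// validate drops articles without a title (a stand-in for schema checks).
func validate(in <-chan Article) <-chan Article {
	out := make(chan Article)
	go func() {
		defer close(out)
		for a := range in {
			if strings.TrimSpace(a.Title) != "" {
				out <- a
			}
		}
	}()
	return out
}

// summarize attaches a naive "summary" (the real stage calls the AI API).
func summarize(in <-chan Article) <-chan Article {
	out := make(chan Article)
	go func() {
		defer close(out)
		for a := range in {
			a.Summary = a.Title + " (summary)"
			out <- a
		}
	}()
	return out
}

// runPipeline feeds titles through validate -> summarize and collects output.
func runPipeline(titles []string) []string {
	src := make(chan Article)
	go func() {
		defer close(src)
		for _, t := range titles {
			src <- Article{Title: t}
		}
	}()
	var out []string
	for a := range summarize(validate(src)) {
		out = append(out, a.Summary)
	}
	return out
}

func main() {
	fmt.Println(runPipeline([]string{"Breaking: policy update", ""}))
}
```

Because every stage owns its output channel and closes it when its input is drained, shutdown propagates cleanly from source to sink.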
- Multi-Source Ingestion: Concurrent polling of RSS feeds, REST APIs, WebSocket streams, and government alert systems
- Intelligent Routing: Priority-based routing for breaking news vs. standard content
- AI-Powered Processing: Real-time summarization using Google Generative AI with streaming responses
- Deduplication: Content-based deduplication using MinHash and Locality-Sensitive Hashing
- Event Streaming: Kafka and Redis Streams support for scalable message processing
- Live Updates: WebSocket server for pushing real-time updates to connected clients
- GraphQL Subscriptions: Live feed queries with filtering and pagination
- Circuit Breaker: Resilient external API calls with automatic failover
- Rate Limiting: Token bucket algorithm for API protection
- Metrics & Tracing: Prometheus metrics and distributed tracing
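The token-bucket rate limiting mentioned above fits in a few lines of Go. This is a minimal single-goroutine illustration (the `TokenBucket` type and its methods are invented for this example); a production limiter would guard the bucket with a mutex or simply use `golang.org/x/time/rate`.

```go
package main

import (
	"fmt"
	"time"
)

// TokenBucket illustrates the token-bucket algorithm: a bucket holds up to
// `capacity` tokens, refilled continuously at `refillRate` tokens/second,
// and each request spends one token or is rejected.
type TokenBucket struct {
	capacity   float64
	tokens     float64
	refillRate float64 // tokens per second
	last       time.Time
}

func NewTokenBucket(capacity, refillRate float64) *TokenBucket {
	return &TokenBucket{capacity: capacity, tokens: capacity, refillRate: refillRate, last: time.Now()}
}

// Allow refills based on elapsed time, then spends one token if available.
// Not goroutine-safe: a real limiter needs a mutex around this method.
func (b *TokenBucket) Allow() bool {
	now := time.Now()
	b.tokens += now.Sub(b.last).Seconds() * b.refillRate
	if b.tokens > b.capacity {
		b.tokens = b.capacity
	}
	b.last = now
	if b.tokens >= 1 {
		b.tokens--
		return true
	}
	return false
}

func main() {
	b := NewTokenBucket(3, 1) // burst of 3, then 1 request/second
	for i := 1; i <= 5; i++ {
		fmt.Printf("request %d allowed=%v\n", i, b.Allow())
	}
}
```

The `RATE_LIMIT_REQUESTS` / `RATE_LIMIT_WINDOW` settings in the configuration correspond to capacity and refill rate here.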
- Breaking News Detection: Automatically identifies and prioritizes urgent content
- Sentiment Analysis: Real-time sentiment scoring for government announcements
- Topic Classification: Instant categorization into 15+ topic areas
- Duplicate Detection: Sub-second deduplication across millions of articles
- Live Dashboard: Real-time metrics and content flow visualization
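To make the deduplication approach concrete, here is a minimal MinHash sketch in Go. It salts a single FNV-1a hash to simulate n hash functions over word trigrams and estimates Jaccard similarity from signature agreement; the real service would additionally band signatures into an LSH index for sub-second candidate lookup. All function names here are illustrative, not the service's API.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strings"
)

// shingles splits text into overlapping word k-grams (a common shingling choice).
func shingles(text string, k int) []string {
	words := strings.Fields(strings.ToLower(text))
	var out []string
	for i := 0; i+k <= len(words); i++ {
		out = append(out, strings.Join(words[i:i+k], " "))
	}
	return out
}

// signature computes an n-value MinHash signature: for each of n salted hash
// functions, keep the minimum hash seen over all shingles.
func signature(text string, n int) []uint64 {
	sig := make([]uint64, n)
	for i := range sig {
		sig[i] = ^uint64(0)
	}
	for _, s := range shingles(text, 3) {
		for i := 0; i < n; i++ {
			h := fnv.New64a()
			fmt.Fprintf(h, "%d:%s", i, s) // i acts as the per-function salt
			if v := h.Sum64(); v < sig[i] {
				sig[i] = v
			}
		}
	}
	return sig
}

// similarity estimates Jaccard similarity as the fraction of agreeing slots.
func similarity(a, b []uint64) float64 {
	match := 0
	for i := range a {
		if a[i] == b[i] {
			match++
		}
	}
	return float64(match) / float64(len(a))
}

func main() {
	a := "the president announced a new climate policy today in washington"
	b := "the president announced a new climate policy today in a speech"
	c := "markets rallied after the latest jobs report surprised analysts"
	fmt.Printf("near-duplicate: %.2f\n", similarity(signature(a, 128), signature(b, 128)))
	fmt.Printf("unrelated:      %.2f\n", similarity(signature(c, 128), signature(a, 128)))
}
```

Near-duplicate articles share most shingles and therefore most signature slots, while unrelated articles agree on almost none, which is what makes a simple threshold on `similarity` usable for dedup.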
| Component | Technology | Purpose |
|---|---|---|
| Core Language | Go 1.21+ | High-performance stream processing |
| Message Queue | Kafka 3.6+ / Redis 7.0+ | Event streaming and buffering |
| Database | MongoDB 6.0+ | Document storage |
| Cache | Redis 7.0+ | Hot data caching and rate limiting |
| WebSocket | Gorilla WebSocket | Real-time client connections |
| GraphQL | gqlgen | Live feed API |
| AI/NLP | Google Generative AI | Content summarization |
| Monitoring | Prometheus + Grafana | Metrics and dashboards |
| Tracing | Jaeger | Distributed tracing |
| Container | Docker + Docker Compose | Deployment and orchestration |
| CI/CD | GitHub Actions | Automated testing and deployment |
- Go: 1.21 or later
- Docker: 20.10+ and Docker Compose
- MongoDB: 6.0+ (or use Docker)
- Redis: 7.0+ (or use Docker)
- Kafka: 3.6+ (optional, or use Docker)
# Clone the repository
git clone https://github.com/SynthoraAI-AI-News-Content-Curator/Realtime-Stream-Processor.git
cd Realtime-Stream-Processor
# Start all services with Docker Compose
docker-compose up -d
# Verify services are running
docker-compose ps
# View logs
docker-compose logs -f stream-processor

The services will be available at:
- Stream Processor API: http://localhost:8080
- WebSocket Server: ws://localhost:8080/ws
- GraphQL Playground: http://localhost:8080/graphql
- Prometheus Metrics: http://localhost:8080/metrics
- Kafka UI: http://localhost:9000 (if using Kafka)
- Redis Commander: http://localhost:8081
# Install Go dependencies
cd cmd/stream-processor
go mod download
# Set up environment variables
cp .env.example .env
# Edit .env with your configuration
# Run the stream processor
go run main.go
# Or build and run
go build -o stream-processor main.go
./stream-processor

# Build Docker image
docker build -t synthoraai/stream-processor:latest .
# Run with Docker
docker run -d \
--name stream-processor \
-p 8080:8080 \
-e REDIS_URL=redis://redis:6379 \
-e MONGODB_URI=mongodb://mongo:27017 \
synthoraai/stream-processor:latest

Create a .env file in the root directory:
# Service Configuration
SERVICE_NAME=synthoraai-stream-processor
SERVICE_PORT=8080
LOG_LEVEL=info
ENVIRONMENT=production
# Message Queue (choose one)
MESSAGE_BROKER=redis # or 'kafka'
# Redis Configuration
REDIS_URL=redis://localhost:6379
REDIS_PASSWORD=
REDIS_DB=0
REDIS_STREAM_NAME=synthoraai-articles
# Kafka Configuration (if using Kafka)
KAFKA_BROKERS=localhost:9092,localhost:9093,localhost:9094
KAFKA_TOPIC_RAW=raw-articles
KAFKA_TOPIC_PROCESSED=processed-articles
KAFKA_TOPIC_URGENT=urgent-alerts
KAFKA_GROUP_ID=synthoraai-processors
# MongoDB Configuration
MONGODB_URI=mongodb://localhost:27017
MONGODB_DATABASE=synthoraai
MONGODB_COLLECTION=articles
# Google AI Configuration
GOOGLE_AI_API_KEY=your_google_ai_api_key_here
GOOGLE_AI_MODEL=gemini-1.5-pro
AI_MAX_TOKENS=500
AI_TEMPERATURE=0.3
# WebSocket Configuration
WS_MAX_CONNECTIONS=10000
WS_READ_BUFFER_SIZE=1024
WS_WRITE_BUFFER_SIZE=1024
WS_PING_INTERVAL=30s
# Rate Limiting
RATE_LIMIT_REQUESTS=100
RATE_LIMIT_WINDOW=1m
# Circuit Breaker
CIRCUIT_BREAKER_THRESHOLD=5
CIRCUIT_BREAKER_TIMEOUT=30s
# Data Sources
RSS_FEEDS=https://www.state.gov/rss-feed/press-releases/feed/,https://www.whitehouse.gov/feed/
API_ENDPOINTS=https://newsapi.org/v2/top-headlines
POLL_INTERVAL=30s
# Monitoring
PROMETHEUS_ENABLED=true
PROMETHEUS_PORT=9090
JAEGER_ENABLED=true
JAEGER_ENDPOINT=http://localhost:14268/api/traces
# Performance Tuning
WORKER_POOL_SIZE=20
BATCH_SIZE=100
PROCESSING_TIMEOUT=10s
MAX_RETRY_ATTEMPTS=3

Alternatively, use config.yaml:
service:
  name: synthoraai-stream-processor
  port: 8080
  environment: production

broker:
  type: redis # or kafka
  redis:
    url: redis://localhost:6379
    password: ""
    db: 0
    stream: synthoraai-articles
  kafka:
    brokers:
      - localhost:9092
    topics:
      raw: raw-articles
      processed: processed-articles
      urgent: urgent-alerts
    group_id: synthoraai-processors

database:
  mongodb:
    uri: mongodb://localhost:27017
    database: synthoraai
    collection: articles

ai:
  provider: google
  api_key: ${GOOGLE_AI_API_KEY}
  model: gemini-1.5-pro
  max_tokens: 500
  temperature: 0.3

websocket:
  max_connections: 10000
  ping_interval: 30s

sources:
  rss:
    - url: https://www.state.gov/rss-feed/press-releases/feed/
      poll_interval: 30s
    - url: https://www.whitehouse.gov/feed/
      poll_interval: 30s
  apis:
    - url: https://newsapi.org/v2/top-headlines
      poll_interval: 60s
      headers:
        X-Api-Key: ${NEWS_API_KEY}

GET /health

Response:
{
"status": "healthy",
"timestamp": "2025-11-16T20:00:00Z",
"uptime": "72h15m30s",
"version": "1.0.0"
}

POST /api/v1/stream
Content-Type: application/json
{
"title": "Breaking: New Government Policy Announced",
"content": "Full article content...",
"source": "whitehouse.gov",
"url": "https://www.whitehouse.gov/...",
"priority": "high",
"metadata": {
"author": "Press Secretary",
"published_at": "2025-11-16T19:00:00Z"
}
}

Response:
{
"id": "67890abc-def1-2345-6789-0abcdef12345",
"status": "processing",
"estimated_time": "2s"
}

Connect to ws://localhost:8080/ws to receive real-time updates.
const ws = new WebSocket('ws://localhost:8080/ws');
ws.onopen = () => {
// Subscribe to specific topics
ws.send(JSON.stringify({
action: 'subscribe',
topics: ['urgent-alerts', 'government', 'politics']
}));
};
ws.onmessage = (event) => {
const article = JSON.parse(event.data);
console.log('New article:', article);
};

{
"type": "article",
"priority": "high",
"data": {
"id": "67890abc-def1-2345-6789-0abcdef12345",
"title": "Breaking News Title",
"summary": "AI-generated summary...",
"content": "Full content...",
"source": "whitehouse.gov",
"url": "https://...",
"topics": ["government", "policy"],
"sentiment": {
"score": 0.65,
"label": "positive"
},
"published_at": "2025-11-16T19:00:00Z",
"processed_at": "2025-11-16T19:00:02Z"
}
}

Access the GraphQL Playground at http://localhost:8080/graphql.
query GetLiveArticles($limit: Int, $topics: [String!]) {
liveArticles(limit: $limit, topics: $topics) {
id
title
summary
content
source
url
topics
sentiment {
score
label
}
publishedAt
processedAt
}
}

subscription OnNewArticle($topics: [String!]) {
newArticle(topics: $topics) {
id
title
summary
priority
topics
publishedAt
}
}

query {
liveArticles(limit: 10, topics: ["government", "urgent"]) {
id
title
summary
source
priority
topics
}
}

Prometheus metrics are exposed at /metrics:
curl http://localhost:8080/metrics

Key metrics:
- articles_processed_total: Total articles processed
- articles_processing_duration_seconds: Processing time histogram
- websocket_connections_active: Active WebSocket connections
- kafka_messages_consumed_total: Kafka messages consumed
- redis_operations_total: Redis operations count
- ai_api_calls_total: AI API calls
- ai_api_errors_total: AI API errors
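The service presumably registers these with the Prometheus Go client (prometheus/client_golang); the dependency-free sketch below only illustrates the text exposition format that a counter such as articles_processed_total follows when scraped from /metrics. The variable and function names are illustrative.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// articlesProcessed stands in for a prometheus.Counter; atomic.Int64 keeps
// increments safe across goroutines.
var articlesProcessed atomic.Int64

func recordProcessed() { articlesProcessed.Add(1) }

// renderMetrics emits the Prometheus text exposition format that a scrape of
// GET /metrics returns: HELP and TYPE comments followed by the sample line.
func renderMetrics() string {
	return fmt.Sprintf(
		"# HELP articles_processed_total Total articles processed\n"+
			"# TYPE articles_processed_total counter\n"+
			"articles_processed_total %d\n", articlesProcessed.Load())
}

func main() {
	for i := 0; i < 3; i++ {
		recordProcessed()
	}
	fmt.Print(renderMetrics())
}
```

In the real service, the client library handles registration, label handling, and the /metrics handler; this sketch only shows what Prometheus sees on the wire.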
# Production deployment
docker-compose -f docker-compose.prod.yml up -d
# Scaling workers
docker-compose -f docker-compose.prod.yml up -d --scale stream-processor=5

# Deploy to Kubernetes
kubectl apply -f k8s/
# Scale deployment
kubectl scale deployment stream-processor --replicas=5
# Check status
kubectl get pods -l app=stream-processor

# Build and push to ECR
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <account>.dkr.ecr.us-east-1.amazonaws.com
docker build -t synthoraai/stream-processor:latest .
docker tag synthoraai/stream-processor:latest <account>.dkr.ecr.us-east-1.amazonaws.com/stream-processor:latest
docker push <account>.dkr.ecr.us-east-1.amazonaws.com/stream-processor:latest
# Deploy with ECS/EKS
terraform apply -var-file=aws.tfvars

# Build and push to ACR
az acr build --registry synthoraai --image stream-processor:latest .
# Deploy to AKS
az aks get-credentials --resource-group synthoraai --name synthoraai-cluster
kubectl apply -f k8s/

# Build and push to GCR
gcloud builds submit --tag gcr.io/synthoraai/stream-processor:latest .
# Deploy to GKE
gcloud container clusters get-credentials synthoraai-cluster --region us-central1
kubectl apply -f k8s/

# Start monitoring stack
docker-compose -f docker-compose.monitoring.yml up -d
# Access Grafana
open http://localhost:3000
# Default credentials: admin/admin
# Import dashboard
# Use dashboard ID: 12345 (custom SynthoraAI dashboard)

- Throughput: Articles processed per second
- Latency: P50, P95, P99 processing times
- Error Rate: Failed processing attempts
- Queue Depth: Kafka/Redis backlog
- AI API: Response time and error rate
- WebSocket: Active connections and message rate
Configure alerts in config/alerts.yml:
alerts:
  - name: HighProcessingLatency
    condition: processing_duration_p95 > 5s
    severity: warning
  - name: AIAPIFailure
    condition: ai_api_error_rate > 10%
    severity: critical
  - name: QueueBacklog
    condition: kafka_lag > 10000
    severity: warning

The stream processor seamlessly integrates with the existing SynthoraAI backend:
// In your Next.js API route: pages/api/stream/connect.ts
import { WebSocket } from 'ws';
export default async function handler(req, res) {
// Connect to stream processor
const ws = new WebSocket('ws://stream-processor:8080/ws');
ws.on('message', async (data) => {
const article = JSON.parse(data);
// Save to MongoDB (existing backend logic)
await saveArticle(article);
// Trigger newsletter if urgent
if (article.priority === 'high') {
await sendUrgentNewsletter(article);
}
});
}

Real-time updates in your Next.js frontend:
// components/LiveFeed.tsx
import { useEffect, useState } from 'react';
import { useWebSocket } from '@/hooks/useWebSocket';
export function LiveFeed() {
const [articles, setArticles] = useState([]);
const { messages } = useWebSocket('ws://localhost:8080/ws');
useEffect(() => {
if (messages.length > 0) {
const newArticle = messages[messages.length - 1];
setArticles(prev => [newArticle, ...prev]);
}
}, [messages]);
return (
<div className="live-feed">
<h2>Live Breaking News</h2>
{articles.map(article => (
<ArticleCard key={article.id} article={article} isLive={true} />
))}
</div>
);
}

// lib/graphql/client.ts
import { ApolloClient, InMemoryCache, split, HttpLink } from '@apollo/client';
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { getMainDefinition } from '@apollo/client/utilities';
import { createClient } from 'graphql-ws';
const httpLink = new HttpLink({
uri: 'http://localhost:8080/graphql',
});
const wsLink = new GraphQLWsLink(createClient({
url: 'ws://localhost:8080/graphql',
}));
const splitLink = split(
({ query }) => {
const definition = getMainDefinition(query);
return (
definition.kind === 'OperationDefinition' &&
definition.operation === 'subscription'
);
},
wsLink,
httpLink,
);
export const client = new ApolloClient({
link: splitLink,
cache: new InMemoryCache(),
});

Tested on: AWS EC2 t3.xlarge (4 vCPU, 16GB RAM)
| Metric | Value |
|---|---|
| Throughput | 5,000 articles/sec |
| Latency (P50) | 120ms |
| Latency (P95) | 350ms |
| Latency (P99) | 800ms |
| WebSocket Connections | 50,000+ concurrent |
| Memory Usage | ~500MB (steady state) |
| CPU Usage | ~60% (under load) |
- Increase Worker Pool: Adjust WORKER_POOL_SIZE based on CPU cores
- Tune Batch Size: Larger batches for higher throughput, smaller for lower latency
- Redis vs Kafka: Use Redis for <100K msg/sec, Kafka for higher volumes
- Connection Pooling: Configure MongoDB and Redis connection pools
- Caching: Enable content-based caching for duplicate detection
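The WORKER_POOL_SIZE tuning above amounts to a classic Go worker pool: a fixed number of goroutines draining a shared jobs channel. A minimal sketch, where `process` and `runPool` are illustrative stand-ins for the service's real per-article work:

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// process stands in for the real per-article work (validate/enrich/summarize).
func process(article string) string { return "processed:" + article }

// runPool fans articles out to n workers and collects the results;
// WORKER_POOL_SIZE plays the role of n in the real service.
func runPool(articles []string, n int) []string {
	jobs := make(chan string)
	results := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for a := range jobs {
				results <- process(a)
			}
		}()
	}
	go func() {
		for _, a := range articles {
			jobs <- a
		}
		close(jobs)
		wg.Wait()      // all workers done -> no more sends on results
		close(results) // lets the collector loop below terminate
	}()
	var out []string
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	out := runPool([]string{"a", "b", "c", "d"}, runtime.NumCPU())
	fmt.Println(len(out), "articles processed")
}
```

Sizing the pool near runtime.NumCPU() suits CPU-bound work; I/O-bound stages (AI API calls, database writes) typically benefit from a larger pool, which is why the setting is exposed as configuration.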
# Run unit tests
go test ./... -v
# Run integration tests
go test ./... -tags=integration -v
# Run with coverage
go test ./... -coverprofile=coverage.out
go tool cover -html=coverage.out
# Load testing with k6
k6 run tests/load/stream-processor.js
# Stress testing
k6 run --vus 1000 --duration 5m tests/stress/websocket.js

We welcome contributions! Please see CONTRIBUTING.md for guidelines.
- Fork the repository
- Create a feature branch: git checkout -b feature/amazing-feature
- Commit changes: git commit -m 'feat: add amazing feature'
- Push to branch: git push origin feature/amazing-feature
- Open a Pull Request
- Follow Effective Go guidelines
- Use gofmt for formatting
- Run golangci-lint before committing
- Write tests for new features
This project is licensed under the MIT License - see the LICENSE file for details.
- SynthoraAI Team: For the amazing base platform
- Go Community: For excellent libraries and tools
- Kafka & Redis: For robust streaming infrastructure
- Google AI: For powerful summarization capabilities
- Project Maintainer: David Nguyen
- Email: [email protected]
- Website: https://sonnguyenhoang.com
- GitHub: https://github.com/hoangsonww/Realtime-Stream-Processor
Built with ❤️ for SynthoraAI - Synthesizing the world's news through AI
Back to Top