-
Notifications
You must be signed in to change notification settings - Fork 94
Expand file tree
/
Copy pathstreaming_text.py
More file actions
210 lines (172 loc) · 12.4 KB
/
streaming_text.py
File metadata and controls
210 lines (172 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
"""
Text Streaming Example
======================

Real-time text streaming with cascade events.

Setup:
    pip install cascadeflow[all]
    export OPENAI_API_KEY="sk-..."

Run:
    python examples/streaming_text.py

What You'll See:
    - Tokens appearing in real-time (like ChatGPT)
    - Draft decisions (accepted = cheap, rejected = expensive)
    - Cost and speed metrics after each response

Documentation:
    📖 Streaming Guide: docs/guides/streaming.md#tool-streaming
    📖 Quick Start: docs/guides/quickstart.md
    📚 Examples README: examples/README.md
"""
import asyncio
import os
from cascadeflow import CascadeAgent, ModelConfig
from cascadeflow.streaming import StreamEventType
async def _stream_example(agent: "CascadeAgent", prompt: str, *, max_tokens: int) -> None:
    """Stream one prompt through ``agent`` and print each cascade event.

    The caller is expected to have printed the ``A: `` prefix already, so the
    output cursor starts in the middle of a line.

    Args:
        agent: Object whose ``stream()`` call yields events asynchronously.
        prompt: Query text passed to ``agent.stream``.
        max_tokens: Forwarded unchanged to ``agent.stream``.
    """
    # Tracks whether streamed answer text has left the cursor mid-line, so the
    # statistics never get glued to the end of the answer.
    line_open = True

    async for event in agent.stream(prompt, max_tokens=max_tokens):
        if event.type == StreamEventType.CHUNK:
            # New piece of answer text: print it immediately for the live effect.
            print(event.content, end="", flush=True)
            line_open = True

        elif event.type == StreamEventType.DRAFT_DECISION:
            # Quality check on the draft finished: report accepted vs. rejected.
            confidence = event.data["confidence"]
            if event.data["accepted"]:
                print(f"\n✓ Draft accepted ({confidence:.0%} confidence)")
                print(" → Verifier skipped (saved money!)")
            else:
                reason = event.data.get("reason", "quality_insufficient")
                print(f"\n✗ Draft rejected ({confidence:.0%}): {reason}")
            line_open = False

        elif event.type == StreamEventType.SWITCH:
            # Escalation to the next model: announce it and start a new answer line.
            from_model = event.data.get("from_model", "tier-1")
            to_model = event.data.get("to_model", "tier-2")
            print(f"⤴️ Cascading: {from_model} → {to_model}")
            print("A: ", end="", flush=True)
            line_open = True

        elif event.type == StreamEventType.COMPLETE:
            # Stream finished: close the answer line if needed, then show stats.
            if line_open:
                print()
                line_open = False
            result = event.data["result"]
            print(f"💰 Cost: ${result['total_cost']:.6f}")
            print(f"⚡ Speed: {result['latency_ms']:.0f}ms")
            print(f"🎯 Model: {result['model_used']}")


async def main() -> None:
    """Run the text-streaming demo: two queries streamed through a 2-model cascade.

    Returns early, after printing a hint, when ``OPENAI_API_KEY`` is not set.
    """
    # ── STEP 1: Check API key ──────────────────────────────────────────────
    if not os.getenv("OPENAI_API_KEY"):
        print("❌ Set OPENAI_API_KEY first: export OPENAI_API_KEY='sk-...'")
        return

    print("🌊 cascadeflow Text Streaming\n")

    # ── STEP 2: Set up the agent ───────────────────────────────────────────
    # Two models, cheapest first: the cheap one drafts, the expensive one is
    # only used when the draft is rejected ("cascading").
    agent = CascadeAgent(
        models=[
            ModelConfig(
                name="gpt-4o-mini", provider="openai", cost=0.00015  # ~$0.15 per 1M tokens
            ),
            ModelConfig(
                name="gpt-4o", provider="openai", cost=0.00625  # ~$6.25 per 1M tokens (40x more!)
            ),
        ]
    )

    # ── STEP 3: Agent ready ────────────────────────────────────────────────
    print("✓ Agent ready with 2-tier cascade")
    print("✓ Streaming enabled\n")

    # ── EXAMPLE 1: Simple query (usually stays on the cheap model) ─────────
    print("=" * 60)
    print("Example 1: Simple question (fast & cheap)\n")
    print("Q: What is Python?\nA: ", end="", flush=True)
    await _stream_example(agent, "What is Python? Answer in one sentence.", max_tokens=100)

    # ── EXAMPLE 2: Complex query (often cascades to the better model) ──────
    print("\n" + "=" * 60)
    print("Example 2: Complex question (may cascade)\n")
    print("Q: Explain quantum computing and its implications.\nA: ", end="", flush=True)
    await _stream_example(
        agent,
        "Explain quantum computing and its implications for cryptography.",
        max_tokens=200,
    )

    # ── Summary ────────────────────────────────────────────────────────────
    print("\n" + "=" * 60)
    print("\n✅ Done! Key takeaways:")
    print("\n How Cascading Works:")
    print(" ├─ Simple queries → Draft accepted → Cheap & fast")
    print(" └─ Complex queries → Draft rejected → Expensive but quality")
    print("\n Event Flow:")
    print(" ├─ CHUNK: New text arrives (print immediately)")
    print(" ├─ DRAFT_DECISION: Quality check (accepted or rejected)")
    print(" ├─ SWITCH: Cascading to better model (if rejected)")
    print(" └─ COMPLETE: All done (show stats)")
    print("\n Cost Optimization:")
    print(" ├─ Accepted drafts: ~40x cheaper (gpt-4o-mini only)")
    print(" └─ Rejected drafts: Both models used (ensures quality)")
    print("\n📚 Learn more: docs/guides/streaming.md\n")
if __name__ == "__main__":
    # Script entry point: run the async demo. Failures are reported to the user
    # and turned into a non-zero exit status so shells and CI can detect that
    # the example did not finish.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
        # 128 + SIGINT(2): the conventional exit status for Ctrl-C.
        raise SystemExit(130) from None
    except Exception as e:
        print(f"\n\n❌ Error: {e}")
        print("💡 Tip: Make sure OPENAI_API_KEY is set correctly")
        raise SystemExit(1) from e