-
Notifications
You must be signed in to change notification settings - Fork 76
Expand file tree
/
Copy pathagent_memory.py
More file actions
659 lines (508 loc) · 22.9 KB
/
agent_memory.py
File metadata and controls
659 lines (508 loc) · 22.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
"""
Unified Agent Memory Demo
This example demonstrates the new unified AgentMemory interface that provides
a simple, consistent API for all agent memory scenarios:
- Multi-Agent: Multiple agents with collaboration
- Multi-User: Single agent with multiple users
- Hybrid: Dynamic switching between modes
- Auto: Intelligent mode detection
The interface automatically handles the complexity of the underlying implementations
while providing a clean, easy-to-use API.
"""
import os
import sys
from typing import Dict, Any
from dotenv import load_dotenv
# Add src to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from powermem.agent import AgentMemory
from powermem import auto_config
def load_oceanbase_config():
"""
Load OceanBase configuration from environment variables.
Uses the auto_config() utility function to automatically load from .env.
"""
oceanbase_env_path = os.path.join(os.path.dirname(__file__), '..', '.env')
if os.path.exists(oceanbase_env_path):
load_dotenv(oceanbase_env_path, override=True)
else:
# Try to load from any .env file
load_dotenv()
# Automatically load config from environment variables
config = auto_config()
# Add review_intervals if not present in intelligent_memory config
if 'intelligent_memory' in config and 'review_intervals' not in config['intelligent_memory']:
config['intelligent_memory']['review_intervals'] = [1, 6, 24, 72, 168] # hours
return config
def demonstrate_auto_mode():
"""Demonstrate automatic mode detection."""
print("🤖 Auto Mode Detection Demo")
print("=" * 50)
config = load_oceanbase_config()
# Create AgentMemory with auto mode detection
agent_memory = AgentMemory(config, mode='auto')
print(f"✅ Detected mode: {agent_memory.get_mode()}")
# Add some memories
print("\n📝 Adding memories...")
agent_memory.add(
"User prefers email support over phone calls",
user_id="customer_123",
agent_id="support_agent",
metadata={"priority": "high", "category": "communication_preference"}
)
agent_memory.add(
"Customer budget is $1000/month for enterprise solutions",
user_id="customer_123",
agent_id="sales_agent",
metadata={"budget": 1000, "category": "budget_info"}
)
print("✅ Memories added successfully!")
# Search memories
print("\n🔍 Searching memories...")
results = agent_memory.search("customer preferences", user_id="customer_123")
print(f"Found {len(results)} memories")
# Get statistics
stats = agent_memory.get_statistics()
print(f"\n📊 Statistics: {stats}")
def demonstrate_multi_agent_mode():
"""Demonstrate multi-agent mode."""
print("\n🤖 Multi-Agent Mode Demo")
print("=" * 50)
config = load_oceanbase_config()
# Create AgentMemory in multi-agent mode
agent_memory = AgentMemory(config, mode='multi_agent')
print(f"✅ Mode: {agent_memory.get_mode()}")
# Create agents
support_agent = agent_memory.create_agent("support_agent", "Customer Support")
sales_agent = agent_memory.create_agent("sales_agent", "Sales Agent")
print("✅ Agents created successfully!")
# Add memories for each agent
print("\n📝 Adding agent-specific memories...")
support_agent.add(
"Customer reported slow response times last week",
user_id="customer_123",
metadata={"issue_type": "performance", "category": "complaint"}
)
sales_agent.add(
"Customer decision maker is CTO, prefers technical demos",
user_id="customer_123",
metadata={"decision_maker": "CTO", "category": "stakeholder_info"}
)
print("✅ Agent memories added successfully!")
# Create a group
group_result = agent_memory.create_group(
"customer_team",
["support_agent", "sales_agent"],
permissions={
"owner": ["read", "write", "delete", "admin"],
"collaborator": ["read", "write"],
"viewer": ["read"]
}
)
print(f"✅ Group created: {group_result}")
# Search across agents
print("\n🔍 Cross-agent search...")
support_results = support_agent.search("customer issues", user_id="customer_123")
sales_results = sales_agent.search("customer decision", user_id="customer_123")
print(f"Support agent found {len(support_results)} memories")
print(f"Sales agent found {len(sales_results)} memories")
def demonstrate_multi_user_mode():
"""Demonstrate multi-user mode."""
print("\n👥 Multi-User Mode Demo")
print("=" * 50)
config = load_oceanbase_config()
# Create AgentMemory in multi-user mode
agent_memory = AgentMemory(config, mode='multi_user')
print(f"✅ Mode: {agent_memory.get_mode()}")
# Add memories for different users
print("\n📝 Adding user-specific memories...")
agent_memory.add(
"User Alice likes Python and machine learning",
user_id="alice",
metadata={"interests": ["Python", "ML"], "category": "preferences"}
)
agent_memory.add(
"User Bob prefers Java and enterprise solutions",
user_id="bob",
metadata={"interests": ["Java", "Enterprise"], "category": "preferences"}
)
agent_memory.add(
"User Charlie is interested in DevOps and automation",
user_id="charlie",
metadata={"interests": ["DevOps", "Automation"], "category": "preferences"}
)
print("✅ User memories added successfully!")
# Search for each user
print("\n🔍 User-specific searches...")
alice_results = agent_memory.search("Python", user_id="alice")
bob_results = agent_memory.search("Java", user_id="bob")
charlie_results = agent_memory.search("DevOps", user_id="charlie")
print(f"Alice found {len(alice_results)} memories")
print(f"Bob found {len(bob_results)} memories")
print(f"Charlie found {len(charlie_results)} memories")
# Get all memories
all_memories = agent_memory.get_all()
print(f"\n📊 Total memories across all users: {len(all_memories)}")
def demonstrate_hybrid_mode():
"""Demonstrate hybrid mode with dynamic switching."""
print("\n🔄 Hybrid Mode Demo")
print("=" * 50)
config = load_oceanbase_config()
# Create AgentMemory in hybrid mode
agent_memory = AgentMemory(config, mode='hybrid')
print(f"✅ Mode: {agent_memory.get_mode()}")
# Add memories (will automatically detect context)
print("\n📝 Adding memories with automatic context detection...")
# This might be detected as multi-agent context
agent_memory.add(
"Support agent handled customer complaint about slow response",
user_id="customer_123",
agent_id="support_agent",
metadata={"context": "multi_agent", "category": "support"}
)
# This might be detected as multi-user context
agent_memory.add(
"User Alice requested new features for the mobile app",
user_id="alice",
metadata={"context": "multi_user", "category": "feature_request"}
)
print("✅ Hybrid memories added successfully!")
# Get statistics
stats = agent_memory.get_statistics()
print(f"\n📊 Hybrid statistics: {stats}")
def demonstrate_ebbinghaus_algorithm():
"""Demonstrate Ebbinghaus forgetting curve algorithm in detail."""
print("\n📈 Ebbinghaus Forgetting Curve Algorithm Demo")
print("=" * 50)
config = load_oceanbase_config()
agent_memory = AgentMemory(config, mode='auto')
print("🧠 Testing Ebbinghaus algorithm with different content types...")
# Test different types of content to see how importance is evaluated
test_cases = [
{
"content": "URGENT: System down, all users affected, immediate action required",
"metadata": {"urgency": "critical", "impact": "high", "category": "incident"},
"expected_type": "long_term"
},
{
"content": "User mentioned they prefer dark mode in the application",
"metadata": {"preference": "ui", "category": "user_preference"},
"expected_type": "short_term"
},
{
"content": "Customer said hello and asked about the weather",
"metadata": {"category": "casual_conversation"},
"expected_type": "working"
}
]
results = []
for i, test_case in enumerate(test_cases):
print(f"\n📝 Test Case {i+1}: {test_case['expected_type']} expected")
print(f" Content: {test_case['content']}")
result = agent_memory.add(
test_case['content'],
user_id="test_user",
agent_id="test_agent",
metadata=test_case['metadata']
)
results.append(result)
# Extract intelligence data
intelligence = result.get('metadata', {}).get('intelligence', {})
if intelligence:
importance = intelligence.get('importance_score', 0)
memory_type = intelligence.get('memory_type', 'unknown')
retention = intelligence.get('initial_retention', 0)
decay_rate = intelligence.get('decay_rate', 0)
print(f" 🎯 Importance Score: {importance:.3f}")
print(f" 🧠 Memory Type: {memory_type}")
print(f" 📊 Initial Retention: {retention:.3f}")
print(f" ⚡ Decay Rate: {decay_rate:.3f}")
print(f" ✅ Type Match: {'✓' if memory_type == test_case['expected_type'] else '✗'}")
# Show review schedules
print("\n📅 Review Schedules Comparison:")
print("-" * 40)
for i, result in enumerate(results):
intelligence = result.get('metadata', {}).get('intelligence', {})
memory_type = intelligence.get('memory_type', 'unknown')
review_schedule = intelligence.get('review_schedule', [])
print(f"\n{memory_type.title()} Memory:")
print(f" Next review: {intelligence.get('next_review', 'N/A')}")
print(f" Review schedule: {len(review_schedule)} reviews planned")
if review_schedule:
print(" Review times:")
for j, review_time in enumerate(review_schedule[:3]): # Show first 3 reviews
print(f" {j+1}. {review_time}")
if len(review_schedule) > 3:
print(f" ... and {len(review_schedule) - 3} more reviews")
print("\n✅ Ebbinghaus algorithm demonstration completed!")
def demonstrate_intelligent_memory():
"""Demonstrate intelligent memory management with metadata processing."""
print("\n🧠 Intelligent Memory Management Demo")
print("=" * 50)
config = load_oceanbase_config()
# Create AgentMemory with intelligent memory enabled
agent_memory = AgentMemory(config, mode='auto')
print(f"✅ Intelligent memory enabled: {config['intelligent_memory']['enabled']}")
# Add memories with different importance levels
print("\n📝 Adding memories with different importance levels...")
# High importance memory
high_importance_result = agent_memory.add(
"Customer reported critical security vulnerability in production system",
user_id="customer_123",
agent_id="security_agent",
metadata={"priority": "critical", "category": "security", "severity": "high"}
)
# Medium importance memory
medium_importance_result = agent_memory.add(
"User prefers email notifications over SMS",
user_id="customer_123",
agent_id="preference_agent",
metadata={"priority": "medium", "category": "preference", "type": "notification"}
)
# Low importance memory
low_importance_result = agent_memory.add(
"User mentioned they like the color blue in UI",
user_id="customer_123",
agent_id="ui_agent",
metadata={"priority": "low", "category": "ui_preference", "color": "blue"}
)
print("✅ Memories added successfully!")
# Display intelligence metadata
print("\n🔍 Intelligence Analysis Results:")
print("-" * 40)
memories = [high_importance_result, medium_importance_result, low_importance_result]
descriptions = ["High Importance", "Medium Importance", "Low Importance"]
for i, (memory, desc) in enumerate(zip(memories, descriptions)):
print(f"\n{desc} Memory:")
print(f" Content: {memory.get('content', 'N/A')[:50]}...")
# Extract intelligence metadata
metadata = memory.get('metadata', {})
intelligence = metadata.get('intelligence', {})
if intelligence:
print(f" 🎯 Importance Score: {intelligence.get('importance_score', 'N/A')}")
print(f" 🧠 Memory Type: {intelligence.get('memory_type', 'N/A')}")
print(f" 📊 Initial Retention: {intelligence.get('initial_retention', 'N/A')}")
print(f" ⏰ Next Review: {intelligence.get('next_review', 'N/A')}")
print(f" 📅 Review Schedule: {len(intelligence.get('review_schedule', []))} reviews planned")
# Memory management flags
memory_mgmt = metadata.get('memory_management', {})
print(f" 🔄 Should Promote: {memory_mgmt.get('should_promote', False)}")
print(f" 🗑️ Should Forget: {memory_mgmt.get('should_forget', False)}")
print(f" 📦 Should Archive: {memory_mgmt.get('should_archive', False)}")
else:
print(" ⚠️ No intelligence metadata found")
# Search and show how intelligence affects results
print("\n🔍 Intelligent Search Results:")
print("-" * 40)
search_results = agent_memory.search("customer preferences", user_id="customer_123")
print(f"Found {len(search_results)} memories for 'customer preferences'")
for i, result in enumerate(search_results):
intelligence = result.get('metadata', {}).get('intelligence', {})
importance = intelligence.get('importance_score', 0)
memory_type = intelligence.get('memory_type', 'unknown')
print(f" {i+1}. [{memory_type}] Importance: {importance:.2f} - {result.get('content', '')[:60]}...")
# Show statistics
stats = agent_memory.get_statistics()
print(f"\n📊 Memory Statistics: {stats}")
def demonstrate_unified_api():
"""Demonstrate the unified API across all modes."""
print("\n🎯 Unified API Demo")
print("=" * 50)
config = load_oceanbase_config()
# Test the same API across different modes
modes = ['auto', 'multi_agent', 'multi_user', 'hybrid']
for mode in modes:
print(f"\n📋 Testing {mode} mode...")
try:
# Create agent memory
agent_memory = AgentMemory(config, mode=mode)
# Test basic operations
agent_memory.add(
f"Test memory for {mode} mode",
user_id="test_user",
agent_id="test_agent",
metadata={"mode": mode, "test": True}
)
# Test search
results = agent_memory.search("test", user_id="test_user")
# Test statistics
stats = agent_memory.get_statistics()
print(f" ✅ {mode}: Added memory, found {len(results)} results, stats: {len(stats)} fields")
except Exception as e:
print(f" ❌ {mode}: Error - {e}")
def demonstrate_delete_all():
"""Demonstrate delete_all functionality."""
print("\n🗑️ Delete All Memories Demo")
print("=" * 50)
config = load_oceanbase_config()
agent_memory = AgentMemory(config, mode='multi_user')
print("📝 Adding memories for different users...")
# Add memories for Alice
agent_memory.add(
"Alice likes Python programming",
user_id="alice",
metadata={"category": "preference"}
)
agent_memory.add(
"Alice prefers email notifications",
user_id="alice",
metadata={"category": "preference"}
)
# Add memories for Bob
agent_memory.add(
"Bob prefers Java programming",
user_id="bob",
metadata={"category": "preference"}
)
print("✅ Memories added successfully!")
# Check memories before deletion
print("\n🔍 Checking memories before deletion...")
alice_memories = agent_memory.get_all(user_id="alice")
bob_memories = agent_memory.get_all(user_id="bob")
all_memories = agent_memory.get_all()
print(f" Alice has {len(alice_memories)} memories")
print(f" Bob has {len(bob_memories)} memories")
print(f" Total memories: {len(all_memories)}")
# Delete all memories for Alice
print("\n🗑️ Testing delete_all for user 'alice'...")
try:
deleted_alice = agent_memory.delete_all(user_id="alice")
print(f" ✅ Delete all for Alice: {'Success' if deleted_alice else 'Failed'}")
except Exception as e:
print(f" ❌ Error during delete_all: {e}")
import traceback
traceback.print_exc()
# Verify deletion
print("\n🔍 Verifying deletion...")
alice_memories_after = agent_memory.get_all(user_id="alice")
bob_memories_after = agent_memory.get_all(user_id="bob")
all_memories_after = agent_memory.get_all()
print(f" Alice has {len(alice_memories_after)} memories (deleted: {len(alice_memories) - len(alice_memories_after)})")
print(f" Bob has {len(bob_memories_after)} memories (unchanged: {len(bob_memories) == len(bob_memories_after)})")
print(f" Total memories: {len(all_memories_after)} (reduced by {len(all_memories) - len(all_memories_after)})")
# Test delete_all with agent_id
print("\n🗑️ Testing delete_all with agent_id...")
agent_memory.add(
"Test memory with agent_id",
user_id="test_user",
agent_id="test_agent",
metadata={"test": True}
)
memories_before = agent_memory.get_all(agent_id="test_agent")
print(f" Memories with test_agent before: {len(memories_before)}")
try:
deleted_agent = agent_memory.delete_all(agent_id="test_agent")
print(f" ✅ Delete all for test_agent: {'Success' if deleted_agent else 'Failed'}")
except Exception as e:
print(f" ❌ Error during delete_all with agent_id: {e}")
memories_after = agent_memory.get_all(agent_id="test_agent")
print(f" Memories with test_agent after: {len(memories_after)}")
print("\n✅ Delete all demonstration completed!")
def demonstrate_reset():
"""Demonstrate reset functionality."""
print("\n🔄 Reset Memory Store Demo")
print("=" * 50)
config = load_oceanbase_config()
agent_memory = AgentMemory(config, mode='auto')
print("📝 Adding memories before reset...")
# Add various memories
agent_memory.add(
"Important memory: User prefers dark mode",
user_id="user1",
agent_id="agent1",
metadata={"priority": "high"}
)
agent_memory.add(
"Another memory: Customer budget is $5000",
user_id="user2",
agent_id="agent2",
metadata={"priority": "medium"}
)
agent_memory.add(
"Third memory: System configuration details",
user_id="user3",
agent_id="agent3",
metadata={"priority": "low"}
)
print("✅ Memories added successfully!")
# Check statistics before reset
print("\n📊 Statistics before reset:")
stats_before = agent_memory.get_statistics()
all_memories_before = agent_memory.get_all()
print(f" Total memories: {len(all_memories_before)}")
print(f" Statistics: {stats_before}")
# Perform reset
print("\n🔄 Resetting memory store...")
try:
agent_memory.reset()
print(" ✅ Reset completed successfully!")
except Exception as e:
print(f" ❌ Error during reset: {e}")
import traceback
traceback.print_exc()
return
# Verify reset
print("\n🔍 Verifying reset...")
all_memories_after = agent_memory.get_all()
stats_after = agent_memory.get_statistics()
print(f" Total memories after reset: {len(all_memories_after)}")
print(f" Statistics after reset: {stats_after}")
if len(all_memories_after) == 0:
print(" ✅ Reset successful - all memories cleared!")
else:
print(f" ⚠️ Warning: {len(all_memories_after)} memories still exist after reset")
# Add new memory after reset to verify system still works
print("\n📝 Adding new memory after reset...")
try:
new_memory = agent_memory.add(
"New memory after reset",
user_id="new_user",
metadata={"test": "after_reset"}
)
print(f" ✅ New memory added successfully: {new_memory.get('id', 'N/A')}")
# Verify new memory can be retrieved
search_results = agent_memory.search("new memory", user_id="new_user")
print(f" ✅ Found {len(search_results)} memories after reset")
except Exception as e:
print(f" ❌ Error adding memory after reset: {e}")
print("\n✅ Reset demonstration completed!")
def main():
"""Main function to run the unified agent memory demo."""
print("🚀 Unified Agent Memory Management Demo")
print("=" * 60)
print("Database: OceanBase")
print("LLM: Qwen")
print("Embedding: Qwen text-embedding-v4")
print("=" * 60)
try:
# Demonstrate different modes
demonstrate_auto_mode()
demonstrate_multi_agent_mode()
demonstrate_multi_user_mode()
demonstrate_hybrid_mode()
demonstrate_intelligent_memory()
demonstrate_ebbinghaus_algorithm()
demonstrate_unified_api()
demonstrate_delete_all()
demonstrate_reset()
print("\n🎉 Unified Agent Memory Demo Completed Successfully!")
print("=" * 60)
print("✅ All features demonstrated:")
print(" • Automatic mode detection")
print(" • Multi-agent collaboration")
print(" • Multi-user isolation")
print(" • Hybrid dynamic switching")
print(" • Intelligent memory management with Ebbinghaus algorithm")
print(" • Metadata-based intelligence processing")
print(" • Detailed Ebbinghaus forgetting curve demonstration")
print(" • Review schedule generation and decay calculation")
print(" • Unified API across all modes")
print(" • Delete all memories functionality")
print(" • Reset memory store functionality")
print(" • Simple, consistent interface")
except Exception as e:
print(f"❌ Error during demo: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()