27.4 记忆系统设计
记忆系统概述
记忆系统是Agentic AI系统的重要组成部分,它使AI能够存储、检索和利用过去的信息,从而实现长期学习和持续改进。
记忆系统的基本概念
1. 什么是记忆系统
记忆系统是指AI系统用于存储、组织和检索信息的机制,类似于人类的记忆功能。
**记忆系统的特点**:
- **持久化存储**: 长期保存重要信息
- **快速检索**: 高效地检索相关信息
- **智能组织**: 自动组织和分类信息
- **上下文感知**: 根据上下文检索信息
2. 记忆类型
| 记忆类型 | 特点 | 存储时间 | 容量 |
|---|---|---|---|
| 感知记忆 | 短暂存储原始输入 | 毫秒到秒 | 有限 |
| 短期记忆 | 存储当前上下文 | 秒到分钟 | 有限 |
| 长期记忆 | 存储重要知识和经验 | 永久 | 无限 |
| 工作记忆 | 处理当前任务 | 任务期间 | 有限 |
记忆存储机制
1. 向量存储
示例:向量存储
用户请求: "实现一个基于向量的记忆存储系统"
Claude Code 生成的代码:
```python
from typing import Dict, List, Any, Optional
import numpy as np
from datetime import datetime
import json
import logging
logger = logging.getLogger(__name__)
class Memory:
    """A single stored memory: text content plus its embedding vector.

    Attributes:
        id: Identifier supplied by the caller.
        content: Raw text of the memory.
        embedding: Vector representation, always held as a ``numpy.ndarray``.
        metadata: Free-form annotations for this memory.
        created_at: Naive UTC timestamp taken at construction.
        accessed_at: Naive UTC timestamp; equals ``created_at`` on a new
            memory.
        access_count: Access counter; starts at 0.
    """

    def __init__(
        self,
        id: str,
        content: str,
        embedding: np.ndarray,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """Create a memory.

        Args:
            id: Identifier for this memory.
            content: Text content.
            embedding: Embedding vector. Any array-like is accepted and
                converted to an ndarray; an ndarray is kept as the same
                object.
            metadata: Optional annotations. The dict is shallow-copied so
                later edits to this memory's metadata do not leak back into
                the caller's dict.
        """
        self.id = id
        self.content = content
        # Coerce so list/tuple embeddings work with ``to_dict`` (which needs
        # ``.tolist()``); ndarray inputs pass through without a copy.
        self.embedding = np.asarray(embedding)
        self.metadata = dict(metadata) if metadata else {}
        # One clock read so both stamps are identical on a fresh memory.
        # NOTE: kept naive (not timezone-aware) on purpose; switching to an
        # aware datetime would break any code that compares these stamps
        # against other naive ``datetime.utcnow()`` values.
        now = datetime.utcnow()
        self.created_at = now
        self.accessed_at = now
        self.access_count = 0

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict snapshot of this memory.

        The embedding is converted to a nested list and both timestamps to
        ISO-8601 strings. ``metadata`` is returned as the live dict, not a
        copy.
        """
        return {
            'id': self.id,
            'content': self.content,
            'embedding': self.embedding.tolist(),
            'metadata': self.metadata,
            'created_at': self.created_at.isoformat(),
            'accessed_at': self.accessed_at.isoformat(),
            'access_count': self.access_count,
        }
class VectorMemoryStore:
    """In-memory memory store with cosine-similarity retrieval.

    ``self.memories`` maps memory id to ``Memory``. ``self.index`` maps a
    metadata ``'type'`` value or a tag to the ids of the memories carrying it.

    NOTE: types and tags share the single ``self.index`` namespace, so a tag
    spelled exactly like a type lands in the same bucket.
    """

    def __init__(self, embedding_dim: int = 768):
        """Create an empty store.

        Args:
            embedding_dim: Nominal embedding width. It is recorded on the
                instance but never checked against added embeddings.
        """
        self.embedding_dim = embedding_dim
        self.memories: Dict[str, Memory] = {}
        self.index: Dict[str, List[str]] = {}
        # Ids come from a counter that only grows. Deriving them from
        # len(self.memories) would reissue a live id after any deletion and
        # silently overwrite that memory.
        self._next_id = 0

    def _new_memory_id(self) -> str:
        """Return a ``mem_<n>`` id that is not currently in use."""
        while True:
            candidate = f"mem_{self._next_id}"
            self._next_id += 1
            # Skip ids that were inserted into self.memories directly.
            if candidate not in self.memories:
                return candidate

    @staticmethod
    def _index_keys(metadata: Dict[str, Any]) -> List[str]:
        """Return the index keys for *metadata*: its type, then its tags.

        Duplicates are dropped so a memory is listed at most once per key,
        even when a tag repeats or equals the type.
        """
        keys = [metadata.get('type', 'default')]
        for tag in metadata.get('tags') or []:
            if tag not in keys:
                keys.append(tag)
        return keys

    def add_memory(
        self,
        content: str,
        embedding: np.ndarray,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Memory:
        """Store a new memory and index it by type and tags.

        Args:
            content: Text of the memory.
            embedding: Embedding vector for ``content``.
            metadata: Optional annotations; ``'type'`` and ``'tags'`` drive
                the index.

        Returns:
            The newly created ``Memory``.
        """
        memory_id = self._new_memory_id()
        memory = Memory(memory_id, content, embedding, metadata)
        self.memories[memory_id] = memory
        self._index_memory(memory)
        logger.info(f"Memory added: {memory_id}")
        return memory

    def _index_memory(self, memory: Memory):
        """Register *memory* under its type and each of its tags."""
        for key in self._index_keys(memory.metadata):
            bucket = self.index.setdefault(key, [])
            if memory.id not in bucket:
                bucket.append(memory.id)

    def _unindex_memory(self, memory: Memory):
        """Remove *memory* from the buckets its current metadata maps to.

        Emptied buckets are kept so ``index_size`` keeps counting every key
        that was ever indexed.
        """
        for key in self._index_keys(memory.metadata):
            bucket = self.index.get(key)
            if bucket and memory.id in bucket:
                bucket.remove(memory.id)

    def retrieve_similar(
        self,
        query_embedding: np.ndarray,
        top_k: int = 5,
        threshold: float = 0.7,
    ) -> List[Memory]:
        """Return up to *top_k* memories most similar to the query.

        Only memories whose cosine similarity is at least *threshold* are
        considered. Every returned memory has its access time refreshed and
        its access counter incremented.

        Args:
            query_embedding: Vector to compare against.
            top_k: Maximum number of results; values <= 0 yield ``[]``.
            threshold: Minimum cosine similarity to qualify.

        Returns:
            Matching memories, most similar first.
        """
        if top_k <= 0 or not self.memories:
            return []

        scored = []
        for memory_id, memory in self.memories.items():
            score = self._cosine_similarity(query_embedding, memory.embedding)
            if score >= threshold:
                scored.append((memory_id, score))
        scored.sort(key=lambda pair: pair[1], reverse=True)

        hits = []
        for memory_id, _score in scored[:top_k]:
            memory = self.memories[memory_id]
            memory.accessed_at = datetime.utcnow()
            memory.access_count += 1
            hits.append(memory)
        return hits

    def _cosine_similarity(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
        """Return the cosine similarity of two vectors, or 0.0 if either is all-zero."""
        norm1 = np.linalg.norm(embedding1)
        norm2 = np.linalg.norm(embedding2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return float(np.dot(embedding1, embedding2) / (norm1 * norm2))

    def retrieve_by_type(self, memory_type: str, limit: int = 10) -> List[Memory]:
        """Return up to *limit* memories indexed under *memory_type*, oldest first."""
        memory_ids = self.index.get(memory_type, [])
        return [self.memories[mid] for mid in memory_ids[:limit]]

    def retrieve_by_tag(self, tag: str, limit: int = 10) -> List[Memory]:
        """Return up to *limit* memories indexed under *tag*, oldest first."""
        memory_ids = self.index.get(tag, [])
        return [self.memories[mid] for mid in memory_ids[:limit]]

    def update_memory(
        self,
        memory_id: str,
        content: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """Update the content and/or metadata of an existing memory.

        Args:
            memory_id: Id of the memory to change.
            content: New text, or ``None`` to leave it unchanged. An empty
                string is applied as-is.
            metadata: Keys to merge into the existing metadata.

        Raises:
            ValueError: If no memory has the given id.

        NOTE: the embedding is not recomputed when the content changes.
        """
        memory = self.memories.get(memory_id)
        if memory is None:
            raise ValueError(f"Memory not found: {memory_id}")
        if content is not None:
            memory.content = content
        if metadata:
            # Re-index around the merge: the type or tags may have changed,
            # and a stale index entry would outlive a later delete.
            self._unindex_memory(memory)
            memory.metadata.update(metadata)
            self._index_memory(memory)
        logger.info(f"Memory updated: {memory_id}")

    def delete_memory(self, memory_id: str):
        """Delete a memory and its index entries; unknown ids are ignored."""
        memory = self.memories.pop(memory_id, None)
        if memory is None:
            return
        self._unindex_memory(memory)
        logger.info(f"Memory deleted: {memory_id}")

    def get_memory(self, memory_id: str) -> Optional[Memory]:
        """Return the memory with this id, recording the access, or ``None``."""
        memory = self.memories.get(memory_id)
        if memory is not None:
            memory.accessed_at = datetime.utcnow()
            memory.access_count += 1
        return memory

    def get_statistics(self) -> Dict[str, Any]:
        """Return the memory count, total accesses, per-type counts and index key count."""
        type_counts: Dict[str, int] = {}
        for memory in self.memories.values():
            memory_type = memory.metadata.get('type', 'default')
            type_counts[memory_type] = type_counts.get(memory_type, 0) + 1
        return {
            'total_memories': len(self.memories),
            'total_accesses': sum(m.access_count for m in self.memories.values()),
            'type_counts': type_counts,
            'index_size': len(self.index),
        }
# Usage example
store = VectorMemoryStore()

# Seed the store; each entry is (content, embedding, metadata).
memories = [
    (
        "The quick brown fox jumps over the lazy dog",
        np.random.rand(768),
        {'type': 'fact', 'tags': ['animal', 'action']},
    ),
    (
        "Python is a high-level programming language",
        np.random.rand(768),
        {'type': 'knowledge', 'tags': ['programming', 'language']},
    ),
    (
        "The capital of France is Paris",
        np.random.rand(768),
        {'type': 'fact', 'tags': ['geography', 'city']},
    ),
    (
        "Machine learning is a subset of AI",
        np.random.rand(768),
        {'type': 'knowledge', 'tags': ['AI', 'technology']},
    ),
]
for content, embedding, metadata in memories:
    store.add_memory(content=content, embedding=embedding, metadata=metadata)

# Similarity search with a random query vector.
query_embedding = np.random.rand(768)
similar_memories = store.retrieve_similar(query_embedding=query_embedding, top_k=3)
print("Similar memories:")
for memory in similar_memories:
    print(f" - {memory.content}")

# Lookup through the type index.
fact_memories = store.retrieve_by_type(memory_type='fact')
print("\nFact memories:")
for memory in fact_memories:
    print(f" - {memory.content}")

# Aggregate statistics.
stats = store.get_statistics()
print(f"\nStatistics: {stats}")
```

> **向量存储特点**:
> - 使用向量表示记忆内容
> - 基于相似度检索
> - 支持多种索引方式
> - 记录访问统计
### 2. 层次化存储
# 示例:层次化存储
用户请求:
"实现一个层次化的记忆存储系统"
Claude Code 生成的代码:
```python
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from enum import Enum
import json
import logging
logger = logging.getLogger(__name__)
class MemoryLevel(Enum):
    """Storage tier of a memory in the hierarchical store."""

    WORKING = "working"
    SHORT_TERM = "short_term"
    LONG_TERM = "long_term"
class HierarchicalMemoryStore:
    """Three-tier memory store: working, short-term and long-term.

    Working and short-term memory are capacity-limited. When a limited tier
    is full, its oldest entry (by ``created_at``) is pushed down one tier:
    working -> short-term -> long-term. Long-term memory is unbounded.
    """

    def __init__(
        self,
        working_memory_limit: int = 10,
        short_term_memory_limit: int = 100,
        short_term_memory_ttl: Optional[timedelta] = None,
    ):
        """Create an empty store.

        Args:
            working_memory_limit: Maximum entries held in working memory.
            short_term_memory_limit: Maximum entries held in short-term
                memory.
            short_term_memory_ttl: Age after which ``cleanup_expired`` moves
                a short-term memory to long-term; defaults to 24 hours.
        """
        self.working_memory: Dict[str, Memory] = {}
        self.short_term_memory: Dict[str, Memory] = {}
        self.long_term_memory: Dict[str, Memory] = {}
        # Configuration
        self.working_memory_limit = working_memory_limit
        self.short_term_memory_limit = short_term_memory_limit
        self.short_term_memory_ttl = (
            timedelta(hours=24) if short_term_memory_ttl is None else short_term_memory_ttl
        )
        # Sequence number appended to ids: a timestamp alone repeats when two
        # memories are added within one clock tick, and the second would
        # overwrite the first.
        self._id_sequence = 0

    def _tier_adder(self, level: MemoryLevel):
        """Return the bound ``_add_to_*`` method for *level*.

        Raises:
            ValueError: If *level* is not a known ``MemoryLevel``.
        """
        adders = {
            MemoryLevel.WORKING: self._add_to_working_memory,
            MemoryLevel.SHORT_TERM: self._add_to_short_term_memory,
            MemoryLevel.LONG_TERM: self._add_to_long_term_memory,
        }
        try:
            return adders[level]
        except (KeyError, TypeError):
            raise ValueError(f"Unknown memory level: {level!r}") from None

    def add_memory(
        self,
        content: str,
        embedding: np.ndarray,
        level: MemoryLevel = MemoryLevel.SHORT_TERM,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Memory:
        """Create a memory and place it in the requested tier.

        Args:
            content: Text of the memory.
            embedding: Embedding vector for ``content``.
            level: Tier to store the memory in.
            metadata: Optional annotations.

        Returns:
            The newly created ``Memory``.

        Raises:
            ValueError: If *level* is not a known ``MemoryLevel``.
        """
        # Resolve the tier first so an invalid level cannot yield a memory
        # that is returned to the caller but stored nowhere.
        add_to_tier = self._tier_adder(level)
        self._id_sequence += 1
        memory_id = f"mem_{datetime.utcnow().timestamp()}_{self._id_sequence}"
        memory = Memory(memory_id, content, embedding, metadata)
        add_to_tier(memory)
        logger.info(f"Memory added to {level.value}: {memory_id}")
        return memory

    def _add_to_working_memory(self, memory: Memory):
        """Insert into working memory, demoting the oldest entries while full."""
        # Re-inserting an id that is already present needs no free slot.
        if memory.id not in self.working_memory:
            while self.working_memory and len(self.working_memory) >= self.working_memory_limit:
                oldest_id = min(
                    self.working_memory,
                    key=lambda k: self.working_memory[k].created_at,
                )
                self._evict_from_working_memory(oldest_id)
        self.working_memory[memory.id] = memory

    def _add_to_short_term_memory(self, memory: Memory):
        """Insert into short-term memory, demoting the oldest entries while full."""
        if memory.id not in self.short_term_memory:
            while self.short_term_memory and len(self.short_term_memory) >= self.short_term_memory_limit:
                oldest_id = min(
                    self.short_term_memory,
                    key=lambda k: self.short_term_memory[k].created_at,
                )
                self._evict_from_short_term_memory(oldest_id)
        self.short_term_memory[memory.id] = memory

    def _add_to_long_term_memory(self, memory: Memory):
        """Insert into long-term memory (no capacity limit)."""
        self.long_term_memory[memory.id] = memory

    def _evict_from_working_memory(self, memory_id: str):
        """Move a memory from working memory down to short-term memory."""
        memory = self.working_memory.pop(memory_id)
        self._add_to_short_term_memory(memory)
        logger.info(f"Memory evicted from working memory: {memory_id}")

    def _evict_from_short_term_memory(self, memory_id: str):
        """Move a memory from short-term memory down to long-term memory."""
        memory = self.short_term_memory.pop(memory_id)
        self._add_to_long_term_memory(memory)
        logger.info(f"Memory evicted from short-term memory: {memory_id}")

    def retrieve(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Memory]:
        """Return the *top_k* memories most similar to the query, across all tiers.

        No similarity threshold is applied. Each returned memory has its
        access time refreshed and its access counter incremented.

        Args:
            query_embedding: Vector to compare against.
            top_k: Maximum number of results; values <= 0 yield ``[]``.

        Returns:
            Memories ordered from most to least similar.
        """
        if top_k <= 0:
            return []
        candidates = {
            **self.working_memory,
            **self.short_term_memory,
            **self.long_term_memory,
        }
        scored = [
            (memory, self._cosine_similarity(query_embedding, memory.embedding))
            for memory in candidates.values()
        ]
        scored.sort(key=lambda pair: pair[1], reverse=True)
        top_memories = [memory for memory, _score in scored[:top_k]]
        # Record the access so the counters on Memory stay meaningful here too.
        now = datetime.utcnow()
        for memory in top_memories:
            memory.accessed_at = now
            memory.access_count += 1
        return top_memories

    def _cosine_similarity(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
        """Return the cosine similarity of two vectors, or 0.0 if either is all-zero."""
        norm1 = np.linalg.norm(embedding1)
        norm2 = np.linalg.norm(embedding2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return float(np.dot(embedding1, embedding2) / (norm1 * norm2))

    def promote_memory(self, memory_id: str, target_level: MemoryLevel):
        """Move a memory from whichever tier holds it to *target_level*.

        Despite the name, the move may go in either direction.

        Args:
            memory_id: Id of the memory to move.
            target_level: Destination tier.

        Raises:
            ValueError: If *target_level* is unknown or the memory is not
                found in any tier.
        """
        # Validate the destination before removing anything; otherwise a bad
        # target would drop the memory from the store entirely.
        add_to_target = self._tier_adder(target_level)

        tiers = (
            (MemoryLevel.WORKING, self.working_memory),
            (MemoryLevel.SHORT_TERM, self.short_term_memory),
            (MemoryLevel.LONG_TERM, self.long_term_memory),
        )
        for source_level, tier in tiers:
            if memory_id in tier:
                memory = tier.pop(memory_id)
                break
        else:
            raise ValueError(f"Memory not found: {memory_id}")

        add_to_target(memory)
        logger.info(f"Memory promoted from {source_level.value} to {target_level.value}: {memory_id}")

    def cleanup_expired(self):
        """Move short-term memories older than the TTL into long-term memory."""
        current_time = datetime.utcnow()
        # Collect ids first; the dict must not change size while iterated.
        expired_ids = [
            memory_id
            for memory_id, memory in self.short_term_memory.items()
            if current_time - memory.created_at > self.short_term_memory_ttl
        ]
        for memory_id in expired_ids:
            memory = self.short_term_memory.pop(memory_id)
            self._add_to_long_term_memory(memory)
            logger.info(f"Expired memory moved to long-term: {memory_id}")

    def get_statistics(self) -> Dict[str, Any]:
        """Return the size of each tier and the overall total."""
        working = len(self.working_memory)
        short_term = len(self.short_term_memory)
        long_term = len(self.long_term_memory)
        return {
            'working_memory_size': working,
            'short_term_memory_size': short_term,
            'long_term_memory_size': long_term,
            'total_memories': working + short_term + long_term,
        }
# Usage example
store = HierarchicalMemoryStore()

# Place one memory in each tier.
store.add_memory(
    content="Current task context",
    embedding=np.random.rand(768),
    level=MemoryLevel.WORKING,
    metadata={'type': 'context'},
)
store.add_memory(
    content="Recent conversation",
    embedding=np.random.rand(768),
    level=MemoryLevel.SHORT_TERM,
    metadata={'type': 'conversation'},
)
store.add_memory(
    content="Important knowledge",
    embedding=np.random.rand(768),
    level=MemoryLevel.LONG_TERM,
    metadata={'type': 'knowledge'},
)

# Tier sizes.
stats = store.get_statistics()
print(f"Statistics: {stats}")

# Similarity search across all tiers with a random query vector.
query_embedding = np.random.rand(768)
memories = store.retrieve(query_embedding=query_embedding, top_k=3)
print("\nRetrieved memories:")
for memory in memories:
    print(f" - {memory.content}")

# Move the first short-term memory (if there is one) to long-term.
if store.short_term_memory:
    memory_id = next(iter(store.short_term_memory))
    store.promote_memory(memory_id=memory_id, target_level=MemoryLevel.LONG_TERM)
```

> **层次化存储特点**:
> - 分层存储不同重要性的记忆
> - 自动驱逐和提升记忆
> - 限制各层容量
> - 定期清理过期记忆
## 记忆检索优化
### 1. 上下文感知检索
# 示例:上下文感知检索
用户请求:
"实现上下文感知的记忆检索"
Claude Code 生成的代码:
```python
from typing import Dict, List, Any, Optional
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class ContextAwareMemoryStore(VectorMemoryStore):
    """Vector store that biases retrieval with a rolling context window.

    Recent context entries are kept in ``self.context_history``. Retrieval
    blends their mean embedding into the query, then ranks the hits by a
    weighted mix of query relevance and context relevance.
    """

    def __init__(
        self,
        embedding_dim: int = 768,
        context_window_size: int = 10,
        query_weight: float = 0.7,
        context_weight: float = 0.3,
    ):
        """Create an empty store.

        Args:
            embedding_dim: Passed through to ``VectorMemoryStore``.
            context_window_size: Maximum number of context entries retained.
            query_weight: Weight of the query in the blended embedding and
                in the final ranking score.
            context_weight: Weight of the context in the same two places.

        Raises:
            ValueError: If either weight is negative.
        """
        super().__init__(embedding_dim)
        if query_weight < 0 or context_weight < 0:
            raise ValueError("query_weight and context_weight must be non-negative")
        self.context_history: List[Dict[str, Any]] = []
        self.context_window_size = context_window_size
        # Stored separately (not as 1 - query_weight) so the defaults give
        # exactly 0.7 / 0.3 without floating-point drift.
        self.query_weight = query_weight
        self.context_weight = context_weight

    def add_context(
        self,
        context: str,
        embedding: np.ndarray,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """Append a context entry, discarding the oldest beyond the window.

        Args:
            context: Context text.
            embedding: Embedding vector for ``context``.
            metadata: Optional annotations stored with the entry.
        """
        self.context_history.append({
            'content': context,
            'embedding': embedding,
            'metadata': metadata or {},
            'timestamp': datetime.utcnow(),
        })
        # Trim down to the window in one step; this also catches up if the
        # window size was lowered after entries were added.
        overflow = len(self.context_history) - max(self.context_window_size, 0)
        if overflow > 0:
            del self.context_history[:overflow]
        logger.info(f"Context added: {context[:50]}...")

    def retrieve_with_context(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Dict[str, Any]]:
        """Retrieve memories using the query blended with the current context.

        Candidates come from ``retrieve_similar`` on the blended embedding,
        using that method's default threshold, so access statistics are
        updated for every hit.

        Args:
            query_embedding: Raw query vector.
            top_k: Maximum number of results.

        Returns:
            Dicts with keys ``'memory'``, ``'context_relevance'`` and
            ``'query_relevance'``, best combined score first.
        """
        current_context = self._get_current_context()
        blended = self._combine_context_and_query(current_context, query_embedding)
        memories = self.retrieve_similar(blended, top_k)

        results = [
            {
                'memory': memory,
                'context_relevance': self._calculate_context_relevance(memory, current_context),
                'query_relevance': self._cosine_similarity(query_embedding, memory.embedding),
            }
            for memory in memories
        ]
        results.sort(
            key=lambda r: (
                self.query_weight * r['query_relevance']
                + self.context_weight * r['context_relevance']
            ),
            reverse=True,
        )
        return results

    def _get_current_context(self) -> List[Dict[str, Any]]:
        """Return the most recent context entries, at most one window's worth."""
        size = self.context_window_size
        # Guard explicitly: a slice of [-0:] would return the whole history.
        if size <= 0:
            return []
        return self.context_history[-size:]

    def _combine_context_and_query(self, context: List[Dict[str, Any]], query_embedding: np.ndarray) -> np.ndarray:
        """Blend the query with the mean context embedding.

        Returns the query unchanged when there is no context.
        """
        if not context:
            return query_embedding
        avg_context_embedding = np.mean([entry['embedding'] for entry in context], axis=0)
        return self.query_weight * query_embedding + self.context_weight * avg_context_embedding

    def _calculate_context_relevance(self, memory: Memory, context: List[Dict[str, Any]]) -> float:
        """Return the mean cosine similarity between *memory* and each context entry.

        Returns 0.0 when there is no context.
        """
        if not context:
            return 0.0
        similarities = [
            self._cosine_similarity(memory.embedding, entry['embedding'])
            for entry in context
        ]
        return float(np.mean(similarities))

    def retrieve_temporal(self, time_range: timedelta, top_k: int = 10) -> List[Memory]:
        """Return the newest memories created within *time_range* of now.

        Args:
            time_range: How far back from the current UTC time to look.
            top_k: Maximum number of results; values <= 0 yield ``[]``.

        Returns:
            Memories ordered newest first.
        """
        if top_k <= 0:
            return []
        cutoff_time = datetime.utcnow() - time_range
        recent_memories = [
            memory for memory in self.memories.values()
            if memory.created_at >= cutoff_time
        ]
        recent_memories.sort(key=lambda m: m.created_at, reverse=True)
        return recent_memories[:top_k]
# Usage example
store = ContextAwareMemoryStore()

# Record recent context; each entry is (content, embedding).
contexts = [
    ("User is asking about Python programming", np.random.rand(768)),
    ("User wants to learn about data structures", np.random.rand(768)),
    ("User is interested in algorithms", np.random.rand(768)),
]
for content, embedding in contexts:
    store.add_context(context=content, embedding=embedding)

# Seed the store; each entry is (content, embedding, metadata).
memories = [
    (
        "Python lists are mutable sequences",
        np.random.rand(768),
        {'type': 'knowledge', 'topic': 'python'},
    ),
    (
        "Dictionaries are key-value pairs",
        np.random.rand(768),
        {'type': 'knowledge', 'topic': 'python'},
    ),
    (
        "Binary search has O(log n) complexity",
        np.random.rand(768),
        {'type': 'knowledge', 'topic': 'algorithms'},
    ),
]
for content, embedding, metadata in memories:
    store.add_memory(content=content, embedding=embedding, metadata=metadata)

# Context-aware retrieval with a random query vector.
query_embedding = np.random.rand(768)
results = store.retrieve_with_context(query_embedding=query_embedding, top_k=3)
print("Retrieved with context:")
for result in results:
    print(f" - {result['memory'].content}")
    print(f" Query relevance: {result['query_relevance']:.3f}")
    print(f" Context relevance: {result['context_relevance']:.3f}")
```

> **上下文感知检索特点**:
> - 维护上下文历史
> - 结合上下文和查询
> - 计算上下文相关性
> - 按时间范围检索
## 总结
记忆系统设计包括:
1. **记忆系统的基本概念**: 什么是记忆系统、记忆类型
2. **记忆存储机制**: 向量存储、层次化存储
3. **记忆检索优化**: 上下文感知检索
通过记忆系统,Claude Code能够存储和检索重要信息,实现长期学习和持续改进。
在下一节中,我们将探讨多智能体协作。