
# 27.4 记忆系统设计

## 记忆系统概述

记忆系统是Agentic AI系统的重要组成部分,它使AI能够存储、检索和利用过去的信息,从而实现长期学习和持续改进。

## 记忆系统的基本概念

### 1. 什么是记忆系统

记忆系统是指AI系统用于存储、组织和检索信息的机制,类似于人类的记忆功能。

记忆系统的特点:

- **持久化存储**:长期保存重要信息
- **快速检索**:高效地检索相关信息
- **智能组织**:自动组织和分类信息
- **上下文感知**:根据上下文检索信息
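
针对上述特点,下面给出一个最小的接口草图,用代码说明这几项能力大致对应哪些操作(类名 `MemoryStore` 与各方法名均为示意性假设,并非某个现成库的 API):

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

class MemoryStore(ABC):
    """记忆存储的最小接口草图(仅为示意,方法名为假设)"""

    @abstractmethod
    def save(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> str:
        """持久化存储:保存一条记忆,返回其 ID"""

    @abstractmethod
    def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """快速检索:按相关性返回若干条记忆"""

    @abstractmethod
    def organize(self) -> None:
        """智能组织:对已有记忆做分类、合并或归档"""

    @abstractmethod
    def recall_in_context(self, query: str, context: List[str]) -> List[Dict[str, Any]]:
        """上下文感知:结合当前上下文检索相关记忆"""
```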

### 2. 记忆类型

| 记忆类型 | 特点 | 存储时间 | 容量 |
| --- | --- | --- | --- |
| 感知记忆 | 短暂存储原始输入 | 毫秒到秒 | 有限 |
| 短期记忆 | 存储当前上下文 | 秒到分钟 | 有限 |
| 长期记忆 | 存储重要知识和经验 | 永久 | 无限 |
| 工作记忆 | 处理当前任务 | 任务期间 | 有限 |
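
为了更直观,下面把表中的四种记忆类型写成一小段配置草图(枚举名与具体的 TTL、容量数值只是按表格粗略取的假设值,并非固定标准):

```python
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum
from typing import Dict, Optional

class MemoryType(Enum):
    SENSORY = "sensory"        # 感知记忆
    SHORT_TERM = "short_term"  # 短期记忆
    LONG_TERM = "long_term"    # 长期记忆
    WORKING = "working"        # 工作记忆

@dataclass
class MemoryTypeConfig:
    ttl: Optional[timedelta]   # 存储时间,None 表示永久保存
    capacity: Optional[int]    # 容量上限,None 表示不设上限

# 按表格粗略给出的假设配置
MEMORY_TYPE_CONFIGS: Dict[MemoryType, MemoryTypeConfig] = {
    MemoryType.SENSORY: MemoryTypeConfig(ttl=timedelta(seconds=1), capacity=100),
    MemoryType.SHORT_TERM: MemoryTypeConfig(ttl=timedelta(minutes=5), capacity=1_000),
    MemoryType.LONG_TERM: MemoryTypeConfig(ttl=None, capacity=None),
    # 工作记忆在任务期间有效,这里以较小容量、无固定 TTL 表示,由任务结束时清空
    MemoryType.WORKING: MemoryTypeConfig(ttl=None, capacity=10),
}
```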

## 记忆存储机制

### 1. 向量存储

**示例:向量存储**

用户请求: "实现一个基于向量的记忆存储系统"

Claude Code 生成的代码:

```python
from typing import Dict, List, Any, Optional
import numpy as np
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

class Memory:
    """记忆"""

    def __init__(self, id: str, content: str, embedding: np.ndarray, metadata: Optional[Dict[str, Any]] = None):
        self.id = id
        self.content = content
        self.embedding = embedding
        self.metadata = metadata or {}
        self.created_at = datetime.utcnow()
        self.accessed_at = datetime.utcnow()
        self.access_count = 0

    def to_dict(self) -> Dict[str, Any]:
        """转换为字典"""
        return {
            'id': self.id,
            'content': self.content,
            'embedding': self.embedding.tolist(),
            'metadata': self.metadata,
            'created_at': self.created_at.isoformat(),
            'accessed_at': self.accessed_at.isoformat(),
            'access_count': self.access_count
        }

class VectorMemoryStore:
    """向量记忆存储"""

    def __init__(self, embedding_dim: int = 768):
        self.embedding_dim = embedding_dim
        self.memories: Dict[str, Memory] = {}
        self.index: Dict[str, List[str]] = {}

    def add_memory(self, content: str, embedding: np.ndarray, metadata: Optional[Dict[str, Any]] = None) -> Memory:
        """添加记忆"""
        memory_id = f"mem_{len(self.memories)}"
        memory = Memory(memory_id, content, embedding, metadata)

        self.memories[memory_id] = memory

        # 建立索引
        self._index_memory(memory)

        logger.info(f"Memory added: {memory_id}")
        return memory

    def _index_memory(self, memory: Memory):
        """索引记忆"""
        # 按类型索引
        memory_type = memory.metadata.get('type', 'default')
        if memory_type not in self.index:
            self.index[memory_type] = []
        self.index[memory_type].append(memory.id)

        # 按标签索引
        tags = memory.metadata.get('tags', [])
        for tag in tags:
            if tag not in self.index:
                self.index[tag] = []
            self.index[tag].append(memory.id)

    def retrieve_similar(self, query_embedding: np.ndarray, top_k: int = 5, threshold: float = 0.7) -> List[Memory]:
        """检索相似记忆"""
        if not self.memories:
            return []

        # 计算相似度
        similarities = []
        for memory_id, memory in self.memories.items():
            similarity = self._cosine_similarity(query_embedding, memory.embedding)
            if similarity >= threshold:
                similarities.append((memory_id, similarity))

        # 按相似度排序
        similarities.sort(key=lambda x: x[1], reverse=True)

        # 获取top_k个记忆
        top_memories = []
        for memory_id, similarity in similarities[:top_k]:
            memory = self.memories[memory_id]
            memory.accessed_at = datetime.utcnow()
            memory.access_count += 1
            top_memories.append(memory)

        return top_memories

    def _cosine_similarity(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
        """计算余弦相似度"""
        dot_product = np.dot(embedding1, embedding2)
        norm1 = np.linalg.norm(embedding1)
        norm2 = np.linalg.norm(embedding2)

        if norm1 == 0 or norm2 == 0:
            return 0.0

        return dot_product / (norm1 * norm2)

    def retrieve_by_type(self, memory_type: str, limit: int = 10) -> List[Memory]:
        """按类型检索记忆"""
        memory_ids = self.index.get(memory_type, [])
        memories = [self.memories[mid] for mid in memory_ids[:limit]]

        return memories

    def retrieve_by_tag(self, tag: str, limit: int = 10) -> List[Memory]:
        """按标签检索记忆"""
        memory_ids = self.index.get(tag, [])
        memories = [self.memories[mid] for mid in memory_ids[:limit]]

        return memories

    def update_memory(self, memory_id: str, content: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None):
        """更新记忆"""
        memory = self.memories.get(memory_id)
        if not memory:
            raise ValueError(f"Memory not found: {memory_id}")

        if content:
            memory.content = content

        if metadata:
            memory.metadata.update(metadata)

        logger.info(f"Memory updated: {memory_id}")

    def delete_memory(self, memory_id: str):
        """删除记忆"""
        if memory_id in self.memories:
            # 删除索引
            memory = self.memories[memory_id]
            memory_type = memory.metadata.get('type', 'default')
            if memory_type in self.index and memory_id in self.index[memory_type]:
                self.index[memory_type].remove(memory_id)

            tags = memory.metadata.get('tags', [])
            for tag in tags:
                if tag in self.index and memory_id in self.index[tag]:
                    self.index[tag].remove(memory_id)

            # 删除记忆
            del self.memories[memory_id]

            logger.info(f"Memory deleted: {memory_id}")

    def get_memory(self, memory_id: str) -> Optional[Memory]:
        """获取记忆"""
        memory = self.memories.get(memory_id)
        if memory:
            memory.accessed_at = datetime.utcnow()
            memory.access_count += 1

        return memory

    def get_statistics(self) -> Dict[str, Any]:
        """获取统计信息"""
        total_memories = len(self.memories)
        total_accesses = sum(m.access_count for m in self.memories.values())

        type_counts = {}
        for memory in self.memories.values():
            memory_type = memory.metadata.get('type', 'default')
            type_counts[memory_type] = type_counts.get(memory_type, 0) + 1

        return {
            'total_memories': total_memories,
            'total_accesses': total_accesses,
            'type_counts': type_counts,
            'index_size': len(self.index)
        }

# 使用示例
store = VectorMemoryStore()

# 添加记忆
memories = [
    ("The quick brown fox jumps over the lazy dog", np.random.rand(768), {'type': 'fact', 'tags': ['animal', 'action']}),
    ("Python is a high-level programming language", np.random.rand(768), {'type': 'knowledge', 'tags': ['programming', 'language']}),
    ("The capital of France is Paris", np.random.rand(768), {'type': 'fact', 'tags': ['geography', 'city']}),
    ("Machine learning is a subset of AI", np.random.rand(768), {'type': 'knowledge', 'tags': ['AI', 'technology']}),
]

for content, embedding, metadata in memories:
    store.add_memory(content, embedding, metadata)

# 检索相似记忆
query_embedding = np.random.rand(768)
similar_memories = store.retrieve_similar(query_embedding, top_k=3)

print("Similar memories:")
for memory in similar_memories:
    print(f"  - {memory.content}")

# 按类型检索
fact_memories = store.retrieve_by_type('fact')

print("\nFact memories:")
for memory in fact_memories:
    print(f"  - {memory.content}")

# 获取统计信息
stats = store.get_statistics()
print(f"\nStatistics: {stats}")

    ```> **向量存储特点**:

    > - 使用向量表示记忆内容
    > - 基于相似度检索
    > - 支持多种索引方式
    > - 记录访问统计

    ```
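
上面的 `retrieve_similar` 对每条记忆逐一循环计算余弦相似度,记忆数量大时会比较慢。下面是一个把所有嵌入堆成矩阵、用 NumPy 一次算完的向量化草图(`batch_cosine_similarity` 是为演示新增的假设函数;实际生产中通常会进一步换成 FAISS 等近似最近邻索引):

```python
import numpy as np

def batch_cosine_similarity(query_embedding: np.ndarray, embedding_matrix: np.ndarray) -> np.ndarray:
    """一次性计算查询向量与一批嵌入 (N, dim) 的余弦相似度,返回形状为 (N,) 的数组"""
    # 先各自归一化,余弦相似度就退化成点积
    query_norm = query_embedding / (np.linalg.norm(query_embedding) + 1e-12)
    matrix_norms = embedding_matrix / (np.linalg.norm(embedding_matrix, axis=1, keepdims=True) + 1e-12)
    return matrix_norms @ query_norm

# 用法示意:把 store.memories 中的嵌入堆成矩阵后一次性打分,再取 top_k
# memory_list = list(store.memories.values())
# scores = batch_cosine_similarity(query_embedding, np.stack([m.embedding for m in memory_list]))
# top_memories = [memory_list[i] for i in np.argsort(scores)[::-1][:5]]
```
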
### 2. 层次化存储

**示例:层次化存储**

用户请求: "实现一个层次化的记忆存储系统"

Claude Code 生成的代码:

```python
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from enum import Enum
import numpy as np
import logging

logger = logging.getLogger(__name__)

# 注:Memory 类沿用上文"向量存储"示例中的定义

class MemoryLevel(Enum):
    """记忆层级"""
    WORKING = "working"
    SHORT_TERM = "short_term"
    LONG_TERM = "long_term"

class HierarchicalMemoryStore:
    """层次化记忆存储"""

    def __init__(self):
        self.working_memory: Dict[str, Memory] = {}
        self.short_term_memory: Dict[str, Memory] = {}
        self.long_term_memory: Dict[str, Memory] = {}

        # 配置
        self.working_memory_limit = 10
        self.short_term_memory_limit = 100
        self.short_term_memory_ttl = timedelta(hours=24)

    def add_memory(self, content: str, embedding: np.ndarray, level: MemoryLevel = MemoryLevel.SHORT_TERM, metadata: Optional[Dict[str, Any]] = None) -> Memory:
        """添加记忆"""
        memory_id = f"mem_{datetime.utcnow().timestamp()}"
        memory = Memory(memory_id, content, embedding, metadata)

        # 根据层级存储
        if level == MemoryLevel.WORKING:
            self._add_to_working_memory(memory)
        elif level == MemoryLevel.SHORT_TERM:
            self._add_to_short_term_memory(memory)
        elif level == MemoryLevel.LONG_TERM:
            self._add_to_long_term_memory(memory)

        logger.info(f"Memory added to {level.value}: {memory_id}")
        return memory

    def _add_to_working_memory(self, memory: Memory):
        """添加到工作记忆"""
        # 检查容量
        if len(self.working_memory) >= self.working_memory_limit:
            # 移除最旧的
            oldest_id = min(self.working_memory.keys(), key=lambda k: self.working_memory[k].created_at)
            self._evict_from_working_memory(oldest_id)

        self.working_memory[memory.id] = memory

    def _add_to_short_term_memory(self, memory: Memory):
        """添加到短期记忆"""
        # 检查容量
        if len(self.short_term_memory) >= self.short_term_memory_limit:
            # 移除最旧的
            oldest_id = min(self.short_term_memory.keys(), key=lambda k: self.short_term_memory[k].created_at)
            self._evict_from_short_term_memory(oldest_id)

        self.short_term_memory[memory.id] = memory

    def _add_to_long_term_memory(self, memory: Memory):
        """添加到长期记忆"""
        self.long_term_memory[memory.id] = memory

    def _evict_from_working_memory(self, memory_id: str):
        """从工作记忆中驱逐"""
        memory = self.working_memory.pop(memory_id)

        # 移动到短期记忆
        self._add_to_short_term_memory(memory)
        logger.info(f"Memory evicted from working memory: {memory_id}")

    def _evict_from_short_term_memory(self, memory_id: str):
        """从短期记忆中驱逐"""
        memory = self.short_term_memory.pop(memory_id)

        # 移动到长期记忆
        self._add_to_long_term_memory(memory)
        logger.info(f"Memory evicted from short-term memory: {memory_id}")

    def retrieve(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Memory]:
        """检索记忆"""
        # 从所有层级检索
        all_memories = {
            **self.working_memory,
            **self.short_term_memory,
            **self.long_term_memory
        }

        # 计算相似度
        similarities = []
        for memory_id, memory in all_memories.items():
            similarity = self._cosine_similarity(query_embedding, memory.embedding)
            similarities.append((memory, similarity))

        # 按相似度排序
        similarities.sort(key=lambda x: x[1], reverse=True)

        # 获取top_k个记忆
        top_memories = [memory for memory, similarity in similarities[:top_k]]

        return top_memories

    def _cosine_similarity(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
        """计算余弦相似度"""
        dot_product = np.dot(embedding1, embedding2)
        norm1 = np.linalg.norm(embedding1)
        norm2 = np.linalg.norm(embedding2)

        if norm1 == 0 or norm2 == 0:
            return 0.0

        return dot_product / (norm1 * norm2)

    def promote_memory(self, memory_id: str, target_level: MemoryLevel):
        """提升记忆层级"""
        memory = None
        source_level = None

        # 查找记忆
        if memory_id in self.working_memory:
            memory = self.working_memory.pop(memory_id)
            source_level = MemoryLevel.WORKING
        elif memory_id in self.short_term_memory:
            memory = self.short_term_memory.pop(memory_id)
            source_level = MemoryLevel.SHORT_TERM
        elif memory_id in self.long_term_memory:
            memory = self.long_term_memory.pop(memory_id)
            source_level = MemoryLevel.LONG_TERM

        if not memory:
            raise ValueError(f"Memory not found: {memory_id}")

        # 移动到目标层级
        if target_level == MemoryLevel.WORKING:
            self._add_to_working_memory(memory)
        elif target_level == MemoryLevel.SHORT_TERM:
            self._add_to_short_term_memory(memory)
        elif target_level == MemoryLevel.LONG_TERM:
            self._add_to_long_term_memory(memory)

        logger.info(f"Memory promoted from {source_level.value} to {target_level.value}: {memory_id}")

    def cleanup_expired(self):
        """清理过期记忆"""
        current_time = datetime.utcnow()
        expired_ids = []

        # 检查短期记忆
        for memory_id, memory in self.short_term_memory.items():
            if current_time - memory.created_at > self.short_term_memory_ttl:
                expired_ids.append(memory_id)

        # 移除过期记忆
        for memory_id in expired_ids:
            memory = self.short_term_memory.pop(memory_id)

            # 移动到长期记忆
            self._add_to_long_term_memory(memory)
            logger.info(f"Expired memory moved to long-term: {memory_id}")

    def get_statistics(self) -> Dict[str, Any]:
        """获取统计信息"""
        return {
            'working_memory_size': len(self.working_memory),
            'short_term_memory_size': len(self.short_term_memory),
            'long_term_memory_size': len(self.long_term_memory),
            'total_memories': len(self.working_memory) + len(self.short_term_memory) + len(self.long_term_memory)
        }

# 使用示例
store = HierarchicalMemoryStore()

# 添加记忆到不同层级
store.add_memory("Current task context", np.random.rand(768), MemoryLevel.WORKING, {'type': 'context'})
store.add_memory("Recent conversation", np.random.rand(768), MemoryLevel.SHORT_TERM, {'type': 'conversation'})
store.add_memory("Important knowledge", np.random.rand(768), MemoryLevel.LONG_TERM, {'type': 'knowledge'})

# 获取统计信息
stats = store.get_statistics()
print(f"Statistics: {stats}")

# 检索记忆
query_embedding = np.random.rand(768)
memories = store.retrieve(query_embedding, top_k=3)

print("\nRetrieved memories:")
for memory in memories:
    print(f"  - {memory.content}")

# 提升记忆层级
if store.short_term_memory:
    memory_id = list(store.short_term_memory.keys())[0]
    store.promote_memory(memory_id, MemoryLevel.LONG_TERM)

    ```> **层次化存储特点**:
    > - 分层存储不同重要性的记忆
    > - 自动驱逐和提升记忆
    > - 限制各层容量
    > - 定期清理过期记忆

    ```
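
在上面的实现里,`promote_memory` 和 `cleanup_expired` 都需要由外部逻辑触发。下面是一个简单的触发策略草图:按访问次数把短期记忆自动提升为长期记忆,并周期性清理过期记忆(阈值和轮询间隔均为假设值;另外注意 `HierarchicalMemoryStore.retrieve` 本身没有更新 `access_count`,若采用此策略,需要在检索时自行递增):

```python
import time

PROMOTION_ACCESS_THRESHOLD = 5       # 假设:访问超过 5 次的记忆值得长期保留
MAINTENANCE_INTERVAL_SECONDS = 3600  # 假设:每小时做一次维护

def auto_promote(store: "HierarchicalMemoryStore") -> None:
    """把访问频繁的短期记忆提升为长期记忆(基于访问计数的简单策略)"""
    for memory_id, memory in list(store.short_term_memory.items()):
        if memory.access_count >= PROMOTION_ACCESS_THRESHOLD:
            store.promote_memory(memory_id, MemoryLevel.LONG_TERM)

def maintenance_loop(store: "HierarchicalMemoryStore") -> None:
    """简化的维护循环;实际系统中更可能交给后台任务或定时调度器"""
    while True:
        auto_promote(store)
        store.cleanup_expired()
        time.sleep(MAINTENANCE_INTERVAL_SECONDS)
```
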

## 记忆检索优化

### 1. 上下文感知检索

**示例:上下文感知检索**

用户请求: "实现上下文感知的记忆检索"

Claude Code 生成的代码:

```python
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import numpy as np
import logging

logger = logging.getLogger(__name__)

# 注:Memory 与 VectorMemoryStore 沿用上文"向量存储"示例中的定义

class ContextAwareMemoryStore(VectorMemoryStore):
    """上下文感知记忆存储"""

    def __init__(self, embedding_dim: int = 768):
        super().__init__(embedding_dim)
        self.context_history: List[Dict[str, Any]] = []
        self.context_window_size = 10

    def add_context(self, context: str, embedding: np.ndarray, metadata: Optional[Dict[str, Any]] = None):
        """添加上下文"""
        context_entry = {
            'content': context,
            'embedding': embedding,
            'metadata': metadata or {},
            'timestamp': datetime.utcnow()
        }

        self.context_history.append(context_entry)

        # 限制上下文历史大小
        if len(self.context_history) > self.context_window_size:
            self.context_history.pop(0)

        logger.info(f"Context added: {context[:50]}...")

    def retrieve_with_context(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Dict[str, Any]]:
        """带上下文的检索"""
        # 获取当前上下文
        current_context = self._get_current_context()

        # 结合上下文和查询
        context_enhanced_embedding = self._combine_context_and_query(current_context, query_embedding)

        # 检索记忆
        memories = self.retrieve_similar(context_enhanced_embedding, top_k)

        # 添加上下文信息
        results = []
        for memory in memories:
            results.append({
                'memory': memory,
                'context_relevance': self._calculate_context_relevance(memory, current_context),
                'query_relevance': self._cosine_similarity(query_embedding, memory.embedding)
            })

        # 按综合相关性排序
        results.sort(key=lambda x: 0.7 * x['query_relevance'] + 0.3 * x['context_relevance'], reverse=True)

        return results

    def _get_current_context(self) -> List[Dict[str, Any]]:
        """获取当前上下文"""
        return self.context_history[-self.context_window_size:]

    def _combine_context_and_query(self, context: List[Dict[str, Any]], query_embedding: np.ndarray) -> np.ndarray:
        """结合上下文和查询"""
        if not context:
            return query_embedding

        # 计算上下文的平均嵌入
        context_embeddings = [c['embedding'] for c in context]
        avg_context_embedding = np.mean(context_embeddings, axis=0)

        # 加权组合
        combined = 0.7 * query_embedding + 0.3 * avg_context_embedding

        return combined

    def _calculate_context_relevance(self, memory: Memory, context: List[Dict[str, Any]]) -> float:
        """计算上下文相关性"""
        if not context:
            return 0.0

        # 计算记忆与上下文的平均相似度
        similarities = []
        for ctx in context:
            similarity = self._cosine_similarity(memory.embedding, ctx['embedding'])
            similarities.append(similarity)

        return np.mean(similarities)

    def retrieve_temporal(self, time_range: timedelta, top_k: int = 10) -> List[Memory]:
        """按时间范围检索"""
        current_time = datetime.utcnow()
        cutoff_time = current_time - time_range

        # 筛选时间范围内的记忆
        recent_memories = [
            memory for memory in self.memories.values()
            if memory.created_at >= cutoff_time
        ]

        # 按创建时间排序
        recent_memories.sort(key=lambda m: m.created_at, reverse=True)

        return recent_memories[:top_k]

# 使用示例
store = ContextAwareMemoryStore()

# 添加上下文
contexts = [
    ("User is asking about Python programming", np.random.rand(768)),
    ("User wants to learn about data structures", np.random.rand(768)),
    ("User is interested in algorithms", np.random.rand(768)),
]

for content, embedding in contexts:
    store.add_context(content, embedding)

# 添加记忆
memories = [
    ("Python lists are mutable sequences", np.random.rand(768), {'type': 'knowledge', 'topic': 'python'}),
    ("Dictionaries are key-value pairs", np.random.rand(768), {'type': 'knowledge', 'topic': 'python'}),
    ("Binary search has O(log n) complexity", np.random.rand(768), {'type': 'knowledge', 'topic': 'algorithms'}),
]

for content, embedding, metadata in memories:
    store.add_memory(content, embedding, metadata)

# 带上下文的检索
query_embedding = np.random.rand(768)
results = store.retrieve_with_context(query_embedding, top_k=3)

print("Retrieved with context:")
for result in results:
    print(f"  - {result['memory'].content}")
    print(f"    Query relevance: {result['query_relevance']:.3f}")
    print(f"    Context relevance: {result['context_relevance']:.3f}")

    ```> **上下文感知检索特点**:

    > - 维护上下文历史
    > - 结合上下文和查询
    > - 计算上下文相关性
    > - 按时间范围检索
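
本节所有示例为了演示方便,都用 `np.random.rand(768)` 充当嵌入向量;实际使用时需要换成真实的嵌入模型。下面是一个假设使用 sentence-transformers 的封装草图(该库与模型名 `all-MiniLM-L6-v2` 都只是可替换的假设,并非本节代码的固定依赖;注意该模型输出 384 维向量,需与 `embedding_dim` 保持一致):

```python
import numpy as np
from sentence_transformers import SentenceTransformer  # 假设选用的嵌入库,可替换为其他嵌入服务

_model = SentenceTransformer("all-MiniLM-L6-v2")  # 输出 384 维向量

def embed(text: str) -> np.ndarray:
    """把文本编码为嵌入向量,供 add_memory / add_context / 检索共同使用"""
    return np.asarray(_model.encode(text))

# 用法示意:嵌入维度需与存储时声明的 embedding_dim 一致
# store = ContextAwareMemoryStore(embedding_dim=384)
# store.add_memory("Python lists are mutable sequences", embed("Python lists are mutable sequences"), {'type': 'knowledge'})
# results = store.retrieve_with_context(embed("如何使用 Python 列表?"), top_k=3)
```
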

## 总结

记忆系统设计包括:

1. **记忆系统的基本概念**: 什么是记忆系统、记忆类型
2. **记忆存储机制**: 向量存储、层次化存储
3. **记忆检索优化**: 上下文感知检索

通过记忆系统,Claude Code能够存储和检索重要信息,实现长期学习和持续改进。

在下一节中,我们将探讨多智能体协作。
