Skip to content

29.2 核心组件实现

29.2.1 LLM 客户端

LLM 客户端是编程 Agent 与大语言模型交互的核心组件，负责构造 API 请求、解析响应、缓存结果以及处理错误。

python
### 基础实现

class LLMClient:
    """Asynchronous client for an OpenAI-style ``/chat/completions`` endpoint.

    Builds chat requests, parses responses, caches results in memory and
    keeps simple usage counters.
    """

    def __init__(self, config: "LLMConfig"):
        """Store connection and generation settings taken from *config*.

        Args:
            config: Object providing ``api_key``, ``base_url``, ``model``,
                ``max_tokens``, ``temperature`` and ``system_prompt``.
        """
        self.config = config
        self.api_key = config.api_key
        self.base_url = config.base_url
        self.model = config.model
        self.max_tokens = config.max_tokens
        self.temperature = config.temperature

        # Synchronous session kept for backward compatibility. The async
        # request path does NOT use it and sends its own headers instead.
        self.session = requests.Session()
        self.session.headers.update(self._headers())

        # Response cache keyed by a hash of prompt, context and kwargs.
        self.cache = LRUCache(maxsize=1000)

        # Usage counters; ``total_requests`` counts every API attempt,
        # including failed ones.
        self.stats = {
            'total_requests': 0,
            'cache_hits': 0,
            'errors': 0,
        }

    def _headers(self) -> Dict[str, str]:
        """Return the HTTP headers (bearer auth + JSON content type) for API calls."""
        return {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json',
        }

    async def complete(self, prompt: str, context: List[Dict] = None, **kwargs) -> str:
        """Generate a completion for *prompt*, serving repeats from the cache.

        Args:
            prompt: User message text.
            context: Optional prior messages inserted before the prompt.
            **kwargs: Extra request fields; they override the configured
                ``model``, ``max_tokens`` and ``temperature``.

        Returns:
            The content of the first choice's message.

        Raises:
            Exception: If the HTTP request fails or the response is malformed.
        """
        cache_key = self._generate_cache_key(prompt, context, kwargs)
        cached_response = self.cache.get(cache_key)
        # Compare with None so an empty-string completion is still a cache hit.
        if cached_response is not None:
            self.stats['cache_hits'] += 1
            return cached_response

        # Configured defaults first; caller-supplied kwargs win on conflicts.
        params = {
            'model': self.model,
            'messages': self._build_messages(prompt, context),
            'max_tokens': self.max_tokens,
            'temperature': self.temperature,
            **kwargs,
        }

        # Count the attempt before sending so failures are included too.
        self.stats['total_requests'] += 1
        try:
            response = await self._send_request(params)
            result = self._parse_response(response)
        except Exception as e:
            self.stats['errors'] += 1
            logger.error(f"LLM request failed: {e}")
            raise

        self.cache.set(cache_key, result)
        return result

    async def _send_request(self, params: Dict) -> Dict:
        """POST *params* to ``{base_url}/chat/completions`` and return the JSON body.

        Raises:
            Exception: If the server answers with a non-200 status.
        """
        url = f"{self.base_url}/chat/completions"

        # The bearer token must be attached here: the headers configured on
        # ``self.session`` (a requests session) do not apply to aiohttp.
        async with aiohttp.ClientSession(headers=self._headers()) as session:
            async with session.post(url, json=params) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise Exception(f"API error: {response.status} - {error_text}")
                return await response.json()

    def _build_messages(self, prompt: str, context: List[Dict] = None) -> List[Dict]:
        """Assemble the chat messages: system prompt, then context, then *prompt*."""
        messages = []

        if self.config.system_prompt:
            messages.append({
                'role': 'system',
                'content': self.config.system_prompt
            })

        if context:
            messages.extend(context)

        messages.append({
            'role': 'user',
            'content': prompt
        })

        return messages

    def _parse_response(self, response: Dict) -> str:
        """Extract ``choices[0].message.content`` from an API response.

        Raises:
            Exception: If the response does not have the expected structure.
        """
        try:
            return response['choices'][0]['message']['content']
        except (KeyError, IndexError, TypeError) as e:
            raise Exception(f"Invalid response format: {e}") from e

    def _generate_cache_key(self, prompt: str, context: List[Dict], kwargs: Dict) -> str:
        """Return an MD5 hex digest fingerprinting prompt, context and kwargs.

        MD5 is used only as a cache fingerprint, not for security. The
        kwargs must be JSON-serializable.
        """
        key_data = {
            'prompt': prompt,
            'context': context,
            'kwargs': kwargs
        }
        return hashlib.md5(
            json.dumps(key_data, sort_keys=True).encode()
        ).hexdigest()

    def get_stats(self) -> Dict[str, Any]:
        """Return a shallow copy of the usage counters."""
        return self.stats.copy()

### 流式响应支持

    bash


    python

    class StreamingLLMClient(LLMClient):
        """LLM client that streams completion text as server-sent events arrive."""

        async def complete_stream(self, prompt: str,
                                  context: List[Dict] = None,
                                  **kwargs) -> AsyncIterator[str]:
            """Yield completion text chunks for *prompt* as they are received.

            Args:
                prompt: User message text.
                context: Optional prior messages inserted before the prompt.
                **kwargs: Extra request fields overriding the configured
                    ``model``, ``max_tokens`` and ``temperature``. ``stream``
                    is always forced to ``True``.

            Yields:
                Each non-empty ``choices[0].delta.content`` string.

            Raises:
                Exception: If the server answers with a non-200 status.
            """
            params = {
                'model': self.model,
                'messages': self._build_messages(prompt, context),
                'max_tokens': self.max_tokens,
                'temperature': self.temperature,
                **kwargs,
                # Forced last: the parsing below only understands SSE output.
                'stream': True,
            }

            url = f"{self.base_url}/chat/completions"
            # aiohttp does not pick up the requests-session headers, so the
            # bearer token must be supplied explicitly.
            headers = {
                'Authorization': f'Bearer {self.api_key}',
                'Content-Type': 'application/json',
            }

            async with aiohttp.ClientSession(headers=headers) as session:
                async with session.post(url, json=params) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        raise Exception(f"API error: {response.status} - {error_text}")

                    async for raw_line in response.content:
                        line = raw_line.decode('utf-8').strip()

                        # Skip blank keep-alive lines and non-data SSE fields.
                        if not line.startswith('data:'):
                            continue
                        # SSE allows an optional space after "data:".
                        payload = line[len('data:'):].strip()
                        if not payload:
                            continue
                        if payload == '[DONE]':
                            break  # end-of-stream sentinel

                        data = json.loads(payload)
                        try:
                            content = data['choices'][0]['delta']['content']
                        except (KeyError, IndexError, TypeError):
                            continue  # e.g. role-only or finish chunks
                        if content:
                            yield content

    ## 29.2.2 工具管理器

    工具管理器负责注册、管理和执行各种工具,是编程 Agent 与外部系统交互的桥梁。

    ### 工具接口

    class Tool(ABC):
        """Base class for agent tools: identity, execution and a parameter schema."""

        def __init__(self, tool_id: str, name: str, description: str):
            """Store the tool's id, display name and description."""
            self.id = tool_id
            self.name = name
            self.description = description

        @abstractmethod
        async def execute(self, parameters: Dict[str, Any]) -> "ToolResult":
            """Run the tool with the given parameters."""

        @abstractmethod
        def get_schema(self) -> Dict[str, Any]:
            """Return a JSON-Schema-like dict describing the parameters."""

        def validate_parameters(self, parameters: Dict[str, Any]) -> bool:
            """Return True if *parameters* satisfies this tool's schema."""
            return self._validate_against_schema(parameters, self.get_schema())

        def _validate_against_schema(self, parameters: Dict,
                                     schema: Dict) -> bool:
            """Check required keys and declared property types.

            Only ``required`` and each property's ``type`` are checked. Keys
            that are not declared in ``properties`` are accepted.
            """
            # A non-mapping payload (e.g. None from a malformed tool call)
            # cannot satisfy an object schema.
            if not isinstance(parameters, dict):
                return False

            required = schema.get('required', [])
            properties = schema.get('properties', {})

            if any(param not in parameters for param in required):
                return False

            return all(
                self._check_type(value, properties[key].get('type'))
                for key, value in parameters.items()
                if key in properties
            )

        def _check_type(self, value: Any, expected_type: Any) -> bool:
            """Return True if *value* matches a JSON Schema ``type``.

            *expected_type* may be a type name or a list of names, in which
            case any match suffices. Missing or unknown types always match.
            """
            if not expected_type:
                return True
            if isinstance(expected_type, (list, tuple)):
                return any(self._check_type(value, t) for t in expected_type)

            # bool is a subclass of int in Python, but JSON Schema treats
            # booleans as distinct from integers and numbers.
            if expected_type in ('integer', 'number') and isinstance(value, bool):
                return False

            type_map = {
                'string': str,
                'number': (int, float),
                'integer': int,
                'boolean': bool,
                'array': list,
                'object': dict,
                'null': type(None),
            }
            expected_python_type = type_map.get(expected_type)
            if expected_python_type is None:
                return True
            return isinstance(value, expected_python_type)

### 工具管理器实现

    bash


    python

    class ToolManager:
        """Registry of tools that validates, executes and logs tool calls."""

        def __init__(self):
            # tool id -> tool instance
            self.tools: Dict[str, "Tool"] = {}
            # category name -> ids of tools registered under that category
            self.tool_categories: Dict[str, List[str]] = {}
            # one record per execute_tool() call, in call order
            self.execution_history: List["ToolExecution"] = []

        def register_tool(self, tool: "Tool", category: str = None):
            """Register *tool*, optionally listing it under *category*.

            Registering an id that already exists logs a warning and is ignored.
            """
            tool_id = tool.id

            if tool_id in self.tools:
                logger.warning(f"Tool already registered: {tool_id}")
                return

            self.tools[tool_id] = tool

            if category:
                self.tool_categories.setdefault(category, []).append(tool_id)

            logger.info(f"Tool registered: {tool_id}")

        async def execute_tool(self, tool_id: str,
                               parameters: Dict[str, Any]) -> "ToolResult":
            """Validate *parameters*, run the tool and record the execution.

            Returns:
                Whatever the tool's ``execute`` returns.

            Raises:
                ValueError: If the tool is unknown or the parameters are invalid.
                Exception: Anything the tool raises is recorded, then re-raised.
            """
            tool = self.tools.get(tool_id)
            if tool is None:
                raise ValueError(f"Tool not found: {tool_id}")

            if not tool.validate_parameters(parameters):
                raise ValueError("Invalid parameters")

            execution = ToolExecution(
                tool_id=tool_id,
                parameters=parameters,
                started_at=datetime.utcnow()
            )

            try:
                result = await tool.execute(parameters)
            except Exception as e:
                execution.completed_at = datetime.utcnow()
                execution.success = False
                execution.error = str(e)
                self.execution_history.append(execution)
                logger.error(f"Tool execution failed: {e}")
                raise

            execution.completed_at = datetime.utcnow()
            # Tools such as FileReadTool report failure through a result with
            # success=False instead of raising; mirror that in the record.
            execution.success = bool(getattr(result, 'success', True))
            if not execution.success:
                execution.error = getattr(result, 'error', None)
            execution.result = result
            self.execution_history.append(execution)
            return result

        def get_tool(self, tool_id: str) -> "Tool":
            """Return the tool registered under *tool_id*, or None."""
            return self.tools.get(tool_id)

        def list_tools(self, category: str = None) -> List["Tool"]:
            """Return all tools, or only those registered under *category*."""
            if category:
                tool_ids = self.tool_categories.get(category, [])
                return [self.tools[tid] for tid in tool_ids]
            return list(self.tools.values())

        def get_tool_schema(self, tool_id: str) -> Dict[str, Any]:
            """Return the tool's parameter schema, or None if it is unknown."""
            tool = self.get_tool(tool_id)
            if tool:
                return tool.get_schema()
            return None

        def get_execution_history(self, tool_id: str = None,
                                  limit: int = 100) -> List["ToolExecution"]:
            """Return up to *limit* most recent executions, optionally for one tool.

            A *limit* of zero or less returns an empty list.
            """
            # Guard explicitly: history[-0:] would return the whole list.
            if limit <= 0:
                return []

            history = self.execution_history
            if tool_id:
                history = [e for e in history if e.tool_id == tool_id]

            return history[-limit:]

    ### 示例工具实现

    class FileReadTool(Tool):
        """Tool that returns the full text content of a UTF-8 file."""

        def __init__(self):
            super().__init__(
                tool_id="file_read",
                name="File Read",
                description="Read the contents of a file"
            )

        async def execute(self, parameters: Dict[str, Any]) -> "ToolResult":
            """Read ``parameters['file_path']`` and wrap the outcome in a ToolResult.

            Read or decode errors are returned as ``success=False`` results
            rather than raised.
            """
            import asyncio

            file_path = parameters['file_path']
            # NOTE(review): file_path comes from the tool call and is not
            # confined to any directory; confirm that is acceptable.
            try:
                # Run the blocking read in the default executor so a large or
                # slow file does not stall the event loop.
                loop = asyncio.get_running_loop()
                content = await loop.run_in_executor(None, self._read_file, file_path)
            except Exception as e:
                return ToolResult(
                    success=False,
                    error=str(e),
                    message=f"Failed to read file: {file_path}"
                )

            return ToolResult(
                success=True,
                data={
                    'content': content,
                    'file_path': file_path
                },
                message=f"Successfully read file: {file_path}"
            )

        @staticmethod
        def _read_file(file_path: str) -> str:
            """Blocking helper: return the whole file decoded as UTF-8."""
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()

        def get_schema(self) -> Dict[str, Any]:
            """Return the parameter schema: a required string ``file_path``."""
            return {
                'type': 'object',
                'properties': {
                    'file_path': {
                        'type': 'string',
                        'description': 'Path to the file to read'
                    }
                },
                'required': ['file_path']
            }
    class CodeExecuteTool(Tool):
        """Tool that runs a code snippet in a subprocess and captures its output.

        NOTE(review): snippets run unsandboxed with this process's privileges;
        only expose this tool to trusted callers.
        """

        # Seconds a snippet may run when the caller does not pass ``timeout``.
        DEFAULT_TIMEOUT = 30

        def __init__(self):
            super().__init__(
                tool_id="code_execute",
                name="Code Execute",
                description="Execute code and return the output"
            )

        async def execute(self, parameters: Dict[str, Any]) -> "ToolResult":
            """Run ``parameters['code']`` and report stdout, stderr and exit code.

            Only ``language == 'python'`` is supported. An unsupported
            language, a timeout or a non-zero exit code yields ``success=False``.
            """
            code = parameters['code']
            language = parameters.get('language', 'python')
            timeout = parameters.get('timeout', self.DEFAULT_TIMEOUT)

            try:
                if language != 'python':
                    raise ValueError(f"Unsupported language: {language}")
                result = await self._execute_python(code, timeout)
            except Exception as e:
                return ToolResult(
                    success=False,
                    error=str(e),
                    message="Code execution failed"
                )

            # A snippet that raised exits non-zero; do not report it as a success.
            succeeded = result['exit_code'] == 0
            return ToolResult(
                success=succeeded,
                data={
                    'output': result['output'],
                    'error': result.get('error'),
                    'exit_code': result['exit_code'],
                },
                error=None if succeeded else result.get('error'),
                message="Code executed successfully" if succeeded else "Code execution failed"
            )

        async def _execute_python(self, code: str, timeout=None) -> Dict[str, Any]:
            """Run *code* with ``python3 -c`` and capture its output.

            Args:
                code: Python source to execute.
                timeout: Seconds to wait before killing the process; None
                    waits indefinitely.

            Returns:
                Dict with ``output`` (stdout), ``error`` (stderr or None) and
                ``exit_code``.

            Raises:
                TimeoutError: If the process exceeds *timeout* (it is killed).
            """
            process = await asyncio.create_subprocess_exec(
                'python3',
                '-c',
                code,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            try:
                stdout, stderr = await asyncio.wait_for(process.communicate(), timeout)
            except asyncio.TimeoutError:
                # Do not leave a runaway interpreter behind.
                process.kill()
                await process.wait()
                raise TimeoutError(
                    f"Code execution timed out after {timeout} seconds"
                ) from None

            # errors='replace' so binary output cannot crash the tool.
            return {
                'output': stdout.decode('utf-8', errors='replace'),
                'error': stderr.decode('utf-8', errors='replace') if stderr else None,
                'exit_code': process.returncode,
            }

        def get_schema(self) -> Dict[str, Any]:
            """Return the parameter schema: ``code`` plus optional ``language``/``timeout``."""
            return {
                'type': 'object',
                'properties': {
                    'code': {
                        'type': 'string',
                        'description': 'Code to execute'
                    },
                    'language': {
                        'type': 'string',
                        'description': 'Programming language',
                        'default': 'python'
                    },
                    'timeout': {
                        'type': 'number',
                        'description': 'Maximum execution time in seconds',
                        'default': self.DEFAULT_TIMEOUT
                    }
                },
                'required': ['code']
            }

## 29.2.3 记忆系统

记忆系统负责存储和管理 Agent 的知识、经验和交互历史。

python
### 记忆系统架构

    bash


    python

    class MemorySystem:
        """Agent memory: per-session history, long-term store, vectors and a graph."""

        def __init__(self, config: "MemoryConfig", llm_client=None):
            """Create the memory backends described by *config*.

            Args:
                config: Provides ``storage_config``, ``vector_config`` and
                    ``graph_config`` for the backends.
                llm_client: Optional client with an async ``complete(prompt)``
                    method, used to extract knowledge from interactions.
                    Without it, knowledge extraction is skipped.
            """
            self.config = config
            self.llm_client = llm_client

            # Short-term (session-level) memory: session_id -> SessionMemory.
            self.short_term_memory: Dict[str, "SessionMemory"] = {}

            # Long-term (persistent) memory.
            self.long_term_memory = LongTermMemory(config.storage_config)

            # Vector store for semantic retrieval.
            self.vector_store = VectorStore(config.vector_config)

            # Knowledge graph.
            self.knowledge_graph = KnowledgeGraph(config.graph_config)

        async def store_interaction(self, request: "UserRequest",
                                    response: "AgentResponse"):
            """Append the request/response pair to its session and extract knowledge."""
            session_id = request.session_id

            # Create the session memory on first use.
            if session_id not in self.short_term_memory:
                self.short_term_memory[session_id] = SessionMemory(
                    session_id=session_id
                )

            session_memory = self.short_term_memory[session_id]

            interaction = Interaction(
                request=request,
                response=response,
                timestamp=datetime.utcnow()
            )

            session_memory.add_interaction(interaction)

            await self._extract_and_store_knowledge(interaction)

        async def retrieve_context(self, session_id: str,
                                   query: str = None,
                                   history_limit: int = 10,
                                   top_k: int = 5) -> "Context":
            """Build a Context from recent session interactions and related memories.

            Args:
                session_id: Session whose recent interactions are included.
                query: If given, the vector store is searched for related memories.
                history_limit: Number of recent interactions to include.
                top_k: Number of semantically related memories to include.
            """
            context = Context()

            session_memory = self.short_term_memory.get(session_id)
            if session_memory:
                context.interactions = session_memory.get_recent_interactions(
                    limit=history_limit
                )

            if query:
                relevant_memories = await self.vector_store.search(
                    query,
                    top_k=top_k
                )
                context.relevant_memories = relevant_memories

            return context

        async def _extract_and_store_knowledge(self,
                                               interaction: "Interaction"):
            """Extract knowledge items and index them in the vector store and graph."""
            knowledge = await self._extract_knowledge(interaction)

            for item in knowledge:
                await self.vector_store.add(
                    id=item.id,
                    text=item.text,
                    metadata=item.metadata
                )

                await self.knowledge_graph.add_node(
                    id=item.id,
                    type=item.type,
                    properties=item.metadata
                )

        async def _extract_knowledge(self,
                                     interaction: "Interaction") -> List["KnowledgeItem"]:
            """Ask the LLM to extract knowledge items from *interaction*.

            Returns an empty list when no ``llm_client`` is configured.
            """
            # llm_client is optional: without it there is nothing to extract
            # with, so storing an interaction must not fail.
            llm_client = getattr(self, 'llm_client', None)
            if llm_client is None:
                return []

            # NOTE(review): the list after "请提取:" appears to be truncated;
            # confirm the intended extraction instructions.
            prompt = f"""
            从以下交互中提取关键知识:

            用户请求:{interaction.request.text}
            Agent 响应:{interaction.response.text}

            请提取:

            """

            response = await llm_client.complete(prompt)

            # NOTE(review): _parse_knowledge is not defined in this class;
            # it must be supplied elsewhere (e.g. a subclass).
            return self._parse_knowledge(response)

    ### 向量存储实现

    class VectorStore:
        """In-memory document store with FAISS L2 nearest-neighbour search."""

        def __init__(self, config: "VectorStoreConfig"):
            self.config = config
            self.embedding_client = EmbeddingClient(config.embedding_config)
            # FAISS index; row i holds the embedding of the i-th key of
            # self.documents (dict insertion order).
            self.index = None
            self.documents: Dict[str, "Document"] = {}

        async def add(self, id: str, text: str, metadata: Dict = None):
            """Embed *text* and store it under *id*, replacing any existing entry."""
            embedding = await self.embedding_client.embed(text)
            is_new = id not in self.documents

            self.documents[id] = Document(
                id=id,
                text=text,
                embedding=embedding,
                metadata=metadata or {}
            )

            if is_new and self.index is not None:
                # A new key is appended at the end of the dict, so its vector
                # belongs at the end of the index: add it instead of rebuilding.
                self.index.add(self._as_matrix([embedding]))
            else:
                # First document, or an existing id overwritten in place:
                # rebuild so index rows stay aligned with dict order.
                self._update_index()

        async def search(self, query: str, top_k: int = 10) -> List["Document"]:
            """Return up to *top_k* documents nearest to *query*, closest first.

            Each returned document is a shallow copy carrying a ``similarity``
            of ``1 / (1 + L2 distance)``; stored documents are not modified.
            """
            import copy

            if self.index is None or top_k <= 0:
                return []

            query_embedding = await self.embedding_client.embed(query)
            distances, indices = self.index.search(
                self._as_matrix([query_embedding]),
                top_k
            )

            doc_ids = list(self.documents)
            results = []
            for distance, idx in zip(distances[0], indices[0]):
                # FAISS pads with -1 when fewer than top_k vectors exist.
                if 0 <= idx < len(doc_ids):
                    # Copy so this query's score does not overwrite the stored
                    # document that other searches also return.
                    document = copy.copy(self.documents[doc_ids[idx]])
                    document.similarity = 1.0 / (1.0 + float(distance))
                    results.append(document)
            return results

        @staticmethod
        def _as_matrix(vectors) -> "np.ndarray":
            """Stack *vectors* into a contiguous float32 matrix, as FAISS expects."""
            return np.ascontiguousarray(vectors, dtype=np.float32)

        def _update_index(self):
            """Rebuild the FAISS index from all stored embeddings, in dict order."""
            if not self.documents:
                return

            embeddings = self._as_matrix(
                [doc.embedding for doc in self.documents.values()]
            )
            self.index = faiss.IndexFlatL2(embeddings.shape[1])
            self.index.add(embeddings)

## 29.2.4 任务规划器

任务规划器负责将用户请求分解为可执行的任务序列，并根据任务之间的依赖关系（拓扑排序）确定执行顺序。

python
    bash


    python

    class TaskPlanner:
        """Turns an intent into an ordered task list using pluggable strategies."""

        def __init__(self):
            self.task_templates: Dict[str, "TaskTemplate"] = {}
            # strategy name (matched against intent.name) -> strategy
            self.planning_strategies: Dict[str, "PlanningStrategy"] = {}

            self._register_default_strategies()

        async def plan(self, intent: "Intent",
                       context: Dict[str, Any]) -> List["Task"]:
            """Generate tasks for *intent* and order them so dependencies run first."""
            strategy = self._select_strategy(intent, context)
            tasks = await strategy.generate_tasks(intent, context)
            return self._optimize_task_order(tasks)

        def _select_strategy(self, intent: "Intent",
                             context: Dict) -> "PlanningStrategy":
            """Return the strategy registered under ``intent.name``.

            Falls back to the "default" strategy when no strategy is
            registered under that name, instead of returning None.
            """
            strategy = self.planning_strategies.get(intent.name)
            if strategy is None:
                strategy = self.planning_strategies.get("default")
            return strategy

        def _optimize_task_order(self, tasks: List["Task"]) -> List["Task"]:
            """Return *tasks* reordered so every task follows its dependencies."""
            dependency_graph = self._build_dependency_graph(tasks)
            return self._topological_sort(dependency_graph, tasks)

        def _build_dependency_graph(self,
                                    tasks: List["Task"]) -> Dict[str, List[str]]:
            """Map each task id to the ids it depends on (None means no deps)."""
            return {task.id: list(task.dependencies or []) for task in tasks}

        def _topological_sort(self,
                              graph: Dict[str, List[str]],
                              tasks: List["Task"] = None) -> List["Task"]:
            """Depth-first topological sort of *graph*, mapped back to task objects.

            Args:
                graph: task id -> ids of its dependencies.
                tasks: Task objects to return; defaults to ``self.tasks``.

            Returns:
                Tasks ordered so that dependencies precede dependents.
                Dependency ids with no matching task are skipped.

            Raises:
                ValueError: If the dependency graph contains a cycle.
            """
            if tasks is None:
                tasks = self.tasks
            task_map = {task.id: task for task in tasks}

            done = set()
            in_progress = set()
            order = []

            def visit(task_id: str):
                if task_id in done:
                    return
                # Reaching a node that is still on the DFS path means a cycle.
                if task_id in in_progress:
                    raise ValueError(f"Circular task dependency involving: {task_id}")

                in_progress.add(task_id)
                for dep_id in graph.get(task_id, []):
                    visit(dep_id)
                in_progress.discard(task_id)

                done.add(task_id)
                order.append(task_id)

            for task_id in graph:
                visit(task_id)

            return [task_map[tid] for tid in order if tid in task_map]

        def _register_default_strategies(self):
            """Register the built-in planning strategies."""
            self.planning_strategies["code_generation"] = CodeGenerationStrategy()
            self.planning_strategies["code_analysis"] = CodeAnalysisStrategy()
            self.planning_strategies["debugging"] = DebuggingStrategy()
            self.planning_strategies["default"] = DefaultStrategy()

通过实现这些核心组件,我们可以构建一个功能完整的编程 Agent 基础框架。

基于 MIT 许可发布 | 永久导航