Skip to content

27.5 多智能体协作

多智能体协作概述

多智能体协作是指多个AI智能体协同工作,共同完成复杂任务的能力。通过协作,智能体可以共享信息、分工合作、互相补充,从而提高整体效率和效果。

多智能体协作的基本概念

1. 什么是多智能体协作

多智能体协作是指多个AI智能体通过通信、协调和合作,共同完成单个智能体难以完成的复杂任务。

多智能体协作的特点 :

  • 分布式决策 : 每个智能体独立决策
  • 信息共享 : 智能体之间共享信息
  • 任务分工 : 智能体分工合作
  • 协同优化 : 整体优化而非局部优化

2. 协作模式

协作模式特点适用场景

主从模式| 一个主智能体协调多个从智能体| 层次化任务 平等模式| 智能体地位平等,共同决策| 协作任务 竞争模式| 智能体竞争完成任务| 优化问题 混合模式| 结合多种模式| 复杂场景

智能体通信机制

1. 消息传递

示例:消息传递机制

用户请求: "实现智能体之间的消息传递机制"

Claude Code 生成的代码:

python
    python


    ````python

```python
    from typing import Dict, List, Any, Optional, Callable
    from datetime import datetime
    from enum import Enum
    import logging
    import asyncio
    from dataclasses import dataclass, field

    logger = logging.getLogger(__name__)

    class MessageType(Enum):
    """消息类型"""
    REQUEST = "request"
    RESPONSE = "response"
    NOTIFICATION = "notification"
    BROADCAST = "broadcast"

    @dataclass
    class Message:
    """消息"""
    id: str
    type: MessageType
    sender: str
    receiver: str
    content: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.utcnow)
    reply_to: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    class MessageBus:
    """消息总线"""

    def __init__(self):
    self.agents: Dict[str, 'Agent'] = {}
    self.message_queue: asyncio.Queue = asyncio.Queue()
    self.message_handlers: Dict[str, Callable] = {}
    self.running = False

    def register_agent(self, agent: 'Agent'):
    """注册智能体"""
    self.agents[agent.id] = agent
    logger.info(f"Agent registered: {agent.id}")

    def unregister_agent(self, agent_id: str):
    """注销智能体"""
    if agent_id in self.agents:
    del self.agents[agent_id]
    logger.info(f"Agent unregistered: {agent_id}")

    async def send_message(self, message: Message):
    """发送消息"""
    receiver = self.agents.get(message.receiver)

    if not receiver:
    logger.warning(f"Receiver not found: {message.receiver}")
    return

     # 添加到消息队列
    await self.message_queue.put(message)
    logger.info(f"Message sent from {message.sender} to {message.receiver}")

    async def broadcast_message(self, message: Message):
    """广播消息"""
    for agent_id, agent in self.agents.items():
    if agent_id != message.sender:
    broadcast_msg = Message(
    id=f"{message.id}_to_{agent_id}",
    type=message.type,
    sender=message.sender,
    receiver=agent_id,
    content=message.content,
    timestamp=message.timestamp,
    metadata=message.metadata
    )
    await self.send_message(broadcast_msg)

    logger.info(f"Message broadcasted from {message.sender}")

    def register_handler(self, message_type: str, handler: Callable):
    """注册消息处理器"""
    self.message_handlers[message_type] = handler
    logger.info(f"Handler registered for message type: {message_type}")

    async def start(self):
    """启动消息总线"""
    self.running = True
    logger.info("Message bus started")

     # 启动消息处理循环
    asyncio.create_task(self._process_messages())

    async def stop(self):
    """停止消息总线"""
    self.running = False
    logger.info("Message bus stopped")

    async def _process_messages(self):
    """处理消息"""
    while self.running:
    try:
     # 获取消息
    message = await asyncio.wait_for(
    self.message_queue.get(),
    timeout=1.0
    )

     # 传递给接收者
    receiver = self.agents.get(message.receiver)
    if receiver:
    await receiver.receive_message(message)

    except asyncio.TimeoutError:
    continue
    except Exception as e:
    logger.error(f"Error processing message: {e}")

    class Agent:
    """智能体基类"""

    def __init__(self, agent_id: str, message_bus: MessageBus):
    self.id = agent_id
    self.message_bus = message_bus
    self.message_handlers: Dict[str, Callable] = {}
    self.knowledge_base: Dict[str, Any] = {}

    async def receive_message(self, message: Message):
    """接收消息"""
    logger.info(f"Agent {self.id} received message from {message.sender}")

     # 查找处理器
    handler = self.message_handlers.get(message.type.value)

    if handler:
     # 调用处理器
    response = await handler(message)

     # 如果是请求消息,发送响应
    if message.type == MessageType.REQUEST:
    response_message = Message(
    id=f"response_{message.id}",
    type=MessageType.RESPONSE,
    sender=self.id,
    receiver=message.sender,
    content=response,
    reply_to=message.id
    )
    await self.message_bus.send_message(response_message)
    else:
    logger.warning(f"No handler for message type: {message.type.value}")

    def register_handler(self, message_type: str, handler: Callable):
    """注册消息处理器"""
    self.message_handlers[message_type] = handler
    logger.info(f"Agent {self.id} registered handler for {message_type}")

    async def send_request(self, receiver_id: str, content: Dict[str, Any]) -> Message:
    """发送请求"""
    message = Message(
    id=f"req_{datetime.utcnow().timestamp()}",
    type=MessageType.REQUEST,
    sender=self.id,
    receiver=receiver_id,
    content=content
    )

    await self.message_bus.send_message(message)
    return message

    async def send_response(self, original_message: Message, content: Dict[str, Any]):
    """发送响应"""
    message = Message(
    id=f"resp_{datetime.utcnow().timestamp()}",
    type=MessageType.RESPONSE,
    sender=self.id,
    receiver=original_message.sender,
    content=content,
    reply_to=original_message.id
    )

    await self.message_bus.send_message(message)

    async def send_notification(self, receiver_id: str, content: Dict[str, Any]):
    """发送通知"""
    message = Message(
    id=f"notif_{datetime.utcnow().timestamp()}",
    type=MessageType.NOTIFICATION,
    sender=self.id,
    receiver=receiver_id,
    content=content
    )

    await self.message_bus.send_message(message)

    async def broadcast(self, content: Dict[str, Any]):
    """广播消息"""
    message = Message(
    id=f"broadcast_{datetime.utcnow().timestamp()}",
    type=MessageType.BROADCAST,
    sender=self.id,
    receiver="all",
    content=content
    )

    await self.message_bus.broadcast_message(message)

    def update_knowledge(self, key: str, value: Any):
    """更新知识库"""
    self.knowledge_base[key] = value
    logger.info(f"Agent {self.id} updated knowledge: {key}")

    def get_knowledge(self, key: str) -> Optional[Any]:
    """获取知识"""
    return self.knowledge_base.get(key)

    # 使用示例
    async def main():
    """主函数"""
     # 创建消息总线
    message_bus = MessageBus()
    await message_bus.start()

     # 创建智能体
    agent1 = Agent("agent1", message_bus)
    agent2 = Agent("agent2", message_bus)
    agent3 = Agent("agent3", message_bus)

     # 注册智能体
    message_bus.register_agent(agent1)
    message_bus.register_agent(agent2)
    message_bus.register_agent(agent3)

     # 注册消息处理器
    async def handle_request(message: Message) -> Dict[str, Any]:
    """处理请求"""
    print(f"Agent {agent1.id} handling request from {message.sender}")
    return {'status': 'success', 'data': 'processed'}

    agent1.register_handler('request', handle_request)

    async def handle_notification(message: Message) -> Dict[str, Any]:
    """处理通知"""
    print(f"Agent {agent2.id} received notification from {message.sender}")
    return {'status': 'acknowledged'}

    agent2.register_handler('notification', handle_notification)

     # 发送消息
    await agent1.send_request('agent2', {'task': 'process_data'})
    await agent2.send_notification('agent3', {'event': 'update'})
    await agent3.broadcast({'announcement': 'system_ready'})

     # 等待消息处理
    await asyncio.sleep(1)

     # 停止消息总线
    await message_bus.stop()

    if __name__ == '__main__':
    asyncio.run(main())

    ```> **消息传递特点**:

    > - 异步消息传递
    > - 支持点对点和广播
    > - 消息类型分类
    > - 请求-响应模式

    ```
    ### 2. 共享知识库

    # 示例:共享知识库
    用户请求:
    "实现智能体之间的共享知识库"
    Claude Code 生成的代码:

    ````python

    `python

    from typing import Dict, List, Any, Optional
    from datetime import datetime
    import logging
    import json

    logger = logging.getLogger(__name__)

    class SharedKnowledgeBase:
    """共享知识库"""

    def __init__(self):
    self.knowledge: Dict[str, Any] = {}
    self.access_log: List[Dict[str, Any]] = []
    self.version: int = 0

    def add_knowledge(self, key: str, value: Any, agent_id: str) -> bool:
    """添加知识"""
    if key in self.knowledge:
    logger.warning(f"Knowledge key already exists: {key}")
    return False

    self.knowledge[key] = {
    'value': value,
    'agent_id': agent_id,
    'created_at': datetime.utcnow(),
    'version': self.version
    }

    self.version += 1
    self._log_access(agent_id, 'add', key)

    logger.info(f"Knowledge added: {key} by {agent_id}")
    return True

    def update_knowledge(self, key: str, value: Any, agent_id: str) -> bool:
    """更新知识"""
    if key not in self.knowledge:
    logger.warning(f"Knowledge key not found: {key}")
    return False

    old_value = self.knowledge[key]['value']
    self.knowledge[key]['value'] = value
    self.knowledge[key]['updated_at'] = datetime.utcnow()
    self.knowledge[key]['updated_by'] = agent_id
    self.knowledge[key]['version'] = self.version
    self.knowledge[key]['old_value'] = old_value

    self.version += 1
    self._log_access(agent_id, 'update', key)

    logger.info(f"Knowledge updated: {key} by {agent_id}")
    return True

    def get_knowledge(self, key: str, agent_id: str) -> Optional[Any]:
    """获取知识"""
    if key not in self.knowledge:
    logger.warning(f"Knowledge key not found: {key}")
    return None

    self._log_access(agent_id, 'read', key)

    return self.knowledge[key]['value']

    def delete_knowledge(self, key: str, agent_id: str) -> bool:
    """删除知识"""
    if key not in self.knowledge:
    logger.warning(f"Knowledge key not found: {key}")
    return False

    del self.knowledge[key]
    self._log_access(agent_id, 'delete', key)

    logger.info(f"Knowledge deleted: {key} by {agent_id}")
    return True

    def search_knowledge(self, query: str, agent_id: str) -> List[Dict[str, Any]]:
    """搜索知识"""
    results = []
    query_lower = query.lower()

    for key, knowledge in self.knowledge.items():
    if query_lower in key.lower():
    results.append({
    'key': key,
    'value': knowledge['value'],
    'agent_id': knowledge['agent_id'],
    'version': knowledge['version']
    })

    self._log_access(agent_id, 'search', query)

    logger.info(f"Knowledge search: {query} by {agent_id}, found {len(results)} results")
    return results

    def get_all_knowledge(self, agent_id: str) -> Dict[str, Any]:
    """获取所有知识"""
    self._log_access(agent_id, 'read_all', 'all')

    return {key: knowledge['value'] for key, knowledge in self.knowledge.items()}

    def get_knowledge_by_agent(self, agent_id: str) -> Dict[str, Any]:
    """获取智能体的知识"""
    agent_knowledge = {}

    for key, knowledge in self.knowledge.items():
    if knowledge['agent_id'] == agent_id:
    agent_knowledge[key] = knowledge['value']

    return agent_knowledge

    def merge_knowledge(self, other_knowledge: Dict[str, Any], agent_id: str) -> int:
    """合并知识"""
    merged_count = 0

    for key, value in other_knowledge.items():
    if key not in self.knowledge:
    self.add_knowledge(key, value, agent_id)
    merged_count += 1

    logger.info(f"Knowledge merged: {merged_count} items by {agent_id}")
    return merged_count

    def _log_access(self, agent_id: str, action: str, key: str):
    """记录访问日志"""
    self.access_log.append({
    'agent_id': agent_id,
    'action': action,
    'key': key,
    'timestamp': datetime.utcnow()
    })

    def get_access_log(self, agent_id: Optional[str] = None) -> List[Dict[str, Any]]:
    """获取访问日志"""
    if agent_id:
    return [log for log in self.access_log if log['agent_id'] == agent_id]
    return self.access_log

    def get_statistics(self) -> Dict[str, Any]:
    """获取统计信息"""
    agent_counts = {}

    for knowledge in self.knowledge.values():
    agent_id = knowledge['agent_id']
    agent_counts[agent_id] = agent_counts.get(agent_id, 0) + 1

    return {
    'total_knowledge': len(self.knowledge),
    'version': self.version,
    'agent_counts': agent_counts,
    'total_accesses': len(self.access_log)
    }

    class KnowledgeSharingAgent(Agent):
    """知识共享智能体"""

    def __init__(self, agent_id: str, message_bus: MessageBus, knowledge_base: SharedKnowledgeBase):
    super().__init__(agent_id, message_bus)
    self.knowledge_base = knowledge_base

    async def share_knowledge(self, key: str, value: Any):
    """分享知识"""
    success = self.knowledge_base.add_knowledge(key, value, self.id)

    if success:

     # 广播知识更新

    await self.broadcast({
    'type': 'knowledge_update',
    'key': key,
    'agent_id': self.id
    })

    async def request_knowledge(self, key: str, target_agent_id: str) -> Optional[Any]:
    """请求知识"""

     # 发送知识请求

    await self.send_request(target_agent_id, {
    'type': 'knowledge_request',
    'key': key
    })

     # 等待响应

    await asyncio.sleep(0.5)

     # 从知识库获取

    return self.knowledge_base.get_knowledge(key, self.id)

    async def sync_knowledge(self):
    """同步知识"""

     # 获取其他智能体的知识

    other_agents = [aid for aid in self.message_bus.agents.keys() if aid != self.id]

    for agent_id in other_agents:
    await self.send_request(agent_id, {
    'type': 'knowledge_sync_request'
    })

    # 使用示例

    async def main():
    """主函数"""

     # 创建消息总线

    message_bus = MessageBus()
    await message_bus.start()

     # 创建共享知识库

    knowledge_base = SharedKnowledgeBase()

     # 创建智能体

    agent1 = KnowledgeSharingAgent("agent1", message_bus, knowledge_base)
    agent2 = KnowledgeSharingAgent("agent2", message_bus, knowledge_base)
    agent3 = KnowledgeSharingAgent("agent3", message_bus, knowledge_base)

     # 注册智能体

    message_bus.register_agent(agent1)
    message_bus.register_agent(agent2)
    message_bus.register_agent(agent3)

     # 分享知识

    await agent1.share_knowledge("python_syntax", "Python uses indentation for code blocks")
    await agent2.share_knowledge("javascript_syntax", "JavaScript uses curly braces for code blocks")
    await agent3.share_knowledge("java_syntax", "Java uses curly braces and semicolons")

     # 请求知识

    python_syntax = await agent2.request_knowledge("python_syntax", "agent1")
    print(f"Python syntax: {python_syntax}")

     # 获取统计信息

    stats = knowledge_base.get_statistics()
    print(f"Knowledge base statistics: {stats}")

     # 停止消息总线

    await message_bus.stop()

    if __name__ == '__main__':
    asyncio.run(main())

    ```> **共享知识库特点**:
    > - 集中式知识存储
    > - 访问控制和日志
    > - 知识搜索和检索
    > - 智能体间知识同步

    ```

    ## 协作任务执行

    ### 1. 任务分配

    # 示例:任务分配

    用户请求:
    "实现智能体之间的任务分配机制"
    Claude Code 生成的代码:

    ````python
    `python

    from typing import Dict, List, Any, Optional
    from datetime import datetime
    import logging
    import asyncio

    logger = logging.getLogger(__name__)

    class Task:
    """任务"""

    def __init__(self, task_id: str, name: str, description: str, requirements: List[str], estimated_duration: float):
    self.id = task_id
    self.name = name
    self.description = description
    self.requirements = requirements
    self.estimated_duration = estimated_duration
    self.assigned_to: Optional[str] = None
    self.status = "pending"
    self.created_at = datetime.utcnow()
    self.started_at: Optional[datetime] = None
    self.completed_at: Optional[datetime] = None
    self.result: Optional[Any] = None

    class TaskAllocator:
    """任务分配器"""

    def __init__(self):
    self.tasks: Dict[str, Task] = {}
    self.agents: Dict[str, Dict[str, Any]] = {}
    self.agent_workload: Dict[str, float] = {}

    def register_agent(self, agent_id: str, capabilities: List[str], max_workload: float = 1.0):
    """注册智能体"""
    self.agents[agent_id] = {
    'capabilities': capabilities,
    'max_workload': max_workload,
    'current_workload': 0.0
    }
    self.agent_workload[agent_id] = 0.0

    logger.info(f"Agent registered: {agent_id} with capabilities {capabilities}")

    def add_task(self, task: Task):
    """添加任务"""
    self.tasks[task.id] = task
    logger.info(f"Task added: {task.id}")

    def allocate_task(self, task_id: str) -> Optional[str]:
    """分配任务"""
    task = self.tasks.get(task_id)
    if not task:
    logger.warning(f"Task not found: {task_id}")
    return None

     # 查找合适的智能体
    suitable_agents = self._find_suitable_agents(task)

    if not suitable_agents:
    logger.warning(f"No suitable agents for task: {task_id}")
    return None

     # 选择负载最低的智能体
    selected_agent = self._select_least_loaded_agent(suitable_agents)

     # 分配任务
    task.assigned_to = selected_agent
    task.status = "assigned"
    self.agents[selected_agent]['current_workload'] += task.estimated_duration
    self.agent_workload[selected_agent] = self.agents[selected_agent]['current_workload']

    logger.info(f"Task {task_id} assigned to {selected_agent}")
    return selected_agent

    def _find_suitable_agents(self, task: Task) -> List[str]:
    """查找合适的智能体"""
    suitable_agents = []

    for agent_id, agent_info in self.agents.items():
     # 检查能力匹配
    capabilities_match = all(
    req in agent_info['capabilities']
    for req in task.requirements
    )

     # 检查负载
    has_capacity = (
    agent_info['current_workload'] + task.estimated_duration
    <= agent_info['max_workload']
    )

    if capabilities_match and has_capacity:
    suitable_agents.append(agent_id)

    return suitable_agents

    def _select_least_loaded_agent(self, agents: List[str]) -> str:
    """选择负载最低的智能体"""
    return min(agents, key=lambda aid: self.agent_workload[aid])

    def complete_task(self, task_id: str, result: Any):
    """完成任务"""
    task = self.tasks.get(task_id)
    if not task:
    logger.warning(f"Task not found: {task_id}")
    return

     # 更新任务状态
    task.status = "completed"
    task.completed_at = datetime.utcnow()
    task.result = result

     # 更新智能体负载
    if task.assigned_to:
    self.agents[task.assigned_to]['current_workload'] -= task.estimated_duration
    self.agent_workload[task.assigned_to] = self.agents[task.assigned_to]['current_workload']

    logger.info(f"Task {task_id} completed by {task.assigned_to}")

    def get_agent_tasks(self, agent_id: str) -> List[Task]:
    """获取智能体的任务"""
    return [
    task for task in self.tasks.values()
    if task.assigned_to == agent_id and task.status != "completed"
    ]

    def get_statistics(self) -> Dict[str, Any]:
    """获取统计信息"""
    total_tasks = len(self.tasks)
    completed_tasks = len([t for t in self.tasks.values() if t.status == "completed"])
    pending_tasks = len([t for t in self.tasks.values() if t.status == "pending"])
    assigned_tasks = len([t for t in self.tasks.values() if t.status == "assigned"])

    return {
    'total_tasks': total_tasks,
    'completed_tasks': completed_tasks,
    'pending_tasks': pending_tasks,
    'assigned_tasks': assigned_tasks,
    'agent_workload': self.agent_workload.copy()
    }

    # 使用示例
    async def main():
    """主函数"""
    allocator = TaskAllocator()

     # 注册智能体
    allocator.register_agent("agent1", ["code_generation", "code_review"], max_workload=2.0)
    allocator.register_agent("agent2", ["code_review", "testing"], max_workload=1.5)
    allocator.register_agent("agent3", ["code_generation", "testing"], max_workload=2.0)

     # 添加任务
    tasks = [
    Task("task1", "Generate User Module", "Generate user authentication module", ["code_generation"], 1.0),
    Task("task2", "Review User Module", "Review user authentication module", ["code_review"], 0.5),
    Task("task3", "Generate Product Module", "Generate product management module", ["code_generation"], 1.0),
    Task("task4", "Test User Module", "Test user authentication module", ["testing"], 0.5),
    Task("task5", "Review Product Module", "Review product management module", ["code_review"], 0.5),
    Task("task6", "Test Product Module", "Test product management module", ["testing"], 0.5),
    ]

    for task in tasks:
    allocator.add_task(task)

     # 分配任务
    for task in tasks:
    allocator.allocate_task(task.id)

     # 获取统计信息
    stats = allocator.get_statistics()
    print(f"Allocation statistics: {stats}")

     # 获取智能体任务
    for agent_id in ["agent1", "agent2", "agent3"]:
    agent_tasks = allocator.get_agent_tasks(agent_id)
    print(f"\nAgent {agent_id} tasks:")
    for task in agent_tasks:
    print(f"  - {task.name} ({task.estimated_duration}h)")

    if __name__ == '__main__':
    asyncio.run(main())

    ```> **任务分配特点**:

    > - 基于能力匹配
    > - 考虑负载均衡
    > - 动态调整分配
    > - 任务状态跟踪

    ## 总结

    多智能体协作包括:

    1. **多智能体协作的基本概念**: 什么是多智能体协作、协作模式
    2. **智能体通信机制**: 消息传递、共享知识库
    3. **协作任务执行**: 任务分配

    通过多智能体协作,Claude Code可以实现更复杂的任务,提高整体效率和效果。

    至此,第27章"Agentic AI 核心技术"全部完成。接下来我们将创建第28章"Claude Code 架构解析"

基于 MIT 许可发布 | 永久导航