27.5 多智能体协作
多智能体协作概述
多智能体协作是指多个AI智能体协同工作,共同完成复杂任务的能力。通过协作,智能体可以共享信息、分工合作、互相补充,从而提高整体效率和效果。
多智能体协作的基本概念
1. 什么是多智能体协作
多智能体协作是指多个AI智能体通过通信、协调和合作,共同完成单个智能体难以完成的复杂任务。
多智能体协作的特点 :
- 分布式决策 : 每个智能体独立决策
- 信息共享 : 智能体之间共享信息
- 任务分工 : 智能体分工合作
- 协同优化 : 整体优化而非局部优化
2. 协作模式
| 协作模式 | 特点 | 适用场景 |
|---|
主从模式| 一个主智能体协调多个从智能体| 层次化任务 平等模式| 智能体地位平等,共同决策| 协作任务 竞争模式| 智能体竞争完成任务| 优化问题 混合模式| 结合多种模式| 复杂场景
智能体通信机制
1. 消息传递
示例:消息传递机制
用户请求: "实现智能体之间的消息传递机制"
Claude Code 生成的代码:
python
python
````python
```python
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime
from enum import Enum
import logging
import asyncio
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
class MessageType(Enum):
"""消息类型"""
REQUEST = "request"
RESPONSE = "response"
NOTIFICATION = "notification"
BROADCAST = "broadcast"
@dataclass
class Message:
"""消息"""
id: str
type: MessageType
sender: str
receiver: str
content: Dict[str, Any]
timestamp: datetime = field(default_factory=datetime.utcnow)
reply_to: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
class MessageBus:
"""消息总线"""
def __init__(self):
self.agents: Dict[str, 'Agent'] = {}
self.message_queue: asyncio.Queue = asyncio.Queue()
self.message_handlers: Dict[str, Callable] = {}
self.running = False
def register_agent(self, agent: 'Agent'):
"""注册智能体"""
self.agents[agent.id] = agent
logger.info(f"Agent registered: {agent.id}")
def unregister_agent(self, agent_id: str):
"""注销智能体"""
if agent_id in self.agents:
del self.agents[agent_id]
logger.info(f"Agent unregistered: {agent_id}")
async def send_message(self, message: Message):
"""发送消息"""
receiver = self.agents.get(message.receiver)
if not receiver:
logger.warning(f"Receiver not found: {message.receiver}")
return
# 添加到消息队列
await self.message_queue.put(message)
logger.info(f"Message sent from {message.sender} to {message.receiver}")
async def broadcast_message(self, message: Message):
"""广播消息"""
for agent_id, agent in self.agents.items():
if agent_id != message.sender:
broadcast_msg = Message(
id=f"{message.id}_to_{agent_id}",
type=message.type,
sender=message.sender,
receiver=agent_id,
content=message.content,
timestamp=message.timestamp,
metadata=message.metadata
)
await self.send_message(broadcast_msg)
logger.info(f"Message broadcasted from {message.sender}")
def register_handler(self, message_type: str, handler: Callable):
"""注册消息处理器"""
self.message_handlers[message_type] = handler
logger.info(f"Handler registered for message type: {message_type}")
async def start(self):
"""启动消息总线"""
self.running = True
logger.info("Message bus started")
# 启动消息处理循环
asyncio.create_task(self._process_messages())
async def stop(self):
"""停止消息总线"""
self.running = False
logger.info("Message bus stopped")
async def _process_messages(self):
"""处理消息"""
while self.running:
try:
# 获取消息
message = await asyncio.wait_for(
self.message_queue.get(),
timeout=1.0
)
# 传递给接收者
receiver = self.agents.get(message.receiver)
if receiver:
await receiver.receive_message(message)
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"Error processing message: {e}")
class Agent:
"""智能体基类"""
def __init__(self, agent_id: str, message_bus: MessageBus):
self.id = agent_id
self.message_bus = message_bus
self.message_handlers: Dict[str, Callable] = {}
self.knowledge_base: Dict[str, Any] = {}
async def receive_message(self, message: Message):
"""接收消息"""
logger.info(f"Agent {self.id} received message from {message.sender}")
# 查找处理器
handler = self.message_handlers.get(message.type.value)
if handler:
# 调用处理器
response = await handler(message)
# 如果是请求消息,发送响应
if message.type == MessageType.REQUEST:
response_message = Message(
id=f"response_{message.id}",
type=MessageType.RESPONSE,
sender=self.id,
receiver=message.sender,
content=response,
reply_to=message.id
)
await self.message_bus.send_message(response_message)
else:
logger.warning(f"No handler for message type: {message.type.value}")
def register_handler(self, message_type: str, handler: Callable):
"""注册消息处理器"""
self.message_handlers[message_type] = handler
logger.info(f"Agent {self.id} registered handler for {message_type}")
async def send_request(self, receiver_id: str, content: Dict[str, Any]) -> Message:
"""发送请求"""
message = Message(
id=f"req_{datetime.utcnow().timestamp()}",
type=MessageType.REQUEST,
sender=self.id,
receiver=receiver_id,
content=content
)
await self.message_bus.send_message(message)
return message
async def send_response(self, original_message: Message, content: Dict[str, Any]):
"""发送响应"""
message = Message(
id=f"resp_{datetime.utcnow().timestamp()}",
type=MessageType.RESPONSE,
sender=self.id,
receiver=original_message.sender,
content=content,
reply_to=original_message.id
)
await self.message_bus.send_message(message)
async def send_notification(self, receiver_id: str, content: Dict[str, Any]):
"""发送通知"""
message = Message(
id=f"notif_{datetime.utcnow().timestamp()}",
type=MessageType.NOTIFICATION,
sender=self.id,
receiver=receiver_id,
content=content
)
await self.message_bus.send_message(message)
async def broadcast(self, content: Dict[str, Any]):
"""广播消息"""
message = Message(
id=f"broadcast_{datetime.utcnow().timestamp()}",
type=MessageType.BROADCAST,
sender=self.id,
receiver="all",
content=content
)
await self.message_bus.broadcast_message(message)
def update_knowledge(self, key: str, value: Any):
"""更新知识库"""
self.knowledge_base[key] = value
logger.info(f"Agent {self.id} updated knowledge: {key}")
def get_knowledge(self, key: str) -> Optional[Any]:
"""获取知识"""
return self.knowledge_base.get(key)
# 使用示例
async def main():
"""主函数"""
# 创建消息总线
message_bus = MessageBus()
await message_bus.start()
# 创建智能体
agent1 = Agent("agent1", message_bus)
agent2 = Agent("agent2", message_bus)
agent3 = Agent("agent3", message_bus)
# 注册智能体
message_bus.register_agent(agent1)
message_bus.register_agent(agent2)
message_bus.register_agent(agent3)
# 注册消息处理器
async def handle_request(message: Message) -> Dict[str, Any]:
"""处理请求"""
print(f"Agent {agent1.id} handling request from {message.sender}")
return {'status': 'success', 'data': 'processed'}
agent1.register_handler('request', handle_request)
async def handle_notification(message: Message) -> Dict[str, Any]:
"""处理通知"""
print(f"Agent {agent2.id} received notification from {message.sender}")
return {'status': 'acknowledged'}
agent2.register_handler('notification', handle_notification)
# 发送消息
await agent1.send_request('agent2', {'task': 'process_data'})
await agent2.send_notification('agent3', {'event': 'update'})
await agent3.broadcast({'announcement': 'system_ready'})
# 等待消息处理
await asyncio.sleep(1)
# 停止消息总线
await message_bus.stop()
if __name__ == '__main__':
asyncio.run(main())
```> **消息传递特点**:
> - 异步消息传递
> - 支持点对点和广播
> - 消息类型分类
> - 请求-响应模式
```
### 2. 共享知识库
# 示例:共享知识库
用户请求:
"实现智能体之间的共享知识库"
Claude Code 生成的代码:
````python
`python
from typing import Dict, List, Any, Optional
from datetime import datetime
import logging
import json
logger = logging.getLogger(__name__)
class SharedKnowledgeBase:
"""共享知识库"""
def __init__(self):
self.knowledge: Dict[str, Any] = {}
self.access_log: List[Dict[str, Any]] = []
self.version: int = 0
def add_knowledge(self, key: str, value: Any, agent_id: str) -> bool:
"""添加知识"""
if key in self.knowledge:
logger.warning(f"Knowledge key already exists: {key}")
return False
self.knowledge[key] = {
'value': value,
'agent_id': agent_id,
'created_at': datetime.utcnow(),
'version': self.version
}
self.version += 1
self._log_access(agent_id, 'add', key)
logger.info(f"Knowledge added: {key} by {agent_id}")
return True
def update_knowledge(self, key: str, value: Any, agent_id: str) -> bool:
"""更新知识"""
if key not in self.knowledge:
logger.warning(f"Knowledge key not found: {key}")
return False
old_value = self.knowledge[key]['value']
self.knowledge[key]['value'] = value
self.knowledge[key]['updated_at'] = datetime.utcnow()
self.knowledge[key]['updated_by'] = agent_id
self.knowledge[key]['version'] = self.version
self.knowledge[key]['old_value'] = old_value
self.version += 1
self._log_access(agent_id, 'update', key)
logger.info(f"Knowledge updated: {key} by {agent_id}")
return True
def get_knowledge(self, key: str, agent_id: str) -> Optional[Any]:
"""获取知识"""
if key not in self.knowledge:
logger.warning(f"Knowledge key not found: {key}")
return None
self._log_access(agent_id, 'read', key)
return self.knowledge[key]['value']
def delete_knowledge(self, key: str, agent_id: str) -> bool:
"""删除知识"""
if key not in self.knowledge:
logger.warning(f"Knowledge key not found: {key}")
return False
del self.knowledge[key]
self._log_access(agent_id, 'delete', key)
logger.info(f"Knowledge deleted: {key} by {agent_id}")
return True
def search_knowledge(self, query: str, agent_id: str) -> List[Dict[str, Any]]:
"""搜索知识"""
results = []
query_lower = query.lower()
for key, knowledge in self.knowledge.items():
if query_lower in key.lower():
results.append({
'key': key,
'value': knowledge['value'],
'agent_id': knowledge['agent_id'],
'version': knowledge['version']
})
self._log_access(agent_id, 'search', query)
logger.info(f"Knowledge search: {query} by {agent_id}, found {len(results)} results")
return results
def get_all_knowledge(self, agent_id: str) -> Dict[str, Any]:
"""获取所有知识"""
self._log_access(agent_id, 'read_all', 'all')
return {key: knowledge['value'] for key, knowledge in self.knowledge.items()}
def get_knowledge_by_agent(self, agent_id: str) -> Dict[str, Any]:
"""获取智能体的知识"""
agent_knowledge = {}
for key, knowledge in self.knowledge.items():
if knowledge['agent_id'] == agent_id:
agent_knowledge[key] = knowledge['value']
return agent_knowledge
def merge_knowledge(self, other_knowledge: Dict[str, Any], agent_id: str) -> int:
"""合并知识"""
merged_count = 0
for key, value in other_knowledge.items():
if key not in self.knowledge:
self.add_knowledge(key, value, agent_id)
merged_count += 1
logger.info(f"Knowledge merged: {merged_count} items by {agent_id}")
return merged_count
def _log_access(self, agent_id: str, action: str, key: str):
"""记录访问日志"""
self.access_log.append({
'agent_id': agent_id,
'action': action,
'key': key,
'timestamp': datetime.utcnow()
})
def get_access_log(self, agent_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""获取访问日志"""
if agent_id:
return [log for log in self.access_log if log['agent_id'] == agent_id]
return self.access_log
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
agent_counts = {}
for knowledge in self.knowledge.values():
agent_id = knowledge['agent_id']
agent_counts[agent_id] = agent_counts.get(agent_id, 0) + 1
return {
'total_knowledge': len(self.knowledge),
'version': self.version,
'agent_counts': agent_counts,
'total_accesses': len(self.access_log)
}
class KnowledgeSharingAgent(Agent):
"""知识共享智能体"""
def __init__(self, agent_id: str, message_bus: MessageBus, knowledge_base: SharedKnowledgeBase):
super().__init__(agent_id, message_bus)
self.knowledge_base = knowledge_base
async def share_knowledge(self, key: str, value: Any):
"""分享知识"""
success = self.knowledge_base.add_knowledge(key, value, self.id)
if success:
# 广播知识更新
await self.broadcast({
'type': 'knowledge_update',
'key': key,
'agent_id': self.id
})
async def request_knowledge(self, key: str, target_agent_id: str) -> Optional[Any]:
"""请求知识"""
# 发送知识请求
await self.send_request(target_agent_id, {
'type': 'knowledge_request',
'key': key
})
# 等待响应
await asyncio.sleep(0.5)
# 从知识库获取
return self.knowledge_base.get_knowledge(key, self.id)
async def sync_knowledge(self):
"""同步知识"""
# 获取其他智能体的知识
other_agents = [aid for aid in self.message_bus.agents.keys() if aid != self.id]
for agent_id in other_agents:
await self.send_request(agent_id, {
'type': 'knowledge_sync_request'
})
# 使用示例
async def main():
"""主函数"""
# 创建消息总线
message_bus = MessageBus()
await message_bus.start()
# 创建共享知识库
knowledge_base = SharedKnowledgeBase()
# 创建智能体
agent1 = KnowledgeSharingAgent("agent1", message_bus, knowledge_base)
agent2 = KnowledgeSharingAgent("agent2", message_bus, knowledge_base)
agent3 = KnowledgeSharingAgent("agent3", message_bus, knowledge_base)
# 注册智能体
message_bus.register_agent(agent1)
message_bus.register_agent(agent2)
message_bus.register_agent(agent3)
# 分享知识
await agent1.share_knowledge("python_syntax", "Python uses indentation for code blocks")
await agent2.share_knowledge("javascript_syntax", "JavaScript uses curly braces for code blocks")
await agent3.share_knowledge("java_syntax", "Java uses curly braces and semicolons")
# 请求知识
python_syntax = await agent2.request_knowledge("python_syntax", "agent1")
print(f"Python syntax: {python_syntax}")
# 获取统计信息
stats = knowledge_base.get_statistics()
print(f"Knowledge base statistics: {stats}")
# 停止消息总线
await message_bus.stop()
if __name__ == '__main__':
asyncio.run(main())
```> **共享知识库特点**:
> - 集中式知识存储
> - 访问控制和日志
> - 知识搜索和检索
> - 智能体间知识同步
```
## 协作任务执行
### 1. 任务分配
# 示例:任务分配
用户请求:
"实现智能体之间的任务分配机制"
Claude Code 生成的代码:
````python
`python
from typing import Dict, List, Any, Optional
from datetime import datetime
import logging
import asyncio
logger = logging.getLogger(__name__)
class Task:
"""任务"""
def __init__(self, task_id: str, name: str, description: str, requirements: List[str], estimated_duration: float):
self.id = task_id
self.name = name
self.description = description
self.requirements = requirements
self.estimated_duration = estimated_duration
self.assigned_to: Optional[str] = None
self.status = "pending"
self.created_at = datetime.utcnow()
self.started_at: Optional[datetime] = None
self.completed_at: Optional[datetime] = None
self.result: Optional[Any] = None
class TaskAllocator:
"""任务分配器"""
def __init__(self):
self.tasks: Dict[str, Task] = {}
self.agents: Dict[str, Dict[str, Any]] = {}
self.agent_workload: Dict[str, float] = {}
def register_agent(self, agent_id: str, capabilities: List[str], max_workload: float = 1.0):
"""注册智能体"""
self.agents[agent_id] = {
'capabilities': capabilities,
'max_workload': max_workload,
'current_workload': 0.0
}
self.agent_workload[agent_id] = 0.0
logger.info(f"Agent registered: {agent_id} with capabilities {capabilities}")
def add_task(self, task: Task):
"""添加任务"""
self.tasks[task.id] = task
logger.info(f"Task added: {task.id}")
def allocate_task(self, task_id: str) -> Optional[str]:
"""分配任务"""
task = self.tasks.get(task_id)
if not task:
logger.warning(f"Task not found: {task_id}")
return None
# 查找合适的智能体
suitable_agents = self._find_suitable_agents(task)
if not suitable_agents:
logger.warning(f"No suitable agents for task: {task_id}")
return None
# 选择负载最低的智能体
selected_agent = self._select_least_loaded_agent(suitable_agents)
# 分配任务
task.assigned_to = selected_agent
task.status = "assigned"
self.agents[selected_agent]['current_workload'] += task.estimated_duration
self.agent_workload[selected_agent] = self.agents[selected_agent]['current_workload']
logger.info(f"Task {task_id} assigned to {selected_agent}")
return selected_agent
def _find_suitable_agents(self, task: Task) -> List[str]:
"""查找合适的智能体"""
suitable_agents = []
for agent_id, agent_info in self.agents.items():
# 检查能力匹配
capabilities_match = all(
req in agent_info['capabilities']
for req in task.requirements
)
# 检查负载
has_capacity = (
agent_info['current_workload'] + task.estimated_duration
<= agent_info['max_workload']
)
if capabilities_match and has_capacity:
suitable_agents.append(agent_id)
return suitable_agents
def _select_least_loaded_agent(self, agents: List[str]) -> str:
"""选择负载最低的智能体"""
return min(agents, key=lambda aid: self.agent_workload[aid])
def complete_task(self, task_id: str, result: Any):
"""完成任务"""
task = self.tasks.get(task_id)
if not task:
logger.warning(f"Task not found: {task_id}")
return
# 更新任务状态
task.status = "completed"
task.completed_at = datetime.utcnow()
task.result = result
# 更新智能体负载
if task.assigned_to:
self.agents[task.assigned_to]['current_workload'] -= task.estimated_duration
self.agent_workload[task.assigned_to] = self.agents[task.assigned_to]['current_workload']
logger.info(f"Task {task_id} completed by {task.assigned_to}")
def get_agent_tasks(self, agent_id: str) -> List[Task]:
"""获取智能体的任务"""
return [
task for task in self.tasks.values()
if task.assigned_to == agent_id and task.status != "completed"
]
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
total_tasks = len(self.tasks)
completed_tasks = len([t for t in self.tasks.values() if t.status == "completed"])
pending_tasks = len([t for t in self.tasks.values() if t.status == "pending"])
assigned_tasks = len([t for t in self.tasks.values() if t.status == "assigned"])
return {
'total_tasks': total_tasks,
'completed_tasks': completed_tasks,
'pending_tasks': pending_tasks,
'assigned_tasks': assigned_tasks,
'agent_workload': self.agent_workload.copy()
}
# 使用示例
async def main():
"""主函数"""
allocator = TaskAllocator()
# 注册智能体
allocator.register_agent("agent1", ["code_generation", "code_review"], max_workload=2.0)
allocator.register_agent("agent2", ["code_review", "testing"], max_workload=1.5)
allocator.register_agent("agent3", ["code_generation", "testing"], max_workload=2.0)
# 添加任务
tasks = [
Task("task1", "Generate User Module", "Generate user authentication module", ["code_generation"], 1.0),
Task("task2", "Review User Module", "Review user authentication module", ["code_review"], 0.5),
Task("task3", "Generate Product Module", "Generate product management module", ["code_generation"], 1.0),
Task("task4", "Test User Module", "Test user authentication module", ["testing"], 0.5),
Task("task5", "Review Product Module", "Review product management module", ["code_review"], 0.5),
Task("task6", "Test Product Module", "Test product management module", ["testing"], 0.5),
]
for task in tasks:
allocator.add_task(task)
# 分配任务
for task in tasks:
allocator.allocate_task(task.id)
# 获取统计信息
stats = allocator.get_statistics()
print(f"Allocation statistics: {stats}")
# 获取智能体任务
for agent_id in ["agent1", "agent2", "agent3"]:
agent_tasks = allocator.get_agent_tasks(agent_id)
print(f"\nAgent {agent_id} tasks:")
for task in agent_tasks:
print(f" - {task.name} ({task.estimated_duration}h)")
if __name__ == '__main__':
asyncio.run(main())
```> **任务分配特点**:
> - 基于能力匹配
> - 考虑负载均衡
> - 动态调整分配
> - 任务状态跟踪
## 总结
多智能体协作包括:
1. **多智能体协作的基本概念**: 什么是多智能体协作、协作模式
2. **智能体通信机制**: 消息传递、共享知识库
3. **协作任务执行**: 任务分配
通过多智能体协作,Claude Code可以实现更复杂的任务,提高整体效率和效果。
至此,第27章"Agentic AI 核心技术"全部完成。接下来我们将创建第28章"Claude Code 架构解析"。