27.5 多智慧體協作
多智慧體協作概述
多智慧體協作是指多個AI智慧體協同工作,共同完成複雜任務的能力。透過協作,智慧體可以共享資訊、分工合作、互相補充,從而提高整體效率和效果。
多智能体协作的基本概念
1. 什么是多智能体协作
多智慧體協作是指多個AI智慧體透過通訊、協調和合作,共同完成單個智慧體難以完成的複雜任務。
多智能体协作的特点 :
- 分散式決策 : 每個智慧體獨立決策
- 資訊共享 : 智慧體之間共享資訊
- 任務分工 : 智慧體分工合作
- 協同最佳化 : 整體最佳化而非區域性最佳化
2. 協作模式
| 協作模式 | 特點 | 適用場景 |
|---|
主從模式| 一個主智慧體協調多個從智慧體| 層次化任務 平等模式| 智慧體地位平等,共同決策| 協作任務 競爭模式| 智慧體競爭完成任務| 最佳化問題 混合模式| 結合多種模式| 複雜場景
智能体通信机制
1. 消息传递
示例:消息传递机制
使用者請求: "實現智慧體之間的訊息傳遞機制"
Claude Code 生成的程式碼:
python
python
````python
> - 异步消息传递
> - 支持点对点和广播
> - 消息类型分类
> - 请求-响应模式
`python
from typing import Dict, List, Any, Optional
from datetime import datetime
import logging
import json
logger = logging.getLogger(__name__)
class SharedKnowledgeBase:
"""共享知識庫"""
def __init__(self):
self.knowledge: Dict[str, Any] = {}
self.access_log: List[Dict[str, Any]] = []
self.version: int = 0
def add_knowledge(self, key: str, value: Any, agent_id: str) -> bool:
"""新增知識"""
if key in self.knowledge:
logger.warning(f"Knowledge key already exists: {key}")
return False
self.knowledge[key] = {
'value': value,
'agent_id': agent_id,
'created_at': datetime.utcnow(),
'version': self.version
}
self.version += 1
self._log_access(agent_id, 'add', key)
logger.info(f"Knowledge added: {key} by {agent_id}")
return True
def update_knowledge(self, key: str, value: Any, agent_id: str) -> bool:
"""更新知識"""
if key not in self.knowledge:
logger.warning(f"Knowledge key not found: {key}")
return False
old_value = self.knowledge[key]['value']
self.knowledge[key]['value'] = value
self.knowledge[key]['updated_at'] = datetime.utcnow()
self.knowledge[key]['updated_by'] = agent_id
self.knowledge[key]['version'] = self.version
self.knowledge[key]['old_value'] = old_value
self.version += 1
self._log_access(agent_id, 'update', key)
logger.info(f"Knowledge updated: {key} by {agent_id}")
return True
def get_knowledge(self, key: str, agent_id: str) -> Optional[Any]:
"""獲取知識"""
if key not in self.knowledge:
logger.warning(f"Knowledge key not found: {key}")
return None
self._log_access(agent_id, 'read', key)
return self.knowledge[key]['value']
def delete_knowledge(self, key: str, agent_id: str) -> bool:
"""刪除知識"""
if key not in self.knowledge:
logger.warning(f"Knowledge key not found: {key}")
return False
del self.knowledge[key]
self._log_access(agent_id, 'delete', key)
logger.info(f"Knowledge deleted: {key} by {agent_id}")
return True
def search_knowledge(self, query: str, agent_id: str) -> List[Dict[str, Any]]:
"""搜尋知識"""
results = []
query_lower = query.lower()
for key, knowledge in self.knowledge.items():
if query_lower in key.lower():
results.append({
'key': key,
'value': knowledge['value'],
'agent_id': knowledge['agent_id'],
'version': knowledge['version']
})
self._log_access(agent_id, 'search', query)
logger.info(f"Knowledge search: {query} by {agent_id}, found {len(results)} results")
return results
def get_all_knowledge(self, agent_id: str) -> Dict[str, Any]:
"""獲取所有知識"""
self._log_access(agent_id, 'read_all', 'all')
return {key: knowledge['value'] for key, knowledge in self.knowledge.items()}
def get_knowledge_by_agent(self, agent_id: str) -> Dict[str, Any]:
"""獲取智慧體的知識"""
agent_knowledge = {}
for key, knowledge in self.knowledge.items():
if knowledge['agent_id'] == agent_id:
agent_knowledge[key] = knowledge['value']
return agent_knowledge
def merge_knowledge(self, other_knowledge: Dict[str, Any], agent_id: str) -> int:
"""合併知識"""
merged_count = 0
for key, value in other_knowledge.items():
if key not in self.knowledge:
self.add_knowledge(key, value, agent_id)
merged_count += 1
logger.info(f"Knowledge merged: {merged_count} items by {agent_id}")
return merged_count
def _log_access(self, agent_id: str, action: str, key: str):
"""記錄訪問日誌"""
self.access_log.append({
'agent_id': agent_id,
'action': action,
'key': key,
'timestamp': datetime.utcnow()
})
def get_access_log(self, agent_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""獲取訪問日誌"""
if agent_id:
return [log for log in self.access_log if log['agent_id'] == agent_id]
return self.access_log
def get_statistics(self) -> Dict[str, Any]:
"""獲取統計資訊"""
agent_counts = {}
for knowledge in self.knowledge.values():
agent_id = knowledge['agent_id']
agent_counts[agent_id] = agent_counts.get(agent_id, 0) + 1
return {
'total_knowledge': len(self.knowledge),
'version': self.version,
'agent_counts': agent_counts,
'total_accesses': len(self.access_log)
}
class KnowledgeSharingAgent(Agent):
"""知識共享智慧體"""
def __init__(self, agent_id: str, message_bus: MessageBus, knowledge_base: SharedKnowledgeBase):
super().__init__(agent_id, message_bus)
self.knowledge_base = knowledge_base
async def share_knowledge(self, key: str, value: Any):
"""分享知識"""
success = self.knowledge_base.add_knowledge(key, value, self.id)
if success:
# 廣播知識更新
await self.broadcast({
'type': 'knowledge_update',
'key': key,
'agent_id': self.id
})
async def request_knowledge(self, key: str, target_agent_id: str) -> Optional[Any]:
"""請求知識"""
# 傳送知識請求
await self.send_request(target_agent_id, {
'type': 'knowledge_request',
'key': key
})
# 等待響應
await asyncio.sleep(0.5)
# 從知識庫獲取
return self.knowledge_base.get_knowledge(key, self.id)
async def sync_knowledge(self):
"""同步知識"""
# 獲取其他智慧體的知識
other_agents = [aid for aid in self.message_bus.agents.keys() if aid != self.id]
for agent_id in other_agents:
await self.send_request(agent_id, {
'type': 'knowledge_sync_request'
})
# 使用示例
async def main():
"""主函式"""
# 建立訊息匯流排
message_bus = MessageBus()
await message_bus.start()
# 建立共享知識庫
knowledge_base = SharedKnowledgeBase()
# 建立智慧體
agent1 = KnowledgeSharingAgent("agent1", message_bus, knowledge_base)
agent2 = KnowledgeSharingAgent("agent2", message_bus, knowledge_base)
agent3 = KnowledgeSharingAgent("agent3", message_bus, knowledge_base)
# 註冊智慧體
message_bus.register_agent(agent1)
message_bus.register_agent(agent2)
message_bus.register_agent(agent3)
# 分享知識
await agent1.share_knowledge("python_syntax", "Python uses indentation for code blocks")
await agent2.share_knowledge("javascript_syntax", "JavaScript uses curly braces for code blocks")
await agent3.share_knowledge("java_syntax", "Java uses curly braces and semicolons")
# 請求知識
python_syntax = await agent2.request_knowledge("python_syntax", "agent1")
print(f"Python syntax: {python_syntax}")
# 獲取統計資訊
stats = knowledge_base.get_statistics()
print(f"Knowledge base statistics: {stats}")
# 停止訊息匯流排
await message_bus.stop()
if __name__ == '__main__':
asyncio.run(main())
```> **共享知识库特点**:
> - 集中式知识存储
> - 访问控制和日志
> - 知识搜索和检索
> - 智能体间知识同步
```
## 協作任務執行
### 1. 任務分配
# 示例:任務分配
使用者請求:
"實現智慧體之間的任務分配機制"
Claude Code 生成的程式碼:
````python
`python
from typing import Dict, List, Any, Optional
from datetime import datetime
import logging
import asyncio
logger = logging.getLogger(__name__)
class Task:
"""任务"""
def __init__(self, task_id: str, name: str, description: str, requirements: List[str], estimated_duration: float):
self.id = task_id
self.name = name
self.description = description
self.requirements = requirements
self.estimated_duration = estimated_duration
self.assigned_to: Optional[str] = None
self.status = "pending"
self.created_at = datetime.utcnow()
self.started_at: Optional[datetime] = None
self.completed_at: Optional[datetime] = None
self.result: Optional[Any] = None
class TaskAllocator:
"""任务分配器"""
def __init__(self):
self.tasks: Dict[str, Task] = {}
self.agents: Dict[str, Dict[str, Any]] = {}
self.agent_workload: Dict[str, float] = {}
def register_agent(self, agent_id: str, capabilities: List[str], max_workload: float = 1.0):
"""注册智能体"""
self.agents[agent_id] = {
'capabilities': capabilities,
'max_workload': max_workload,
'current_workload': 0.0
}
self.agent_workload[agent_id] = 0.0
logger.info(f"Agent registered: {agent_id} with capabilities {capabilities}")
def add_task(self, task: Task):
"""添加任务"""
self.tasks[task.id] = task
logger.info(f"Task added: {task.id}")
def allocate_task(self, task_id: str) -> Optional[str]:
"""分配任务"""
task = self.tasks.get(task_id)
if not task:
logger.warning(f"Task not found: {task_id}")
return None
# 查找合适的智能体
suitable_agents = self._find_suitable_agents(task)
if not suitable_agents:
logger.warning(f"No suitable agents for task: {task_id}")
return None
# 选择负载最低的智能体
selected_agent = self._select_least_loaded_agent(suitable_agents)
# 分配任务
task.assigned_to = selected_agent
task.status = "assigned"
self.agents[selected_agent]['current_workload'] += task.estimated_duration
self.agent_workload[selected_agent] = self.agents[selected_agent]['current_workload']
logger.info(f"Task {task_id} assigned to {selected_agent}")
return selected_agent
def _find_suitable_agents(self, task: Task) -> List[str]:
"""查找合适的智能体"""
suitable_agents = []
for agent_id, agent_info in self.agents.items():
# 检查能力匹配
capabilities_match = all(
req in agent_info['capabilities']
for req in task.requirements
)
# 检查负载
has_capacity = (
agent_info['current_workload'] + task.estimated_duration
<= agent_info['max_workload']
)
if capabilities_match and has_capacity:
suitable_agents.append(agent_id)
return suitable_agents
def _select_least_loaded_agent(self, agents: List[str]) -> str:
"""选择负载最低的智能体"""
return min(agents, key=lambda aid: self.agent_workload[aid])
def complete_task(self, task_id: str, result: Any):
"""完成任务"""
task = self.tasks.get(task_id)
if not task:
logger.warning(f"Task not found: {task_id}")
return
# 更新任务状态
task.status = "completed"
task.completed_at = datetime.utcnow()
task.result = result
# 更新智能体负载
if task.assigned_to:
self.agents[task.assigned_to]['current_workload'] -= task.estimated_duration
self.agent_workload[task.assigned_to] = self.agents[task.assigned_to]['current_workload']
logger.info(f"Task {task_id} completed by {task.assigned_to}")
def get_agent_tasks(self, agent_id: str) -> List[Task]:
"""获取智能体的任务"""
return [
task for task in self.tasks.values()
if task.assigned_to == agent_id and task.status != "completed"
]
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
total_tasks = len(self.tasks)
completed_tasks = len([t for t in self.tasks.values() if t.status == "completed"])
pending_tasks = len([t for t in self.tasks.values() if t.status == "pending"])
assigned_tasks = len([t for t in self.tasks.values() if t.status == "assigned"])
return {
'total_tasks': total_tasks,
'completed_tasks': completed_tasks,
'pending_tasks': pending_tasks,
'assigned_tasks': assigned_tasks,
'agent_workload': self.agent_workload.copy()
}
# 使用示例
async def main():
"""主函数"""
allocator = TaskAllocator()
# 注册智能体
allocator.register_agent("agent1", ["code_generation", "code_review"], max_workload=2.0)
allocator.register_agent("agent2", ["code_review", "testing"], max_workload=1.5)
allocator.register_agent("agent3", ["code_generation", "testing"], max_workload=2.0)
# 添加任务
tasks = [
Task("task1", "Generate User Module", "Generate user authentication module", ["code_generation"], 1.0),
Task("task2", "Review User Module", "Review user authentication module", ["code_review"], 0.5),
Task("task3", "Generate Product Module", "Generate product management module", ["code_generation"], 1.0),
Task("task4", "Test User Module", "Test user authentication module", ["testing"], 0.5),
Task("task5", "Review Product Module", "Review product management module", ["code_review"], 0.5),
Task("task6", "Test Product Module", "Test product management module", ["testing"], 0.5),
]
for task in tasks:
allocator.add_task(task)
# 分配任务
for task in tasks:
allocator.allocate_task(task.id)
# 获取统计信息
stats = allocator.get_statistics()
print(f"Allocation statistics: {stats}")
# 获取智能体任务
for agent_id in ["agent1", "agent2", "agent3"]:
agent_tasks = allocator.get_agent_tasks(agent_id)
print(f"\nAgent {agent_id} tasks:")
for task in agent_tasks:
print(f" - {task.name} ({task.estimated_duration}h)")
if __name__ == '__main__':
asyncio.run(main())
```> **任務分配特點**:
> - 基於能力匹配
> - 考慮負載均衡
> - 動態調整分配
> - 任務狀態跟蹤
## 总结
多智能体协作包括:
1. **多智能体协作的基本概念**: 什么是多智能体协作、协作模式
2. **智能体通信机制**: 消息传递、共享知识库
3. **协作任务执行**: 任务分配
通过多智能体协作,Claude Code可以实现更复杂的任务,提高整体效率和效果。
至此,第27章"Agentic AI 核心技术"全部完成。接下来我们将创建第28章"Claude Code 架构解析"。