Skip to content

27.5 多智慧體協作

多智慧體協作概述

多智慧體協作是指多個AI智慧體協同工作,共同完成複雜任務的能力。透過協作,智慧體可以共享資訊、分工合作、互相補充,從而提高整體效率和效果。

多智能体协作的基本概念

1. 什么是多智能体协作

多智慧體協作是指多個AI智慧體透過通訊、協調和合作,共同完成單個智慧體難以完成的複雜任務。

多智能体协作的特点 :

  • 分散式決策 : 每個智慧體獨立決策
  • 資訊共享 : 智慧體之間共享資訊
  • 任務分工 : 智慧體分工合作
  • 協同最佳化 : 整體最佳化而非區域性最佳化

2. 協作模式

協作模式特點適用場景

主從模式| 一個主智慧體協調多個從智慧體| 層次化任務 平等模式| 智慧體地位平等,共同決策| 協作任務 競爭模式| 智慧體競爭完成任務| 最佳化問題 混合模式| 結合多種模式| 複雜場景

智能体通信机制

1. 消息传递

示例:消息传递机制

使用者請求: "實現智慧體之間的訊息傳遞機制"

Claude Code 生成的程式碼:

python
    python


    ````python

    > - 异步消息传递
    > - 支持点对点和广播
    > - 消息类型分类
    > - 请求-响应模式
    `python

    from typing import Dict, List, Any, Optional
    from datetime import datetime
    import logging
    import json

    logger = logging.getLogger(__name__)

    class SharedKnowledgeBase:
    """共享知識庫"""

    def __init__(self):
    self.knowledge: Dict[str, Any] = {}
    self.access_log: List[Dict[str, Any]] = []
    self.version: int = 0

    def add_knowledge(self, key: str, value: Any, agent_id: str) -> bool:
    """新增知識"""
    if key in self.knowledge:
    logger.warning(f"Knowledge key already exists: {key}")
    return False

    self.knowledge[key] = {
    'value': value,
    'agent_id': agent_id,
    'created_at': datetime.utcnow(),
    'version': self.version
    }

    self.version += 1
    self._log_access(agent_id, 'add', key)

    logger.info(f"Knowledge added: {key} by {agent_id}")
    return True

    def update_knowledge(self, key: str, value: Any, agent_id: str) -> bool:
    """更新知識"""
    if key not in self.knowledge:
    logger.warning(f"Knowledge key not found: {key}")
    return False

    old_value = self.knowledge[key]['value']
    self.knowledge[key]['value'] = value
    self.knowledge[key]['updated_at'] = datetime.utcnow()
    self.knowledge[key]['updated_by'] = agent_id
    self.knowledge[key]['version'] = self.version
    self.knowledge[key]['old_value'] = old_value

    self.version += 1
    self._log_access(agent_id, 'update', key)

    logger.info(f"Knowledge updated: {key} by {agent_id}")
    return True

    def get_knowledge(self, key: str, agent_id: str) -> Optional[Any]:
    """獲取知識"""
    if key not in self.knowledge:
    logger.warning(f"Knowledge key not found: {key}")
    return None

    self._log_access(agent_id, 'read', key)

    return self.knowledge[key]['value']

    def delete_knowledge(self, key: str, agent_id: str) -> bool:
    """刪除知識"""
    if key not in self.knowledge:
    logger.warning(f"Knowledge key not found: {key}")
    return False

    del self.knowledge[key]
    self._log_access(agent_id, 'delete', key)

    logger.info(f"Knowledge deleted: {key} by {agent_id}")
    return True

    def search_knowledge(self, query: str, agent_id: str) -> List[Dict[str, Any]]:
    """搜尋知識"""
    results = []
    query_lower = query.lower()

    for key, knowledge in self.knowledge.items():
    if query_lower in key.lower():
    results.append({
    'key': key,
    'value': knowledge['value'],
    'agent_id': knowledge['agent_id'],
    'version': knowledge['version']
    })

    self._log_access(agent_id, 'search', query)

    logger.info(f"Knowledge search: {query} by {agent_id}, found {len(results)} results")
    return results

    def get_all_knowledge(self, agent_id: str) -> Dict[str, Any]:
    """獲取所有知識"""
    self._log_access(agent_id, 'read_all', 'all')

    return {key: knowledge['value'] for key, knowledge in self.knowledge.items()}

    def get_knowledge_by_agent(self, agent_id: str) -> Dict[str, Any]:
    """獲取智慧體的知識"""
    agent_knowledge = {}

    for key, knowledge in self.knowledge.items():
    if knowledge['agent_id'] == agent_id:
    agent_knowledge[key] = knowledge['value']

    return agent_knowledge

    def merge_knowledge(self, other_knowledge: Dict[str, Any], agent_id: str) -> int:
    """合併知識"""
    merged_count = 0

    for key, value in other_knowledge.items():
    if key not in self.knowledge:
    self.add_knowledge(key, value, agent_id)
    merged_count += 1

    logger.info(f"Knowledge merged: {merged_count} items by {agent_id}")
    return merged_count

    def _log_access(self, agent_id: str, action: str, key: str):
    """記錄訪問日誌"""
    self.access_log.append({
    'agent_id': agent_id,
    'action': action,
    'key': key,
    'timestamp': datetime.utcnow()
    })

    def get_access_log(self, agent_id: Optional[str] = None) -> List[Dict[str, Any]]:
    """獲取訪問日誌"""
    if agent_id:
    return [log for log in self.access_log if log['agent_id'] == agent_id]
    return self.access_log

    def get_statistics(self) -> Dict[str, Any]:
    """獲取統計資訊"""
    agent_counts = {}

    for knowledge in self.knowledge.values():
    agent_id = knowledge['agent_id']
    agent_counts[agent_id] = agent_counts.get(agent_id, 0) + 1

    return {
    'total_knowledge': len(self.knowledge),
    'version': self.version,
    'agent_counts': agent_counts,
    'total_accesses': len(self.access_log)
    }

    class KnowledgeSharingAgent(Agent):
    """知識共享智慧體"""

    def __init__(self, agent_id: str, message_bus: MessageBus, knowledge_base: SharedKnowledgeBase):
    super().__init__(agent_id, message_bus)
    self.knowledge_base = knowledge_base

    async def share_knowledge(self, key: str, value: Any):
    """分享知識"""
    success = self.knowledge_base.add_knowledge(key, value, self.id)

    if success:

     # 廣播知識更新

    await self.broadcast({
    'type': 'knowledge_update',
    'key': key,
    'agent_id': self.id
    })

    async def request_knowledge(self, key: str, target_agent_id: str) -> Optional[Any]:
    """請求知識"""

     # 傳送知識請求

    await self.send_request(target_agent_id, {
    'type': 'knowledge_request',
    'key': key
    })

     # 等待響應

    await asyncio.sleep(0.5)

     # 從知識庫獲取

    return self.knowledge_base.get_knowledge(key, self.id)

    async def sync_knowledge(self):
    """同步知識"""

     # 獲取其他智慧體的知識

    other_agents = [aid for aid in self.message_bus.agents.keys() if aid != self.id]

    for agent_id in other_agents:
    await self.send_request(agent_id, {
    'type': 'knowledge_sync_request'
    })

    # 使用示例

    async def main():
    """主函式"""

     # 建立訊息匯流排

    message_bus = MessageBus()
    await message_bus.start()

     # 建立共享知識庫

    knowledge_base = SharedKnowledgeBase()

     # 建立智慧體

    agent1 = KnowledgeSharingAgent("agent1", message_bus, knowledge_base)
    agent2 = KnowledgeSharingAgent("agent2", message_bus, knowledge_base)
    agent3 = KnowledgeSharingAgent("agent3", message_bus, knowledge_base)

     # 註冊智慧體

    message_bus.register_agent(agent1)
    message_bus.register_agent(agent2)
    message_bus.register_agent(agent3)

     # 分享知識

    await agent1.share_knowledge("python_syntax", "Python uses indentation for code blocks")
    await agent2.share_knowledge("javascript_syntax", "JavaScript uses curly braces for code blocks")
    await agent3.share_knowledge("java_syntax", "Java uses curly braces and semicolons")

     # 請求知識

    python_syntax = await agent2.request_knowledge("python_syntax", "agent1")
    print(f"Python syntax: {python_syntax}")

     # 獲取統計資訊

    stats = knowledge_base.get_statistics()
    print(f"Knowledge base statistics: {stats}")

     # 停止訊息匯流排

    await message_bus.stop()

    if __name__ == '__main__':
    asyncio.run(main())

    ```> **共享知识库特点**:
    > - 集中式知识存储
    > - 访问控制和日志
    > - 知识搜索和检索
    > - 智能体间知识同步

    ```

    ## 協作任務執行

    ### 1. 任務分配

    # 示例:任務分配

    使用者請求:
    "實現智慧體之間的任務分配機制"
    Claude Code 生成的程式碼:

    ````python
    `python

    from typing import Dict, List, Any, Optional
    from datetime import datetime
    import logging
    import asyncio

    logger = logging.getLogger(__name__)

    class Task:
    """任务"""

    def __init__(self, task_id: str, name: str, description: str, requirements: List[str], estimated_duration: float):
    self.id = task_id
    self.name = name
    self.description = description
    self.requirements = requirements
    self.estimated_duration = estimated_duration
    self.assigned_to: Optional[str] = None
    self.status = "pending"
    self.created_at = datetime.utcnow()
    self.started_at: Optional[datetime] = None
    self.completed_at: Optional[datetime] = None
    self.result: Optional[Any] = None

    class TaskAllocator:
    """任务分配器"""

    def __init__(self):
    self.tasks: Dict[str, Task] = {}
    self.agents: Dict[str, Dict[str, Any]] = {}
    self.agent_workload: Dict[str, float] = {}

    def register_agent(self, agent_id: str, capabilities: List[str], max_workload: float = 1.0):
    """注册智能体"""
    self.agents[agent_id] = {
    'capabilities': capabilities,
    'max_workload': max_workload,
    'current_workload': 0.0
    }
    self.agent_workload[agent_id] = 0.0

    logger.info(f"Agent registered: {agent_id} with capabilities {capabilities}")

    def add_task(self, task: Task):
    """添加任务"""
    self.tasks[task.id] = task
    logger.info(f"Task added: {task.id}")

    def allocate_task(self, task_id: str) -> Optional[str]:
    """分配任务"""
    task = self.tasks.get(task_id)
    if not task:
    logger.warning(f"Task not found: {task_id}")
    return None

     # 查找合适的智能体
    suitable_agents = self._find_suitable_agents(task)

    if not suitable_agents:
    logger.warning(f"No suitable agents for task: {task_id}")
    return None

     # 选择负载最低的智能体
    selected_agent = self._select_least_loaded_agent(suitable_agents)

     # 分配任务
    task.assigned_to = selected_agent
    task.status = "assigned"
    self.agents[selected_agent]['current_workload'] += task.estimated_duration
    self.agent_workload[selected_agent] = self.agents[selected_agent]['current_workload']

    logger.info(f"Task {task_id} assigned to {selected_agent}")
    return selected_agent

    def _find_suitable_agents(self, task: Task) -> List[str]:
    """查找合适的智能体"""
    suitable_agents = []

    for agent_id, agent_info in self.agents.items():
     # 检查能力匹配
    capabilities_match = all(
    req in agent_info['capabilities']
    for req in task.requirements
    )

     # 检查负载
    has_capacity = (
    agent_info['current_workload'] + task.estimated_duration
    <= agent_info['max_workload']
    )

    if capabilities_match and has_capacity:
    suitable_agents.append(agent_id)

    return suitable_agents

    def _select_least_loaded_agent(self, agents: List[str]) -> str:
    """选择负载最低的智能体"""
    return min(agents, key=lambda aid: self.agent_workload[aid])

    def complete_task(self, task_id: str, result: Any):
    """完成任务"""
    task = self.tasks.get(task_id)
    if not task:
    logger.warning(f"Task not found: {task_id}")
    return

     # 更新任务状态
    task.status = "completed"
    task.completed_at = datetime.utcnow()
    task.result = result

     # 更新智能体负载
    if task.assigned_to:
    self.agents[task.assigned_to]['current_workload'] -= task.estimated_duration
    self.agent_workload[task.assigned_to] = self.agents[task.assigned_to]['current_workload']

    logger.info(f"Task {task_id} completed by {task.assigned_to}")

    def get_agent_tasks(self, agent_id: str) -> List[Task]:
    """获取智能体的任务"""
    return [
    task for task in self.tasks.values()
    if task.assigned_to == agent_id and task.status != "completed"
    ]

    def get_statistics(self) -> Dict[str, Any]:
    """获取统计信息"""
    total_tasks = len(self.tasks)
    completed_tasks = len([t for t in self.tasks.values() if t.status == "completed"])
    pending_tasks = len([t for t in self.tasks.values() if t.status == "pending"])
    assigned_tasks = len([t for t in self.tasks.values() if t.status == "assigned"])

    return {
    'total_tasks': total_tasks,
    'completed_tasks': completed_tasks,
    'pending_tasks': pending_tasks,
    'assigned_tasks': assigned_tasks,
    'agent_workload': self.agent_workload.copy()
    }

    # 使用示例
    async def main():
    """主函数"""
    allocator = TaskAllocator()

     # 注册智能体
    allocator.register_agent("agent1", ["code_generation", "code_review"], max_workload=2.0)
    allocator.register_agent("agent2", ["code_review", "testing"], max_workload=1.5)
    allocator.register_agent("agent3", ["code_generation", "testing"], max_workload=2.0)

     # 添加任务
    tasks = [
    Task("task1", "Generate User Module", "Generate user authentication module", ["code_generation"], 1.0),
    Task("task2", "Review User Module", "Review user authentication module", ["code_review"], 0.5),
    Task("task3", "Generate Product Module", "Generate product management module", ["code_generation"], 1.0),
    Task("task4", "Test User Module", "Test user authentication module", ["testing"], 0.5),
    Task("task5", "Review Product Module", "Review product management module", ["code_review"], 0.5),
    Task("task6", "Test Product Module", "Test product management module", ["testing"], 0.5),
    ]

    for task in tasks:
    allocator.add_task(task)

     # 分配任务
    for task in tasks:
    allocator.allocate_task(task.id)

     # 获取统计信息
    stats = allocator.get_statistics()
    print(f"Allocation statistics: {stats}")

     # 获取智能体任务
    for agent_id in ["agent1", "agent2", "agent3"]:
    agent_tasks = allocator.get_agent_tasks(agent_id)
    print(f"\nAgent {agent_id} tasks:")
    for task in agent_tasks:
    print(f"  - {task.name} ({task.estimated_duration}h)")

    if __name__ == '__main__':
    asyncio.run(main())

    ```> **任務分配特點**:

    > - 基於能力匹配
    > - 考慮負載均衡
    > - 動態調整分配
    > - 任務狀態跟蹤

    ## 总结

    多智能体协作包括:

    1. **多智能体协作的基本概念**: 什么是多智能体协作、协作模式
    2. **智能体通信机制**: 消息传递、共享知识库
    3. **协作任务执行**: 任务分配

    通过多智能体协作,Claude Code可以实现更复杂的任务,提高整体效率和效果。

    至此,第27章"Agentic AI 核心技术"全部完成。接下来我们将创建第28章"Claude Code 架构解析"

基于 MIT 许可发布 | 永久导航