Skip to content

27.3 自主規劃演算法

自主規劃演算法概述

自主規劃演算法是Agentic AI系統的核心能力之一,它允許AI系統根據目標自主分解任務、制定執行計劃、監控執行進度,並根據實際情況調整計劃。

规划算法的基本概念

1. 什么是自主规划

自主規劃是指AI系統在沒有人類干預的情況下,根據給定的目標,自動生成和執行任務計劃的過程。

自主规划的特点 :

  • 目標驅動 : 以目標為導向進行規劃
  • 任務分解 : 將複雜目標分解為可執行的子任務
  • 動態調整 : 根據執行情況動態調整計劃
  • 資源管理 : 合理分配和利用資源

2. 規劃演算法分類

演算法型別特點適用場景

前向搜尋| 從初始狀態到目標狀態| 目標明確,狀態空間小 後向搜尋| 從目標狀態到初始狀態| 目標狀態明確,初始狀態複雜 雙向搜尋| 同時從兩端搜尋| 狀態空間大,兩端明確 層次規劃| 分層規劃,逐步細化| 複雜任務,多層級目標 部分規劃| 規劃部分路徑,邊執行邊規劃| 動態環境,不確定性高

任务分解算法

1. 层次任务分解

示例:层次任务分解

使用者請求: "實現一個層次任務分解演算法"

Claude Code 生成的程式碼:

python
    python


    ````python

    > - 递归分解复杂任务
    > - 自动生成子任务
    > - 维护任务依赖关系
    > - 生成执行计划
    `python

    from typing import Dict, List, Any, Optional
    from datetime import datetime, timedelta
    import logging

    logger = logging.getLogger(__name__)

    class DynamicTaskPlanner(HierarchicalTaskPlanner):
    """動態任務規劃器"""

    def __init__(self):
    super().__init__()
    self.execution_history: Dict[str, List[Dict[str, Any]]] = {}
    self.performance_metrics: Dict[str, Dict[str, float]] = {}

    def monitor_execution(self, task_id: str, status: TaskStatus, result: Optional[Any] = None, error: Optional[str] = None):
    """監控任務執行"""
    task = self.tasks.get(task_id)
    if not task:
    return

     # 更新任務狀態

    task.status = status
    task.result = result
    task.error = error

     # 記錄執行歷史

    if task_id not in self.execution_history:
    self.execution_history[task_id] = []

    self.execution_history[task_id].append({
    'timestamp': datetime.utcnow(),
    'status': status,
    'result': result,
    'error': error
    })

    logger.info(f"Task {task_id} status updated to {status}")

    def analyze_performance(self, task_id: str) -> Dict[str, float]:
    """分析任務效能"""
    if task_id not in self.execution_history:
    return {}

    history = self.execution_history[task_id]

     # 計算效能指標

    metrics = {
    'total_executions': len(history),
    'success_rate': 0.0,
    'avg_duration': 0.0,
    'failure_rate': 0.0
    }

    successful = [h for h in history if h['status'] == TaskStatus.COMPLETED]
    failed = [h for h in history if h['status'] == TaskStatus.FAILED]

    if len(history) > 0:
    metrics['success_rate'] = len(successful) / len(history)
    metrics['failure_rate'] = len(failed) / len(history)

    self.performance_metrics[task_id] = metrics

    return metrics

    def adjust_plan(self, task_id: str) -> List[Task]:
    """調整執行計劃"""
    task = self.tasks.get(task_id)
    if not task:
    return []

     # 分析效能

    metrics = self.analyze_performance(task_id)

     # 根據效能調整計劃

    if metrics.get('failure_rate', 0) > 0.5:

     # 失敗率高,新增重試任務

    return self._add_retry_tasks(task)
    elif metrics.get('avg_duration', 0) > 3600:

     # 執行時間長,最佳化任務

    return self._optimize_tasks(task)
    else:

     # 正常情況,返回原計劃

    return self.get_execution_plan(task_id)

    def _add_retry_tasks(self, task: Task) -> List[Task]:
    """新增重試任務"""
    retry_task = Task(
    id=f"{task.id}_retry",
    name=f"Retry {task.name}",
    description=f"Retry failed task: {task.description}",
    priority=TaskPriority.HIGH,
    dependencies=[task.id],
    metadata={'type': 'retry', 'original_task_id': task.id}
    )

    self.add_task(retry_task)

     # 更新任務圖

    self.task_graph[task.id].append(retry_task.id)

    return self.get_execution_plan(task.id)

    def _optimize_tasks(self, task: Task) -> List[Task]:
    """最佳化任務"""

     # 分解長時間執行的任務

    if task.subtasks:

     # 已經有子任務,最佳化子任務

    for subtask in task.subtasks:
    if subtask.estimated_duration and subtask.estimated_duration > 1800:
    self.decompose_task(subtask.id, max_depth=1)
    else:

     # 分解當前任務

    self.decompose_task(task.id, max_depth=2)

    return self.get_execution_plan(task.id)

    def predict_completion_time(self, task_id: str) -> Optional[float]:
    """預測完成時間"""
    plan = self.get_execution_plan(task_id)

    total_time = 0.0
    for task in plan:
    if task.estimated_duration:
    total_time += task.estimated_duration
    elif task.id in self.performance_metrics:

     # 使用歷史平均時間

    avg_duration = self.performance_metrics[task.id].get('avg_duration', 0)
    total_time += avg_duration

    return total_time if total_time > 0 else None

    # 使用示例

    planner = DynamicTaskPlanner()

    # 新增任務

    task = Task(
    id="task1",
    name="Process Large Dataset",
    description="Process a large dataset",
    priority=TaskPriority.HIGH,
    estimated_duration=7200,
    metadata={'type': 'data_processing'}
    )

    planner.add_task(task)

    # 分解任務

    planner.decompose_task("task1", max_depth=2)

    # 模擬執行

    planner.monitor_execution("task1", TaskStatus.IN_PROGRESS)

    # 模擬失敗

    planner.monitor_execution("task1", TaskStatus.FAILED, error="Timeout")

    # 分析效能

    metrics = planner.analyze_performance("task1")
    print(f"Performance metrics: {metrics}")

    # 調整計劃

    adjusted_plan = planner.adjust_plan("task1")

    print("\nAdjusted plan:")
    for task in adjusted_plan:
    print(f"  - {task.name}")

    ```> **动态任务调整特点**:
    > - 实时监控任务执行
    > - 分析任务性能
    > - 根据性能调整计划
    > - 预测完成时间

    ```

    ## 規劃演算法最佳化

    ### 1. 並行任務排程

    # 示例:並行任務排程

    使用者請求:
    "實現並行任務排程演算法"
    Claude Code 生成的程式碼:

    ````python
    `python

    from typing import Dict, List, Set, Optional
    from collections import defaultdict, deque
    import logging

    logger = logging.getLogger(__name__)

    class ParallelTaskScheduler:
    """并行任务调度器"""

    def __init__(self, max_workers: int = 4):
    self.max_workers = max_workers
    self.tasks: Dict[str, Task] = {}
    self.dependencies: Dict[str, Set[str]] = defaultdict(set)
    self.dependents: Dict[str, Set[str]] = defaultdict(set)
    self.ready_tasks: Set[str] = set()
    self.running_tasks: Set[str] = set()
    self.completed_tasks: Set[str] = set()

    def add_task(self, task: Task):
    """添加任务"""
    self.tasks[task.id] = task

     # 添加依赖关系
    for dep_id in task.dependencies:
    self.dependencies[task.id].add(dep_id)
    self.dependents[dep_id].add(task.id)

     # 如果没有依赖,标记为就绪
    if not task.dependencies:
    self.ready_tasks.add(task.id)

    logger.info(f"Task added: {task.id}")

    def get_ready_tasks(self) -> List[Task]:
    """获取就绪任务"""
    available = self.ready_tasks - self.running_tasks
    return [self.tasks[task_id] for task_id in available]

    def start_task(self, task_id: str):
    """开始任务"""
    if task_id in self.ready_tasks and task_id not in self.running_tasks:
    self.running_tasks.add(task_id)
    self.ready_tasks.remove(task_id)
    logger.info(f"Task started: {task_id}")

    def complete_task(self, task_id: str, result: Optional[Any] = None, error: Optional[str] = None):
    """完成任务"""
    if task_id not in self.running_tasks:
    logger.warning(f"Task not running: {task_id}")
    return

     # 更新任务状态
    task = self.tasks[task_id]
    task.status = TaskStatus.COMPLETED if not error else TaskStatus.FAILED
    task.result = result
    task.error = error

     # 更新集合
    self.running_tasks.remove(task_id)
    self.completed_tasks.add(task_id)

     # 检查依赖此任务的任务
    for dependent_id in self.dependents[task_id]:
    self.dependencies[dependent_id].remove(task_id)

     # 如果所有依赖都完成,标记为就绪
    if not self.dependencies[dependent_id]:
    self.ready_tasks.add(dependent_id)

    logger.info(f"Task completed: {task_id}")

    def get_schedule(self) -> List[List[str]]:
    """获取调度计划"""
    schedule = []
    remaining = set(self.tasks.keys())

    while remaining:
     # 获取当前可以执行的任务
    current_batch = []
    for task_id in remaining:
    task = self.tasks[task_id]
    if all(dep in self.completed_tasks for dep in task.dependencies):
    current_batch.append(task_id)

     # 限制并发数
    current_batch = current_batch[:self.max_workers]

    if not current_batch:
    logger.warning("Circular dependency detected")
    break

    schedule.append(current_batch)

     # 更新剩余任务
    remaining -= set(current_batch)
    self.completed_tasks.update(current_batch)

    return schedule

    def visualize_schedule(self) -> str:
    """可视化调度"""
    schedule = self.get_schedule()

    visualization = "Task Schedule:\n"
    for i, batch in enumerate(schedule, 1):
    visualization += f"Batch {i} (parallel):\n"
    for task_id in batch:
    task = self.tasks[task_id]
    visualization += f"  - {task.name}\n"

    return visualization

    # 使用示例
    scheduler = ParallelTaskScheduler(max_workers=3)

    # 添加任务
    tasks = [
    Task(id="A", name="Task A", description="Task A", priority=TaskPriority.HIGH),
    Task(id="B", name="Task B", description="Task B", priority=TaskPriority.HIGH, dependencies=["A"]),
    Task(id="C", name="Task C", description="Task C", priority=TaskPriority.HIGH, dependencies=["A"]),
    Task(id="D", name="Task D", description="Task D", priority=TaskPriority.HIGH, dependencies=["B", "C"]),
    Task(id="E", name="Task E", description="Task E", priority=TaskPriority.HIGH, dependencies=["B"]),
    Task(id="F", name="Task F", description="Task F", priority=TaskPriority.HIGH, dependencies=["C"]),
    Task(id="G", name="Task G", description="Task G", priority=TaskPriority.HIGH, dependencies=["D", "E", "F"])
    ]

    for task in tasks:
    scheduler.add_task(task)

    # 获取调度计划
    schedule = scheduler.get_schedule()

    print("Schedule:")
    for i, batch in enumerate(schedule, 1):
    print(f"Batch {i}: {[scheduler.tasks[task_id].name for task_id in batch]}")

    # 可视化调度
    print("\n" + scheduler.visualize_schedule())

    ```> **並行任務排程特點**:

    > - 識別可並行執行的任務
    > - 限制併發任務數量
    > - 處理任務依賴關係
    > - 生成排程計劃

    ## 总结

    自主规划算法包括:

    1. **规划算法的基本概念**: 什么是自主规划、规划算法分类
    2. **任务分解算法**: 层次任务分解、动态任务调整
    3. **规划算法优化**: 并行任务调度

    通过自主规划算法,Claude Code可以自主分解任务、制定执行计划,并根据实际情况动态调整。

    在下一节中,我们将探讨记忆系统设计。

基于 MIT 许可发布 | 永久导航