Skip to content

27.3 自主规划算法

自主规划算法概述

自主规划算法是Agentic AI系统的核心能力之一,它允许AI系统根据目标自主分解任务、制定执行计划、监控执行进度,并根据实际情况调整计划。

规划算法的基本概念

1. 什么是自主规划

自主规划是指AI系统在没有人类干预的情况下,根据给定的目标,自动生成和执行任务计划的过程。

自主规划的特点 :

  • 目标驱动 : 以目标为导向进行规划
  • 任务分解 : 将复杂目标分解为可执行的子任务
  • 动态调整 : 根据执行情况动态调整计划
  • 资源管理 : 合理分配和利用资源

2. 规划算法分类

| 算法类型 | 特点 | 适用场景 |
| --- | --- | --- |
| 前向搜索 | 从初始状态到目标状态 | 目标明确,状态空间小 |
| 后向搜索 | 从目标状态到初始状态 | 目标状态明确,初始状态复杂 |
| 双向搜索 | 同时从两端搜索 | 状态空间大,两端明确 |
| 层次规划 | 分层规划,逐步细化 | 复杂任务,多层级目标 |
| 部分规划 | 规划部分路径,边执行边规划 | 动态环境,不确定性高 |

任务分解算法

1. 层次任务分解

示例:层次任务分解

用户请求: "实现一个层次任务分解算法"

Claude Code 生成的代码:

```python
    from typing import Dict, List, Any, Optional
    from dataclasses import dataclass, field
    from enum import Enum
    import logging

    logger = logging.getLogger(__name__)

    class TaskStatus(Enum):
        """Lifecycle states a task can be in."""

        PENDING = "pending"
        IN_PROGRESS = "in_progress"
        COMPLETED = "completed"
        FAILED = "failed"
        CANCELLED = "cancelled"

    class TaskPriority(Enum):
        """Task priority levels; CRITICAL has the smallest value, LOW the largest."""

        CRITICAL = 1
        HIGH = 2
        MEDIUM = 3
        LOW = 4

    @dataclass
    class Task:
        """A unit of work tracked by the planners and the scheduler."""

        # Unique identifier; planners and the scheduler key their tables on it.
        id: str
        # Short human-readable label used in printed plans and schedules.
        name: str
        description: str
        status: TaskStatus = TaskStatus.PENDING
        priority: TaskPriority = TaskPriority.MEDIUM
        # Ids of tasks that must be handled before this one.
        dependencies: List[str] = field(default_factory=list)
        # Child tasks attached when this task is decomposed.
        subtasks: List['Task'] = field(default_factory=list)
        # Durations carry no unit here; the planner compares them against
        # 1800 and 3600, which suggests seconds -- TODO confirm.
        estimated_duration: Optional[float] = None
        actual_duration: Optional[float] = None
        # Outcome of the last execution, filled in by the planner/scheduler.
        result: Optional[Any] = None
        error: Optional[str] = None
        # Free-form attributes; the 'type' key selects the decomposition recipe.
        metadata: Dict[str, Any] = field(default_factory=dict)

    class HierarchicalTaskPlanner:
        """Planner that recursively splits tasks into subtasks and orders them.

        Attributes:
            tasks: Every registered task, keyed by task id.
            task_graph: For each task id, the ids that must be handled before
                it: its declared dependencies, followed by the subtasks
                attached by ``decompose_task``.
        """

        # Glyph printed next to a task in ``visualize_plan`` for each status.
        _STATUS_ICONS = {
            TaskStatus.PENDING: "○",
            TaskStatus.IN_PROGRESS: "◐",
            TaskStatus.COMPLETED: "●",
            TaskStatus.FAILED: "✗",
            TaskStatus.CANCELLED: "⊘",
        }

        def __init__(self):
            """Create a planner with no registered tasks."""
            self.tasks: Dict[str, Task] = {}
            self.task_graph: Dict[str, List[str]] = {}

        def add_task(self, task: Task):
            """Register ``task``, replacing any task stored under the same id."""
            self.tasks[task.id] = task
            # Store a copy: sharing the list would let later graph edits
            # silently rewrite ``task.dependencies``.
            self.task_graph[task.id] = list(task.dependencies)
            logger.info(f"Task added: {task.id}")

        def decompose_task(self, task_id: str, max_depth: int = 3) -> List[Task]:
            """Recursively break a registered task into subtasks.

            Calling this again for the same task reuses the subtasks that are
            already registered instead of attaching duplicates.

            Args:
                task_id: Id of a task previously passed to ``add_task``.
                max_depth: Number of levels to generate below the task; with
                    a value of 0 or less the task is returned undivided.

            Returns:
                The task followed by all of its descendants, parents first.

            Raises:
                ValueError: If ``task_id`` is not registered.
            """
            task = self.tasks.get(task_id)
            if not task:
                raise ValueError(f"Task not found: {task_id}")

            if max_depth <= 0:
                return [task]

            all_tasks = [task]
            attached_ids = {child.id for child in task.subtasks}
            children: List[Task] = []
            for candidate in self._generate_subtasks(task):
                # Reuse a previously registered subtask so its recorded
                # status and results survive a repeated decomposition.
                child = self.tasks.get(candidate.id)
                if child is None:
                    child = candidate
                    self.add_task(child)
                if child.id not in attached_ids:
                    task.subtasks.append(child)
                    attached_ids.add(child.id)
                children.append(child)
                all_tasks.extend(self.decompose_task(child.id, max_depth - 1))

            # Keep the declared dependencies and add the subtasks after them,
            # so a plan requested for this task still honours both.
            prerequisites = list(task.dependencies)
            for child in children:
                if child.id not in prerequisites:
                    prerequisites.append(child.id)
            self.task_graph[task.id] = prerequisites

            return all_tasks

        def _generate_subtasks(self, task: Task) -> List[Task]:
            """Build the subtasks for ``task`` according to ``metadata['type']``.

            Unknown or missing types fall back to the default recipe.
            """
            generators = {
                'code_generation': self._generate_code_generation_subtasks,
                'code_review': self._generate_code_review_subtasks,
                'testing': self._generate_testing_subtasks,
                'deployment': self._generate_deployment_subtasks,
            }
            task_type = task.metadata.get('type', 'default')
            # Only strings can name a recipe; anything else (possibly
            # unhashable) takes the default path.
            generator = generators.get(task_type) if isinstance(task_type, str) else None
            if generator is None:
                generator = self._generate_default_subtasks
            return generator(task)

        @staticmethod
        def _make_subtask(parent: Task, suffix: str, name: str, description: str,
                          priority: TaskPriority, task_type: str, after=()) -> Task:
            """Create one subtask whose id is ``<parent id>_<suffix>``.

            ``after`` lists the suffixes of sibling subtasks it depends on.
            """
            return Task(
                id=f"{parent.id}_{suffix}",
                name=name,
                description=description,
                priority=priority,
                dependencies=[f"{parent.id}_{dep}" for dep in after],
                metadata={'type': task_type},
            )

        def _generate_code_generation_subtasks(self, task: Task) -> List[Task]:
            """Subtasks for code generation: analyze, design, generate, review."""
            make = self._make_subtask
            return [
                make(task, "analyze_requirements", "Analyze Requirements",
                     "Analyze the requirements for code generation",
                     TaskPriority.HIGH, 'analysis'),
                make(task, "design_architecture", "Design Architecture",
                     "Design the system architecture",
                     TaskPriority.HIGH, 'design',
                     after=["analyze_requirements"]),
                make(task, "generate_code", "Generate Code",
                     "Generate the code",
                     TaskPriority.HIGH, 'code_generation',
                     after=["design_architecture"]),
                make(task, "review_code", "Review Code",
                     "Review the generated code",
                     TaskPriority.MEDIUM, 'code_review',
                     after=["generate_code"]),
            ]

        def _generate_code_review_subtasks(self, task: Task) -> List[Task]:
            """Subtasks for code review; the two checks both follow static analysis."""
            make = self._make_subtask
            return [
                make(task, "static_analysis", "Static Analysis",
                     "Perform static code analysis",
                     TaskPriority.HIGH, 'analysis'),
                make(task, "security_check", "Security Check",
                     "Check for security vulnerabilities",
                     TaskPriority.HIGH, 'security',
                     after=["static_analysis"]),
                make(task, "performance_check", "Performance Check",
                     "Check for performance issues",
                     TaskPriority.MEDIUM, 'performance',
                     after=["static_analysis"]),
                make(task, "generate_report", "Generate Report",
                     "Generate review report",
                     TaskPriority.MEDIUM, 'reporting',
                     after=["security_check", "performance_check"]),
            ]

        def _generate_testing_subtasks(self, task: Task) -> List[Task]:
            """Subtasks for testing: unit, then integration, then end-to-end."""
            make = self._make_subtask
            return [
                make(task, "unit_tests", "Unit Tests",
                     "Write and run unit tests",
                     TaskPriority.HIGH, 'testing'),
                make(task, "integration_tests", "Integration Tests",
                     "Write and run integration tests",
                     TaskPriority.HIGH, 'testing',
                     after=["unit_tests"]),
                make(task, "e2e_tests", "E2E Tests",
                     "Write and run end-to-end tests",
                     TaskPriority.MEDIUM, 'testing',
                     after=["integration_tests"]),
            ]

        def _generate_deployment_subtasks(self, task: Task) -> List[Task]:
            """Subtasks for deployment: build, test, deploy, verify."""
            make = self._make_subtask
            return [
                make(task, "build", "Build",
                     "Build the application",
                     TaskPriority.HIGH, 'build'),
                make(task, "test", "Test",
                     "Run tests",
                     TaskPriority.HIGH, 'testing',
                     after=["build"]),
                make(task, "deploy", "Deploy",
                     "Deploy to production",
                     TaskPriority.HIGH, 'deployment',
                     after=["test"]),
                make(task, "verify", "Verify",
                     "Verify deployment",
                     TaskPriority.HIGH, 'verification',
                     after=["deploy"]),
            ]

        def _generate_default_subtasks(self, task: Task) -> List[Task]:
            """Fallback recipe: prepare, execute, finalize."""
            make = self._make_subtask
            return [
                make(task, "prepare", "Prepare",
                     "Prepare for task execution",
                     TaskPriority.HIGH, 'preparation'),
                make(task, "execute", "Execute",
                     "Execute the task",
                     TaskPriority.HIGH, 'execution',
                     after=["prepare"]),
                make(task, "finalize", "Finalize",
                     "Finalize the task",
                     TaskPriority.MEDIUM, 'finalization',
                     after=["execute"]),
            ]

        def get_execution_plan(self, task_id: str) -> List[Task]:
            """Return the tasks to run for ``task_id``, prerequisites first.

            The graph is walked depth-first and each task is emitted after
            everything listed for it in ``task_graph``. Ids that are not
            registered are skipped, so an unknown ``task_id`` yields ``[]``.
            """
            plan: List[Task] = []
            visited = set()

            def visit(current_id):
                if current_id in visited:
                    return
                # Mark before descending so a cyclic graph cannot recurse forever.
                visited.add(current_id)

                for prerequisite_id in self.task_graph.get(current_id, []):
                    visit(prerequisite_id)

                current = self.tasks.get(current_id)
                if current is not None:
                    plan.append(current)

            visit(task_id)
            return plan

        def visualize_plan(self, task_id: str) -> str:
            """Render the execution plan as numbered text lines.

            Each planned task is followed by an indented list of its direct
            subtasks; every entry carries a status glyph.
            """
            icons = self._STATUS_ICONS
            lines = ["Execution Plan:\n"]
            for index, task in enumerate(self.get_execution_plan(task_id), 1):
                lines.append(f"{index}. {icons.get(task.status, '?')} {task.name}\n")
                for sub_index, subtask in enumerate(task.subtasks, 1):
                    lines.append(
                        f"   {sub_index}. {icons.get(subtask.status, '?')} {subtask.name}\n"
                    )
            return "".join(lines)

    # Usage example
    planner = HierarchicalTaskPlanner()

    # Register the top-level task
    main_task = Task(
        id="main",
        name="Develop User Authentication System",
        description="Develop a complete user authentication system",
        priority=TaskPriority.HIGH,
        metadata={'type': 'code_generation'}
    )

    planner.add_task(main_task)

    # Decompose it two levels deep
    planner.decompose_task("main", max_depth=2)

    # Fetch the execution plan
    plan = planner.get_execution_plan("main")

    print("Execution Plan:")
    for task in plan:
        print(f"  - {task.name}")

    # Print the rendered plan
    print("\n" + planner.visualize_plan("main"))

```

> **层次任务分解特点**:
>
> - 递归分解复杂任务
> - 自动生成子任务
> - 维护任务依赖关系
> - 生成执行计划
### 2. 动态任务调整

示例:动态任务调整

用户请求: "实现动态任务调整算法"

Claude Code 生成的代码:

```python

    from typing import Dict, List, Any, Optional
    from datetime import datetime, timedelta
    import logging

    logger = logging.getLogger(__name__)

    class DynamicTaskPlanner(HierarchicalTaskPlanner):
        """Planner that records execution outcomes and adapts the plan to them.

        Attributes:
            execution_history: Per task id, the list of recorded status
                updates (timestamp, status, result, error) in arrival order.
            performance_metrics: Per task id, the metrics last computed by
                ``analyze_performance``.
        """

        def __init__(self):
            """Create a planner with empty history and metrics."""
            super().__init__()
            self.execution_history: Dict[str, List[Dict[str, Any]]] = {}
            self.performance_metrics: Dict[str, Dict[str, float]] = {}

        def monitor_execution(self, task_id: str, status: TaskStatus, result: Optional[Any] = None, error: Optional[str] = None):
            """Record a status update for a task.

            Updates the task's ``status``, ``result`` and ``error`` and appends
            a timestamped entry to ``execution_history``. Updates for unknown
            task ids are ignored (a warning is logged).
            """
            # Function-scope import: the file-level import only brings in
            # ``datetime`` and ``timedelta``.
            from datetime import timezone

            task = self.tasks.get(task_id)
            if not task:
                logger.warning(f"Cannot record status for unknown task: {task_id}")
                return

            task.status = status
            task.result = result
            task.error = error

            # Timezone-aware UTC timestamp; ``datetime.utcnow()`` is deprecated.
            self.execution_history.setdefault(task_id, []).append({
                'timestamp': datetime.now(timezone.utc),
                'status': status,
                'result': result,
                'error': error
            })

            logger.info(f"Task {task_id} status updated to {status}")

        def analyze_performance(self, task_id: str) -> Dict[str, float]:
            """Compute execution metrics for a task from its recorded history.

            An execution attempt is one COMPLETED or FAILED update. Rates are
            fractions of those attempts, so intermediate updates such as
            IN_PROGRESS do not dilute them. ``avg_duration`` is the mean number
            of seconds between an IN_PROGRESS update and the outcome that
            follows it; attempts without a recorded start are left out.

            Returns:
                A dict with ``total_executions``, ``success_rate``,
                ``avg_duration`` and ``failure_rate``; an empty dict when the
                task has no history. The result is also cached in
                ``performance_metrics``.
            """
            history = self.execution_history.get(task_id)
            if history is None:
                return {}

            succeeded = 0
            failed = 0
            durations = []
            started_at = None
            for entry in history:
                status = entry['status']
                if status == TaskStatus.IN_PROGRESS:
                    started_at = entry['timestamp']
                    continue
                if status == TaskStatus.COMPLETED:
                    succeeded += 1
                elif status == TaskStatus.FAILED:
                    failed += 1
                else:
                    # Any other status (e.g. cancelled) is not an outcome and
                    # ends the attempt that may have been in progress.
                    started_at = None
                    continue
                if started_at is not None:
                    durations.append((entry['timestamp'] - started_at).total_seconds())
                    started_at = None

            finished = succeeded + failed
            metrics = {
                'total_executions': finished,
                'success_rate': succeeded / finished if finished else 0.0,
                'avg_duration': sum(durations) / len(durations) if durations else 0.0,
                'failure_rate': failed / finished if finished else 0.0
            }

            self.performance_metrics[task_id] = metrics

            return metrics

        def adjust_plan(self, task_id: str) -> List[Task]:
            """Return an execution plan adapted to the task's measured performance.

            More than half of the attempts failing adds a retry task; an
            average duration above 3600 triggers further decomposition;
            otherwise the current plan is returned. Unknown ids yield ``[]``.
            """
            task = self.tasks.get(task_id)
            if not task:
                return []

            metrics = self.analyze_performance(task_id)

            if metrics.get('failure_rate', 0) > 0.5:
                return self._add_retry_tasks(task)
            if metrics.get('avg_duration', 0) > 3600:
                return self._optimize_tasks(task)
            return self.get_execution_plan(task_id)

        def _add_retry_tasks(self, task: Task) -> List[Task]:
            """Register a retry task for ``task`` and return the extended plan.

            The retry depends on the original task. It is deliberately not
            added to the original's prerequisites: that would create a cycle
            and order the retry before the task it retries.
            """
            retry_id = f"{task.id}_retry"
            existing = self.tasks.get(retry_id)
            # Reuse a retry that is still waiting; otherwise register a fresh one.
            if existing is None or existing.status != TaskStatus.PENDING:
                self.add_task(Task(
                    id=retry_id,
                    name=f"Retry {task.name}",
                    description=f"Retry failed task: {task.description}",
                    priority=TaskPriority.HIGH,
                    dependencies=[task.id],
                    metadata={'type': 'retry', 'original_task_id': task.id}
                ))

            # Planning from the retry yields the original task (with its own
            # prerequisites) first and the retry last.
            return self.get_execution_plan(retry_id)

        def _optimize_tasks(self, task: Task) -> List[Task]:
            """Split long-running work into smaller tasks and return the new plan.

            When the task already has subtasks, every subtask whose estimate
            exceeds 1800 and that has not been split yet is decomposed one
            level; otherwise the task itself is decomposed two levels.
            """
            if task.subtasks:
                for subtask in task.subtasks:
                    is_long = bool(subtask.estimated_duration) and subtask.estimated_duration > 1800
                    # Skip subtasks that were split already to avoid
                    # attaching the same children twice.
                    if is_long and not subtask.subtasks:
                        self.decompose_task(subtask.id, max_depth=1)
            else:
                self.decompose_task(task.id, max_depth=2)

            return self.get_execution_plan(task.id)

        def predict_completion_time(self, task_id: str) -> Optional[float]:
            """Estimate the total duration of the plan for ``task_id``.

            Sums each planned task's ``estimated_duration``; tasks without an
            estimate contribute their cached ``avg_duration`` when metrics
            exist. Returns ``None`` when nothing could be estimated.
            """
            # NOTE(review): a parent and its subtasks are all part of the plan,
            # so estimates set on both levels are added together -- confirm
            # that this is intended.
            total_time = 0.0
            for task in self.get_execution_plan(task_id):
                if task.estimated_duration:
                    total_time += task.estimated_duration
                elif task.id in self.performance_metrics:
                    total_time += self.performance_metrics[task.id].get('avg_duration', 0)

            return total_time if total_time > 0 else None

    # Usage example

    planner = DynamicTaskPlanner()

    # Register a task

    task = Task(
        id="task1",
        name="Process Large Dataset",
        description="Process a large dataset",
        priority=TaskPriority.HIGH,
        estimated_duration=7200,
        metadata={'type': 'data_processing'}
    )

    planner.add_task(task)

    # Decompose it

    planner.decompose_task("task1", max_depth=2)

    # Simulate the start of execution

    planner.monitor_execution("task1", TaskStatus.IN_PROGRESS)

    # Simulate a failure

    planner.monitor_execution("task1", TaskStatus.FAILED, error="Timeout")

    # Analyze performance

    metrics = planner.analyze_performance("task1")
    print(f"Performance metrics: {metrics}")

    # Adjust the plan

    adjusted_plan = planner.adjust_plan("task1")

    print("\nAdjusted plan:")
    # A separate loop name keeps ``task`` bound to the task registered above.
    for planned_task in adjusted_plan:
        print(f"  - {planned_task.name}")

```

> **动态任务调整特点**:
>
> - 实时监控任务执行
> - 分析任务性能
> - 根据性能调整计划
> - 预测完成时间

## 规划算法优化

### 1. 并行任务调度

示例:并行任务调度

用户请求: "实现并行任务调度算法"

Claude Code 生成的代码:

```python

    from typing import Dict, List, Set, Optional
    from collections import defaultdict, deque
    import logging

    logger = logging.getLogger(__name__)

    class ParallelTaskScheduler:
        """Dependency-aware scheduler that groups tasks into parallel batches.

        Attributes:
            max_workers: Upper bound on the number of tasks per batch.
            tasks: Registered tasks keyed by id.
            dependencies: Per task id, the dependency ids still outstanding.
            dependents: Per task id, the ids of tasks that depend on it.
            ready_tasks: Ids whose dependencies are all met and that have not
                been started.
            running_tasks: Ids started and not yet finished.
            completed_tasks: Ids that finished successfully.
            failed_tasks: Ids that finished with an error.
        """

        def __init__(self, max_workers: int = 4):
            """Create an empty scheduler.

            Raises:
                ValueError: If ``max_workers`` is smaller than 1, because no
                    batch could ever be formed.
            """
            if max_workers < 1:
                raise ValueError(f"max_workers must be at least 1, got {max_workers}")
            self.max_workers = max_workers
            self.tasks: Dict[str, Task] = {}
            self.dependencies: Dict[str, Set[str]] = defaultdict(set)
            self.dependents: Dict[str, Set[str]] = defaultdict(set)
            self.ready_tasks: Set[str] = set()
            self.running_tasks: Set[str] = set()
            self.completed_tasks: Set[str] = set()
            self.failed_tasks: Set[str] = set()

        def add_task(self, task: Task):
            """Register a task and mark it ready when nothing blocks it.

            Dependencies that already completed do not block the new task.
            """
            self.tasks[task.id] = task

            for dep_id in task.dependencies:
                self.dependents[dep_id].add(task.id)

            outstanding = {
                dep_id for dep_id in task.dependencies
                if dep_id not in self.completed_tasks
            }
            self.dependencies[task.id] = outstanding

            if outstanding:
                # Re-registering a task may add dependencies to one that was ready.
                self.ready_tasks.discard(task.id)
            else:
                self.ready_tasks.add(task.id)

            logger.info(f"Task added: {task.id}")

        def get_ready_tasks(self) -> List[Task]:
            """Return the tasks that can start now, most urgent priority first."""
            available = self.ready_tasks - self.running_tasks
            # Sort for a deterministic result; ties are broken by task id.
            ordered = sorted(
                available,
                key=lambda task_id: (self.tasks[task_id].priority.value, task_id),
            )
            return [self.tasks[task_id] for task_id in ordered]

        def start_task(self, task_id: str):
            """Move a ready task to the running set; other ids are ignored."""
            if task_id not in self.ready_tasks or task_id in self.running_tasks:
                logger.warning(f"Task not ready: {task_id}")
                return

            self.ready_tasks.remove(task_id)
            self.running_tasks.add(task_id)
            self.tasks[task_id].status = TaskStatus.IN_PROGRESS
            logger.info(f"Task started: {task_id}")

        def complete_task(self, task_id: str, result: Optional[Any] = None, error: Optional[str] = None):
            """Finish a running task and release the tasks waiting on it.

            A truthy ``error`` marks the task FAILED: it is recorded in
            ``failed_tasks`` and its dependents stay blocked. Otherwise the
            task is COMPLETED and every dependent whose dependencies are now
            all met becomes ready. Ids that are not running are ignored.
            """
            if task_id not in self.running_tasks:
                logger.warning(f"Task not running: {task_id}")
                return

            task = self.tasks[task_id]
            task.result = result
            task.error = error
            self.running_tasks.remove(task_id)

            if error:
                task.status = TaskStatus.FAILED
                self.failed_tasks.add(task_id)
                logger.warning(f"Task failed: {task_id}")
                return

            task.status = TaskStatus.COMPLETED
            self.failed_tasks.discard(task_id)
            self.completed_tasks.add(task_id)

            # ``.get`` avoids creating an empty entry in the defaultdict.
            for dependent_id in self.dependents.get(task_id, ()):
                outstanding = self.dependencies[dependent_id]
                # ``discard`` tolerates a dependency that was removed before.
                outstanding.discard(task_id)
                already_handled = (
                    dependent_id in self.running_tasks
                    or dependent_id in self.completed_tasks
                )
                if not outstanding and not already_handled:
                    self.ready_tasks.add(dependent_id)

            logger.info(f"Task completed: {task_id}")

        def get_schedule(self) -> List[List[str]]:
            """Plan the not-yet-completed tasks as ordered batches of task ids.

            Every batch holds at most ``max_workers`` tasks whose dependencies
            are satisfied by completed tasks or by earlier batches. Within a
            batch, tasks are ordered by priority, then by registration order.
            The scheduler's state is not modified, so repeated calls return
            the same plan. Planning stops with a warning when the remaining
            tasks can never become runnable (a dependency cycle, or a
            dependency that was never registered).
            """
            # Simulate on a copy so planning never marks real tasks completed.
            done = set(self.completed_tasks)
            remaining = [task_id for task_id in self.tasks if task_id not in done]
            schedule: List[List[str]] = []

            while remaining:
                runnable = [
                    task_id for task_id in remaining
                    if all(dep in done for dep in self.tasks[task_id].dependencies)
                ]
                # Stable sort: equal priorities keep their registration order.
                runnable.sort(key=lambda task_id: self.tasks[task_id].priority.value)
                batch = runnable[:self.max_workers]

                if not batch:
                    logger.warning("Circular dependency detected")
                    break

                schedule.append(batch)
                done.update(batch)
                remaining = [task_id for task_id in remaining if task_id not in done]

            return schedule

        def visualize_schedule(self) -> str:
            """Render the schedule as text, one section per batch."""
            lines = ["Task Schedule:\n"]
            for index, batch in enumerate(self.get_schedule(), 1):
                lines.append(f"Batch {index} (parallel):\n")
                lines.extend(f"  - {self.tasks[task_id].name}\n" for task_id in batch)
            return "".join(lines)

    # Usage example
    scheduler = ParallelTaskScheduler(max_workers=3)

    # Register tasks
    tasks = [
        Task(id="A", name="Task A", description="Task A", priority=TaskPriority.HIGH),
        Task(id="B", name="Task B", description="Task B", priority=TaskPriority.HIGH, dependencies=["A"]),
        Task(id="C", name="Task C", description="Task C", priority=TaskPriority.HIGH, dependencies=["A"]),
        Task(id="D", name="Task D", description="Task D", priority=TaskPriority.HIGH, dependencies=["B", "C"]),
        Task(id="E", name="Task E", description="Task E", priority=TaskPriority.HIGH, dependencies=["B"]),
        Task(id="F", name="Task F", description="Task F", priority=TaskPriority.HIGH, dependencies=["C"]),
        Task(id="G", name="Task G", description="Task G", priority=TaskPriority.HIGH, dependencies=["D", "E", "F"])
    ]

    for task in tasks:
        scheduler.add_task(task)

    # Compute the schedule
    schedule = scheduler.get_schedule()

    print("Schedule:")
    for i, batch in enumerate(schedule, 1):
        print(f"Batch {i}: {[scheduler.tasks[task_id].name for task_id in batch]}")

    # Print the rendered schedule
    print("\n" + scheduler.visualize_schedule())

```

> **并行任务调度特点**:
>
> - 识别可并行执行的任务
> - 限制并发任务数量
> - 处理任务依赖关系
> - 生成调度计划

## 总结

自主规划算法包括:

1. **规划算法的基本概念**: 什么是自主规划、规划算法分类
2. **任务分解算法**: 层次任务分解、动态任务调整
3. **规划算法优化**: 并行任务调度

通过自主规划算法,Claude Code可以自主分解任务、制定执行计划,并根据实际情况动态调整。

在下一节中,我们将探讨记忆系统设计。

基于 MIT 许可发布 | 永久导航