27.3 自主規劃演算法
自主規劃演算法概述
自主規劃演算法是Agentic AI系統的核心能力之一,它允許AI系統根據目標自主分解任務、制定執行計劃、監控執行進度,並根據實際情況調整計劃。
规划算法的基本概念
1. 什么是自主规划
自主規劃是指AI系統在沒有人類干預的情況下,根據給定的目標,自動生成和執行任務計劃的過程。
自主规划的特点 :
- 目標驅動 : 以目標為導向進行規劃
- 任務分解 : 將複雜目標分解為可執行的子任務
- 動態調整 : 根據執行情況動態調整計劃
- 資源管理 : 合理分配和利用資源
2. 規劃演算法分類
| 演算法型別 | 特點 | 適用場景 |
|---|
前向搜尋| 從初始狀態到目標狀態| 目標明確,狀態空間小 後向搜尋| 從目標狀態到初始狀態| 目標狀態明確,初始狀態複雜 雙向搜尋| 同時從兩端搜尋| 狀態空間大,兩端明確 層次規劃| 分層規劃,逐步細化| 複雜任務,多層級目標 部分規劃| 規劃部分路徑,邊執行邊規劃| 動態環境,不確定性高
任务分解算法
1. 层次任务分解
示例:层次任务分解
使用者請求: "實現一個層次任務分解演算法"
Claude Code 生成的程式碼:
python
python
````python
> - 递归分解复杂任务
> - 自动生成子任务
> - 维护任务依赖关系
> - 生成执行计划
`python
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class DynamicTaskPlanner(HierarchicalTaskPlanner):
"""動態任務規劃器"""
def __init__(self):
super().__init__()
self.execution_history: Dict[str, List[Dict[str, Any]]] = {}
self.performance_metrics: Dict[str, Dict[str, float]] = {}
def monitor_execution(self, task_id: str, status: TaskStatus, result: Optional[Any] = None, error: Optional[str] = None):
"""監控任務執行"""
task = self.tasks.get(task_id)
if not task:
return
# 更新任務狀態
task.status = status
task.result = result
task.error = error
# 記錄執行歷史
if task_id not in self.execution_history:
self.execution_history[task_id] = []
self.execution_history[task_id].append({
'timestamp': datetime.utcnow(),
'status': status,
'result': result,
'error': error
})
logger.info(f"Task {task_id} status updated to {status}")
def analyze_performance(self, task_id: str) -> Dict[str, float]:
"""分析任務效能"""
if task_id not in self.execution_history:
return {}
history = self.execution_history[task_id]
# 計算效能指標
metrics = {
'total_executions': len(history),
'success_rate': 0.0,
'avg_duration': 0.0,
'failure_rate': 0.0
}
successful = [h for h in history if h['status'] == TaskStatus.COMPLETED]
failed = [h for h in history if h['status'] == TaskStatus.FAILED]
if len(history) > 0:
metrics['success_rate'] = len(successful) / len(history)
metrics['failure_rate'] = len(failed) / len(history)
self.performance_metrics[task_id] = metrics
return metrics
def adjust_plan(self, task_id: str) -> List[Task]:
"""調整執行計劃"""
task = self.tasks.get(task_id)
if not task:
return []
# 分析效能
metrics = self.analyze_performance(task_id)
# 根據效能調整計劃
if metrics.get('failure_rate', 0) > 0.5:
# 失敗率高,新增重試任務
return self._add_retry_tasks(task)
elif metrics.get('avg_duration', 0) > 3600:
# 執行時間長,最佳化任務
return self._optimize_tasks(task)
else:
# 正常情況,返回原計劃
return self.get_execution_plan(task_id)
def _add_retry_tasks(self, task: Task) -> List[Task]:
"""新增重試任務"""
retry_task = Task(
id=f"{task.id}_retry",
name=f"Retry {task.name}",
description=f"Retry failed task: {task.description}",
priority=TaskPriority.HIGH,
dependencies=[task.id],
metadata={'type': 'retry', 'original_task_id': task.id}
)
self.add_task(retry_task)
# 更新任務圖
self.task_graph[task.id].append(retry_task.id)
return self.get_execution_plan(task.id)
def _optimize_tasks(self, task: Task) -> List[Task]:
"""最佳化任務"""
# 分解長時間執行的任務
if task.subtasks:
# 已經有子任務,最佳化子任務
for subtask in task.subtasks:
if subtask.estimated_duration and subtask.estimated_duration > 1800:
self.decompose_task(subtask.id, max_depth=1)
else:
# 分解當前任務
self.decompose_task(task.id, max_depth=2)
return self.get_execution_plan(task.id)
def predict_completion_time(self, task_id: str) -> Optional[float]:
"""預測完成時間"""
plan = self.get_execution_plan(task_id)
total_time = 0.0
for task in plan:
if task.estimated_duration:
total_time += task.estimated_duration
elif task.id in self.performance_metrics:
# 使用歷史平均時間
avg_duration = self.performance_metrics[task.id].get('avg_duration', 0)
total_time += avg_duration
return total_time if total_time > 0 else None
# 使用示例
planner = DynamicTaskPlanner()
# 新增任務
task = Task(
id="task1",
name="Process Large Dataset",
description="Process a large dataset",
priority=TaskPriority.HIGH,
estimated_duration=7200,
metadata={'type': 'data_processing'}
)
planner.add_task(task)
# 分解任務
planner.decompose_task("task1", max_depth=2)
# 模擬執行
planner.monitor_execution("task1", TaskStatus.IN_PROGRESS)
# 模擬失敗
planner.monitor_execution("task1", TaskStatus.FAILED, error="Timeout")
# 分析效能
metrics = planner.analyze_performance("task1")
print(f"Performance metrics: {metrics}")
# 調整計劃
adjusted_plan = planner.adjust_plan("task1")
print("\nAdjusted plan:")
for task in adjusted_plan:
print(f" - {task.name}")
```> **动态任务调整特点**:
> - 实时监控任务执行
> - 分析任务性能
> - 根据性能调整计划
> - 预测完成时间
```
## 規劃演算法最佳化
### 1. 並行任務排程
# 示例:並行任務排程
使用者請求:
"實現並行任務排程演算法"
Claude Code 生成的程式碼:
````python
`python
from typing import Dict, List, Set, Optional
from collections import defaultdict, deque
import logging
logger = logging.getLogger(__name__)
class ParallelTaskScheduler:
"""并行任务调度器"""
def __init__(self, max_workers: int = 4):
self.max_workers = max_workers
self.tasks: Dict[str, Task] = {}
self.dependencies: Dict[str, Set[str]] = defaultdict(set)
self.dependents: Dict[str, Set[str]] = defaultdict(set)
self.ready_tasks: Set[str] = set()
self.running_tasks: Set[str] = set()
self.completed_tasks: Set[str] = set()
def add_task(self, task: Task):
"""添加任务"""
self.tasks[task.id] = task
# 添加依赖关系
for dep_id in task.dependencies:
self.dependencies[task.id].add(dep_id)
self.dependents[dep_id].add(task.id)
# 如果没有依赖,标记为就绪
if not task.dependencies:
self.ready_tasks.add(task.id)
logger.info(f"Task added: {task.id}")
def get_ready_tasks(self) -> List[Task]:
"""获取就绪任务"""
available = self.ready_tasks - self.running_tasks
return [self.tasks[task_id] for task_id in available]
def start_task(self, task_id: str):
"""开始任务"""
if task_id in self.ready_tasks and task_id not in self.running_tasks:
self.running_tasks.add(task_id)
self.ready_tasks.remove(task_id)
logger.info(f"Task started: {task_id}")
def complete_task(self, task_id: str, result: Optional[Any] = None, error: Optional[str] = None):
"""完成任务"""
if task_id not in self.running_tasks:
logger.warning(f"Task not running: {task_id}")
return
# 更新任务状态
task = self.tasks[task_id]
task.status = TaskStatus.COMPLETED if not error else TaskStatus.FAILED
task.result = result
task.error = error
# 更新集合
self.running_tasks.remove(task_id)
self.completed_tasks.add(task_id)
# 检查依赖此任务的任务
for dependent_id in self.dependents[task_id]:
self.dependencies[dependent_id].remove(task_id)
# 如果所有依赖都完成,标记为就绪
if not self.dependencies[dependent_id]:
self.ready_tasks.add(dependent_id)
logger.info(f"Task completed: {task_id}")
def get_schedule(self) -> List[List[str]]:
"""获取调度计划"""
schedule = []
remaining = set(self.tasks.keys())
while remaining:
# 获取当前可以执行的任务
current_batch = []
for task_id in remaining:
task = self.tasks[task_id]
if all(dep in self.completed_tasks for dep in task.dependencies):
current_batch.append(task_id)
# 限制并发数
current_batch = current_batch[:self.max_workers]
if not current_batch:
logger.warning("Circular dependency detected")
break
schedule.append(current_batch)
# 更新剩余任务
remaining -= set(current_batch)
self.completed_tasks.update(current_batch)
return schedule
def visualize_schedule(self) -> str:
"""可视化调度"""
schedule = self.get_schedule()
visualization = "Task Schedule:\n"
for i, batch in enumerate(schedule, 1):
visualization += f"Batch {i} (parallel):\n"
for task_id in batch:
task = self.tasks[task_id]
visualization += f" - {task.name}\n"
return visualization
# 使用示例
scheduler = ParallelTaskScheduler(max_workers=3)
# 添加任务
tasks = [
Task(id="A", name="Task A", description="Task A", priority=TaskPriority.HIGH),
Task(id="B", name="Task B", description="Task B", priority=TaskPriority.HIGH, dependencies=["A"]),
Task(id="C", name="Task C", description="Task C", priority=TaskPriority.HIGH, dependencies=["A"]),
Task(id="D", name="Task D", description="Task D", priority=TaskPriority.HIGH, dependencies=["B", "C"]),
Task(id="E", name="Task E", description="Task E", priority=TaskPriority.HIGH, dependencies=["B"]),
Task(id="F", name="Task F", description="Task F", priority=TaskPriority.HIGH, dependencies=["C"]),
Task(id="G", name="Task G", description="Task G", priority=TaskPriority.HIGH, dependencies=["D", "E", "F"])
]
for task in tasks:
scheduler.add_task(task)
# 获取调度计划
schedule = scheduler.get_schedule()
print("Schedule:")
for i, batch in enumerate(schedule, 1):
print(f"Batch {i}: {[scheduler.tasks[task_id].name for task_id in batch]}")
# 可视化调度
print("\n" + scheduler.visualize_schedule())
```> **並行任務排程特點**:
> - 識別可並行執行的任務
> - 限制併發任務數量
> - 處理任務依賴關係
> - 生成排程計劃
## 总结
自主规划算法包括:
1. **规划算法的基本概念**: 什么是自主规划、规划算法分类
2. **任务分解算法**: 层次任务分解、动态任务调整
3. **规划算法优化**: 并行任务调度
通过自主规划算法,Claude Code可以自主分解任务、制定执行计划,并根据实际情况动态调整。
在下一节中,我们将探讨记忆系统设计。