# 28.2 核心组件详解

## 核心组件概述

Claude Code 的核心组件是其功能实现的基础,包括意图识别、任务规划、工具调度等关键模块。深入理解这些组件对于掌握 Claude Code 的工作原理至关重要。

## 意图识别器

### 1. 意图识别原理
示例:意图识别器
用户请求: "实现一个意图识别器"
Claude Code 生成的代码:
```python
from typing import Dict, List, Any, Optional
from datetime import datetime
import logging
import re
logger = logging.getLogger(__name__)
class Intent:
    """A recognized intent with its confidence and extracted data.

    Attributes:
        name: Identifier of the intent.
        description: Human-readable description of the intent.
        confidence: Score attached to the recognition (defaults to 0.0).
        entities: Values extracted from the input text; starts empty.
        metadata: Free-form additional data; starts empty.
    """

    def __init__(self, name: str, description: str, confidence: float = 0.0):
        self.name = name
        self.description = description
        self.confidence = confidence
        # Fresh dicts per instance so intents never share mutable state.
        self.entities: Dict[str, Any] = {}
        self.metadata: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(name={self.name!r}, "
            f"confidence={self.confidence!r}, entities={self.entities!r})"
        )
class IntentRecognizer:
    """Recognize the intent behind a piece of text.

    Every known intent is scored with regex patterns, keyword overlap and,
    when ``self.models`` is non-empty, model predictions. The intent with the
    highest score wins.
    """

    # Score given to an intent as soon as one of its regex patterns matches.
    PATTERN_MATCH_SCORE = 0.9

    # Regex tail that captures the (optionally quoted) value following a keyword.
    _ENTITY_VALUE_PATTERN = r'\s*["\']?([^"\']+)["\']?'

    # Per intent: (entity key, keyword that precedes the value in the text).
    _ENTITY_FIELDS = {
        'read_file': (('file_path', 'file'),),
        'write_file': (('file_path', 'file'), ('content', 'content')),
        'execute_code': (('code', 'code'), ('language', 'language')),
    }

    def __init__(self):
        self.intents: Dict[str, Dict[str, Any]] = {}
        self.patterns: Dict[str, List[str]] = {}
        self.models: Dict[str, Any] = {}

    def register_intent(
        self,
        name: str,
        description: str,
        patterns: Optional[List[str]] = None,
        keywords: Optional[List[str]] = None,
    ) -> None:
        """Register (or replace) an intent.

        Args:
            name: Unique intent identifier.
            description: Human-readable description.
            patterns: Regex patterns; any match scores ``PATTERN_MATCH_SCORE``.
            keywords: Keywords used for keyword-overlap scoring.
        """
        self.intents[name] = {
            'name': name,
            'description': description,
            'patterns': list(patterns) if patterns else [],
            # Stored so that _match_keywords has something to score against.
            'keywords': list(keywords) if keywords else [],
        }
        if patterns:
            self.patterns[name] = patterns
        else:
            # Re-registering without patterns must not keep the old ones alive.
            self.patterns.pop(name, None)
        logger.info(f"Intent registered: {name}")

    def recognize(self, text: str) -> "Intent":
        """Return the best-scoring intent for ``text``.

        Returns an ``Intent`` named ``"unknown"`` with confidence 0.0 when no
        intent is registered or nothing scores above zero.
        """
        scores: Dict[str, float] = {}

        # Pattern matching
        for intent_name, patterns in self.patterns.items():
            scores[intent_name] = self._match_patterns(text, patterns)

        # Keyword matching
        for intent_name, intent_info in self.intents.items():
            score = self._match_keywords(text, intent_info)
            scores[intent_name] = max(scores.get(intent_name, 0), score)

        # Model prediction
        if self.models:
            for intent_name, score in self._predict_with_model(text).items():
                scores[intent_name] = max(scores.get(intent_name, 0), score)

        if not scores:
            return Intent("unknown", "Unknown intent", 0.0)

        best_intent_name = max(scores, key=scores.get)
        confidence = scores[best_intent_name]
        if confidence <= 0:
            # Nothing matched: do not report an arbitrary registered intent.
            return Intent("unknown", "Unknown intent", 0.0)

        # A model may predict an intent that was never registered.
        intent_info = self.intents.get(best_intent_name, {})
        intent = Intent(
            name=best_intent_name,
            description=intent_info.get('description', ''),
            confidence=confidence,
        )
        intent.entities = self._extract_entities(text, best_intent_name)
        return intent

    def _match_patterns(self, text: str, patterns: List[str]) -> float:
        """Return ``PATTERN_MATCH_SCORE`` if any pattern matches, else 0.0.

        Matching is case-insensitive. Invalid regexes are skipped with a
        warning instead of being ignored silently.
        """
        for pattern in patterns:
            try:
                if re.search(pattern, text, re.IGNORECASE):
                    return self.PATTERN_MATCH_SCORE
            except re.error as exc:
                logger.warning(f"Skipping invalid pattern {pattern!r}: {exc}")
        return 0.0

    def _match_keywords(self, text: str, intent_info: Dict[str, Any]) -> float:
        """Return the fraction of the intent's keywords contained in ``text``.

        The check is a case-insensitive substring test; an intent without
        keywords scores 0.0.
        """
        keywords = intent_info.get('keywords', [])
        if not keywords:
            return 0.0
        text_lower = text.lower()
        matched = sum(1 for keyword in keywords if keyword.lower() in text_lower)
        return matched / len(keywords)

    def _predict_with_model(self, text: str) -> Dict[str, float]:
        """Return model scores per intent.

        Placeholder for a machine-learning model; this simplified version
        always returns an empty dict.
        """
        return {}

    def _extract_entities(self, text: str, intent_name: str) -> Dict[str, Any]:
        """Extract the entities defined for ``intent_name`` from ``text``.

        For each (entity key, keyword) pair in ``_ENTITY_FIELDS`` the text
        following the keyword, up to the next quote character or the end of
        the text, is captured. Intents without an entry yield an empty dict.
        """
        entities: Dict[str, Any] = {}
        for entity_key, keyword in self._ENTITY_FIELDS.get(intent_name, ()):
            match = re.search(
                keyword + self._ENTITY_VALUE_PATTERN, text, re.IGNORECASE
            )
            if match:
                entities[entity_key] = match.group(1)
        return entities
# Usage example
recognizer = IntentRecognizer()

# Register the intents, one spec per intent
for _intent_spec in (
    {
        'name': 'read_file',
        'description': 'Read a file',
        'patterns': [r'read\s+file', r'open\s+file', r'cat\s+file'],
        'keywords': ['read', 'file', 'open', 'cat'],
    },
    {
        'name': 'write_file',
        'description': 'Write to a file',
        'patterns': [r'write\s+file', r'save\s+file', r'create\s+file'],
        'keywords': ['write', 'file', 'save', 'create'],
    },
    {
        'name': 'execute_code',
        'description': 'Execute code',
        'patterns': [r'execute\s+code', r'run\s+code', r'eval\s+code'],
        'keywords': ['execute', 'code', 'run', 'eval'],
    },
):
    recognizer.register_intent(**_intent_spec)

# Recognize the intent of a sample request
text = "Please read the file config.txt"
intent = recognizer.recognize(text)

print(f"Intent: {intent.name}")
print(f"Confidence: {intent.confidence}")
print(f"Entities: {intent.entities}")
```

> **意图识别器特点**:
> - 支持多种识别方法
> - 模式匹配
> - 关键词匹配
> - 实体提取

## 任务规划器
### 1. 任务规划原理
# 示例:任务规划器
用户请求:
"实现一个任务规划器"
Claude Code 生成的代码:
```python
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class Task:
    """A single unit of work inside an execution plan.

    Attributes:
        id: Unique task identifier.
        name: Short display name.
        description: Human-readable description.
        action: Name of the action to perform.
        status: Lifecycle state; starts as ``"pending"``.
        dependencies: IDs of tasks that must run before this one.
        created_at: Creation time (naive datetime from ``datetime.utcnow()``).
        started_at / completed_at: Set later by whoever tracks the task.
        result / error: Outcome of the task, if any.
    """

    def __init__(
        self,
        task_id: str,
        name: str,
        description: str,
        action: str,
        dependencies: Optional[List[str]] = None,
    ):
        self.id = task_id
        self.name = name
        self.description = description
        self.action = action
        self.status = "pending"
        # Copy so the caller's list is never shared with the task.
        self.dependencies: List[str] = list(dependencies) if dependencies else []
        self.created_at = datetime.utcnow()
        self.started_at: Optional[datetime] = None
        self.completed_at: Optional[datetime] = None
        self.result: Optional[Any] = None
        self.error: Optional[str] = None

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(id={self.id!r}, status={self.status!r}, "
            f"dependencies={self.dependencies!r})"
        )
class TaskPlanner:
    """Turn a goal into a dependency-ordered list of tasks and track progress.

    Attributes:
        tasks: All known tasks keyed by task ID.
        task_graph: Task ID -> IDs of the tasks it depends on.
        execution_plan: Task IDs in the order produced by the last
            ``create_plan`` call.
    """

    def __init__(self):
        self.tasks: Dict[str, "Task"] = {}
        self.task_graph: Dict[str, List[str]] = {}
        self.execution_plan: List[str] = []

    def add_task(self, task: "Task") -> None:
        """Add (or replace) a task and record its dependencies."""
        self.tasks[task.id] = task
        self.task_graph[task.id] = task.dependencies
        logger.info(f"Task added: {task.id}")

    def create_plan(self, goal: str, context: Dict[str, Any]) -> List["Task"]:
        """Generate tasks for ``goal`` and return all tasks in execution order.

        Tasks added earlier (manually or by a previous plan) stay registered
        and are part of the returned order.

        Raises:
            ValueError: If the task dependencies contain a cycle.
            KeyError: If a task depends on an unknown task ID.
        """
        for task in self._generate_tasks(goal, context):
            self.add_task(task)
        self.execution_plan = self._topological_sort()
        return [self.tasks[task_id] for task_id in self.execution_plan]

    @staticmethod
    def _build_task(
        task_id: str,
        name: str,
        description: str,
        action: str,
        dependencies: Sequence[str] = (),
    ) -> "Task":
        """Create a task and attach its dependency IDs."""
        task = Task(task_id, name, description, action)
        # Assigned after construction so this works whether or not the Task
        # constructor accepts a ``dependencies`` argument.
        task.dependencies = list(dependencies)
        return task

    def _generate_tasks(self, goal: str, context: Dict[str, Any]) -> List["Task"]:
        """Pick the task template whose keywords appear in ``goal``.

        Matching is a case-insensitive substring test; the default template
        is used when no specific one matches.
        """
        goal_lower = goal.lower()
        if 'read' in goal_lower and 'file' in goal_lower:
            return self._generate_file_reading_tasks(goal, context)
        if 'write' in goal_lower and 'file' in goal_lower:
            return self._generate_file_writing_tasks(goal, context)
        if 'execute' in goal_lower and 'code' in goal_lower:
            return self._generate_code_execution_tasks(goal, context)
        return self._generate_default_tasks(goal, context)

    def _generate_file_reading_tasks(self, goal: str, context: Dict[str, Any]) -> List["Task"]:
        """Tasks for reading a file: validate path -> read -> parse."""
        file_path = context.get('file_path', 'default.txt')
        return [
            self._build_task(
                "validate_file_path",
                "Validate File Path",
                f"Validate file path: {file_path}",
                "validate_file_path",
            ),
            self._build_task(
                "read_file",
                "Read File",
                f"Read file: {file_path}",
                "read_file",
                ["validate_file_path"],
            ),
            self._build_task(
                "parse_content",
                "Parse Content",
                "Parse file content",
                "parse_content",
                ["read_file"],
            ),
        ]

    def _generate_file_writing_tasks(self, goal: str, context: Dict[str, Any]) -> List["Task"]:
        """Tasks for writing a file: validate path -> prepare -> write -> verify."""
        file_path = context.get('file_path', 'output.txt')
        return [
            self._build_task(
                "validate_file_path",
                "Validate File Path",
                f"Validate file path: {file_path}",
                "validate_file_path",
            ),
            self._build_task(
                "prepare_content",
                "Prepare Content",
                "Prepare content for writing",
                "prepare_content",
                ["validate_file_path"],
            ),
            self._build_task(
                "write_file",
                "Write File",
                f"Write content to file: {file_path}",
                "write_file",
                ["prepare_content"],
            ),
            self._build_task(
                "verify_write",
                "Verify Write",
                "Verify file was written successfully",
                "verify_write",
                ["write_file"],
            ),
        ]

    def _generate_code_execution_tasks(self, goal: str, context: Dict[str, Any]) -> List["Task"]:
        """Tasks for running code: validate -> set up -> execute -> capture."""
        language = context.get('language', 'python')
        return [
            self._build_task(
                "validate_code",
                "Validate Code",
                "Validate code syntax",
                "validate_code",
            ),
            self._build_task(
                "setup_environment",
                "Setup Environment",
                f"Setup {language} execution environment",
                "setup_environment",
                ["validate_code"],
            ),
            self._build_task(
                "execute_code",
                "Execute Code",
                "Execute the code",
                "execute_code",
                ["setup_environment"],
            ),
            self._build_task(
                "capture_output",
                "Capture Output",
                "Capture execution output",
                "capture_output",
                ["execute_code"],
            ),
        ]

    def _generate_default_tasks(self, goal: str, context: Dict[str, Any]) -> List["Task"]:
        """Fallback tasks: analyze the goal, then execute it."""
        return [
            self._build_task(
                "analyze_goal",
                "Analyze Goal",
                f"Analyze goal: {goal}",
                "analyze_goal",
            ),
            self._build_task(
                "execute_goal",
                "Execute Goal",
                f"Execute goal: {goal}",
                "execute_goal",
                ["analyze_goal"],
            ),
        ]

    def _topological_sort(self) -> List[str]:
        """Return all task IDs with every dependency before its dependents.

        Raises:
            ValueError: If the dependencies contain a cycle.
            KeyError: If a task depends on an ID that is not in ``self.tasks``.
        """
        done = set()
        in_progress = set()  # tasks on the current DFS path, for cycle detection
        order: List[str] = []

        def visit(task_id: str, path: List[str]) -> None:
            if task_id in done:
                return
            if task_id in in_progress:
                cycle = ' -> '.join(path + [task_id])
                raise ValueError(f"Circular task dependency: {cycle}")
            in_progress.add(task_id)
            # Dependencies first, so they end up earlier in the order.
            for dep_id in self.task_graph.get(task_id, []):
                if dep_id not in self.tasks:
                    raise KeyError(
                        f"Task {task_id!r} depends on unknown task {dep_id!r}"
                    )
                visit(dep_id, path + [task_id])
            in_progress.discard(task_id)
            done.add(task_id)
            order.append(task_id)

        for task_id in self.tasks:
            visit(task_id, [])
        return order

    def update_task_status(
        self,
        task_id: str,
        status: str,
        result: Any = None,
        error: Optional[str] = None,
    ) -> None:
        """Set a task's status and the fields that belong to that status.

        ``"in_progress"`` stamps ``started_at``; ``"completed"`` stamps
        ``completed_at`` and stores ``result``; ``"failed"`` stamps
        ``completed_at`` and stores ``error``. An unknown ``task_id`` is
        logged as a warning and otherwise ignored.
        """
        task = self.tasks.get(task_id)
        if not task:
            logger.warning(f"Task not found: {task_id}")
            return

        task.status = status
        if status == "in_progress":
            task.started_at = datetime.utcnow()
        elif status == "completed":
            task.completed_at = datetime.utcnow()
            task.result = result
        elif status == "failed":
            task.completed_at = datetime.utcnow()
            task.error = error
        logger.info(f"Task {task_id} status updated to {status}")

    def get_plan_status(self) -> Dict[str, Any]:
        """Return task counts per status and the completed fraction.

        ``progress`` is ``completed / total``, or 0 when there are no tasks.
        """
        counts = {"completed": 0, "failed": 0, "in_progress": 0, "pending": 0}
        for task in self.tasks.values():
            if task.status in counts:
                counts[task.status] += 1

        total_tasks = len(self.tasks)
        return {
            'total_tasks': total_tasks,
            'completed_tasks': counts["completed"],
            'failed_tasks': counts["failed"],
            'in_progress_tasks': counts["in_progress"],
            'pending_tasks': counts["pending"],
            'progress': counts["completed"] / total_tasks if total_tasks > 0 else 0,
        }
# Usage example
planner = TaskPlanner()

# Build a plan for reading a file
goal = "Read the file config.txt"
context = {'file_path': 'config.txt'}
plan = planner.create_plan(goal, context)

print("Execution plan:")
for task in plan:
    print(f" - {task.name}: {task.description}")

# Update task statuses
planner.update_task_status("validate_file_path", "completed")
planner.update_task_status("read_file", "in_progress")

# Report the overall plan status
status = planner.get_plan_status()
print(f"\nPlan status: {status}")
```

> **任务规划器特点**:
> - 根据目标生成任务
> - 处理任务依赖
> - 拓扑排序
> - 状态跟踪

## 工具调度器
### 1. 工具调度原理
# 示例:工具调度器
用户请求:
"实现一个工具调度器"
Claude Code 生成的代码:
```python
import asyncio
import logging
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
logger = logging.getLogger(__name__)
class Tool:
    """A callable tool together with its usage bookkeeping.

    Attributes:
        id: Unique tool identifier.
        name: Display name.
        description: Human-readable description.
        function: The callable that implements the tool.
        status: Starts as ``"idle"``.
        last_used: Time of the last use, ``None`` until first used.
        usage_count: Number of recorded uses; starts at 0.
    """

    def __init__(
        self,
        tool_id: str,
        name: str,
        description: str,
        function: Callable[..., Any],
    ):
        self.id = tool_id
        self.name = name
        self.description = description
        self.function = function
        self.status = "idle"
        self.last_used: Optional[datetime] = None
        self.usage_count = 0

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(id={self.id!r}, name={self.name!r}, "
            f"status={self.status!r})"
        )
class ToolScheduler:
    """Run queued tool invocations on a fixed number of asyncio workers.

    Attributes:
        tools: Registered tools keyed by tool ID.
        task_queue: Queue of pending task dicts.
        running_tasks: Worker name -> asyncio task of that worker.
        max_concurrent: Number of workers started by ``start``.
        running: ``True`` between ``start`` and ``stop``.
    """

    def __init__(self, max_concurrent: int = 3):
        self.tools: Dict[str, "Tool"] = {}
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.running_tasks: Dict[str, asyncio.Task] = {}
        self.max_concurrent = max_concurrent
        self.running = False
        # Sequence number that keeps task IDs unique within this scheduler.
        self._task_seq = 0

    def register_tool(self, tool: "Tool") -> None:
        """Register (or replace) a tool under its ID."""
        self.tools[tool.id] = tool
        logger.info(f"Tool registered: {tool.id}")

    def schedule_task(self, tool_id: str, parameters: Dict[str, Any]) -> str:
        """Queue an invocation of ``tool_id`` and return the new task ID.

        ``parameters`` are passed to the tool function as keyword arguments.
        The tool ID is only resolved when a worker picks the task up.
        """
        self._task_seq += 1
        created_at = datetime.utcnow()
        # The timestamp alone can repeat for back-to-back calls.
        task_id = f"task_{created_at.timestamp()}_{self._task_seq}"
        task = {
            'task_id': task_id,
            'tool_id': tool_id,
            'parameters': parameters,
            'status': 'pending',
            'created_at': created_at,
        }
        # The queue is unbounded, so put_nowait cannot raise QueueFull and no
        # fire-and-forget coroutine (or running event loop) is needed.
        self.task_queue.put_nowait(task)
        logger.info(f"Task scheduled: {task_id}")
        return task_id

    async def start(self) -> None:
        """Start ``max_concurrent`` workers; a no-op when already running."""
        if self.running:
            return
        self.running = True
        logger.info("Tool scheduler started")
        for i in range(self.max_concurrent):
            worker_name = f"worker-{i}"
            # Keep a reference so stop() can wait for the worker to exit.
            self.running_tasks[worker_name] = asyncio.create_task(
                self._worker(worker_name)
            )

    async def stop(self) -> None:
        """Signal the workers to stop and wait until they have exited.

        A worker finishes the task it is currently executing; tasks still in
        the queue are left unprocessed.
        """
        self.running = False
        await asyncio.gather(*self.running_tasks.values(), return_exceptions=True)
        self.running_tasks.clear()
        logger.info("Tool scheduler stopped")

    async def _worker(self, worker_name: str) -> None:
        """Pull tasks from the queue and execute them until stopped."""
        logger.info(f"{worker_name} started")
        while self.running:
            try:
                # Time out regularly so a cleared ``running`` flag is noticed.
                task = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
                tool_id = task['tool_id']
                tool = self.tools.get(tool_id)
                if not tool:
                    logger.warning(f"Tool not found: {tool_id}")
                    continue
                await self._execute_task(worker_name, task, tool)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                # Worker boundary: log and keep the worker alive.
                logger.error(f"{worker_name} error: {e}")
        logger.info(f"{worker_name} stopped")

    async def _execute_task(self, worker_name: str, task: Dict[str, Any], tool: "Tool") -> None:
        """Run one task and record its outcome on the task dict and the tool.

        Tool failures are caught: the task is marked ``'failed'`` with the
        error text instead of propagating the exception.
        """
        task_id = task['task_id']
        tool.status = "busy"
        try:
            result = await self._run_tool_function(tool, task['parameters'])
            task['status'] = 'completed'
            task['result'] = result
            task['completed_at'] = datetime.utcnow()
            tool.last_used = datetime.utcnow()
            tool.usage_count += 1
            logger.info(f"{worker_name} completed task {task_id}")
        except Exception as e:
            task['status'] = 'failed'
            task['error'] = str(e)
            task['completed_at'] = datetime.utcnow()
            logger.error(f"{worker_name} failed task {task_id}: {e}")
        finally:
            tool.status = "idle"

    async def _run_tool_function(self, tool: "Tool", parameters: Dict[str, Any]) -> Any:
        """Call the tool function with ``parameters`` as keyword arguments.

        Coroutine functions are awaited directly; plain functions run in the
        event loop's default executor so they do not block the loop.
        """
        if asyncio.iscoroutinefunction(tool.function):
            return await tool.function(**parameters)
        loop = asyncio.get_running_loop()
        # run_in_executor only forwards positional arguments, so the keyword
        # arguments are bound in a closure.
        return await loop.run_in_executor(
            None, lambda: tool.function(**parameters)
        )

    def get_tool_status(self, tool_id: str) -> Optional[Dict[str, Any]]:
        """Return a status snapshot for one tool, or ``None`` if unknown."""
        tool = self.tools.get(tool_id)
        if not tool:
            return None
        return {
            'id': tool.id,
            'name': tool.name,
            'status': tool.status,
            'last_used': tool.last_used.isoformat() if tool.last_used else None,
            'usage_count': tool.usage_count,
        }

    def get_all_tool_status(self) -> Dict[str, Dict[str, Any]]:
        """Return the status snapshot of every registered tool, keyed by ID."""
        return {
            tool_id: self.get_tool_status(tool_id)
            for tool_id in self.tools
        }
# Usage example
async def main():
    """Register two file tools, schedule three tasks and print tool status."""
    scheduler = ToolScheduler(max_concurrent=2)

    def read_file_tool(file_path: str) -> str:
        """Return the text content of ``file_path``."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    def write_file_tool(file_path: str, content: str) -> str:
        """Write ``content`` to ``file_path`` and return a confirmation."""
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(content)
        return f"Written to {file_path}"

    scheduler.register_tool(Tool("read_file", "Read File", "Read file content", read_file_tool))
    scheduler.register_tool(Tool("write_file", "Write File", "Write content to file", write_file_tool))

    await scheduler.start()
    try:
        # Schedule the tasks; the returned task IDs are not needed here.
        scheduler.schedule_task("read_file", {'file_path': 'test.txt'})
        scheduler.schedule_task("write_file", {'file_path': 'output.txt', 'content': 'Hello'})
        scheduler.schedule_task("read_file", {'file_path': 'test.txt'})

        # Crude wait that gives the workers time to drain the queue.
        await asyncio.sleep(2)

        status = scheduler.get_all_tool_status()
        print(f"Tool status: {status}")
    finally:
        # Always stop the workers, even if something above failed.
        await scheduler.stop()


if __name__ == '__main__':
    asyncio.run(main())
```

> **工具调度器特点**:
> - 异步任务调度
> - 并发控制
> - 工具状态管理
> - 使用统计

## 总结
核心组件详解包括:
1. **意图识别器**: 意图识别原理、实现方法
2. **任务规划器**: 任务规划原理、执行计划生成
3. **工具调度器**: 工具调度原理、异步执行
通过理解这些核心组件,可以更好地掌握Claude Code的工作原理。
在下一节中,我们将探讨数据流与工作流程。