Skip to content

28.2 核心元件詳解

核心元件概述

Claude Code 的核心元件是其功能實現的基礎,包括意圖識別、任務規劃、工具排程等關鍵模組。深入理解這些元件對於掌握Claude Code的工作原理至關重要。

意图识别器

1. 意图识别原理

示例:意图识别器

使用者請求: "實現一個意圖識別器"

Claude Code 生成的程式碼:

    > **意图识别器特点**:
    > - 支持多种识别方法
    > - 模式匹配
    > - 关键词匹配
    > - 实体提取

    ## 任务规划器

    ```python

    from typing import Dict, List, Any, Optional
    from datetime import datetime, timedelta
    import logging

    logger = logging.getLogger(__name__)

    class Task:
        """A single unit of work inside an execution plan.

        Attributes:
            id: Unique identifier; other tasks refer to it in ``dependencies``.
            name: Short human-readable label.
            description: Longer explanation of what the task does.
            action: Name of the action to perform for this task.
            status: Lifecycle state; starts as ``"pending"``.
            dependencies: Ids of tasks that must run before this one.
            created_at: Naive UTC timestamp taken at construction.
            started_at / completed_at: Timestamps, ``None`` until set.
            result / error: Outcome fields, ``None`` until set.
        """

        def __init__(
            self,
            task_id: str,
            name: str,
            description: str,
            action: str,
            dependencies: Optional[List[str]] = None,
        ):
            """Create a pending task.

            Args:
                task_id: Unique identifier of the task.
                name: Short human-readable label.
                description: Longer explanation of the task.
                action: Name of the action to perform.
                dependencies: Optional ids of prerequisite tasks. Omitting it
                    (or passing ``None``) yields an empty list, matching the
                    previous behaviour.
            """
            self.id = task_id
            self.name = name
            self.description = description
            self.action = action
            self.status = "pending"
            # Copy the input so tasks never share or mutate the caller's list.
            self.dependencies: List[str] = list(dependencies) if dependencies else []
            self.created_at = datetime.utcnow()
            self.started_at: Optional[datetime] = None
            self.completed_at: Optional[datetime] = None
            self.result: Optional[Any] = None
            self.error: Optional[str] = None

    class TaskPlanner:
        """Turn a textual goal into a dependency-ordered list of tasks and track progress.

        The planner keeps every task it has ever been given: calling
        ``create_plan`` again adds to (or overwrites by id) the existing tasks
        rather than starting from scratch.
        """

        def __init__(self):
            # Registered tasks keyed by task id.
            self.tasks: Dict[str, "Task"] = {}
            # task id -> ids of the tasks it depends on.
            self.task_graph: Dict[str, List[str]] = {}
            # Task ids in the order produced by the last create_plan() call.
            self.execution_plan: List[str] = []

        def add_task(self, task: "Task"):
            """Register ``task`` and record its dependency edges."""
            self.tasks[task.id] = task
            self.task_graph[task.id] = task.dependencies
            logger.info(f"Task added: {task.id}")

        def create_plan(self, goal: str, context: Dict[str, Any]) -> "List[Task]":
            """Generate tasks for ``goal``, register them and return them in run order.

            Args:
                goal: Free-text goal; keywords in it select the task template.
                context: Extra inputs for the template (``file_path``, ``language``).

            Returns:
                All registered tasks, dependencies first.

            Raises:
                ValueError: If the registered tasks contain a dependency cycle.
                KeyError: If a task depends on an id that was never registered.
            """
            for task in self._generate_tasks(goal, context):
                self.add_task(task)

            self.execution_plan = self._topological_sort()
            return [self.tasks[task_id] for task_id in self.execution_plan]

        def _build_chain(self, steps: List[tuple]) -> "List[Task]":
            """Build tasks from ``(task_id, name, description)`` steps, each depending on the previous one.

            The action of every task equals its id. Dependencies are assigned
            after construction so this works whether or not ``Task.__init__``
            accepts a ``dependencies`` argument.
            """
            chain = []
            previous_id = None
            for task_id, name, description in steps:
                task = Task(
                    task_id=task_id,
                    name=name,
                    description=description,
                    action=task_id,
                )
                task.dependencies = [] if previous_id is None else [previous_id]
                chain.append(task)
                previous_id = task_id
            return chain

        def _generate_tasks(self, goal: str, context: Dict[str, Any]) -> "List[Task]":
            """Pick a task template from keywords found in ``goal`` (case-insensitive)."""
            goal_text = goal.lower()

            if 'read' in goal_text and 'file' in goal_text:
                return self._generate_file_reading_tasks(goal, context)
            if 'write' in goal_text and 'file' in goal_text:
                return self._generate_file_writing_tasks(goal, context)
            if 'execute' in goal_text and 'code' in goal_text:
                return self._generate_code_execution_tasks(goal, context)
            return self._generate_default_tasks(goal, context)

        def _generate_file_reading_tasks(self, goal: str, context: Dict[str, Any]) -> "List[Task]":
            """Return the validate -> read -> parse chain for reading a file."""
            file_path = context.get('file_path', 'default.txt')
            return self._build_chain([
                ("validate_file_path", "Validate File Path", f"Validate file path: {file_path}"),
                ("read_file", "Read File", f"Read file: {file_path}"),
                ("parse_content", "Parse Content", "Parse file content"),
            ])

        def _generate_file_writing_tasks(self, goal: str, context: Dict[str, Any]) -> "List[Task]":
            """Return the validate -> prepare -> write -> verify chain for writing a file."""
            file_path = context.get('file_path', 'output.txt')
            return self._build_chain([
                ("validate_file_path", "Validate File Path", f"Validate file path: {file_path}"),
                ("prepare_content", "Prepare Content", "Prepare content for writing"),
                ("write_file", "Write File", f"Write content to file: {file_path}"),
                ("verify_write", "Verify Write", "Verify file was written successfully"),
            ])

        def _generate_code_execution_tasks(self, goal: str, context: Dict[str, Any]) -> "List[Task]":
            """Return the validate -> setup -> execute -> capture chain for running code."""
            language = context.get('language', 'python')
            return self._build_chain([
                ("validate_code", "Validate Code", "Validate code syntax"),
                ("setup_environment", "Setup Environment", f"Setup {language} execution environment"),
                ("execute_code", "Execute Code", "Execute the code"),
                ("capture_output", "Capture Output", "Capture execution output"),
            ])

        def _generate_default_tasks(self, goal: str, context: Dict[str, Any]) -> "List[Task]":
            """Return the generic analyze -> execute chain used when no keyword matches."""
            return self._build_chain([
                ("analyze_goal", "Analyze Goal", f"Analyze goal: {goal}"),
                ("execute_goal", "Execute Goal", f"Execute goal: {goal}"),
            ])

        def _topological_sort(self) -> List[str]:
            """Order the registered task ids so every dependency precedes its dependents.

            Depth-first post-order over ``task_graph``, visiting roots in
            registration order and dependencies in list order. Implemented with
            an explicit stack so long chains cannot hit the recursion limit.

            Raises:
                ValueError: If the graph contains a cycle (including a task
                    that depends on itself).
                KeyError: If a dependency id is not a registered task.
            """
            finished = set()
            on_path = set()  # ids on the current DFS path; revisiting one means a cycle
            order: List[str] = []

            for root in self.tasks:
                if root in finished:
                    continue

                on_path.add(root)
                stack = [(root, iter(self.task_graph.get(root, [])))]
                while stack:
                    current, pending_deps = stack[-1]
                    for dep_id in pending_deps:
                        if dep_id in finished:
                            continue
                        if dep_id in on_path:
                            raise ValueError(
                                f"Dependency cycle detected between {current!r} and {dep_id!r}"
                            )
                        if dep_id not in self.tasks:
                            raise KeyError(
                                f"Task {current!r} depends on unknown task {dep_id!r}"
                            )
                        # Descend into the dependency before finishing `current`.
                        on_path.add(dep_id)
                        stack.append((dep_id, iter(self.task_graph.get(dep_id, []))))
                        break
                    else:
                        # All dependencies are emitted; emit the task itself.
                        stack.pop()
                        on_path.discard(current)
                        finished.add(current)
                        order.append(current)

            return order

        def update_task_status(
            self,
            task_id: str,
            status: str,
            result: Any = None,
            error: Optional[str] = None,
        ):
            """Set a task's status and the bookkeeping fields that go with it.

            ``"in_progress"`` stamps ``started_at``; ``"completed"`` stamps
            ``completed_at`` and stores ``result``; ``"failed"`` stamps
            ``completed_at`` and stores ``error``. Unknown task ids are logged
            and ignored.
            """
            task = self.tasks.get(task_id)
            if not task:
                logger.warning(f"Task not found: {task_id}")
                return

            task.status = status

            if status == "in_progress":
                task.started_at = datetime.utcnow()
            elif status == "completed":
                task.completed_at = datetime.utcnow()
                task.result = result
            elif status == "failed":
                task.completed_at = datetime.utcnow()
                task.error = error

            logger.info(f"Task {task_id} status updated to {status}")

        def get_plan_status(self) -> Dict[str, Any]:
            """Return per-status task counts and the completed fraction (0 when empty)."""
            counts = {"completed": 0, "failed": 0, "in_progress": 0, "pending": 0}
            for task in self.tasks.values():
                if task.status in counts:
                    counts[task.status] += 1

            total_tasks = len(self.tasks)
            return {
                'total_tasks': total_tasks,
                'completed_tasks': counts["completed"],
                'failed_tasks': counts["failed"],
                'in_progress_tasks': counts["in_progress"],
                'pending_tasks': counts["pending"],
                'progress': counts["completed"] / total_tasks if total_tasks > 0 else 0,
            }

    # Usage example: plan a file-read goal, print the plan, then report progress.

    planner = TaskPlanner()

    # Build the plan; the "read" + "file" keywords select the file-reading template.
    goal = "Read the file config.txt"
    context = {'file_path': 'config.txt'}
    plan = planner.create_plan(goal, context)

    print("Execution plan:")
    for task in plan:
        print(f"  - {task.name}: {task.description}")

    # Record progress on the first two steps.
    planner.update_task_status("validate_file_path", "completed")
    planner.update_task_status("read_file", "in_progress")

    # Summarise how far the plan has progressed.
    status = planner.get_plan_status()
    print(f"\nPlan status: {status}")

    ```

    > **任务规划器特点**:
    > - 根据目标生成任务
    > - 处理任务依赖
    > - 拓扑排序
    > - 状态跟踪

    ## 工具排程器

    ### 1. 工具排程原理

    # 示例:工具排程器

    使用者請求:
    "實現一個工具排程器"
    Claude Code 生成的程式碼:

    ```python

    from typing import Dict, List, Any, Optional
    from datetime import datetime
    import logging
    import asyncio

    logger = logging.getLogger(__name__)

    class Tool:
        """A named callable that a scheduler can run, plus simple usage bookkeeping."""

        def __init__(self, tool_id: str, name: str, description: str, function: callable):
            """Store the tool's identity and callable; usage state starts idle and unused.

            Args:
                tool_id: Identifier the tool is registered and looked up under.
                name: Short human-readable label.
                description: Explanation of what the tool does.
                function: The callable to invoke when the tool is run.
            """
            # Identity and behaviour supplied by the caller.
            self.id, self.name = tool_id, name
            self.description, self.function = description, function

            # Bookkeeping: current state, last use time, number of uses.
            self.status = "idle"
            self.last_used: Optional[datetime] = None
            self.usage_count = 0

    class ToolScheduler:
        """Run registered tools on queued tasks using a fixed number of async workers.

        A task is a plain dict with ``task_id``, ``tool_id``, ``parameters``,
        ``status`` and ``created_at``; workers later add ``result`` or
        ``error`` together with ``completed_at``. Tasks are consumed from
        ``task_queue`` by the worker coroutines started in ``start()``.
        """

        def __init__(self, max_concurrent: int = 3, poll_interval: float = 1.0):
            """Create an idle scheduler.

            Args:
                max_concurrent: Number of worker coroutines ``start()`` spawns.
                poll_interval: Seconds an idle worker waits on the queue before
                    re-checking the stop flag. It bounds how long ``stop()``
                    takes when the workers are idle.
            """
            self.tools: Dict[str, "Tool"] = {}
            self.task_queue: asyncio.Queue = asyncio.Queue()
            # Worker tasks created by start(), keyed by worker name.
            self.running_tasks: Dict[str, asyncio.Task] = {}
            self.max_concurrent = max_concurrent
            self.poll_interval = poll_interval
            self.running = False
            # Monotonic counter that keeps task ids unique within this scheduler.
            self._task_seq = 0

        def register_tool(self, tool: "Tool"):
            """Make ``tool`` available under its ``id`` (replacing any previous one)."""
            self.tools[tool.id] = tool
            logger.info(f"Tool registered: {tool.id}")

        def schedule_task(self, tool_id: str, parameters: Dict[str, Any]) -> str:
            """Queue a call of tool ``tool_id`` with keyword arguments ``parameters``.

            The tool id is not validated here; a task for an unknown tool is
            marked failed by the worker that picks it up.

            Returns:
                The generated task id (an opaque string starting with ``task_``).
            """
            self._task_seq += 1
            now = datetime.utcnow()
            # The timestamp alone can repeat for calls made in quick succession,
            # so a sequence number is appended.
            task_id = f"task_{now.timestamp()}_{self._task_seq}"

            task = {
                'task_id': task_id,
                'tool_id': tool_id,
                'parameters': parameters,
                'status': 'pending',
                'created_at': now,
            }

            # The queue is unbounded, so put_nowait cannot raise QueueFull and the
            # task is enqueued immediately instead of via a fire-and-forget task.
            self.task_queue.put_nowait(task)
            logger.info(f"Task scheduled: {task_id}")

            return task_id

        async def start(self):
            """Start ``max_concurrent`` workers; a second call while running is ignored."""
            if self.running:
                logger.warning("Tool scheduler already running")
                return

            self.running = True
            logger.info("Tool scheduler started")

            # Keep a reference to every worker so stop() can wait for them.
            for index in range(self.max_concurrent):
                worker_name = f"worker-{index}"
                self.running_tasks[worker_name] = asyncio.create_task(
                    self._worker(worker_name)
                )

        async def stop(self):
            """Signal the workers to stop and wait until each has exited.

            A worker finishes the task it is currently executing before it
            exits; tasks still in the queue stay there.
            """
            self.running = False

            workers = list(self.running_tasks.values())
            await asyncio.gather(*workers, return_exceptions=True)
            self.running_tasks.clear()

            logger.info("Tool scheduler stopped")

        async def _worker(self, worker_name: str):
            """Worker loop: take tasks off the queue and execute them until stopped."""
            logger.info(f"{worker_name} started")

            while self.running:
                try:
                    # Time out periodically so the stop flag is re-checked.
                    task = await asyncio.wait_for(
                        self.task_queue.get(),
                        timeout=self.poll_interval,
                    )
                except asyncio.TimeoutError:
                    continue

                try:
                    tool_id = task['tool_id']
                    tool = self.tools.get(tool_id)

                    if tool is None:
                        logger.warning(f"Tool not found: {tool_id}")
                        # Record the failure instead of leaving the task pending.
                        task['status'] = 'failed'
                        task['error'] = f"Tool not found: {tool_id}"
                        task['completed_at'] = datetime.utcnow()
                        continue

                    await self._execute_task(worker_name, task, tool)
                except Exception as e:
                    logger.error(f"{worker_name} error: {e}")
                finally:
                    # Balance every get() so task_queue.join() can complete.
                    self.task_queue.task_done()

            logger.info(f"{worker_name} stopped")

        async def _execute_task(self, worker_name: str, task: Dict[str, Any], tool: "Tool"):
            """Run ``tool`` for ``task`` and record the outcome on the task dict.

            Exceptions raised by the tool are caught, stored as ``task['error']``
            and logged; they do not propagate.
            """
            task_id = task['task_id']

            # NOTE(review): status is a single flag, so two workers running the
            # same tool concurrently will overwrite each other's busy/idle state.
            tool.status = "busy"

            try:
                result = await self._run_tool_function(tool, task['parameters'])

                task['status'] = 'completed'
                task['result'] = result
                task['completed_at'] = datetime.utcnow()

                tool.last_used = datetime.utcnow()
                tool.usage_count += 1

                logger.info(f"{worker_name} completed task {task_id}")

            except Exception as e:
                task['status'] = 'failed'
                task['error'] = str(e)
                task['completed_at'] = datetime.utcnow()

                logger.error(f"{worker_name} failed task {task_id}: {e}")

            finally:
                tool.status = "idle"

        async def _run_tool_function(self, tool: "Tool", parameters: Dict[str, Any]) -> Any:
            """Call ``tool.function(**parameters)`` and return its result.

            Coroutine functions are awaited directly; plain functions run in
            the event loop's default executor so they do not block the loop.
            """
            if asyncio.iscoroutinefunction(tool.function):
                return await tool.function(**parameters)

            # run_in_executor forwards positional arguments only, so the keyword
            # arguments are bound in a closure.
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, lambda: tool.function(**parameters))

        def get_tool_status(self, tool_id: str) -> Optional[Dict[str, Any]]:
            """Return a status snapshot for one tool, or ``None`` if it is not registered."""
            tool = self.tools.get(tool_id)
            if not tool:
                return None

            return {
                'id': tool.id,
                'name': tool.name,
                'status': tool.status,
                'last_used': tool.last_used.isoformat() if tool.last_used else None,
                'usage_count': tool.usage_count,
            }

        def get_all_tool_status(self) -> Dict[str, Dict[str, Any]]:
            """Return the status snapshot of every registered tool, keyed by tool id."""
            return {
                tool_id: self.get_tool_status(tool_id)
                for tool_id in self.tools
            }

    # Usage example
    async def main():
        """Demo: register two file tools, schedule three tasks, print tool stats, shut down."""

        def read_file_tool(file_path: str) -> str:
            """Return the full text content of ``file_path``."""
            with open(file_path, 'r') as handle:
                return handle.read()

        def write_file_tool(file_path: str, content: str) -> str:
            """Write ``content`` to ``file_path`` (truncating it) and return a confirmation."""
            with open(file_path, 'w') as handle:
                handle.write(content)
            return f"Written to {file_path}"

        scheduler = ToolScheduler(max_concurrent=2)

        # Register both tools under the ids the tasks below refer to.
        for tool in (
            Tool("read_file", "Read File", "Read file content", read_file_tool),
            Tool("write_file", "Write File", "Write content to file", write_file_tool),
        ):
            scheduler.register_tool(tool)

        await scheduler.start()

        # Queue the work: read, write, then read again.
        requests = (
            ("read_file", {'file_path': 'test.txt'}),
            ("write_file", {'file_path': 'output.txt', 'content': 'Hello'}),
            ("read_file", {'file_path': 'test.txt'}),
        )
        for tool_id, parameters in requests:
            scheduler.schedule_task(tool_id, parameters)

        # Give the workers a fixed two seconds to process the queue.
        await asyncio.sleep(2)

        status = scheduler.get_all_tool_status()
        print(f"Tool status: {status}")

        await scheduler.stop()

    if __name__ == '__main__':
        asyncio.run(main())

    ```

    > **工具排程器特點**:

    > - 非同步任務排程
    > - 併發控制
    > - 工具狀態管理
    > - 使用統計

    ## 总结

    核心组件详解包括:

    1. **意图识别器**: 意图识别原理、实现方法
    2. **任务规划器**: 任务规划原理、执行计划生成
    3. **工具调度器**: 工具调度原理、异步执行

    通过理解这些核心组件,可以更好地掌握Claude Code的工作原理。

    在下一节中,我们将探讨数据流与工作流程。

基于 MIT 许可发布 | 永久导航