
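# 28.3 Data Flow and Workflows

## 28.3.1 Data Flow Overview

In Claude Code, the data flow is the complete path a user request takes from input to final output. Understanding it is essential to understanding how Claude Code works.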

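### Data Flow Characteristics

  1. Multi-stage processing: data passes through several stages, each with a specific responsibility
  2. Asynchronous processing: stages can run asynchronously to improve performance
  3. Error handling: comprehensive error handling and recovery mechanisms
  4. Observability: every processing step can be monitored and traced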

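### Data Flow Architecture

User input → Preprocessing (context collection, history analysis) → Intent recognition → Task planning → Tool selection → Tool execution → Result processing → Post-processing (formatting, validation) → User output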

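The stage sequence above can be sketched as a chain of plain functions that each take and return a shared state dictionary. This is only an illustrative sketch: the stage functions, the `run_pipeline` helper, the state keys, and the `file_reader` tool name are all hypothetical and are not taken from Claude Code itself.

```python
from typing import Any, Callable, Dict, List

State = Dict[str, Any]
Stage = Callable[[State], State]


def preprocess(state: State) -> State:
    # Hypothetical stage: trim the raw input
    return {**state, "text": state["raw"].strip()}


def recognize_intent(state: State) -> State:
    # Hypothetical stage: a trivial keyword rule stands in for a real model
    intent = "read_file" if "read" in state["text"].lower() else "unknown"
    return {**state, "intent": intent}


def plan_tasks(state: State) -> State:
    # Hypothetical stage: map the intent to a list of tool names
    plans = {"read_file": ["file_reader"]}
    return {**state, "tasks": plans.get(state["intent"], [])}


def format_output(state: State) -> State:
    # Hypothetical stage: render a user-facing summary
    summary = f"intent={state['intent']} tasks={state['tasks']}"
    return {**state, "output": summary}


def run_pipeline(raw: str, stages: List[Stage]) -> State:
    """Pass the state through each stage in order."""
    state: State = {"raw": raw}
    for stage in stages:
        state = stage(state)
    return state


result = run_pipeline(
    "  Read config.yaml  ",
    [preprocess, recognize_intent, plan_tasks, format_output],
)
print(result["output"])
```

Because every stage has the same signature, stages can be reordered, removed, or wrapped (for timing or logging) without changing the others.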
## 28.3.2 Core Data Flow Components

### 1. Input Processor

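```python
import re
from datetime import datetime, timezone

# ContextManager, HistoryAnalyzer, and ProcessedInput are assumed to be
# defined elsewhere in the project.


class InputProcessor:
    """Input processor."""

    def __init__(self):
        self.context_manager = ContextManager()
        self.history_analyzer = HistoryAnalyzer()

    def process(self, user_input: str, session_id: str) -> ProcessedInput:
        """Process user input."""
        # Collect context for the session
        context = self.context_manager.collect_context(session_id)

        # Analyze the session history
        history = self.history_analyzer.analyze(session_id)

        # Preprocess the raw input
        processed_input = self._preprocess(user_input)

        # Build the processed input
        return ProcessedInput(
            original_input=user_input,
            processed_input=processed_input,
            context=context,
            history=history,
            metadata={
                'timestamp': datetime.now(timezone.utc),
                'session_id': session_id
            }
        )

    def _preprocess(self, input_text: str) -> str:
        """Preprocess the input text."""
        # Normalize line endings first so the whitespace cleanup below
        # does not discard them
        text = input_text.replace('\r\n', '\n').replace('\r', '\n')

        # Collapse runs of spaces and tabs within each line
        text = '\n'.join(
            re.sub(r'[ \t]+', ' ', line).strip() for line in text.split('\n')
        )

        # Normalize special characters
        return self._normalize_special_chars(text)

    def _normalize_special_chars(self, text: str) -> str:
        """Normalize special characters."""
        # Convert curly quotes to straight quotes
        text = text.replace('\u201c', '"').replace('\u201d', '"')
        text = text.replace('\u2018', "'").replace('\u2019', "'")

        # Convert en dashes (U+2013) and em dashes (U+2014) to hyphens
        text = text.replace('\u2013', '-').replace('\u2014', '--')

        return text
```

### 2. Intent Recognition Processor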

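```python
import time

# IntentRecognizer, EntityExtractor, ProcessedInput, and IntentResult are
# assumed to be defined elsewhere in the project.


class IntentProcessor:
    """Intent recognition processor."""

    def __init__(self):
        self.intent_recognizer = IntentRecognizer()
        self.entity_extractor = EntityExtractor()

    def process(self, processed_input: ProcessedInput) -> IntentResult:
        """Run intent recognition on the processed input."""
        # Start a timer so processing_time is a duration, not a timestamp
        started = time.perf_counter()

        # Recognize the intent
        intent = self.intent_recognizer.recognize(
            processed_input.processed_input
        )

        # Extract entities
        entities = self.entity_extractor.extract(
            processed_input.processed_input,
            intent
        )

        # Build the result
        return IntentResult(
            intent=intent,
            entities=entities,
            confidence=intent.confidence,
            metadata={
                'processing_time': self._measure_time(started),
                'model_version': self.intent_recognizer.model_version
            }
        )

    def _measure_time(self, started: float) -> float:
        """Return the seconds elapsed since `started`."""
        return time.perf_counter() - started
```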

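To make the shape of an intent result concrete, here is a minimal sketch that uses a keyword-based stand-in for the recognizer. The `Intent` and `IntentResult` dataclasses and the `KeywordIntentRecognizer` class are assumptions made for illustration; this section does not describe the actual recognition logic.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Intent:
    name: str
    confidence: float


@dataclass
class IntentResult:
    intent: Intent
    entities: List[str]
    confidence: float
    metadata: Dict[str, Any] = field(default_factory=dict)


class KeywordIntentRecognizer:
    """Hypothetical recognizer: picks the intent with the most keyword hits."""

    model_version = "keyword-demo"

    def __init__(self, keywords: Dict[str, List[str]]):
        self.keywords = keywords

    def recognize(self, text: str) -> Intent:
        words = text.lower().split()
        best_name, best_hits = "unknown", 0
        for name, candidates in self.keywords.items():
            hits = sum(1 for word in words if word in candidates)
            if hits > best_hits:
                best_name, best_hits = name, hits
        # Confidence here is simply the share of words that matched
        confidence = best_hits / len(words) if words else 0.0
        return Intent(name=best_name, confidence=confidence)


recognizer = KeywordIntentRecognizer({
    "refactor": ["refactor", "rename", "extract"],
    "explain": ["explain", "why", "describe"],
})
text = "rename and extract helper"
intent = recognizer.recognize(text)
result = IntentResult(
    intent=intent,
    entities=[word for word in text.split() if word == "helper"],
    confidence=intent.confidence,
    metadata={"model_version": recognizer.model_version},
)
print(result.intent.name, result.confidence)
```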
### 3. Task Planning Processor

```python
from typing import Any, Dict, List

# TaskPlanner, DependencyAnalyzer, IntentResult, PlanningResult, and Task
# are assumed to be defined elsewhere in the project.


class TaskPlanningProcessor:
    """Task planning processor."""

    def __init__(self):
        self.task_planner = TaskPlanner()
        self.dependency_analyzer = DependencyAnalyzer()

    def process(self, intent_result: IntentResult,
                context: Dict[str, Any]) -> PlanningResult:
        """Build an execution plan for the recognized intent."""
        # Analyze dependencies
        dependencies = self.dependency_analyzer.analyze(
            intent_result,
            context
        )

        # Create the execution plan
        tasks = self.task_planner.create_plan(
            intent_result.intent.name,
            context
        )

        # Build the result
        return PlanningResult(
            tasks=tasks,
            dependencies=dependencies,
            execution_order=self.task_planner.execution_plan,
            estimated_time=self._estimate_time(tasks),
            metadata={
                'planning_algorithm': self.task_planner.algorithm,
                'optimization_level': self.task_planner.optimization_level
            }
        )

    def _estimate_time(self, tasks: List[Task]) -> float:
        """Estimate total execution time as the sum of task durations."""
        return sum((task.estimated_duration for task in tasks), 0.0)
```

### 4. Tool Execution Processor

```python
# ToolScheduler, ResultAggregator, PlanningResult, and ExecutionResult are
# assumed to be defined elsewhere in the project.


class ToolExecutionProcessor:
    """Tool execution processor."""

    def __init__(self):
        self.tool_scheduler = ToolScheduler()
        self.result_aggregator = ResultAggregator()

    def process(self, planning_result: PlanningResult) -> ExecutionResult:
        """Execute the planned tasks and aggregate their results."""
        # Execute the tasks in plan order
        execution_results = [
            self.tool_scheduler.execute(task)
            for task in planning_result.tasks
        ]

        # Aggregate the results
        aggregated_result = self.result_aggregator.aggregate(
            execution_results
        )

        # Guard against an empty plan to avoid dividing by zero
        succeeded = sum(1 for r in execution_results if r.success)
        success_rate = (
            succeeded / len(execution_results) if execution_results else 0.0
        )

        # Build the result
        return ExecutionResult(
            individual_results=execution_results,
            aggregated_result=aggregated_result,
            total_time=sum(r.execution_time for r in execution_results),
            success_rate=success_rate,
            metadata={
                'parallel_execution': self.tool_scheduler.parallel,
                'max_concurrency': self.tool_scheduler.max_concurrency
            }
        )
```

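The `parallel_execution` and `max_concurrency` metadata fields suggest a scheduler that can run tasks concurrently. The following is a minimal sketch of one way such a scheduler could look, assuming tasks are plain callables and using a thread pool. `SimpleToolScheduler` and `TaskOutcome` are hypothetical names, not the `ToolScheduler` referenced above.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class TaskOutcome:
    success: bool
    execution_time: float
    value: Any = None
    error: Optional[str] = None


class SimpleToolScheduler:
    """Hypothetical scheduler that runs callables in a bounded thread pool."""

    def __init__(self, max_concurrency: int = 4):
        self.parallel = max_concurrency > 1
        self.max_concurrency = max_concurrency

    def execute(self, task: Callable[[], Any]) -> TaskOutcome:
        # Capture failures as data so one bad tool does not abort the batch
        started = time.perf_counter()
        try:
            value = task()
            return TaskOutcome(True, time.perf_counter() - started, value=value)
        except Exception as exc:
            return TaskOutcome(False, time.perf_counter() - started, error=str(exc))

    def execute_all(self, tasks: List[Callable[[], Any]]) -> List[TaskOutcome]:
        with ThreadPoolExecutor(max_workers=self.max_concurrency) as pool:
            # pool.map preserves the order of the input tasks
            return list(pool.map(self.execute, tasks))


def failing_task() -> None:
    raise ValueError("tool failed")


scheduler = SimpleToolScheduler(max_concurrency=2)
outcomes = scheduler.execute_all([lambda: 1 + 1, failing_task, lambda: "ok"])
success_rate = sum(o.success for o in outcomes) / len(outcomes)
print([o.success for o in outcomes], round(success_rate, 2))
```

Threads suit I/O-bound tools such as file reads or network calls; CPU-bound work would need a different executor.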
### 5. Output Generation Processor

```python
# OutputFormatter, OutputValidator, ExecutionResult, IntentResult, and
# OutputResult are assumed to be defined elsewhere in the project.


class OutputGenerationProcessor:
    """Output generation processor."""

    def __init__(self):
        self.formatter = OutputFormatter()
        self.validator = OutputValidator()

    def process(self, execution_result: ExecutionResult,
                intent_result: IntentResult) -> OutputResult:
        """Format and validate the final output."""
        # Format the output
        formatted_output = self.formatter.format(
            execution_result.aggregated_result,
            intent_result.intent
        )

        # Validate the output
        validation_result = self.validator.validate(
            formatted_output,
            intent_result
        )

        # Build the result
        return OutputResult(
            formatted_output=formatted_output,
            validation_result=validation_result,
            format_type=self.formatter.current_format,
            metadata={
                'formatter_version': self.formatter.version,
                'validation_rules': self.validator.rules
            }
        )
```

## 28.3.3 Complete Data Flow Implementation

```python
import logging
import time
from typing import Any, List

logger = logging.getLogger(__name__)

# The five processors, PipelineObserver, and PipelineResult are assumed to
# be defined elsewhere in the project.


class DataPipeline:
    """Data flow pipeline."""

    def __init__(self):
        self.input_processor = InputProcessor()
        self.intent_processor = IntentProcessor()
        self.planning_processor = TaskPlanningProcessor()
        self.execution_processor = ToolExecutionProcessor()
        self.output_processor = OutputGenerationProcessor()
        self.observers: List[PipelineObserver] = []

    def add_observer(self, observer: PipelineObserver):
        """Register an observer."""
        self.observers.append(observer)

    def process(self, user_input: str, session_id: str) -> PipelineResult:
        """Run the complete data flow."""
        pipeline_result = PipelineResult()
        started = time.perf_counter()

        try:
            # 1. Input processing
            self._notify_observers('input_processing_start')
            processed_input = self.input_processor.process(user_input, session_id)
            pipeline_result.processed_input = processed_input
            self._notify_observers('input_processing_complete', processed_input)

            # 2. Intent recognition
            self._notify_observers('intent_recognition_start')
            intent_result = self.intent_processor.process(processed_input)
            pipeline_result.intent_result = intent_result
            self._notify_observers('intent_recognition_complete', intent_result)

            # 3. Task planning
            self._notify_observers('task_planning_start')
            planning_result = self.planning_processor.process(
                intent_result,
                processed_input.context
            )
            pipeline_result.planning_result = planning_result
            self._notify_observers('task_planning_complete', planning_result)

            # 4. Tool execution
            self._notify_observers('tool_execution_start')
            execution_result = self.execution_processor.process(planning_result)
            pipeline_result.execution_result = execution_result
            self._notify_observers('tool_execution_complete', execution_result)

            # 5. Output generation
            self._notify_observers('output_generation_start')
            output_result = self.output_processor.process(
                execution_result,
                intent_result
            )
            pipeline_result.output_result = output_result
            self._notify_observers('output_generation_complete', output_result)

            # Mark success
            pipeline_result.success = True

        except Exception as e:
            pipeline_result.success = False
            pipeline_result.error = str(e)
            self._notify_observers('pipeline_error', e)
            logger.error(f"Pipeline error: {e}")
            raise

        finally:
            # Record the elapsed time on both success and failure
            pipeline_result.total_time = self._calculate_total_time(started)
            self._notify_observers('pipeline_complete', pipeline_result)

        return pipeline_result

    def _notify_observers(self, event: str, data: Any = None):
        """Notify all observers of an event."""
        for observer in self.observers:
            observer.notify(event, data)

    def _calculate_total_time(self, started: float) -> float:
        """Return the seconds elapsed since `started`."""
        return time.perf_counter() - started
```

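An observer only needs a `notify(event, data)` method to receive the events emitted by `_notify_observers`. The sketch below shows one possible observer that times each stage by pairing `*_start` and `*_complete` events. `StageTimingObserver` is a hypothetical class; the `PipelineObserver` base type is not shown in this section, so the sketch does not inherit from it.

```python
import time
from typing import Any, Dict, List


class StageTimingObserver:
    """Hypothetical observer that records how long each stage takes."""

    def __init__(self) -> None:
        self._started: Dict[str, float] = {}
        self.durations: Dict[str, float] = {}
        self.errors: List[str] = []

    def notify(self, event: str, data: Any = None) -> None:
        if event.endswith("_start"):
            stage = event[: -len("_start")]
            self._started[stage] = time.perf_counter()
        elif event.endswith("_complete"):
            stage = event[: -len("_complete")]
            started = self._started.pop(stage, None)
            # Ignore completion events that have no matching start
            if started is not None:
                self.durations[stage] = time.perf_counter() - started
        elif event == "pipeline_error":
            self.errors.append(str(data))


observer = StageTimingObserver()
observer.notify("intent_recognition_start")
observer.notify("intent_recognition_complete", {"intent": "demo"})
observer.notify("pipeline_complete")  # no matching start, so it is ignored
observer.notify("pipeline_error", ValueError("boom"))
print(sorted(observer.durations), observer.errors)
```

An instance would be registered with `pipeline.add_observer(observer)` before calling `process`.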
## 28.3.4 Workflow Design

### 1. Sequential Workflow

```python
import logging
from typing import Any, Dict, List

logger = logging.getLogger(__name__)

# WorkflowStep and WorkflowResult are assumed to be defined elsewhere;
# WorkflowResult.success is assumed to default to None.


class SequentialWorkflow:
    """Sequential workflow."""

    def __init__(self):
        self.steps: List[WorkflowStep] = []

    def add_step(self, step: WorkflowStep):
        """Add a step."""
        self.steps.append(step)

    def execute(self, context: Dict[str, Any]) -> WorkflowResult:
        """Run the steps in order, stopping at the first failure."""
        result = WorkflowResult()
        current_context = context.copy()

        for i, step in enumerate(self.steps):
            try:
                # Execute the step
                step_result = step.execute(current_context)

                # Merge the step output into the context
                current_context.update(step_result.output)

                # Record the result
                result.add_step_result(step.name, step_result)

            except Exception as e:
                logger.error(f"Error in step {step.name}: {e}")
                result.success = False
                result.error = str(e)
                result.failed_step = i
                break

        # No step failed, so the workflow succeeded
        if result.success is None:
            result.success = True

        result.final_context = current_context

        return result
```

### 2. Parallel Workflow

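```python
import asyncio
import logging
from typing import Any, Dict, List

logger = logging.getLogger(__name__)

# WorkflowStep, WorkflowResult, and StepResult are assumed to be defined
# elsewhere in the project.


class ParallelWorkflow:
    """Parallel workflow."""

    def __init__(self, max_workers: int = 4):
        self.steps: List[WorkflowStep] = []
        self.max_workers = max_workers

    def add_step(self, step: WorkflowStep):
        """Add a step."""
        self.steps.append(step)

    async def execute(self, context: Dict[str, Any]) -> WorkflowResult:
        """Run all steps concurrently and collect their results."""
        result = WorkflowResult()

        # Cap the number of steps running at once at max_workers
        semaphore = asyncio.Semaphore(self.max_workers)

        # Create one asyncio task per step, each with its own context copy
        tasks = [
            asyncio.create_task(
                self._execute_step(step, context.copy(), semaphore)
            )
            for step in self.steps
        ]

        # Wait for every task; exceptions are returned instead of raised
        step_results = await asyncio.gather(*tasks, return_exceptions=True)

        # Record each result, converting exceptions into failed StepResults
        for step, step_result in zip(self.steps, step_results):
            if isinstance(step_result, Exception):
                logger.error(f"Error in step {step.name}: {step_result}")
                result.add_step_result(
                    step.name,
                    StepResult(success=False, error=str(step_result))
                )
            else:
                result.add_step_result(step.name, step_result)

        result.success = all(r.success for r in result.step_results.values())
        return result

    async def _execute_step(self, step: WorkflowStep,
                            context: Dict[str, Any],
                            semaphore: asyncio.Semaphore) -> StepResult:
        """Run a single step once a concurrency slot is free."""
        async with semaphore:
            return await step.execute_async(context)
```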

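The pattern the parallel workflow relies on can be tried in isolation: `asyncio.gather(..., return_exceptions=True)` returns failures as values, so one failing step does not cancel the others. In this sketch the `DemoStep` class and `run_steps` helper are hypothetical, and the semaphore is one possible way to honor a `max_workers` limit.

```python
import asyncio
from typing import Any, Dict, List


class DemoStep:
    """Hypothetical step exposing an execute_async method."""

    def __init__(self, name: str, delay: float, fail: bool = False):
        self.name = name
        self.delay = delay
        self.fail = fail

    async def execute_async(self, context: Dict[str, Any]) -> str:
        await asyncio.sleep(self.delay)  # stands in for real async work
        if self.fail:
            raise RuntimeError(f"{self.name} failed")
        return f"{self.name} done"


async def run_steps(steps: List[DemoStep], max_workers: int) -> List[Any]:
    semaphore = asyncio.Semaphore(max_workers)

    async def run_one(step: DemoStep) -> str:
        # At most max_workers steps hold the semaphore at the same time
        async with semaphore:
            return await step.execute_async({})

    # Results come back in the same order as the input steps
    return await asyncio.gather(
        *(run_one(step) for step in steps), return_exceptions=True
    )


steps = [
    DemoStep("lint", 0.01),
    DemoStep("test", 0.01, fail=True),
    DemoStep("build", 0.01),
]
results = asyncio.run(run_steps(steps, max_workers=2))
for step, outcome in zip(steps, results):
    status = "error" if isinstance(outcome, Exception) else "ok"
    print(step.name, status)
```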
### 3. Conditional Workflow

```python
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)

# WorkflowStep and WorkflowResult are assumed to be defined elsewhere;
# WorkflowResult.success is assumed to default to None.


class ConditionalWorkflow:
    """Conditional workflow."""

    def __init__(self):
        self.branches: Dict[str, List[WorkflowStep]] = {}
        self.default_branch: List[WorkflowStep] = []

    def add_branch(self, condition: str, steps: List[WorkflowStep]):
        """Add a branch keyed by its condition."""
        self.branches[condition] = steps

    def set_default_branch(self, steps: List[WorkflowStep]):
        """Set the branch used when no condition matches."""
        self.default_branch = steps

    def execute(
        self,
        context: Dict[str, Any],
        condition_evaluator: Callable[[str, Dict[str, Any]], bool],
    ) -> WorkflowResult:
        """Run the first branch whose condition evaluates to true."""
        result = WorkflowResult()

        # Evaluate conditions in insertion order
        selected_branch = None
        for condition, steps in self.branches.items():
            if condition_evaluator(condition, context):
                selected_branch = steps
                result.selected_branch = condition
                break

        # Fall back to the default branch
        if selected_branch is None:
            selected_branch = self.default_branch
            result.selected_branch = 'default'

        # Execute the selected branch
        current_context = context.copy()
        for step in selected_branch:
            try:
                step_result = step.execute(current_context)
                current_context.update(step_result.output)
                result.add_step_result(step.name, step_result)
            except Exception as e:
                logger.error(f"Error in step {step.name}: {e}")
                result.success = False
                result.error = str(e)
                break

        # No step failed, so the workflow succeeded
        if result.success is None:
            result.success = True

        result.final_context = current_context

        return result
```

## 28.3.5 Data Flow Monitoring and Debugging

### 1. Data Flow Monitor

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


class DataFlowMonitor:
    """Data flow monitor."""

    def __init__(self):
        self.metrics: Dict[str, List[float]] = {}
        self.events: List[Dict[str, Any]] = []

    def record_metric(self, name: str, value: float):
        """Record one sample for a named metric."""
        if name not in self.metrics:
            self.metrics[name] = []
        self.metrics[name].append(value)

    def record_event(self, event_type: str, data: Dict[str, Any]):
        """Record an event with a UTC timestamp."""
        event = {
            'type': event_type,
            'timestamp': datetime.now(timezone.utc),
            'data': data
        }
        self.events.append(event)

    def get_metrics_summary(self) -> Dict[str, Dict[str, float]]:
        """Return count, mean, min, and max for each metric."""
        summary = {}
        for name, values in self.metrics.items():
            summary[name] = {
                'count': len(values),
                'mean': sum(values) / len(values),
                'min': min(values),
                'max': max(values)
            }
        return summary

    def get_events(self, event_type: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return all events, or only those of the given type."""
        if event_type:
            return [e for e in self.events if e['type'] == event_type]
        return self.events
```

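One convenient way to feed timing samples into a monitor like this is a context manager that measures the wrapped block. The sketch below is an assumption-laden illustration: `MiniMonitor` is a hypothetical stand-in that exposes only `record_metric`, and `timed` is a hypothetical helper.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator, List


class MiniMonitor:
    """Hypothetical stand-in exposing the same record_metric method."""

    def __init__(self) -> None:
        self.metrics: Dict[str, List[float]] = {}

    def record_metric(self, name: str, value: float) -> None:
        self.metrics.setdefault(name, []).append(value)


@contextmanager
def timed(monitor: MiniMonitor, name: str) -> Iterator[None]:
    """Record the elapsed seconds of the wrapped block under `name`."""
    started = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises, so failures are timed too
        monitor.record_metric(name, time.perf_counter() - started)


monitor = MiniMonitor()
for _ in range(3):
    with timed(monitor, "intent_recognition"):
        sum(range(1000))  # placeholder for real work

values = monitor.metrics["intent_recognition"]
summary = {
    "count": len(values),
    "mean": sum(values) / len(values),
    "min": min(values),
    "max": max(values),
}
print(summary["count"])
```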
### 2. Data Flow Debugger

```python
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List

logger = logging.getLogger(__name__)


class DataFlowDebugger:
    """Data flow debugger."""

    def __init__(self):
        self.breakpoints: List[str] = []
        self.trace: List[Dict[str, Any]] = []
        self.enabled = False

    def enable(self):
        """Enable debugging."""
        self.enabled = True

    def disable(self):
        """Disable debugging."""
        self.enabled = False

    def add_breakpoint(self, step_name: str):
        """Add a breakpoint for the named step."""
        self.breakpoints.append(step_name)

    def trace_step(self, step_name: str, input_data: Any,
                   output_data: Any):
        """Record the input and output of a step."""
        if not self.enabled:
            return

        trace_entry = {
            'step': step_name,
            'timestamp': datetime.now(timezone.utc),
            'input': self._serialize(input_data),
            'output': self._serialize(output_data)
        }
        self.trace.append(trace_entry)

        # Check for a breakpoint on this step
        if step_name in self.breakpoints:
            self._pause_at_breakpoint(step_name, trace_entry)

    def _pause_at_breakpoint(self, step_name: str, trace_entry: Dict):
        """Report a breakpoint hit.

        This only logs the trace entry; execution is not suspended.
        """
        logger.info(f"Breakpoint hit at: {step_name}")
        logger.info(f"Input: {trace_entry['input']}")
        logger.info(f"Output: {trace_entry['output']}")

    def _serialize(self, data: Any) -> Any:
        """Convert data into plain, loggable Python values."""
        if isinstance(data, (str, int, float, bool, type(None))):
            return data
        elif isinstance(data, (list, tuple)):
            return [self._serialize(item) for item in data]
        elif isinstance(data, dict):
            return {k: self._serialize(v) for k, v in data.items()}
        else:
            return str(data)
```

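Calling a trace method by hand at every step is easy to forget, so one option is to wrap step functions in a decorator that records each call. The sketch below assumes single-argument step functions and writes to a module-level `trace` list; the `traced` decorator and both example steps are hypothetical.

```python
import functools
from typing import Any, Callable, Dict, List

trace: List[Dict[str, Any]] = []


def traced(step_name: str) -> Callable:
    """Hypothetical decorator that appends each call's input and output to `trace`."""

    def decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
        @functools.wraps(func)
        def wrapper(data: Any) -> Any:
            output = func(data)
            # repr() keeps the trace readable for arbitrary objects
            trace.append(
                {"step": step_name, "input": repr(data), "output": repr(output)}
            )
            return output

        return wrapper

    return decorator


@traced("normalize")
def normalize(text: str) -> str:
    return " ".join(text.split())


@traced("tokenize")
def tokenize(text: str) -> List[str]:
    return text.split(" ")


tokens = tokenize(normalize("  fix   the bug "))
for entry in trace:
    print(entry["step"], entry["input"], "->", entry["output"])
```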
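## 28.3.6 Best Practices

### 1. Data Flow Design Principles

  1. **Single responsibility**: each processor handles one specific task
  2. **Composability**: processors can be combined flexibly to build different data flows
  3. **Observability**: every processing step should be monitorable and traceable
  4. **Error handling**: comprehensive error handling and recovery mechanisms
  5. **Performance optimization**: support asynchronous processing and parallel execution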

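As one illustration of the error handling and recovery principle, a transient failure in a step can be retried with exponential backoff before the error is surfaced. This is a sketch of one common recovery strategy, not something this chapter prescribes; `run_with_retry`, its parameters, and `flaky_step` are hypothetical.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def run_with_retry(
    step: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.01,
    retry_on: Tuple[Type[BaseException], ...] = (TimeoutError,),
) -> T:
    """Call `step`, retrying with exponential backoff on the listed errors."""
    for attempt in range(1, attempts + 1):
        try:
            return step()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Wait base_delay, then 2x, 4x, ... before the next attempt
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")


calls = {"count": 0}


def flaky_step() -> str:
    # Fails twice with a transient error, then succeeds
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("temporary failure")
    return "ok"


result = run_with_retry(flaky_step)
print(result, calls["count"])
```

Errors not listed in `retry_on` propagate immediately, which keeps programming mistakes from being retried.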
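### 2. Workflow Design Principles

  1. **Clarity**: workflow logic should be clear and easy to follow
  2. **Maintainability**: workflows should be easy to modify and extend
  3. **Testability**: each step should be testable in isolation
  4. **Flexibility**: support different execution modes (sequential, parallel, conditional)
  5. **Reusability**: workflow components should be reusable across scenarios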

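Testability follows naturally when a step depends only on the context it is given. The sketch below tests a single step in isolation with the standard `unittest` module. `UppercaseStep` is a hypothetical step that, for simplicity, returns the new context keys as a plain dictionary.

```python
import unittest
from typing import Any, Dict


class UppercaseStep:
    """Hypothetical step: reads 'text' from the context and returns new keys."""

    name = "uppercase"

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        return {"text_upper": context["text"].upper()}


class UppercaseStepTest(unittest.TestCase):
    def test_adds_uppercase_text(self) -> None:
        output = UppercaseStep().execute({"text": "hello"})
        self.assertEqual(output, {"text_upper": "HELLO"})

    def test_missing_input_raises(self) -> None:
        # The step has no fallback, so a missing key is an error
        with self.assertRaises(KeyError):
            UppercaseStep().execute({})


suite = unittest.defaultTestLoader.loadTestsFromTestCase(UppercaseStepTest)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
print(outcome.testsRun, outcome.wasSuccessful())
```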
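### 3. Monitoring and Debugging Recommendations

  1. **Key metrics**: monitor processing time, success rate, error rate, and other key metrics
  2. **Event tracing**: record important events for later analysis
  3. **Breakpoint debugging**: set breakpoints at critical steps
  4. **Performance analysis**: identify bottlenecks and optimize them
  5. **Logging**: detailed logs help diagnose problems

With well-designed data flows and workflows, you can build an efficient, reliable, and maintainable Claude Code system.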

Released under the MIT License.