# 28.3 Data Flow and Workflows

## 28.3.1 Data Flow Overview

The data flow in Claude Code is the complete processing path a user request travels from input to final output. Understanding this flow is essential to understanding how Claude Code works.

**Data flow characteristics**

- **Multi-stage processing**: data passes through several processing stages, each with a specific responsibility
- **Asynchronous processing**: asynchronous execution is supported to improve performance
- **Error handling**: robust error handling and recovery mechanisms
- **Observability**: every processing step can be monitored and traced

**Data flow architecture**

User input → Preprocessing (context collection, history analysis) → Intent recognition → Task planning → Tool selection → Tool execution → Result processing → Post-processing (formatting, validation) → User output
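The processors in the rest of this section pass typed result objects between stages (ProcessedInput, IntentResult, and so on). Their definitions are not shown in this chapter; the following is a minimal sketch, assuming plain dataclasses, of the first two containers, with field names inferred from the constructor calls in the processor code below. The real definitions may differ.

```python
# Sketch only: field names inferred from how the processors below build these objects.
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ProcessedInput:
    original_input: str
    processed_input: str
    context: Dict[str, Any]
    history: Any
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class IntentResult:
    intent: Any                  # e.g. an Intent object exposing .name and .confidence
    entities: List[Any]
    confidence: float
    metadata: Dict[str, Any] = field(default_factory=dict)
```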
## 28.3.2 Core Data Flow Components

### 1. Input Processor

```python
from datetime import datetime


class InputProcessor:
    """Input processor."""

    def __init__(self):
        self.context_manager = ContextManager()
        self.history_analyzer = HistoryAnalyzer()

    def process(self, user_input: str, session_id: str) -> ProcessedInput:
        """Process a raw user input into a ProcessedInput."""
        # Collect session context
        context = self.context_manager.collect_context(session_id)

        # Analyze conversation history
        history = self.history_analyzer.analyze(session_id)

        # Preprocess the raw text
        processed_input = self._preprocess(user_input)

        # Build the processed-input container
        result = ProcessedInput(
            original_input=user_input,
            processed_input=processed_input,
            context=context,
            history=history,
            metadata={
                'timestamp': datetime.utcnow(),
                'session_id': session_id
            }
        )
        return result

    def _preprocess(self, input_text: str) -> str:
        """Preprocess the input text."""
        # Collapse runs of whitespace
        text = ' '.join(input_text.split())

        # Normalize line endings
        text = text.replace('\r\n', '\n').replace('\r', '\n')

        # Normalize special characters
        text = self._normalize_special_chars(text)
        return text

    def _normalize_special_chars(self, text: str) -> str:
        """Normalize special characters."""
        # Normalize curly quotes to straight quotes
        text = text.replace('\u201c', '"').replace('\u201d', '"')
        text = text.replace('\u2018', "'").replace('\u2019', "'")

        # Normalize en/em dashes
        text = text.replace('\u2013', '-').replace('\u2014', '--')
        return text
```
### 2. Intent Recognition Processor

```python
import time


class IntentProcessor:
    """Intent recognition processor."""

    def __init__(self):
        self.intent_recognizer = IntentRecognizer()
        self.entity_extractor = EntityExtractor()

    def process(self, processed_input: ProcessedInput) -> IntentResult:
        """Run intent recognition on a processed input."""
        # Recognize the intent
        intent = self.intent_recognizer.recognize(
            processed_input.processed_input
        )

        # Extract entities
        entities = self.entity_extractor.extract(
            processed_input.processed_input,
            intent
        )

        # Build the result
        result = IntentResult(
            intent=intent,
            entities=entities,
            confidence=intent.confidence,
            metadata={
                'processing_time': self._measure_time(),
                'model_version': self.intent_recognizer.model_version
            }
        )
        return result

    def _measure_time(self) -> float:
        """Return the current timestamp recorded as the processing time."""
        return time.time()
```
### 3. Task Planning Processor

```python
class TaskPlanningProcessor:
    """Task planning processor."""

    def __init__(self):
        self.task_planner = TaskPlanner()
        self.dependency_analyzer = DependencyAnalyzer()

    def process(self, intent_result: IntentResult,
                context: Dict[str, Any]) -> PlanningResult:
        """Plan the tasks needed to satisfy the recognized intent."""
        # Analyze dependencies
        dependencies = self.dependency_analyzer.analyze(
            intent_result,
            context
        )

        # Build the execution plan
        tasks = self.task_planner.create_plan(
            intent_result.intent.name,
            context
        )

        # Build the result
        result = PlanningResult(
            tasks=tasks,
            dependencies=dependencies,
            execution_order=self.task_planner.execution_plan,
            estimated_time=self._estimate_time(tasks),
            metadata={
                'planning_algorithm': self.task_planner.algorithm,
                'optimization_level': self.task_planner.optimization_level
            }
        )
        return result

    def _estimate_time(self, tasks: List[Task]) -> float:
        """Estimate the total execution time."""
        total_time = 0.0
        for task in tasks:
            total_time += task.estimated_duration
        return total_time
```
### 4. Tool Execution Processor

```python
class ToolExecutionProcessor:
    """Tool execution processor."""

    def __init__(self):
        self.tool_scheduler = ToolScheduler()
        self.result_aggregator = ResultAggregator()

    def process(self, planning_result: PlanningResult) -> ExecutionResult:
        """Execute the planned tasks through the tool scheduler."""
        # Execute the tasks
        execution_results = []
        for task in planning_result.tasks:
            result = self.tool_scheduler.execute(task)
            execution_results.append(result)

        # Aggregate the results
        aggregated_result = self.result_aggregator.aggregate(
            execution_results
        )

        # Build the result (guard against an empty task list when computing the rate)
        result = ExecutionResult(
            individual_results=execution_results,
            aggregated_result=aggregated_result,
            total_time=sum(r.execution_time for r in execution_results),
            success_rate=(
                sum(1 for r in execution_results if r.success) / len(execution_results)
                if execution_results else 0.0
            ),
            metadata={
                'parallel_execution': self.tool_scheduler.parallel,
                'max_concurrency': self.tool_scheduler.max_concurrency
            }
        )
        return result
```
### 5. Output Generation Processor

```python
class OutputGenerationProcessor:
    """Output generation processor."""

    def __init__(self):
        self.formatter = OutputFormatter()
        self.validator = OutputValidator()

    def process(self, execution_result: ExecutionResult,
                intent_result: IntentResult) -> OutputResult:
        """Format and validate the final output."""
        # Format the output
        formatted_output = self.formatter.format(
            execution_result.aggregated_result,
            intent_result.intent
        )

        # Validate the output
        validation_result = self.validator.validate(
            formatted_output,
            intent_result
        )

        # Build the result
        result = OutputResult(
            formatted_output=formatted_output,
            validation_result=validation_result,
            format_type=self.formatter.current_format,
            metadata={
                'formatter_version': self.formatter.version,
                'validation_rules': self.validator.rules
            }
        )
        return result
```
## 28.3.3 Complete Data Flow Implementation

```python
import logging
from datetime import datetime
from typing import Any, List

logger = logging.getLogger(__name__)


class DataPipeline:
    """Data flow pipeline."""

    def __init__(self):
        self.input_processor = InputProcessor()
        self.intent_processor = IntentProcessor()
        self.planning_processor = TaskPlanningProcessor()
        self.execution_processor = ToolExecutionProcessor()
        self.output_processor = OutputGenerationProcessor()
        self.observers: List[PipelineObserver] = []

    def add_observer(self, observer: PipelineObserver):
        """Register an observer."""
        self.observers.append(observer)

    def process(self, user_input: str, session_id: str) -> PipelineResult:
        """Run the complete data flow."""
        pipeline_result = PipelineResult()
        try:
            # 1. Input processing
            self._notify_observers('input_processing_start')
            processed_input = self.input_processor.process(user_input, session_id)
            pipeline_result.processed_input = processed_input
            self._notify_observers('input_processing_complete', processed_input)

            # 2. Intent recognition
            self._notify_observers('intent_recognition_start')
            intent_result = self.intent_processor.process(processed_input)
            pipeline_result.intent_result = intent_result
            self._notify_observers('intent_recognition_complete', intent_result)

            # 3. Task planning
            self._notify_observers('task_planning_start')
            planning_result = self.planning_processor.process(
                intent_result,
                processed_input.context
            )
            pipeline_result.planning_result = planning_result
            self._notify_observers('task_planning_complete', planning_result)

            # 4. Tool execution
            self._notify_observers('tool_execution_start')
            execution_result = self.execution_processor.process(planning_result)
            pipeline_result.execution_result = execution_result
            self._notify_observers('tool_execution_complete', execution_result)

            # 5. Output generation
            self._notify_observers('output_generation_start')
            output_result = self.output_processor.process(
                execution_result,
                intent_result
            )
            pipeline_result.output_result = output_result
            self._notify_observers('output_generation_complete', output_result)

            # Mark success
            pipeline_result.success = True
        except Exception as e:
            pipeline_result.success = False
            pipeline_result.error = str(e)
            self._notify_observers('pipeline_error', e)
            logger.error(f"Pipeline error: {e}")
            raise
        finally:
            pipeline_result.total_time = self._calculate_total_time(pipeline_result)
            self._notify_observers('pipeline_complete', pipeline_result)
        return pipeline_result

    def _notify_observers(self, event: str, data: Any = None):
        """Notify all registered observers."""
        for observer in self.observers:
            observer.notify(event, data)

    def _calculate_total_time(self, result: PipelineResult) -> float:
        """Calculate the total processing time."""
        if not result.processed_input:
            return 0.0
        # Elapsed time since the input stage recorded its start timestamp
        return (
            datetime.utcnow() -
            result.processed_input.metadata['timestamp']
        ).total_seconds()
```
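The pipeline only assumes that observers expose a `notify(event, data)` method; the `PipelineObserver` interface itself is not defined in this chapter. Below is a minimal sketch of a logging observer and how it might be wired into the pipeline. The class name `LoggingObserver` and the example input are illustrative, not part of Claude Code.

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


class LoggingObserver:
    """Illustrative observer: logs every pipeline event it receives."""

    def notify(self, event: str, data: Any = None) -> None:
        logger.info("pipeline event: %s (%s)", event, type(data).__name__)


# Hypothetical wiring; assumes the surrounding pipeline components are importable.
pipeline = DataPipeline()
pipeline.add_observer(LoggingObserver())
result = pipeline.process("Refactor the login module", session_id="session-123")
```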
## 28.3.4 Workflow Design

### 1. Sequential Workflow

```python
class SequentialWorkflow:
    """Sequential workflow."""

    def __init__(self):
        self.steps: List[WorkflowStep] = []

    def add_step(self, step: WorkflowStep):
        """Add a step."""
        self.steps.append(step)

    def execute(self, context: Dict[str, Any]) -> WorkflowResult:
        """Execute the workflow step by step."""
        result = WorkflowResult()
        current_context = context.copy()

        for i, step in enumerate(self.steps):
            try:
                # Execute the step
                step_result = step.execute(current_context)

                # Update the context with the step's output
                current_context.update(step_result.output)

                # Record the step result
                result.add_step_result(step.name, step_result)
            except Exception as e:
                logger.error(f"Error in step {step.name}: {e}")
                result.success = False
                result.error = str(e)
                result.failed_step = i
                break

        # If no step failed, mark the workflow as successful
        if result.success is None:
            result.success = True

        result.final_context = current_context
        return result
```
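The workflow classes in this section assume a `WorkflowStep` abstraction with a `name`, a synchronous `execute(context)` and (for the parallel case) an `execute_async(context)` method, each returning a `StepResult` with `success`, `output`, and `error` fields. Neither type is defined in this chapter; the following is a minimal sketch under those assumptions, used by the usage examples later in this section.

```python
# Sketch only: inferred from how the workflow classes use these types;
# the real definitions may differ.
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class StepResult:
    success: bool = True
    output: Dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None


class WorkflowStep:
    def __init__(self, name: str, func):
        self.name = name
        self._func = func          # callable taking the context dict, returning a dict

    def execute(self, context: Dict[str, Any]) -> StepResult:
        # Run the wrapped callable and package its output
        return StepResult(output=self._func(context))

    async def execute_async(self, context: Dict[str, Any]) -> StepResult:
        # Naive async wrapper around the synchronous callable
        return self.execute(context)
```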
### 2. Parallel Workflow

```python
import asyncio


class ParallelWorkflow:
    """Parallel workflow."""

    def __init__(self, max_workers: int = 4):
        self.steps: List[WorkflowStep] = []
        self.max_workers = max_workers

    def add_step(self, step: WorkflowStep):
        """Add a step."""
        self.steps.append(step)

    async def execute(self, context: Dict[str, Any]) -> WorkflowResult:
        """Execute all steps concurrently."""
        result = WorkflowResult()

        # Create one asyncio task per step
        tasks = []
        for step in self.steps:
            task = asyncio.create_task(
                self._execute_step(step, context.copy())
            )
            tasks.append(task)

        # Wait for all tasks to finish
        step_results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process the results
        for i, step_result in enumerate(step_results):
            if isinstance(step_result, Exception):
                logger.error(f"Error in step {self.steps[i].name}: {step_result}")
                result.success = False
                result.add_step_result(
                    self.steps[i].name,
                    StepResult(success=False, error=str(step_result))
                )
            else:
                result.add_step_result(self.steps[i].name, step_result)

        result.success = all(r.success for r in result.step_results.values())
        return result

    async def _execute_step(self, step: WorkflowStep,
                            context: Dict[str, Any]) -> StepResult:
        """Execute a single step."""
        return await step.execute_async(context)
```
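Because `ParallelWorkflow.execute` is a coroutine, callers run it through an event loop. A short hedged usage sketch, reusing the hypothetical `WorkflowStep` from the earlier sketch and assuming a `WorkflowResult` implementation is available:

```python
import asyncio

workflow = ParallelWorkflow(max_workers=2)
workflow.add_step(WorkflowStep("lint", lambda ctx: {"lint_ok": True}))
workflow.add_step(WorkflowStep("tests", lambda ctx: {"tests_ok": True}))

# Run the parallel workflow to completion and inspect the aggregate result
result = asyncio.run(workflow.execute({"repo": "example"}))
print(result.success)
```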
### 3. Conditional Workflow

```python
class ConditionalWorkflow:
    """Conditional workflow."""

    def __init__(self):
        self.branches: Dict[str, List[WorkflowStep]] = {}
        self.default_branch: List[WorkflowStep] = []

    def add_branch(self, condition: str, steps: List[WorkflowStep]):
        """Add a branch."""
        self.branches[condition] = steps

    def set_default_branch(self, steps: List[WorkflowStep]):
        """Set the default branch."""
        self.default_branch = steps

    def execute(self, context: Dict[str, Any],
                condition_evaluator: callable) -> WorkflowResult:
        """Execute the conditional workflow."""
        result = WorkflowResult()

        # Evaluate the branch conditions
        selected_branch = None
        for condition, steps in self.branches.items():
            if condition_evaluator(condition, context):
                selected_branch = steps
                result.selected_branch = condition
                break

        # Fall back to the default branch
        if selected_branch is None:
            selected_branch = self.default_branch
            result.selected_branch = 'default'

        # Execute the selected branch
        current_context = context.copy()
        for step in selected_branch:
            try:
                step_result = step.execute(current_context)
                current_context.update(step_result.output)
                result.add_step_result(step.name, step_result)
            except Exception as e:
                logger.error(f"Error in step {step.name}: {e}")
                result.success = False
                result.error = str(e)
                break

        if result.success is None:
            result.success = True

        result.final_context = current_context
        return result
```
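The `condition_evaluator` argument is any callable that takes a branch's condition string and the current context and returns a boolean. A minimal hedged example that selects a branch from a hypothetical `language` key in the context, again reusing the illustrative `WorkflowStep` sketch:

```python
from typing import Any, Dict


def language_evaluator(condition: str, context: Dict[str, Any]) -> bool:
    # Treat the condition string as "key == value", e.g. "language == python"
    key, _, expected = condition.partition("==")
    return str(context.get(key.strip())) == expected.strip()


workflow = ConditionalWorkflow()
workflow.add_branch("language == python",
                    [WorkflowStep("run_pytest", lambda ctx: {"ran": "pytest"})])
workflow.set_default_branch([WorkflowStep("run_generic_tests", lambda ctx: {"ran": "generic"})])

result = workflow.execute({"language": "python"}, language_evaluator)
print(result.selected_branch)  # "language == python"
```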
## 28.3.5 Data Flow Monitoring and Debugging

### 1. Data Flow Monitor

```python
from datetime import datetime


class DataFlowMonitor:
    """Data flow monitor."""

    def __init__(self):
        self.metrics: Dict[str, List[float]] = {}
        self.events: List[Dict[str, Any]] = []

    def record_metric(self, name: str, value: float):
        """Record a metric value."""
        if name not in self.metrics:
            self.metrics[name] = []
        self.metrics[name].append(value)

    def record_event(self, event_type: str, data: Dict[str, Any]):
        """Record an event."""
        event = {
            'type': event_type,
            'timestamp': datetime.utcnow(),
            'data': data
        }
        self.events.append(event)

    def get_metrics_summary(self) -> Dict[str, Dict[str, float]]:
        """Summarize the recorded metrics."""
        summary = {}
        for name, values in self.metrics.items():
            summary[name] = {
                'count': len(values),
                'mean': sum(values) / len(values),
                'min': min(values),
                'max': max(values)
            }
        return summary

    def get_events(self, event_type: str = None) -> List[Dict[str, Any]]:
        """Return recorded events, optionally filtered by type."""
        if event_type:
            return [e for e in self.events if e['type'] == event_type]
        return self.events
```
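One way to satisfy the observability principle is to bridge pipeline events into the monitor. A minimal sketch, assuming the observer protocol used by `DataPipeline` earlier (`notify(event, data)`); the class name `MonitoringObserver` and the derived metric names are illustrative:

```python
import time
from typing import Any


class MonitoringObserver:
    """Illustrative observer that turns pipeline events into monitor data."""

    def __init__(self, monitor: DataFlowMonitor):
        self.monitor = monitor
        self._stage_start: dict = {}

    def notify(self, event: str, data: Any = None) -> None:
        # Record every event, plus a per-stage duration metric for start/complete pairs
        self.monitor.record_event(event, {'data_type': type(data).__name__})
        if event.endswith('_start'):
            self._stage_start[event[:-len('_start')]] = time.time()
        elif event.endswith('_complete'):
            stage = event[:-len('_complete')]
            started = self._stage_start.pop(stage, None)
            if started is not None:
                self.monitor.record_metric(f'{stage}_seconds', time.time() - started)


# Hypothetical wiring
monitor = DataFlowMonitor()
pipeline = DataPipeline()
pipeline.add_observer(MonitoringObserver(monitor))
```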
### 2. Data Flow Debugger

```python
from datetime import datetime


class DataFlowDebugger:
    """Data flow debugger."""

    def __init__(self):
        self.breakpoints: List[str] = []
        self.trace: List[Dict[str, Any]] = []
        self.enabled = False

    def enable(self):
        """Enable debugging."""
        self.enabled = True

    def disable(self):
        """Disable debugging."""
        self.enabled = False

    def add_breakpoint(self, step_name: str):
        """Add a breakpoint."""
        self.breakpoints.append(step_name)

    def trace_step(self, step_name: str, input_data: Any,
                   output_data: Any):
        """Trace a single step."""
        if not self.enabled:
            return

        trace_entry = {
            'step': step_name,
            'timestamp': datetime.utcnow(),
            'input': self._serialize(input_data),
            'output': self._serialize(output_data)
        }
        self.trace.append(trace_entry)

        # Check for a breakpoint
        if step_name in self.breakpoints:
            self._pause_at_breakpoint(step_name, trace_entry)

    def _pause_at_breakpoint(self, step_name: str, trace_entry: Dict):
        """Pause at a breakpoint."""
        logger.info(f"Breakpoint hit at: {step_name}")
        logger.info(f"Input: {trace_entry['input']}")
        logger.info(f"Output: {trace_entry['output']}")

    def _serialize(self, data: Any) -> Any:
        """Serialize data for tracing."""
        if isinstance(data, (str, int, float, bool, type(None))):
            return data
        elif isinstance(data, (list, tuple)):
            return [self._serialize(item) for item in data]
        elif isinstance(data, dict):
            return {k: self._serialize(v) for k, v in data.items()}
        else:
            return str(data)
```
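A short usage sketch of the debugger: enable it, set a breakpoint on a step name, and feed it trace data. The step name and payloads below are hypothetical; in real use `trace_step` would be called from inside the pipeline.

```python
debugger = DataFlowDebugger()
debugger.enable()
debugger.add_breakpoint('intent_recognition')

# Trace one step by hand; the breakpoint triggers the logged pause output
debugger.trace_step(
    'intent_recognition',
    input_data={'text': 'fix the failing test'},
    output_data={'intent': 'debug', 'confidence': 0.92},
)

print(len(debugger.trace))   # 1 entry recorded
```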
## 28.3.6 Best Practices

### 1. Data Flow Design Principles

1. **Single responsibility**: each processor is responsible for exactly one task
2. **Composability**: processors can be combined flexibly to build different data flows
3. **Observability**: every processing step should be monitorable and traceable
4. **Error handling**: provide robust error handling and recovery mechanisms
5. **Performance**: support asynchronous processing and parallel execution

### 2. Workflow Design Principles

1. **Clarity**: the workflow logic should be clear and easy to follow
2. **Maintainability**: easy to modify and extend
3. **Testability**: every step should be independently testable
4. **Flexibility**: support different execution modes (sequential, parallel, conditional)
5. **Reusability**: workflow components should be reusable across scenarios

### 3. Monitoring and Debugging Recommendations

1. **Key metrics**: monitor processing time, success rate, error rate, and other key indicators
2. **Event tracing**: record important events for later analysis
3. **Breakpoint debugging**: set breakpoints at critical steps when debugging
4. **Performance profiling**: identify performance bottlenecks and optimize them
5. **Logging**: detailed logs make problem diagnosis much easier

With well-designed data flows and workflows, you can build an efficient, reliable, and maintainable Claude Code system.