15.3 Skills 与主代理的交互
交互机制概述
Skills 与主代理的交互是 Claude Code 系统的核心。主代理负责协调和管理 Skills,而 Skills 则提供具体的功能实现。本节将深入探讨两者之间的交互机制。
交互模式
1. 主动调用模式
1.1 调用流程
主动调用流程
步骤 1:任务识别
主代理接收用户请求,分析任务类型和需求
步骤 2:Skill 选择
根据任务需求,从可用 Skills 中选择最合适的 Skill
步骤 3:参数准备
准备 Skill 需要的参数和上下文信息
步骤 4:Skill 调用
调用选定的 Skill,传递参数和上下文
步骤 5:结果处理
接收 Skill 的执行结果,进行必要的处理和整合
步骤 6:响应生成
基于 Skill 的结果,生成最终的响应返回给用户
python
#### 1.2 代码示例
bash
python
class MainAgent:
def __init__(self):
self.skills = load_skills()
self.context_manager = ContextManager()
def process_request(self, user_request):
# 1. 任务识别
task = self.analyze_task(user_request)
# 2. Skill 选择
skill = self.select_skill(task)
# 3. 参数准备
context = self.context_manager.collect_context(skill, task)
parameters = self.prepare_parameters(task, context)
# 4. Skill 调用
result = skill.execute(parameters, context)
# 5. 结果处理
processed_result = self.process_result(result, context)
# 6. 响应生成
response = self.generate_response(processed_result)
return response
### 2. 被动调用模式
#### 2.1 调用流程
## 被动调用流程
### 步骤 1:用户指定
用户明确指定要使用的 Skill
### 步骤 2:参数验证
验证用户提供的参数是否有效
### 步骤 3:上下文收集
收集 Skill 需要的上下文信息
### 步骤 4:Skill 执行
执行指定的 Skill
### 步骤 5:结果返回
直接返回 Skill 的执行结果
#### 2.2 代码示例
bash
python
class MainAgent:
def execute_skill(self, skill_name, user_parameters):
# 1. 验证 Skill 存在
if skill_name not in self.skills:
raise SkillNotFoundError(skill_name)
skill = self.skills[skill_name]
# 2. 参数验证
validated_params = skill.validate_parameters(user_parameters)
# 3. 上下文收集
context = self.context_manager.collect_context(skill, validated_params)
# 4. Skill 执行
result = skill.execute(validated_params, context)
# 5. 结果返回
return result
### 3. 嵌套调用模式
#### 3.1 调用流程
## 嵌套调用流程
### 示例场景:部署应用
### 调用层次
主代理
└─> 部署 Skill
├─> 测试 Skill
│ └─> 代码分析 Skill
│ └─> 文档检查 Skill
├─> 构建 Skill
│ └─> 依赖检查 Skill
└─> 验证 Skill
└─> 健康检查 Skill
### 执行流程
#### 3.2 代码示例
bash
python
class DeploymentSkill(Skill):
def execute(self, parameters, context):
# 调用测试 Skill
test_result = self.call_skill("test", context)
if not test_result.success:
return DeploymentResult(success=False, error="Tests failed")
# 调用构建 Skill
build_result = self.call_skill("build", context)
if not build_result.success:
return DeploymentResult(success=False, error="Build failed")
# 执行部署
deploy_result = self.deploy(build_result.artifact)
# 调用验证 Skill
verify_result = self.call_skill("verify", context)
return DeploymentResult(
success=verify_result.success,
deploy_result=deploy_result,
verify_result=verify_result
)
## 通信机制
### 1. 消息传递
#### 1.1 消息格式
## 消息格式
### 请求消息
~~~`json
`json
{
"message_id": "msg_123456",
"timestamp": "2024-01-15T10:30:00Z",
"type": "skill_request",
"skill_name": "code-review",
"parameters": {
"file": "src/main.py",
"strict": true
},
"context": {
"project": {...},
"code": {...},
"user": {...}
}
}
```### 响应消息json
{ "message_id": "msg_123456", "timestamp": "2024-01-15T10:30:15Z", "type": "skill_response", "status": "success", "result": { "issues": [...], "summary": {...} }, "metadata": { "execution_time": 15.2, "memory_used": "256MB" } }
错误消息
bash
````json
python
{
"message_id": "msg_123456",
"timestamp": "2024-01-15T10:30:10Z",
"type": "skill_error",
"error": {
"code": "FILE_NOT_FOUND",
"message": "File src/main.py not found",
"details": {...}
}
}
```> >
```python
#### 1.2 消息队列
class MessageQueue: def **init**(self): self.queue = asyncio.Queue() self.handlers = {} async def send(self, message): await self.queue.put(message) async def receive(self): return await self.queue.get() def register_handler(self, message_type, handler): self.handlers[message_type] = handler async def process_messages(self): while True: message = await self.receive() handler = self.handlers.get(message.type) if handler: await handler(message)
bash
### 2. 事件驱动
#### 2.1 事件类型
markdown
## 事件类型
### Skill 事件
* skill_started: Skill 开始执行
* skill_progress: Skill 执行进度更新
* skill_completed: Skill 执行完成
* skill_failed: Skill 执行失败
### 上下文事件
* context_updated: 上下文更新
* context_invalidated: 上下文失效
### 工具事件
* tool_called: 工具被调用
* tool_completed: 工具执行完成
* tool_failed: 工具执行失败
#### 2.2 事件处理
class EventHandler: def **init**(self): self.listeners = defaultdict(list) def on(self, event_type, callback): self.listeners[event_type].append(callback) async def emit(self, event_type, data): for callback in self.listeners.get(event_type, []): await callback(data) async def handle_skill_started(self, event): print(f"Skill {event.skill_name} started") async def handle_skill_progress(self, event): print(f"Progress: {event.progress}%") async def handle_skill_completed(self, event): print(f"Skill {event.skill_name} completed")
bash
### 3. 流式通信
#### 3.1 流式输出
python
class StreamingSkill(Skill): async def execute_stream(self, parameters, context): # 步骤 1 yield {"step": 1, "message": "Analyzing code..."} result1 = await self.analyze_code(parameters, context)
bash
# 步骤 2
yield {"step": 2, "message": "Checking security..."}
result2 = await self.check_security(result1, context)
# 步骤 3
yield {"step": 3, "message": "Generating report..."}
result3 = await self.generate_report(result2, context)
# 最终结果
yield {"step": 4, "message": "Completed", "result": result3}
#### 3.2 流式消费
async def consume_stream(skill, parameters, context): async for chunk in skill.execute_stream(parameters, context): if "message" in chunk: print(chunk["message"]) if "result" in chunk: return chunk["result"]
bash
## 状态管理
### 1. 执行状态
#### 1.1 状态类型
markdown
## 执行状态
### 状态定义
* PENDING: 等待执行
* RUNNING: 正在执行
* PAUSED: 已暂停
* COMPLETED: 已完成
* FAILED: 执行失败
* CANCELLED: 已取消
### 状态转换
PENDING → RUNNING → COMPLETED PENDING → RUNNING → FAILED RUNNING → PAUSED → RUNNING RUNNING → CANCELLED
#### 1.2 状态管理
class ExecutionState: def **init**(self): self.state = "PENDING" self.start_time = None self.end_time = None self.progress = 0 self.error = None def start(self): self.state = "RUNNING" self.start_time = datetime.now() def complete(self): self.state = "COMPLETED" self.end_time = datetime.now() def fail(self, error): self.state = "FAILED" self.error = error self.end_time = datetime.now() def update_progress(self, progress): self.progress = progress def get_duration(self): if self.start_time and self.end_time: return (self.end_time - self.start_time).total_seconds() return None
bash
### 2. 上下文状态
#### 2.1 上下文快照
python
class ContextSnapshot: def **init**(self, context): self.timestamp = datetime.now() self.context = copy.deepcopy(context) self.version = self.generate_version()
bash
def generate_version(self):
return hashlib.md5(
json.dumps(self.context, sort_keys=True).encode()
).hexdigest()
def compare(self, other_snapshot):
return self.version == other_snapshot.version
#### 2.2 上下文恢复
class ContextManager: def **init**(self): self.snapshots = [] self.current_context = {} def create_snapshot(self): snapshot = ContextSnapshot(self.current_context) self.snapshots.append(snapshot) return snapshot def restore_snapshot(self, snapshot): self.current_context = copy.deepcopy(snapshot.context) def rollback_to(self, version): for snapshot in reversed(self.snapshots): if snapshot.version == version: self.restore_snapshot(snapshot) return True return False
bash
### 3. 会话状态
#### 3.1 会话管理
python
class SessionManager: def **init**(self): self.sessions = {} self.current_session_id = None
bash
def create_session(self):
session_id = generate_id()
self.sessions[session_id] = {
"id": session_id,
"created_at": datetime.now(),
"context": {},
"history": [],
"state": "ACTIVE"
}
self.current_session_id = session_id
return session_id
def get_session(self, session_id):
return self.sessions.get(session_id)
def update_session(self, session_id, updates):
if session_id in self.sessions:
self.sessions[session_id].update(updates)
def close_session(self, session_id):
if session_id in self.sessions:
self.sessions[session_id]["state"] = "CLOSED"
self.sessions[session_id]["closed_at"] = datetime.now()
## 错误处理
### 1\. 错误传播
#### 1.1 错误类型
## 错误类型
### Skill 错误
* SkillNotFoundError: Skill 不存在
* SkillExecutionError: Skill 执行失败
* SkillTimeoutError: Skill 执行超时
### 参数错误
* ParameterValidationError: 参数验证失败
* MissingParameterError: 缺少必需参数
* InvalidParameterError: 参数值无效
### 上下文错误
* ContextNotFoundError: 上下文不存在
* ContextInvalidError: 上下文无效
* ContextTimeoutError: 上下文获取超时
bash
#### 1.2 错误处理策略
python
class ErrorHandler: def **init**(self): self.retries = {} self.fallbacks = {}
bash
def handle_error(self, error, context):
error_type = type(error).__name__
# 检查是否应该重试
if self.should_retry(error_type):
return self.retry(error, context)
# 检查是否有回退方案
if self.has_fallback(error_type):
return self.fallback(error, context)
# 否则抛出错误
raise error
def should_retry(self, error_type):
return error_type in self.retries
def retry(self, error, context):
retry_config = self.retries[type(error).__name__]
max_attempts = retry_config.get("max_attempts", 3)
delay = retry_config.get("delay", 1)
attempt = context.get("attempt", 0) + 1
if attempt < max_attempts:
context["attempt"] = attempt
time.sleep(delay * attempt)
return "RETRY"
return error
def has_fallback(self, error_type):
return error_type in self.fallbacks
def fallback(self, error, context):
fallback_func = self.fallbacks[type(error).__name__]
return fallback_func(error, context)
### 2\. 错误恢复
#### 2.1 恢复策略
## 恢复策略
### 自动恢复
* 重试机制
* 回退方案
* 降级处理
### 手动恢复
* 用户确认
* 参数修正
* 上下文调整
### 状态恢复
* 快照恢复
* 断点续传
* 事务回滚
bash
#### 2.2 恢复实现
python
class RecoveryManager: def **init**(self): self.checkpoints = {}
bash
def create_checkpoint(self, execution_id, state):
self.checkpoints[execution_id] = {
"timestamp": datetime.now(),
"state": copy.deepcopy(state)
}
def restore_checkpoint(self, execution_id):
if execution_id in self.checkpoints:
return copy.deepcopy(self.checkpoints[execution_id]["state"])
return None
def recover_from_error(self, error, execution_id):
# 恢复到检查点
state = self.restore_checkpoint(execution_id)
if state:
# 尝试恢复执行
return self.resume_execution(state)
# 如果没有检查点,尝试其他恢复策略
return self.alternative_recovery(error)
## 性能优化
### 1\. 并行执行
#### 1.1 并行策略
class ParallelExecutor: def **init**(self, max_workers=4): self.max_workers = max_workers self.executor = ThreadPoolExecutor(max_workers=max_workers) async def execute_parallel(self, tasks): futures = [] for task in tasks: future = self.executor.submit(task.execute) futures.append(future) results = [] for future in futures: result = await asyncio.wrap_future(future) results.append(result) return results
bash
#### 1.2 依赖管理
python
class DependencyManager: def **init**(self): self.dependencies = {}
bash
def add_dependency(self, task, depends_on):
if task not in self.dependencies:
self.dependencies[task] = []
self.dependencies[task].extend(depends_on)
def get_execution_order(self, tasks):
order = []
visited = set()
def visit(task):
if task in visited:
return
visited.add(task)
for dep in self.dependencies.get(task, []):
visit(dep)
order.append(task)
for task in tasks:
visit(task)
return order
### 2\. 资源管理
#### 2.1 资源池
class ResourcePool: def **init**(self, max_resources): self.max_resources = max_resources self.available = max_resources self.lock = asyncio.Lock() async def acquire(self): async with self.lock: while self.available <= 0: await asyncio.sleep(0.1) self.available -= 1 return True async def release(self): async with self.lock: self.available += 1 async def **aenter**(self): await self.acquire() return self async def **aexit**(self, exc_type, exc_val, exc_tb): await self.release()
bash
#### 2.2 资源监控
python
class ResourceMonitor: def **init**(self): self.metrics = defaultdict(list)
bash
def record_metric(self, name, value):
self.metrics[name].append({
"value": value,
"timestamp": datetime.now()
})
def get_average(self, name):
values = [m["value"] for m in self.metrics[name]]
return sum(values) / len(values) if values else 0
def get_peak(self, name):
values = [m["value"] for m in self.metrics[name]]
return max(values) if values else 0
bash
## 总结
Skills 与主代理的交互机制是一个复杂而精密的系统,涉及多种交互模式、通信机制、状态管理、错误处理和性能优化。理解这些机制有助于:
1. **优化性能**:通过并行执行和资源管理提高性能
2. **增强可靠性**:通过完善的错误处理和恢复机制提高可靠性
3. **改善体验**:通过流式通信和事件驱动改善用户体验
4. **支持扩展**:通过灵活的交互机制支持功能扩展
在下一节中,我们将探讨 Skills 的性能优化策略,了解如何进一步提高 Skills 的执行效率。