Skip to content

15.3 Skills 與主代理的互動

互動機制概述

Skills 與主代理的互動是 Claude Code 系統的核心。主代理負責協調和管理 Skills,而 Skills 則提供具體的功能實現。本節將深入探討兩者之間的互動機制。

交互模式

1. 主动调用模式

1.1 调用流程

主动调用流程

步骤 1:任务识别

主代理接收使用者請求,分析任務型別和需求

步驟 2:Skill 選擇

根據任務需求,從可用 Skills 中選擇最合適的 Skill

步驟 3:引數準備

準備 Skill 需要的引數和上下文資訊

步驟 4:Skill 呼叫

呼叫選定的 Skill,傳遞引數和上下文

步驟 5:結果處理

接收 Skill 的執行結果,進行必要的處理和整合

步驟 6:響應生成

基於 Skill 的結果,生成最終的響應返回給使用者

python
#### 1.2 代码示例

    bash


    python

    class MainAgent:
        def __init__(self):
            self.skills = load_skills()
            self.context_manager = ContextManager()

        def process_request(self, user_request):
            # 1. 任务识别
            task = self.analyze_task(user_request)

            # 2. Skill 选择
            skill = self.select_skill(task)

            # 3. 参数准备
            context = self.context_manager.collect_context(skill, task)
            parameters = self.prepare_parameters(task, context)

            # 4. Skill 调用
            result = skill.execute(parameters, context)

            # 5. 结果处理
            processed_result = self.process_result(result, context)

            # 6. 响应生成
            response = self.generate_response(processed_result)

            return response

    ### 2. 被动调用模式

    #### 2.1 调用流程

    ## 被动调用流程
    ### 步骤 1:用户指定
    用户明确指定要使用的 Skill
    ### 步骤 2:参数验证
    验证用户提供的参数是否有效
    ### 步骤 3:上下文收集
    收集 Skill 需要的上下文信息
    ### 步骤 4:Skill 执行
    执行指定的 Skill
    ### 步骤 5:结果返回
    直接返回 Skill 的执行结果

#### 2.2 代码示例

    bash


    python

    class MainAgent:
        def execute_skill(self, skill_name, user_parameters):
            # 1. 验证 Skill 存在
            if skill_name not in self.skills:
                raise SkillNotFoundError(skill_name)

            skill = self.skills[skill_name]

            # 2. 参数验证
            validated_params = skill.validate_parameters(user_parameters)

            # 3. 上下文收集
            context = self.context_manager.collect_context(skill, validated_params)

            # 4. Skill 执行
            result = skill.execute(validated_params, context)

            # 5. 结果返回
            return result

    ### 3. 嵌套调用模式

    #### 3.1 调用流程

    ## 嵌套调用流程
    ### 示例场景:部署应用
    ### 调用层次
    主代理
    └─> 部署 Skill
    ├─> 测试 Skill
    │   └─> 代码分析 Skill
    │       └─> 文档检查 Skill
    ├─> 构建 Skill
    │   └─> 依赖检查 Skill
    └─> 验证 Skill
    └─> 健康检查 Skill
    ### 执行流程

#### 3.2 代码示例

    bash


    python

    class DeploymentSkill(Skill):
        def execute(self, parameters, context):
            # 调用测试 Skill
            test_result = self.call_skill("test", context)

            if not test_result.success:
                return DeploymentResult(success=False, error="Tests failed")

            # 调用构建 Skill
            build_result = self.call_skill("build", context)

            if not build_result.success:
                return DeploymentResult(success=False, error="Build failed")

            # 执行部署
            deploy_result = self.deploy(build_result.artifact)

            # 调用验证 Skill
            verify_result = self.call_skill("verify", context)

            return DeploymentResult(
                success=verify_result.success,
                deploy_result=deploy_result,
                verify_result=verify_result
            )

    ## 通信机制

    ### 1. 消息传递

    #### 1.1 消息格式

    ## 消息格式
    ### 请求消息
    ~~~`json
    `json

    {
    "message_id": "msg_123456",
    "timestamp": "2024-01-15T10:30:00Z",
    "type": "skill_request",
    "skill_name": "code-review",
    "parameters": {
    "file": "src/main.py",
    "strict": true
    },
    "context": {
    "project": {...},
    "code": {...},
    "user": {...}
    }
    }

    ```### 響應訊息

json

{ "message_id": "msg_123456", "timestamp": "2024-01-15T10:30:15Z", "type": "skill_response", "status": "success", "result": { "issues": [...], "summary": {...} }, "metadata": { "execution_time": 15.2, "memory_used": "256MB" } }

错误消息

bash


````json
markdown

## 事件型別

### Skill 事件

  * skill_started: Skill 開始執行
  * skill_progress: Skill 執行進度更新
  * skill_completed: Skill 執行完成
  * skill_failed: Skill 執行失敗

### 上下文事件

  * context_updated: 上下文更新
  * context_invalidated: 上下文失效

### 工具事件

  * tool_called: 工具被呼叫
  * tool_completed: 工具執行完成
  * tool_failed: 工具執行失敗

#### 2.2 事件處理

class EventHandler: def **init**(self): self.listeners = defaultdict(list) def on(self, event_type, callback): self.listeners[event_type].append(callback) async def emit(self, event_type, data): for callback in self.listeners.get(event_type, []): await callback(data) async def handle_skill_started(self, event): print(f"Skill {event.skill_name} started") async def handle_skill_progress(self, event): print(f"Progress: {event.progress}%") async def handle_skill_completed(self, event): print(f"Skill {event.skill_name} completed")

    bash


    ### 3. 流式通訊
    #### 3.1 流式輸出

python

class StreamingSkill(Skill): async def execute_stream(self, parameters, context): # 步驟 1 yield {"step": 1, "message": "Analyzing code..."} result1 = await self.analyze_code(parameters, context)

    bash


        # 步驟 2
        yield {"step": 2, "message": "Checking security..."}
        result2 = await self.check_security(result1, context)

        # 步驟 3
        yield {"step": 3, "message": "Generating report..."}
        result3 = await self.generate_report(result2, context)

        # 最終結果
        yield {"step": 4, "message": "Completed", "result": result3}

#### 3.2 流式消費

async def consume_stream(skill, parameters, context): async for chunk in skill.execute_stream(parameters, context): if "message" in chunk: print(chunk["message"]) if "result" in chunk: return chunk["result"]

    bash


    ## 狀態管理
    ### 1. 執行狀態
    #### 1.1 狀態型別

markdown

## 執行狀態

### 狀態定義

  * PENDING: 等待執行
  * RUNNING: 正在執行
  * PAUSED: 已暫停
  * COMPLETED: 已完成
  * FAILED: 執行失敗
  * CANCELLED: 已取消

### 狀態轉換

PENDING → RUNNING → COMPLETED PENDING → RUNNING → FAILED RUNNING → PAUSED → RUNNING RUNNING → CANCELLED

#### 1.2 狀態管理

class ExecutionState: def **init**(self): self.state = "PENDING" self.start_time = None self.end_time = None self.progress = 0 self.error = None def start(self): self.state = "RUNNING" self.start_time = datetime.now() def complete(self): self.state = "COMPLETED" self.end_time = datetime.now() def fail(self, error): self.state = "FAILED" self.error = error self.end_time = datetime.now() def update_progress(self, progress): self.progress = progress def get_duration(self): if self.start_time and self.end_time: return (self.end_time - self.start_time).total_seconds() return None

    bash


    ### 2. 上下文狀態
    #### 2.1 上下文快照

python

class ContextSnapshot: def **init**(self, context): self.timestamp = datetime.now() self.context = copy.deepcopy(context) self.version = self.generate_version()

    bash


    def generate_version(self):
        return hashlib.md5(
            json.dumps(self.context, sort_keys=True).encode()
        ).hexdigest()

    def compare(self, other_snapshot):
        return self.version == other_snapshot.version

#### 2.2 上下文恢復

class ContextManager: def **init**(self): self.snapshots = [] self.current_context = {} def create_snapshot(self): snapshot = ContextSnapshot(self.current_context) self.snapshots.append(snapshot) return snapshot def restore_snapshot(self, snapshot): self.current_context = copy.deepcopy(snapshot.context) def rollback_to(self, version): for snapshot in reversed(self.snapshots): if snapshot.version == version: self.restore_snapshot(snapshot) return True return False

    bash


    ### 3. 會話狀態
    #### 3.1 會話管理

python

class SessionManager: def **init**(self): self.sessions = {} self.current_session_id = None

    bash


    def create_session(self):
        session_id = generate_id()
        self.sessions[session_id] = {
            "id": session_id,
            "created_at": datetime.now(),
            "context": {},
            "history": [],
            "state": "ACTIVE"
        }
        self.current_session_id = session_id
        return session_id

    def get_session(self, session_id):
        return self.sessions.get(session_id)

    def update_session(self, session_id, updates):
        if session_id in self.sessions:
            self.sessions[session_id].update(updates)

    def close_session(self, session_id):
        if session_id in self.sessions:
            self.sessions[session_id]["state"] = "CLOSED"
            self.sessions[session_id]["closed_at"] = datetime.now()

## 錯誤處理

### 1\. 錯誤傳播

#### 1.1 錯誤型別

## 錯誤型別

### Skill 錯誤

  * SkillNotFoundError: Skill 不存在
  * SkillExecutionError: Skill 執行失敗
  * SkillTimeoutError: Skill 執行超時

### 引數錯誤

  * ParameterValidationError: 引數驗證失敗
  * MissingParameterError: 缺少必需引數
  * InvalidParameterError: 引數值無效

### 上下文錯誤

  * ContextNotFoundError: 上下文不存在
  * ContextInvalidError: 上下文無效
  * ContextTimeoutError: 上下文獲取超時

    bash


    #### 1.2 錯誤處理策略

python

class ErrorHandler: def **init**(self): self.retries = {} self.fallbacks = {}

    bash


    def handle_error(self, error, context):
        error_type = type(error).__name__

        # 檢查是否應該重試
        if self.should_retry(error_type):
            return self.retry(error, context)

        # 檢查是否有回退方案
        if self.has_fallback(error_type):
            return self.fallback(error, context)

        # 否則丟擲錯誤
        raise error

    def should_retry(self, error_type):
        return error_type in self.retries

    def retry(self, error, context):
        retry_config = self.retries[type(error).__name__]
        max_attempts = retry_config.get("max_attempts", 3)
        delay = retry_config.get("delay", 1)

        attempt = context.get("attempt", 0) + 1
        if attempt < max_attempts:
            context["attempt"] = attempt
            time.sleep(delay * attempt)
            return "RETRY"

        return error

    def has_fallback(self, error_type):
        return error_type in self.fallbacks

    def fallback(self, error, context):
        fallback_func = self.fallbacks[type(error).__name__]
        return fallback_func(error, context)

### 2\. 錯誤恢復

#### 2.1 恢復策略

## 恢復策略

### 自動恢復

  * 重試機制
  * 回退方案
  * 降級處理

### 手動恢復

  * 使用者確認
  * 引數修正
  * 上下文調整

### 狀態恢復

  * 快照恢復
  * 斷點續傳
  * 事務回滾

    bash


    #### 2.2 恢復實現

python

class RecoveryManager: def **init**(self): self.checkpoints = {}

    bash


    def create_checkpoint(self, execution_id, state):
        self.checkpoints[execution_id] = {
            "timestamp": datetime.now(),
            "state": copy.deepcopy(state)
        }

    def restore_checkpoint(self, execution_id):
        if execution_id in self.checkpoints:
            return copy.deepcopy(self.checkpoints[execution_id]["state"])
        return None

    def recover_from_error(self, error, execution_id):
        # 恢復到檢查點
        state = self.restore_checkpoint(execution_id)
        if state:
            # 嘗試恢復執行
            return self.resume_execution(state)

        # 如果沒有檢查點,嘗試其他恢復策略
        return self.alternative_recovery(error)

## 效能最佳化

### 1\. 並行執行

#### 1.1 並行策略

class ParallelExecutor: def **init**(self, max_workers=4): self.max_workers = max_workers self.executor = ThreadPoolExecutor(max_workers=max_workers) async def execute_parallel(self, tasks): futures = [] for task in tasks: future = self.executor.submit(task.execute) futures.append(future) results = [] for future in futures: result = await asyncio.wrap_future(future) results.append(result) return results

    bash


    #### 1.2 依賴管理

python

class DependencyManager: def **init**(self): self.dependencies = {}

    bash


    def add_dependency(self, task, depends_on):
        if task not in self.dependencies:
            self.dependencies[task] = []
        self.dependencies[task].extend(depends_on)

    def get_execution_order(self, tasks):
        order = []
        visited = set()

        def visit(task):
            if task in visited:
                return
            visited.add(task)

            for dep in self.dependencies.get(task, []):
                visit(dep)

            order.append(task)

        for task in tasks:
            visit(task)

        return order

### 2\. 資源管理

#### 2.1 資源池

class ResourcePool: def **init**(self, max_resources): self.max_resources = max_resources self.available = max_resources self.lock = asyncio.Lock() async def acquire(self): async with self.lock: while self.available <= 0: await asyncio.sleep(0.1) self.available -= 1 return True async def release(self): async with self.lock: self.available += 1 async def **aenter**(self): await self.acquire() return self async def **aexit**(self, exc_type, exc_val, exc_tb): await self.release()

    bash


    #### 2.2 資源監控

python

class ResourceMonitor: def **init**(self): self.metrics = defaultdict(list)

    bash


    def record_metric(self, name, value):
        self.metrics[name].append({
            "value": value,
            "timestamp": datetime.now()
        })

    def get_average(self, name):
        values = [m["value"] for m in self.metrics[name]]
        return sum(values) / len(values) if values else 0

    def get_peak(self, name):
        values = [m["value"] for m in self.metrics[name]]
        return max(values) if values else 0

    bash


    ## 總結

    Skills 與主代理的互動機制是一個複雜而精密的系統,涉及多種互動模式、通訊機制、狀態管理、錯誤處理和效能最佳化。理解這些機制有助於:

    1. **最佳化效能**:透過並行執行和資源管理提高效能
    2. **增強可靠性**:透過完善的錯誤處理和恢復機制提高可靠性
    3. **改善體驗**:透過流式通訊和事件驅動改善使用者體驗
    4. **支援擴充套件**:透過靈活的互動機制支援功能擴充套件

    在下一節中,我們將探討 Skills 的效能最佳化策略,瞭解如何進一步提高 Skills 的執行效率。

基于 MIT 许可发布 | 永久导航