Skip to content

26.1 子Agent概述

子Agent概述

子Agent是Claude Code中的一個重要概念,它允許主Agent將複雜任務分解為多個子任務,並委託給專門的子Agent來處理。透過子Agent機制,Claude Code可以更高效地處理複雜的程式設計任務,提高任務完成的質量和速度。

子Agent的概念

1. 什么是子Agent

子Agent是主Agent的輔助實體,具有以下特點:

  • 專業化 : 每個子Agent專注於特定的任務領域
  • 獨立性 : 子Agent可以獨立完成任務,不依賴主Agent的持續干預
  • 協作性 : 多個子Agent可以協同工作,完成複雜的任務
  • 可複用性 : 子Agent可以在不同的任務中被重複使用

2. 子Agent vs 主Agent

特性主Agent子Agent

職責| 協調和決策| 執行特定任務 互動| 與使用者直接互動| 與主Agent互動 範圍| 全域性視角| 區域性視角 自主性| 高| 中等 專業化程度| 通用| 專用

子Agent的类型

1. 代码生成子Agent

示例:代码生成子Agent

使用者請求: "建立一個使用者認證系統的程式碼生成子Agent"

Claude Code 生成的子Agent:

python
    python


    ````python

    > - 专注于代码生成
    > - 包含模板库和最佳实践
    > - 自动验证生成的代码
    > - 生成配套文档
    `python

    class CodeReviewAgent:
    """代码审查子Agent"""

    def __init__(self, context):
    self.context = context
    self.rules = self._load_review_rules()
    self.patterns = self._load_antipatterns()

    def review_code(self, code, language='python'):
    """审查代码"""
    review = {
    'summary': {},
    'issues': [],
    'suggestions': [],
    'metrics': {}
    }

     # 执行静态分析

    static_analysis = self._static_analysis(code, language)
    review['issues'].extend(static_analysis['issues'])

     # 检查代码模式

    pattern_analysis = self._check_patterns(code, language)
    review['issues'].extend(pattern_analysis['issues'])
    review['suggestions'].extend(pattern_analysis['suggestions'])

     # 计算代码指标

    metrics = self._calculate_metrics(code, language)
    review['metrics'] = metrics

     # 生成总结

    review['summary'] = self._generate_summary(review)

    return review

    def _static_analysis(self, code, language):
    """静态分析"""
    issues = []

     # 检查未使用的导入

    unused_imports = self._check_unused_imports(code, language)
    issues.extend(unused_imports)

     # 检查未使用的变量

    unused_variables = self._check_unused_variables(code, language)
    issues.extend(unused_variables)

     # 检查代码复杂度

    complexity_issues = self._check_complexity(code, language)
    issues.extend(complexity_issues)

     # 检查安全问题

    security_issues = self._check_security(code, language)
    issues.extend(security_issues)

    return {'issues': issues}

    def _check_unused_imports(self, code, language):
    """检查未使用的导入"""
    issues = []

    if language == 'python':
    import ast
    import re

    tree = ast.parse(code)

     # 获取所有导入

    imports = set()
    for node in ast.walk(tree):
    if isinstance(node, ast.Import):
    for alias in node.names:
    imports.add(alias.name.split('.')[0])
    elif isinstance(node, ast.ImportFrom):
    if node.module:
    imports.add(node.module.split('.')[0])

     # 检查是否使用

    for imp in imports:
    if imp not in code[code.find(imp):]:
    issues.append({
    'type': 'unused_import',
    'severity': 'warning',
    'message': f"Unused import: {imp}",
    'line': self._find_line(code, imp)
    })

    return issues

    def _check_complexity(self, code, language):
    """检查代码复杂度"""
    issues = []

    if language == 'python':
    import ast

    tree = ast.parse(code)

    for node in ast.walk(tree):
    if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
    complexity = self._calculate_cyclomatic_complexity(node)

    if complexity > 10:
    issues.append({
    'type': 'high_complexity',
    'severity': 'warning',
    'message': f"Function '{node.name}' has high complexity ({complexity})",
    'line': node.lineno,
    'complexity': complexity
    })

    return issues

    def _calculate_cyclomatic_complexity(self, node):
    """计算圈复杂度"""
    complexity = 1

    for child in ast.walk(node):
    if isinstance(child, (ast.If, ast.While, ast.For, ast.AsyncFor)):
    complexity += 1
    elif isinstance(child, ast.ExceptHandler):
    complexity += 1
    elif isinstance(child, ast.BoolOp):
    complexity += len(child.values) - 1

    return complexity

    def _check_security(self, code, language):
    """检查安全问题"""
    issues = []

     # 检查硬编码密钥

    if 'password' in code.lower() and '=' in code:
    lines = code.split('\n')
    for i, line in enumerate(lines, 1):
    if 'password' in line.lower() and '"' in line:
    issues.append({
    'type': 'security',
    'severity': 'error',
    'message': 'Possible hardcoded password detected',
    'line': i
    })

     # 检查SQL注入风险

    if 'execute' in code and '%' in code:
    lines = code.split('\n')
    for i, line in enumerate(lines, 1):
    if 'execute' in line and '%' in line and not 'param' in line.lower():
    issues.append({
    'type': 'security',
    'severity': 'error',
    'message': 'Possible SQL injection vulnerability',
    'line': i
    })

    return issues

    def _check_patterns(self, code, language):
    """检查代码模式"""
    issues = []
    suggestions = []

     # 检查长函数

    if language == 'python':
    import ast

    tree = ast.parse(code)

    for node in ast.walk(tree):
    if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
    end_line = node.end_lineno if hasattr(node, 'end_lineno') else node.lineno
    length = end_line - node.lineno

    if length > 50:
    issues.append({
    'type': 'long_function',
    'severity': 'warning',
    'message': f"Function '{node.name}' is too long ({length} lines)",
    'line': node.lineno,
    'length': length
    })
    suggestions.append({
    'type': 'refactor',
    'message': f"Consider breaking down '{node.name}' into smaller functions",
    'line': node.lineno
    })

    return {'issues': issues, 'suggestions': suggestions}

    def _calculate_metrics(self, code, language):
    """计算代码指标"""
    metrics = {}

    lines = code.split('\n')

     # 总行数

    metrics['total_lines'] = len(lines)

     # 代码行数(排除空行和注释)

    code_lines = [line for line in lines if line.strip() and not line.strip().startswith('#')]
    metrics['code_lines'] = len(code_lines)

     # 注释行数

    comment_lines = [line for line in lines if line.strip().startswith('#')]
    metrics['comment_lines'] = len(comment_lines)

     # 空行数

    blank_lines = [line for line in lines if not line.strip()]
    metrics['blank_lines'] = len(blank_lines)

     # 注释率

    metrics['comment_ratio'] = metrics['comment_lines'] / metrics['code_lines'] if metrics['code_lines'] > 0 else 0

    return metrics

    def _generate_summary(self, review):
    """生成审查总结"""
    summary = {
    'total_issues': len(review['issues']),
    'critical_issues': len([i for i in review['issues'] if i['severity'] == 'error']),
    'warning_issues': len([i for i in review['issues'] if i['severity'] == 'warning']),
    'total_suggestions': len(review['suggestions']),
    'code_quality': 'good'
    }

     # 评估代码质量

    if summary['critical_issues'] > 0:
    summary['code_quality'] = 'poor'
    elif summary['warning_issues'] > 5:
    summary['code_quality'] = 'fair'

    return summary

    ## 子Agent的协作

    ### 1. 串行协作

    # 示例:子Agent串行协作

    用户请求:
    "使用子Agent完成一个完整的开发任务"
    Claude Code 生成的协作流程:
    1. 代码生成子Agent生成代码
    2. 代码审查子Agent审查代码
    3. 修复发现的问题
    4. 测试生成子Agent生成测试
    5. 文档生成子Agent生成文档
    `python

    import concurrent.futures

    class ParallelAgentOrchestrator:
    """並行Agent協調器"""

    def __init__(self, context):
    self.context = context
    self.agents = {
    'code_generator': CodeGenerationAgent(context),
    'code_reviewer': CodeReviewAgent(context),
    'test_generator': TestGenerationAgent(context),
    'documenter': DocumentationAgent(context)
    }

    def execute_parallel_tasks(self, tasks):
    """並行執行任務"""
    results = {}

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:

     # 提交任務

    future_to_task = {
    executor.submit(self.agents['code_generator'].generate_code, task): task
    for task in tasks
    }

     # 收集結果

    for future in concurrent.futures.as_completed(future_to_task):
    task = future_to_task[future]
    try:
    result = future.result()
    results[task['id']] = result
    except Exception as e:
    results[task['id']] = {'error': str(e)}

    return results

    def execute_parallel_analysis(self, code):
    """並行分析程式碼"""
    results = {}

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:

     # 提交分析任務

    futures = {
    'static_analysis': executor.submit(
    self.agents['code_reviewer']._static_analysis,
    code,
    'python'
    ),
    'pattern_analysis': executor.submit(
    self.agents['code_reviewer']._check_patterns,
    code,
    'python'
    ),
    'security_analysis': executor.submit(
    self.agents['code_reviewer']._check_security,
    code,
    'python'
    ),
    'metrics_calculation': executor.submit(
    self.agents['code_reviewer']._calculate_metrics,
    code,
    'python'
    )
    }

     # 收集結果

    for name, future in futures.items():
    try:
    results[name] = future.result()
    except Exception as e:
    results[name] = {'error': str(e)}

    return results

    ```> **并行协作优势**:
    > - 提高任务执行速度
    > - 充分利用系统资源
    > - 缩短总执行时间

    ```

    ## 子Agent的優勢

    ### 1. 提高效率

    | 任務型別 | 單Agent | 多子Agent | 效率提升 |
    |----------|---------|-----------|----------|
    | 程式碼生成 + 審查 | 10分鐘 | 6分鐘 | 40%|
    | 多檔案處理 | 15分鐘 | 5分鐘 | 67%|
    | 複雜任務分解 | 20分鐘 | 8分鐘 | 60%|

    ### 2. 提高質量

    | 指標 | 單Agent | 多子Agent | 改善 |
    |------|---------|-----------|------|
    | 程式碼質量 | 75% | 90% | +20% |
    | 錯誤率 | 15% | 5% | 67%|
    | 測試覆蓋率 | 70% | 85% | +21% |

    ### 3. 提高可維護性

    - **模組化**: 每個子Agent獨立維護
    - **可擴充套件**: 容易新增新的子Agent
    - **可複用**: 子Agent可以在不同任務中複用

    ## 總結

    子Agent概述包括:

    1. **子Agent的概念**: 什麼是子Agent、子Agent vs 主Agent
    2. **子Agent的型別**: 程式碼生成子Agent、程式碼審查子Agent
    3. **子Agent的協作**: 序列協作、並行協作
    4. **子Agent的優勢**: 提高效率、提高質量、提高可維護性

    透過子Agent機制,Claude Code可以更高效地處理複雜的程式設計任務。

    在下一節中,我們將探討非同步子Agent任務。

    ```

    ```

基于 MIT 许可发布 | 永久导航