Skip to content

26.1 子Agent概述

子Agent概述

子Agent是Claude Code中的一个重要概念,它允许主Agent将复杂任务分解为多个子任务,并委托给专门的子Agent来处理。通过子Agent机制,Claude Code可以更高效地处理复杂的编程任务,提高任务完成的质量和速度。

子Agent的概念

1. 什么是子Agent

子Agent是主Agent的辅助实体,具有以下特点:

  • 专业化 : 每个子Agent专注于特定的任务领域
  • 独立性 : 子Agent可以独立完成任务,不依赖主Agent的持续干预
  • 协作性 : 多个子Agent可以协同工作,完成复杂的任务
  • 可复用性 : 子Agent可以在不同的任务中被重复使用

2. 子Agent vs 主Agent

特性主Agent子Agent

职责| 协调和决策| 执行特定任务 交互| 与用户直接交互| 与主Agent交互 范围| 全局视角| 局部视角 自主性| 高| 中等 专业化程度| 通用| 专用

子Agent的类型

1. 代码生成子Agent

示例:代码生成子Agent

用户请求: "创建一个用户认证系统的代码生成子Agent"

Claude Code 生成的子Agent:

python
    python


    ````python

```python
    class CodeGenerationAgent:
    """代码生成子Agent"""

    def __init__(self, context):
    self.context = context
    self.templates = {}
    self.best_practices = {}

    def generate_auth_system(self, requirements):
    """生成认证系统代码"""
     # 分析需求
    analysis = self._analyze_requirements(requirements)

     # 选择模板
    template = self._select_template(analysis)

     # 生成代码
    code = self._generate_code(template, analysis)

     # 应用最佳实践
    code = self._apply_best_practices(code)

     # 验证代码
    validation = self._validate_code(code)

    return {
    'code': code,
    'validation': validation,
    'documentation': self._generate_documentation(code)
    }

    def _analyze_requirements(self, requirements):
    """分析需求"""
    analysis = {
    'auth_methods': [],
    'security_level': 'medium',
    'features': []
    }

    if 'jwt' in requirements:
    analysis['auth_methods'].append('jwt')

    if 'oauth' in requirements:
    analysis['auth_methods'].append('oauth')

    if '2fa' in requirements:
    analysis['features'].append('2fa')

    if 'session' in requirements:
    analysis['auth_methods'].append('session')

    return analysis

    def _select_template(self, analysis):
    """选择模板"""
    if 'jwt' in analysis['auth_methods']:
    return 'jwt_auth_template'
    elif 'oauth' in analysis['auth_methods']:
    return 'oauth_auth_template'
    else:
    return 'basic_auth_template'

    def _generate_code(self, template, analysis):
    """生成代码"""
    if template == 'jwt_auth_template':
    return self._generate_jwt_auth(analysis)
    elif template == 'oauth_auth_template':
    return self._generate_oauth_auth(analysis)
    else:
    return self._generate_basic_auth(analysis)

    def _generate_jwt_auth(self, analysis):
    """生成JWT认证代码"""
    code = '''
    from flask import Flask, request, jsonify
    from flask_jwt_extended import (
    JWTManager, create_access_token,
    jwt_required, get_jwt_identity
    )
    from datetime import timedelta
    import bcrypt

    app = Flask(__name__)
    app.config['JWT_SECRET_KEY'] = 'your-secret-key'
    app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=24)

    jwt = JWTManager(app)

    class AuthService:
    """认证服务"""

    def __init__(self, db):
    self.db = db

    def register(self, username, password):
    """用户注册"""
     # 检查用户名是否存在
    if self._user_exists(username):
    raise ValueError("Username already exists")

     # 哈希密码
    password_hash = bcrypt.hashpw(
    password.encode('utf-8'),
    bcrypt.gensalt()
    ).decode('utf-8')

     # 创建用户
    user = User(username=username, password_hash=password_hash)
    self.db.session.add(user)
    self.db.session.commit()

    return user

    def login(self, username, password):
    """用户登录"""
     # 查找用户
    user = User.query.filter_by(username=username).first()

    if not user:
    raise ValueError("Invalid username or password")

     # 验证密码
    if not bcrypt.checkpw(
    password.encode('utf-8'),
    user.password_hash.encode('utf-8')
    ):
    raise ValueError("Invalid username or password")

     # 生成访问令牌
    access_token = create_access_token(identity=user.id)

    return {
    'access_token': access_token,
    'user': {
    'id': user.id,
    'username': user.username
    }
    }

    def _user_exists(self, username):
    """检查用户名是否存在"""
    return User.query.filter_by(username=username).first() is not None
    '''
    return code

    def _apply_best_practices(self, code):
    """应用最佳实践"""
     # 添加错误处理
    code = self._add_error_handling(code)

     # 添加日志记录
    code = self._add_logging(code)

     # 添加输入验证
    code = self._add_input_validation(code)

    return code

    def _validate_code(self, code):
    """验证代码"""
    validation = {
    'syntax': True,
    'security': True,
    'best_practices': True,
    'issues': []
    }

     # 检查语法
    try:
    compile(code, '<string>', 'exec')
    except SyntaxError as e:
    validation['syntax'] = False
    validation['issues'].append(f"Syntax error: {e}")

     # 检查安全性
    if 'SECRET_KEY' in code and 'your-secret-key' in code:
    validation['security'] = False
    validation['issues'].append("Default secret key detected")

    return validation

    def _generate_documentation(self, code):
    """生成文档"""
    return {
    'description': 'JWT-based authentication system',
    'endpoints': [
    {
    'path': '/register',
    'method': 'POST',
    'description': 'Register a new user',
    'parameters': {
    'username': 'string (required)',
    'password': 'string (required)'
    }
    },
    {
    'path': '/login',
    'method': 'POST',
    'description': 'Login with username and password',
    'parameters': {
    'username': 'string (required)',
    'password': 'string (required)'
    }
    }
    ],
    'security': {
    'password_hashing': 'bcrypt',
    'token': 'JWT',
    'token_expiry': '24 hours'
    }
    }

    ```> **子Agent特点**:

    > - 专注于代码生成
    > - 包含模板库和最佳实践
    > - 自动验证生成的代码
    > - 生成配套文档

    ```
    ### 2. 代码审查子Agent

    # 示例:代码审查子Agent
    用户请求:
    "创建一个代码审查子Agent"
    Claude Code 生成的子Agent:

    ````python

    `python

    class CodeReviewAgent:
    """代码审查子Agent"""

    def __init__(self, context):
    self.context = context
    self.rules = self._load_review_rules()
    self.patterns = self._load_antipatterns()

    def review_code(self, code, language='python'):
    """审查代码"""
    review = {
    'summary': {},
    'issues': [],
    'suggestions': [],
    'metrics': {}
    }

     # 执行静态分析

    static_analysis = self._static_analysis(code, language)
    review['issues'].extend(static_analysis['issues'])

     # 检查代码模式

    pattern_analysis = self._check_patterns(code, language)
    review['issues'].extend(pattern_analysis['issues'])
    review['suggestions'].extend(pattern_analysis['suggestions'])

     # 计算代码指标

    metrics = self._calculate_metrics(code, language)
    review['metrics'] = metrics

     # 生成总结

    review['summary'] = self._generate_summary(review)

    return review

    def _static_analysis(self, code, language):
    """静态分析"""
    issues = []

     # 检查未使用的导入

    unused_imports = self._check_unused_imports(code, language)
    issues.extend(unused_imports)

     # 检查未使用的变量

    unused_variables = self._check_unused_variables(code, language)
    issues.extend(unused_variables)

     # 检查代码复杂度

    complexity_issues = self._check_complexity(code, language)
    issues.extend(complexity_issues)

     # 检查安全问题

    security_issues = self._check_security(code, language)
    issues.extend(security_issues)

    return {'issues': issues}

    def _check_unused_imports(self, code, language):
    """检查未使用的导入"""
    issues = []

    if language == 'python':
    import ast
    import re

    tree = ast.parse(code)

     # 获取所有导入

    imports = set()
    for node in ast.walk(tree):
    if isinstance(node, ast.Import):
    for alias in node.names:
    imports.add(alias.name.split('.')[0])
    elif isinstance(node, ast.ImportFrom):
    if node.module:
    imports.add(node.module.split('.')[0])

     # 检查是否使用

    for imp in imports:
    if imp not in code[code.find(imp):]:
    issues.append({
    'type': 'unused_import',
    'severity': 'warning',
    'message': f"Unused import: {imp}",
    'line': self._find_line(code, imp)
    })

    return issues

    def _check_complexity(self, code, language):
    """检查代码复杂度"""
    issues = []

    if language == 'python':
    import ast

    tree = ast.parse(code)

    for node in ast.walk(tree):
    if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
    complexity = self._calculate_cyclomatic_complexity(node)

    if complexity > 10:
    issues.append({
    'type': 'high_complexity',
    'severity': 'warning',
    'message': f"Function '{node.name}' has high complexity ({complexity})",
    'line': node.lineno,
    'complexity': complexity
    })

    return issues

    def _calculate_cyclomatic_complexity(self, node):
    """计算圈复杂度"""
    complexity = 1

    for child in ast.walk(node):
    if isinstance(child, (ast.If, ast.While, ast.For, ast.AsyncFor)):
    complexity += 1
    elif isinstance(child, ast.ExceptHandler):
    complexity += 1
    elif isinstance(child, ast.BoolOp):
    complexity += len(child.values) - 1

    return complexity

    def _check_security(self, code, language):
    """检查安全问题"""
    issues = []

     # 检查硬编码密钥

    if 'password' in code.lower() and '=' in code:
    lines = code.split('\n')
    for i, line in enumerate(lines, 1):
    if 'password' in line.lower() and '"' in line:
    issues.append({
    'type': 'security',
    'severity': 'error',
    'message': 'Possible hardcoded password detected',
    'line': i
    })

     # 检查SQL注入风险

    if 'execute' in code and '%' in code:
    lines = code.split('\n')
    for i, line in enumerate(lines, 1):
    if 'execute' in line and '%' in line and not 'param' in line.lower():
    issues.append({
    'type': 'security',
    'severity': 'error',
    'message': 'Possible SQL injection vulnerability',
    'line': i
    })

    return issues

    def _check_patterns(self, code, language):
    """检查代码模式"""
    issues = []
    suggestions = []

     # 检查长函数

    if language == 'python':
    import ast

    tree = ast.parse(code)

    for node in ast.walk(tree):
    if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
    end_line = node.end_lineno if hasattr(node, 'end_lineno') else node.lineno
    length = end_line - node.lineno

    if length > 50:
    issues.append({
    'type': 'long_function',
    'severity': 'warning',
    'message': f"Function '{node.name}' is too long ({length} lines)",
    'line': node.lineno,
    'length': length
    })
    suggestions.append({
    'type': 'refactor',
    'message': f"Consider breaking down '{node.name}' into smaller functions",
    'line': node.lineno
    })

    return {'issues': issues, 'suggestions': suggestions}

    def _calculate_metrics(self, code, language):
    """计算代码指标"""
    metrics = {}

    lines = code.split('\n')

     # 总行数

    metrics['total_lines'] = len(lines)

     # 代码行数(排除空行和注释)

    code_lines = [line for line in lines if line.strip() and not line.strip().startswith('#')]
    metrics['code_lines'] = len(code_lines)

     # 注释行数

    comment_lines = [line for line in lines if line.strip().startswith('#')]
    metrics['comment_lines'] = len(comment_lines)

     # 空行数

    blank_lines = [line for line in lines if not line.strip()]
    metrics['blank_lines'] = len(blank_lines)

     # 注释率

    metrics['comment_ratio'] = metrics['comment_lines'] / metrics['code_lines'] if metrics['code_lines'] > 0 else 0

    return metrics

    def _generate_summary(self, review):
    """生成审查总结"""
    summary = {
    'total_issues': len(review['issues']),
    'critical_issues': len([i for i in review['issues'] if i['severity'] == 'error']),
    'warning_issues': len([i for i in review['issues'] if i['severity'] == 'warning']),
    'total_suggestions': len(review['suggestions']),
    'code_quality': 'good'
    }

     # 评估代码质量

    if summary['critical_issues'] > 0:
    summary['code_quality'] = 'poor'
    elif summary['warning_issues'] > 5:
    summary['code_quality'] = 'fair'

    return summary

    ```> **子Agent特点**:
    > - 专注于代码审查
    > - 包含审查规则库
    > - 检查代码模式和反模式
    > - 计算代码指标

    ```

    ## 子Agent的协作

    ### 1. 串行协作

    # 示例:子Agent串行协作

    用户请求:
    "使用子Agent完成一个完整的开发任务"
    Claude Code 生成的协作流程:

    ````python
    `python

    class AgentOrchestrator:
    """Agent协调器"""

    def __init__(self, context):
    self.context = context
    self.agents = {
    'code_generator': CodeGenerationAgent(context),
    'code_reviewer': CodeReviewAgent(context),
    'test_generator': TestGenerationAgent(context),
    'documenter': DocumentationAgent(context)
    }

    def execute_task(self, task):
    """执行任务"""
    results = {}

     # 步骤1: 生成代码
    print("Step 1: Generating code...")
    code_result = self.agents['code_generator'].generate_code(task)
    results['code'] = code_result

     # 步骤2: 审查代码
    print("Step 2: Reviewing code...")
    review_result = self.agents['code_reviewer'].review_code(
    code_result['code'],
    language=task.get('language', 'python')
    )
    results['review'] = review_result

     # 步骤3: 修复问题
    if review_result['summary']['critical_issues'] > 0:
    print("Step 3: Fixing critical issues...")
    fixed_code = self._fix_issues(
    code_result['code'],
    review_result['issues']
    )
    results['fixed_code'] = fixed_code
    else:
    results['fixed_code'] = code_result['code']

     # 步骤4: 生成测试
    print("Step 4: Generating tests...")
    test_result = self.agents['test_generator'].generate_tests(
    results['fixed_code'],
    language=task.get('language', 'python')
    )
    results['tests'] = test_result

     # 步骤5: 生成文档
    print("Step 5: Generating documentation...")
    doc_result = self.agents['documenter'].generate_documentation(
    results['fixed_code'],
    test_result['tests']
    )
    results['documentation'] = doc_result

    return results

    def _fix_issues(self, code, issues):
    """修复代码问题"""
    fixed_code = code

    for issue in issues:
    if issue['type'] == 'security':
    fixed_code = self._fix_security_issue(fixed_code, issue)
    elif issue['type'] == 'high_complexity':
    fixed_code = self._refactor_complex_function(fixed_code, issue)

    return fixed_code

    def _fix_security_issue(self, code, issue):
    """修复安全问题"""
     # 实现安全修复逻辑
    return code

    def _refactor_complex_function(self, code, issue):
    """重构复杂函数"""
     # 实现重构逻辑
    return code

    ```> **协作流程**:

    1. 代码生成子Agent生成代码
    2. 代码审查子Agent审查代码
    3. 修复发现的问题
    4. 测试生成子Agent生成测试
    5. 文档生成子Agent生成文档

    ```
    ### 2. 并行协作

    # 示例:子Agent并行协作
    用户请求:
    "使用子Agent并行处理多个任务"
    Claude Code 生成的并行协作流程:

    ````python

    `python

    import concurrent.futures

    class ParallelAgentOrchestrator:
    """并行Agent协调器"""

    def __init__(self, context):
    self.context = context
    self.agents = {
    'code_generator': CodeGenerationAgent(context),
    'code_reviewer': CodeReviewAgent(context),
    'test_generator': TestGenerationAgent(context),
    'documenter': DocumentationAgent(context)
    }

    def execute_parallel_tasks(self, tasks):
    """并行执行任务"""
    results = {}

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:

     # 提交任务

    future_to_task = {
    executor.submit(self.agents['code_generator'].generate_code, task): task
    for task in tasks
    }

     # 收集结果

    for future in concurrent.futures.as_completed(future_to_task):
    task = future_to_task[future]
    try:
    result = future.result()
    results[task['id']] = result
    except Exception as e:
    results[task['id']] = {'error': str(e)}

    return results

    def execute_parallel_analysis(self, code):
    """并行分析代码"""
    results = {}

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:

     # 提交分析任务

    futures = {
    'static_analysis': executor.submit(
    self.agents['code_reviewer']._static_analysis,
    code,
    'python'
    ),
    'pattern_analysis': executor.submit(
    self.agents['code_reviewer']._check_patterns,
    code,
    'python'
    ),
    'security_analysis': executor.submit(
    self.agents['code_reviewer']._check_security,
    code,
    'python'
    ),
    'metrics_calculation': executor.submit(
    self.agents['code_reviewer']._calculate_metrics,
    code,
    'python'
    )
    }

     # 收集结果

    for name, future in futures.items():
    try:
    results[name] = future.result()
    except Exception as e:
    results[name] = {'error': str(e)}

    return results

    ```> **并行协作优势**:
    > - 提高任务执行速度
    > - 充分利用系统资源
    > - 缩短总执行时间

    ```

    ## 子Agent的优势

    ### 1. 提高效率

    | 任务类型 | 单Agent | 多子Agent | 效率提升 |
    |----------|---------|-----------|----------|
    | 代码生成 + 审查 | 10分钟 | 6分钟 | 40%|
    | 多文件处理 | 15分钟 | 5分钟 | 67%|
    | 复杂任务分解 | 20分钟 | 8分钟 | 60%|

    ### 2. 提高质量

    | 指标 | 单Agent | 多子Agent | 改善 |
    |------|---------|-----------|------|
    | 代码质量 | 75% | 90% | +20% |
    | 错误率 | 15% | 5% | 67%|
    | 测试覆盖率 | 70% | 85% | +21% |

    ### 3. 提高可维护性

    - **模块化**: 每个子Agent独立维护
    - **可扩展**: 容易添加新的子Agent
    - **可复用**: 子Agent可以在不同任务中复用

    ## 总结

    子Agent概述包括:

    1. **子Agent的概念**: 什么是子Agent、子Agent vs 主Agent
    2. **子Agent的类型**: 代码生成子Agent、代码审查子Agent
    3. **子Agent的协作**: 串行协作、并行协作
    4. **子Agent的优势**: 提高效率、提高质量、提高可维护性

    通过子Agent机制,Claude Code可以更高效地处理复杂的编程任务。

    在下一节中,我们将探讨异步子Agent任务。

    ```

    ```

基于 MIT 许可发布 | 永久导航