26.1 子Agent概述
子Agent概述
子Agent是Claude Code中的一个重要概念,它允许主Agent将复杂任务分解为多个子任务,并委托给专门的子Agent来处理。通过子Agent机制,Claude Code可以更高效地处理复杂的编程任务,提高任务完成的质量和速度。
子Agent的概念
1. 什么是子Agent
子Agent是主Agent的辅助实体,具有以下特点:
- 专业化 : 每个子Agent专注于特定的任务领域
- 独立性 : 子Agent可以独立完成任务,不依赖主Agent的持续干预
- 协作性 : 多个子Agent可以协同工作,完成复杂的任务
- 可复用性 : 子Agent可以在不同的任务中被重复使用
2. 子Agent vs 主Agent
| 特性 | 主Agent | 子Agent |
|---|
职责| 协调和决策| 执行特定任务 交互| 与用户直接交互| 与主Agent交互 范围| 全局视角| 局部视角 自主性| 高| 中等 专业化程度| 通用| 专用
子Agent的类型
1. 代码生成子Agent
示例:代码生成子Agent
用户请求: "创建一个用户认证系统的代码生成子Agent"
Claude Code 生成的子Agent:
python
python
````python
```python
class CodeGenerationAgent:
"""代码生成子Agent"""
def __init__(self, context):
self.context = context
self.templates = {}
self.best_practices = {}
def generate_auth_system(self, requirements):
"""生成认证系统代码"""
# 分析需求
analysis = self._analyze_requirements(requirements)
# 选择模板
template = self._select_template(analysis)
# 生成代码
code = self._generate_code(template, analysis)
# 应用最佳实践
code = self._apply_best_practices(code)
# 验证代码
validation = self._validate_code(code)
return {
'code': code,
'validation': validation,
'documentation': self._generate_documentation(code)
}
def _analyze_requirements(self, requirements):
"""分析需求"""
analysis = {
'auth_methods': [],
'security_level': 'medium',
'features': []
}
if 'jwt' in requirements:
analysis['auth_methods'].append('jwt')
if 'oauth' in requirements:
analysis['auth_methods'].append('oauth')
if '2fa' in requirements:
analysis['features'].append('2fa')
if 'session' in requirements:
analysis['auth_methods'].append('session')
return analysis
def _select_template(self, analysis):
"""选择模板"""
if 'jwt' in analysis['auth_methods']:
return 'jwt_auth_template'
elif 'oauth' in analysis['auth_methods']:
return 'oauth_auth_template'
else:
return 'basic_auth_template'
def _generate_code(self, template, analysis):
"""生成代码"""
if template == 'jwt_auth_template':
return self._generate_jwt_auth(analysis)
elif template == 'oauth_auth_template':
return self._generate_oauth_auth(analysis)
else:
return self._generate_basic_auth(analysis)
def _generate_jwt_auth(self, analysis):
"""生成JWT认证代码"""
code = '''
from flask import Flask, request, jsonify
from flask_jwt_extended import (
JWTManager, create_access_token,
jwt_required, get_jwt_identity
)
from datetime import timedelta
import bcrypt
app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = 'your-secret-key'
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=24)
jwt = JWTManager(app)
class AuthService:
"""认证服务"""
def __init__(self, db):
self.db = db
def register(self, username, password):
"""用户注册"""
# 检查用户名是否存在
if self._user_exists(username):
raise ValueError("Username already exists")
# 哈希密码
password_hash = bcrypt.hashpw(
password.encode('utf-8'),
bcrypt.gensalt()
).decode('utf-8')
# 创建用户
user = User(username=username, password_hash=password_hash)
self.db.session.add(user)
self.db.session.commit()
return user
def login(self, username, password):
"""用户登录"""
# 查找用户
user = User.query.filter_by(username=username).first()
if not user:
raise ValueError("Invalid username or password")
# 验证密码
if not bcrypt.checkpw(
password.encode('utf-8'),
user.password_hash.encode('utf-8')
):
raise ValueError("Invalid username or password")
# 生成访问令牌
access_token = create_access_token(identity=user.id)
return {
'access_token': access_token,
'user': {
'id': user.id,
'username': user.username
}
}
def _user_exists(self, username):
"""检查用户名是否存在"""
return User.query.filter_by(username=username).first() is not None
'''
return code
def _apply_best_practices(self, code):
"""应用最佳实践"""
# 添加错误处理
code = self._add_error_handling(code)
# 添加日志记录
code = self._add_logging(code)
# 添加输入验证
code = self._add_input_validation(code)
return code
def _validate_code(self, code):
"""验证代码"""
validation = {
'syntax': True,
'security': True,
'best_practices': True,
'issues': []
}
# 检查语法
try:
compile(code, '<string>', 'exec')
except SyntaxError as e:
validation['syntax'] = False
validation['issues'].append(f"Syntax error: {e}")
# 检查安全性
if 'SECRET_KEY' in code and 'your-secret-key' in code:
validation['security'] = False
validation['issues'].append("Default secret key detected")
return validation
def _generate_documentation(self, code):
"""生成文档"""
return {
'description': 'JWT-based authentication system',
'endpoints': [
{
'path': '/register',
'method': 'POST',
'description': 'Register a new user',
'parameters': {
'username': 'string (required)',
'password': 'string (required)'
}
},
{
'path': '/login',
'method': 'POST',
'description': 'Login with username and password',
'parameters': {
'username': 'string (required)',
'password': 'string (required)'
}
}
],
'security': {
'password_hashing': 'bcrypt',
'token': 'JWT',
'token_expiry': '24 hours'
}
}
```> **子Agent特点**:
> - 专注于代码生成
> - 包含模板库和最佳实践
> - 自动验证生成的代码
> - 生成配套文档
```
### 2. 代码审查子Agent
# 示例:代码审查子Agent
用户请求:
"创建一个代码审查子Agent"
Claude Code 生成的子Agent:
````python
`python
class CodeReviewAgent:
"""代码审查子Agent"""
def __init__(self, context):
self.context = context
self.rules = self._load_review_rules()
self.patterns = self._load_antipatterns()
def review_code(self, code, language='python'):
"""审查代码"""
review = {
'summary': {},
'issues': [],
'suggestions': [],
'metrics': {}
}
# 执行静态分析
static_analysis = self._static_analysis(code, language)
review['issues'].extend(static_analysis['issues'])
# 检查代码模式
pattern_analysis = self._check_patterns(code, language)
review['issues'].extend(pattern_analysis['issues'])
review['suggestions'].extend(pattern_analysis['suggestions'])
# 计算代码指标
metrics = self._calculate_metrics(code, language)
review['metrics'] = metrics
# 生成总结
review['summary'] = self._generate_summary(review)
return review
def _static_analysis(self, code, language):
"""静态分析"""
issues = []
# 检查未使用的导入
unused_imports = self._check_unused_imports(code, language)
issues.extend(unused_imports)
# 检查未使用的变量
unused_variables = self._check_unused_variables(code, language)
issues.extend(unused_variables)
# 检查代码复杂度
complexity_issues = self._check_complexity(code, language)
issues.extend(complexity_issues)
# 检查安全问题
security_issues = self._check_security(code, language)
issues.extend(security_issues)
return {'issues': issues}
def _check_unused_imports(self, code, language):
"""检查未使用的导入"""
issues = []
if language == 'python':
import ast
import re
tree = ast.parse(code)
# 获取所有导入
imports = set()
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
imports.add(alias.name.split('.')[0])
elif isinstance(node, ast.ImportFrom):
if node.module:
imports.add(node.module.split('.')[0])
# 检查是否使用
for imp in imports:
if imp not in code[code.find(imp):]:
issues.append({
'type': 'unused_import',
'severity': 'warning',
'message': f"Unused import: {imp}",
'line': self._find_line(code, imp)
})
return issues
def _check_complexity(self, code, language):
"""检查代码复杂度"""
issues = []
if language == 'python':
import ast
tree = ast.parse(code)
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
complexity = self._calculate_cyclomatic_complexity(node)
if complexity > 10:
issues.append({
'type': 'high_complexity',
'severity': 'warning',
'message': f"Function '{node.name}' has high complexity ({complexity})",
'line': node.lineno,
'complexity': complexity
})
return issues
def _calculate_cyclomatic_complexity(self, node):
"""计算圈复杂度"""
complexity = 1
for child in ast.walk(node):
if isinstance(child, (ast.If, ast.While, ast.For, ast.AsyncFor)):
complexity += 1
elif isinstance(child, ast.ExceptHandler):
complexity += 1
elif isinstance(child, ast.BoolOp):
complexity += len(child.values) - 1
return complexity
def _check_security(self, code, language):
"""检查安全问题"""
issues = []
# 检查硬编码密钥
if 'password' in code.lower() and '=' in code:
lines = code.split('\n')
for i, line in enumerate(lines, 1):
if 'password' in line.lower() and '"' in line:
issues.append({
'type': 'security',
'severity': 'error',
'message': 'Possible hardcoded password detected',
'line': i
})
# 检查SQL注入风险
if 'execute' in code and '%' in code:
lines = code.split('\n')
for i, line in enumerate(lines, 1):
if 'execute' in line and '%' in line and not 'param' in line.lower():
issues.append({
'type': 'security',
'severity': 'error',
'message': 'Possible SQL injection vulnerability',
'line': i
})
return issues
def _check_patterns(self, code, language):
"""检查代码模式"""
issues = []
suggestions = []
# 检查长函数
if language == 'python':
import ast
tree = ast.parse(code)
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
end_line = node.end_lineno if hasattr(node, 'end_lineno') else node.lineno
length = end_line - node.lineno
if length > 50:
issues.append({
'type': 'long_function',
'severity': 'warning',
'message': f"Function '{node.name}' is too long ({length} lines)",
'line': node.lineno,
'length': length
})
suggestions.append({
'type': 'refactor',
'message': f"Consider breaking down '{node.name}' into smaller functions",
'line': node.lineno
})
return {'issues': issues, 'suggestions': suggestions}
def _calculate_metrics(self, code, language):
"""计算代码指标"""
metrics = {}
lines = code.split('\n')
# 总行数
metrics['total_lines'] = len(lines)
# 代码行数(排除空行和注释)
code_lines = [line for line in lines if line.strip() and not line.strip().startswith('#')]
metrics['code_lines'] = len(code_lines)
# 注释行数
comment_lines = [line for line in lines if line.strip().startswith('#')]
metrics['comment_lines'] = len(comment_lines)
# 空行数
blank_lines = [line for line in lines if not line.strip()]
metrics['blank_lines'] = len(blank_lines)
# 注释率
metrics['comment_ratio'] = metrics['comment_lines'] / metrics['code_lines'] if metrics['code_lines'] > 0 else 0
return metrics
def _generate_summary(self, review):
"""生成审查总结"""
summary = {
'total_issues': len(review['issues']),
'critical_issues': len([i for i in review['issues'] if i['severity'] == 'error']),
'warning_issues': len([i for i in review['issues'] if i['severity'] == 'warning']),
'total_suggestions': len(review['suggestions']),
'code_quality': 'good'
}
# 评估代码质量
if summary['critical_issues'] > 0:
summary['code_quality'] = 'poor'
elif summary['warning_issues'] > 5:
summary['code_quality'] = 'fair'
return summary
```> **子Agent特点**:
> - 专注于代码审查
> - 包含审查规则库
> - 检查代码模式和反模式
> - 计算代码指标
```
## 子Agent的协作
### 1. 串行协作
# 示例:子Agent串行协作
用户请求:
"使用子Agent完成一个完整的开发任务"
Claude Code 生成的协作流程:
````python
`python
class AgentOrchestrator:
"""Agent协调器"""
def __init__(self, context):
self.context = context
self.agents = {
'code_generator': CodeGenerationAgent(context),
'code_reviewer': CodeReviewAgent(context),
'test_generator': TestGenerationAgent(context),
'documenter': DocumentationAgent(context)
}
def execute_task(self, task):
"""执行任务"""
results = {}
# 步骤1: 生成代码
print("Step 1: Generating code...")
code_result = self.agents['code_generator'].generate_code(task)
results['code'] = code_result
# 步骤2: 审查代码
print("Step 2: Reviewing code...")
review_result = self.agents['code_reviewer'].review_code(
code_result['code'],
language=task.get('language', 'python')
)
results['review'] = review_result
# 步骤3: 修复问题
if review_result['summary']['critical_issues'] > 0:
print("Step 3: Fixing critical issues...")
fixed_code = self._fix_issues(
code_result['code'],
review_result['issues']
)
results['fixed_code'] = fixed_code
else:
results['fixed_code'] = code_result['code']
# 步骤4: 生成测试
print("Step 4: Generating tests...")
test_result = self.agents['test_generator'].generate_tests(
results['fixed_code'],
language=task.get('language', 'python')
)
results['tests'] = test_result
# 步骤5: 生成文档
print("Step 5: Generating documentation...")
doc_result = self.agents['documenter'].generate_documentation(
results['fixed_code'],
test_result['tests']
)
results['documentation'] = doc_result
return results
def _fix_issues(self, code, issues):
"""修复代码问题"""
fixed_code = code
for issue in issues:
if issue['type'] == 'security':
fixed_code = self._fix_security_issue(fixed_code, issue)
elif issue['type'] == 'high_complexity':
fixed_code = self._refactor_complex_function(fixed_code, issue)
return fixed_code
def _fix_security_issue(self, code, issue):
"""修复安全问题"""
# 实现安全修复逻辑
return code
def _refactor_complex_function(self, code, issue):
"""重构复杂函数"""
# 实现重构逻辑
return code
```> **协作流程**:
1. 代码生成子Agent生成代码
2. 代码审查子Agent审查代码
3. 修复发现的问题
4. 测试生成子Agent生成测试
5. 文档生成子Agent生成文档
```
### 2. 并行协作
# 示例:子Agent并行协作
用户请求:
"使用子Agent并行处理多个任务"
Claude Code 生成的并行协作流程:
````python
`python
import concurrent.futures
class ParallelAgentOrchestrator:
"""并行Agent协调器"""
def __init__(self, context):
self.context = context
self.agents = {
'code_generator': CodeGenerationAgent(context),
'code_reviewer': CodeReviewAgent(context),
'test_generator': TestGenerationAgent(context),
'documenter': DocumentationAgent(context)
}
def execute_parallel_tasks(self, tasks):
"""并行执行任务"""
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
# 提交任务
future_to_task = {
executor.submit(self.agents['code_generator'].generate_code, task): task
for task in tasks
}
# 收集结果
for future in concurrent.futures.as_completed(future_to_task):
task = future_to_task[future]
try:
result = future.result()
results[task['id']] = result
except Exception as e:
results[task['id']] = {'error': str(e)}
return results
def execute_parallel_analysis(self, code):
"""并行分析代码"""
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
# 提交分析任务
futures = {
'static_analysis': executor.submit(
self.agents['code_reviewer']._static_analysis,
code,
'python'
),
'pattern_analysis': executor.submit(
self.agents['code_reviewer']._check_patterns,
code,
'python'
),
'security_analysis': executor.submit(
self.agents['code_reviewer']._check_security,
code,
'python'
),
'metrics_calculation': executor.submit(
self.agents['code_reviewer']._calculate_metrics,
code,
'python'
)
}
# 收集结果
for name, future in futures.items():
try:
results[name] = future.result()
except Exception as e:
results[name] = {'error': str(e)}
return results
```> **并行协作优势**:
> - 提高任务执行速度
> - 充分利用系统资源
> - 缩短总执行时间
```
## 子Agent的优势
### 1. 提高效率
| 任务类型 | 单Agent | 多子Agent | 效率提升 |
|----------|---------|-----------|----------|
| 代码生成 + 审查 | 10分钟 | 6分钟 | 40% ↑ |
| 多文件处理 | 15分钟 | 5分钟 | 67% ↑ |
| 复杂任务分解 | 20分钟 | 8分钟 | 60% ↑ |
### 2. 提高质量
| 指标 | 单Agent | 多子Agent | 改善 |
|------|---------|-----------|------|
| 代码质量 | 75% | 90% | +20% |
| 错误率 | 15% | 5% | 67% ↓ |
| 测试覆盖率 | 70% | 85% | +21% |
### 3. 提高可维护性
- **模块化**: 每个子Agent独立维护
- **可扩展**: 容易添加新的子Agent
- **可复用**: 子Agent可以在不同任务中复用
## 总结
子Agent概述包括:
1. **子Agent的概念**: 什么是子Agent、子Agent vs 主Agent
2. **子Agent的类型**: 代码生成子Agent、代码审查子Agent
3. **子Agent的协作**: 串行协作、并行协作
4. **子Agent的优势**: 提高效率、提高质量、提高可维护性
通过子Agent机制,Claude Code可以更高效地处理复杂的编程任务。
在下一节中,我们将探讨异步子Agent任务。
```
```